diff options
-rw-r--r-- | contrib/test_decoding/expected/decoding_into_rel.out | 2 | ||||
-rw-r--r-- | contrib/test_decoding/sql/decoding_into_rel.sql | 4 | ||||
-rw-r--r-- | src/backend/replication/logical/logical.c | 15 | ||||
-rw-r--r-- | src/include/storage/proc.h | 2 |
4 files changed, 19 insertions, 4 deletions
diff --git a/contrib/test_decoding/expected/decoding_into_rel.out b/contrib/test_decoding/expected/decoding_into_rel.out index 2671258f5d9..be759caa31d 100644 --- a/contrib/test_decoding/expected/decoding_into_rel.out +++ b/contrib/test_decoding/expected/decoding_into_rel.out @@ -1,6 +1,8 @@ -- test that we can insert the result of a get_changes call into a -- logged relation. That's really not a good idea in practical terms, -- but provides a nice test. +-- predictability +SET synchronous_commit = on; SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); ?column? ---------- diff --git a/contrib/test_decoding/sql/decoding_into_rel.sql b/contrib/test_decoding/sql/decoding_into_rel.sql index 3704821bcc3..54670fd39e7 100644 --- a/contrib/test_decoding/sql/decoding_into_rel.sql +++ b/contrib/test_decoding/sql/decoding_into_rel.sql @@ -1,6 +1,10 @@ -- test that we can insert the result of a get_changes call into a -- logged relation. That's really not a good idea in practical terms, -- but provides a nice test. + +-- predictability +SET synchronous_commit = on; + SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); -- slot works diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 8c318cd4b51..8fd66356267 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -146,10 +146,19 @@ StartupDecodingContext(List *output_plugin_options, * logical decoding backend which doesn't need to be checked individually * when computing the xmin horizon because the xmin is enforced via * replication slots. + * + * We can only do so if we're outside of a transaction (i.e. the case when + * streaming changes via walsender), otherwise a already setup + * snapshot/xid would end up being ignored. That's not a particularly + * bothersome restriction since the SQL interface can't be used for + * streaming anyway. */ - LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); - MyPgXact->vacuumFlags |= PROC_IN_LOGICAL_DECODING; - LWLockRelease(ProcArrayLock); + if (!IsTransactionOrTransactionBlock()) + { + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + MyPgXact->vacuumFlags |= PROC_IN_LOGICAL_DECODING; + LWLockRelease(ProcArrayLock); + } ctx->slot = slot; diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index c23f4da5b60..4ad4164927e 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -43,7 +43,7 @@ struct XidCache #define PROC_IN_ANALYZE 0x04 /* currently running analyze */ #define PROC_VACUUM_FOR_WRAPAROUND 0x08 /* set by autovac only */ #define PROC_IN_LOGICAL_DECODING 0x10 /* currently doing logical - * decoding */ + * decoding outside xact */ /* flags reset at EOXact */ #define PROC_VACUUM_STATE_MASK \ |