diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 33 | ||||
-rw-r--r-- | src/backend/utils/misc/guc_tables.c | 17 |
2 files changed, 40 insertions, 10 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index b567b8b59e2..92204bd9cdf 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -209,6 +209,9 @@ typedef struct ReorderBufferDiskChange int logical_decoding_work_mem; static const Size max_changes_in_memory = 4096; /* XXX for restore only */ +/* GUC variable */ +int logical_decoding_mode = LOGICAL_DECODING_MODE_BUFFERED; + /* --------------------------------------- * primary reorderbuffer support routines * --------------------------------------- @@ -3540,7 +3543,10 @@ ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb) /* * Check whether the logical_decoding_work_mem limit was reached, and if yes * pick the largest (sub)transaction at-a-time to evict and spill its changes to - * disk until we reach under the memory limit. + * disk or send to the output plugin until we reach under the memory limit. + * + * If logical_decoding_mode is set to "immediate", stream or serialize the changes + * immediately. * * XXX At this point we select the transactions until we reach under the memory * limit, but we might also adapt a more elaborate eviction strategy - for example @@ -3552,20 +3558,27 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) { ReorderBufferTXN *txn; - /* bail out if we haven't exceeded the memory limit */ - if (rb->size < logical_decoding_work_mem * 1024L) + /* + * Bail out if logical_decoding_mode is buffered and we haven't exceeded + * the memory limit. + */ + if (logical_decoding_mode == LOGICAL_DECODING_MODE_BUFFERED && + rb->size < logical_decoding_work_mem * 1024L) return; /* - * Loop until we reach under the memory limit. One might think that just - * by evicting the largest (sub)transaction we will come under the memory - * limit based on assumption that the selected transaction is at least as - * large as the most recent change (which caused us to go over the memory - * limit). However, that is not true because a user can reduce the - * logical_decoding_work_mem to a smaller value before the most recent + * If logical_decoding_mode is immediate, loop until there's no change. + * Otherwise, loop until we reach under the memory limit. One might think + * that just by evicting the largest (sub)transaction we will come under + * the memory limit based on assumption that the selected transaction is + * at least as large as the most recent change (which caused us to go over + * the memory limit). However, that is not true because a user can reduce + * the logical_decoding_work_mem to a smaller value before the most recent * change. */ - while (rb->size >= logical_decoding_work_mem * 1024L) + while (rb->size >= logical_decoding_work_mem * 1024L || + (logical_decoding_mode == LOGICAL_DECODING_MODE_IMMEDIATE && + rb->size > 0)) { /* * Pick the largest transaction (or subtransaction) and evict it from diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 436afe1d215..a37c9f98444 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -395,6 +395,12 @@ static const struct config_enum_entry ssl_protocol_versions_info[] = { {NULL, 0, false} }; +static const struct config_enum_entry logical_decoding_mode_options[] = { + {"buffered", LOGICAL_DECODING_MODE_BUFFERED, false}, + {"immediate", LOGICAL_DECODING_MODE_IMMEDIATE, false}, + {NULL, 0, false} +}; + StaticAssertDecl(lengthof(ssl_protocol_versions_info) == (PG_TLS1_3_VERSION + 2), "array length mismatch"); @@ -4877,6 +4883,17 @@ struct config_enum ConfigureNamesEnum[] = NULL, NULL, NULL }, + { + {"logical_decoding_mode", PGC_USERSET, DEVELOPER_OPTIONS, + gettext_noop("Allows streaming or serializing each change in logical decoding."), + NULL, + GUC_NOT_IN_SAMPLE + }, + &logical_decoding_mode, + LOGICAL_DECODING_MODE_BUFFERED, logical_decoding_mode_options, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, NULL, NULL, NULL, NULL |