aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/replication/logical/reorderbuffer.c33
-rw-r--r--src/backend/utils/misc/guc_tables.c17
2 files changed, 40 insertions, 10 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index b567b8b59e2..92204bd9cdf 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -209,6 +209,9 @@ typedef struct ReorderBufferDiskChange
int logical_decoding_work_mem;
static const Size max_changes_in_memory = 4096; /* XXX for restore only */
+/* GUC variable */
+int logical_decoding_mode = LOGICAL_DECODING_MODE_BUFFERED;
+
/* ---------------------------------------
* primary reorderbuffer support routines
* ---------------------------------------
@@ -3540,7 +3543,10 @@ ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
/*
* Check whether the logical_decoding_work_mem limit was reached, and if yes
* pick the largest (sub)transaction at-a-time to evict and spill its changes to
- * disk until we reach under the memory limit.
+ * disk or send to the output plugin until we reach under the memory limit.
+ *
+ * If logical_decoding_mode is set to "immediate", stream or serialize the changes
+ * immediately.
*
* XXX At this point we select the transactions until we reach under the memory
* limit, but we might also adapt a more elaborate eviction strategy - for example
@@ -3552,20 +3558,27 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
{
ReorderBufferTXN *txn;
- /* bail out if we haven't exceeded the memory limit */
- if (rb->size < logical_decoding_work_mem * 1024L)
+ /*
+ * Bail out if logical_decoding_mode is buffered and we haven't exceeded
+ * the memory limit.
+ */
+ if (logical_decoding_mode == LOGICAL_DECODING_MODE_BUFFERED &&
+ rb->size < logical_decoding_work_mem * 1024L)
return;
/*
- * Loop until we reach under the memory limit. One might think that just
- * by evicting the largest (sub)transaction we will come under the memory
- * limit based on assumption that the selected transaction is at least as
- * large as the most recent change (which caused us to go over the memory
- * limit). However, that is not true because a user can reduce the
- * logical_decoding_work_mem to a smaller value before the most recent
+ * If logical_decoding_mode is immediate, loop until there's no change.
+ * Otherwise, loop until we reach under the memory limit. One might think
+ * that just by evicting the largest (sub)transaction we will come under
+ * the memory limit based on assumption that the selected transaction is
+ * at least as large as the most recent change (which caused us to go over
+ * the memory limit). However, that is not true because a user can reduce
+ * the logical_decoding_work_mem to a smaller value before the most recent
* change.
*/
- while (rb->size >= logical_decoding_work_mem * 1024L)
+ while (rb->size >= logical_decoding_work_mem * 1024L ||
+ (logical_decoding_mode == LOGICAL_DECODING_MODE_IMMEDIATE &&
+ rb->size > 0))
{
/*
* Pick the largest transaction (or subtransaction) and evict it from
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 436afe1d215..a37c9f98444 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -395,6 +395,12 @@ static const struct config_enum_entry ssl_protocol_versions_info[] = {
{NULL, 0, false}
};
+static const struct config_enum_entry logical_decoding_mode_options[] = {
+ {"buffered", LOGICAL_DECODING_MODE_BUFFERED, false},
+ {"immediate", LOGICAL_DECODING_MODE_IMMEDIATE, false},
+ {NULL, 0, false}
+};
+
StaticAssertDecl(lengthof(ssl_protocol_versions_info) == (PG_TLS1_3_VERSION + 2),
"array length mismatch");
@@ -4877,6 +4883,17 @@ struct config_enum ConfigureNamesEnum[] =
NULL, NULL, NULL
},
+ {
+ {"logical_decoding_mode", PGC_USERSET, DEVELOPER_OPTIONS,
+ gettext_noop("Allows streaming or serializing each change in logical decoding."),
+ NULL,
+ GUC_NOT_IN_SAMPLE
+ },
+ &logical_decoding_mode,
+ LOGICAL_DECODING_MODE_BUFFERED, logical_decoding_mode_options,
+ NULL, NULL, NULL
+ },
+
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, 0, NULL, NULL, NULL, NULL