aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c41
1 files changed, 36 insertions, 5 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index df2ea94d468..9fd879ee0c8 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -65,6 +65,13 @@ static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
static bool publications_valid;
static bool in_streaming;
+/*
+ * Private memory context for publication data, created in
+ * PGOutputData->context when starting pgoutput, and set to NULL when its
+ * parent context is reset via a dedicated MemoryContextCallback.
+ */
+static MemoryContext pubctx = NULL;
+
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
uint32 hashvalue);
@@ -253,6 +260,15 @@ parse_output_parameters(List *options, PGOutputData *data)
}
/*
+ * Callback of PGOutputData->context in charge of cleaning pubctx.
+ */
+static void
+pgoutput_pubctx_reset_callback(void *arg)
+{
+ pubctx = NULL;
+}
+
+/*
* Initialize this plugin
*/
static void
@@ -261,12 +277,22 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
{
PGOutputData *data = palloc0(sizeof(PGOutputData));
static bool publication_callback_registered = false;
+ MemoryContextCallback *mcallback;
/* Create our memory context for private allocations. */
data->context = AllocSetContextCreate(ctx->context,
"logical replication output context",
ALLOCSET_DEFAULT_SIZES);
+ Assert(pubctx == NULL);
+ pubctx = AllocSetContextCreate(ctx->context,
+ "logical replication publication list context",
+ ALLOCSET_SMALL_SIZES);
+
+ mcallback = palloc0(sizeof(MemoryContextCallback));
+ mcallback->func = pgoutput_pubctx_reset_callback;
+ MemoryContextRegisterResetCallback(ctx->context, mcallback);
+
ctx->output_plugin_private = data;
/* This plugin uses binary protocol. */
@@ -786,8 +812,9 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
/*
* Shutdown the output plugin.
*
- * Note, we don't need to clean the data->context as it's child context
- * of the ctx->context so it will be cleaned up by logical decoding machinery.
+ * Note, we don't need to clean the data->context and pubctx as they are
+ * child contexts of the ctx->context so they will be cleaned up by logical
+ * decoding machinery.
*/
static void
pgoutput_shutdown(LogicalDecodingContext *ctx)
@@ -797,6 +824,9 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
hash_destroy(RelationSyncCache);
RelationSyncCache = NULL;
}
+
+ /* Better safe than sorry */
+ pubctx = NULL;
}
/*
@@ -1071,9 +1101,10 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
/* Reload publications if needed before use. */
if (!publications_valid)
{
- oldctx = MemoryContextSwitchTo(CacheMemoryContext);
- if (data->publications)
- list_free_deep(data->publications);
+ Assert(pubctx);
+
+ MemoryContextReset(pubctx);
+ oldctx = MemoryContextSwitchTo(pubctx);
data->publications = LoadPublications(data->publication_names);
MemoryContextSwitchTo(oldctx);