diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 46 |
1 files changed, 37 insertions, 9 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index c8f5c7f8641..e178bd77abc 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -82,6 +82,13 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, static bool publications_valid; static bool in_streaming; +/* + * Private memory context for publication data, created in + * PGOutputData->context when starting pgoutput, and set to NULL when its + * parent context is reset via a dedicated MemoryContextCallback. + */ +static MemoryContext pubctx = NULL; + static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); @@ -386,6 +393,15 @@ parse_output_parameters(List *options, PGOutputData *data) } /* + * Callback of PGOutputData->context in charge of cleaning pubctx. + */ +static void +pgoutput_pubctx_reset_callback(void *arg) +{ + pubctx = NULL; +} + +/* * Initialize this plugin */ static void @@ -394,6 +410,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, { PGOutputData *data = palloc0(sizeof(PGOutputData)); static bool publication_callback_registered = false; + MemoryContextCallback *mcallback; /* Create our memory context for private allocations. */ data->context = AllocSetContextCreate(ctx->context, @@ -404,6 +421,15 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, "logical replication cache context", ALLOCSET_DEFAULT_SIZES); + Assert(pubctx == NULL); + pubctx = AllocSetContextCreate(ctx->context, + "logical replication publication list context", + ALLOCSET_SMALL_SIZES); + + mcallback = palloc0(sizeof(MemoryContextCallback)); + mcallback->func = pgoutput_pubctx_reset_callback; + MemoryContextRegisterResetCallback(ctx->context, mcallback); + ctx->output_plugin_private = data; /* This plugin uses binary protocol. */ @@ -1728,9 +1754,9 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx, /* * Shutdown the output plugin. * - * Note, we don't need to clean the data->context and data->cachectx as - * they are child contexts of the ctx->context so they will be cleaned up by - * logical decoding machinery. + * Note, we don't need to clean the data->context, data->cachectx and pubctx + * as they are child contexts of the ctx->context so they will be cleaned up + * by logical decoding machinery. */ static void pgoutput_shutdown(LogicalDecodingContext *ctx) @@ -1740,6 +1766,9 @@ pgoutput_shutdown(LogicalDecodingContext *ctx) hash_destroy(RelationSyncCache); RelationSyncCache = NULL; } + + /* Better safe than sorry */ + pubctx = NULL; } /* @@ -2040,12 +2069,11 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) /* Reload publications if needed before use. */ if (!publications_valid) { - oldctx = MemoryContextSwitchTo(CacheMemoryContext); - if (data->publications) - { - list_free_deep(data->publications); - data->publications = NIL; - } + Assert(pubctx); + + MemoryContextReset(pubctx); + oldctx = MemoryContextSwitchTo(pubctx); + data->publications = LoadPublications(data->publication_names); MemoryContextSwitchTo(oldctx); publications_valid = true; |