diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/access/rmgrdesc/standbydesc.c | 2 | ||||
-rw-r--r-- | src/backend/commands/alter.c | 16 | ||||
-rw-r--r-- | src/backend/commands/publicationcmds.c | 39 | ||||
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 8 | ||||
-rw-r--r-- | src/backend/utils/cache/inval.c | 125 |
5 files changed, 184 insertions, 6 deletions
diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c index d849f8e54ba..81eff5f31c4 100644 --- a/src/backend/access/rmgrdesc/standbydesc.c +++ b/src/backend/access/rmgrdesc/standbydesc.c @@ -132,6 +132,8 @@ standby_desc_invalidations(StringInfo buf, appendStringInfo(buf, " relmap db %u", msg->rm.dbId); else if (msg->id == SHAREDINVALSNAPSHOT_ID) appendStringInfo(buf, " snapshot %u", msg->sn.relId); + else if (msg->id == SHAREDINVALRELSYNC_ID) + appendStringInfo(buf, " relsync %u", msg->rs.relid); else appendStringInfo(buf, " unrecognized id %d", msg->id); } diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c index 78c1d4e1b84..c801c869c1c 100644 --- a/src/backend/commands/alter.c +++ b/src/backend/commands/alter.c @@ -338,6 +338,22 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name) InvokeObjectPostAlterHook(classId, objectId, 0); + /* Do post catalog-update tasks */ + if (classId == PublicationRelationId) + { + Form_pg_publication pub = (Form_pg_publication) GETSTRUCT(oldtup); + + /* + * Invalidate relsynccache entries. + * + * Unlike ALTER PUBLICATION ADD/SET/DROP commands, renaming a + * publication does not impact the publication status of tables. So, + * we don't need to invalidate relcache to rebuild the rd_pubdesc. + * Instead, we invalidate only the relsyncache. + */ + InvalidatePubRelSyncCache(pub->oid, pub->puballtables); + } + /* Release memory */ pfree(values); pfree(nulls); diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 150a768d16f..3091d36ce98 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -491,6 +491,45 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, return *invalid_column_list || *invalid_gen_col; } +/* + * Invalidate entries in the RelationSyncCache for relations included in the + * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA. + * + * If 'puballtables' is true, invalidate all cache entries. + */ +void +InvalidatePubRelSyncCache(Oid pubid, bool puballtables) +{ + if (puballtables) + { + CacheInvalidateRelSyncAll(); + } + else + { + List *relids = NIL; + List *schemarelids = NIL; + + /* + * For partitioned tables, we must invalidate all partitions and + * itself. WAL records for INSERT/UPDATE/DELETE specify leaf tables as + * a target. However, WAL records for TRUNCATE specify both a root and + * its leaves. + */ + relids = GetPublicationRelations(pubid, + PUBLICATION_PART_ALL); + schemarelids = GetAllSchemaPublicationRelations(pubid, + PUBLICATION_PART_ALL); + + relids = list_concat_unique_oid(relids, schemarelids); + + /* Invalidate the relsyncache */ + foreach_oid(relid, relids) + CacheInvalidateRelSync(relid); + } + + return; +} + /* check_functions_in_node callback */ static bool contain_mutable_or_user_functions_checker(Oid func_id, void *context) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 9063af6e1df..ed806c54300 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -531,6 +531,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, CacheRegisterSyscacheCallback(PUBLICATIONOID, publication_invalidation_cb, (Datum) 0); + CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb, + (Datum) 0); publication_callback_registered = true; } @@ -1789,12 +1791,6 @@ static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue) { publications_valid = false; - - /* - * Also invalidate per-relation cache so that next time the filtering info - * is checked it will be updated with the new publication settings. - */ - rel_sync_cache_publication_cb(arg, cacheid, hashvalue); } /* diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c index 700ccb6df9b..4eb67720737 100644 --- a/src/backend/utils/cache/inval.c +++ b/src/backend/utils/cache/inval.c @@ -271,6 +271,7 @@ int debug_discard_caches = 0; #define MAX_SYSCACHE_CALLBACKS 64 #define MAX_RELCACHE_CALLBACKS 10 +#define MAX_RELSYNC_CALLBACKS 10 static struct SYSCACHECALLBACK { @@ -292,6 +293,15 @@ static struct RELCACHECALLBACK static int relcache_callback_count = 0; +static struct RELSYNCCALLBACK +{ + RelSyncCallbackFunction function; + Datum arg; +} relsync_callback_list[MAX_RELSYNC_CALLBACKS]; + +static int relsync_callback_count = 0; + + /* ---------------------------------------------------------------- * Invalidation subgroup support functions * ---------------------------------------------------------------- @@ -485,6 +495,36 @@ AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group, } /* + * Add a relsync inval entry + * + * We put these into the relcache subgroup for simplicity. This message is the + * same as AddRelcacheInvalidationMessage() except that it is for + * RelationSyncCache maintained by decoding plugin pgoutput. + */ +static void +AddRelsyncInvalidationMessage(InvalidationMsgsGroup *group, + Oid dbId, Oid relId) +{ + SharedInvalidationMessage msg; + + /* Don't add a duplicate item. */ + ProcessMessageSubGroup(group, RelCacheMsgs, + if (msg->rc.id == SHAREDINVALRELSYNC_ID && + (msg->rc.relId == relId || + msg->rc.relId == InvalidOid)) + return); + + /* OK, add the item */ + msg.rc.id = SHAREDINVALRELSYNC_ID; + msg.rc.dbId = dbId; + msg.rc.relId = relId; + /* check AddCatcacheInvalidationMessage() for an explanation */ + VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg)); + + AddInvalidationMessage(group, RelCacheMsgs, &msg); +} + +/* * Add a snapshot inval entry * * We put these into the relcache subgroup for simplicity. @@ -612,6 +652,17 @@ RegisterRelcacheInvalidation(InvalidationInfo *info, Oid dbId, Oid relId) } /* + * RegisterRelsyncInvalidation + * + * As above, but register a relsynccache invalidation event. + */ +static void +RegisterRelsyncInvalidation(InvalidationInfo *info, Oid dbId, Oid relId) +{ + AddRelsyncInvalidationMessage(&info->CurrentCmdInvalidMsgs, dbId, relId); +} + +/* * RegisterSnapshotInvalidation * * Register an invalidation event for MVCC scans against a given catalog. @@ -751,6 +802,13 @@ InvalidateSystemCachesExtended(bool debug_discard) ccitem->function(ccitem->arg, InvalidOid); } + + for (i = 0; i < relsync_callback_count; i++) + { + struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i; + + ccitem->function(ccitem->arg, InvalidOid); + } } /* @@ -832,6 +890,12 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg) else if (msg->sn.dbId == MyDatabaseId) InvalidateCatalogSnapshot(); } + else if (msg->id == SHAREDINVALRELSYNC_ID) + { + /* We only care about our own database */ + if (msg->rs.dbId == MyDatabaseId) + CallRelSyncCallbacks(msg->rs.relid); + } else elog(FATAL, "unrecognized SI message ID: %d", msg->id); } @@ -1621,6 +1685,32 @@ CacheInvalidateRelcacheByRelid(Oid relid) ReleaseSysCache(tup); } +/* + * CacheInvalidateRelSync + * Register invalidation of the cache in logical decoding output plugin + * for a database. + * + * This type of invalidation message is used for the specific purpose of output + * plugins. Processes which do not decode WALs would do nothing even when it + * receives the message. + */ +void +CacheInvalidateRelSync(Oid relid) +{ + RegisterRelsyncInvalidation(PrepareInvalidationState(), + MyDatabaseId, relid); +} + +/* + * CacheInvalidateRelSyncAll + * Register invalidation of the whole cache in logical decoding output + * plugin. + */ +void +CacheInvalidateRelSyncAll(void) +{ + CacheInvalidateRelSync(InvalidOid); +} /* * CacheInvalidateSmgr @@ -1764,6 +1854,27 @@ CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, } /* + * CacheRegisterRelSyncCallback + * Register the specified function to be called for all future + * relsynccache invalidation events. + * + * This function is intended to be call from the logical decoding output + * plugins. + */ +void +CacheRegisterRelSyncCallback(RelSyncCallbackFunction func, + Datum arg) +{ + if (relsync_callback_count >= MAX_RELSYNC_CALLBACKS) + elog(FATAL, "out of relsync_callback_list slots"); + + relsync_callback_list[relsync_callback_count].function = func; + relsync_callback_list[relsync_callback_count].arg = arg; + + ++relsync_callback_count; +} + +/* * CallSyscacheCallbacks * * This is exported so that CatalogCacheFlushCatalog can call it, saving @@ -1789,6 +1900,20 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue) } /* + * CallSyscacheCallbacks + */ +void +CallRelSyncCallbacks(Oid relid) +{ + for (int i = 0; i < relsync_callback_count; i++) + { + struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i; + + ccitem->function(ccitem->arg, relid); + } +} + +/* * LogLogicalInvalidations * * Emit WAL for invalidations caused by the current command. |