aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/access/rmgrdesc/standbydesc.c2
-rw-r--r--src/backend/commands/alter.c16
-rw-r--r--src/backend/commands/publicationcmds.c39
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c8
-rw-r--r--src/backend/utils/cache/inval.c125
5 files changed, 184 insertions, 6 deletions
diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c
index d849f8e54ba..81eff5f31c4 100644
--- a/src/backend/access/rmgrdesc/standbydesc.c
+++ b/src/backend/access/rmgrdesc/standbydesc.c
@@ -132,6 +132,8 @@ standby_desc_invalidations(StringInfo buf,
appendStringInfo(buf, " relmap db %u", msg->rm.dbId);
else if (msg->id == SHAREDINVALSNAPSHOT_ID)
appendStringInfo(buf, " snapshot %u", msg->sn.relId);
+ else if (msg->id == SHAREDINVALRELSYNC_ID)
+ appendStringInfo(buf, " relsync %u", msg->rs.relid);
else
appendStringInfo(buf, " unrecognized id %d", msg->id);
}
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 78c1d4e1b84..c801c869c1c 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -338,6 +338,22 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
InvokeObjectPostAlterHook(classId, objectId, 0);
+ /* Do post catalog-update tasks */
+ if (classId == PublicationRelationId)
+ {
+ Form_pg_publication pub = (Form_pg_publication) GETSTRUCT(oldtup);
+
+ /*
+ * Invalidate relsynccache entries.
+ *
+ * Unlike ALTER PUBLICATION ADD/SET/DROP commands, renaming a
+ * publication does not impact the publication status of tables. So,
+ * we don't need to invalidate relcache to rebuild the rd_pubdesc.
+ * Instead, we invalidate only the relsyncache.
+ */
+ InvalidatePubRelSyncCache(pub->oid, pub->puballtables);
+ }
+
/* Release memory */
pfree(values);
pfree(nulls);
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 150a768d16f..3091d36ce98 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -491,6 +491,45 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
return *invalid_column_list || *invalid_gen_col;
}
+/*
+ * Invalidate entries in the RelationSyncCache for relations included in the
+ * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
+ *
+ * If 'puballtables' is true, invalidate all cache entries.
+ */
+void
+InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
+{
+ if (puballtables)
+ {
+ CacheInvalidateRelSyncAll();
+ }
+ else
+ {
+ List *relids = NIL;
+ List *schemarelids = NIL;
+
+ /*
+ * For partitioned tables, we must invalidate all partitions and
+ * itself. WAL records for INSERT/UPDATE/DELETE specify leaf tables as
+ * a target. However, WAL records for TRUNCATE specify both a root and
+ * its leaves.
+ */
+ relids = GetPublicationRelations(pubid,
+ PUBLICATION_PART_ALL);
+ schemarelids = GetAllSchemaPublicationRelations(pubid,
+ PUBLICATION_PART_ALL);
+
+ relids = list_concat_unique_oid(relids, schemarelids);
+
+ /* Invalidate the relsyncache */
+ foreach_oid(relid, relids)
+ CacheInvalidateRelSync(relid);
+ }
+
+ return;
+}
+
/* check_functions_in_node callback */
static bool
contain_mutable_or_user_functions_checker(Oid func_id, void *context)
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9063af6e1df..ed806c54300 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -531,6 +531,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
CacheRegisterSyscacheCallback(PUBLICATIONOID,
publication_invalidation_cb,
(Datum) 0);
+ CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
+ (Datum) 0);
publication_callback_registered = true;
}
@@ -1789,12 +1791,6 @@ static void
publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
{
publications_valid = false;
-
- /*
- * Also invalidate per-relation cache so that next time the filtering info
- * is checked it will be updated with the new publication settings.
- */
- rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
}
/*
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 700ccb6df9b..4eb67720737 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -271,6 +271,7 @@ int debug_discard_caches = 0;
#define MAX_SYSCACHE_CALLBACKS 64
#define MAX_RELCACHE_CALLBACKS 10
+#define MAX_RELSYNC_CALLBACKS 10
static struct SYSCACHECALLBACK
{
@@ -292,6 +293,15 @@ static struct RELCACHECALLBACK
static int relcache_callback_count = 0;
+static struct RELSYNCCALLBACK
+{
+ RelSyncCallbackFunction function;
+ Datum arg;
+} relsync_callback_list[MAX_RELSYNC_CALLBACKS];
+
+static int relsync_callback_count = 0;
+
+
/* ----------------------------------------------------------------
* Invalidation subgroup support functions
* ----------------------------------------------------------------
@@ -485,6 +495,36 @@ AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group,
}
/*
+ * Add a relsync inval entry
+ *
+ * We put these into the relcache subgroup for simplicity. This message is the
+ * same as AddRelcacheInvalidationMessage() except that it is for
+ * RelationSyncCache maintained by decoding plugin pgoutput.
+ */
+static void
+AddRelsyncInvalidationMessage(InvalidationMsgsGroup *group,
+ Oid dbId, Oid relId)
+{
+ SharedInvalidationMessage msg;
+
+ /* Don't add a duplicate item. */
+ ProcessMessageSubGroup(group, RelCacheMsgs,
+ if (msg->rc.id == SHAREDINVALRELSYNC_ID &&
+ (msg->rc.relId == relId ||
+ msg->rc.relId == InvalidOid))
+ return);
+
+ /* OK, add the item */
+ msg.rc.id = SHAREDINVALRELSYNC_ID;
+ msg.rc.dbId = dbId;
+ msg.rc.relId = relId;
+ /* check AddCatcacheInvalidationMessage() for an explanation */
+ VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
+
+ AddInvalidationMessage(group, RelCacheMsgs, &msg);
+}
+
+/*
* Add a snapshot inval entry
*
* We put these into the relcache subgroup for simplicity.
@@ -612,6 +652,17 @@ RegisterRelcacheInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
}
/*
+ * RegisterRelsyncInvalidation
+ *
+ * As above, but register a relsynccache invalidation event.
+ */
+static void
+RegisterRelsyncInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
+{
+ AddRelsyncInvalidationMessage(&info->CurrentCmdInvalidMsgs, dbId, relId);
+}
+
+/*
* RegisterSnapshotInvalidation
*
* Register an invalidation event for MVCC scans against a given catalog.
@@ -751,6 +802,13 @@ InvalidateSystemCachesExtended(bool debug_discard)
ccitem->function(ccitem->arg, InvalidOid);
}
+
+ for (i = 0; i < relsync_callback_count; i++)
+ {
+ struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+ ccitem->function(ccitem->arg, InvalidOid);
+ }
}
/*
@@ -832,6 +890,12 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
else if (msg->sn.dbId == MyDatabaseId)
InvalidateCatalogSnapshot();
}
+ else if (msg->id == SHAREDINVALRELSYNC_ID)
+ {
+ /* We only care about our own database */
+ if (msg->rs.dbId == MyDatabaseId)
+ CallRelSyncCallbacks(msg->rs.relid);
+ }
else
elog(FATAL, "unrecognized SI message ID: %d", msg->id);
}
@@ -1621,6 +1685,32 @@ CacheInvalidateRelcacheByRelid(Oid relid)
ReleaseSysCache(tup);
}
+/*
+ * CacheInvalidateRelSync
+ * Register invalidation of the cache in logical decoding output plugin
+ * for a database.
+ *
+ * This type of invalidation message is used for the specific purpose of output
+ * plugins. Processes which do not decode WALs would do nothing even when it
+ * receives the message.
+ */
+void
+CacheInvalidateRelSync(Oid relid)
+{
+ RegisterRelsyncInvalidation(PrepareInvalidationState(),
+ MyDatabaseId, relid);
+}
+
+/*
+ * CacheInvalidateRelSyncAll
+ * Register invalidation of the whole cache in logical decoding output
+ * plugin.
+ */
+void
+CacheInvalidateRelSyncAll(void)
+{
+ CacheInvalidateRelSync(InvalidOid);
+}
/*
* CacheInvalidateSmgr
@@ -1764,6 +1854,27 @@ CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
}
/*
+ * CacheRegisterRelSyncCallback
+ * Register the specified function to be called for all future
+ * relsynccache invalidation events.
+ *
+ * This function is intended to be call from the logical decoding output
+ * plugins.
+ */
+void
+CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+ Datum arg)
+{
+ if (relsync_callback_count >= MAX_RELSYNC_CALLBACKS)
+ elog(FATAL, "out of relsync_callback_list slots");
+
+ relsync_callback_list[relsync_callback_count].function = func;
+ relsync_callback_list[relsync_callback_count].arg = arg;
+
+ ++relsync_callback_count;
+}
+
+/*
* CallSyscacheCallbacks
*
* This is exported so that CatalogCacheFlushCatalog can call it, saving
@@ -1789,6 +1900,20 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
}
/*
+ * CallSyscacheCallbacks
+ */
+void
+CallRelSyncCallbacks(Oid relid)
+{
+ for (int i = 0; i < relsync_callback_count; i++)
+ {
+ struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+ ccitem->function(ccitem->arg, relid);
+ }
+}
+
+/*
* LogLogicalInvalidations
*
* Emit WAL for invalidations caused by the current command.