aboutsummaryrefslogtreecommitdiff
path: root/src/backend/utils/cache/relcache.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/utils/cache/relcache.c')
-rw-r--r--src/backend/utils/cache/relcache.c98
1 files changed, 68 insertions, 30 deletions
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 2707fed12f4..fccffce5729 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -66,6 +66,7 @@
#include "catalog/schemapg.h"
#include "catalog/storage.h"
#include "commands/policy.h"
+#include "commands/publicationcmds.h"
#include "commands/trigger.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
@@ -2419,8 +2420,8 @@ RelationDestroyRelation(Relation relation, bool remember_tupdesc)
bms_free(relation->rd_pkattr);
bms_free(relation->rd_idattr);
bms_free(relation->rd_hotblockingattr);
- if (relation->rd_pubactions)
- pfree(relation->rd_pubactions);
+ if (relation->rd_pubdesc)
+ pfree(relation->rd_pubdesc);
if (relation->rd_options)
pfree(relation->rd_options);
if (relation->rd_indextuple)
@@ -5523,38 +5524,57 @@ RelationGetExclusionInfo(Relation indexRelation,
}
/*
- * Get publication actions for the given relation.
+ * Get the publication information for the given relation.
+ *
+ * Traverse all the publications which the relation is in to get the
+ * publication actions and validate the row filter expressions for such
+ * publications if any. We consider the row filter expression as invalid if it
+ * references any column which is not part of REPLICA IDENTITY.
+ *
+ * To avoid fetching the publication information repeatedly, we cache the
+ * publication actions and row filter validation information.
*/
-struct PublicationActions *
-GetRelationPublicationActions(Relation relation)
+void
+RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc)
{
List *puboids;
ListCell *lc;
MemoryContext oldcxt;
Oid schemaid;
- PublicationActions *pubactions = palloc0(sizeof(PublicationActions));
+ List *ancestors = NIL;
+ Oid relid = RelationGetRelid(relation);
/*
* If not publishable, it publishes no actions. (pgoutput_change() will
* ignore it.)
*/
if (!is_publishable_relation(relation))
- return pubactions;
+ {
+ memset(pubdesc, 0, sizeof(PublicationDesc));
+ pubdesc->rf_valid_for_update = true;
+ pubdesc->rf_valid_for_delete = true;
+ return;
+ }
+
+ if (relation->rd_pubdesc)
+ {
+ memcpy(pubdesc, relation->rd_pubdesc, sizeof(PublicationDesc));
+ return;
+ }
- if (relation->rd_pubactions)
- return memcpy(pubactions, relation->rd_pubactions,
- sizeof(PublicationActions));
+ memset(pubdesc, 0, sizeof(PublicationDesc));
+ pubdesc->rf_valid_for_update = true;
+ pubdesc->rf_valid_for_delete = true;
/* Fetch the publication membership info. */
- puboids = GetRelationPublications(RelationGetRelid(relation));
+ puboids = GetRelationPublications(relid);
schemaid = RelationGetNamespace(relation);
puboids = list_concat_unique_oid(puboids, GetSchemaPublications(schemaid));
if (relation->rd_rel->relispartition)
{
/* Add publications that the ancestors are in too. */
- List *ancestors = get_partition_ancestors(RelationGetRelid(relation));
- ListCell *lc;
+ ancestors = get_partition_ancestors(relid);
foreach(lc, ancestors)
{
@@ -5582,35 +5602,53 @@ GetRelationPublicationActions(Relation relation)
pubform = (Form_pg_publication) GETSTRUCT(tup);
- pubactions->pubinsert |= pubform->pubinsert;
- pubactions->pubupdate |= pubform->pubupdate;
- pubactions->pubdelete |= pubform->pubdelete;
- pubactions->pubtruncate |= pubform->pubtruncate;
+ pubdesc->pubactions.pubinsert |= pubform->pubinsert;
+ pubdesc->pubactions.pubupdate |= pubform->pubupdate;
+ pubdesc->pubactions.pubdelete |= pubform->pubdelete;
+ pubdesc->pubactions.pubtruncate |= pubform->pubtruncate;
+
+ /*
+ * Check if all columns referenced in the filter expression are part of
+ * the REPLICA IDENTITY index or not.
+ *
+ * If the publication is FOR ALL TABLES then it means the table has no
+ * row filters and we can skip the validation.
+ */
+ if (!pubform->puballtables &&
+ (pubform->pubupdate || pubform->pubdelete) &&
+ contain_invalid_rfcolumn(pubid, relation, ancestors,
+ pubform->pubviaroot))
+ {
+ if (pubform->pubupdate)
+ pubdesc->rf_valid_for_update = false;
+ if (pubform->pubdelete)
+ pubdesc->rf_valid_for_delete = false;
+ }
ReleaseSysCache(tup);
/*
- * If we know everything is replicated, there is no point to check for
- * other publications.
+ * If we know everything is replicated and the row filter is invalid
+ * for update and delete, there is no point to check for other
+ * publications.
*/
- if (pubactions->pubinsert && pubactions->pubupdate &&
- pubactions->pubdelete && pubactions->pubtruncate)
+ if (pubdesc->pubactions.pubinsert && pubdesc->pubactions.pubupdate &&
+ pubdesc->pubactions.pubdelete && pubdesc->pubactions.pubtruncate &&
+ !pubdesc->rf_valid_for_update && !pubdesc->rf_valid_for_delete)
break;
}
- if (relation->rd_pubactions)
+ if (relation->rd_pubdesc)
{
- pfree(relation->rd_pubactions);
- relation->rd_pubactions = NULL;
+ pfree(relation->rd_pubdesc);
+ relation->rd_pubdesc = NULL;
}
- /* Now save copy of the actions in the relcache entry. */
+ /* Now save copy of the descriptor in the relcache entry. */
oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
- relation->rd_pubactions = palloc(sizeof(PublicationActions));
- memcpy(relation->rd_pubactions, pubactions, sizeof(PublicationActions));
+ relation->rd_pubdesc = palloc(sizeof(PublicationDesc));
+ memcpy(relation->rd_pubdesc, pubdesc, sizeof(PublicationDesc));
MemoryContextSwitchTo(oldcxt);
-
- return pubactions;
}
/*
@@ -6163,7 +6201,7 @@ load_relcache_init_file(bool shared)
rel->rd_pkattr = NULL;
rel->rd_idattr = NULL;
rel->rd_hotblockingattr = NULL;
- rel->rd_pubactions = NULL;
+ rel->rd_pubdesc = NULL;
rel->rd_statvalid = false;
rel->rd_statlist = NIL;
rel->rd_fkeyvalid = false;