diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/executor/execReplication.c | 112 | ||||
-rw-r--r-- | src/backend/replication/logical/relation.c | 212 | ||||
-rw-r--r-- | src/backend/replication/logical/worker.c | 56 | ||||
-rw-r--r-- | src/include/replication/logicalrelation.h | 4 | ||||
-rw-r--r-- | src/test/subscription/meson.build | 1 |
5 files changed, 317 insertions, 68 deletions
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index 4f5083a598a..fa8628e3e1b 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -25,6 +25,7 @@ #include "nodes/nodeFuncs.h" #include "parser/parse_relation.h" #include "parser/parsetree.h" +#include "replication/logicalrelation.h" #include "storage/bufmgr.h" #include "storage/lmgr.h" #include "utils/builtins.h" @@ -37,49 +38,63 @@ #include "utils/typcache.h" +static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, + TypeCacheEntry **eq); + /* * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that * is setup to match 'rel' (*NOT* idxrel!). * - * Returns whether any column contains NULLs. + * Returns how many columns to use for the index scan. + * + * This is not generic routine, it expects the idxrel to be a btree, non-partial + * and have at least one column reference (i.e. cannot consist of only + * expressions). * - * This is not generic routine, it expects the idxrel to be replication - * identity of a rel and meet all limitations associated with that. + * By definition, replication identity of a rel meets all limitations associated + * with that. Note that any other index could also meet these limitations. */ -static bool +static int build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *searchslot) { - int attoff; + int index_attoff; + int skey_attoff = 0; bool isnull; Datum indclassDatum; oidvector *opclass; int2vector *indkey = &idxrel->rd_index->indkey; - bool hasnulls = false; - - Assert(RelationGetReplicaIndex(rel) == RelationGetRelid(idxrel) || - RelationGetPrimaryKeyIndex(rel) == RelationGetRelid(idxrel)); indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple, Anum_pg_index_indclass, &isnull); Assert(!isnull); opclass = (oidvector *) DatumGetPointer(indclassDatum); - /* Build scankey for every attribute in the index. */ - for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++) + /* Build scankey for every non-expression attribute in the index. */ + for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); + index_attoff++) { Oid operator; + Oid optype; Oid opfamily; RegProcedure regop; - int pkattno = attoff + 1; - int mainattno = indkey->values[attoff]; - Oid optype = get_opclass_input_type(opclass->values[attoff]); + int table_attno = indkey->values[index_attoff]; + + if (!AttributeNumberIsValid(table_attno)) + { + /* + * XXX: Currently, we don't support expressions in the scan key, + * see code below. + */ + continue; + } /* * Load the operator info. We need this to get the equality operator * function for the scan key. */ - opfamily = get_opclass_family(opclass->values[attoff]); + optype = get_opclass_input_type(opclass->values[index_attoff]); + opfamily = get_opclass_family(opclass->values[index_attoff]); operator = get_opfamily_member(opfamily, optype, optype, @@ -91,23 +106,25 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel, regop = get_opcode(operator); /* Initialize the scankey. */ - ScanKeyInit(&skey[attoff], - pkattno, + ScanKeyInit(&skey[skey_attoff], + index_attoff + 1, BTEqualStrategyNumber, regop, - searchslot->tts_values[mainattno - 1]); + searchslot->tts_values[table_attno - 1]); - skey[attoff].sk_collation = idxrel->rd_indcollation[attoff]; + skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff]; /* Check for null value. */ - if (searchslot->tts_isnull[mainattno - 1]) - { - hasnulls = true; - skey[attoff].sk_flags |= SK_ISNULL; - } + if (searchslot->tts_isnull[table_attno - 1]) + skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL); + + skey_attoff++; } - return hasnulls; + /* There must always be at least one attribute for the index scan. */ + Assert(skey_attoff > 0); + + return skey_attoff; } /* @@ -123,33 +140,58 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid, TupleTableSlot *outslot) { ScanKeyData skey[INDEX_MAX_KEYS]; + int skey_attoff; IndexScanDesc scan; SnapshotData snap; TransactionId xwait; Relation idxrel; bool found; + TypeCacheEntry **eq = NULL; + bool isIdxSafeToSkipDuplicates; /* Open the index. */ idxrel = index_open(idxoid, RowExclusiveLock); - /* Start an index scan. */ + isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid); + InitDirtySnapshot(snap); - scan = index_beginscan(rel, idxrel, &snap, - IndexRelationGetNumberOfKeyAttributes(idxrel), - 0); /* Build scan key. */ - build_replindex_scan_key(skey, rel, idxrel, searchslot); + skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot); + + /* Start an index scan. */ + scan = index_beginscan(rel, idxrel, &snap, skey_attoff, 0); retry: found = false; - index_rescan(scan, skey, IndexRelationGetNumberOfKeyAttributes(idxrel), NULL, 0); + index_rescan(scan, skey, skey_attoff, NULL, 0); /* Try to find the tuple */ - if (index_getnext_slot(scan, ForwardScanDirection, outslot)) + while (index_getnext_slot(scan, ForwardScanDirection, outslot)) { - found = true; + /* + * Avoid expensive equality check if the index is primary key or + * replica identity index. + */ + if (!isIdxSafeToSkipDuplicates) + { + if (eq == NULL) + { +#ifdef USE_ASSERT_CHECKING + /* apply assertions only once for the input idxoid */ + IndexInfo *indexInfo = BuildIndexInfo(idxrel); + + Assert(IsIndexUsableForReplicaIdentityFull(indexInfo)); +#endif + + eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts); + } + + if (!tuples_equal(outslot, searchslot, eq)) + continue; + } + ExecMaterializeSlot(outslot); xwait = TransactionIdIsValid(snap.xmin) ? @@ -164,6 +206,10 @@ retry: XactLockTableWait(xwait, NULL, NULL, XLTW_None); goto retry; } + + /* Found our tuple and it's not locked */ + found = true; + break; } /* Found tuple, try to lock it in the lockmode. */ diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index 55bfa078711..57ad22b48a1 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -17,8 +17,10 @@ #include "postgres.h" +#include "access/genam.h" #include "access/table.h" #include "catalog/namespace.h" +#include "catalog/pg_am_d.h" #include "catalog/pg_subscription_rel.h" #include "executor/executor.h" #include "nodes/makefuncs.h" @@ -50,6 +52,9 @@ typedef struct LogicalRepPartMapEntry LogicalRepRelMapEntry relmapentry; } LogicalRepPartMapEntry; +static Oid FindLogicalRepLocalIndex(Relation localrel, LogicalRepRelation *remoterel, + AttrMap *attrMap); + /* * Relcache invalidation callback for our relation map cache. */ @@ -439,6 +444,15 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) */ logicalrep_rel_mark_updatable(entry); + /* + * Finding a usable index is an infrequent task. It occurs when an + * operation is first performed on the relation, or after invalidation + * of the relation cache entry (such as ANALYZE or CREATE/DROP index + * on the relation). + */ + entry->localindexoid = FindLogicalRepLocalIndex(entry->localrel, remoterel, + entry->attrmap); + entry->localrelvalid = true; } @@ -697,10 +711,204 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, /* Set if the table's replica identity is enough to apply update/delete. */ logicalrep_rel_mark_updatable(entry); - entry->localrelvalid = true; - /* state and statelsn are left set to 0. */ MemoryContextSwitchTo(oldctx); + /* + * Finding a usable index is an infrequent task. It occurs when an + * operation is first performed on the relation, or after invalidation of + * the relation cache entry (such as ANALYZE or CREATE/DROP index on the + * relation). + * + * We also prefer to run this code on the oldctx so that we do not leak + * anything in the LogicalRepPartMapContext (hence CacheMemoryContext). + */ + entry->localindexoid = FindLogicalRepLocalIndex(partrel, remoterel, + entry->attrmap); + + entry->localrelvalid = true; + return entry; } + +/* + * Returns true if the given index consists only of expressions such as: + * CREATE INDEX idx ON table(foo(col)); + * + * Returns false even if there is one column reference: + * CREATE INDEX idx ON table(foo(col), col_2); + */ +static bool +IsIndexOnlyOnExpression(IndexInfo *indexInfo) +{ + for (int i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++) + { + AttrNumber attnum = indexInfo->ii_IndexAttrNumbers[i]; + + if (AttributeNumberIsValid(attnum)) + return false; + } + + return true; +} + +/* + * Returns true if the attrmap contains the leftmost column of the index. + * Otherwise returns false. + * + * attrmap is a map of local attributes to remote ones. We can consult this + * map to check whether the local index attribute has a corresponding remote + * attribute. + */ +static bool +RemoteRelContainsLeftMostColumnOnIdx(IndexInfo *indexInfo, AttrMap *attrmap) +{ + AttrNumber keycol; + + Assert(indexInfo->ii_NumIndexAttrs >= 1); + + keycol = indexInfo->ii_IndexAttrNumbers[0]; + if (!AttributeNumberIsValid(keycol)) + return false; + + if (attrmap->maplen <= AttrNumberGetAttrOffset(keycol)) + return false; + + return attrmap->attnums[AttrNumberGetAttrOffset(keycol)] >= 0; +} + +/* + * Returns the oid of an index that can be used by the apply worker to scan + * the relation. The index must be btree, non-partial, and have at least + * one column reference (i.e. cannot consist of only expressions). These + * limitations help to keep the index scan similar to PK/RI index scans. + * + * Note that the limitations of index scans for replica identity full only + * adheres to a subset of the limitations of PK/RI. For example, we support + * columns that are marked as [NULL] or we are not interested in the [NOT + * DEFERRABLE] aspect of constraints here. It works for us because we always + * compare the tuples for non-PK/RI index scans. See + * RelationFindReplTupleByIndex(). + * + * XXX: There are no fundamental problems for supporting non-btree indexes. + * We mostly need to relax the limitations in RelationFindReplTupleByIndex(). + * For partial indexes, the required changes are likely to be larger. If + * none of the tuples satisfy the expression for the index scan, we fall-back + * to sequential execution, which might not be a good idea in some cases. + * + * We also skip indexes if the remote relation does not contain the leftmost + * column of the index. This is because in most such cases sequential scan is + * favorable over index scan. + * + * We expect to call this function when REPLICA IDENTITY FULL is defined for + * the remote relation. + * + * If no suitable index is found, returns InvalidOid. + */ +static Oid +FindUsableIndexForReplicaIdentityFull(Relation localrel, AttrMap *attrmap) +{ + List *idxlist = RelationGetIndexList(localrel); + ListCell *lc; + + foreach(lc, idxlist) + { + Oid idxoid = lfirst_oid(lc); + bool isUsableIdx; + bool containsLeftMostCol; + Relation idxRel; + IndexInfo *idxInfo; + + idxRel = index_open(idxoid, AccessShareLock); + idxInfo = BuildIndexInfo(idxRel); + isUsableIdx = IsIndexUsableForReplicaIdentityFull(idxInfo); + containsLeftMostCol = + RemoteRelContainsLeftMostColumnOnIdx(idxInfo, attrmap); + index_close(idxRel, AccessShareLock); + + /* Return the first eligible index found */ + if (isUsableIdx && containsLeftMostCol) + return idxoid; + } + + return InvalidOid; +} + +/* + * Returns true if the index is usable for replica identity full. For details, + * see FindUsableIndexForReplicaIdentityFull. + */ +bool +IsIndexUsableForReplicaIdentityFull(IndexInfo *indexInfo) +{ + bool is_btree = (indexInfo->ii_Am == BTREE_AM_OID); + bool is_partial = (indexInfo->ii_Predicate != NIL); + bool is_only_on_expression = IsIndexOnlyOnExpression(indexInfo); + + return is_btree && !is_partial && !is_only_on_expression; +} + +/* + * Get replica identity index or if it is not defined a primary key. + * + * If neither is defined, returns InvalidOid + */ +Oid +GetRelationIdentityOrPK(Relation rel) +{ + Oid idxoid; + + idxoid = RelationGetReplicaIndex(rel); + + if (!OidIsValid(idxoid)) + idxoid = RelationGetPrimaryKeyIndex(rel); + + return idxoid; +} + +/* + * Returns the index oid if we can use an index for subscriber. Otherwise, + * returns InvalidOid. + */ +static Oid +FindLogicalRepLocalIndex(Relation localrel, LogicalRepRelation *remoterel, + AttrMap *attrMap) +{ + Oid idxoid; + + /* + * We never need index oid for partitioned tables, always rely on leaf + * partition's index. + */ + if (localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + return InvalidOid; + + /* + * Simple case, we already have a primary key or a replica identity index. + */ + idxoid = GetRelationIdentityOrPK(localrel); + if (OidIsValid(idxoid)) + return idxoid; + + if (remoterel->replident == REPLICA_IDENTITY_FULL) + { + /* + * We are looking for one more opportunity for using an index. If + * there are any indexes defined on the local relation, try to pick a + * suitable index. + * + * The index selection safely assumes that all the columns are going + * to be available for the index scan given that remote relation has + * replica identity full. + * + * Note that we are not using the planner to find the cheapest method + * to scan the relation as that would require us to either use lower + * level planner functions which would be a maintenance burden in the + * long run or use the full-fledged planner which could cause + * overhead. + */ + return FindUsableIndexForReplicaIdentityFull(localrel, attrMap); + } + + return InvalidOid; +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index c7d1734a174..10f97119727 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -400,12 +400,15 @@ static void apply_handle_insert_internal(ApplyExecutionData *edata, static void apply_handle_update_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, - LogicalRepTupleData *newtup); + LogicalRepTupleData *newtup, + Oid localindexoid); static void apply_handle_delete_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, - TupleTableSlot *remoteslot); + TupleTableSlot *remoteslot, + Oid localindexoid); static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, LogicalRepRelation *remoterel, + Oid localidxoid, TupleTableSlot *remoteslot, TupleTableSlot **localslot); static void apply_handle_tuple_routing(ApplyExecutionData *edata, @@ -2351,24 +2354,6 @@ apply_handle_type(StringInfo s) } /* - * Get replica identity index or if it is not defined a primary key. - * - * If neither is defined, returns InvalidOid - */ -static Oid -GetRelationIdentityOrPK(Relation rel) -{ - Oid idxoid; - - idxoid = RelationGetReplicaIndex(rel); - - if (!OidIsValid(idxoid)) - idxoid = RelationGetPrimaryKeyIndex(rel); - - return idxoid; -} - -/* * Check that we (the subscription owner) have sufficient privileges on the * target relation to perform the given operation. */ @@ -2627,7 +2612,7 @@ apply_handle_update(StringInfo s) remoteslot, &newtup, CMD_UPDATE); else apply_handle_update_internal(edata, edata->targetRelInfo, - remoteslot, &newtup); + remoteslot, &newtup, rel->localindexoid); finish_edata(edata); @@ -2648,7 +2633,8 @@ static void apply_handle_update_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, - LogicalRepTupleData *newtup) + LogicalRepTupleData *newtup, + Oid localindexoid) { EState *estate = edata->estate; LogicalRepRelMapEntry *relmapentry = edata->targetRel; @@ -2663,6 +2649,7 @@ apply_handle_update_internal(ApplyExecutionData *edata, found = FindReplTupleInLocalRel(estate, localrel, &relmapentry->remoterel, + localindexoid, remoteslot, &localslot); ExecClearTuple(remoteslot); @@ -2767,7 +2754,7 @@ apply_handle_delete(StringInfo s) remoteslot, NULL, CMD_DELETE); else apply_handle_delete_internal(edata, edata->targetRelInfo, - remoteslot); + remoteslot, rel->localindexoid); finish_edata(edata); @@ -2787,7 +2774,8 @@ apply_handle_delete(StringInfo s) static void apply_handle_delete_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, - TupleTableSlot *remoteslot) + TupleTableSlot *remoteslot, + Oid localindexoid) { EState *estate = edata->estate; Relation localrel = relinfo->ri_RelationDesc; @@ -2799,7 +2787,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata, EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); ExecOpenIndices(relinfo, false); - found = FindReplTupleInLocalRel(estate, localrel, remoterel, + found = FindReplTupleInLocalRel(estate, localrel, remoterel, localindexoid, remoteslot, &localslot); /* If found delete it. */ @@ -2833,17 +2821,17 @@ apply_handle_delete_internal(ApplyExecutionData *edata, /* * Try to find a tuple received from the publication side (in 'remoteslot') in * the corresponding local relation using either replica identity index, - * primary key or if needed, sequential scan. + * primary key, index or if needed, sequential scan. * * Local tuple, if found, is returned in '*localslot'. */ static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, LogicalRepRelation *remoterel, + Oid localidxoid, TupleTableSlot *remoteslot, TupleTableSlot **localslot) { - Oid idxoid; bool found; /* @@ -2854,12 +2842,11 @@ FindReplTupleInLocalRel(EState *estate, Relation localrel, *localslot = table_slot_create(localrel, &estate->es_tupleTable); - idxoid = GetRelationIdentityOrPK(localrel); - Assert(OidIsValid(idxoid) || + Assert(OidIsValid(localidxoid) || (remoterel->replident == REPLICA_IDENTITY_FULL)); - if (OidIsValid(idxoid)) - found = RelationFindReplTupleByIndex(localrel, idxoid, + if (OidIsValid(localidxoid)) + found = RelationFindReplTupleByIndex(localrel, localidxoid, LockTupleExclusive, remoteslot, *localslot); else @@ -2960,7 +2947,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, case CMD_DELETE: apply_handle_delete_internal(edata, partrelinfo, - remoteslot_part); + remoteslot_part, + part_entry->localindexoid); break; case CMD_UPDATE: @@ -2980,6 +2968,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, /* Get the matching local tuple from the partition. */ found = FindReplTupleInLocalRel(estate, partrel, &part_entry->remoterel, + part_entry->localindexoid, remoteslot_part, &localslot); if (!found) { @@ -3076,7 +3065,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, /* DELETE old tuple found in the old partition. */ apply_handle_delete_internal(edata, partrelinfo, - localslot); + localslot, + part_entry->localindexoid); /* INSERT new tuple into the new partition. */ diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h index 9c34054bb78..921b9974db7 100644 --- a/src/include/replication/logicalrelation.h +++ b/src/include/replication/logicalrelation.h @@ -13,6 +13,7 @@ #define LOGICALRELATION_H #include "access/attmap.h" +#include "catalog/index.h" #include "replication/logicalproto.h" typedef struct LogicalRepRelMapEntry @@ -31,6 +32,7 @@ typedef struct LogicalRepRelMapEntry Relation localrel; /* relcache entry (NULL when closed) */ AttrMap *attrmap; /* map of local attributes to remote ones */ bool updatable; /* Can apply updates/deletes? */ + Oid localindexoid; /* which index to use, or InvalidOid if none */ /* Sync state. */ char state; @@ -46,5 +48,7 @@ extern LogicalRepRelMapEntry *logicalrep_partition_open(LogicalRepRelMapEntry *r Relation partrel, AttrMap *map); extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode); +extern bool IsIndexUsableForReplicaIdentityFull(IndexInfo *indexInfo); +extern Oid GetRelationIdentityOrPK(Relation rel); #endif /* LOGICALRELATION_H */ diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build index 3db0fdfd96b..f85bf92b6f2 100644 --- a/src/test/subscription/meson.build +++ b/src/test/subscription/meson.build @@ -38,6 +38,7 @@ tests += { 't/029_on_error.pl', 't/030_origin.pl', 't/031_column_list.pl', + 't/032_subscribe_use_index.pl', 't/100_bugs.pl', ], }, |