aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor/execReplication.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/executor/execReplication.c')
-rw-r--r--src/backend/executor/execReplication.c112
1 files changed, 79 insertions, 33 deletions
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 4f5083a598a..fa8628e3e1b 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -25,6 +25,7 @@
#include "nodes/nodeFuncs.h"
#include "parser/parse_relation.h"
#include "parser/parsetree.h"
+#include "replication/logicalrelation.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
@@ -37,49 +38,63 @@
#include "utils/typcache.h"
+static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
+ TypeCacheEntry **eq);
+
/*
* Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
* is setup to match 'rel' (*NOT* idxrel!).
*
- * Returns whether any column contains NULLs.
+ * Returns how many columns to use for the index scan.
+ *
+ * This is not generic routine, it expects the idxrel to be a btree, non-partial
+ * and have at least one column reference (i.e. cannot consist of only
+ * expressions).
*
- * This is not generic routine, it expects the idxrel to be replication
- * identity of a rel and meet all limitations associated with that.
+ * By definition, replication identity of a rel meets all limitations associated
+ * with that. Note that any other index could also meet these limitations.
*/
-static bool
+static int
build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
TupleTableSlot *searchslot)
{
- int attoff;
+ int index_attoff;
+ int skey_attoff = 0;
bool isnull;
Datum indclassDatum;
oidvector *opclass;
int2vector *indkey = &idxrel->rd_index->indkey;
- bool hasnulls = false;
-
- Assert(RelationGetReplicaIndex(rel) == RelationGetRelid(idxrel) ||
- RelationGetPrimaryKeyIndex(rel) == RelationGetRelid(idxrel));
indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
Anum_pg_index_indclass, &isnull);
Assert(!isnull);
opclass = (oidvector *) DatumGetPointer(indclassDatum);
- /* Build scankey for every attribute in the index. */
- for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++)
+ /* Build scankey for every non-expression attribute in the index. */
+ for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
+ index_attoff++)
{
Oid operator;
+ Oid optype;
Oid opfamily;
RegProcedure regop;
- int pkattno = attoff + 1;
- int mainattno = indkey->values[attoff];
- Oid optype = get_opclass_input_type(opclass->values[attoff]);
+ int table_attno = indkey->values[index_attoff];
+
+ if (!AttributeNumberIsValid(table_attno))
+ {
+ /*
+ * XXX: Currently, we don't support expressions in the scan key,
+ * see code below.
+ */
+ continue;
+ }
/*
* Load the operator info. We need this to get the equality operator
* function for the scan key.
*/
- opfamily = get_opclass_family(opclass->values[attoff]);
+ optype = get_opclass_input_type(opclass->values[index_attoff]);
+ opfamily = get_opclass_family(opclass->values[index_attoff]);
operator = get_opfamily_member(opfamily, optype,
optype,
@@ -91,23 +106,25 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
regop = get_opcode(operator);
/* Initialize the scankey. */
- ScanKeyInit(&skey[attoff],
- pkattno,
+ ScanKeyInit(&skey[skey_attoff],
+ index_attoff + 1,
BTEqualStrategyNumber,
regop,
- searchslot->tts_values[mainattno - 1]);
+ searchslot->tts_values[table_attno - 1]);
- skey[attoff].sk_collation = idxrel->rd_indcollation[attoff];
+ skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
/* Check for null value. */
- if (searchslot->tts_isnull[mainattno - 1])
- {
- hasnulls = true;
- skey[attoff].sk_flags |= SK_ISNULL;
- }
+ if (searchslot->tts_isnull[table_attno - 1])
+ skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
+
+ skey_attoff++;
}
- return hasnulls;
+ /* There must always be at least one attribute for the index scan. */
+ Assert(skey_attoff > 0);
+
+ return skey_attoff;
}
/*
@@ -123,33 +140,58 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
TupleTableSlot *outslot)
{
ScanKeyData skey[INDEX_MAX_KEYS];
+ int skey_attoff;
IndexScanDesc scan;
SnapshotData snap;
TransactionId xwait;
Relation idxrel;
bool found;
+ TypeCacheEntry **eq = NULL;
+ bool isIdxSafeToSkipDuplicates;
/* Open the index. */
idxrel = index_open(idxoid, RowExclusiveLock);
- /* Start an index scan. */
+ isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
+
InitDirtySnapshot(snap);
- scan = index_beginscan(rel, idxrel, &snap,
- IndexRelationGetNumberOfKeyAttributes(idxrel),
- 0);
/* Build scan key. */
- build_replindex_scan_key(skey, rel, idxrel, searchslot);
+ skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
+
+ /* Start an index scan. */
+ scan = index_beginscan(rel, idxrel, &snap, skey_attoff, 0);
retry:
found = false;
- index_rescan(scan, skey, IndexRelationGetNumberOfKeyAttributes(idxrel), NULL, 0);
+ index_rescan(scan, skey, skey_attoff, NULL, 0);
/* Try to find the tuple */
- if (index_getnext_slot(scan, ForwardScanDirection, outslot))
+ while (index_getnext_slot(scan, ForwardScanDirection, outslot))
{
- found = true;
+ /*
+ * Avoid expensive equality check if the index is primary key or
+ * replica identity index.
+ */
+ if (!isIdxSafeToSkipDuplicates)
+ {
+ if (eq == NULL)
+ {
+#ifdef USE_ASSERT_CHECKING
+ /* apply assertions only once for the input idxoid */
+ IndexInfo *indexInfo = BuildIndexInfo(idxrel);
+
+ Assert(IsIndexUsableForReplicaIdentityFull(indexInfo));
+#endif
+
+ eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
+ }
+
+ if (!tuples_equal(outslot, searchslot, eq))
+ continue;
+ }
+
ExecMaterializeSlot(outslot);
xwait = TransactionIdIsValid(snap.xmin) ?
@@ -164,6 +206,10 @@ retry:
XactLockTableWait(xwait, NULL, NULL, XLTW_None);
goto retry;
}
+
+ /* Found our tuple and it's not locked */
+ found = true;
+ break;
}
/* Found tuple, try to lock it in the lockmode. */