diff options
Diffstat (limited to 'src/backend/executor/execReplication.c')
-rw-r--r-- | src/backend/executor/execReplication.c | 251 |
1 files changed, 247 insertions, 4 deletions
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index f262e7a66f7..68184f5d671 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -14,12 +14,14 @@ #include "postgres.h" +#include "access/commit_ts.h" #include "access/genam.h" #include "access/gist.h" #include "access/relscan.h" #include "access/tableam.h" #include "access/transam.h" #include "access/xact.h" +#include "access/heapam.h" #include "catalog/pg_am_d.h" #include "commands/trigger.h" #include "executor/executor.h" @@ -36,7 +38,7 @@ static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, - TypeCacheEntry **eq); + TypeCacheEntry **eq, Bitmapset *columns); /* * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that @@ -221,7 +223,7 @@ retry: if (eq == NULL) eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts); - if (!tuples_equal(outslot, searchslot, eq)) + if (!tuples_equal(outslot, searchslot, eq, NULL)) continue; } @@ -277,10 +279,13 @@ retry: /* * Compare the tuples in the slots by checking if they have equal values. + * + * If 'columns' is not null, only the columns specified within it will be + * considered for the equality check, ignoring all other columns. */ static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, - TypeCacheEntry **eq) + TypeCacheEntry **eq, Bitmapset *columns) { int attrnum; @@ -306,6 +311,14 @@ tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, continue; /* + * Ignore columns that are not listed for checking. + */ + if (columns && + !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + columns)) + continue; + + /* * If one value is NULL and other is not, then they are certainly not * equal */ @@ -380,7 +393,7 @@ retry: /* Try to find the tuple */ while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot)) { - if (!tuples_equal(scanslot, searchslot, eq)) + if (!tuples_equal(scanslot, searchslot, eq, NULL)) continue; found = true; @@ -456,6 +469,236 @@ BuildConflictIndexInfo(ResultRelInfo *resultRelInfo, Oid conflictindex) } /* + * If the tuple is recently dead and was deleted by a transaction with a newer + * commit timestamp than previously recorded, update the associated transaction + * ID, commit time, and origin. This helps ensure that conflict detection uses + * the most recent and relevant deletion metadata. + */ +static void +update_most_recent_deletion_info(TupleTableSlot *scanslot, + TransactionId oldestxmin, + TransactionId *delete_xid, + TimestampTz *delete_time, + RepOriginId *delete_origin) +{ + BufferHeapTupleTableSlot *hslot; + HeapTuple tuple; + Buffer buf; + bool recently_dead = false; + TransactionId xmax; + TimestampTz localts; + RepOriginId localorigin; + + hslot = (BufferHeapTupleTableSlot *) scanslot; + + tuple = ExecFetchSlotHeapTuple(scanslot, false, NULL); + buf = hslot->buffer; + + LockBuffer(buf, BUFFER_LOCK_SHARE); + + /* + * We do not consider HEAPTUPLE_DEAD status because it indicates either + * tuples whose inserting transaction was aborted (meaning there is no + * commit timestamp or origin), or tuples deleted by a transaction older + * than oldestxmin, making it safe to ignore them during conflict + * detection (See comments atop worker.c for details). + */ + if (HeapTupleSatisfiesVacuum(tuple, oldestxmin, buf) == HEAPTUPLE_RECENTLY_DEAD) + recently_dead = true; + + LockBuffer(buf, BUFFER_LOCK_UNLOCK); + + if (!recently_dead) + return; + + xmax = HeapTupleHeaderGetUpdateXid(tuple->t_data); + if (!TransactionIdIsValid(xmax)) + return; + + /* Select the dead tuple with the most recent commit timestamp */ + if (TransactionIdGetCommitTsData(xmax, &localts, &localorigin) && + TimestampDifferenceExceeds(*delete_time, localts, 0)) + { + *delete_xid = xmax; + *delete_time = localts; + *delete_origin = localorigin; + } +} + +/* + * Searches the relation 'rel' for the most recently deleted tuple that matches + * the values in 'searchslot' and is not yet removable by VACUUM. The function + * returns the transaction ID, origin, and commit timestamp of the transaction + * that deleted this tuple. + * + * 'oldestxmin' acts as a cutoff transaction ID. Tuples deleted by transactions + * with IDs >= 'oldestxmin' are considered recently dead and are eligible for + * conflict detection. + * + * Instead of stopping at the first match, we scan all matching dead tuples to + * identify most recent deletion. This is crucial because only the latest + * deletion is relevant for resolving conflicts. + * + * For example, consider a scenario on the subscriber where a row is deleted, + * re-inserted, and then deleted again only on the subscriber: + * + * - (pk, 1) - deleted at 9:00, + * - (pk, 1) - deleted at 9:02, + * + * Now, a remote update arrives: (pk, 1) -> (pk, 2), timestamped at 9:01. + * + * If we mistakenly return the older deletion (9:00), the system may wrongly + * apply the remote update using a last-update-wins strategy. Instead, we must + * recognize the more recent deletion at 9:02 and skip the update. See + * comments atop worker.c for details. Note, as of now, conflict resolution + * is not implemented. Consequently, the system may incorrectly report the + * older tuple as the conflicted one, leading to misleading results. + * + * The commit timestamp of the deleting transaction is used to determine which + * tuple was deleted most recently. + */ +bool +RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot, + TransactionId oldestxmin, + TransactionId *delete_xid, + RepOriginId *delete_origin, + TimestampTz *delete_time) +{ + TupleTableSlot *scanslot; + TableScanDesc scan; + TypeCacheEntry **eq; + Bitmapset *indexbitmap; + TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel); + + Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor)); + + *delete_xid = InvalidTransactionId; + *delete_origin = InvalidRepOriginId; + *delete_time = 0; + + /* + * If the relation has a replica identity key or a primary key that is + * unusable for locating deleted tuples (see + * IsIndexUsableForFindingDeletedTuple), a full table scan becomes + * necessary. In such cases, comparing the entire tuple is not required, + * since the remote tuple might not include all column values. Instead, + * the indexed columns alone are suffcient to identify the target tuple + * (see logicalrep_rel_mark_updatable). + */ + indexbitmap = RelationGetIndexAttrBitmap(rel, + INDEX_ATTR_BITMAP_IDENTITY_KEY); + + /* fallback to PK if no replica identity */ + if (!indexbitmap) + indexbitmap = RelationGetIndexAttrBitmap(rel, + INDEX_ATTR_BITMAP_PRIMARY_KEY); + + eq = palloc0(sizeof(*eq) * searchslot->tts_tupleDescriptor->natts); + + /* + * Start a heap scan using SnapshotAny to identify dead tuples that are + * not visible under a standard MVCC snapshot. Tuples from transactions + * not yet committed or those just committed prior to the scan are + * excluded in update_most_recent_deletion_info(). + */ + scan = table_beginscan(rel, SnapshotAny, 0, NULL); + scanslot = table_slot_create(rel, NULL); + + table_rescan(scan, NULL); + + /* Try to find the tuple */ + while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot)) + { + if (!tuples_equal(scanslot, searchslot, eq, indexbitmap)) + continue; + + update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid, + delete_time, delete_origin); + } + + table_endscan(scan); + ExecDropSingleTupleTableSlot(scanslot); + + return *delete_time != 0; +} + +/* + * Similar to RelationFindDeletedTupleInfoSeq() but using index scan to locate + * the deleted tuple. + */ +bool +RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid, + TupleTableSlot *searchslot, + TransactionId oldestxmin, + TransactionId *delete_xid, + RepOriginId *delete_origin, + TimestampTz *delete_time) +{ + Relation idxrel; + ScanKeyData skey[INDEX_MAX_KEYS]; + int skey_attoff; + IndexScanDesc scan; + TupleTableSlot *scanslot; + TypeCacheEntry **eq = NULL; + bool isIdxSafeToSkipDuplicates; + TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel); + + Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor)); + Assert(OidIsValid(idxoid)); + + *delete_xid = InvalidTransactionId; + *delete_time = 0; + *delete_origin = InvalidRepOriginId; + + isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid); + + scanslot = table_slot_create(rel, NULL); + + idxrel = index_open(idxoid, RowExclusiveLock); + + /* Build scan key. */ + skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot); + + /* + * Start an index scan using SnapshotAny to identify dead tuples that are + * not visible under a standard MVCC snapshot. Tuples from transactions + * not yet committed or those just committed prior to the scan are + * excluded in update_most_recent_deletion_info(). + */ + scan = index_beginscan(rel, idxrel, SnapshotAny, NULL, skey_attoff, 0); + + index_rescan(scan, skey, skey_attoff, NULL, 0); + + /* Try to find the tuple */ + while (index_getnext_slot(scan, ForwardScanDirection, scanslot)) + { + /* + * Avoid expensive equality check if the index is primary key or + * replica identity index. + */ + if (!isIdxSafeToSkipDuplicates) + { + if (eq == NULL) + eq = palloc0(sizeof(*eq) * scanslot->tts_tupleDescriptor->natts); + + if (!tuples_equal(scanslot, searchslot, eq, NULL)) + continue; + } + + update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid, + delete_time, delete_origin); + } + + index_endscan(scan); + + index_close(idxrel, NoLock); + + ExecDropSingleTupleTableSlot(scanslot); + + return *delete_time != 0; +} + +/* * Find the tuple that violates the passed unique index (conflictindex). * * If the conflicting tuple is found return true, otherwise false. |