diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/catalog/system_views.sql | 1 | ||||
-rw-r--r-- | src/backend/commands/analyze.c | 4 | ||||
-rw-r--r-- | src/backend/executor/execReplication.c | 251 | ||||
-rw-r--r-- | src/backend/partitioning/partbounds.c | 57 | ||||
-rw-r--r-- | src/backend/replication/logical/conflict.c | 22 | ||||
-rw-r--r-- | src/backend/replication/logical/slotsync.c | 8 | ||||
-rw-r--r-- | src/backend/replication/logical/worker.c | 186 | ||||
-rw-r--r-- | src/backend/utils/adt/pgstatfuncs.c | 14 | ||||
-rw-r--r-- | src/backend/utils/error/elog.c | 15 | ||||
-rw-r--r-- | src/bin/pg_basebackup/pg_recvlogical.c | 4 | ||||
-rw-r--r-- | src/bin/pg_basebackup/receivelog.c | 18 | ||||
-rw-r--r-- | src/include/catalog/catversion.h | 2 | ||||
-rw-r--r-- | src/include/catalog/pg_proc.dat | 6 | ||||
-rw-r--r-- | src/include/executor/executor.h | 14 | ||||
-rw-r--r-- | src/include/replication/conflict.h | 3 | ||||
-rw-r--r-- | src/include/replication/worker_internal.h | 5 | ||||
-rw-r--r-- | src/test/regress/expected/rules.out | 3 | ||||
-rw-r--r-- | src/test/subscription/t/035_conflicts.pl | 66 |
18 files changed, 587 insertions, 92 deletions
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index f6eca09ee15..77c693f630e 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1399,6 +1399,7 @@ CREATE VIEW pg_stat_subscription_stats AS ss.confl_insert_exists, ss.confl_update_origin_differs, ss.confl_update_exists, + ss.confl_update_deleted, ss.confl_update_missing, ss.confl_delete_origin_differs, ss.confl_delete_missing, diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c index 7111d5d5334..40d66537ad7 100644 --- a/src/backend/commands/analyze.c +++ b/src/backend/commands/analyze.c @@ -690,8 +690,8 @@ do_analyze_rel(Relation onerel, const VacuumParams params, * only do it for inherited stats. (We're never called for not-inherited * stats on partitioned tables anyway.) * - * Reset the changes_since_analyze counter only if we analyzed all - * columns; otherwise, there is still work for auto-analyze to do. + * Reset the mod_since_analyze counter only if we analyzed all columns; + * otherwise, there is still work for auto-analyze to do. */ if (!inh) pgstat_report_analyze(onerel, totalrows, totaldeadrows, diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index f262e7a66f7..68184f5d671 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -14,12 +14,14 @@ #include "postgres.h" +#include "access/commit_ts.h" #include "access/genam.h" #include "access/gist.h" #include "access/relscan.h" #include "access/tableam.h" #include "access/transam.h" #include "access/xact.h" +#include "access/heapam.h" #include "catalog/pg_am_d.h" #include "commands/trigger.h" #include "executor/executor.h" @@ -36,7 +38,7 @@ static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, - TypeCacheEntry **eq); + TypeCacheEntry **eq, Bitmapset *columns); /* * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that @@ -221,7 +223,7 @@ retry: if (eq == NULL) eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts); - if (!tuples_equal(outslot, searchslot, eq)) + if (!tuples_equal(outslot, searchslot, eq, NULL)) continue; } @@ -277,10 +279,13 @@ retry: /* * Compare the tuples in the slots by checking if they have equal values. + * + * If 'columns' is not null, only the columns specified within it will be + * considered for the equality check, ignoring all other columns. */ static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, - TypeCacheEntry **eq) + TypeCacheEntry **eq, Bitmapset *columns) { int attrnum; @@ -306,6 +311,14 @@ tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, continue; /* + * Ignore columns that are not listed for checking. + */ + if (columns && + !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + columns)) + continue; + + /* * If one value is NULL and other is not, then they are certainly not * equal */ @@ -380,7 +393,7 @@ retry: /* Try to find the tuple */ while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot)) { - if (!tuples_equal(scanslot, searchslot, eq)) + if (!tuples_equal(scanslot, searchslot, eq, NULL)) continue; found = true; @@ -456,6 +469,236 @@ BuildConflictIndexInfo(ResultRelInfo *resultRelInfo, Oid conflictindex) } /* + * If the tuple is recently dead and was deleted by a transaction with a newer + * commit timestamp than previously recorded, update the associated transaction + * ID, commit time, and origin. This helps ensure that conflict detection uses + * the most recent and relevant deletion metadata. + */ +static void +update_most_recent_deletion_info(TupleTableSlot *scanslot, + TransactionId oldestxmin, + TransactionId *delete_xid, + TimestampTz *delete_time, + RepOriginId *delete_origin) +{ + BufferHeapTupleTableSlot *hslot; + HeapTuple tuple; + Buffer buf; + bool recently_dead = false; + TransactionId xmax; + TimestampTz localts; + RepOriginId localorigin; + + hslot = (BufferHeapTupleTableSlot *) scanslot; + + tuple = ExecFetchSlotHeapTuple(scanslot, false, NULL); + buf = hslot->buffer; + + LockBuffer(buf, BUFFER_LOCK_SHARE); + + /* + * We do not consider HEAPTUPLE_DEAD status because it indicates either + * tuples whose inserting transaction was aborted (meaning there is no + * commit timestamp or origin), or tuples deleted by a transaction older + * than oldestxmin, making it safe to ignore them during conflict + * detection (See comments atop worker.c for details). + */ + if (HeapTupleSatisfiesVacuum(tuple, oldestxmin, buf) == HEAPTUPLE_RECENTLY_DEAD) + recently_dead = true; + + LockBuffer(buf, BUFFER_LOCK_UNLOCK); + + if (!recently_dead) + return; + + xmax = HeapTupleHeaderGetUpdateXid(tuple->t_data); + if (!TransactionIdIsValid(xmax)) + return; + + /* Select the dead tuple with the most recent commit timestamp */ + if (TransactionIdGetCommitTsData(xmax, &localts, &localorigin) && + TimestampDifferenceExceeds(*delete_time, localts, 0)) + { + *delete_xid = xmax; + *delete_time = localts; + *delete_origin = localorigin; + } +} + +/* + * Searches the relation 'rel' for the most recently deleted tuple that matches + * the values in 'searchslot' and is not yet removable by VACUUM. The function + * returns the transaction ID, origin, and commit timestamp of the transaction + * that deleted this tuple. + * + * 'oldestxmin' acts as a cutoff transaction ID. Tuples deleted by transactions + * with IDs >= 'oldestxmin' are considered recently dead and are eligible for + * conflict detection. + * + * Instead of stopping at the first match, we scan all matching dead tuples to + * identify most recent deletion. This is crucial because only the latest + * deletion is relevant for resolving conflicts. + * + * For example, consider a scenario on the subscriber where a row is deleted, + * re-inserted, and then deleted again only on the subscriber: + * + * - (pk, 1) - deleted at 9:00, + * - (pk, 1) - deleted at 9:02, + * + * Now, a remote update arrives: (pk, 1) -> (pk, 2), timestamped at 9:01. + * + * If we mistakenly return the older deletion (9:00), the system may wrongly + * apply the remote update using a last-update-wins strategy. Instead, we must + * recognize the more recent deletion at 9:02 and skip the update. See + * comments atop worker.c for details. Note, as of now, conflict resolution + * is not implemented. Consequently, the system may incorrectly report the + * older tuple as the conflicted one, leading to misleading results. + * + * The commit timestamp of the deleting transaction is used to determine which + * tuple was deleted most recently. + */ +bool +RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot, + TransactionId oldestxmin, + TransactionId *delete_xid, + RepOriginId *delete_origin, + TimestampTz *delete_time) +{ + TupleTableSlot *scanslot; + TableScanDesc scan; + TypeCacheEntry **eq; + Bitmapset *indexbitmap; + TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel); + + Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor)); + + *delete_xid = InvalidTransactionId; + *delete_origin = InvalidRepOriginId; + *delete_time = 0; + + /* + * If the relation has a replica identity key or a primary key that is + * unusable for locating deleted tuples (see + * IsIndexUsableForFindingDeletedTuple), a full table scan becomes + * necessary. In such cases, comparing the entire tuple is not required, + * since the remote tuple might not include all column values. Instead, + * the indexed columns alone are suffcient to identify the target tuple + * (see logicalrep_rel_mark_updatable). + */ + indexbitmap = RelationGetIndexAttrBitmap(rel, + INDEX_ATTR_BITMAP_IDENTITY_KEY); + + /* fallback to PK if no replica identity */ + if (!indexbitmap) + indexbitmap = RelationGetIndexAttrBitmap(rel, + INDEX_ATTR_BITMAP_PRIMARY_KEY); + + eq = palloc0(sizeof(*eq) * searchslot->tts_tupleDescriptor->natts); + + /* + * Start a heap scan using SnapshotAny to identify dead tuples that are + * not visible under a standard MVCC snapshot. Tuples from transactions + * not yet committed or those just committed prior to the scan are + * excluded in update_most_recent_deletion_info(). + */ + scan = table_beginscan(rel, SnapshotAny, 0, NULL); + scanslot = table_slot_create(rel, NULL); + + table_rescan(scan, NULL); + + /* Try to find the tuple */ + while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot)) + { + if (!tuples_equal(scanslot, searchslot, eq, indexbitmap)) + continue; + + update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid, + delete_time, delete_origin); + } + + table_endscan(scan); + ExecDropSingleTupleTableSlot(scanslot); + + return *delete_time != 0; +} + +/* + * Similar to RelationFindDeletedTupleInfoSeq() but using index scan to locate + * the deleted tuple. + */ +bool +RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid, + TupleTableSlot *searchslot, + TransactionId oldestxmin, + TransactionId *delete_xid, + RepOriginId *delete_origin, + TimestampTz *delete_time) +{ + Relation idxrel; + ScanKeyData skey[INDEX_MAX_KEYS]; + int skey_attoff; + IndexScanDesc scan; + TupleTableSlot *scanslot; + TypeCacheEntry **eq = NULL; + bool isIdxSafeToSkipDuplicates; + TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel); + + Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor)); + Assert(OidIsValid(idxoid)); + + *delete_xid = InvalidTransactionId; + *delete_time = 0; + *delete_origin = InvalidRepOriginId; + + isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid); + + scanslot = table_slot_create(rel, NULL); + + idxrel = index_open(idxoid, RowExclusiveLock); + + /* Build scan key. */ + skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot); + + /* + * Start an index scan using SnapshotAny to identify dead tuples that are + * not visible under a standard MVCC snapshot. Tuples from transactions + * not yet committed or those just committed prior to the scan are + * excluded in update_most_recent_deletion_info(). + */ + scan = index_beginscan(rel, idxrel, SnapshotAny, NULL, skey_attoff, 0); + + index_rescan(scan, skey, skey_attoff, NULL, 0); + + /* Try to find the tuple */ + while (index_getnext_slot(scan, ForwardScanDirection, scanslot)) + { + /* + * Avoid expensive equality check if the index is primary key or + * replica identity index. + */ + if (!isIdxSafeToSkipDuplicates) + { + if (eq == NULL) + eq = palloc0(sizeof(*eq) * scanslot->tts_tupleDescriptor->natts); + + if (!tuples_equal(scanslot, searchslot, eq, NULL)) + continue; + } + + update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid, + delete_time, delete_origin); + } + + index_endscan(scan); + + index_close(idxrel, NoLock); + + ExecDropSingleTupleTableSlot(scanslot); + + return *delete_time != 0; +} + +/* * Find the tuple that violates the passed unique index (conflictindex). * * If the conflicting tuple is found return true, otherwise false. diff --git a/src/backend/partitioning/partbounds.c b/src/backend/partitioning/partbounds.c index 4bdc2941efb..822cf4ec451 100644 --- a/src/backend/partitioning/partbounds.c +++ b/src/backend/partitioning/partbounds.c @@ -1007,9 +1007,6 @@ partition_bounds_copy(PartitionBoundInfo src, int ndatums; int nindexes; int partnatts; - bool hash_part; - int natts; - Datum *boundDatums; dest = (PartitionBoundInfo) palloc(sizeof(PartitionBoundInfoData)); @@ -1023,7 +1020,7 @@ partition_bounds_copy(PartitionBoundInfo src, dest->datums = (Datum **) palloc(sizeof(Datum *) * ndatums); - if (src->kind != NULL) + if (src->kind != NULL && ndatums > 0) { PartitionRangeDatumKind *boundKinds; @@ -1058,36 +1055,40 @@ partition_bounds_copy(PartitionBoundInfo src, * For hash partitioning, datums array will have two elements - modulus * and remainder. */ - hash_part = (key->strategy == PARTITION_STRATEGY_HASH); - natts = hash_part ? 2 : partnatts; - boundDatums = palloc(ndatums * natts * sizeof(Datum)); - - for (i = 0; i < ndatums; i++) + if (ndatums > 0) { - int j; - - dest->datums[i] = &boundDatums[i * natts]; + bool hash_part = (key->strategy == PARTITION_STRATEGY_HASH); + int natts = hash_part ? 2 : partnatts; + Datum *boundDatums = palloc(ndatums * natts * sizeof(Datum)); - for (j = 0; j < natts; j++) + for (i = 0; i < ndatums; i++) { - bool byval; - int typlen; + int j; - if (hash_part) - { - typlen = sizeof(int32); /* Always int4 */ - byval = true; /* int4 is pass-by-value */ - } - else + dest->datums[i] = &boundDatums[i * natts]; + + for (j = 0; j < natts; j++) { - byval = key->parttypbyval[j]; - typlen = key->parttyplen[j]; - } + if (dest->kind == NULL || + dest->kind[i][j] == PARTITION_RANGE_DATUM_VALUE) + { + bool byval; + int typlen; - if (dest->kind == NULL || - dest->kind[i][j] == PARTITION_RANGE_DATUM_VALUE) - dest->datums[i][j] = datumCopy(src->datums[i][j], - byval, typlen); + if (hash_part) + { + typlen = sizeof(int32); /* Always int4 */ + byval = true; /* int4 is pass-by-value */ + } + else + { + byval = key->parttypbyval[j]; + typlen = key->parttyplen[j]; + } + dest->datums[i][j] = datumCopy(src->datums[i][j], + byval, typlen); + } + } } } diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 97c4e26b586..2fd3e8bbda5 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -29,6 +29,7 @@ static const char *const ConflictTypeNames[] = { [CT_UPDATE_EXISTS] = "update_exists", [CT_UPDATE_MISSING] = "update_missing", [CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs", + [CT_UPDATE_DELETED] = "update_deleted", [CT_DELETE_MISSING] = "delete_missing", [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts" }; @@ -176,6 +177,7 @@ errcode_apply_conflict(ConflictType type) case CT_UPDATE_ORIGIN_DIFFERS: case CT_UPDATE_MISSING: case CT_DELETE_ORIGIN_DIFFERS: + case CT_UPDATE_DELETED: case CT_DELETE_MISSING: return errcode(ERRCODE_T_R_SERIALIZATION_FAILURE); } @@ -261,6 +263,26 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, break; + case CT_UPDATE_DELETED: + if (localts) + { + if (localorigin == InvalidRepOriginId) + appendStringInfo(&err_detail, _("The row to be updated was deleted locally in transaction %u at %s."), + localxmin, timestamptz_to_str(localts)); + else if (replorigin_by_oid(localorigin, true, &origin_name)) + appendStringInfo(&err_detail, _("The row to be updated was deleted by a different origin \"%s\" in transaction %u at %s."), + origin_name, localxmin, timestamptz_to_str(localts)); + + /* The origin that modified this row has been removed. */ + else + appendStringInfo(&err_detail, _("The row to be updated was deleted by a non-existent origin in transaction %u at %s."), + localxmin, timestamptz_to_str(localts)); + } + else + appendStringInfo(&err_detail, _("The row to be updated was deleted.")); + + break; + case CT_UPDATE_MISSING: appendStringInfoString(&err_detail, _("Could not find the row to be updated.")); break; diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 2f0c08b8fbd..37738440113 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -1059,14 +1059,14 @@ ValidateSlotSyncParams(int elevel) { /* * Logical slot sync/creation requires wal_level >= logical. - * - * Since altering the wal_level requires a server restart, so error out in - * this case regardless of elevel provided by caller. */ if (wal_level < WAL_LEVEL_LOGICAL) - ereport(ERROR, + { + ereport(elevel, errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("replication slot synchronization requires \"wal_level\" >= \"logical\"")); + return false; + } /* * A physical replication slot(primary_slot_name) is required on the diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index b59221c4d06..89e241c8392 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -138,9 +138,9 @@ * Each apply worker that enabled retain_dead_tuples option maintains a * non-removable transaction ID (oldest_nonremovable_xid) in shared memory to * prevent dead rows from being removed prematurely when the apply worker still - * needs them to detect conflicts reliably. This helps to retain the required - * commit_ts module information, which further helps to detect - * update_origin_differs and delete_origin_differs conflicts reliably, as + * needs them to detect update_deleted conflicts. Additionally, this helps to + * retain the required commit_ts module information, which further helps to + * detect update_origin_differs and delete_origin_differs conflicts reliably, as * otherwise, vacuum freeze could remove the required information. * * The logical replication launcher manages an internal replication slot named @@ -185,10 +185,10 @@ * transactions that occurred concurrently with the tuple DELETE, any * subsequent UPDATE from a remote node should have a later timestamp. In such * cases, it is acceptable to detect an update_missing scenario and convert the - * UPDATE to an INSERT when applying it. But, detecting concurrent remote - * transactions with earlier timestamps than the DELETE is necessary, as the - * UPDATEs in remote transactions should be ignored if their timestamp is - * earlier than that of the dead tuples. + * UPDATE to an INSERT when applying it. But, for concurrent remote + * transactions with earlier timestamps than the DELETE, detecting + * update_deleted is necessary, as the UPDATEs in remote transactions should be + * ignored if their timestamp is earlier than that of the dead tuples. * * Note that advancing the non-removable transaction ID is not supported if the * publisher is also a physical standby. This is because the logical walsender @@ -576,6 +576,12 @@ static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel Oid localidxoid, TupleTableSlot *remoteslot, TupleTableSlot **localslot); +static bool FindDeletedTupleInLocalRel(Relation localrel, + Oid localidxoid, + TupleTableSlot *remoteslot, + TransactionId *delete_xid, + RepOriginId *delete_origin, + TimestampTz *delete_time); static void apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, @@ -2912,17 +2918,31 @@ apply_handle_update_internal(ApplyExecutionData *edata, } else { + ConflictType type; TupleTableSlot *newslot = localslot; + /* + * Detecting whether the tuple was recently deleted or never existed + * is crucial to avoid misleading the user during confict handling. + */ + if (FindDeletedTupleInLocalRel(localrel, localindexoid, remoteslot, + &conflicttuple.xmin, + &conflicttuple.origin, + &conflicttuple.ts) && + conflicttuple.origin != replorigin_session_origin) + type = CT_UPDATE_DELETED; + else + type = CT_UPDATE_MISSING; + /* Store the new tuple for conflict reporting */ slot_store_data(newslot, relmapentry, newtup); /* - * The tuple to be updated could not be found. Do nothing except for - * emitting a log message. + * The tuple to be updated could not be found or was deleted. Do + * nothing except for emitting a log message. */ - ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING, - remoteslot, newslot, list_make1(&conflicttuple)); + ReportApplyConflict(estate, relinfo, LOG, type, remoteslot, newslot, + list_make1(&conflicttuple)); } /* Cleanup. */ @@ -3143,6 +3163,112 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel, } /* + * Determine whether the index can reliably locate the deleted tuple in the + * local relation. + * + * An index may exclude deleted tuples if it was re-indexed or re-created during + * change application. Therefore, an index is considered usable only if the + * conflict detection slot.xmin (conflict_detection_xmin) is greater than the + * index tuple's xmin. This ensures that any tuples deleted prior to the index + * creation or re-indexing are not relevant for conflict detection in the + * current apply worker. + * + * Note that indexes may also be excluded if they were modified by other DDL + * operations, such as ALTER INDEX. However, this is acceptable, as the + * likelihood of such DDL changes coinciding with the need to scan dead + * tuples for the update_deleted is low. + */ +static bool +IsIndexUsableForFindingDeletedTuple(Oid localindexoid, + TransactionId conflict_detection_xmin) +{ + HeapTuple index_tuple; + TransactionId index_xmin; + + index_tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(localindexoid)); + + if (!HeapTupleIsValid(index_tuple)) /* should not happen */ + elog(ERROR, "cache lookup failed for index %u", localindexoid); + + /* + * No need to check for a frozen transaction ID, as + * TransactionIdPrecedes() manages it internally, treating it as falling + * behind the conflict_detection_xmin. + */ + index_xmin = HeapTupleHeaderGetXmin(index_tuple->t_data); + + ReleaseSysCache(index_tuple); + + return TransactionIdPrecedes(index_xmin, conflict_detection_xmin); +} + +/* + * Attempts to locate a deleted tuple in the local relation that matches the + * values of the tuple received from the publication side (in 'remoteslot'). + * The search is performed using either the replica identity index, primary + * key, other available index, or a sequential scan if necessary. + * + * Returns true if the deleted tuple is found. If found, the transaction ID, + * origin, and commit timestamp of the deletion are stored in '*delete_xid', + * '*delete_origin', and '*delete_time' respectively. + */ +static bool +FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, + TupleTableSlot *remoteslot, + TransactionId *delete_xid, RepOriginId *delete_origin, + TimestampTz *delete_time) +{ + TransactionId oldestxmin; + ReplicationSlot *slot; + + /* + * Return false if either dead tuples are not retained or commit timestamp + * data is not available. + */ + if (!MySubscription->retaindeadtuples || !track_commit_timestamp) + return false; + + /* + * For conflict detection, we use the conflict slot's xmin value instead + * of invoking GetOldestNonRemovableTransactionId(). The slot.xmin acts as + * a threshold to identify tuples that were recently deleted. These tuples + * are not visible to concurrent transactions, but we log an + * update_deleted conflict if such a tuple matches the remote update being + * applied. + * + * Although GetOldestNonRemovableTransactionId() can return a value older + * than the slot's xmin, for our current purpose it is acceptable to treat + * tuples deleted by transactions prior to slot.xmin as update_missing + * conflicts. + * + * Ideally, we would use oldest_nonremovable_xid, which is directly + * maintained by the leader apply worker. However, this value is not + * available to table synchronization or parallel apply workers, making + * slot.xmin a practical alternative in those contexts. + */ + slot = SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true); + + Assert(slot); + + SpinLockAcquire(&slot->mutex); + oldestxmin = slot->data.xmin; + SpinLockRelease(&slot->mutex); + + Assert(TransactionIdIsValid(oldestxmin)); + + if (OidIsValid(localidxoid) && + IsIndexUsableForFindingDeletedTuple(localidxoid, oldestxmin)) + return RelationFindDeletedTupleInfoByIndex(localrel, localidxoid, + remoteslot, oldestxmin, + delete_xid, delete_origin, + delete_time); + else + return RelationFindDeletedTupleInfoSeq(localrel, remoteslot, + oldestxmin, delete_xid, + delete_origin, delete_time); +} + +/* * This handles insert, update, delete on a partitioned table. */ static void @@ -3260,18 +3386,35 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, remoteslot_part, &localslot); if (!found) { + ConflictType type; TupleTableSlot *newslot = localslot; + /* + * Detecting whether the tuple was recently deleted or + * never existed is crucial to avoid misleading the user + * during confict handling. + */ + if (FindDeletedTupleInLocalRel(partrel, + part_entry->localindexoid, + remoteslot_part, + &conflicttuple.xmin, + &conflicttuple.origin, + &conflicttuple.ts) && + conflicttuple.origin != replorigin_session_origin) + type = CT_UPDATE_DELETED; + else + type = CT_UPDATE_MISSING; + /* Store the new tuple for conflict reporting */ slot_store_data(newslot, part_entry, newtup); /* - * The tuple to be updated could not be found. Do nothing - * except for emitting a log message. + * The tuple to be updated could not be found or was + * deleted. Do nothing except for emitting a log message. */ ReportApplyConflict(estate, partrelinfo, LOG, - CT_UPDATE_MISSING, remoteslot_part, - newslot, list_make1(&conflicttuple)); + type, remoteslot_part, newslot, + list_make1(&conflicttuple)); return; } @@ -4172,8 +4315,8 @@ can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data) { /* * It is sufficient to manage non-removable transaction ID for a - * subscription by the main apply worker to detect conflicts reliably even - * for table sync or parallel apply workers. + * subscription by the main apply worker to detect update_deleted reliably + * even for table sync or parallel apply workers. */ if (!am_leader_apply_worker()) return false; @@ -4374,10 +4517,11 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) * We expect the publisher and subscriber clocks to be in sync using time * sync service like NTP. Otherwise, we will advance this worker's * oldest_nonremovable_xid prematurely, leading to the removal of rows - * required to detect conflicts reliably. This check primarily addresses - * scenarios where the publisher's clock falls behind; if the publisher's - * clock is ahead, subsequent transactions will naturally bear later - * commit timestamps, conforming to the design outlined atop worker.c. + * required to detect update_deleted reliably. This check primarily + * addresses scenarios where the publisher's clock falls behind; if the + * publisher's clock is ahead, subsequent transactions will naturally bear + * later commit timestamps, conforming to the design outlined atop + * worker.c. * * XXX Consider waiting for the publisher's clock to catch up with the * subscriber's before proceeding to the next phase. diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 1c12ddbae49..c756c2bebaa 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2171,7 +2171,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) Datum pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 11 +#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 12 Oid subid = PG_GETARG_OID(0); TupleDesc tupdesc; Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0}; @@ -2197,15 +2197,17 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_exists", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_missing", + TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_deleted", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_delete_origin_differs", + TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_missing", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_missing", + TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_origin_differs", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_multiple_unique_conflicts", + TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_missing", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset", + TupleDescInitEntry(tupdesc, (AttrNumber) 11, "confl_multiple_unique_conflicts", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 12, "stats_reset", TIMESTAMPTZOID, -1, 0); BlessTupleDesc(tupdesc); diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c index 47af743990f..afce1a8e1f0 100644 --- a/src/backend/utils/error/elog.c +++ b/src/backend/utils/error/elog.c @@ -1128,12 +1128,15 @@ set_backtrace(ErrorData *edata, int num_skip) nframes = backtrace(buf, lengthof(buf)); strfrms = backtrace_symbols(buf, nframes); - if (strfrms == NULL) - return; - - for (int i = num_skip; i < nframes; i++) - appendStringInfo(&errtrace, "\n%s", strfrms[i]); - free(strfrms); + if (strfrms != NULL) + { + for (int i = num_skip; i < nframes; i++) + appendStringInfo(&errtrace, "\n%s", strfrms[i]); + free(strfrms); + } + else + appendStringInfoString(&errtrace, + "insufficient memory for backtrace generation"); } #else appendStringInfoString(&errtrace, diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c index 8a5dd24e6c9..0e9d2e23947 100644 --- a/src/bin/pg_basebackup/pg_recvlogical.c +++ b/src/bin/pg_basebackup/pg_recvlogical.c @@ -517,7 +517,7 @@ StreamLogicalLog(void) } /* - * Read the header of the XLogData message, enclosed in the CopyData + * Read the header of the WALData message, enclosed in the CopyData * message. We only need the WAL location field (dataStart), the rest * of the header is ignored. */ @@ -605,7 +605,7 @@ StreamLogicalLog(void) /* * We're doing a client-initiated clean exit and have sent CopyDone to * the server. Drain any messages, so we don't miss a last-minute - * ErrorResponse. The walsender stops generating XLogData records once + * ErrorResponse. The walsender stops generating WALData records once * it sees CopyDone, so expect this to finish quickly. After CopyDone, * it's too late for sendFeedback(), even if this were to take a long * time. Hence, use synchronous-mode PQgetCopyData(). diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index d6b7f117fa3..f2b54d3c501 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -38,8 +38,8 @@ static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, char **buffer); static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr blockpos, TimestampTz *last_status); -static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, - XLogRecPtr *blockpos); +static bool ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, + XLogRecPtr *blockpos); static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, XLogRecPtr blockpos, XLogRecPtr *stoppos); static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos); @@ -831,7 +831,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream, } else if (copybuf[0] == 'w') { - if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos)) + if (!ProcessWALDataMsg(conn, stream, copybuf, r, &blockpos)) goto error; /* @@ -1041,11 +1041,11 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, } /* - * Process XLogData message. + * Process WALData message. */ static bool -ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, - XLogRecPtr *blockpos) +ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, + XLogRecPtr *blockpos) { int xlogoff; int bytes_left; @@ -1054,13 +1054,13 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, /* * Once we've decided we don't want to receive any more, just ignore any - * subsequent XLogData messages. + * subsequent WALData messages. */ if (!(still_sending)) return true; /* - * Read the header of the XLogData message, enclosed in the CopyData + * Read the header of the WALData message, enclosed in the CopyData * message. We only need the WAL location field (dataStart), the rest of * the header is ignored. */ @@ -1162,7 +1162,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, return false; } still_sending = false; - return true; /* ignore the rest of this XLogData packet */ + return true; /* ignore the rest of this WALData packet */ } } } diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 5173d422d46..750a9d8a09b 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -57,6 +57,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202507231 +#define CATALOG_VERSION_NO 202508041 #endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 3ee8fed7e53..118d6da1ace 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5688,9 +5688,9 @@ { oid => '6231', descr => 'statistics: information about subscription stats', proname => 'pg_stat_get_subscription_stats', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'oid', - proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}', + proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_deleted,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}', prosrc => 'pg_stat_get_subscription_stats' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 104b059544d..a71502efeed 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -14,6 +14,7 @@ #ifndef EXECUTOR_H #define EXECUTOR_H +#include "datatype/timestamp.h" #include "executor/execdesc.h" #include "fmgr.h" #include "nodes/lockoptions.h" @@ -759,7 +760,18 @@ extern bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, TupleTableSlot *outslot); extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot); - +extern bool RelationFindDeletedTupleInfoSeq(Relation rel, + TupleTableSlot *searchslot, + TransactionId oldestxmin, + TransactionId *delete_xid, + RepOriginId *delete_origin, + TimestampTz *delete_time); +extern bool RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid, + TupleTableSlot *searchslot, + TransactionId oldestxmin, + TransactionId *delete_xid, + RepOriginId *delete_origin, + TimestampTz *delete_time); extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot); extern void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index 6c59125f256..ff3cb8416ec 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -32,6 +32,9 @@ typedef enum /* The updated row value violates unique constraint */ CT_UPDATE_EXISTS, + /* The row to be updated was concurrently deleted by a different origin */ + CT_UPDATE_DELETED, + /* The row to be updated is missing */ CT_UPDATE_MISSING, diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 0c7b8440a61..7c0204dd6f4 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -87,8 +87,9 @@ typedef struct LogicalRepWorker bool parallel_apply; /* - * The changes made by this and later transactions must be retained to - * ensure reliable conflict detection during the apply phase. + * Changes made by this transaction and subsequent ones must be preserved. + * This ensures that update_deleted conflicts can be accurately detected + * during the apply phase of logical replication by this worker. * * The logical replication launcher manages an internal replication slot * named "pg_conflict_detection". It asynchronously collects this ID to diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index dce8c672b40..6509fda77a9 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2179,13 +2179,14 @@ pg_stat_subscription_stats| SELECT ss.subid, ss.confl_insert_exists, ss.confl_update_origin_differs, ss.confl_update_exists, + ss.confl_update_deleted, ss.confl_update_missing, ss.confl_delete_origin_differs, ss.confl_delete_missing, ss.confl_multiple_unique_conflicts, ss.stats_reset FROM pg_subscription s, - LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset); + LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_deleted, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset); pg_stat_sys_indexes| SELECT relid, indexrelid, schemaname, diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl index 976d53a870e..36aeb14c563 100644 --- a/src/test/subscription/t/035_conflicts.pl +++ b/src/test/subscription/t/035_conflicts.pl @@ -150,7 +150,9 @@ pass('multiple_unique_conflicts detected on a leaf partition during insert'); # Setup a bidirectional logical replication between node_A & node_B ############################################################################### -# Initialize nodes. +# Initialize nodes. Enable the track_commit_timestamp on both nodes to detect +# the conflict when attempting to update a row that was previously modified by +# a different origin. # node_A. Increase the log_min_messages setting to DEBUG2 to debug test # failures. Disable autovacuum to avoid generating xid that could affect the @@ -158,7 +160,8 @@ pass('multiple_unique_conflicts detected on a leaf partition during insert'); my $node_A = $node_publisher; $node_A->append_conf( 'postgresql.conf', - qq{autovacuum = off + qq{track_commit_timestamp = on + autovacuum = off log_min_messages = 'debug2'}); $node_A->restart; @@ -270,6 +273,8 @@ $node_A->psql('postgres', ############################################################################### # Check that dead tuples on node A cannot be cleaned by VACUUM until the # concurrent transactions on Node B have been applied and flushed on Node A. +# Also, check that an update_deleted conflict is detected when updating a row +# that was deleted by a different origin. ############################################################################### # Insert a record @@ -288,6 +293,8 @@ $node_A->poll_query_until('postgres', "SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication apply worker'" ); +my $log_location = -s $node_B->logfile; + $node_B->safe_psql('postgres', "UPDATE tab SET b = 3 WHERE a = 1;"); $node_A->safe_psql('postgres', "DELETE FROM tab WHERE a = 1;"); @@ -299,10 +306,30 @@ ok( $stderr =~ qr/1 are dead but not yet removable/, 'the deleted column is non-removable'); +# Ensure the DELETE is replayed on Node B +$node_A->wait_for_catchup($subname_BA); + +# Check the conflict detected on Node B +my $logfile = slurp_file($node_B->logfile(), $log_location); +ok( $logfile =~ + qr/conflict detected on relation "public.tab": conflict=delete_origin_differs.* +.*DETAIL:.* Deleting the row that was modified locally in transaction [0-9]+ at .* +.*Existing local tuple \(1, 3\); replica identity \(a\)=\(1\)/, + 'delete target row was modified in tab'); + +$log_location = -s $node_A->logfile; + $node_A->safe_psql( 'postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;"); $node_B->wait_for_catchup($subname_AB); +$logfile = slurp_file($node_A->logfile(), $log_location); +ok( $logfile =~ + qr/conflict detected on relation "public.tab": conflict=update_deleted.* +.*DETAIL:.* The row to be updated was deleted locally in transaction [0-9]+ at .* +.*Remote tuple \(1, 3\); replica identity \(a\)=\(1\)/, + 'update target row was deleted in tab'); + # Remember the next transaction ID to be assigned my $next_xid = $node_A->safe_psql('postgres', "SELECT txid_current() + 1;"); @@ -325,6 +352,41 @@ ok( $stderr =~ 'the deleted column is removed'); ############################################################################### +# Ensure that the deleted tuple needed to detect an update_deleted conflict is +# accessible via a sequential table scan. +############################################################################### + +# Drop the primary key from tab on node A and set REPLICA IDENTITY to FULL to +# enforce sequential scanning of the table. +$node_A->safe_psql('postgres', "ALTER TABLE tab REPLICA IDENTITY FULL"); +$node_B->safe_psql('postgres', "ALTER TABLE tab REPLICA IDENTITY FULL"); +$node_A->safe_psql('postgres', "ALTER TABLE tab DROP CONSTRAINT tab_pkey;"); + +# Disable the logical replication from node B to node A +$node_A->safe_psql('postgres', "ALTER SUBSCRIPTION $subname_AB DISABLE"); + +# Wait for the apply worker to stop +$node_A->poll_query_until('postgres', + "SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication apply worker'" +); + +$node_B->safe_psql('postgres', "UPDATE tab SET b = 4 WHERE a = 2;"); +$node_A->safe_psql('postgres', "DELETE FROM tab WHERE a = 2;"); + +$log_location = -s $node_A->logfile; + +$node_A->safe_psql( + 'postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;"); +$node_B->wait_for_catchup($subname_AB); + +$logfile = slurp_file($node_A->logfile(), $log_location); +ok( $logfile =~ + qr/conflict detected on relation "public.tab": conflict=update_deleted.* +.*DETAIL:.* The row to be updated was deleted locally in transaction [0-9]+ at .* +.*Remote tuple \(2, 4\); replica identity full \(2, 2\)/, + 'update target row was deleted in tab'); + +############################################################################### # Check that the replication slot pg_conflict_detection is dropped after # removing all the subscriptions. ############################################################################### |