diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/catalog/system_views.sql | 1 | ||||
-rw-r--r-- | src/backend/commands/analyze.c | 4 | ||||
-rw-r--r-- | src/backend/executor/execReplication.c | 251 | ||||
-rw-r--r-- | src/backend/partitioning/partbounds.c | 57 | ||||
-rw-r--r-- | src/backend/replication/logical/conflict.c | 22 | ||||
-rw-r--r-- | src/backend/replication/logical/slotsync.c | 8 | ||||
-rw-r--r-- | src/backend/replication/logical/worker.c | 186 | ||||
-rw-r--r-- | src/backend/utils/adt/pgstatfuncs.c | 14 | ||||
-rw-r--r-- | src/backend/utils/error/elog.c | 15 |
9 files changed, 487 insertions, 71 deletions
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index f6eca09ee15..77c693f630e 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1399,6 +1399,7 @@ CREATE VIEW pg_stat_subscription_stats AS ss.confl_insert_exists, ss.confl_update_origin_differs, ss.confl_update_exists, + ss.confl_update_deleted, ss.confl_update_missing, ss.confl_delete_origin_differs, ss.confl_delete_missing, diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c index 7111d5d5334..40d66537ad7 100644 --- a/src/backend/commands/analyze.c +++ b/src/backend/commands/analyze.c @@ -690,8 +690,8 @@ do_analyze_rel(Relation onerel, const VacuumParams params, * only do it for inherited stats. (We're never called for not-inherited * stats on partitioned tables anyway.) * - * Reset the changes_since_analyze counter only if we analyzed all - * columns; otherwise, there is still work for auto-analyze to do. + * Reset the mod_since_analyze counter only if we analyzed all columns; + * otherwise, there is still work for auto-analyze to do. */ if (!inh) pgstat_report_analyze(onerel, totalrows, totaldeadrows, diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index f262e7a66f7..68184f5d671 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -14,12 +14,14 @@ #include "postgres.h" +#include "access/commit_ts.h" #include "access/genam.h" #include "access/gist.h" #include "access/relscan.h" #include "access/tableam.h" #include "access/transam.h" #include "access/xact.h" +#include "access/heapam.h" #include "catalog/pg_am_d.h" #include "commands/trigger.h" #include "executor/executor.h" @@ -36,7 +38,7 @@ static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, - TypeCacheEntry **eq); + TypeCacheEntry **eq, Bitmapset *columns); /* * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that @@ -221,7 +223,7 @@ retry: if (eq == NULL) eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts); - if (!tuples_equal(outslot, searchslot, eq)) + if (!tuples_equal(outslot, searchslot, eq, NULL)) continue; } @@ -277,10 +279,13 @@ retry: /* * Compare the tuples in the slots by checking if they have equal values. + * + * If 'columns' is not null, only the columns specified within it will be + * considered for the equality check, ignoring all other columns. */ static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, - TypeCacheEntry **eq) + TypeCacheEntry **eq, Bitmapset *columns) { int attrnum; @@ -306,6 +311,14 @@ tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, continue; /* + * Ignore columns that are not listed for checking. + */ + if (columns && + !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + columns)) + continue; + + /* * If one value is NULL and other is not, then they are certainly not * equal */ @@ -380,7 +393,7 @@ retry: /* Try to find the tuple */ while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot)) { - if (!tuples_equal(scanslot, searchslot, eq)) + if (!tuples_equal(scanslot, searchslot, eq, NULL)) continue; found = true; @@ -456,6 +469,236 @@ BuildConflictIndexInfo(ResultRelInfo *resultRelInfo, Oid conflictindex) } /* + * If the tuple is recently dead and was deleted by a transaction with a newer + * commit timestamp than previously recorded, update the associated transaction + * ID, commit time, and origin. This helps ensure that conflict detection uses + * the most recent and relevant deletion metadata. + */ +static void +update_most_recent_deletion_info(TupleTableSlot *scanslot, + TransactionId oldestxmin, + TransactionId *delete_xid, + TimestampTz *delete_time, + RepOriginId *delete_origin) +{ + BufferHeapTupleTableSlot *hslot; + HeapTuple tuple; + Buffer buf; + bool recently_dead = false; + TransactionId xmax; + TimestampTz localts; + RepOriginId localorigin; + + hslot = (BufferHeapTupleTableSlot *) scanslot; + + tuple = ExecFetchSlotHeapTuple(scanslot, false, NULL); + buf = hslot->buffer; + + LockBuffer(buf, BUFFER_LOCK_SHARE); + + /* + * We do not consider HEAPTUPLE_DEAD status because it indicates either + * tuples whose inserting transaction was aborted (meaning there is no + * commit timestamp or origin), or tuples deleted by a transaction older + * than oldestxmin, making it safe to ignore them during conflict + * detection (See comments atop worker.c for details). + */ + if (HeapTupleSatisfiesVacuum(tuple, oldestxmin, buf) == HEAPTUPLE_RECENTLY_DEAD) + recently_dead = true; + + LockBuffer(buf, BUFFER_LOCK_UNLOCK); + + if (!recently_dead) + return; + + xmax = HeapTupleHeaderGetUpdateXid(tuple->t_data); + if (!TransactionIdIsValid(xmax)) + return; + + /* Select the dead tuple with the most recent commit timestamp */ + if (TransactionIdGetCommitTsData(xmax, &localts, &localorigin) && + TimestampDifferenceExceeds(*delete_time, localts, 0)) + { + *delete_xid = xmax; + *delete_time = localts; + *delete_origin = localorigin; + } +} + +/* + * Searches the relation 'rel' for the most recently deleted tuple that matches + * the values in 'searchslot' and is not yet removable by VACUUM. The function + * returns the transaction ID, origin, and commit timestamp of the transaction + * that deleted this tuple. + * + * 'oldestxmin' acts as a cutoff transaction ID. Tuples deleted by transactions + * with IDs >= 'oldestxmin' are considered recently dead and are eligible for + * conflict detection. + * + * Instead of stopping at the first match, we scan all matching dead tuples to + * identify most recent deletion. This is crucial because only the latest + * deletion is relevant for resolving conflicts. + * + * For example, consider a scenario on the subscriber where a row is deleted, + * re-inserted, and then deleted again only on the subscriber: + * + * - (pk, 1) - deleted at 9:00, + * - (pk, 1) - deleted at 9:02, + * + * Now, a remote update arrives: (pk, 1) -> (pk, 2), timestamped at 9:01. + * + * If we mistakenly return the older deletion (9:00), the system may wrongly + * apply the remote update using a last-update-wins strategy. Instead, we must + * recognize the more recent deletion at 9:02 and skip the update. See + * comments atop worker.c for details. Note, as of now, conflict resolution + * is not implemented. Consequently, the system may incorrectly report the + * older tuple as the conflicted one, leading to misleading results. + * + * The commit timestamp of the deleting transaction is used to determine which + * tuple was deleted most recently. + */ +bool +RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot, + TransactionId oldestxmin, + TransactionId *delete_xid, + RepOriginId *delete_origin, + TimestampTz *delete_time) +{ + TupleTableSlot *scanslot; + TableScanDesc scan; + TypeCacheEntry **eq; + Bitmapset *indexbitmap; + TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel); + + Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor)); + + *delete_xid = InvalidTransactionId; + *delete_origin = InvalidRepOriginId; + *delete_time = 0; + + /* + * If the relation has a replica identity key or a primary key that is + * unusable for locating deleted tuples (see + * IsIndexUsableForFindingDeletedTuple), a full table scan becomes + * necessary. In such cases, comparing the entire tuple is not required, + * since the remote tuple might not include all column values. Instead, + * the indexed columns alone are suffcient to identify the target tuple + * (see logicalrep_rel_mark_updatable). + */ + indexbitmap = RelationGetIndexAttrBitmap(rel, + INDEX_ATTR_BITMAP_IDENTITY_KEY); + + /* fallback to PK if no replica identity */ + if (!indexbitmap) + indexbitmap = RelationGetIndexAttrBitmap(rel, + INDEX_ATTR_BITMAP_PRIMARY_KEY); + + eq = palloc0(sizeof(*eq) * searchslot->tts_tupleDescriptor->natts); + + /* + * Start a heap scan using SnapshotAny to identify dead tuples that are + * not visible under a standard MVCC snapshot. Tuples from transactions + * not yet committed or those just committed prior to the scan are + * excluded in update_most_recent_deletion_info(). + */ + scan = table_beginscan(rel, SnapshotAny, 0, NULL); + scanslot = table_slot_create(rel, NULL); + + table_rescan(scan, NULL); + + /* Try to find the tuple */ + while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot)) + { + if (!tuples_equal(scanslot, searchslot, eq, indexbitmap)) + continue; + + update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid, + delete_time, delete_origin); + } + + table_endscan(scan); + ExecDropSingleTupleTableSlot(scanslot); + + return *delete_time != 0; +} + +/* + * Similar to RelationFindDeletedTupleInfoSeq() but using index scan to locate + * the deleted tuple. + */ +bool +RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid, + TupleTableSlot *searchslot, + TransactionId oldestxmin, + TransactionId *delete_xid, + RepOriginId *delete_origin, + TimestampTz *delete_time) +{ + Relation idxrel; + ScanKeyData skey[INDEX_MAX_KEYS]; + int skey_attoff; + IndexScanDesc scan; + TupleTableSlot *scanslot; + TypeCacheEntry **eq = NULL; + bool isIdxSafeToSkipDuplicates; + TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel); + + Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor)); + Assert(OidIsValid(idxoid)); + + *delete_xid = InvalidTransactionId; + *delete_time = 0; + *delete_origin = InvalidRepOriginId; + + isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid); + + scanslot = table_slot_create(rel, NULL); + + idxrel = index_open(idxoid, RowExclusiveLock); + + /* Build scan key. */ + skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot); + + /* + * Start an index scan using SnapshotAny to identify dead tuples that are + * not visible under a standard MVCC snapshot. Tuples from transactions + * not yet committed or those just committed prior to the scan are + * excluded in update_most_recent_deletion_info(). + */ + scan = index_beginscan(rel, idxrel, SnapshotAny, NULL, skey_attoff, 0); + + index_rescan(scan, skey, skey_attoff, NULL, 0); + + /* Try to find the tuple */ + while (index_getnext_slot(scan, ForwardScanDirection, scanslot)) + { + /* + * Avoid expensive equality check if the index is primary key or + * replica identity index. + */ + if (!isIdxSafeToSkipDuplicates) + { + if (eq == NULL) + eq = palloc0(sizeof(*eq) * scanslot->tts_tupleDescriptor->natts); + + if (!tuples_equal(scanslot, searchslot, eq, NULL)) + continue; + } + + update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid, + delete_time, delete_origin); + } + + index_endscan(scan); + + index_close(idxrel, NoLock); + + ExecDropSingleTupleTableSlot(scanslot); + + return *delete_time != 0; +} + +/* * Find the tuple that violates the passed unique index (conflictindex). * * If the conflicting tuple is found return true, otherwise false. diff --git a/src/backend/partitioning/partbounds.c b/src/backend/partitioning/partbounds.c index 4bdc2941efb..822cf4ec451 100644 --- a/src/backend/partitioning/partbounds.c +++ b/src/backend/partitioning/partbounds.c @@ -1007,9 +1007,6 @@ partition_bounds_copy(PartitionBoundInfo src, int ndatums; int nindexes; int partnatts; - bool hash_part; - int natts; - Datum *boundDatums; dest = (PartitionBoundInfo) palloc(sizeof(PartitionBoundInfoData)); @@ -1023,7 +1020,7 @@ partition_bounds_copy(PartitionBoundInfo src, dest->datums = (Datum **) palloc(sizeof(Datum *) * ndatums); - if (src->kind != NULL) + if (src->kind != NULL && ndatums > 0) { PartitionRangeDatumKind *boundKinds; @@ -1058,36 +1055,40 @@ partition_bounds_copy(PartitionBoundInfo src, * For hash partitioning, datums array will have two elements - modulus * and remainder. */ - hash_part = (key->strategy == PARTITION_STRATEGY_HASH); - natts = hash_part ? 2 : partnatts; - boundDatums = palloc(ndatums * natts * sizeof(Datum)); - - for (i = 0; i < ndatums; i++) + if (ndatums > 0) { - int j; - - dest->datums[i] = &boundDatums[i * natts]; + bool hash_part = (key->strategy == PARTITION_STRATEGY_HASH); + int natts = hash_part ? 2 : partnatts; + Datum *boundDatums = palloc(ndatums * natts * sizeof(Datum)); - for (j = 0; j < natts; j++) + for (i = 0; i < ndatums; i++) { - bool byval; - int typlen; + int j; - if (hash_part) - { - typlen = sizeof(int32); /* Always int4 */ - byval = true; /* int4 is pass-by-value */ - } - else + dest->datums[i] = &boundDatums[i * natts]; + + for (j = 0; j < natts; j++) { - byval = key->parttypbyval[j]; - typlen = key->parttyplen[j]; - } + if (dest->kind == NULL || + dest->kind[i][j] == PARTITION_RANGE_DATUM_VALUE) + { + bool byval; + int typlen; - if (dest->kind == NULL || - dest->kind[i][j] == PARTITION_RANGE_DATUM_VALUE) - dest->datums[i][j] = datumCopy(src->datums[i][j], - byval, typlen); + if (hash_part) + { + typlen = sizeof(int32); /* Always int4 */ + byval = true; /* int4 is pass-by-value */ + } + else + { + byval = key->parttypbyval[j]; + typlen = key->parttyplen[j]; + } + dest->datums[i][j] = datumCopy(src->datums[i][j], + byval, typlen); + } + } } } diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 97c4e26b586..2fd3e8bbda5 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -29,6 +29,7 @@ static const char *const ConflictTypeNames[] = { [CT_UPDATE_EXISTS] = "update_exists", [CT_UPDATE_MISSING] = "update_missing", [CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs", + [CT_UPDATE_DELETED] = "update_deleted", [CT_DELETE_MISSING] = "delete_missing", [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts" }; @@ -176,6 +177,7 @@ errcode_apply_conflict(ConflictType type) case CT_UPDATE_ORIGIN_DIFFERS: case CT_UPDATE_MISSING: case CT_DELETE_ORIGIN_DIFFERS: + case CT_UPDATE_DELETED: case CT_DELETE_MISSING: return errcode(ERRCODE_T_R_SERIALIZATION_FAILURE); } @@ -261,6 +263,26 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, break; + case CT_UPDATE_DELETED: + if (localts) + { + if (localorigin == InvalidRepOriginId) + appendStringInfo(&err_detail, _("The row to be updated was deleted locally in transaction %u at %s."), + localxmin, timestamptz_to_str(localts)); + else if (replorigin_by_oid(localorigin, true, &origin_name)) + appendStringInfo(&err_detail, _("The row to be updated was deleted by a different origin \"%s\" in transaction %u at %s."), + origin_name, localxmin, timestamptz_to_str(localts)); + + /* The origin that modified this row has been removed. */ + else + appendStringInfo(&err_detail, _("The row to be updated was deleted by a non-existent origin in transaction %u at %s."), + localxmin, timestamptz_to_str(localts)); + } + else + appendStringInfo(&err_detail, _("The row to be updated was deleted.")); + + break; + case CT_UPDATE_MISSING: appendStringInfoString(&err_detail, _("Could not find the row to be updated.")); break; diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 2f0c08b8fbd..37738440113 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -1059,14 +1059,14 @@ ValidateSlotSyncParams(int elevel) { /* * Logical slot sync/creation requires wal_level >= logical. - * - * Since altering the wal_level requires a server restart, so error out in - * this case regardless of elevel provided by caller. */ if (wal_level < WAL_LEVEL_LOGICAL) - ereport(ERROR, + { + ereport(elevel, errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("replication slot synchronization requires \"wal_level\" >= \"logical\"")); + return false; + } /* * A physical replication slot(primary_slot_name) is required on the diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index b59221c4d06..89e241c8392 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -138,9 +138,9 @@ * Each apply worker that enabled retain_dead_tuples option maintains a * non-removable transaction ID (oldest_nonremovable_xid) in shared memory to * prevent dead rows from being removed prematurely when the apply worker still - * needs them to detect conflicts reliably. This helps to retain the required - * commit_ts module information, which further helps to detect - * update_origin_differs and delete_origin_differs conflicts reliably, as + * needs them to detect update_deleted conflicts. Additionally, this helps to + * retain the required commit_ts module information, which further helps to + * detect update_origin_differs and delete_origin_differs conflicts reliably, as * otherwise, vacuum freeze could remove the required information. * * The logical replication launcher manages an internal replication slot named @@ -185,10 +185,10 @@ * transactions that occurred concurrently with the tuple DELETE, any * subsequent UPDATE from a remote node should have a later timestamp. In such * cases, it is acceptable to detect an update_missing scenario and convert the - * UPDATE to an INSERT when applying it. But, detecting concurrent remote - * transactions with earlier timestamps than the DELETE is necessary, as the - * UPDATEs in remote transactions should be ignored if their timestamp is - * earlier than that of the dead tuples. + * UPDATE to an INSERT when applying it. But, for concurrent remote + * transactions with earlier timestamps than the DELETE, detecting + * update_deleted is necessary, as the UPDATEs in remote transactions should be + * ignored if their timestamp is earlier than that of the dead tuples. * * Note that advancing the non-removable transaction ID is not supported if the * publisher is also a physical standby. This is because the logical walsender @@ -576,6 +576,12 @@ static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel Oid localidxoid, TupleTableSlot *remoteslot, TupleTableSlot **localslot); +static bool FindDeletedTupleInLocalRel(Relation localrel, + Oid localidxoid, + TupleTableSlot *remoteslot, + TransactionId *delete_xid, + RepOriginId *delete_origin, + TimestampTz *delete_time); static void apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, @@ -2912,17 +2918,31 @@ apply_handle_update_internal(ApplyExecutionData *edata, } else { + ConflictType type; TupleTableSlot *newslot = localslot; + /* + * Detecting whether the tuple was recently deleted or never existed + * is crucial to avoid misleading the user during confict handling. + */ + if (FindDeletedTupleInLocalRel(localrel, localindexoid, remoteslot, + &conflicttuple.xmin, + &conflicttuple.origin, + &conflicttuple.ts) && + conflicttuple.origin != replorigin_session_origin) + type = CT_UPDATE_DELETED; + else + type = CT_UPDATE_MISSING; + /* Store the new tuple for conflict reporting */ slot_store_data(newslot, relmapentry, newtup); /* - * The tuple to be updated could not be found. Do nothing except for - * emitting a log message. + * The tuple to be updated could not be found or was deleted. Do + * nothing except for emitting a log message. */ - ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING, - remoteslot, newslot, list_make1(&conflicttuple)); + ReportApplyConflict(estate, relinfo, LOG, type, remoteslot, newslot, + list_make1(&conflicttuple)); } /* Cleanup. */ @@ -3143,6 +3163,112 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel, } /* + * Determine whether the index can reliably locate the deleted tuple in the + * local relation. + * + * An index may exclude deleted tuples if it was re-indexed or re-created during + * change application. Therefore, an index is considered usable only if the + * conflict detection slot.xmin (conflict_detection_xmin) is greater than the + * index tuple's xmin. This ensures that any tuples deleted prior to the index + * creation or re-indexing are not relevant for conflict detection in the + * current apply worker. + * + * Note that indexes may also be excluded if they were modified by other DDL + * operations, such as ALTER INDEX. However, this is acceptable, as the + * likelihood of such DDL changes coinciding with the need to scan dead + * tuples for the update_deleted is low. + */ +static bool +IsIndexUsableForFindingDeletedTuple(Oid localindexoid, + TransactionId conflict_detection_xmin) +{ + HeapTuple index_tuple; + TransactionId index_xmin; + + index_tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(localindexoid)); + + if (!HeapTupleIsValid(index_tuple)) /* should not happen */ + elog(ERROR, "cache lookup failed for index %u", localindexoid); + + /* + * No need to check for a frozen transaction ID, as + * TransactionIdPrecedes() manages it internally, treating it as falling + * behind the conflict_detection_xmin. + */ + index_xmin = HeapTupleHeaderGetXmin(index_tuple->t_data); + + ReleaseSysCache(index_tuple); + + return TransactionIdPrecedes(index_xmin, conflict_detection_xmin); +} + +/* + * Attempts to locate a deleted tuple in the local relation that matches the + * values of the tuple received from the publication side (in 'remoteslot'). + * The search is performed using either the replica identity index, primary + * key, other available index, or a sequential scan if necessary. + * + * Returns true if the deleted tuple is found. If found, the transaction ID, + * origin, and commit timestamp of the deletion are stored in '*delete_xid', + * '*delete_origin', and '*delete_time' respectively. + */ +static bool +FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, + TupleTableSlot *remoteslot, + TransactionId *delete_xid, RepOriginId *delete_origin, + TimestampTz *delete_time) +{ + TransactionId oldestxmin; + ReplicationSlot *slot; + + /* + * Return false if either dead tuples are not retained or commit timestamp + * data is not available. + */ + if (!MySubscription->retaindeadtuples || !track_commit_timestamp) + return false; + + /* + * For conflict detection, we use the conflict slot's xmin value instead + * of invoking GetOldestNonRemovableTransactionId(). The slot.xmin acts as + * a threshold to identify tuples that were recently deleted. These tuples + * are not visible to concurrent transactions, but we log an + * update_deleted conflict if such a tuple matches the remote update being + * applied. + * + * Although GetOldestNonRemovableTransactionId() can return a value older + * than the slot's xmin, for our current purpose it is acceptable to treat + * tuples deleted by transactions prior to slot.xmin as update_missing + * conflicts. + * + * Ideally, we would use oldest_nonremovable_xid, which is directly + * maintained by the leader apply worker. However, this value is not + * available to table synchronization or parallel apply workers, making + * slot.xmin a practical alternative in those contexts. + */ + slot = SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true); + + Assert(slot); + + SpinLockAcquire(&slot->mutex); + oldestxmin = slot->data.xmin; + SpinLockRelease(&slot->mutex); + + Assert(TransactionIdIsValid(oldestxmin)); + + if (OidIsValid(localidxoid) && + IsIndexUsableForFindingDeletedTuple(localidxoid, oldestxmin)) + return RelationFindDeletedTupleInfoByIndex(localrel, localidxoid, + remoteslot, oldestxmin, + delete_xid, delete_origin, + delete_time); + else + return RelationFindDeletedTupleInfoSeq(localrel, remoteslot, + oldestxmin, delete_xid, + delete_origin, delete_time); +} + +/* * This handles insert, update, delete on a partitioned table. */ static void @@ -3260,18 +3386,35 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, remoteslot_part, &localslot); if (!found) { + ConflictType type; TupleTableSlot *newslot = localslot; + /* + * Detecting whether the tuple was recently deleted or + * never existed is crucial to avoid misleading the user + * during confict handling. + */ + if (FindDeletedTupleInLocalRel(partrel, + part_entry->localindexoid, + remoteslot_part, + &conflicttuple.xmin, + &conflicttuple.origin, + &conflicttuple.ts) && + conflicttuple.origin != replorigin_session_origin) + type = CT_UPDATE_DELETED; + else + type = CT_UPDATE_MISSING; + /* Store the new tuple for conflict reporting */ slot_store_data(newslot, part_entry, newtup); /* - * The tuple to be updated could not be found. Do nothing - * except for emitting a log message. + * The tuple to be updated could not be found or was + * deleted. Do nothing except for emitting a log message. */ ReportApplyConflict(estate, partrelinfo, LOG, - CT_UPDATE_MISSING, remoteslot_part, - newslot, list_make1(&conflicttuple)); + type, remoteslot_part, newslot, + list_make1(&conflicttuple)); return; } @@ -4172,8 +4315,8 @@ can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data) { /* * It is sufficient to manage non-removable transaction ID for a - * subscription by the main apply worker to detect conflicts reliably even - * for table sync or parallel apply workers. + * subscription by the main apply worker to detect update_deleted reliably + * even for table sync or parallel apply workers. */ if (!am_leader_apply_worker()) return false; @@ -4374,10 +4517,11 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) * We expect the publisher and subscriber clocks to be in sync using time * sync service like NTP. Otherwise, we will advance this worker's * oldest_nonremovable_xid prematurely, leading to the removal of rows - * required to detect conflicts reliably. This check primarily addresses - * scenarios where the publisher's clock falls behind; if the publisher's - * clock is ahead, subsequent transactions will naturally bear later - * commit timestamps, conforming to the design outlined atop worker.c. + * required to detect update_deleted reliably. This check primarily + * addresses scenarios where the publisher's clock falls behind; if the + * publisher's clock is ahead, subsequent transactions will naturally bear + * later commit timestamps, conforming to the design outlined atop + * worker.c. * * XXX Consider waiting for the publisher's clock to catch up with the * subscriber's before proceeding to the next phase. diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 1c12ddbae49..c756c2bebaa 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2171,7 +2171,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) Datum pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 11 +#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 12 Oid subid = PG_GETARG_OID(0); TupleDesc tupdesc; Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0}; @@ -2197,15 +2197,17 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_exists", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_missing", + TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_deleted", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_delete_origin_differs", + TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_missing", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_missing", + TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_origin_differs", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_multiple_unique_conflicts", + TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_missing", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset", + TupleDescInitEntry(tupdesc, (AttrNumber) 11, "confl_multiple_unique_conflicts", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 12, "stats_reset", TIMESTAMPTZOID, -1, 0); BlessTupleDesc(tupdesc); diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c index 47af743990f..afce1a8e1f0 100644 --- a/src/backend/utils/error/elog.c +++ b/src/backend/utils/error/elog.c @@ -1128,12 +1128,15 @@ set_backtrace(ErrorData *edata, int num_skip) nframes = backtrace(buf, lengthof(buf)); strfrms = backtrace_symbols(buf, nframes); - if (strfrms == NULL) - return; - - for (int i = num_skip; i < nframes; i++) - appendStringInfo(&errtrace, "\n%s", strfrms[i]); - free(strfrms); + if (strfrms != NULL) + { + for (int i = num_skip; i < nframes; i++) + appendStringInfo(&errtrace, "\n%s", strfrms[i]); + free(strfrms); + } + else + appendStringInfoString(&errtrace, + "insufficient memory for backtrace generation"); } #else appendStringInfoString(&errtrace, |