Diffstat (limited to 'src/backend/executor/execReplication.c')
-rw-r--r--  src/backend/executor/execReplication.c  236
1 file changed, 176 insertions, 60 deletions
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index d0a89cd5778..1086cbc9624 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -23,6 +23,7 @@
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
+#include "replication/conflict.h"
#include "replication/logicalrelation.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
@@ -166,6 +167,51 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
return skey_attoff;
}
+
+/*
+ * Helper function to check if it is necessary to re-fetch and lock the tuple
+ * due to concurrent modifications. This function should be called after
+ * invoking table_tuple_lock.
+ */
+static bool
+should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
+{
+ bool refetch = false;
+
+ switch (res)
+ {
+ case TM_Ok:
+ break;
+ case TM_Updated:
+ /* XXX: Improve handling here */
+ if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
+ else
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("concurrent update, retrying")));
+ refetch = true;
+ break;
+ case TM_Deleted:
+ /* XXX: Improve handling here */
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("concurrent delete, retrying")));
+ refetch = true;
+ break;
+ case TM_Invisible:
+ elog(ERROR, "attempted to lock invisible tuple");
+ break;
+ default:
+ elog(ERROR, "unexpected table_tuple_lock status: %u", res);
+ break;
+ }
+
+ return refetch;
+}
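
should_refetch_tuple() centralizes the retry decision that the two tuple-search paths below previously open-coded. A minimal sketch of the intended calling pattern, condensed from the callers later in this diff (rel, tid, slot, lockmode and tmfd stand in for the callers' own variables):

    retry:
        /* ... locate the candidate tuple ... */

        /* lock it under a fresh snapshot */
        PushActiveSnapshot(GetLatestSnapshot());
        res = table_tuple_lock(rel, &tid, GetLatestSnapshot(), slot,
                               GetCurrentCommandId(false), lockmode,
                               LockWaitBlock, 0, &tmfd);
        PopActiveSnapshot();

        /* a concurrent update or delete means we must search again */
        if (should_refetch_tuple(res, &tmfd))
            goto retry;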
+
/*
* Search the relation 'rel' for tuple using the index.
*
@@ -260,34 +306,8 @@ retry:
PopActiveSnapshot();
- switch (res)
- {
- case TM_Ok:
- break;
- case TM_Updated:
- /* XXX: Improve handling here */
- if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
- else
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("concurrent update, retrying")));
- goto retry;
- case TM_Deleted:
- /* XXX: Improve handling here */
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("concurrent delete, retrying")));
- goto retry;
- case TM_Invisible:
- elog(ERROR, "attempted to lock invisible tuple");
- break;
- default:
- elog(ERROR, "unexpected table_tuple_lock status: %u", res);
- break;
- }
+ if (should_refetch_tuple(res, &tmfd))
+ goto retry;
}
index_endscan(scan);
@@ -444,34 +464,8 @@ retry:
PopActiveSnapshot();
- switch (res)
- {
- case TM_Ok:
- break;
- case TM_Updated:
- /* XXX: Improve handling here */
- if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
- else
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("concurrent update, retrying")));
- goto retry;
- case TM_Deleted:
- /* XXX: Improve handling here */
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("concurrent delete, retrying")));
- goto retry;
- case TM_Invisible:
- elog(ERROR, "attempted to lock invisible tuple");
- break;
- default:
- elog(ERROR, "unexpected table_tuple_lock status: %u", res);
- break;
- }
+ if (should_refetch_tuple(res, &tmfd))
+ goto retry;
}
table_endscan(scan);
@@ -481,6 +475,89 @@ retry:
}
/*
+ * Find the tuple that violates the passed unique index (conflictindex).
+ *
+ * If the conflicting tuple is found, return true; otherwise, return false.
+ *
+ * We lock the tuple to avoid getting it deleted before the caller can fetch
+ * the required information. Note that if the tuple is deleted before a lock
+ * is acquired, we will retry to find the conflicting tuple again.
+ */
+static bool
+FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate,
+ Oid conflictindex, TupleTableSlot *slot,
+ TupleTableSlot **conflictslot)
+{
+ Relation rel = resultRelInfo->ri_RelationDesc;
+ ItemPointerData conflictTid;
+ TM_FailureData tmfd;
+ TM_Result res;
+
+ *conflictslot = NULL;
+
+retry:
+ if (ExecCheckIndexConstraints(resultRelInfo, slot, estate,
+ &conflictTid, &slot->tts_tid,
+ list_make1_oid(conflictindex)))
+ {
+ if (*conflictslot)
+ ExecDropSingleTupleTableSlot(*conflictslot);
+
+ *conflictslot = NULL;
+ return false;
+ }
+
+ *conflictslot = table_slot_create(rel, NULL);
+
+ PushActiveSnapshot(GetLatestSnapshot());
+
+ res = table_tuple_lock(rel, &conflictTid, GetLatestSnapshot(),
+ *conflictslot,
+ GetCurrentCommandId(false),
+ LockTupleShare,
+ LockWaitBlock,
+ 0 /* don't follow updates */ ,
+ &tmfd);
+
+ PopActiveSnapshot();
+
+ if (should_refetch_tuple(res, &tmfd))
+ goto retry;
+
+ return true;
+}
+
+/*
+ * Check all the unique indexes in 'recheckIndexes' for conflict with the
+ * tuple in 'remoteslot' and report if found.
+ */
+static void
+CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
+ ConflictType type, List *recheckIndexes,
+ TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
+{
+ /* Check all the unique indexes for a conflict */
+ foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
+ {
+ TupleTableSlot *conflictslot;
+
+ if (list_member_oid(recheckIndexes, uniqueidx) &&
+ FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
+ &conflictslot))
+ {
+ RepOriginId origin;
+ TimestampTz committs;
+ TransactionId xmin;
+
+ GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
+ ReportApplyConflict(estate, resultRelInfo, ERROR, type,
+ searchslot, conflictslot, remoteslot,
+ uniqueidx, xmin, origin, committs);
+ }
+ }
+}
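
CheckAndReportConflict() and FindConflictTuple() rely on the new replication/conflict.h include added at the top of this diff. The sketch below shows the declarations this code assumes, inferred purely from the call sites above; it is not the authoritative header, and the real ConflictType enum may have additional members:

    /* conflict kinds referenced by this patch */
    typedef enum ConflictType
    {
        CT_INSERT_EXISTS,
        CT_UPDATE_EXISTS
        /* ... */
    } ConflictType;

    /* fetch xmin, origin and commit timestamp of the local conflicting tuple */
    extern void GetTupleTransactionInfo(TupleTableSlot *localslot,
                                        TransactionId *xmin,
                                        RepOriginId *localorigin,
                                        TimestampTz *localts);

    /* report the conflict at the given error level */
    extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
                                    int elevel, ConflictType type,
                                    TupleTableSlot *searchslot,
                                    TupleTableSlot *localslot,
                                    TupleTableSlot *remoteslot,
                                    Oid indexoid, TransactionId localxmin,
                                    RepOriginId localorigin,
                                    TimestampTz localts);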
+
+/*
* Insert tuple represented in the slot to the relation, update the indexes,
* and execute any constraints and per-row triggers.
*
@@ -509,6 +586,8 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
if (!skip_tuple)
{
List *recheckIndexes = NIL;
+ List *conflictindexes;
+ bool conflict = false;
/* Compute stored generated columns */
if (rel->rd_att->constr &&
@@ -525,10 +604,33 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
/* OK, store the tuple and create index entries for it */
simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
+ conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
+
if (resultRelInfo->ri_NumIndices > 0)
recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
- slot, estate, false, false,
- NULL, NIL, false);
+ slot, estate, false,
+ conflictindexes ? true : false,
+ &conflict,
+ conflictindexes, false);
+
+ /*
+ * Checks the conflict indexes to fetch the conflicting local tuple
+ * and reports the conflict. We perform this check here, instead of
+ * performing an additional index scan before the actual insertion and
+ * reporting the conflict if any conflicting tuples are found. This is
+ * to avoid the overhead of executing the extra scan for each INSERT
+ * operation, even when no conflict arises, which could introduce
+ * significant overhead to replication, particularly in cases where
+ * conflicts are rare.
+ *
+ * XXX OTOH, this could lead to clean-up effort for dead tuples added
+ * in heap and index in case of conflicts. But since conflicts shouldn't
+ * be frequent, we preferred to save the performance overhead of an
+ * extra scan before each insertion.
+ */
+ if (conflict)
+ CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
+ recheckIndexes, NULL, slot);
/* AFTER ROW INSERT Triggers */
ExecARInsertTriggers(estate, resultRelInfo, slot,
@@ -577,6 +679,8 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
{
List *recheckIndexes = NIL;
TU_UpdateIndexes update_indexes;
+ List *conflictindexes;
+ bool conflict = false;
/* Compute stored generated columns */
if (rel->rd_att->constr &&
@@ -593,12 +697,24 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
&update_indexes);
+ conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
+
if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
- slot, estate, true, false,
- NULL, NIL,
+ slot, estate, true,
+ conflictindexes ? true : false,
+ &conflict, conflictindexes,
(update_indexes == TU_Summarizing));
+ /*
+ * Refer to the comments above the call to CheckAndReportConflict() in
+ * ExecSimpleRelationInsert to understand why this check is done at
+ * this point.
+ */
+ if (conflict)
+ CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
+ recheckIndexes, searchslot, slot);
+
/* AFTER ROW UPDATE Triggers */
ExecARUpdateTriggers(estate, resultRelInfo,
NULL, NULL,
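
Taken together, the subscriber-side insert path now follows the shape below. This is a condensed sketch of the ExecSimpleRelationInsert hunk above (generated columns, triggers and the update variant omitted), not additional code in the patch:

    /* store the tuple first; the index machinery flags any unique violation */
    simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);

    conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
    recheckIndexes = ExecInsertIndexTuples(resultRelInfo, slot, estate,
                                           false,                  /* not an update */
                                           conflictindexes != NIL, /* detect, don't error */
                                           &conflict,
                                           conflictindexes, false);

    /* pay for the conflicting-tuple lookup only when a conflict was flagged */
    if (conflict)
        CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
                               recheckIndexes, NULL, slot);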