Diffstat (limited to 'src/backend/executor')
-rw-r--r--   src/backend/executor/execIndexing.c      17
-rw-r--r--   src/backend/executor/execMain.c           7
-rw-r--r--   src/backend/executor/execReplication.c   236
-rw-r--r--   src/backend/executor/nodeModifyTable.c     5
4 files changed, 192 insertions, 73 deletions
diff --git a/src/backend/executor/execIndexing.c b/src/backend/executor/execIndexing.c
index 9f05b3654c1..403a3f40551 100644
--- a/src/backend/executor/execIndexing.c
+++ b/src/backend/executor/execIndexing.c
@@ -207,8 +207,9 @@ ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
ii = BuildIndexInfo(indexDesc);
/*
- * If the indexes are to be used for speculative insertion, add extra
- * information required by unique index entries.
+ * If the indexes are to be used for speculative insertion or conflict
+ * detection in logical replication, add extra information required by
+ * unique index entries.
*/
if (speculative && ii->ii_Unique)
BuildSpeculativeIndexInfo(indexDesc, ii);
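With this hunk, a caller that needs conflict detection (such as the logical replication apply path) opens the target's indexes the same way the speculative-insertion path does. A minimal sketch, assuming resultRelInfo has already been set up by the executor:

    /*
     * Open indexes with speculative = true so that unique indexes get the
     * extra arbiter information used for conflict detection.
     */
    ExecOpenIndices(resultRelInfo, true);

    /* ... perform the insert/update and any conflict checks ... */

    ExecCloseIndices(resultRelInfo);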
@@ -519,14 +520,18 @@ ExecInsertIndexTuples(ResultRelInfo *resultRelInfo,
*
* Note that this doesn't lock the values in any way, so it's
* possible that a conflicting tuple is inserted immediately
- * after this returns. But this can be used for a pre-check
- * before insertion.
+ * after this returns. This can be used for either a pre-check
+ * before insertion or a re-check after finding a conflict.
+ *
+ * 'tupleid' should be the TID of the recently inserted tuple, or invalid
+ * if no new tuple has been inserted yet; the tuple it identifies is
+ * excluded from conflict checking.
* ----------------------------------------------------------------
*/
bool
ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot,
EState *estate, ItemPointer conflictTid,
- List *arbiterIndexes)
+ ItemPointer tupleid, List *arbiterIndexes)
{
int i;
int numIndices;
@@ -629,7 +634,7 @@ ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot,
satisfiesConstraint =
check_exclusion_or_unique_constraint(heapRelation, indexRelation,
- indexInfo, &invalidItemPtr,
+ indexInfo, tupleid,
values, isnull, estate, false,
CEOUC_WAIT, true,
conflictTid);
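Taken together, the new 'tupleid' parameter enables two call patterns, sketched below (assuming resultRelInfo, slot, estate, and arbiterIndexes are in scope; this mirrors the call sites changed later in this patch):

    ItemPointerData conflictTid;
    ItemPointerData invalidItemPtr;

    ItemPointerSetInvalid(&invalidItemPtr);

    /* Pre-check before insertion: there is no new tuple to exclude yet. */
    if (!ExecCheckIndexConstraints(resultRelInfo, slot, estate,
                                   &conflictTid, &invalidItemPtr,
                                   arbiterIndexes))
    {
        /* a committed conflicting tuple exists; its TID is in conflictTid */
    }

    /* Re-check after an insertion: exclude the tuple we just inserted. */
    if (!ExecCheckIndexConstraints(resultRelInfo, slot, estate,
                                   &conflictTid, &slot->tts_tid,
                                   arbiterIndexes))
    {
        /* conflictTid points at some other tuple that conflicts */
    }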
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 4d7c92d63c1..29e186fa73d 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -88,11 +88,6 @@ static bool ExecCheckPermissionsModified(Oid relOid, Oid userid,
Bitmapset *modifiedCols,
AclMode requiredPerms);
static void ExecCheckXactReadOnly(PlannedStmt *plannedstmt);
-static char *ExecBuildSlotValueDescription(Oid reloid,
- TupleTableSlot *slot,
- TupleDesc tupdesc,
- Bitmapset *modifiedCols,
- int maxfieldlen);
static void EvalPlanQualStart(EPQState *epqstate, Plan *planTree);
/* end of local decls */
@@ -2210,7 +2205,7 @@ ExecWithCheckOptions(WCOKind kind, ResultRelInfo *resultRelInfo,
* column involved, that subset will be returned with a key identifying which
* columns they are.
*/
-static char *
+char *
ExecBuildSlotValueDescription(Oid reloid,
TupleTableSlot *slot,
TupleDesc tupdesc,
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index d0a89cd5778..1086cbc9624 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -23,6 +23,7 @@
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
+#include "replication/conflict.h"
#include "replication/logicalrelation.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
@@ -166,6 +167,51 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
return skey_attoff;
}
+
+/*
+ * Helper function to check if it is necessary to re-fetch and lock the tuple
+ * due to concurrent modifications. This function should be called after
+ * invoking table_tuple_lock.
+ */
+static bool
+should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
+{
+ bool refetch = false;
+
+ switch (res)
+ {
+ case TM_Ok:
+ break;
+ case TM_Updated:
+ /* XXX: Improve handling here */
+ if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
+ else
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("concurrent update, retrying")));
+ refetch = true;
+ break;
+ case TM_Deleted:
+ /* XXX: Improve handling here */
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("concurrent delete, retrying")));
+ refetch = true;
+ break;
+ case TM_Invisible:
+ elog(ERROR, "attempted to lock invisible tuple");
+ break;
+ default:
+ elog(ERROR, "unexpected table_tuple_lock status: %u", res);
+ break;
+ }
+
+ return refetch;
+}
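Both callers below collapse their duplicated switch statements into this helper, leaving the retry pattern sketched here (condensed from the code below; rel, tid, outslot, res, and tmfd stand in for the locals those callers already have, and LockTupleShare stands in for whichever lock mode the caller requests):

    retry:
        /* ... re-locate the target tuple and copy its TID into 'tid' ... */

        PushActiveSnapshot(GetLatestSnapshot());

        res = table_tuple_lock(rel, &tid, GetLatestSnapshot(), outslot,
                               GetCurrentCommandId(false),
                               LockTupleShare, LockWaitBlock,
                               0 /* don't follow updates */ ,
                               &tmfd);

        PopActiveSnapshot();

        if (should_refetch_tuple(res, &tmfd))
            goto retry;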
+
/*
* Search the relation 'rel' for tuple using the index.
*
@@ -260,34 +306,8 @@ retry:
PopActiveSnapshot();
- switch (res)
- {
- case TM_Ok:
- break;
- case TM_Updated:
- /* XXX: Improve handling here */
- if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
- else
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("concurrent update, retrying")));
- goto retry;
- case TM_Deleted:
- /* XXX: Improve handling here */
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("concurrent delete, retrying")));
- goto retry;
- case TM_Invisible:
- elog(ERROR, "attempted to lock invisible tuple");
- break;
- default:
- elog(ERROR, "unexpected table_tuple_lock status: %u", res);
- break;
- }
+ if (should_refetch_tuple(res, &tmfd))
+ goto retry;
}
index_endscan(scan);
@@ -444,34 +464,8 @@ retry:
PopActiveSnapshot();
- switch (res)
- {
- case TM_Ok:
- break;
- case TM_Updated:
- /* XXX: Improve handling here */
- if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
- else
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("concurrent update, retrying")));
- goto retry;
- case TM_Deleted:
- /* XXX: Improve handling here */
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("concurrent delete, retrying")));
- goto retry;
- case TM_Invisible:
- elog(ERROR, "attempted to lock invisible tuple");
- break;
- default:
- elog(ERROR, "unexpected table_tuple_lock status: %u", res);
- break;
- }
+ if (should_refetch_tuple(res, &tmfd))
+ goto retry;
}
table_endscan(scan);
@@ -481,6 +475,89 @@ retry:
}
/*
+ * Find the tuple that violates the passed unique index (conflictindex).
+ *
+ * If a conflicting tuple is found, return true; otherwise, return false.
+ *
+ * We lock the tuple so that it cannot be deleted before the caller fetches
+ * the required information. Note that if the tuple is deleted before the
+ * lock is acquired, we retry the search for a conflicting tuple.
+ */
+static bool
+FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate,
+ Oid conflictindex, TupleTableSlot *slot,
+ TupleTableSlot **conflictslot)
+{
+ Relation rel = resultRelInfo->ri_RelationDesc;
+ ItemPointerData conflictTid;
+ TM_FailureData tmfd;
+ TM_Result res;
+
+ *conflictslot = NULL;
+
+retry:
+ if (ExecCheckIndexConstraints(resultRelInfo, slot, estate,
+ &conflictTid, &slot->tts_tid,
+ list_make1_oid(conflictindex)))
+ {
+ if (*conflictslot)
+ ExecDropSingleTupleTableSlot(*conflictslot);
+
+ *conflictslot = NULL;
+ return false;
+ }
+
+ *conflictslot = table_slot_create(rel, NULL);
+
+ PushActiveSnapshot(GetLatestSnapshot());
+
+ res = table_tuple_lock(rel, &conflictTid, GetLatestSnapshot(),
+ *conflictslot,
+ GetCurrentCommandId(false),
+ LockTupleShare,
+ LockWaitBlock,
+ 0 /* don't follow updates */ ,
+ &tmfd);
+
+ PopActiveSnapshot();
+
+ if (should_refetch_tuple(res, &tmfd))
+ goto retry;
+
+ return true;
+}
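Sketched usage of the new helper (this mirrors CheckAndReportConflict below; dropping the slot afterwards matters only on paths that do not error out):

    TupleTableSlot *conflictslot;

    if (FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
                          &conflictslot))
    {
        /*
         * conflictslot now holds the conflicting tuple, share-locked so
         * it cannot be deleted while we read its transaction info.
         */
    }

    if (conflictslot != NULL)
        ExecDropSingleTupleTableSlot(conflictslot);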
+
+/*
+ * Check all the unique indexes in 'recheckIndexes' for a conflict with the
+ * tuple in 'remoteslot', and report the conflict if one is found.
+ */
+static void
+CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
+ ConflictType type, List *recheckIndexes,
+ TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
+{
+ /* Check all the unique indexes for a conflict */
+ foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
+ {
+ TupleTableSlot *conflictslot;
+
+ if (list_member_oid(recheckIndexes, uniqueidx) &&
+ FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
+ &conflictslot))
+ {
+ RepOriginId origin;
+ TimestampTz committs;
+ TransactionId xmin;
+
+ GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
+ ReportApplyConflict(estate, resultRelInfo, ERROR, type,
+ searchslot, conflictslot, remoteslot,
+ uniqueidx, xmin, origin, committs);
+ }
+ }
+}
+
+/*
* Insert tuple represented in the slot to the relation, update the indexes,
* and execute any constraints and per-row triggers.
*
@@ -509,6 +586,8 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
if (!skip_tuple)
{
List *recheckIndexes = NIL;
+ List *conflictindexes;
+ bool conflict = false;
/* Compute stored generated columns */
if (rel->rd_att->constr &&
@@ -525,10 +604,33 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
/* OK, store the tuple and create index entries for it */
simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
+ conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
+
if (resultRelInfo->ri_NumIndices > 0)
recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
- slot, estate, false, false,
- NULL, NIL, false);
+ slot, estate, false,
+ conflictindexes ? true : false,
+ &conflict,
+ conflictindexes, false);
+
+ /*
+ * Check the conflict indexes to fetch the conflicting local tuple and
+ * report the conflict. We perform this check here, instead of running
+ * an additional index scan before the actual insertion and reporting
+ * the conflict if any conflicting tuples are found, to avoid the
+ * overhead of an extra scan on every INSERT operation even when no
+ * conflict arises; that could add significant overhead to replication,
+ * particularly in cases where conflicts are rare.
+ *
+ * XXX OTOH, this can leave behind dead tuples in the heap and index
+ * that must be cleaned up when a conflict occurs. But since conflicts
+ * should be infrequent, we prefer to save the performance overhead of
+ * an extra scan before each insertion.
+ */
+ if (conflict)
+ CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
+ recheckIndexes, NULL, slot);
/* AFTER ROW INSERT Triggers */
ExecARInsertTriggers(estate, resultRelInfo, slot,
@@ -577,6 +679,8 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
{
List *recheckIndexes = NIL;
TU_UpdateIndexes update_indexes;
+ List *conflictindexes;
+ bool conflict = false;
/* Compute stored generated columns */
if (rel->rd_att->constr &&
@@ -593,12 +697,24 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
&update_indexes);
+ conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
+
if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
- slot, estate, true, false,
- NULL, NIL,
+ slot, estate, true,
+ conflictindexes ? true : false,
+ &conflict, conflictindexes,
(update_indexes == TU_Summarizing));
+ /*
+ * Refer to the comments above the call to CheckAndReportConflict() in
+ * ExecSimpleRelationInsert to understand why this check is done at
+ * this point.
+ */
+ if (conflict)
+ CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
+ recheckIndexes, searchslot, slot);
+
/* AFTER ROW UPDATE Triggers */
ExecARUpdateTriggers(estate, resultRelInfo,
NULL, NULL,
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 4913e493199..8bf4c80d4a0 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -1019,9 +1019,11 @@ ExecInsert(ModifyTableContext *context,
/* Perform a speculative insertion. */
uint32 specToken;
ItemPointerData conflictTid;
+ ItemPointerData invalidItemPtr;
bool specConflict;
List *arbiterIndexes;
+ ItemPointerSetInvalid(&invalidItemPtr);
arbiterIndexes = resultRelInfo->ri_onConflictArbiterIndexes;
/*
@@ -1041,7 +1043,8 @@ ExecInsert(ModifyTableContext *context,
CHECK_FOR_INTERRUPTS();
specConflict = false;
if (!ExecCheckIndexConstraints(resultRelInfo, slot, estate,
- &conflictTid, arbiterIndexes))
+ &conflictTid, &invalidItemPtr,
+ arbiterIndexes))
{
/* committed conflict tuple found */
if (onconflict == ONCONFLICT_UPDATE)
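For context on the new argument: inside check_exclusion_or_unique_constraint, the passed TID is compared against each row the index scan returns, so the invalid TID passed here excludes nothing, while the &slot->tts_tid passed by FindConflictTuple skips the row just inserted. A simplified sketch of that guard, with existing_slot standing in for the scan's output slot (names are illustrative, not the verbatim code):

    /* Inside the index-scan loop over potentially conflicting rows: */
    if (ItemPointerIsValid(tupleid) &&
        ItemPointerEquals(tupleid, &existing_slot->tts_tid))
        continue;               /* skip the tuple we just inserted */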