aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor/nodeModifyTable.c
diff options
context:
space:
mode:
authorAlvaro Herrera <alvherre@alvh.no-ip.org>2022-03-28 16:45:58 +0200
committerAlvaro Herrera <alvherre@alvh.no-ip.org>2022-03-28 16:47:48 +0200
commit7103ebb7aae8ab8076b7e85f335ceb8fe799097c (patch)
tree0bc2faf176b58d2546de40c3c36d93a4cdf1aafe /src/backend/executor/nodeModifyTable.c
parentae63017bdb316b16a9f201b10f1221598111d6c5 (diff)
downloadpostgresql-7103ebb7aae8ab8076b7e85f335ceb8fe799097c.tar.gz
postgresql-7103ebb7aae8ab8076b7e85f335ceb8fe799097c.zip
Add support for MERGE SQL command
MERGE performs actions that modify rows in the target table using a source table or query. MERGE provides a single SQL statement that can conditionally INSERT/UPDATE/DELETE rows -- a task that would otherwise require multiple PL statements. For example, MERGE INTO target AS t USING source AS s ON t.tid = s.sid WHEN MATCHED AND t.balance > s.delta THEN UPDATE SET balance = t.balance - s.delta WHEN MATCHED THEN DELETE WHEN NOT MATCHED AND s.delta > 0 THEN INSERT VALUES (s.sid, s.delta) WHEN NOT MATCHED THEN DO NOTHING; MERGE works with regular tables, partitioned tables and inheritance hierarchies, including column and row security enforcement, as well as support for row and statement triggers and transition tables therein. MERGE is optimized for OLTP and is parameterizable, though also useful for large scale ETL/ELT. MERGE is not intended to be used in preference to existing single SQL commands for INSERT, UPDATE or DELETE since there is some overhead. MERGE can be used from PL/pgSQL. MERGE does not support targetting updatable views or foreign tables, and RETURNING clauses are not allowed either. These limitations are likely fixable with sufficient effort. Rewrite rules are also not supported, but it's not clear that we'd want to support them. Author: Pavan Deolasee <pavan.deolasee@gmail.com> Author: Álvaro Herrera <alvherre@alvh.no-ip.org> Author: Amit Langote <amitlangote09@gmail.com> Author: Simon Riggs <simon.riggs@enterprisedb.com> Reviewed-by: Peter Eisentraut <peter.eisentraut@enterprisedb.com> Reviewed-by: Andres Freund <andres@anarazel.de> (earlier versions) Reviewed-by: Peter Geoghegan <pg@bowt.ie> (earlier versions) Reviewed-by: Robert Haas <robertmhaas@gmail.com> (earlier versions) Reviewed-by: Japin Li <japinli@hotmail.com> Reviewed-by: Justin Pryzby <pryzby@telsasoft.com> Reviewed-by: Tomas Vondra <tomas.vondra@enterprisedb.com> Reviewed-by: Zhihong Yu <zyu@yugabyte.com> Discussion: https://postgr.es/m/CANP8+jKitBSrB7oTgT9CY2i1ObfOt36z0XMraQc+Xrz8QB0nXA@mail.gmail.com Discussion: https://postgr.es/m/CAH2-WzkJdBuxj9PO=2QaO9-3h3xGbQPZ34kJH=HukRekwM-GZg@mail.gmail.com Discussion: https://postgr.es/m/20201231134736.GA25392@alvherre.pgsql
Diffstat (limited to 'src/backend/executor/nodeModifyTable.c')
-rw-r--r--src/backend/executor/nodeModifyTable.c936
1 files changed, 900 insertions, 36 deletions
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 701fe052967..171575cd73b 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -24,11 +24,20 @@
* values plus row-locating info for UPDATE and MERGE cases, or just the
* row-locating info for DELETE cases.
*
+ * MERGE runs a join between the source relation and the target
+ * table; if any WHEN NOT MATCHED clauses are present, then the
+ * join is an outer join. In this case, any unmatched tuples will
+ * have NULL row-locating info, and only INSERT can be run. But for
+ * matched tuples, then row-locating info is used to determine the
+ * tuple to UPDATE or DELETE. When all clauses are WHEN MATCHED,
+ * then an inner join is used, so all tuples contain row-locating info.
+ *
* If the query specifies RETURNING, then the ModifyTable returns a
* RETURNING tuple after completing each row insert, update, or delete.
* It must be called again to continue the operation. Without RETURNING,
* we just loop within the node until all the work is done, then
- * return NULL. This avoids useless call/return overhead.
+ * return NULL. This avoids useless call/return overhead. (MERGE does
+ * not support RETURNING.)
*/
#include "postgres.h"
@@ -78,6 +87,17 @@ typedef struct ModifyTableContext
*/
TupleTableSlot *planSlot;
+ /*
+ * During EvalPlanQual, project and return the new version of the new
+ * tuple
+ */
+ TupleTableSlot *(*GetUpdateNewTuple) (ResultRelInfo *resultRelInfo,
+ TupleTableSlot *epqslot,
+ TupleTableSlot *oldSlot,
+ MergeActionState *relaction);
+
+ /* MERGE specific */
+ MergeActionState *relaction; /* MERGE action in progress */
/*
* Information about the changes that were made concurrently to a tuple
@@ -140,6 +160,28 @@ static TupleTableSlot *ExecPrepareTupleRouting(ModifyTableState *mtstate,
ResultRelInfo *targetRelInfo,
TupleTableSlot *slot,
ResultRelInfo **partRelInfo);
+static TupleTableSlot *internalGetUpdateNewTuple(ResultRelInfo *relinfo,
+ TupleTableSlot *planSlot,
+ TupleTableSlot *oldSlot,
+ MergeActionState *relaction);
+
+static TupleTableSlot *ExecMerge(ModifyTableContext *context,
+ ResultRelInfo *resultRelInfo,
+ ItemPointer tupleid,
+ bool canSetTag);
+static void ExecInitMerge(ModifyTableState *mtstate, EState *estate);
+static bool ExecMergeMatched(ModifyTableContext *context,
+ ResultRelInfo *resultRelInfo,
+ ItemPointer tupleid,
+ bool canSetTag);
+static void ExecMergeNotMatched(ModifyTableContext *context,
+ ResultRelInfo *resultRelInfo,
+ bool canSetTag);
+static TupleTableSlot *mergeGetUpdateNewTuple(ResultRelInfo *relinfo,
+ TupleTableSlot *planSlot,
+ TupleTableSlot *oldSlot,
+ MergeActionState *relaction);
+
/*
* Verify that the tuples to be produced by INSERT match the
@@ -616,21 +658,32 @@ ExecGetUpdateNewTuple(ResultRelInfo *relinfo,
TupleTableSlot *planSlot,
TupleTableSlot *oldSlot)
{
- ProjectionInfo *newProj = relinfo->ri_projectNew;
- ExprContext *econtext;
-
/* Use a few extra Asserts to protect against outside callers */
Assert(relinfo->ri_projectNewInfoValid);
Assert(planSlot != NULL && !TTS_EMPTY(planSlot));
Assert(oldSlot != NULL && !TTS_EMPTY(oldSlot));
+ return internalGetUpdateNewTuple(relinfo, planSlot, oldSlot, NULL);
+}
+
+/*
+ * Callback for ModifyTableState->GetUpdateNewTuple for use by regular UPDATE.
+ */
+static TupleTableSlot *
+internalGetUpdateNewTuple(ResultRelInfo *relinfo,
+ TupleTableSlot *planSlot,
+ TupleTableSlot *oldSlot,
+ MergeActionState *relaction)
+{
+ ProjectionInfo *newProj = relinfo->ri_projectNew;
+ ExprContext *econtext;
+
econtext = newProj->pi_exprContext;
econtext->ecxt_outertuple = planSlot;
econtext->ecxt_scantuple = oldSlot;
return ExecProject(newProj);
}
-
/* ----------------------------------------------------------------
* ExecInsert
*
@@ -847,9 +900,17 @@ ExecInsert(ModifyTableContext *context,
* partition, we should instead check UPDATE policies, because we are
* executing policies defined on the target table, and not those
* defined on the child partitions.
+ *
+ * If we're running MERGE, we refer to the action that we're executing
+ * to know if we're doing an INSERT or UPDATE to a partition table.
*/
- wco_kind = (mtstate->operation == CMD_UPDATE) ?
- WCO_RLS_UPDATE_CHECK : WCO_RLS_INSERT_CHECK;
+ if (mtstate->operation == CMD_UPDATE)
+ wco_kind = WCO_RLS_UPDATE_CHECK;
+ else if (mtstate->operation == CMD_MERGE)
+ wco_kind = (context->relaction->mas_action->commandType == CMD_UPDATE) ?
+ WCO_RLS_UPDATE_CHECK : WCO_RLS_INSERT_CHECK;
+ else
+ wco_kind = WCO_RLS_INSERT_CHECK;
/*
* ExecWithCheckOptions() will skip any WCOs which are not of the kind
@@ -1453,7 +1514,13 @@ ldelete:;
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("could not serialize access due to concurrent delete")));
- /* tuple already deleted; nothing to do */
+
+ /*
+ * tuple already deleted; nothing to do. But MERGE might want
+ * to handle it differently. We've already filled-in
+ * actionInfo with sufficient information for MERGE to look
+ * at.
+ */
return NULL;
default:
@@ -1659,7 +1726,8 @@ ExecCrossPartitionUpdate(ModifyTableContext *context,
elog(ERROR, "failed to fetch tuple being updated");
/* and project the new tuple to retry the UPDATE with */
context->cpUpdateRetrySlot =
- ExecGetUpdateNewTuple(resultRelInfo, epqslot, oldSlot);
+ context->GetUpdateNewTuple(resultRelInfo, epqslot, oldSlot,
+ context->relaction);
return false;
}
}
@@ -1718,7 +1786,8 @@ ExecUpdatePrologue(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
if (resultRelInfo->ri_TrigDesc &&
resultRelInfo->ri_TrigDesc->trig_update_before_row)
return ExecBRUpdateTriggers(context->estate, context->epqstate,
- resultRelInfo, tupleid, oldtuple, slot);
+ resultRelInfo, tupleid, oldtuple, slot,
+ &context->tmfd);
return true;
}
@@ -1865,6 +1934,13 @@ lreplace:;
}
/*
+ * No luck, a retry is needed. If running MERGE, we do not do so
+ * here; instead let it handle that on its own rules.
+ */
+ if (context->relaction != NULL)
+ return TM_Updated;
+
+ /*
* ExecCrossPartitionUpdate installed an updated version of the new
* tuple in the retry slot; start over.
*/
@@ -2109,8 +2185,8 @@ redo_act:
/*
* If ExecUpdateAct reports that a cross-partition update was done,
- * then the returning tuple has been projected and there's nothing
- * else for us to do.
+ * then the RETURNING tuple (if any) has been projected and there's
+ * nothing else for us to do.
*/
if (updateCxt.crossPartUpdate)
return context->cpUpdateReturningSlot;
@@ -2337,9 +2413,9 @@ ExecOnConflictUpdate(ModifyTableContext *context,
* to break.
*
* It is the user's responsibility to prevent this situation from
- * occurring. These problems are why SQL-2003 similarly specifies
- * that for SQL MERGE, an exception must be raised in the event of
- * an attempt to update the same row twice.
+ * occurring. These problems are why the SQL standard similarly
+ * specifies that for SQL MERGE, an exception must be raised in
+ * the event of an attempt to update the same row twice.
*/
xminDatum = slot_getsysattr(existing,
MinTransactionIdAttributeNumber,
@@ -2350,7 +2426,9 @@ ExecOnConflictUpdate(ModifyTableContext *context,
if (TransactionIdIsCurrentTransactionId(xmin))
ereport(ERROR,
(errcode(ERRCODE_CARDINALITY_VIOLATION),
- errmsg("ON CONFLICT DO UPDATE command cannot affect row a second time"),
+ /* translator: %s is a SQL command name */
+ errmsg("%s command cannot affect row a second time",
+ "ON CONFLICT DO UPDATE"),
errhint("Ensure that no rows proposed for insertion within the same command have duplicate constrained values.")));
/* This shouldn't happen */
@@ -2490,6 +2568,705 @@ ExecOnConflictUpdate(ModifyTableContext *context,
return true;
}
+/*
+ * Perform MERGE.
+ */
+static TupleTableSlot *
+ExecMerge(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
+ ItemPointer tupleid, bool canSetTag)
+{
+ bool matched;
+
+ /*-----
+ * If we are dealing with a WHEN MATCHED case (tupleid is valid), we
+ * execute the first action for which the additional WHEN MATCHED AND
+ * quals pass. If an action without quals is found, that action is
+ * executed.
+ *
+ * Similarly, if we are dealing with WHEN NOT MATCHED case, we look at
+ * the given WHEN NOT MATCHED actions in sequence until one passes.
+ *
+ * Things get interesting in case of concurrent update/delete of the
+ * target tuple. Such concurrent update/delete is detected while we are
+ * executing a WHEN MATCHED action.
+ *
+ * A concurrent update can:
+ *
+ * 1. modify the target tuple so that it no longer satisfies the
+ * additional quals attached to the current WHEN MATCHED action
+ *
+ * In this case, we are still dealing with a WHEN MATCHED case.
+ * We recheck the list of WHEN MATCHED actions from the start and
+ * choose the first one that satisfies the new target tuple.
+ *
+ * 2. modify the target tuple so that the join quals no longer pass and
+ * hence the source tuple no longer has a match.
+ *
+ * In this case, the source tuple no longer matches the target tuple,
+ * so we now instead find a qualifying WHEN NOT MATCHED action to
+ * execute.
+ *
+ * XXX Hmmm, what if the updated tuple would now match one that was
+ * considered NOT MATCHED so far?
+ *
+ * A concurrent delete changes a WHEN MATCHED case to WHEN NOT MATCHED.
+ *
+ * ExecMergeMatched takes care of following the update chain and
+ * re-finding the qualifying WHEN MATCHED action, as long as the updated
+ * target tuple still satisfies the join quals, i.e., it remains a WHEN
+ * MATCHED case. If the tuple gets deleted or the join quals fail, it
+ * returns and we try ExecMergeNotMatched. Given that ExecMergeMatched
+ * always make progress by following the update chain and we never switch
+ * from ExecMergeNotMatched to ExecMergeMatched, there is no risk of a
+ * livelock.
+ */
+ matched = tupleid != NULL;
+ if (matched)
+ matched = ExecMergeMatched(context, resultRelInfo, tupleid, canSetTag);
+
+ /*
+ * Either we were dealing with a NOT MATCHED tuple or ExecMergeMatched()
+ * returned "false", indicating the previously MATCHED tuple no longer
+ * matches.
+ */
+ if (!matched)
+ ExecMergeNotMatched(context, resultRelInfo, canSetTag);
+
+ /* No RETURNING support yet */
+ return NULL;
+}
+
+/*
+ * Check and execute the first qualifying MATCHED action. The current target
+ * tuple is identified by tupleid.
+ *
+ * We start from the first WHEN MATCHED action and check if the WHEN quals
+ * pass, if any. If the WHEN quals for the first action do not pass, we
+ * check the second, then the third and so on. If we reach to the end, no
+ * action is taken and we return true, indicating that no further action is
+ * required for this tuple.
+ *
+ * If we do find a qualifying action, then we attempt to execute the action.
+ *
+ * If the tuple is concurrently updated, EvalPlanQual is run with the updated
+ * tuple to recheck the join quals. Note that the additional quals associated
+ * with individual actions are evaluated by this routine via ExecQual, while
+ * EvalPlanQual checks for the join quals. If EvalPlanQual tells us that the
+ * updated tuple still passes the join quals, then we restart from the first
+ * action to look for a qualifying action. Otherwise, we return false --
+ * meaning that a NOT MATCHED action must now be executed for the current
+ * source tuple.
+ */
+static bool
+ExecMergeMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
+ ItemPointer tupleid, bool canSetTag)
+{
+ ModifyTableState *mtstate = context->mtstate;
+ TupleTableSlot *newslot;
+ EState *estate = context->estate;
+ ExprContext *econtext = mtstate->ps.ps_ExprContext;
+ bool isNull;
+ EPQState *epqstate = &mtstate->mt_epqstate;
+ ListCell *l;
+
+ /*
+ * If there are no WHEN MATCHED actions, we are done.
+ */
+ if (resultRelInfo->ri_matchedMergeAction == NIL)
+ return true;
+
+ /*
+ * Make tuple and any needed join variables available to ExecQual and
+ * ExecProject. The target's existing tuple is installed in the scantuple.
+ * Again, this target relation's slot is required only in the case of a
+ * MATCHED tuple and UPDATE/DELETE actions.
+ */
+ econtext->ecxt_scantuple = resultRelInfo->ri_oldTupleSlot;
+ econtext->ecxt_innertuple = context->planSlot;
+ econtext->ecxt_outertuple = NULL;
+
+lmerge_matched:;
+
+ /*
+ * This routine is only invoked for matched rows, and we must have found
+ * the tupleid of the target row in that case; fetch that tuple.
+ *
+ * We use SnapshotAny for this because we might get called again after
+ * EvalPlanQual returns us a new tuple, which may not be visible to our
+ * MVCC snapshot.
+ */
+
+ if (!table_tuple_fetch_row_version(resultRelInfo->ri_RelationDesc,
+ tupleid,
+ SnapshotAny,
+ resultRelInfo->ri_oldTupleSlot))
+ elog(ERROR, "failed to fetch the target tuple");
+
+ foreach(l, resultRelInfo->ri_matchedMergeAction)
+ {
+ MergeActionState *relaction = (MergeActionState *) lfirst(l);
+ CmdType commandType = relaction->mas_action->commandType;
+ List *recheckIndexes = NIL;
+ TM_Result result;
+ UpdateContext updateCxt = {0};
+
+ /*
+ * Test condition, if any.
+ *
+ * In the absence of any condition, we perform the action
+ * unconditionally (no need to check separately since ExecQual() will
+ * return true if there are no conditions to evaluate).
+ */
+ if (!ExecQual(relaction->mas_whenqual, econtext))
+ continue;
+
+ /*
+ * Check if the existing target tuple meets the USING checks of
+ * UPDATE/DELETE RLS policies. If those checks fail, we throw an
+ * error.
+ *
+ * The WITH CHECK quals are applied in ExecUpdate() and hence we need
+ * not do anything special to handle them.
+ *
+ * NOTE: We must do this after WHEN quals are evaluated, so that we
+ * check policies only when they matter.
+ */
+ if (resultRelInfo->ri_WithCheckOptions)
+ {
+ ExecWithCheckOptions(commandType == CMD_UPDATE ?
+ WCO_RLS_MERGE_UPDATE_CHECK : WCO_RLS_MERGE_DELETE_CHECK,
+ resultRelInfo,
+ resultRelInfo->ri_oldTupleSlot,
+ context->mtstate->ps.state);
+ }
+
+ /* Perform stated action */
+ switch (commandType)
+ {
+ case CMD_UPDATE:
+
+ /*
+ * Project the output tuple, and use that to update the table.
+ * We don't need to filter out junk attributes, because the
+ * UPDATE action's targetlist doesn't have any.
+ */
+ newslot = ExecProject(relaction->mas_proj);
+
+ context->relaction = relaction;
+ context->GetUpdateNewTuple = mergeGetUpdateNewTuple;
+ context->cpUpdateRetrySlot = NULL;
+
+ if (!ExecUpdatePrologue(context, resultRelInfo,
+ tupleid, NULL, newslot))
+ {
+ result = TM_Ok;
+ break;
+ }
+ ExecUpdatePrepareSlot(resultRelInfo, newslot, context->estate);
+ result = ExecUpdateAct(context, resultRelInfo, tupleid, NULL,
+ newslot, mtstate->canSetTag, &updateCxt);
+ if (result == TM_Ok && updateCxt.updated)
+ {
+ ExecUpdateEpilogue(context, &updateCxt, resultRelInfo,
+ tupleid, NULL, newslot, recheckIndexes);
+ mtstate->mt_merge_updated += 1;
+ }
+
+ break;
+
+ case CMD_DELETE:
+ context->relaction = relaction;
+ if (!ExecDeletePrologue(context, resultRelInfo, tupleid,
+ NULL, NULL))
+ {
+ result = TM_Ok;
+ break;
+ }
+ result = ExecDeleteAct(context, resultRelInfo, tupleid, false);
+ if (result == TM_Ok)
+ {
+ ExecDeleteEpilogue(context, resultRelInfo, tupleid, NULL,
+ false);
+ mtstate->mt_merge_deleted += 1;
+ }
+ break;
+
+ case CMD_NOTHING:
+ /* Doing nothing is always OK */
+ result = TM_Ok;
+ break;
+
+ default:
+ elog(ERROR, "unknown action in MERGE WHEN MATCHED clause");
+ }
+
+ switch (result)
+ {
+ case TM_Ok:
+ /* all good; perform final actions */
+ if (canSetTag)
+ (estate->es_processed)++;
+
+ break;
+
+ case TM_SelfModified:
+
+ /*
+ * The SQL standard disallows this for MERGE.
+ */
+ if (TransactionIdIsCurrentTransactionId(context->tmfd.xmax))
+ ereport(ERROR,
+ (errcode(ERRCODE_CARDINALITY_VIOLATION),
+ /* translator: %s is a SQL command name */
+ errmsg("%s command cannot affect row a second time",
+ "MERGE"),
+ errhint("Ensure that not more than one source row matches any one target row.")));
+ /* This shouldn't happen */
+ elog(ERROR, "attempted to update or delete invisible tuple");
+ break;
+
+ case TM_Deleted:
+ if (IsolationUsesXactSnapshot())
+ ereport(ERROR,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("could not serialize access due to concurrent delete")));
+
+ /*
+ * If the tuple was already deleted, return to let caller
+ * handle it under NOT MATCHED clauses.
+ */
+ return false;
+
+ case TM_Updated:
+ {
+ Relation resultRelationDesc;
+ TupleTableSlot *epqslot,
+ *inputslot;
+ LockTupleMode lockmode;
+
+ /*
+ * The target tuple was concurrently updated by some other
+ * transaction.
+ */
+
+ /*
+ * If cpUpdateRetrySlot is set, ExecCrossPartitionUpdate()
+ * must have detected that the tuple was concurrently
+ * updated, so we restart the search for an appropriate
+ * WHEN MATCHED clause to process the updated tuple.
+ *
+ * In this case, ExecDelete() would already have performed
+ * EvalPlanQual() on the latest version of the tuple,
+ * which in turn would already have been loaded into
+ * ri_oldTupleSlot, so no need to do either of those
+ * things.
+ *
+ * XXX why do we not check the WHEN NOT MATCHED list in
+ * this case?
+ */
+ if (!TupIsNull(context->cpUpdateRetrySlot))
+ goto lmerge_matched;
+
+ /*
+ * Otherwise, we run the EvalPlanQual() with the new
+ * version of the tuple. If EvalPlanQual() does not return
+ * a tuple, then we switch to the NOT MATCHED list of
+ * actions. If it does return a tuple and the join qual is
+ * still satisfied, then we just need to recheck the
+ * MATCHED actions, starting from the top, and execute the
+ * first qualifying action.
+ */
+ resultRelationDesc = resultRelInfo->ri_RelationDesc;
+ lockmode = ExecUpdateLockMode(estate, resultRelInfo);
+
+ inputslot = EvalPlanQualSlot(epqstate, resultRelationDesc,
+ resultRelInfo->ri_RangeTableIndex);
+
+ result = table_tuple_lock(resultRelationDesc, tupleid,
+ estate->es_snapshot,
+ inputslot, estate->es_output_cid,
+ lockmode, LockWaitBlock,
+ TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
+ &context->tmfd);
+ switch (result)
+ {
+ case TM_Ok:
+ epqslot = EvalPlanQual(epqstate,
+ resultRelationDesc,
+ resultRelInfo->ri_RangeTableIndex,
+ inputslot);
+
+ /*
+ * If we got no tuple, or the tuple we get has a
+ * NULL ctid, go back to caller: this one is not a
+ * MATCHED tuple anymore, so they can retry with
+ * NOT MATCHED actions.
+ */
+ if (TupIsNull(epqslot))
+ return false;
+
+ (void) ExecGetJunkAttribute(epqslot,
+ resultRelInfo->ri_RowIdAttNo,
+ &isNull);
+ if (isNull)
+ return false;
+
+ /*
+ * When a tuple was updated and migrated to
+ * another partition concurrently, the current
+ * MERGE implementation can't follow. There's
+ * probably a better way to handle this case, but
+ * it'd require recognizing the relation to which
+ * the tuple moved, and setting our current
+ * resultRelInfo to that.
+ */
+ if (ItemPointerIndicatesMovedPartitions(&context->tmfd.ctid))
+ ereport(ERROR,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("tuple to be deleted was already moved to another partition due to concurrent update")));
+
+ /*
+ * A non-NULL ctid means that we are still dealing
+ * with MATCHED case. Restart the loop so that we
+ * apply all the MATCHED rules again, to ensure
+ * that the first qualifying WHEN MATCHED action
+ * is executed.
+ *
+ * Update tupleid to that of the new tuple, for
+ * the refetch we do at the top.
+ */
+ ItemPointerCopy(&context->tmfd.ctid, tupleid);
+ goto lmerge_matched;
+
+ case TM_Deleted:
+
+ /*
+ * tuple already deleted; tell caller to run NOT
+ * MATCHED actions
+ */
+ return false;
+
+ case TM_SelfModified:
+
+ /*
+ * This can be reached when following an update
+ * chain from a tuple updated by another session,
+ * reaching a tuple that was already updated in
+ * this transaction. If previously modified by
+ * this command, ignore the redundant update,
+ * otherwise error out.
+ *
+ * See also response to TM_SelfModified in
+ * ExecUpdate().
+ */
+ if (context->tmfd.cmax != estate->es_output_cid)
+ ereport(ERROR,
+ (errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION),
+ errmsg("tuple to be updated or deleted was already modified by an operation triggered by the current command"),
+ errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows.")));
+ return false;
+
+ default:
+ /* see table_tuple_lock call in ExecDelete() */
+ elog(ERROR, "unexpected table_tuple_lock status: %u",
+ result);
+ return false;
+ }
+ }
+
+ case TM_Invisible:
+ case TM_WouldBlock:
+ case TM_BeingModified:
+ /* these should not occur */
+ elog(ERROR, "unexpected tuple operation result: %d", result);
+ break;
+ }
+
+ /*
+ * We've activated one of the WHEN clauses, so we don't search
+ * further. This is required behaviour, not an optimization.
+ */
+ break;
+ }
+
+ /*
+ * Successfully executed an action or no qualifying action was found.
+ */
+ return true;
+}
+
+/*
+ * Execute the first qualifying NOT MATCHED action.
+ */
+static void
+ExecMergeNotMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
+ bool canSetTag)
+{
+ ModifyTableState *mtstate = context->mtstate;
+ ExprContext *econtext = mtstate->ps.ps_ExprContext;
+ List *actionStates = NIL;
+ ListCell *l;
+
+ /*
+ * For INSERT actions, the root relation's merge action is OK since the
+ * INSERT's targetlist and the WHEN conditions can only refer to the
+ * source relation and hence it does not matter which result relation we
+ * work with.
+ *
+ * XXX does this mean that we can avoid creating copies of actionStates on
+ * partitioned tables, for not-matched actions?
+ */
+ actionStates = resultRelInfo->ri_notMatchedMergeAction;
+
+ /*
+ * Make source tuple available to ExecQual and ExecProject. We don't need
+ * the target tuple, since the WHEN quals and targetlist can't refer to
+ * the target columns.
+ */
+ econtext->ecxt_scantuple = NULL;
+ econtext->ecxt_innertuple = context->planSlot;
+ econtext->ecxt_outertuple = NULL;
+
+ foreach(l, actionStates)
+ {
+ MergeActionState *action = (MergeActionState *) lfirst(l);
+ CmdType commandType = action->mas_action->commandType;
+ TupleTableSlot *newslot;
+
+ /*
+ * Test condition, if any.
+ *
+ * In the absence of any condition, we perform the action
+ * unconditionally (no need to check separately since ExecQual() will
+ * return true if there are no conditions to evaluate).
+ */
+ if (!ExecQual(action->mas_whenqual, econtext))
+ continue;
+
+ /* Perform stated action */
+ switch (commandType)
+ {
+ case CMD_INSERT:
+
+ /*
+ * Project the tuple. In case of a partitioned table, the
+ * projection was already built to use the root's descriptor,
+ * so we don't need to map the tuple here.
+ */
+ newslot = ExecProject(action->mas_proj);
+ context->relaction = action;
+
+ (void) ExecInsert(context, mtstate->rootResultRelInfo, newslot,
+ canSetTag, NULL, NULL);
+ mtstate->mt_merge_inserted += 1;
+ break;
+ case CMD_NOTHING:
+ /* Do nothing */
+ break;
+ default:
+ elog(ERROR, "unknown action in MERGE WHEN NOT MATCHED clause");
+ }
+
+ /*
+ * We've activated one of the WHEN clauses, so we don't search
+ * further. This is required behaviour, not an optimization.
+ */
+ break;
+ }
+}
+
+/*
+ * Initialize state for execution of MERGE.
+ */
+void
+ExecInitMerge(ModifyTableState *mtstate, EState *estate)
+{
+ ModifyTable *node = (ModifyTable *) mtstate->ps.plan;
+ ResultRelInfo *rootRelInfo = mtstate->rootResultRelInfo;
+ ResultRelInfo *resultRelInfo;
+ ExprContext *econtext;
+ ListCell *lc;
+ int i;
+
+ if (node->mergeActionLists == NIL)
+ return;
+
+ mtstate->mt_merge_subcommands = 0;
+
+ if (mtstate->ps.ps_ExprContext == NULL)
+ ExecAssignExprContext(estate, &mtstate->ps);
+ econtext = mtstate->ps.ps_ExprContext;
+
+ /*
+ * Create a MergeActionState for each action on the mergeActionList and
+ * add it to either a list of matched actions or not-matched actions.
+ *
+ * Similar logic appears in ExecInitPartitionInfo(), so if changing
+ * anything here, do so there too.
+ */
+ i = 0;
+ foreach(lc, node->mergeActionLists)
+ {
+ List *mergeActionList = lfirst(lc);
+ TupleDesc relationDesc;
+ ListCell *l;
+
+ resultRelInfo = mtstate->resultRelInfo + i;
+ i++;
+ relationDesc = RelationGetDescr(resultRelInfo->ri_RelationDesc);
+
+ /* initialize slots for MERGE fetches from this rel */
+ if (unlikely(!resultRelInfo->ri_projectNewInfoValid))
+ ExecInitMergeTupleSlots(mtstate, resultRelInfo);
+
+ foreach(l, mergeActionList)
+ {
+ MergeAction *action = (MergeAction *) lfirst(l);
+ MergeActionState *action_state;
+ TupleTableSlot *tgtslot;
+ TupleDesc tgtdesc;
+ List **list;
+
+ /*
+ * Build action merge state for this rel. (For partitions,
+ * equivalent code exists in ExecInitPartitionInfo.)
+ */
+ action_state = makeNode(MergeActionState);
+ action_state->mas_action = action;
+ action_state->mas_whenqual = ExecInitQual((List *) action->qual,
+ &mtstate->ps);
+
+ /*
+ * We create two lists - one for WHEN MATCHED actions and one for
+ * WHEN NOT MATCHED actions - and stick the MergeActionState into
+ * the appropriate list.
+ */
+ if (action_state->mas_action->matched)
+ list = &resultRelInfo->ri_matchedMergeAction;
+ else
+ list = &resultRelInfo->ri_notMatchedMergeAction;
+ *list = lappend(*list, action_state);
+
+ switch (action->commandType)
+ {
+ case CMD_INSERT:
+ ExecCheckPlanOutput(rootRelInfo->ri_RelationDesc,
+ action->targetList);
+
+ /*
+ * If the MERGE targets a partitioned table, any INSERT
+ * actions must be routed through it, not the child
+ * relations. Initialize the routing struct and the root
+ * table's "new" tuple slot for that, if not already done.
+ * The projection we prepare, for all relations, uses the
+ * root relation descriptor, and targets the plan's root
+ * slot. (This is consistent with the fact that we
+ * checked the plan output to match the root relation,
+ * above.)
+ */
+ if (rootRelInfo->ri_RelationDesc->rd_rel->relkind ==
+ RELKIND_PARTITIONED_TABLE)
+ {
+ if (mtstate->mt_partition_tuple_routing == NULL)
+ {
+ /*
+ * Initialize planstate for routing if not already
+ * done.
+ *
+ * Note that the slot is managed as a standalone
+ * slot belonging to ModifyTableState, so we pass
+ * NULL for the 2nd argument.
+ */
+ mtstate->mt_root_tuple_slot =
+ table_slot_create(rootRelInfo->ri_RelationDesc,
+ NULL);
+ mtstate->mt_partition_tuple_routing =
+ ExecSetupPartitionTupleRouting(estate,
+ rootRelInfo->ri_RelationDesc);
+ }
+ tgtslot = mtstate->mt_root_tuple_slot;
+ tgtdesc = RelationGetDescr(rootRelInfo->ri_RelationDesc);
+ }
+ else
+ {
+ /* not partitioned? use the stock relation and slot */
+ tgtslot = resultRelInfo->ri_newTupleSlot;
+ tgtdesc = RelationGetDescr(resultRelInfo->ri_RelationDesc);
+ }
+
+ action_state->mas_proj =
+ ExecBuildProjectionInfo(action->targetList, econtext,
+ tgtslot,
+ &mtstate->ps,
+ tgtdesc);
+
+ mtstate->mt_merge_subcommands |= MERGE_INSERT;
+ break;
+ case CMD_UPDATE:
+ action_state->mas_proj =
+ ExecBuildUpdateProjection(action->targetList,
+ true,
+ action->updateColnos,
+ relationDesc,
+ econtext,
+ resultRelInfo->ri_newTupleSlot,
+ &mtstate->ps);
+ mtstate->mt_merge_subcommands |= MERGE_UPDATE;
+ break;
+ case CMD_DELETE:
+ mtstate->mt_merge_subcommands |= MERGE_DELETE;
+ break;
+ case CMD_NOTHING:
+ break;
+ default:
+ elog(ERROR, "unknown operation");
+ break;
+ }
+ }
+ }
+}
+
+/*
+ * Initializes the tuple slots in a ResultRelInfo for any MERGE action.
+ *
+ * We mark 'projectNewInfoValid' even though the projections themselves
+ * are not initialized here.
+ */
+void
+ExecInitMergeTupleSlots(ModifyTableState *mtstate,
+ ResultRelInfo *resultRelInfo)
+{
+ EState *estate = mtstate->ps.state;
+
+ Assert(!resultRelInfo->ri_projectNewInfoValid);
+
+ resultRelInfo->ri_oldTupleSlot =
+ table_slot_create(resultRelInfo->ri_RelationDesc,
+ &estate->es_tupleTable);
+ resultRelInfo->ri_newTupleSlot =
+ table_slot_create(resultRelInfo->ri_RelationDesc,
+ &estate->es_tupleTable);
+ resultRelInfo->ri_projectNewInfoValid = true;
+}
+
+/*
+ * Callback for ModifyTableContext->GetUpdateNewTuple for use by MERGE. It
+ * computes the updated tuple by projecting from the current merge action's
+ * projection.
+ */
+static TupleTableSlot *
+mergeGetUpdateNewTuple(ResultRelInfo *relinfo,
+ TupleTableSlot *planSlot,
+ TupleTableSlot *oldSlot,
+ MergeActionState *relaction)
+{
+ ExprContext *econtext = relaction->mas_proj->pi_exprContext;
+
+ econtext->ecxt_scantuple = oldSlot;
+ econtext->ecxt_innertuple = planSlot;
+
+ return ExecProject(relaction->mas_proj);
+}
/*
* Process BEFORE EACH STATEMENT triggers
@@ -2514,6 +3291,14 @@ fireBSTriggers(ModifyTableState *node)
case CMD_DELETE:
ExecBSDeleteTriggers(node->ps.state, resultRelInfo);
break;
+ case CMD_MERGE:
+ if (node->mt_merge_subcommands & MERGE_INSERT)
+ ExecBSInsertTriggers(node->ps.state, resultRelInfo);
+ if (node->mt_merge_subcommands & MERGE_UPDATE)
+ ExecBSUpdateTriggers(node->ps.state, resultRelInfo);
+ if (node->mt_merge_subcommands & MERGE_DELETE)
+ ExecBSDeleteTriggers(node->ps.state, resultRelInfo);
+ break;
default:
elog(ERROR, "unknown operation");
break;
@@ -2547,6 +3332,17 @@ fireASTriggers(ModifyTableState *node)
ExecASDeleteTriggers(node->ps.state, resultRelInfo,
node->mt_transition_capture);
break;
+ case CMD_MERGE:
+ if (node->mt_merge_subcommands & MERGE_DELETE)
+ ExecASDeleteTriggers(node->ps.state, resultRelInfo,
+ node->mt_transition_capture);
+ if (node->mt_merge_subcommands & MERGE_UPDATE)
+ ExecASUpdateTriggers(node->ps.state, resultRelInfo,
+ node->mt_transition_capture);
+ if (node->mt_merge_subcommands & MERGE_INSERT)
+ ExecASInsertTriggers(node->ps.state, resultRelInfo,
+ node->mt_transition_capture);
+ break;
default:
elog(ERROR, "unknown operation");
break;
@@ -2749,7 +3545,28 @@ ExecModifyTable(PlanState *pstate)
datum = ExecGetJunkAttribute(planSlot, node->mt_resultOidAttno,
&isNull);
if (isNull)
+ {
+ /*
+ * For commands other than MERGE, any tuples having InvalidOid
+ * for tableoid are errors. For MERGE, we may need to handle
+ * them as WHEN NOT MATCHED clauses if any, so do that.
+ *
+ * Note that we use the node's toplevel resultRelInfo, not any
+ * specific partition's.
+ */
+ if (operation == CMD_MERGE)
+ {
+ EvalPlanQualSetSlot(&node->mt_epqstate, planSlot);
+
+ context.planSlot = planSlot;
+ context.lockmode = 0;
+
+ ExecMerge(&context, node->resultRelInfo, NULL, node->canSetTag);
+ continue; /* no RETURNING support yet */
+ }
+
elog(ERROR, "tableoid is NULL");
+ }
resultoid = DatumGetObjectId(datum);
/* If it's not the same as last time, we need to locate the rel */
@@ -2784,13 +3601,14 @@ ExecModifyTable(PlanState *pstate)
oldtuple = NULL;
/*
- * For UPDATE/DELETE, fetch the row identity info for the tuple to be
- * updated/deleted. For a heap relation, that's a TID; otherwise we
- * may have a wholerow junk attr that carries the old tuple in toto.
- * Keep this in step with the part of ExecInitModifyTable that sets up
- * ri_RowIdAttNo.
+ * For UPDATE/DELETE/MERGE, fetch the row identity info for the tuple
+ * to be updated/deleted/merged. For a heap relation, that's a TID;
+ * otherwise we may have a wholerow junk attr that carries the old
+ * tuple in toto. Keep this in step with the part of
+ * ExecInitModifyTable that sets up ri_RowIdAttNo.
*/
- if (operation == CMD_UPDATE || operation == CMD_DELETE)
+ if (operation == CMD_UPDATE || operation == CMD_DELETE ||
+ operation == CMD_MERGE)
{
char relkind;
Datum datum;
@@ -2806,9 +3624,30 @@ ExecModifyTable(PlanState *pstate)
datum = ExecGetJunkAttribute(slot,
resultRelInfo->ri_RowIdAttNo,
&isNull);
- /* shouldn't ever get a null result... */
+
+ /*
+ * For commands other than MERGE, any tuples having a null row
+ * identifier are errors. For MERGE, we may need to handle
+ * them as WHEN NOT MATCHED clauses if any, so do that.
+ *
+ * Note that we use the node's toplevel resultRelInfo, not any
+ * specific partition's.
+ */
if (isNull)
+ {
+ if (operation == CMD_MERGE)
+ {
+ EvalPlanQualSetSlot(&node->mt_epqstate, planSlot);
+
+ context.planSlot = planSlot;
+ context.lockmode = 0;
+
+ ExecMerge(&context, node->resultRelInfo, NULL, node->canSetTag);
+ continue; /* no RETURNING support yet */
+ }
+
elog(ERROR, "ctid is NULL");
+ }
tupleid = (ItemPointer) DatumGetPointer(datum);
tuple_ctid = *tupleid; /* be sure we don't free ctid!! */
@@ -2898,8 +3737,10 @@ ExecModifyTable(PlanState *pstate)
oldSlot))
elog(ERROR, "failed to fetch tuple being updated");
}
- slot = ExecGetUpdateNewTuple(resultRelInfo, planSlot,
- oldSlot);
+ slot = internalGetUpdateNewTuple(resultRelInfo, planSlot,
+ oldSlot, NULL);
+ context.GetUpdateNewTuple = internalGetUpdateNewTuple;
+ context.relaction = NULL;
/* Now apply the update. */
slot = ExecUpdate(&context, resultRelInfo, tupleid, oldtuple,
@@ -2911,6 +3752,10 @@ ExecModifyTable(PlanState *pstate)
true, false, node->canSetTag, NULL, NULL);
break;
+ case CMD_MERGE:
+ slot = ExecMerge(&context, resultRelInfo, tupleid, node->canSetTag);
+ break;
+
default:
elog(ERROR, "unknown operation");
break;
@@ -3044,6 +3889,10 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
mtstate->resultRelInfo = (ResultRelInfo *)
palloc(nrels * sizeof(ResultRelInfo));
+ mtstate->mt_merge_inserted = 0;
+ mtstate->mt_merge_updated = 0;
+ mtstate->mt_merge_deleted = 0;
+
/*----------
* Resolve the target relation. This is the same as:
*
@@ -3147,12 +3996,13 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
}
/*
- * For UPDATE/DELETE, find the appropriate junk attr now, either a
- * 'ctid' or 'wholerow' attribute depending on relkind. For foreign
+ * For UPDATE/DELETE/MERGE, find the appropriate junk attr now, either
+ * a 'ctid' or 'wholerow' attribute depending on relkind. For foreign
* tables, the FDW might have created additional junk attr(s), but
* those are no concern of ours.
*/
- if (operation == CMD_UPDATE || operation == CMD_DELETE)
+ if (operation == CMD_UPDATE || operation == CMD_DELETE ||
+ operation == CMD_MERGE)
{
char relkind;
@@ -3169,19 +4019,28 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
else if (relkind == RELKIND_FOREIGN_TABLE)
{
/*
+ * We don't support MERGE with foreign tables for now. (It's
+ * problematic because the implementation uses CTID.)
+ */
+ Assert(operation != CMD_MERGE);
+
+ /*
* When there is a row-level trigger, there should be a
* wholerow attribute. We also require it to be present in
- * UPDATE, so we can get the values of unchanged columns.
+ * UPDATE and MERGE, so we can get the values of unchanged
+ * columns.
*/
resultRelInfo->ri_RowIdAttNo =
ExecFindJunkAttributeInTlist(subplan->targetlist,
"wholerow");
- if (mtstate->operation == CMD_UPDATE &&
+ if ((mtstate->operation == CMD_UPDATE || mtstate->operation == CMD_MERGE) &&
!AttributeNumberIsValid(resultRelInfo->ri_RowIdAttNo))
elog(ERROR, "could not find junk wholerow column");
}
else
{
+ /* No support for MERGE */
+ Assert(operation != CMD_MERGE);
/* Other valid target relkinds must provide wholerow */
resultRelInfo->ri_RowIdAttNo =
ExecFindJunkAttributeInTlist(subplan->targetlist,
@@ -3193,10 +4052,10 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
}
/*
- * If this is an inherited update/delete, there will be a junk attribute
- * named "tableoid" present in the subplan's targetlist. It will be used
- * to identify the result relation for a given tuple to be
- * updated/deleted.
+ * If this is an inherited update/delete/merge, there will be a junk
+ * attribute named "tableoid" present in the subplan's targetlist. It
+ * will be used to identify the result relation for a given tuple to be
+ * updated/deleted/merged.
*/
mtstate->mt_resultOidAttno =
ExecFindJunkAttributeInTlist(subplan->targetlist, "tableoid");
@@ -3209,8 +4068,9 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
/*
* Build state for tuple routing if it's a partitioned INSERT. An UPDATE
- * might need this too, but only if it actually moves tuples between
- * partitions; in that case setup is done by ExecCrossPartitionUpdate.
+ * or MERGE might need this too, but only if it actually moves tuples
+ * between partitions; in that case setup is done by
+ * ExecCrossPartitionUpdate.
*/
if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
operation == CMD_INSERT)
@@ -3379,6 +4239,10 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
arowmarks = lappend(arowmarks, aerm);
}
+ /* For a MERGE command, initialize its state */
+ if (mtstate->operation == CMD_MERGE)
+ ExecInitMerge(mtstate, estate);
+
EvalPlanQualSetPlan(&mtstate->mt_epqstate, subplan, arowmarks);
/*