aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor/nodeModifyTable.c
diff options
context:
space:
mode:
authorSimon Riggs <simon@2ndQuadrant.com>2018-04-02 21:12:47 +0100
committerSimon Riggs <simon@2ndQuadrant.com>2018-04-02 21:12:47 +0100
commit354f13855e6381d288dfaa52bcd4f2cb0fd4a5eb (patch)
tree92710660450acee59be62dea485cc26ab147f332 /src/backend/executor/nodeModifyTable.c
parente6597dc3533946b98acba7871bd4ca1f7a3d4c1d (diff)
downloadpostgresql-354f13855e6381d288dfaa52bcd4f2cb0fd4a5eb.tar.gz
postgresql-354f13855e6381d288dfaa52bcd4f2cb0fd4a5eb.zip
Modified files for MERGE
Diffstat (limited to 'src/backend/executor/nodeModifyTable.c')
-rw-r--r--src/backend/executor/nodeModifyTable.c384
1 files changed, 339 insertions, 45 deletions
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 1b09868ff8e..b03db64e8e1 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -42,6 +42,7 @@
#include "commands/trigger.h"
#include "executor/execPartition.h"
#include "executor/executor.h"
+#include "executor/nodeMerge.h"
#include "executor/nodeModifyTable.h"
#include "foreign/fdwapi.h"
#include "miscadmin.h"
@@ -62,17 +63,17 @@ static bool ExecOnConflictUpdate(ModifyTableState *mtstate,
EState *estate,
bool canSetTag,
TupleTableSlot **returning);
-static TupleTableSlot *ExecPrepareTupleRouting(ModifyTableState *mtstate,
- EState *estate,
- PartitionTupleRouting *proute,
- ResultRelInfo *targetRelInfo,
- TupleTableSlot *slot);
static ResultRelInfo *getTargetResultRelInfo(ModifyTableState *node);
static void ExecSetupChildParentMapForTcs(ModifyTableState *mtstate);
static void ExecSetupChildParentMapForSubplan(ModifyTableState *mtstate);
static TupleConversionMap *tupconv_map_for_subplan(ModifyTableState *node,
int whichplan);
+/* flags for mt_merge_subcommands */
+#define MERGE_INSERT 0x01
+#define MERGE_UPDATE 0x02
+#define MERGE_DELETE 0x04
+
/*
* Verify that the tuples to be produced by INSERT or UPDATE match the
* target relation's rowtype
@@ -259,11 +260,12 @@ ExecCheckTIDVisible(EState *estate,
* Returns RETURNING result if any, otherwise NULL.
* ----------------------------------------------------------------
*/
-static TupleTableSlot *
+extern TupleTableSlot *
ExecInsert(ModifyTableState *mtstate,
TupleTableSlot *slot,
TupleTableSlot *planSlot,
EState *estate,
+ MergeActionState *actionState,
bool canSetTag)
{
HeapTuple tuple;
@@ -390,9 +392,17 @@ ExecInsert(ModifyTableState *mtstate,
* partition, we should instead check UPDATE policies, because we are
* executing policies defined on the target table, and not those
* defined on the child partitions.
+ *
+ * If we're running MERGE, we refer to the action that we're executing
+ * to know if we're doing an INSERT or UPDATE to a partition table.
*/
- wco_kind = (mtstate->operation == CMD_UPDATE) ?
- WCO_RLS_UPDATE_CHECK : WCO_RLS_INSERT_CHECK;
+ if (mtstate->operation == CMD_UPDATE)
+ wco_kind = WCO_RLS_UPDATE_CHECK;
+ else if (mtstate->operation == CMD_MERGE)
+ wco_kind = (actionState->commandType == CMD_UPDATE) ?
+ WCO_RLS_UPDATE_CHECK : WCO_RLS_INSERT_CHECK;
+ else
+ wco_kind = WCO_RLS_INSERT_CHECK;
/*
* ExecWithCheckOptions() will skip any WCOs which are not of the kind
@@ -617,10 +627,19 @@ ExecInsert(ModifyTableState *mtstate,
* passed to foreign table triggers; it is NULL when the foreign
* table has no relevant triggers.
*
+ * MERGE passes actionState of the action it's currently executing;
+ * regular DELETE passes NULL. This is used by ExecDelete to know if it's
+ * being called from MERGE or regular DELETE operation.
+ *
+ * If the DELETE fails because the tuple is concurrently updated/deleted
+ * by this or some other transaction, hufdp is filled with the reason as
+ * well as other important information. Currently only MERGE needs this
+ * information.
+ *
* Returns RETURNING result if any, otherwise NULL.
* ----------------------------------------------------------------
*/
-static TupleTableSlot *
+TupleTableSlot *
ExecDelete(ModifyTableState *mtstate,
ItemPointer tupleid,
HeapTuple oldtuple,
@@ -629,6 +648,8 @@ ExecDelete(ModifyTableState *mtstate,
EState *estate,
bool *tupleDeleted,
bool processReturning,
+ HeapUpdateFailureData *hufdp,
+ MergeActionState *actionState,
bool canSetTag)
{
ResultRelInfo *resultRelInfo;
@@ -642,6 +663,14 @@ ExecDelete(ModifyTableState *mtstate,
*tupleDeleted = false;
/*
+ * Initialize hufdp. Since the caller is only interested in the failure
+ * status, initialize with the state that is used to indicate successful
+ * operation.
+ */
+ if (hufdp)
+ hufdp->result = HeapTupleMayBeUpdated;
+
+ /*
* get information on the (current) result relation
*/
resultRelInfo = estate->es_result_relation_info;
@@ -654,7 +683,7 @@ ExecDelete(ModifyTableState *mtstate,
bool dodelete;
dodelete = ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
- tupleid, oldtuple);
+ tupleid, oldtuple, hufdp);
if (!dodelete) /* "do nothing" */
return NULL;
@@ -721,6 +750,15 @@ ldelete:;
estate->es_crosscheck_snapshot,
true /* wait for commit */ ,
&hufd);
+
+ /*
+ * Copy the necessary information, if the caller has asked for it. We
+ * must do this irrespective of whether the tuple was updated or
+ * deleted.
+ */
+ if (hufdp)
+ *hufdp = hufd;
+
switch (result)
{
case HeapTupleSelfUpdated:
@@ -755,7 +793,11 @@ ldelete:;
errmsg("tuple to be updated was already modified by an operation triggered by the current command"),
errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows.")));
- /* Else, already deleted by self; nothing to do */
+ /*
+ * Else, already deleted by self; nothing to do but inform
+ * MERGE about it anyways so that it can take necessary
+ * action.
+ */
return NULL;
case HeapTupleMayBeUpdated:
@@ -766,14 +808,24 @@ ldelete:;
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("could not serialize access due to concurrent update")));
+
if (!ItemPointerEquals(tupleid, &hufd.ctid))
{
TupleTableSlot *epqslot;
+ /*
+ * If we're executing MERGE, then the onus of running
+ * EvalPlanQual() and handling its outcome lies with the
+ * caller.
+ */
+ if (actionState != NULL)
+ return NULL;
+
+ /* Normal DELETE path. */
epqslot = EvalPlanQual(estate,
epqstate,
resultRelationDesc,
- resultRelInfo->ri_RangeTableIndex,
+ GetEPQRangeTableIndex(resultRelInfo),
LockTupleExclusive,
&hufd.ctid,
hufd.xmax);
@@ -783,7 +835,12 @@ ldelete:;
goto ldelete;
}
}
- /* tuple already deleted; nothing to do */
+
+ /*
+ * tuple already deleted; nothing to do. But MERGE might want
+ * to handle it differently. We've already filled-in hufdp
+ * with sufficient information for MERGE to look at.
+ */
return NULL;
default:
@@ -911,10 +968,21 @@ ldelete:;
* foreign table triggers; it is NULL when the foreign table has
* no relevant triggers.
*
+ * MERGE passes actionState of the action it's currently executing;
+ * regular UPDATE passes NULL. This is used by ExecUpdate to know if it's
+ * being called from MERGE or regular UPDATE operation. ExecUpdate may
+ * pass this information to ExecInsert if it ends up running DELETE+INSERT
+ * for partition key updates.
+ *
+ * If the UPDATE fails because the tuple is concurrently updated/deleted
+ * by this or some other transaction, hufdp is filled with the reason as
+ * well as other important information. Currently only MERGE needs this
+ * information.
+ *
* Returns RETURNING result if any, otherwise NULL.
* ----------------------------------------------------------------
*/
-static TupleTableSlot *
+extern TupleTableSlot *
ExecUpdate(ModifyTableState *mtstate,
ItemPointer tupleid,
HeapTuple oldtuple,
@@ -922,6 +990,9 @@ ExecUpdate(ModifyTableState *mtstate,
TupleTableSlot *planSlot,
EPQState *epqstate,
EState *estate,
+ bool *tuple_updated,
+ HeapUpdateFailureData *hufdp,
+ MergeActionState *actionState,
bool canSetTag)
{
HeapTuple tuple;
@@ -938,6 +1009,17 @@ ExecUpdate(ModifyTableState *mtstate,
if (IsBootstrapProcessingMode())
elog(ERROR, "cannot UPDATE during bootstrap");
+ if (tuple_updated)
+ *tuple_updated = false;
+
+ /*
+ * Initialize hufdp. Since the caller is only interested in the failure
+ * status, initialize with the state that is used to indicate successful
+ * operation.
+ */
+ if (hufdp)
+ hufdp->result = HeapTupleMayBeUpdated;
+
/*
* get the heap tuple out of the tuple table slot, making sure we have a
* writable copy
@@ -955,7 +1037,7 @@ ExecUpdate(ModifyTableState *mtstate,
resultRelInfo->ri_TrigDesc->trig_update_before_row)
{
slot = ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
- tupleid, oldtuple, slot);
+ tupleid, oldtuple, slot, hufdp);
if (slot == NULL) /* "do nothing" */
return NULL;
@@ -1001,7 +1083,6 @@ ExecUpdate(ModifyTableState *mtstate,
}
else
{
- LockTupleMode lockmode;
bool partition_constraint_failed;
/*
@@ -1079,8 +1160,9 @@ lreplace:;
* Row movement, part 1. Delete the tuple, but skip RETURNING
* processing. We want to return rows from INSERT.
*/
- ExecDelete(mtstate, tupleid, oldtuple, planSlot, epqstate, estate,
- &tuple_deleted, false, false);
+ ExecDelete(mtstate, tupleid, oldtuple, planSlot, epqstate,
+ estate, &tuple_deleted, false, hufdp, NULL,
+ false);
/*
* For some reason if DELETE didn't happen (e.g. trigger prevented
@@ -1116,16 +1198,36 @@ lreplace:;
saved_tcs_map = mtstate->mt_transition_capture->tcs_map;
/*
- * resultRelInfo is one of the per-subplan resultRelInfos. So we
- * should convert the tuple into root's tuple descriptor, since
- * ExecInsert() starts the search from root. The tuple conversion
- * map list is in the order of mtstate->resultRelInfo[], so to
- * retrieve the one for this resultRel, we need to know the
- * position of the resultRel in mtstate->resultRelInfo[].
+ * We should convert the tuple into root's tuple descriptor, since
+ * ExecInsert() starts the search from root. To do that, we need to
+ * retrieve the tuple conversion map for this resultRelInfo.
+ *
+ * If we're running MERGE then resultRelInfo is per-partition
+ * resultRelInfo as initialized in ExecInitPartitionInfo(). Note
+ * that we don't expand inheritance for the resultRelation in case
+ * of MERGE and hence there is just one subplan. Whereas for
+ * regular UPDATE, resultRelInfo is one of the per-subplan
+ * resultRelInfos. In either case the position of this partition in
+ * tracked in ri_PartitionLeafIndex;
+ *
+ * Retrieve the map either by looking at the resultRelInfo's
+ * position in mtstate->resultRelInfo[] (for UPDATE) or by simply
+ * using the ri_PartitionLeafIndex value (for MERGE).
*/
- map_index = resultRelInfo - mtstate->resultRelInfo;
- Assert(map_index >= 0 && map_index < mtstate->mt_nplans);
- tupconv_map = tupconv_map_for_subplan(mtstate, map_index);
+ if (mtstate->operation == CMD_MERGE)
+ {
+ map_index = resultRelInfo->ri_PartitionLeafIndex;
+ Assert(mtstate->rootResultRelInfo == NULL);
+ tupconv_map = TupConvMapForLeaf(proute,
+ mtstate->resultRelInfo,
+ map_index);
+ }
+ else
+ {
+ map_index = resultRelInfo - mtstate->resultRelInfo;
+ Assert(map_index >= 0 && map_index < mtstate->mt_nplans);
+ tupconv_map = tupconv_map_for_subplan(mtstate, map_index);
+ }
tuple = ConvertPartitionTupleSlot(tupconv_map,
tuple,
proute->root_tuple_slot,
@@ -1135,12 +1237,16 @@ lreplace:;
* Prepare for tuple routing, making it look like we're inserting
* into the root.
*/
- Assert(mtstate->rootResultRelInfo != NULL);
slot = ExecPrepareTupleRouting(mtstate, estate, proute,
- mtstate->rootResultRelInfo, slot);
+ getTargetResultRelInfo(mtstate),
+ slot);
ret_slot = ExecInsert(mtstate, slot, planSlot,
- estate, canSetTag);
+ estate, actionState, canSetTag);
+
+ /* Update is successful. */
+ if (tuple_updated)
+ *tuple_updated = true;
/* Revert ExecPrepareTupleRouting's node change. */
estate->es_result_relation_info = resultRelInfo;
@@ -1178,7 +1284,16 @@ lreplace:;
estate->es_output_cid,
estate->es_crosscheck_snapshot,
true /* wait for commit */ ,
- &hufd, &lockmode);
+ &hufd);
+
+ /*
+ * Copy the necessary information, if the caller has asked for it. We
+ * must do this irrespective of whether the tuple was updated or
+ * deleted.
+ */
+ if (hufdp)
+ *hufdp = hufd;
+
switch (result)
{
case HeapTupleSelfUpdated:
@@ -1223,26 +1338,42 @@ lreplace:;
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("could not serialize access due to concurrent update")));
+
if (!ItemPointerEquals(tupleid, &hufd.ctid))
{
TupleTableSlot *epqslot;
+ /*
+ * If we're executing MERGE, then the onus of running
+ * EvalPlanQual() and handling its outcome lies with the
+ * caller.
+ */
+ if (actionState != NULL)
+ return NULL;
+
+ /* Regular UPDATE path. */
epqslot = EvalPlanQual(estate,
epqstate,
resultRelationDesc,
- resultRelInfo->ri_RangeTableIndex,
- lockmode,
+ GetEPQRangeTableIndex(resultRelInfo),
+ hufd.lockmode,
&hufd.ctid,
hufd.xmax);
if (!TupIsNull(epqslot))
{
*tupleid = hufd.ctid;
+ /* Normal UPDATE path */
slot = ExecFilterJunk(resultRelInfo->ri_junkFilter, epqslot);
tuple = ExecMaterializeSlot(slot);
goto lreplace;
}
}
- /* tuple already deleted; nothing to do */
+
+ /*
+ * tuple already deleted; nothing to do. But MERGE might want
+ * to handle it differently. We've already filled-in hufdp
+ * with sufficient information for MERGE to look at.
+ */
return NULL;
default:
@@ -1271,6 +1402,9 @@ lreplace:;
estate, false, NULL, NIL);
}
+ if (tuple_updated)
+ *tuple_updated = true;
+
if (canSetTag)
(estate->es_processed)++;
@@ -1365,9 +1499,9 @@ ExecOnConflictUpdate(ModifyTableState *mtstate,
* there's no historical behavior to break.
*
* It is the user's responsibility to prevent this situation from
- * occurring. These problems are why SQL-2003 similarly specifies
- * that for SQL MERGE, an exception must be raised in the event of
- * an attempt to update the same row twice.
+ * occurring. These problems are why SQL Standard similarly
+ * specifies that for SQL MERGE, an exception must be raised in
+ * the event of an attempt to update the same row twice.
*/
if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(tuple.t_data)))
ereport(ERROR,
@@ -1489,7 +1623,7 @@ ExecOnConflictUpdate(ModifyTableState *mtstate,
*returning = ExecUpdate(mtstate, &tuple.t_self, NULL,
mtstate->mt_conflproj, planSlot,
&mtstate->mt_epqstate, mtstate->ps.state,
- canSetTag);
+ NULL, NULL, NULL, canSetTag);
ReleaseBuffer(buffer);
return true;
@@ -1527,6 +1661,14 @@ fireBSTriggers(ModifyTableState *node)
case CMD_DELETE:
ExecBSDeleteTriggers(node->ps.state, resultRelInfo);
break;
+ case CMD_MERGE:
+ if (node->mt_merge_subcommands & MERGE_INSERT)
+ ExecBSInsertTriggers(node->ps.state, resultRelInfo);
+ if (node->mt_merge_subcommands & MERGE_UPDATE)
+ ExecBSUpdateTriggers(node->ps.state, resultRelInfo);
+ if (node->mt_merge_subcommands & MERGE_DELETE)
+ ExecBSDeleteTriggers(node->ps.state, resultRelInfo);
+ break;
default:
elog(ERROR, "unknown operation");
break;
@@ -1582,6 +1724,17 @@ fireASTriggers(ModifyTableState *node)
ExecASDeleteTriggers(node->ps.state, resultRelInfo,
node->mt_transition_capture);
break;
+ case CMD_MERGE:
+ if (node->mt_merge_subcommands & MERGE_DELETE)
+ ExecASDeleteTriggers(node->ps.state, resultRelInfo,
+ node->mt_transition_capture);
+ if (node->mt_merge_subcommands & MERGE_UPDATE)
+ ExecASUpdateTriggers(node->ps.state, resultRelInfo,
+ node->mt_transition_capture);
+ if (node->mt_merge_subcommands & MERGE_INSERT)
+ ExecASInsertTriggers(node->ps.state, resultRelInfo,
+ node->mt_transition_capture);
+ break;
default:
elog(ERROR, "unknown operation");
break;
@@ -1644,7 +1797,7 @@ ExecSetupTransitionCaptureState(ModifyTableState *mtstate, EState *estate)
*
* Returns a slot holding the tuple of the partition rowtype.
*/
-static TupleTableSlot *
+TupleTableSlot *
ExecPrepareTupleRouting(ModifyTableState *mtstate,
EState *estate,
PartitionTupleRouting *proute,
@@ -1967,6 +2120,7 @@ ExecModifyTable(PlanState *pstate)
{
/* advance to next subplan if any */
node->mt_whichplan++;
+
if (node->mt_whichplan < node->mt_nplans)
{
resultRelInfo++;
@@ -2015,6 +2169,12 @@ ExecModifyTable(PlanState *pstate)
EvalPlanQualSetSlot(&node->mt_epqstate, planSlot);
slot = planSlot;
+ if (operation == CMD_MERGE)
+ {
+ ExecMerge(node, estate, slot, junkfilter, resultRelInfo);
+ continue;
+ }
+
tupleid = NULL;
oldtuple = NULL;
if (junkfilter != NULL)
@@ -2096,19 +2256,20 @@ ExecModifyTable(PlanState *pstate)
slot = ExecPrepareTupleRouting(node, estate, proute,
resultRelInfo, slot);
slot = ExecInsert(node, slot, planSlot,
- estate, node->canSetTag);
+ estate, NULL, node->canSetTag);
/* Revert ExecPrepareTupleRouting's state change. */
if (proute)
estate->es_result_relation_info = resultRelInfo;
break;
case CMD_UPDATE:
slot = ExecUpdate(node, tupleid, oldtuple, slot, planSlot,
- &node->mt_epqstate, estate, node->canSetTag);
+ &node->mt_epqstate, estate,
+ NULL, NULL, NULL, node->canSetTag);
break;
case CMD_DELETE:
slot = ExecDelete(node, tupleid, oldtuple, planSlot,
&node->mt_epqstate, estate,
- NULL, true, node->canSetTag);
+ NULL, true, NULL, NULL, node->canSetTag);
break;
default:
elog(ERROR, "unknown operation");
@@ -2198,6 +2359,16 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
saved_resultRelInfo = estate->es_result_relation_info;
resultRelInfo = mtstate->resultRelInfo;
+
+ /*
+ * mergeTargetRelation must be set if we're running MERGE and mustn't be
+ * set if we're not.
+ */
+ Assert(operation != CMD_MERGE || node->mergeTargetRelation > 0);
+ Assert(operation == CMD_MERGE || node->mergeTargetRelation == 0);
+
+ resultRelInfo->ri_mergeTargetRTI = node->mergeTargetRelation;
+
i = 0;
foreach(l, node->plans)
{
@@ -2276,7 +2447,8 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
* partition key.
*/
if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
- (operation == CMD_INSERT || update_tuple_routing_needed))
+ (operation == CMD_INSERT || operation == CMD_MERGE ||
+ update_tuple_routing_needed))
mtstate->mt_partition_tuple_routing =
ExecSetupPartitionTupleRouting(mtstate, rel);
@@ -2288,6 +2460,15 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
ExecSetupTransitionCaptureState(mtstate, estate);
/*
+ * If we are doing MERGE then setup child-parent mapping. This will be
+ * required in case we end up doing a partition-key update, triggering a
+ * tuple routing.
+ */
+ if (mtstate->operation == CMD_MERGE &&
+ mtstate->mt_partition_tuple_routing != NULL)
+ ExecSetupChildParentMapForLeaf(mtstate->mt_partition_tuple_routing);
+
+ /*
* Construct mapping from each of the per-subplan partition attnos to the
* root attno. This is required when during update row movement the tuple
* descriptor of a source partition does not match the root partitioned
@@ -2478,6 +2659,106 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
}
}
+ resultRelInfo = mtstate->resultRelInfo;
+
+ if (node->mergeActionList)
+ {
+ ListCell *l;
+ ExprContext *econtext;
+ List *mergeMatchedActionStates = NIL;
+ List *mergeNotMatchedActionStates = NIL;
+ TupleDesc relationDesc = resultRelInfo->ri_RelationDesc->rd_att;
+
+ mtstate->mt_merge_subcommands = 0;
+
+ if (mtstate->ps.ps_ExprContext == NULL)
+ ExecAssignExprContext(estate, &mtstate->ps);
+
+ econtext = mtstate->ps.ps_ExprContext;
+
+ /* initialize slot for the existing tuple */
+ Assert(mtstate->mt_existing == NULL);
+ mtstate->mt_existing =
+ ExecInitExtraTupleSlot(mtstate->ps.state,
+ mtstate->mt_partition_tuple_routing ?
+ NULL : relationDesc);
+
+ /* initialize slot for merge actions */
+ Assert(mtstate->mt_mergeproj == NULL);
+ mtstate->mt_mergeproj =
+ ExecInitExtraTupleSlot(mtstate->ps.state,
+ mtstate->mt_partition_tuple_routing ?
+ NULL : relationDesc);
+
+ /*
+ * Create a MergeActionState for each action on the mergeActionList
+ * and add it to either a list of matched actions or not-matched
+ * actions.
+ */
+ foreach(l, node->mergeActionList)
+ {
+ MergeAction *action = (MergeAction *) lfirst(l);
+ MergeActionState *action_state = makeNode(MergeActionState);
+ TupleDesc tupDesc;
+
+ action_state->matched = action->matched;
+ action_state->commandType = action->commandType;
+ action_state->whenqual = ExecInitQual((List *) action->qual,
+ &mtstate->ps);
+
+ /* create target slot for this action's projection */
+ tupDesc = ExecTypeFromTL((List *) action->targetList,
+ resultRelInfo->ri_RelationDesc->rd_rel->relhasoids);
+ action_state->tupDesc = tupDesc;
+
+ /* build action projection state */
+ action_state->proj =
+ ExecBuildProjectionInfo(action->targetList, econtext,
+ mtstate->mt_mergeproj, &mtstate->ps,
+ resultRelInfo->ri_RelationDesc->rd_att);
+
+ /*
+ * We create two lists - one for WHEN MATCHED actions and one
+ * for WHEN NOT MATCHED actions - and stick the
+ * MergeActionState into the appropriate list.
+ */
+ if (action_state->matched)
+ mergeMatchedActionStates =
+ lappend(mergeMatchedActionStates, action_state);
+ else
+ mergeNotMatchedActionStates =
+ lappend(mergeNotMatchedActionStates, action_state);
+
+ switch (action->commandType)
+ {
+ case CMD_INSERT:
+ ExecCheckPlanOutput(resultRelInfo->ri_RelationDesc,
+ action->targetList);
+ mtstate->mt_merge_subcommands |= MERGE_INSERT;
+ break;
+ case CMD_UPDATE:
+ ExecCheckPlanOutput(resultRelInfo->ri_RelationDesc,
+ action->targetList);
+ mtstate->mt_merge_subcommands |= MERGE_UPDATE;
+ break;
+ case CMD_DELETE:
+ mtstate->mt_merge_subcommands |= MERGE_DELETE;
+ break;
+ case CMD_NOTHING:
+ break;
+ default:
+ elog(ERROR, "unknown operation");
+ break;
+ }
+
+ resultRelInfo->ri_mergeState->matchedActionStates =
+ mergeMatchedActionStates;
+ resultRelInfo->ri_mergeState->notMatchedActionStates =
+ mergeNotMatchedActionStates;
+
+ }
+ }
+
/* select first subplan */
mtstate->mt_whichplan = 0;
subplan = (Plan *) linitial(node->plans);
@@ -2491,7 +2772,7 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
* --- no need to look first. Typically, this will be a 'ctid' or
* 'wholerow' attribute, but in the case of a foreign data wrapper it
* might be a set of junk attributes sufficient to identify the remote
- * row.
+ * row. We follow this logic for MERGE, so it always has a junk attributes.
*
* If there are multiple result relations, each one needs its own junk
* filter. Note multiple rels are only possible for UPDATE/DELETE, so we
@@ -2519,6 +2800,7 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
break;
case CMD_UPDATE:
case CMD_DELETE:
+ case CMD_MERGE:
junk_filter_needed = true;
break;
default:
@@ -2534,6 +2816,7 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
JunkFilter *j;
subplan = mtstate->mt_plans[i]->plan;
+
if (operation == CMD_INSERT || operation == CMD_UPDATE)
ExecCheckPlanOutput(resultRelInfo->ri_RelationDesc,
subplan->targetlist);
@@ -2542,7 +2825,9 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
resultRelInfo->ri_RelationDesc->rd_att->tdhasoid,
ExecInitExtraTupleSlot(estate, NULL));
- if (operation == CMD_UPDATE || operation == CMD_DELETE)
+ if (operation == CMD_UPDATE ||
+ operation == CMD_DELETE ||
+ operation == CMD_MERGE)
{
/* For UPDATE/DELETE, find the appropriate junk attr now */
char relkind;
@@ -2555,6 +2840,15 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
j->jf_junkAttNo = ExecFindJunkAttribute(j, "ctid");
if (!AttributeNumberIsValid(j->jf_junkAttNo))
elog(ERROR, "could not find junk ctid column");
+
+ if (operation == CMD_MERGE &&
+ relkind == RELKIND_PARTITIONED_TABLE)
+ {
+ j->jf_otherJunkAttNo = ExecFindJunkAttribute(j, "tableoid");
+ if (!AttributeNumberIsValid(j->jf_otherJunkAttNo))
+ elog(ERROR, "could not find junk tableoid column");
+
+ }
}
else if (relkind == RELKIND_FOREIGN_TABLE)
{