aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/commands/trigger.c2
-rw-r--r--src/backend/replication/logical/worker.c113
2 files changed, 80 insertions, 35 deletions
diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c
index 9c5bc5f2c8b..e92616507f2 100644
--- a/src/backend/commands/trigger.c
+++ b/src/backend/commands/trigger.c
@@ -4140,6 +4140,8 @@ afterTriggerInvokeEvents(AfterTriggerEventList *events,
{
rInfo = ExecGetTriggerResultRel(estate, evtshared->ats_relid);
rel = rInfo->ri_RelationDesc;
+ /* Catch calls with insufficient relcache refcounting */
+ Assert(!RelationHasReferenceCountZero(rel));
trigdesc = rInfo->ri_TrigDesc;
finfo = rInfo->ri_TrigFunctions;
instr = rInfo->ri_TrigInstrument;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index caaa59c7bc7..454a5cbfbab 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -97,6 +97,18 @@ typedef struct SlotErrCallbackArg
int remote_attnum;
} SlotErrCallbackArg;
+typedef struct ApplyExecutionData
+{
+ EState *estate; /* executor state, used to track resources */
+
+ LogicalRepRelMapEntry *targetRel; /* replication target rel */
+ ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
+
+ /* These fields are used when the target relation is partitioned: */
+ ModifyTableState *mtstate; /* dummy ModifyTable state */
+ PartitionTupleRouting *proute; /* partition routing info */
+} ApplyExecutionData;
+
static MemoryContext ApplyMessageContext = NULL;
MemoryContext ApplyContext = NULL;
@@ -127,11 +139,9 @@ static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
LogicalRepRelation *remoterel,
TupleTableSlot *remoteslot,
TupleTableSlot **localslot);
-static void apply_handle_tuple_routing(ResultRelInfo *relinfo,
- EState *estate,
+static void apply_handle_tuple_routing(ApplyExecutionData *edata,
TupleTableSlot *remoteslot,
LogicalRepTupleData *newtup,
- LogicalRepRelMapEntry *relmapentry,
CmdType operation);
/*
@@ -188,25 +198,29 @@ ensure_transaction(void)
/*
* Executor state preparation for evaluation of constraint expressions,
- * indexes and triggers.
+ * indexes and triggers for the specified relation.
*
- * This is based on similar code in copy.c
+ * Note that the caller must open and close any indexes to be updated.
*/
-static EState *
-create_estate_for_relation(LogicalRepRelMapEntry *rel)
+static ApplyExecutionData *
+create_edata_for_relation(LogicalRepRelMapEntry *rel)
{
+ ApplyExecutionData *edata;
EState *estate;
ResultRelInfo *resultRelInfo;
RangeTblEntry *rte;
/*
* Input functions may need an active snapshot, as may AFTER triggers
- * invoked during finish_estate. For safety, ensure an active snapshot
+ * invoked during finish_edata. For safety, ensure an active snapshot
* exists throughout all our usage of the executor.
*/
PushActiveSnapshot(GetTransactionSnapshot());
- estate = CreateExecutorState();
+ edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
+ edata->targetRel = rel;
+
+ edata->estate = estate = CreateExecutorState();
rte = makeNode(RangeTblEntry);
rte->rtekind = RTE_RELATION;
@@ -215,7 +229,12 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
rte->rellockmode = AccessShareLock;
ExecInitRangeTable(estate, list_make1(rte));
- resultRelInfo = makeNode(ResultRelInfo);
+ edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
+
+ /*
+ * Use Relation opened by logicalrep_rel_open() instead of opening it
+ * again.
+ */
InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
estate->es_result_relations = resultRelInfo;
@@ -227,22 +246,38 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
/* Prepare to catch AFTER triggers. */
AfterTriggerBeginQuery();
- return estate;
+ /* other fields of edata remain NULL for now */
+
+ return edata;
}
/*
* Finish any operations related to the executor state created by
- * create_estate_for_relation().
+ * create_edata_for_relation().
*/
static void
-finish_estate(EState *estate)
+finish_edata(ApplyExecutionData *edata)
{
+ EState *estate = edata->estate;
+
/* Handle any queued AFTER triggers. */
AfterTriggerEndQuery(estate);
- /* Cleanup. */
+ /* Shut down tuple routing, if any was done. */
+ if (edata->proute)
+ ExecCleanupTupleRouting(edata->mtstate, edata->proute);
+
+ /*
+ * Cleanup. It might seem that we should call ExecCloseResultRelations()
+ * here, but we intentionally don't. It would close the rel we added to
+ * the estate above, which is wrong because we took no corresponding
+ * refcount. We rely on ExecCleanupTupleRouting() to close any other
+ * relations opened during execution.
+ */
ExecResetTupleTable(estate->es_tupleTable, false);
FreeExecutorState(estate);
+ pfree(edata);
+
PopActiveSnapshot();
}
@@ -633,6 +668,7 @@ apply_handle_insert(StringInfo s)
LogicalRepRelMapEntry *rel;
LogicalRepTupleData newtup;
LogicalRepRelId relid;
+ ApplyExecutionData *edata;
EState *estate;
TupleTableSlot *remoteslot;
MemoryContext oldctx;
@@ -652,7 +688,8 @@ apply_handle_insert(StringInfo s)
}
/* Initialize the executor state. */
- estate = create_estate_for_relation(rel);
+ edata = create_edata_for_relation(rel);
+ estate = edata->estate;
remoteslot = ExecInitExtraTupleSlot(estate,
RelationGetDescr(rel->localrel),
&TTSOpsVirtual);
@@ -665,13 +702,13 @@ apply_handle_insert(StringInfo s)
/* For a partitioned table, insert the tuple into a partition. */
if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
- apply_handle_tuple_routing(estate->es_result_relation_info, estate,
- remoteslot, NULL, rel, CMD_INSERT);
+ apply_handle_tuple_routing(edata,
+ remoteslot, NULL, CMD_INSERT);
else
apply_handle_insert_internal(estate->es_result_relation_info, estate,
remoteslot);
- finish_estate(estate);
+ finish_edata(edata);
logicalrep_rel_close(rel, NoLock);
@@ -735,6 +772,7 @@ apply_handle_update(StringInfo s)
{
LogicalRepRelMapEntry *rel;
LogicalRepRelId relid;
+ ApplyExecutionData *edata;
EState *estate;
LogicalRepTupleData oldtup;
LogicalRepTupleData newtup;
@@ -762,7 +800,8 @@ apply_handle_update(StringInfo s)
check_relation_updatable(rel);
/* Initialize the executor state. */
- estate = create_estate_for_relation(rel);
+ edata = create_edata_for_relation(rel);
+ estate = edata->estate;
remoteslot = ExecInitExtraTupleSlot(estate,
RelationGetDescr(rel->localrel),
&TTSOpsVirtual);
@@ -800,13 +839,13 @@ apply_handle_update(StringInfo s)
/* For a partitioned table, apply update to correct partition. */
if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
- apply_handle_tuple_routing(estate->es_result_relation_info, estate,
- remoteslot, &newtup, rel, CMD_UPDATE);
+ apply_handle_tuple_routing(edata,
+ remoteslot, &newtup, CMD_UPDATE);
else
apply_handle_update_internal(estate->es_result_relation_info, estate,
remoteslot, &newtup, rel);
- finish_estate(estate);
+ finish_edata(edata);
logicalrep_rel_close(rel, NoLock);
@@ -881,6 +920,7 @@ apply_handle_delete(StringInfo s)
LogicalRepRelMapEntry *rel;
LogicalRepTupleData oldtup;
LogicalRepRelId relid;
+ ApplyExecutionData *edata;
EState *estate;
TupleTableSlot *remoteslot;
MemoryContext oldctx;
@@ -903,7 +943,8 @@ apply_handle_delete(StringInfo s)
check_relation_updatable(rel);
/* Initialize the executor state. */
- estate = create_estate_for_relation(rel);
+ edata = create_edata_for_relation(rel);
+ estate = edata->estate;
remoteslot = ExecInitExtraTupleSlot(estate,
RelationGetDescr(rel->localrel),
&TTSOpsVirtual);
@@ -915,13 +956,13 @@ apply_handle_delete(StringInfo s)
/* For a partitioned table, apply delete to correct partition. */
if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
- apply_handle_tuple_routing(estate->es_result_relation_info, estate,
- remoteslot, NULL, rel, CMD_DELETE);
+ apply_handle_tuple_routing(edata,
+ remoteslot, NULL, CMD_DELETE);
else
apply_handle_delete_internal(estate->es_result_relation_info, estate,
remoteslot, &rel->remoterel);
- finish_estate(estate);
+ finish_edata(edata);
logicalrep_rel_close(rel, NoLock);
@@ -1004,16 +1045,17 @@ FindReplTupleInLocalRel(EState *estate, Relation localrel,
* This handles insert, update, delete on a partitioned table.
*/
static void
-apply_handle_tuple_routing(ResultRelInfo *relinfo,
- EState *estate,
+apply_handle_tuple_routing(ApplyExecutionData *edata,
TupleTableSlot *remoteslot,
LogicalRepTupleData *newtup,
- LogicalRepRelMapEntry *relmapentry,
CmdType operation)
{
+ EState *estate = edata->estate;
+ LogicalRepRelMapEntry *relmapentry = edata->targetRel;
+ ResultRelInfo *relinfo = edata->targetRelInfo;
Relation parentrel = relinfo->ri_RelationDesc;
- ModifyTableState *mtstate = NULL;
- PartitionTupleRouting *proute = NULL;
+ ModifyTableState *mtstate;
+ PartitionTupleRouting *proute;
ResultRelInfo *partrelinfo;
Relation partrel;
TupleTableSlot *remoteslot_part;
@@ -1022,12 +1064,15 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo,
MemoryContext oldctx;
/* ModifyTableState is needed for ExecFindPartition(). */
- mtstate = makeNode(ModifyTableState);
+ edata->mtstate = mtstate = makeNode(ModifyTableState);
mtstate->ps.plan = NULL;
mtstate->ps.state = estate;
mtstate->operation = operation;
mtstate->resultRelInfo = relinfo;
- proute = ExecSetupPartitionTupleRouting(estate, mtstate, parentrel);
+
+ /* ... as is PartitionTupleRouting. */
+ edata->proute = proute = ExecSetupPartitionTupleRouting(estate, mtstate,
+ parentrel);
/*
* Find the partition to which the "search tuple" belongs.
@@ -1225,8 +1270,6 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo,
elog(ERROR, "unrecognized CmdType: %d", (int) operation);
break;
}
-
- ExecCleanupTupleRouting(mtstate, proute);
}
/*