aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c55
1 files changed, 29 insertions, 26 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index db52e4a4e54..f8c52b4ccc1 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -194,6 +194,13 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
ResultRelInfo *resultRelInfo;
RangeTblEntry *rte;
+ /*
+ * Input functions may need an active snapshot, as may AFTER triggers
+ * invoked during finish_estate. For safety, ensure an active snapshot
+ * exists throughout all our usage of the executor.
+ */
+ PushActiveSnapshot(GetTransactionSnapshot());
+
estate = CreateExecutorState();
rte = makeNode(RangeTblEntry);
@@ -222,6 +229,22 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
}
/*
+ * Finish any operations related to the executor state created by
+ * create_estate_for_relation().
+ */
+static void
+finish_estate(EState *estate)
+{
+ /* Handle any queued AFTER triggers. */
+ AfterTriggerEndQuery(estate);
+
+ /* Cleanup. */
+ ExecResetTupleTable(estate->es_tupleTable, false);
+ FreeExecutorState(estate);
+ PopActiveSnapshot();
+}
+
+/*
* Executes default values for columns for which we can't map to remote
* relation columns.
*
@@ -627,9 +650,6 @@ apply_handle_insert(StringInfo s)
remoteslot = ExecInitExtraTupleSlot(estate,
RelationGetDescr(rel->localrel));
- /* Input functions may need an active snapshot, so get one */
- PushActiveSnapshot(GetTransactionSnapshot());
-
/* Process and store remote tuple in the slot */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_store_cstrings(remoteslot, rel, newtup.values);
@@ -643,13 +663,8 @@ apply_handle_insert(StringInfo s)
/* Cleanup. */
ExecCloseIndices(estate->es_result_relation_info);
- PopActiveSnapshot();
- /* Handle queued AFTER triggers. */
- AfterTriggerEndQuery(estate);
-
- ExecResetTupleTable(estate->es_tupleTable, false);
- FreeExecutorState(estate);
+ finish_estate(estate);
logicalrep_rel_close(rel, NoLock);
@@ -760,7 +775,6 @@ apply_handle_update(StringInfo s)
}
}
- PushActiveSnapshot(GetTransactionSnapshot());
ExecOpenIndices(estate->es_result_relation_info, false);
/* Build the search tuple. */
@@ -819,15 +833,10 @@ apply_handle_update(StringInfo s)
}
/* Cleanup. */
+ EvalPlanQualEnd(&epqstate);
ExecCloseIndices(estate->es_result_relation_info);
- PopActiveSnapshot();
- /* Handle queued AFTER triggers. */
- AfterTriggerEndQuery(estate);
-
- EvalPlanQualEnd(&epqstate);
- ExecResetTupleTable(estate->es_tupleTable, false);
- FreeExecutorState(estate);
+ finish_estate(estate);
logicalrep_rel_close(rel, NoLock);
@@ -878,7 +887,6 @@ apply_handle_delete(StringInfo s)
RelationGetDescr(rel->localrel));
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
- PushActiveSnapshot(GetTransactionSnapshot());
ExecOpenIndices(estate->es_result_relation_info, false);
/* Find the tuple using the replica identity index. */
@@ -919,15 +927,10 @@ apply_handle_delete(StringInfo s)
}
/* Cleanup. */
+ EvalPlanQualEnd(&epqstate);
ExecCloseIndices(estate->es_result_relation_info);
- PopActiveSnapshot();
- /* Handle queued AFTER triggers. */
- AfterTriggerEndQuery(estate);
-
- EvalPlanQualEnd(&epqstate);
- ExecResetTupleTable(estate->es_tupleTable, false);
- FreeExecutorState(estate);
+ finish_estate(estate);
logicalrep_rel_close(rel, NoLock);
@@ -950,7 +953,7 @@ apply_handle_truncate(StringInfo s)
List *relids = NIL;
List *relids_logged = NIL;
ListCell *lc;
- LOCKMODE lockmode = AccessExclusiveLock;
+ LOCKMODE lockmode = AccessExclusiveLock;
ensure_transaction();