aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c55
1 files changed, 29 insertions, 26 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 583752c17f1..30402c22628 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -178,6 +178,13 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
ResultRelInfo *resultRelInfo;
RangeTblEntry *rte;
+ /*
+ * Input functions may need an active snapshot, as may AFTER triggers
+ * invoked during finish_estate. For safety, ensure an active snapshot
+ * exists throughout all our usage of the executor.
+ */
+ PushActiveSnapshot(GetTransactionSnapshot());
+
estate = CreateExecutorState();
rte = makeNode(RangeTblEntry);
@@ -203,6 +210,22 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
}
/*
+ * Finish any operations related to the executor state created by
+ * create_estate_for_relation().
+ */
+static void
+finish_estate(EState *estate)
+{
+ /* Handle any queued AFTER triggers. */
+ AfterTriggerEndQuery(estate);
+
+ /* Cleanup. */
+ ExecResetTupleTable(estate->es_tupleTable, false);
+ FreeExecutorState(estate);
+ PopActiveSnapshot();
+}
+
+/*
* Executes default values for columns for which we can't map to remote
* relation columns.
*
@@ -609,9 +632,6 @@ apply_handle_insert(StringInfo s)
RelationGetDescr(rel->localrel),
&TTSOpsVirtual);
- /* Input functions may need an active snapshot, so get one */
- PushActiveSnapshot(GetTransactionSnapshot());
-
/* Process and store remote tuple in the slot */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_store_cstrings(remoteslot, rel, newtup.values);
@@ -625,13 +645,8 @@ apply_handle_insert(StringInfo s)
/* Cleanup. */
ExecCloseIndices(estate->es_result_relation_info);
- PopActiveSnapshot();
- /* Handle queued AFTER triggers. */
- AfterTriggerEndQuery(estate);
-
- ExecResetTupleTable(estate->es_tupleTable, false);
- FreeExecutorState(estate);
+ finish_estate(estate);
logicalrep_rel_close(rel, NoLock);
@@ -745,7 +760,6 @@ apply_handle_update(StringInfo s)
/* Also populate extraUpdatedCols, in case we have generated columns */
fill_extraUpdatedCols(target_rte, rel->localrel);
- PushActiveSnapshot(GetTransactionSnapshot());
ExecOpenIndices(estate->es_result_relation_info, false);
/* Build the search tuple. */
@@ -804,15 +818,10 @@ apply_handle_update(StringInfo s)
}
/* Cleanup. */
+ EvalPlanQualEnd(&epqstate);
ExecCloseIndices(estate->es_result_relation_info);
- PopActiveSnapshot();
- /* Handle queued AFTER triggers. */
- AfterTriggerEndQuery(estate);
-
- EvalPlanQualEnd(&epqstate);
- ExecResetTupleTable(estate->es_tupleTable, false);
- FreeExecutorState(estate);
+ finish_estate(estate);
logicalrep_rel_close(rel, NoLock);
@@ -864,7 +873,6 @@ apply_handle_delete(StringInfo s)
&estate->es_tupleTable);
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
- PushActiveSnapshot(GetTransactionSnapshot());
ExecOpenIndices(estate->es_result_relation_info, false);
/* Find the tuple using the replica identity index. */
@@ -905,15 +913,10 @@ apply_handle_delete(StringInfo s)
}
/* Cleanup. */
+ EvalPlanQualEnd(&epqstate);
ExecCloseIndices(estate->es_result_relation_info);
- PopActiveSnapshot();
- /* Handle queued AFTER triggers. */
- AfterTriggerEndQuery(estate);
-
- EvalPlanQualEnd(&epqstate);
- ExecResetTupleTable(estate->es_tupleTable, false);
- FreeExecutorState(estate);
+ finish_estate(estate);
logicalrep_rel_close(rel, NoLock);
@@ -936,7 +939,7 @@ apply_handle_truncate(StringInfo s)
List *relids = NIL;
List *relids_logged = NIL;
ListCell *lc;
- LOCKMODE lockmode = AccessExclusiveLock;
+ LOCKMODE lockmode = AccessExclusiveLock;
ensure_transaction();