diff options
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 55 |
1 files changed, 29 insertions, 26 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 583752c17f1..30402c22628 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -178,6 +178,13 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel) ResultRelInfo *resultRelInfo; RangeTblEntry *rte; + /* + * Input functions may need an active snapshot, as may AFTER triggers + * invoked during finish_estate. For safety, ensure an active snapshot + * exists throughout all our usage of the executor. + */ + PushActiveSnapshot(GetTransactionSnapshot()); + estate = CreateExecutorState(); rte = makeNode(RangeTblEntry); @@ -203,6 +210,22 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel) } /* + * Finish any operations related to the executor state created by + * create_estate_for_relation(). + */ +static void +finish_estate(EState *estate) +{ + /* Handle any queued AFTER triggers. */ + AfterTriggerEndQuery(estate); + + /* Cleanup. */ + ExecResetTupleTable(estate->es_tupleTable, false); + FreeExecutorState(estate); + PopActiveSnapshot(); +} + +/* * Executes default values for columns for which we can't map to remote * relation columns. * @@ -609,9 +632,6 @@ apply_handle_insert(StringInfo s) RelationGetDescr(rel->localrel), &TTSOpsVirtual); - /* Input functions may need an active snapshot, so get one */ - PushActiveSnapshot(GetTransactionSnapshot()); - /* Process and store remote tuple in the slot */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); slot_store_cstrings(remoteslot, rel, newtup.values); @@ -625,13 +645,8 @@ apply_handle_insert(StringInfo s) /* Cleanup. */ ExecCloseIndices(estate->es_result_relation_info); - PopActiveSnapshot(); - /* Handle queued AFTER triggers. */ - AfterTriggerEndQuery(estate); - - ExecResetTupleTable(estate->es_tupleTable, false); - FreeExecutorState(estate); + finish_estate(estate); logicalrep_rel_close(rel, NoLock); @@ -745,7 +760,6 @@ apply_handle_update(StringInfo s) /* Also populate extraUpdatedCols, in case we have generated columns */ fill_extraUpdatedCols(target_rte, rel->localrel); - PushActiveSnapshot(GetTransactionSnapshot()); ExecOpenIndices(estate->es_result_relation_info, false); /* Build the search tuple. */ @@ -804,15 +818,10 @@ apply_handle_update(StringInfo s) } /* Cleanup. */ + EvalPlanQualEnd(&epqstate); ExecCloseIndices(estate->es_result_relation_info); - PopActiveSnapshot(); - /* Handle queued AFTER triggers. */ - AfterTriggerEndQuery(estate); - - EvalPlanQualEnd(&epqstate); - ExecResetTupleTable(estate->es_tupleTable, false); - FreeExecutorState(estate); + finish_estate(estate); logicalrep_rel_close(rel, NoLock); @@ -864,7 +873,6 @@ apply_handle_delete(StringInfo s) &estate->es_tupleTable); EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); - PushActiveSnapshot(GetTransactionSnapshot()); ExecOpenIndices(estate->es_result_relation_info, false); /* Find the tuple using the replica identity index. */ @@ -905,15 +913,10 @@ apply_handle_delete(StringInfo s) } /* Cleanup. */ + EvalPlanQualEnd(&epqstate); ExecCloseIndices(estate->es_result_relation_info); - PopActiveSnapshot(); - /* Handle queued AFTER triggers. */ - AfterTriggerEndQuery(estate); - - EvalPlanQualEnd(&epqstate); - ExecResetTupleTable(estate->es_tupleTable, false); - FreeExecutorState(estate); + finish_estate(estate); logicalrep_rel_close(rel, NoLock); @@ -936,7 +939,7 @@ apply_handle_truncate(StringInfo s) List *relids = NIL; List *relids_logged = NIL; ListCell *lc; - LOCKMODE lockmode = AccessExclusiveLock; + LOCKMODE lockmode = AccessExclusiveLock; ensure_transaction(); |