aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2021-06-10 12:27:27 -0400
committerTom Lane <tgl@sss.pgh.pa.us>2021-06-10 12:27:27 -0400
commit26383da7d47a140c33271bfd9215fe50959c80e3 (patch)
tree4c6ae94c4d3d6a2da9505563d34d2576a9f8ae2e
parent2208d71a0099bf51c70e1636e10a3e7dffe6581f (diff)
downloadpostgresql-26383da7d47a140c33271bfd9215fe50959c80e3.tar.gz
postgresql-26383da7d47a140c33271bfd9215fe50959c80e3.zip
Rearrange logrep worker's snapshot handling some more.
It turns out that worker.c's code path for TRUNCATE was also careless about establishing a snapshot while executing user-defined code, allowing the checks added by commit 84f5c2908 to fail when a trigger is fired in that context. We could just wrap Push/PopActiveSnapshot around the truncate call, but it seems better to establish a policy of holding a snapshot throughout execution of a replication step. To help with that and possible future requirements, replace the previous ensure_transaction calls with pairs of begin/end_replication_step calls. Per report from Mark Dilger. Back-patch to v11, like the previous changes. Discussion: https://postgr.es/m/B4A3AF82-79ED-4F4C-A4E5-CD2622098972@enterprisedb.com
-rw-r--r--src/backend/replication/logical/worker.c70
1 files changed, 38 insertions, 32 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 30402c22628..f6650a2ee92 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -138,30 +138,41 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
}
/*
- * Make sure that we started local transaction.
+ * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
*
- * Also switches to ApplyMessageContext as necessary.
+ * Start a transaction, if this is the first step (else we keep using the
+ * existing transaction).
+ * Also provide a global snapshot and ensure we run in ApplyMessageContext.
*/
-static bool
-ensure_transaction(void)
+static void
+begin_replication_step(void)
{
- if (IsTransactionState())
- {
- SetCurrentStatementStartTimestamp();
-
- if (CurrentMemoryContext != ApplyMessageContext)
- MemoryContextSwitchTo(ApplyMessageContext);
+ SetCurrentStatementStartTimestamp();
- return false;
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ maybe_reread_subscription();
}
- SetCurrentStatementStartTimestamp();
- StartTransactionCommand();
-
- maybe_reread_subscription();
+ PushActiveSnapshot(GetTransactionSnapshot());
MemoryContextSwitchTo(ApplyMessageContext);
- return true;
+}
+
+/*
+ * Finish up one step of a replication transaction.
+ * Callers of begin_replication_step() must also call this.
+ *
+ * We don't close out the transaction here, but we should increment
+ * the command counter to make the effects of this step visible.
+ */
+static void
+end_replication_step(void)
+{
+ PopActiveSnapshot();
+
+ CommandCounterIncrement();
}
@@ -178,13 +189,6 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
ResultRelInfo *resultRelInfo;
RangeTblEntry *rte;
- /*
- * Input functions may need an active snapshot, as may AFTER triggers
- * invoked during finish_estate. For safety, ensure an active snapshot
- * exists throughout all our usage of the executor.
- */
- PushActiveSnapshot(GetTransactionSnapshot());
-
estate = CreateExecutorState();
rte = makeNode(RangeTblEntry);
@@ -222,7 +226,6 @@ finish_estate(EState *estate)
/* Cleanup. */
ExecResetTupleTable(estate->es_tupleTable, false);
FreeExecutorState(estate);
- PopActiveSnapshot();
}
/*
@@ -612,7 +615,7 @@ apply_handle_insert(StringInfo s)
TupleTableSlot *remoteslot;
MemoryContext oldctx;
- ensure_transaction();
+ begin_replication_step();
relid = logicalrep_read_insert(s, &newtup);
rel = logicalrep_rel_open(relid, RowExclusiveLock);
@@ -623,6 +626,7 @@ apply_handle_insert(StringInfo s)
* transaction so it's safe to unlock it.
*/
logicalrep_rel_close(rel, RowExclusiveLock);
+ end_replication_step();
return;
}
@@ -650,7 +654,7 @@ apply_handle_insert(StringInfo s)
logicalrep_rel_close(rel, NoLock);
- CommandCounterIncrement();
+ end_replication_step();
}
/*
@@ -708,7 +712,7 @@ apply_handle_update(StringInfo s)
bool found;
MemoryContext oldctx;
- ensure_transaction();
+ begin_replication_step();
relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
&newtup);
@@ -720,6 +724,7 @@ apply_handle_update(StringInfo s)
* transaction so it's safe to unlock it.
*/
logicalrep_rel_close(rel, RowExclusiveLock);
+ end_replication_step();
return;
}
@@ -825,7 +830,7 @@ apply_handle_update(StringInfo s)
logicalrep_rel_close(rel, NoLock);
- CommandCounterIncrement();
+ end_replication_step();
}
/*
@@ -847,7 +852,7 @@ apply_handle_delete(StringInfo s)
bool found;
MemoryContext oldctx;
- ensure_transaction();
+ begin_replication_step();
relid = logicalrep_read_delete(s, &oldtup);
rel = logicalrep_rel_open(relid, RowExclusiveLock);
@@ -858,6 +863,7 @@ apply_handle_delete(StringInfo s)
* transaction so it's safe to unlock it.
*/
logicalrep_rel_close(rel, RowExclusiveLock);
+ end_replication_step();
return;
}
@@ -920,7 +926,7 @@ apply_handle_delete(StringInfo s)
logicalrep_rel_close(rel, NoLock);
- CommandCounterIncrement();
+ end_replication_step();
}
/*
@@ -941,7 +947,7 @@ apply_handle_truncate(StringInfo s)
ListCell *lc;
LOCKMODE lockmode = AccessExclusiveLock;
- ensure_transaction();
+ begin_replication_step();
remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
@@ -982,7 +988,7 @@ apply_handle_truncate(StringInfo s)
logicalrep_rel_close(rel, NoLock);
}
- CommandCounterIncrement();
+ end_replication_step();
}