aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRobert Haas <rhaas@postgresql.org>2018-07-16 17:33:22 -0400
committerRobert Haas <rhaas@postgresql.org>2018-07-16 17:55:13 -0400
commit4beb25c63221393f8a7ec790ebce1d6b5911289a (patch)
treeb1967eb5507a296cd9a173b6e2de4b7bc971d0ef /src
parent0bb28ca36eaf14ba73695bc0a29e3f36de34e07b (diff)
downloadpostgresql-4beb25c63221393f8a7ec790ebce1d6b5911289a.tar.gz
postgresql-4beb25c63221393f8a7ec790ebce1d6b5911289a.zip
Add subtransaction handling for table synchronization workers.
Since the old logic was completely unaware of subtransactions, a change made in a subsequently-aborted subtransaction would still cause workers to be stopped at toplevel transaction commit. Fix that by managing a stack of worker lists rather than just one. Amit Khandekar and Robert Haas Discussion: http://postgr.es/m/CAJ3gD9eaG_mWqiOTA2LfAug-VRNn1hrhf50Xi1YroxL37QkZNg@mail.gmail.com
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/transam/xact.c2
-rw-r--r--src/backend/replication/logical/launcher.c116
-rw-r--r--src/include/replication/logicallauncher.h1
-rw-r--r--src/tools/pgindent/typedefs.list1
4 files changed, 112 insertions, 8 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index df3dc1c168a..9004e38e6d4 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -4542,6 +4542,7 @@ CommitSubTransaction(void)
AtEOSubXact_HashTables(true, s->nestingLevel);
AtEOSubXact_PgStat(true, s->nestingLevel);
AtSubCommit_Snapshot(s->nestingLevel);
+ AtEOSubXact_ApplyLauncher(true, s->nestingLevel);
/*
* We need to restore the upper transaction's read-only state, in case the
@@ -4695,6 +4696,7 @@ AbortSubTransaction(void)
AtEOSubXact_HashTables(false, s->nestingLevel);
AtEOSubXact_PgStat(false, s->nestingLevel);
AtSubAbort_Snapshot(s->nestingLevel);
+ AtEOSubXact_ApplyLauncher(false, s->nestingLevel);
}
/*
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 44bdcab3b97..ef803b86fef 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -79,7 +79,19 @@ typedef struct LogicalRepWorkerId
Oid relid;
} LogicalRepWorkerId;
-static List *on_commit_stop_workers = NIL;
+typedef struct StopWorkersData
+{
+ int nestDepth; /* Sub-transaction nest level */
+ List *workers; /* List of LogicalRepWorkerId */
+ struct StopWorkersData *parent; /* This need not be an immediate
+ * subtransaction parent */
+} StopWorkersData;
+
+/*
+ * Stack of StopWorkersData elements. Each stack element contains the workers
+ * to be stopped for that subtransaction.
+ */
+static StopWorkersData *on_commit_stop_workers = NULL;
static void ApplyLauncherWakeup(void);
static void logicalrep_launcher_onexit(int code, Datum arg);
@@ -558,17 +570,41 @@ logicalrep_worker_stop(Oid subid, Oid relid)
void
logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
{
+ int nestDepth = GetCurrentTransactionNestLevel();
LogicalRepWorkerId *wid;
MemoryContext oldctx;
/* Make sure we store the info in context that survives until commit. */
oldctx = MemoryContextSwitchTo(TopTransactionContext);
+ /* Check that previous transactions were properly cleaned up. */
+ Assert(on_commit_stop_workers == NULL ||
+ nestDepth >= on_commit_stop_workers->nestDepth);
+
+ /*
+ * Push a new stack element if we don't already have one for the current
+ * nestDepth.
+ */
+ if (on_commit_stop_workers == NULL ||
+ nestDepth > on_commit_stop_workers->nestDepth)
+ {
+ StopWorkersData *newdata = palloc(sizeof(StopWorkersData));
+
+ newdata->nestDepth = nestDepth;
+ newdata->workers = NIL;
+ newdata->parent = on_commit_stop_workers;
+ on_commit_stop_workers = newdata;
+ }
+
+ /*
+ * Finally add a new worker into the worker list of the current
+ * subtransaction.
+ */
wid = palloc(sizeof(LogicalRepWorkerId));
wid->subid = subid;
wid->relid = relid;
-
- on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
+ on_commit_stop_workers->workers =
+ lappend(on_commit_stop_workers->workers, wid);
MemoryContextSwitchTo(oldctx);
}
@@ -820,7 +856,7 @@ ApplyLauncherShmemInit(void)
bool
XactManipulatesLogicalReplicationWorkers(void)
{
- return (on_commit_stop_workers != NIL);
+ return (on_commit_stop_workers != NULL);
}
/*
@@ -829,15 +865,25 @@ XactManipulatesLogicalReplicationWorkers(void)
void
AtEOXact_ApplyLauncher(bool isCommit)
{
+
+ Assert(on_commit_stop_workers == NULL ||
+ (on_commit_stop_workers->nestDepth == 1 &&
+ on_commit_stop_workers->parent == NULL));
+
if (isCommit)
{
ListCell *lc;
- foreach(lc, on_commit_stop_workers)
+ if (on_commit_stop_workers != NULL)
{
- LogicalRepWorkerId *wid = lfirst(lc);
+ List *workers = on_commit_stop_workers->workers;
+
+ foreach(lc, workers)
+ {
+ LogicalRepWorkerId *wid = lfirst(lc);
- logicalrep_worker_stop(wid->subid, wid->relid);
+ logicalrep_worker_stop(wid->subid, wid->relid);
+ }
}
if (on_commit_launcher_wakeup)
@@ -848,11 +894,65 @@ AtEOXact_ApplyLauncher(bool isCommit)
* No need to pfree on_commit_stop_workers. It was allocated in
* transaction memory context, which is going to be cleaned soon.
*/
- on_commit_stop_workers = NIL;
+ on_commit_stop_workers = NULL;
on_commit_launcher_wakeup = false;
}
/*
+ * On commit, merge the current on_commit_stop_workers list into the
+ * immediate parent, if present.
+ * On rollback, discard the current on_commit_stop_workers list.
+ * Pop out the stack.
+ */
+void
+AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
+{
+ StopWorkersData *parent;
+
+ /* Exit immediately if there's no work to do at this level. */
+ if (on_commit_stop_workers == NULL ||
+ on_commit_stop_workers->nestDepth < nestDepth)
+ return;
+
+ Assert(on_commit_stop_workers->nestDepth == nestDepth);
+
+ parent = on_commit_stop_workers->parent;
+
+ if (isCommit)
+ {
+ /*
+ * If the upper stack element is not an immediate parent
+ * subtransaction, just decrement the notional nesting depth without
+ * doing any real work. Else, we need to merge the current workers
+ * list into the parent.
+ */
+ if (!parent || parent->nestDepth < nestDepth - 1)
+ {
+ on_commit_stop_workers->nestDepth--;
+ return;
+ }
+
+ parent->workers =
+ list_concat(parent->workers, on_commit_stop_workers->workers);
+ }
+ else
+ {
+ /*
+ * Abandon everything that was done at this nesting level. Explicitly
+ * free memory to avoid a transaction-lifespan leak.
+ */
+ list_free_deep(on_commit_stop_workers->workers);
+ }
+
+ /*
+ * We have taken care of the current subtransaction workers list for both
+ * abort or commit. So we are ready to pop the stack.
+ */
+ pfree(on_commit_stop_workers);
+ on_commit_stop_workers = parent;
+}
+
+/*
* Request wakeup of the launcher on commit of the transaction.
*
* This is used to send launcher signal to stop sleeping and process the
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index 78016c448f3..84f6041e727 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -24,6 +24,7 @@ extern void ApplyLauncherShmemInit(void);
extern void ApplyLauncherWakeupAtCommit(void);
extern bool XactManipulatesLogicalReplicationWorkers(void);
extern void AtEOXact_ApplyLauncher(bool isCommit);
+extern void AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth);
extern bool IsLogicalLauncher(void);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 8166d86ca1d..6da8ae40a52 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2112,6 +2112,7 @@ StdAnalyzeData
StdRdOptions
Step
StopList
+StopWorkersData
StrategyNumber
StreamCtl
StringInfo