aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPeter Eisentraut <peter_e@gmx.net>2017-06-03 11:37:47 -0400
committerPeter Eisentraut <peter_e@gmx.net>2017-06-03 11:40:05 -0400
commit66b84fa82f7318d8da75dbf754df16eb7b1f1037 (patch)
treeaafbc89ec60952a30cfaa9ca4cf0b27d9552515e /src
parent3c9bc2157a4f465b3c070d1250597568d2dc285f (diff)
downloadpostgresql-66b84fa82f7318d8da75dbf754df16eb7b1f1037.tar.gz
postgresql-66b84fa82f7318d8da75dbf754df16eb7b1f1037.zip
Receive invalidation messages correctly in tablesync worker
We didn't accept any invalidation messages until the whole sync process had finished (because it flattens all the remote transactions in the single one). So the sync worker didn't learn about subscription changes/drop until it has finished. This could lead to "orphaned" sync workers. Author: Petr Jelinek <petr.jelinek@2ndquadrant.com> Reported-by: Masahiko Sawada <sawada.mshk@gmail.com>
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/worker.c23
1 files changed, 15 insertions, 8 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index e31551340c9..a570900a429 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -118,7 +118,7 @@ static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
static void store_flush_position(XLogRecPtr remote_lsn);
-static void reread_subscription(void);
+static void maybe_reread_subscription(void);
/* Flags set by signal handlers */
static volatile sig_atomic_t got_SIGHUP = false;
@@ -165,8 +165,7 @@ ensure_transaction(void)
StartTransactionCommand();
- if (!MySubscriptionValid)
- reread_subscription();
+ maybe_reread_subscription();
MemoryContextSwitchTo(ApplyMessageContext);
return true;
@@ -463,6 +462,12 @@ apply_handle_commit(StringInfo s)
store_flush_position(commit_data.end_lsn);
}
+ else
+ {
+ /* Process any invalidation messages that might have accumulated. */
+ AcceptInvalidationMessages();
+ maybe_reread_subscription();
+ }
in_remote_transaction = false;
@@ -1119,8 +1124,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
* now.
*/
AcceptInvalidationMessages();
- if (!MySubscriptionValid)
- reread_subscription();
+ maybe_reread_subscription();
/* Process any table synchronization changes. */
process_syncing_tables(last_received);
@@ -1302,17 +1306,20 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
last_flushpos = flushpos;
}
-
/*
- * Reread subscription info and exit on change.
+ * Reread subscription info if needed. Most changes will be exit.
*/
static void
-reread_subscription(void)
+maybe_reread_subscription(void)
{
MemoryContext oldctx;
Subscription *newsub;
bool started_tx = false;
+ /* When cache state is valid there is nothing to do here. */
+ if (MySubscriptionValid)
+ return;
+
/* This function might be called inside or outside of transaction. */
if (!IsTransactionState())
{