aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Paquier <michael@paquier.xyz>2018-07-12 10:20:21 +0900
committerMichael Paquier <michael@paquier.xyz>2018-07-12 10:20:21 +0900
commit19648375cc307eb5849906c0ddef737ddacf141a (patch)
tree3ca97b0aefc3f8b9ad1cdb8d0c8645cdeb41a618 /src
parent324c9554022d4403f34264598cb6e0a7d0d5873c (diff)
downloadpostgresql-19648375cc307eb5849906c0ddef737ddacf141a.tar.gz
postgresql-19648375cc307eb5849906c0ddef737ddacf141a.zip
Make logical WAL sender report streaming state appropriately
WAL senders sending logically-decoded data fail to properly report in "streaming" state when starting up, hence as long as one extra record is not replayed, such WAL senders would remain in a "catchup" state, which is inconsistent with the physical cousin. This can be easily reproduced by for example using pg_recvlogical and restarting the upstream server. The TAP tests have been slightly modified to detect the failure and strengthened so as future tests also make sure that a node is in streaming state when waiting for its catchup. Backpatch down to 9.4 where this code has been introduced. Reported-by: Sawada Masahiko Author: Simon Riggs, Sawada Masahiko Reviewed-by: Petr Jelinek, Michael Paquier, Vaishnavi Prabakaran Discussion: https://postgr.es/m/CAD21AoB2ZbCCqOx=bgKMcLrAvs1V0ZMqzs7wBTuDySezTGtMZA@mail.gmail.com
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/walsender.c22
1 files changed, 16 insertions, 6 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 25b12123d5a..6dbbe8b9152 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1923,8 +1923,8 @@ WalSndLoop(WalSndSendDataCallback send_data)
if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
{
ereport(DEBUG1,
- (errmsg("standby \"%s\" has now caught up with primary",
- application_name)));
+ (errmsg("\"%s\" has now caught up with upstream server",
+ application_name)));
WalSndSetState(WALSNDSTATE_STREAMING);
}
@@ -2483,10 +2483,10 @@ XLogSendLogical(void)
char *errm;
/*
- * Don't know whether we've caught up yet. We'll set it to true in
- * WalSndWaitForWal, if we're actually waiting. We also set to true if
- * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
- * i.e. when we're shutting down.
+ * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
+ * true in WalSndWaitForWal, if we're actually waiting. We also set to
+ * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
+ * didn't wait - i.e. when we're shutting down.
*/
WalSndCaughtUp = false;
@@ -2499,9 +2499,19 @@ XLogSendLogical(void)
if (record != NULL)
{
+ /* XXX: Note that logical decoding cannot be used while in recovery */
+ XLogRecPtr flushPtr = GetFlushRecPtr();
+
LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
sentPtr = logical_decoding_ctx->reader->EndRecPtr;
+
+ /*
+ * If we have sent a record that is at or beyond the flushed point, we
+ * have caught up.
+ */
+ if (sentPtr >= flushPtr)
+ WalSndCaughtUp = true;
}
else
{