aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/walsender.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r--src/backend/replication/walsender.c91
1 files changed, 76 insertions, 15 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 28b8591efa5..ee911394a23 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -65,6 +65,7 @@
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
+#include "libpq/protocol.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
@@ -84,6 +85,7 @@
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
+#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
@@ -258,6 +260,7 @@ static void StartLogicalReplication(StartReplicationCmd *cmd);
static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
+static void ProcessStandbyPSRequestMessage(void);
static void ProcessRepliesIfAny(void);
static void ProcessPendingWrites(void);
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr);
@@ -733,13 +736,13 @@ HandleUploadManifestPacket(StringInfo buf, off_t *offset,
switch (mtype)
{
- case 'd': /* CopyData */
+ case PqMsg_CopyData:
maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
break;
- case 'c': /* CopyDone */
- case 'f': /* CopyFail */
- case 'H': /* Flush */
- case 'S': /* Sync */
+ case PqMsg_CopyDone:
+ case PqMsg_CopyFail:
+ case PqMsg_Flush:
+ case PqMsg_Sync:
maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
break;
default:
@@ -761,19 +764,19 @@ HandleUploadManifestPacket(StringInfo buf, off_t *offset,
/* Process the message */
switch (mtype)
{
- case 'd': /* CopyData */
+ case PqMsg_CopyData:
AppendIncrementalManifestData(ib, buf->data, buf->len);
return true;
- case 'c': /* CopyDone */
+ case PqMsg_CopyDone:
return false;
- case 'H': /* Sync */
- case 'S': /* Flush */
+ case PqMsg_Sync:
+ case PqMsg_Flush:
/* Ignore these while in CopyOut mode as we do elsewhere. */
return true;
- case 'f':
+ case PqMsg_CopyFail:
ereport(ERROR,
(errcode(ERRCODE_QUERY_CANCELED),
errmsg("COPY from stdin failed: %s",
@@ -1567,7 +1570,7 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
tmpbuf.data, sizeof(int64));
/* output previously gathered data in a CopyData packet */
- pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
+ pq_putmessage_noblock(PqMsg_CopyData, ctx->out->data, ctx->out->len);
CHECK_FOR_INTERRUPTS();
@@ -2303,7 +2306,7 @@ ProcessRepliesIfAny(void)
case PqMsg_CopyDone:
if (!streamingDoneSending)
{
- pq_putmessage_noblock('c', NULL, 0);
+ pq_putmessage_noblock(PqMsg_CopyDone, NULL, 0);
streamingDoneSending = true;
}
@@ -2355,6 +2358,10 @@ ProcessStandbyMessage(void)
ProcessStandbyHSFeedbackMessage();
break;
+ case 'p':
+ ProcessStandbyPSRequestMessage();
+ break;
+
default:
ereport(COMMERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -2702,6 +2709,60 @@ ProcessStandbyHSFeedbackMessage(void)
}
/*
+ * Process the request for a primary status update message.
+ */
+static void
+ProcessStandbyPSRequestMessage(void)
+{
+ XLogRecPtr lsn = InvalidXLogRecPtr;
+ TransactionId oldestXidInCommit;
+ FullTransactionId nextFullXid;
+ FullTransactionId fullOldestXidInCommit;
+ WalSnd *walsnd = MyWalSnd;
+ TimestampTz replyTime;
+
+ /*
+ * This shouldn't happen because we don't support getting primary status
+ * message from standby.
+ */
+ if (RecoveryInProgress())
+ elog(ERROR, "the primary status is unavailable during recovery");
+
+ replyTime = pq_getmsgint64(&reply_message);
+
+ /*
+ * Update shared state for this WalSender process based on reply data from
+ * standby.
+ */
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->replyTime = replyTime;
+ SpinLockRelease(&walsnd->mutex);
+
+ /*
+ * Consider transactions in the current database, as only these are the
+ * ones replicated.
+ */
+ oldestXidInCommit = GetOldestActiveTransactionId(true, false);
+ nextFullXid = ReadNextFullTransactionId();
+ fullOldestXidInCommit = FullTransactionIdFromAllowableAt(nextFullXid,
+ oldestXidInCommit);
+ lsn = GetXLogWriteRecPtr();
+
+ elog(DEBUG2, "sending primary status");
+
+ /* construct the message... */
+ resetStringInfo(&output_message);
+ pq_sendbyte(&output_message, 's');
+ pq_sendint64(&output_message, lsn);
+ pq_sendint64(&output_message, (int64) U64FromFullTransactionId(fullOldestXidInCommit));
+ pq_sendint64(&output_message, (int64) U64FromFullTransactionId(nextFullXid));
+ pq_sendint64(&output_message, GetCurrentTimestamp());
+
+ /* ... and send it wrapped in CopyData */
+ pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len);
+}
+
+/*
* Compute how long send/receive loops should sleep.
*
* If wal_sender_timeout is enabled we want to wake up in time to send
@@ -3246,7 +3307,7 @@ XLogSendPhysical(void)
wal_segment_close(xlogreader);
/* Send CopyDone */
- pq_putmessage_noblock('c', NULL, 0);
+ pq_putmessage_noblock(PqMsg_CopyDone, NULL, 0);
streamingDoneSending = true;
WalSndCaughtUp = true;
@@ -3374,7 +3435,7 @@ retry:
memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
tmpbuf.data, sizeof(int64));
- pq_putmessage_noblock('d', output_message.data, output_message.len);
+ pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len);
sentPtr = endptr;
@@ -4080,7 +4141,7 @@ WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
pq_sendbyte(&output_message, requestReply ? 1 : 0);
/* ... and send it wrapped in CopyData */
- pq_putmessage_noblock('d', output_message.data, output_message.len);
+ pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len);
/* Set local flag */
if (requestReply)