aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/walsender.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r--src/backend/replication/walsender.c123
1 files changed, 92 insertions, 31 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index f2c33250e8b..ee911394a23 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -65,6 +65,7 @@
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
+#include "libpq/protocol.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
@@ -84,6 +85,7 @@
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
+#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
@@ -258,6 +260,7 @@ static void StartLogicalReplication(StartReplicationCmd *cmd);
static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
+static void ProcessStandbyPSRequestMessage(void);
static void ProcessRepliesIfAny(void);
static void ProcessPendingWrites(void);
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr);
@@ -408,7 +411,7 @@ IdentifySystem(void)
else
logptr = GetFlushRecPtr(&currTLI);
- snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
+ snprintf(xloc, sizeof(xloc), "%X/%08X", LSN_FORMAT_ARGS(logptr));
if (MyDatabaseId != InvalidOid)
{
@@ -515,7 +518,7 @@ ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
{
char xloc[64];
- snprintf(xloc, sizeof(xloc), "%X/%X",
+ snprintf(xloc, sizeof(xloc), "%X/%08X",
LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
values[i] = CStringGetTextDatum(xloc);
nulls[i] = false;
@@ -733,13 +736,13 @@ HandleUploadManifestPacket(StringInfo buf, off_t *offset,
switch (mtype)
{
- case 'd': /* CopyData */
+ case PqMsg_CopyData:
maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
break;
- case 'c': /* CopyDone */
- case 'f': /* CopyFail */
- case 'H': /* Flush */
- case 'S': /* Sync */
+ case PqMsg_CopyDone:
+ case PqMsg_CopyFail:
+ case PqMsg_Flush:
+ case PqMsg_Sync:
maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
break;
default:
@@ -761,19 +764,19 @@ HandleUploadManifestPacket(StringInfo buf, off_t *offset,
/* Process the message */
switch (mtype)
{
- case 'd': /* CopyData */
+ case PqMsg_CopyData:
AppendIncrementalManifestData(ib, buf->data, buf->len);
return true;
- case 'c': /* CopyDone */
+ case PqMsg_CopyDone:
return false;
- case 'H': /* Sync */
- case 'S': /* Flush */
+ case PqMsg_Sync:
+ case PqMsg_Flush:
/* Ignore these while in CopyOut mode as we do elsewhere. */
return true;
- case 'f':
+ case PqMsg_CopyFail:
ereport(ERROR,
(errcode(ERRCODE_QUERY_CANCELED),
errmsg("COPY from stdin failed: %s",
@@ -892,12 +895,12 @@ StartReplication(StartReplicationCmd *cmd)
switchpoint < cmd->startpoint)
{
ereport(ERROR,
- (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
- LSN_FORMAT_ARGS(cmd->startpoint),
- cmd->timeline),
- errdetail("This server's history forked from timeline %u at %X/%X.",
- cmd->timeline,
- LSN_FORMAT_ARGS(switchpoint))));
+ errmsg("requested starting point %X/%08X on timeline %u is not in this server's history",
+ LSN_FORMAT_ARGS(cmd->startpoint),
+ cmd->timeline),
+ errdetail("This server's history forked from timeline %u at %X/%08X.",
+ cmd->timeline,
+ LSN_FORMAT_ARGS(switchpoint)));
}
sendTimeLineValidUpto = switchpoint;
}
@@ -939,9 +942,9 @@ StartReplication(StartReplicationCmd *cmd)
if (FlushPtr < cmd->startpoint)
{
ereport(ERROR,
- (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
- LSN_FORMAT_ARGS(cmd->startpoint),
- LSN_FORMAT_ARGS(FlushPtr))));
+ errmsg("requested starting point %X/%08X is ahead of the WAL flush position of this server %X/%08X",
+ LSN_FORMAT_ARGS(cmd->startpoint),
+ LSN_FORMAT_ARGS(FlushPtr)));
}
/* Start streaming from the requested point */
@@ -983,7 +986,7 @@ StartReplication(StartReplicationCmd *cmd)
Datum values[2];
bool nulls[2] = {0};
- snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
+ snprintf(startpos_str, sizeof(startpos_str), "%X/%08X",
LSN_FORMAT_ARGS(sendTimeLineValidUpto));
dest = CreateDestReceiver(DestRemoteSimple);
@@ -1324,7 +1327,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
ReplicationSlotPersist();
}
- snprintf(xloc, sizeof(xloc), "%X/%X",
+ snprintf(xloc, sizeof(xloc), "%X/%08X",
LSN_FORMAT_ARGS(MyReplicationSlot->data.confirmed_flush));
dest = CreateDestReceiver(DestRemoteSimple);
@@ -1567,7 +1570,7 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
tmpbuf.data, sizeof(int64));
/* output previously gathered data in a CopyData packet */
- pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
+ pq_putmessage_noblock(PqMsg_CopyData, ctx->out->data, ctx->out->len);
CHECK_FOR_INTERRUPTS();
@@ -2303,7 +2306,7 @@ ProcessRepliesIfAny(void)
case PqMsg_CopyDone:
if (!streamingDoneSending)
{
- pq_putmessage_noblock('c', NULL, 0);
+ pq_putmessage_noblock(PqMsg_CopyDone, NULL, 0);
streamingDoneSending = true;
}
@@ -2355,6 +2358,10 @@ ProcessStandbyMessage(void)
ProcessStandbyHSFeedbackMessage();
break;
+ case 'p':
+ ProcessStandbyPSRequestMessage();
+ break;
+
default:
ereport(COMMERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -2429,7 +2436,7 @@ ProcessStandbyReplyMessage(void)
/* Copy because timestamptz_to_str returns a static buffer */
replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
- elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
+ elog(DEBUG2, "write %X/%08X flush %X/%08X apply %X/%08X%s reply_time %s",
LSN_FORMAT_ARGS(writePtr),
LSN_FORMAT_ARGS(flushPtr),
LSN_FORMAT_ARGS(applyPtr),
@@ -2702,6 +2709,60 @@ ProcessStandbyHSFeedbackMessage(void)
}
/*
+ * Process the request for a primary status update message.
+ */
+static void
+ProcessStandbyPSRequestMessage(void)
+{
+ XLogRecPtr lsn = InvalidXLogRecPtr;
+ TransactionId oldestXidInCommit;
+ FullTransactionId nextFullXid;
+ FullTransactionId fullOldestXidInCommit;
+ WalSnd *walsnd = MyWalSnd;
+ TimestampTz replyTime;
+
+ /*
+ * This shouldn't happen because we don't support getting primary status
+ * message from standby.
+ */
+ if (RecoveryInProgress())
+ elog(ERROR, "the primary status is unavailable during recovery");
+
+ replyTime = pq_getmsgint64(&reply_message);
+
+ /*
+ * Update shared state for this WalSender process based on reply data from
+ * standby.
+ */
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->replyTime = replyTime;
+ SpinLockRelease(&walsnd->mutex);
+
+ /*
+ * Consider transactions in the current database, as only these are the
+ * ones replicated.
+ */
+ oldestXidInCommit = GetOldestActiveTransactionId(true, false);
+ nextFullXid = ReadNextFullTransactionId();
+ fullOldestXidInCommit = FullTransactionIdFromAllowableAt(nextFullXid,
+ oldestXidInCommit);
+ lsn = GetXLogWriteRecPtr();
+
+ elog(DEBUG2, "sending primary status");
+
+ /* construct the message... */
+ resetStringInfo(&output_message);
+ pq_sendbyte(&output_message, 's');
+ pq_sendint64(&output_message, lsn);
+ pq_sendint64(&output_message, (int64) U64FromFullTransactionId(fullOldestXidInCommit));
+ pq_sendint64(&output_message, (int64) U64FromFullTransactionId(nextFullXid));
+ pq_sendint64(&output_message, GetCurrentTimestamp());
+
+ /* ... and send it wrapped in CopyData */
+ pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len);
+}
+
+/*
* Compute how long send/receive loops should sleep.
*
* If wal_sender_timeout is enabled we want to wake up in time to send
@@ -3246,12 +3307,12 @@ XLogSendPhysical(void)
wal_segment_close(xlogreader);
/* Send CopyDone */
- pq_putmessage_noblock('c', NULL, 0);
+ pq_putmessage_noblock(PqMsg_CopyDone, NULL, 0);
streamingDoneSending = true;
WalSndCaughtUp = true;
- elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
+ elog(DEBUG1, "walsender reached end of timeline at %X/%08X (sent up to %X/%08X)",
LSN_FORMAT_ARGS(sendTimeLineValidUpto),
LSN_FORMAT_ARGS(sentPtr));
return;
@@ -3374,7 +3435,7 @@ retry:
memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
tmpbuf.data, sizeof(int64));
- pq_putmessage_noblock('d', output_message.data, output_message.len);
+ pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len);
sentPtr = endptr;
@@ -3392,7 +3453,7 @@ retry:
{
char activitymsg[50];
- snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
+ snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%08X",
LSN_FORMAT_ARGS(sentPtr));
set_ps_display(activitymsg);
}
@@ -4080,7 +4141,7 @@ WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
pq_sendbyte(&output_message, requestReply ? 1 : 0);
/* ... and send it wrapped in CopyData */
- pq_putmessage_noblock('d', output_message.data, output_message.len);
+ pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len);
/* Set local flag */
if (requestReply)