aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/walsender.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r--src/backend/replication/walsender.c105
1 files changed, 48 insertions, 57 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 606b9e8571c..3497269850d 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -611,76 +611,67 @@ static void
ProcessStandbyHSFeedbackMessage(void)
{
StandbyHSFeedbackMessage reply;
- TransactionId newxmin = InvalidTransactionId;
+ TransactionId nextXid;
+ uint32 nextEpoch;
- pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyHSFeedbackMessage));
+ /* Decipher the reply message */
+ pq_copymsgbytes(&reply_message, (char *) &reply,
+ sizeof(StandbyHSFeedbackMessage));
elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
reply.xmin,
reply.epoch);
+ /* Ignore invalid xmin (can't actually happen with current walreceiver) */
+ if (!TransactionIdIsNormal(reply.xmin))
+ return;
+
/*
- * Update the WalSender's proc xmin to allow it to be visible to
- * snapshots. This will hold back the removal of dead rows and thereby
- * prevent the generation of cleanup conflicts on the standby server.
+ * Check that the provided xmin/epoch are sane, that is, not in the future
+ * and not so far back as to be already wrapped around. Ignore if not.
+ *
+ * Epoch of nextXid should be same as standby, or if the counter has
+ * wrapped, then one greater than standby.
*/
- if (TransactionIdIsValid(reply.xmin))
- {
- TransactionId nextXid;
- uint32 nextEpoch;
- bool epochOK = false;
-
- GetNextXidAndEpoch(&nextXid, &nextEpoch);
-
- /*
- * Epoch of oldestXmin should be same as standby or if the counter has
- * wrapped, then one less than reply.
- */
- if (reply.xmin <= nextXid)
- {
- if (reply.epoch == nextEpoch)
- epochOK = true;
- }
- else
- {
- if (nextEpoch > 0 && reply.epoch == nextEpoch - 1)
- epochOK = true;
- }
-
- /*
- * Feedback from standby must not go backwards, nor should it go
- * forwards further than our most recent xid.
- */
- if (epochOK && TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
- {
- if (!TransactionIdIsValid(MyProc->xmin))
- {
- TransactionId oldestXmin = GetOldestXmin(true, true);
+ GetNextXidAndEpoch(&nextXid, &nextEpoch);
- if (TransactionIdPrecedes(oldestXmin, reply.xmin))
- newxmin = reply.xmin;
- else
- newxmin = oldestXmin;
- }
- else
- {
- if (TransactionIdPrecedes(MyProc->xmin, reply.xmin))
- newxmin = reply.xmin;
- else
- newxmin = MyProc->xmin; /* stay the same */
- }
- }
+ if (reply.xmin <= nextXid)
+ {
+ if (reply.epoch != nextEpoch)
+ return;
}
+ else
+ {
+ if (reply.epoch + 1 != nextEpoch)
+ return;
+ }
+
+ if (!TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
+ return; /* epoch OK, but it's wrapped around */
/*
- * Grab the ProcArrayLock to set xmin, or invalidate for bad reply
+ * Set the WalSender's xmin equal to the standby's requested xmin, so that
+ * the xmin will be taken into account by GetOldestXmin. This will hold
+ * back the removal of dead rows and thereby prevent the generation of
+ * cleanup conflicts on the standby server.
+ *
+ * There is a small window for a race condition here: although we just
+ * checked that reply.xmin precedes nextXid, the nextXid could have gotten
+ * advanced between our fetching it and applying the xmin below, perhaps
+ * far enough to make reply.xmin wrap around. In that case the xmin we
+ * set here would be "in the future" and have no effect. No point in
+ * worrying about this since it's too late to save the desired data
+ * anyway. Assuming that the standby sends us an increasing sequence of
+ * xmins, this could only happen during the first reply cycle, else our
+ * own xmin would prevent nextXid from advancing so far.
+ *
+ * We don't bother taking the ProcArrayLock here. Setting the xmin field
+ * is assumed atomic, and there's no real need to prevent a concurrent
+ * GetOldestXmin. (If we're moving our xmin forward, this is obviously
+ * safe, and if we're moving it backwards, well, the data is at risk
+ * already since a VACUUM could have just finished calling GetOldestXmin.)
*/
- if (MyProc->xmin != newxmin)
- {
- LWLockAcquire(ProcArrayLock, LW_SHARED);
- MyProc->xmin = newxmin;
- LWLockRelease(ProcArrayLock);
- }
+ MyProc->xmin = reply.xmin;
}
/* Main loop of walsender process */