aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/transam/twophase.c6
-rw-r--r--src/backend/access/transam/xact.c16
-rw-r--r--src/backend/access/transam/xlog.c23
-rw-r--r--src/backend/replication/README18
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c26
-rw-r--r--src/backend/replication/syncrep.c35
-rw-r--r--src/backend/replication/walreceiver.c85
-rw-r--r--src/backend/utils/misc/guc.c5
-rw-r--r--src/backend/utils/misc/postgresql.conf.sample2
-rw-r--r--src/include/access/xact.h7
-rw-r--r--src/include/access/xlog.h2
-rw-r--r--src/include/access/xlog_internal.h2
-rw-r--r--src/include/replication/syncrep.h5
-rw-r--r--src/include/replication/walreceiver.h12
14 files changed, 185 insertions, 59 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index e7234c87bbc..a65048b683b 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1107,7 +1107,7 @@ EndPrepare(GlobalTransaction gxact)
* Note that at this stage we have marked the prepare, but still show as
* running in the procarray (twice!) and continue to hold locks.
*/
- SyncRepWaitForLSN(gxact->prepare_end_lsn);
+ SyncRepWaitForLSN(gxact->prepare_end_lsn, false);
records.tail = records.head = NULL;
records.num_chunks = 0;
@@ -2103,7 +2103,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
* Note that at this stage we have marked clog, but still show as running
* in the procarray and continue to hold locks.
*/
- SyncRepWaitForLSN(recptr);
+ SyncRepWaitForLSN(recptr, true);
}
/*
@@ -2156,5 +2156,5 @@ RecordTransactionAbortPrepared(TransactionId xid,
* Note that at this stage we have marked clog, but still show as running
* in the procarray and continue to hold locks.
*/
- SyncRepWaitForLSN(recptr);
+ SyncRepWaitForLSN(recptr, false);
}
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index e31540548e0..7e373316139 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1324,7 +1324,7 @@ RecordTransactionCommit(void)
* in the procarray and continue to hold locks.
*/
if (wrote_xlog && markXidCommitted)
- SyncRepWaitForLSN(XactLastRecEnd);
+ SyncRepWaitForLSN(XactLastRecEnd, true);
/* remember end of last commit record */
XactLastCommitEnd = XactLastRecEnd;
@@ -5123,6 +5123,13 @@ XactLogCommitRecord(TimestampTz commit_time,
xl_xinfo.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
/*
+ * Check if the caller would like to ask standbys for immediate feedback
+ * once this commit is applied.
+ */
+ if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY)
+ xl_xinfo.xinfo |= XACT_COMPLETION_APPLY_FEEDBACK;
+
+ /*
* Relcache invalidations requires information about the current database
* and so does logical decoding.
*/
@@ -5459,6 +5466,13 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
if (XactCompletionForceSyncCommit(parsed->xinfo))
XLogFlush(lsn);
+ /*
+ * If asked by the primary (because someone is waiting for a synchronous
+ * commit = remote_apply), we will need to ask walreceiver to send a
+ * reply immediately.
+ */
+ if (XactCompletionApplyFeedback(parsed->xinfo))
+ XLogRequestWalReceiverReply();
}
/*
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index b119a47e4e0..06cefe2efeb 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -345,6 +345,9 @@ static XLogRecPtr RedoRecPtr;
*/
static bool doPageWrites;
+/* Has the recovery code requested a walreceiver wakeup? */
+static bool doRequestWalReceiverReply;
+
/*
* RedoStartLSN points to the checkpoint's REDO location which is specified
* in a backup label file, backup history file or control file. In standby
@@ -6879,6 +6882,17 @@ StartupXLOG(void)
XLogCtl->lastReplayedTLI = ThisTimeLineID;
SpinLockRelease(&XLogCtl->info_lck);
+ /*
+ * If rm_redo called XLogRequestWalReceiverReply, then we
+ * wake up the receiver so that it notices the updated
+ * lastReplayedEndRecPtr and sends a reply to the master.
+ */
+ if (doRequestWalReceiverReply)
+ {
+ doRequestWalReceiverReply = false;
+ WalRcvForceReply();
+ }
+
/* Remember this record as the last-applied one */
LastRec = ReadRecPtr;
@@ -11594,3 +11608,12 @@ SetWalWriterSleeping(bool sleeping)
XLogCtl->WalWriterSleeping = sleeping;
SpinLockRelease(&XLogCtl->info_lck);
}
+
+/*
+ * Schedule a walreceiver wakeup in the main recovery loop.
+ */
+void
+XLogRequestWalReceiverReply(void)
+{
+ doRequestWalReceiverReply = true;
+}
diff --git a/src/backend/replication/README b/src/backend/replication/README
index 8e5bf0d2b8a..419a2d74d73 100644
--- a/src/backend/replication/README
+++ b/src/backend/replication/README
@@ -16,14 +16,16 @@ bool walrcv_connect(char *conninfo, XLogRecPtr startpoint)
Establish connection to the primary, and starts streaming from 'startpoint'.
Returns true on success.
-bool walrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
-
-Retrieve any message available through the connection, blocking for
-maximum of 'timeout' ms. If a message was successfully read, returns true,
-otherwise false. On success, a pointer to the message payload is stored in
-*buffer, length in *len, and the type of message received in *type. The
-returned buffer is valid until the next call to walrcv_* functions, the
-caller should not attempt freeing it.
+int walrcv_receive(char **buffer, int *wait_fd)
+
+Retrieve any message available without blocking through the
+connection. If a message was successfully read, returns its
+length. If the connection is closed, returns -1. Otherwise returns 0
+to indicate that no data is available, and sets *wait_fd to a file
+descriptor which can be waited on before trying again. On success, a
+pointer to the message payload is stored in *buffer. The returned
+buffer is valid until the next call to walrcv_* functions, and the
+caller should not attempt to free it.
void walrcv_send(const char *buffer, int nbytes)
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 4ee4d7106da..a3bec498fa0 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -52,7 +52,7 @@ static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, ch
static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint,
char *slotname);
static void libpqrcv_endstreaming(TimeLineID *next_tli);
-static int libpqrcv_receive(int timeout, char **buffer);
+static int libpqrcv_receive(char **buffer, int *wait_fd);
static void libpqrcv_send(const char *buffer, int nbytes);
static void libpqrcv_disconnect(void);
@@ -463,8 +463,7 @@ libpqrcv_disconnect(void)
}
/*
- * Receive a message available from XLOG stream, blocking for
- * maximum of 'timeout' ms.
+ * Receive a message available from XLOG stream.
*
* Returns:
*
@@ -472,15 +471,15 @@ libpqrcv_disconnect(void)
* point to a buffer holding the received message. The buffer is only valid
* until the next libpqrcv_* call.
*
- * 0 if no data was available within timeout, or wait was interrupted
- * by signal.
+ * If no data was available immediately, returns 0, and *wait_fd is set to a
+ * file descriptor which can be waited on before trying again.
*
* -1 if the server ended the COPY.
*
* ereports on error.
*/
static int
-libpqrcv_receive(int timeout, char **buffer)
+libpqrcv_receive(char **buffer, int *wait_fd)
{
int rawlen;
@@ -492,16 +491,7 @@ libpqrcv_receive(int timeout, char **buffer)
rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
if (rawlen == 0)
{
- /*
- * No data available yet. If the caller requested to block, wait for
- * more data to arrive.
- */
- if (timeout > 0)
- {
- if (!libpq_select(timeout))
- return 0;
- }
-
+ /* Try consuming some data. */
if (PQconsumeInput(streamConn) == 0)
ereport(ERROR,
(errmsg("could not receive data from WAL stream: %s",
@@ -510,7 +500,11 @@ libpqrcv_receive(int timeout, char **buffer)
/* Now that we've consumed some input, try again */
rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
if (rawlen == 0)
+ {
+ /* Tell caller to try again when our socket is ready. */
+ *wait_fd = PQsocket(streamConn);
return 0;
+ }
}
if (rawlen == -1) /* end-of-streaming or error */
{
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index 92faf4e8e0d..2da9cba5dc7 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -91,13 +91,24 @@ static bool SyncRepQueueIsOrderedByLSN(int mode);
* to the wait queue. During SyncRepWakeQueue() a WALSender changes
* the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
* This backend then resets its state to SYNC_REP_NOT_WAITING.
+ *
+ * 'lsn' represents the LSN to wait for. 'commit' indicates whether this LSN
+ * represents a commit record. If it doesn't, then we wait only for the WAL
+ * to be flushed if synchronous_commit is set to the higher level of
+ * remote_apply, because only commit records provide apply feedback.
*/
void
-SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
+SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
{
char *new_status = NULL;
const char *old_status;
- int mode = SyncRepWaitMode;
+ int mode;
+
+ /* Cap the level for anything other than commit to remote flush only. */
+ if (commit)
+ mode = SyncRepWaitMode;
+ else
+ mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
/*
* Fast exit if user has not requested sync replication, or there are no
@@ -122,7 +133,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
* to be a low cost check.
*/
if (!WalSndCtl->sync_standbys_defined ||
- XactCommitLSN <= WalSndCtl->lsn[mode])
+ lsn <= WalSndCtl->lsn[mode])
{
LWLockRelease(SyncRepLock);
return;
@@ -132,7 +143,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
* Set our waitLSN so WALSender will know when to wake us, and add
* ourselves to the queue.
*/
- MyProc->waitLSN = XactCommitLSN;
+ MyProc->waitLSN = lsn;
MyProc->syncRepState = SYNC_REP_WAITING;
SyncRepQueueInsert(mode);
Assert(SyncRepQueueIsOrderedByLSN(mode));
@@ -147,7 +158,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
new_status = (char *) palloc(len + 32 + 1);
memcpy(new_status, old_status, len);
sprintf(new_status + len, " waiting for %X/%X",
- (uint32) (XactCommitLSN >> 32), (uint32) XactCommitLSN);
+ (uint32) (lsn >> 32), (uint32) lsn);
set_ps_display(new_status, false);
new_status[len] = '\0'; /* truncate off " waiting ..." */
}
@@ -416,6 +427,7 @@ SyncRepReleaseWaiters(void)
WalSnd *syncWalSnd;
int numwrite = 0;
int numflush = 0;
+ int numapply = 0;
/*
* If this WALSender is serving a standby that is not on the list of
@@ -462,12 +474,18 @@ SyncRepReleaseWaiters(void)
walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
}
+ if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply)
+ {
+ walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply;
+ numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
+ }
LWLockRelease(SyncRepLock);
- elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
+ elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%x",
numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
- numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
+ numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush,
+ numapply, (uint32) (MyWalSnd->apply >> 32), (uint32) MyWalSnd->apply);
/*
* If we are managing the highest priority standby, though we weren't
@@ -728,6 +746,9 @@ assign_synchronous_commit(int newval, void *extra)
case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
break;
+ case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
+ SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
+ break;
default:
SyncRepWaitMode = SYNC_REP_NO_WAIT;
break;
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 7b36e02faa9..057c250793d 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -352,8 +352,6 @@ WalReceiverMain(void)
if (walrcv_startstreaming(startpointTLI, startpoint,
slotname[0] != '\0' ? slotname : NULL))
{
- bool endofwal = false;
-
if (first_stream)
ereport(LOG,
(errmsg("started streaming WAL from primary at %X/%X on timeline %u",
@@ -376,18 +374,13 @@ WalReceiverMain(void)
ping_sent = false;
/* Loop until end-of-streaming or error */
- while (!endofwal)
+ for (;;)
{
char *buf;
int len;
-
- /*
- * Emergency bailout if postmaster has died. This is to avoid
- * the necessity for manual cleanup of all postmaster
- * children.
- */
- if (!PostmasterIsAlive())
- exit(1);
+ bool endofwal = false;
+ int wait_fd = PGINVALID_SOCKET;
+ int rc;
/*
* Exit walreceiver if we're not in recovery. This should not
@@ -407,8 +400,8 @@ WalReceiverMain(void)
XLogWalRcvSendHSFeedback(true);
}
- /* Wait a while for data to arrive */
- len = walrcv_receive(NAPTIME_PER_CYCLE, &buf);
+ /* See if we can read data immediately */
+ len = walrcv_receive(&buf, &wait_fd);
if (len != 0)
{
/*
@@ -439,7 +432,7 @@ WalReceiverMain(void)
endofwal = true;
break;
}
- len = walrcv_receive(0, &buf);
+ len = walrcv_receive(&buf, &wait_fd);
}
/* Let the master know that we received some data. */
@@ -452,7 +445,54 @@ WalReceiverMain(void)
*/
XLogWalRcvFlush(false);
}
- else
+
+ /* Check if we need to exit the streaming loop. */
+ if (endofwal)
+ break;
+
+ /*
+ * Ideally we would reuse a WaitEventSet object repeatedly
+ * here to avoid the overheads of WaitLatchOrSocket on epoll
+ * systems, but we can't be sure that libpq (or any other
+ * walreceiver implementation) has the same socket (even if
+ * the fd is the same number, it may have been closed and
+ * reopened since the last time). In future, if there is a
+ * function for removing sockets from WaitEventSet, then we
+ * could add and remove just the socket each time, potentially
+ * avoiding some system calls.
+ */
+ Assert(wait_fd != PGINVALID_SOCKET);
+ rc = WaitLatchOrSocket(&walrcv->latch,
+ WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
+ WL_TIMEOUT | WL_LATCH_SET,
+ wait_fd,
+ NAPTIME_PER_CYCLE);
+ if (rc & WL_LATCH_SET)
+ {
+ ResetLatch(&walrcv->latch);
+ if (walrcv->force_reply)
+ {
+ /*
+ * The recovery process has asked us to send apply
+ * feedback now. Make sure the flag is really set to
+ * false in shared memory before sending the reply,
+ * so we don't miss a new request for a reply.
+ */
+ walrcv->force_reply = false;
+ pg_memory_barrier();
+ XLogWalRcvSendReply(true, false);
+ }
+ }
+ if (rc & WL_POSTMASTER_DEATH)
+ {
+ /*
+ * Emergency bailout if postmaster has died. This is to
+ * avoid the necessity for manual cleanup of all
+ * postmaster children.
+ */
+ exit(1);
+ }
+ if (rc & WL_TIMEOUT)
{
/*
* We didn't receive anything new. If we haven't heard
@@ -1222,6 +1262,21 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
}
/*
+ * Wake up the walreceiver main loop.
+ *
+ * This is called by the startup process whenever interesting xlog records
+ * are applied, so that walreceiver can check if it needs to send an apply
+ * notification back to the master which may be waiting in a COMMIT with
+ * synchronous_commit = remote_apply.
+ */
+void
+WalRcvForceReply(void)
+{
+ WalRcv->force_reply = true;
+ SetLatch(&WalRcv->latch);
+}
+
+/*
* Return a string constant representing the state. This is used
* in system functions and views, and should *not* be translated.
*/
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 65a6cd4404f..06cb1660eb6 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -345,12 +345,13 @@ static const struct config_enum_entry constraint_exclusion_options[] = {
};
/*
- * Although only "on", "off", "remote_write", and "local" are documented, we
- * accept all the likely variants of "on" and "off".
+ * Although only "on", "off", "remote_apply", "remote_write", and "local" are
+ * documented, we accept all the likely variants of "on" and "off".
*/
static const struct config_enum_entry synchronous_commit_options[] = {
{"local", SYNCHRONOUS_COMMIT_LOCAL_FLUSH, false},
{"remote_write", SYNCHRONOUS_COMMIT_REMOTE_WRITE, false},
+ {"remote_apply", SYNCHRONOUS_COMMIT_REMOTE_APPLY, false},
{"on", SYNCHRONOUS_COMMIT_ON, false},
{"off", SYNCHRONOUS_COMMIT_OFF, false},
{"true", SYNCHRONOUS_COMMIT_ON, true},
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 5536012e050..ec4427f2d88 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -177,7 +177,7 @@
# (change requires restart)
#fsync = on # turns forced synchronization on or off
#synchronous_commit = on # synchronization level;
- # off, local, remote_write, or on
+ # off, local, remote_write, remote_apply, or on
#wal_sync_method = fsync # the default is the first option
# supported by the operating system:
# open_datasync
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 3ba23f5e87b..503ae1b82d7 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -60,7 +60,9 @@ typedef enum
SYNCHRONOUS_COMMIT_LOCAL_FLUSH, /* wait for local flush only */
SYNCHRONOUS_COMMIT_REMOTE_WRITE, /* wait for local flush and remote
* write */
- SYNCHRONOUS_COMMIT_REMOTE_FLUSH /* wait for local and remote flush */
+ SYNCHRONOUS_COMMIT_REMOTE_FLUSH, /* wait for local and remote flush */
+ SYNCHRONOUS_COMMIT_REMOTE_APPLY /* wait for local flush and remote
+ * apply */
} SyncCommitLevel;
/* Define the default setting for synchonous_commit */
@@ -144,10 +146,13 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
* EOXact... routines which run at the end of the original transaction
* completion.
*/
+#define XACT_COMPLETION_APPLY_FEEDBACK (1U << 29)
#define XACT_COMPLETION_UPDATE_RELCACHE_FILE (1U << 30)
#define XACT_COMPLETION_FORCE_SYNC_COMMIT (1U << 31)
/* Access macros for above flags */
+#define XactCompletionApplyFeedback(xinfo) \
+ ((xinfo & XACT_COMPLETION_APPLY_FEEDBACK) != 0)
#define XactCompletionRelcacheInitFileInval(xinfo) \
((xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE) != 0)
#define XactCompletionForceSyncCommit(xinfo) \
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 74a139496e6..a7dcdae67f8 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -267,6 +267,8 @@ extern bool CheckPromoteSignal(void);
extern void WakeupRecovery(void);
extern void SetWalWriterSleeping(bool sleeping);
+extern void XLogRequestWalReceiverReply(void);
+
extern void assign_max_wal_size(int newval, void *extra);
extern void assign_checkpoint_completion_target(double newval, void *extra);
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index e8b64dd4262..aa2f074201a 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -31,7 +31,7 @@
/*
* Each page of XLOG file has a header like this:
*/
-#define XLOG_PAGE_MAGIC 0xD088 /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD089 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData
{
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index 96e059bc3fa..c005a425836 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -23,8 +23,9 @@
#define SYNC_REP_NO_WAIT -1
#define SYNC_REP_WAIT_WRITE 0
#define SYNC_REP_WAIT_FLUSH 1
+#define SYNC_REP_WAIT_APPLY 2
-#define NUM_SYNC_REP_WAIT_MODE 2
+#define NUM_SYNC_REP_WAIT_MODE 3
/* syncRepState */
#define SYNC_REP_NOT_WAITING 0
@@ -35,7 +36,7 @@
extern char *SyncRepStandbyNames;
/* called by user backend */
-extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN);
+extern void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit);
/* called at backend exit */
extern void SyncRepCleanupAtProcExit(void);
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 6eacb095d1b..36bcb471720 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -113,9 +113,16 @@ typedef struct
slock_t mutex; /* locks shared variables shown above */
/*
+ * force walreceiver reply? This doesn't need to be locked; memory
+ * barriers for ordering are sufficient.
+ */
+ bool force_reply;
+
+ /*
* Latch used by startup process to wake up walreceiver after telling it
* where to start streaming (after setting receiveStart and
- * receiveStartTLI).
+ * receiveStartTLI), and also to tell it to send apply feedback to the
+ * primary whenever specially marked commit records are applied.
*/
Latch latch;
} WalRcvData;
@@ -138,7 +145,7 @@ extern PGDLLIMPORT walrcv_startstreaming_type walrcv_startstreaming;
typedef void (*walrcv_endstreaming_type) (TimeLineID *next_tli);
extern PGDLLIMPORT walrcv_endstreaming_type walrcv_endstreaming;
-typedef int (*walrcv_receive_type) (int timeout, char **buffer);
+typedef int (*walrcv_receive_type) (char **buffer, int *wait_fd);
extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
typedef void (*walrcv_send_type) (const char *buffer, int nbytes);
@@ -162,5 +169,6 @@ extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
extern int GetReplicationApplyDelay(void);
extern int GetReplicationTransferLatency(void);
+extern void WalRcvForceReply(void);
#endif /* _WALRECEIVER_H */