diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/access/transam/twophase.c | 6 | ||||
-rw-r--r-- | src/backend/access/transam/xact.c | 16 | ||||
-rw-r--r-- | src/backend/access/transam/xlog.c | 23 | ||||
-rw-r--r-- | src/backend/replication/README | 18 | ||||
-rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 26 | ||||
-rw-r--r-- | src/backend/replication/syncrep.c | 35 | ||||
-rw-r--r-- | src/backend/replication/walreceiver.c | 85 | ||||
-rw-r--r-- | src/backend/utils/misc/guc.c | 5 | ||||
-rw-r--r-- | src/backend/utils/misc/postgresql.conf.sample | 2 | ||||
-rw-r--r-- | src/include/access/xact.h | 7 | ||||
-rw-r--r-- | src/include/access/xlog.h | 2 | ||||
-rw-r--r-- | src/include/access/xlog_internal.h | 2 | ||||
-rw-r--r-- | src/include/replication/syncrep.h | 5 | ||||
-rw-r--r-- | src/include/replication/walreceiver.h | 12 |
14 files changed, 185 insertions, 59 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index e7234c87bbc..a65048b683b 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1107,7 +1107,7 @@ EndPrepare(GlobalTransaction gxact) * Note that at this stage we have marked the prepare, but still show as * running in the procarray (twice!) and continue to hold locks. */ - SyncRepWaitForLSN(gxact->prepare_end_lsn); + SyncRepWaitForLSN(gxact->prepare_end_lsn, false); records.tail = records.head = NULL; records.num_chunks = 0; @@ -2103,7 +2103,7 @@ RecordTransactionCommitPrepared(TransactionId xid, * Note that at this stage we have marked clog, but still show as running * in the procarray and continue to hold locks. */ - SyncRepWaitForLSN(recptr); + SyncRepWaitForLSN(recptr, true); } /* @@ -2156,5 +2156,5 @@ RecordTransactionAbortPrepared(TransactionId xid, * Note that at this stage we have marked clog, but still show as running * in the procarray and continue to hold locks. */ - SyncRepWaitForLSN(recptr); + SyncRepWaitForLSN(recptr, false); } diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index e31540548e0..7e373316139 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1324,7 +1324,7 @@ RecordTransactionCommit(void) * in the procarray and continue to hold locks. */ if (wrote_xlog && markXidCommitted) - SyncRepWaitForLSN(XactLastRecEnd); + SyncRepWaitForLSN(XactLastRecEnd, true); /* remember end of last commit record */ XactLastCommitEnd = XactLastRecEnd; @@ -5123,6 +5123,13 @@ XactLogCommitRecord(TimestampTz commit_time, xl_xinfo.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT; /* + * Check if the caller would like to ask standbys for immediate feedback + * once this commit is applied. + */ + if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY) + xl_xinfo.xinfo |= XACT_COMPLETION_APPLY_FEEDBACK; + + /* * Relcache invalidations requires information about the current database * and so does logical decoding. */ @@ -5459,6 +5466,13 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, if (XactCompletionForceSyncCommit(parsed->xinfo)) XLogFlush(lsn); + /* + * If asked by the primary (because someone is waiting for a synchronous + * commit = remote_apply), we will need to ask walreceiver to send a + * reply immediately. + */ + if (XactCompletionApplyFeedback(parsed->xinfo)) + XLogRequestWalReceiverReply(); } /* diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index b119a47e4e0..06cefe2efeb 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -345,6 +345,9 @@ static XLogRecPtr RedoRecPtr; */ static bool doPageWrites; +/* Has the recovery code requested a walreceiver wakeup? */ +static bool doRequestWalReceiverReply; + /* * RedoStartLSN points to the checkpoint's REDO location which is specified * in a backup label file, backup history file or control file. In standby @@ -6879,6 +6882,17 @@ StartupXLOG(void) XLogCtl->lastReplayedTLI = ThisTimeLineID; SpinLockRelease(&XLogCtl->info_lck); + /* + * If rm_redo called XLogRequestWalReceiverReply, then we + * wake up the receiver so that it notices the updated + * lastReplayedEndRecPtr and sends a reply to the master. + */ + if (doRequestWalReceiverReply) + { + doRequestWalReceiverReply = false; + WalRcvForceReply(); + } + /* Remember this record as the last-applied one */ LastRec = ReadRecPtr; @@ -11594,3 +11608,12 @@ SetWalWriterSleeping(bool sleeping) XLogCtl->WalWriterSleeping = sleeping; SpinLockRelease(&XLogCtl->info_lck); } + +/* + * Schedule a walreceiver wakeup in the main recovery loop. + */ +void +XLogRequestWalReceiverReply(void) +{ + doRequestWalReceiverReply = true; +} diff --git a/src/backend/replication/README b/src/backend/replication/README index 8e5bf0d2b8a..419a2d74d73 100644 --- a/src/backend/replication/README +++ b/src/backend/replication/README @@ -16,14 +16,16 @@ bool walrcv_connect(char *conninfo, XLogRecPtr startpoint) Establish connection to the primary, and starts streaming from 'startpoint'. Returns true on success. -bool walrcv_receive(int timeout, unsigned char *type, char **buffer, int *len) - -Retrieve any message available through the connection, blocking for -maximum of 'timeout' ms. If a message was successfully read, returns true, -otherwise false. On success, a pointer to the message payload is stored in -*buffer, length in *len, and the type of message received in *type. The -returned buffer is valid until the next call to walrcv_* functions, the -caller should not attempt freeing it. +int walrcv_receive(char **buffer, int *wait_fd) + +Retrieve any message available without blocking through the +connection. If a message was successfully read, returns its +length. If the connection is closed, returns -1. Otherwise returns 0 +to indicate that no data is available, and sets *wait_fd to a file +descriptor which can be waited on before trying again. On success, a +pointer to the message payload is stored in *buffer. The returned +buffer is valid until the next call to walrcv_* functions, and the +caller should not attempt to free it. void walrcv_send(const char *buffer, int nbytes) diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 4ee4d7106da..a3bec498fa0 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -52,7 +52,7 @@ static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, ch static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, char *slotname); static void libpqrcv_endstreaming(TimeLineID *next_tli); -static int libpqrcv_receive(int timeout, char **buffer); +static int libpqrcv_receive(char **buffer, int *wait_fd); static void libpqrcv_send(const char *buffer, int nbytes); static void libpqrcv_disconnect(void); @@ -463,8 +463,7 @@ libpqrcv_disconnect(void) } /* - * Receive a message available from XLOG stream, blocking for - * maximum of 'timeout' ms. + * Receive a message available from XLOG stream. * * Returns: * @@ -472,15 +471,15 @@ libpqrcv_disconnect(void) * point to a buffer holding the received message. The buffer is only valid * until the next libpqrcv_* call. * - * 0 if no data was available within timeout, or wait was interrupted - * by signal. + * If no data was available immediately, returns 0, and *wait_fd is set to a + * file descriptor which can be waited on before trying again. * * -1 if the server ended the COPY. * * ereports on error. */ static int -libpqrcv_receive(int timeout, char **buffer) +libpqrcv_receive(char **buffer, int *wait_fd) { int rawlen; @@ -492,16 +491,7 @@ libpqrcv_receive(int timeout, char **buffer) rawlen = PQgetCopyData(streamConn, &recvBuf, 1); if (rawlen == 0) { - /* - * No data available yet. If the caller requested to block, wait for - * more data to arrive. - */ - if (timeout > 0) - { - if (!libpq_select(timeout)) - return 0; - } - + /* Try consuming some data. */ if (PQconsumeInput(streamConn) == 0) ereport(ERROR, (errmsg("could not receive data from WAL stream: %s", @@ -510,7 +500,11 @@ libpqrcv_receive(int timeout, char **buffer) /* Now that we've consumed some input, try again */ rawlen = PQgetCopyData(streamConn, &recvBuf, 1); if (rawlen == 0) + { + /* Tell caller to try again when our socket is ready. */ + *wait_fd = PQsocket(streamConn); return 0; + } } if (rawlen == -1) /* end-of-streaming or error */ { diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 92faf4e8e0d..2da9cba5dc7 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -91,13 +91,24 @@ static bool SyncRepQueueIsOrderedByLSN(int mode); * to the wait queue. During SyncRepWakeQueue() a WALSender changes * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed. * This backend then resets its state to SYNC_REP_NOT_WAITING. + * + * 'lsn' represents the LSN to wait for. 'commit' indicates whether this LSN + * represents a commit record. If it doesn't, then we wait only for the WAL + * to be flushed if synchronous_commit is set to the higher level of + * remote_apply, because only commit records provide apply feedback. */ void -SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) +SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) { char *new_status = NULL; const char *old_status; - int mode = SyncRepWaitMode; + int mode; + + /* Cap the level for anything other than commit to remote flush only. */ + if (commit) + mode = SyncRepWaitMode; + else + mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH); /* * Fast exit if user has not requested sync replication, or there are no @@ -122,7 +133,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) * to be a low cost check. */ if (!WalSndCtl->sync_standbys_defined || - XactCommitLSN <= WalSndCtl->lsn[mode]) + lsn <= WalSndCtl->lsn[mode]) { LWLockRelease(SyncRepLock); return; @@ -132,7 +143,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) * Set our waitLSN so WALSender will know when to wake us, and add * ourselves to the queue. */ - MyProc->waitLSN = XactCommitLSN; + MyProc->waitLSN = lsn; MyProc->syncRepState = SYNC_REP_WAITING; SyncRepQueueInsert(mode); Assert(SyncRepQueueIsOrderedByLSN(mode)); @@ -147,7 +158,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) new_status = (char *) palloc(len + 32 + 1); memcpy(new_status, old_status, len); sprintf(new_status + len, " waiting for %X/%X", - (uint32) (XactCommitLSN >> 32), (uint32) XactCommitLSN); + (uint32) (lsn >> 32), (uint32) lsn); set_ps_display(new_status, false); new_status[len] = '\0'; /* truncate off " waiting ..." */ } @@ -416,6 +427,7 @@ SyncRepReleaseWaiters(void) WalSnd *syncWalSnd; int numwrite = 0; int numflush = 0; + int numapply = 0; /* * If this WALSender is serving a standby that is not on the list of @@ -462,12 +474,18 @@ SyncRepReleaseWaiters(void) walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush; numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); } + if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply) + { + walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply; + numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY); + } LWLockRelease(SyncRepLock); - elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X", + elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%x", numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write, - numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush); + numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush, + numapply, (uint32) (MyWalSnd->apply >> 32), (uint32) MyWalSnd->apply); /* * If we are managing the highest priority standby, though we weren't @@ -728,6 +746,9 @@ assign_synchronous_commit(int newval, void *extra) case SYNCHRONOUS_COMMIT_REMOTE_FLUSH: SyncRepWaitMode = SYNC_REP_WAIT_FLUSH; break; + case SYNCHRONOUS_COMMIT_REMOTE_APPLY: + SyncRepWaitMode = SYNC_REP_WAIT_APPLY; + break; default: SyncRepWaitMode = SYNC_REP_NO_WAIT; break; diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 7b36e02faa9..057c250793d 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -352,8 +352,6 @@ WalReceiverMain(void) if (walrcv_startstreaming(startpointTLI, startpoint, slotname[0] != '\0' ? slotname : NULL)) { - bool endofwal = false; - if (first_stream) ereport(LOG, (errmsg("started streaming WAL from primary at %X/%X on timeline %u", @@ -376,18 +374,13 @@ WalReceiverMain(void) ping_sent = false; /* Loop until end-of-streaming or error */ - while (!endofwal) + for (;;) { char *buf; int len; - - /* - * Emergency bailout if postmaster has died. This is to avoid - * the necessity for manual cleanup of all postmaster - * children. - */ - if (!PostmasterIsAlive()) - exit(1); + bool endofwal = false; + int wait_fd = PGINVALID_SOCKET; + int rc; /* * Exit walreceiver if we're not in recovery. This should not @@ -407,8 +400,8 @@ WalReceiverMain(void) XLogWalRcvSendHSFeedback(true); } - /* Wait a while for data to arrive */ - len = walrcv_receive(NAPTIME_PER_CYCLE, &buf); + /* See if we can read data immediately */ + len = walrcv_receive(&buf, &wait_fd); if (len != 0) { /* @@ -439,7 +432,7 @@ WalReceiverMain(void) endofwal = true; break; } - len = walrcv_receive(0, &buf); + len = walrcv_receive(&buf, &wait_fd); } /* Let the master know that we received some data. */ @@ -452,7 +445,54 @@ WalReceiverMain(void) */ XLogWalRcvFlush(false); } - else + + /* Check if we need to exit the streaming loop. */ + if (endofwal) + break; + + /* + * Ideally we would reuse a WaitEventSet object repeatedly + * here to avoid the overheads of WaitLatchOrSocket on epoll + * systems, but we can't be sure that libpq (or any other + * walreceiver implementation) has the same socket (even if + * the fd is the same number, it may have been closed and + * reopened since the last time). In future, if there is a + * function for removing sockets from WaitEventSet, then we + * could add and remove just the socket each time, potentially + * avoiding some system calls. + */ + Assert(wait_fd != PGINVALID_SOCKET); + rc = WaitLatchOrSocket(&walrcv->latch, + WL_POSTMASTER_DEATH | WL_SOCKET_READABLE | + WL_TIMEOUT | WL_LATCH_SET, + wait_fd, + NAPTIME_PER_CYCLE); + if (rc & WL_LATCH_SET) + { + ResetLatch(&walrcv->latch); + if (walrcv->force_reply) + { + /* + * The recovery process has asked us to send apply + * feedback now. Make sure the flag is really set to + * false in shared memory before sending the reply, + * so we don't miss a new request for a reply. + */ + walrcv->force_reply = false; + pg_memory_barrier(); + XLogWalRcvSendReply(true, false); + } + } + if (rc & WL_POSTMASTER_DEATH) + { + /* + * Emergency bailout if postmaster has died. This is to + * avoid the necessity for manual cleanup of all + * postmaster children. + */ + exit(1); + } + if (rc & WL_TIMEOUT) { /* * We didn't receive anything new. If we haven't heard @@ -1222,6 +1262,21 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) } /* + * Wake up the walreceiver main loop. + * + * This is called by the startup process whenever interesting xlog records + * are applied, so that walreceiver can check if it needs to send an apply + * notification back to the master which may be waiting in a COMMIT with + * synchronous_commit = remote_apply. + */ +void +WalRcvForceReply(void) +{ + WalRcv->force_reply = true; + SetLatch(&WalRcv->latch); +} + +/* * Return a string constant representing the state. This is used * in system functions and views, and should *not* be translated. */ diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 65a6cd4404f..06cb1660eb6 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -345,12 +345,13 @@ static const struct config_enum_entry constraint_exclusion_options[] = { }; /* - * Although only "on", "off", "remote_write", and "local" are documented, we - * accept all the likely variants of "on" and "off". + * Although only "on", "off", "remote_apply", "remote_write", and "local" are + * documented, we accept all the likely variants of "on" and "off". */ static const struct config_enum_entry synchronous_commit_options[] = { {"local", SYNCHRONOUS_COMMIT_LOCAL_FLUSH, false}, {"remote_write", SYNCHRONOUS_COMMIT_REMOTE_WRITE, false}, + {"remote_apply", SYNCHRONOUS_COMMIT_REMOTE_APPLY, false}, {"on", SYNCHRONOUS_COMMIT_ON, false}, {"off", SYNCHRONOUS_COMMIT_OFF, false}, {"true", SYNCHRONOUS_COMMIT_ON, true}, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 5536012e050..ec4427f2d88 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -177,7 +177,7 @@ # (change requires restart) #fsync = on # turns forced synchronization on or off #synchronous_commit = on # synchronization level; - # off, local, remote_write, or on + # off, local, remote_write, remote_apply, or on #wal_sync_method = fsync # the default is the first option # supported by the operating system: # open_datasync diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 3ba23f5e87b..503ae1b82d7 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -60,7 +60,9 @@ typedef enum SYNCHRONOUS_COMMIT_LOCAL_FLUSH, /* wait for local flush only */ SYNCHRONOUS_COMMIT_REMOTE_WRITE, /* wait for local flush and remote * write */ - SYNCHRONOUS_COMMIT_REMOTE_FLUSH /* wait for local and remote flush */ + SYNCHRONOUS_COMMIT_REMOTE_FLUSH, /* wait for local and remote flush */ + SYNCHRONOUS_COMMIT_REMOTE_APPLY /* wait for local flush and remote + * apply */ } SyncCommitLevel; /* Define the default setting for synchonous_commit */ @@ -144,10 +146,13 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, * EOXact... routines which run at the end of the original transaction * completion. */ +#define XACT_COMPLETION_APPLY_FEEDBACK (1U << 29) #define XACT_COMPLETION_UPDATE_RELCACHE_FILE (1U << 30) #define XACT_COMPLETION_FORCE_SYNC_COMMIT (1U << 31) /* Access macros for above flags */ +#define XactCompletionApplyFeedback(xinfo) \ + ((xinfo & XACT_COMPLETION_APPLY_FEEDBACK) != 0) #define XactCompletionRelcacheInitFileInval(xinfo) \ ((xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE) != 0) #define XactCompletionForceSyncCommit(xinfo) \ diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 74a139496e6..a7dcdae67f8 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -267,6 +267,8 @@ extern bool CheckPromoteSignal(void); extern void WakeupRecovery(void); extern void SetWalWriterSleeping(bool sleeping); +extern void XLogRequestWalReceiverReply(void); + extern void assign_max_wal_size(int newval, void *extra); extern void assign_checkpoint_completion_target(double newval, void *extra); diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index e8b64dd4262..aa2f074201a 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -31,7 +31,7 @@ /* * Each page of XLOG file has a header like this: */ -#define XLOG_PAGE_MAGIC 0xD088 /* can be used as WAL version indicator */ +#define XLOG_PAGE_MAGIC 0xD089 /* can be used as WAL version indicator */ typedef struct XLogPageHeaderData { diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index 96e059bc3fa..c005a425836 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -23,8 +23,9 @@ #define SYNC_REP_NO_WAIT -1 #define SYNC_REP_WAIT_WRITE 0 #define SYNC_REP_WAIT_FLUSH 1 +#define SYNC_REP_WAIT_APPLY 2 -#define NUM_SYNC_REP_WAIT_MODE 2 +#define NUM_SYNC_REP_WAIT_MODE 3 /* syncRepState */ #define SYNC_REP_NOT_WAITING 0 @@ -35,7 +36,7 @@ extern char *SyncRepStandbyNames; /* called by user backend */ -extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN); +extern void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit); /* called at backend exit */ extern void SyncRepCleanupAtProcExit(void); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 6eacb095d1b..36bcb471720 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -113,9 +113,16 @@ typedef struct slock_t mutex; /* locks shared variables shown above */ /* + * force walreceiver reply? This doesn't need to be locked; memory + * barriers for ordering are sufficient. + */ + bool force_reply; + + /* * Latch used by startup process to wake up walreceiver after telling it * where to start streaming (after setting receiveStart and - * receiveStartTLI). + * receiveStartTLI), and also to tell it to send apply feedback to the + * primary whenever specially marked commit records are applied. */ Latch latch; } WalRcvData; @@ -138,7 +145,7 @@ extern PGDLLIMPORT walrcv_startstreaming_type walrcv_startstreaming; typedef void (*walrcv_endstreaming_type) (TimeLineID *next_tli); extern PGDLLIMPORT walrcv_endstreaming_type walrcv_endstreaming; -typedef int (*walrcv_receive_type) (int timeout, char **buffer); +typedef int (*walrcv_receive_type) (char **buffer, int *wait_fd); extern PGDLLIMPORT walrcv_receive_type walrcv_receive; typedef void (*walrcv_send_type) (const char *buffer, int nbytes); @@ -162,5 +169,6 @@ extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI); extern int GetReplicationApplyDelay(void); extern int GetReplicationTransferLatency(void); +extern void WalRcvForceReply(void); #endif /* _WALRECEIVER_H */ |