aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/walreceiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walreceiver.c')
-rw-r--r--src/backend/replication/walreceiver.c59
1 files changed, 29 insertions, 30 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 8bfb0415608..cc3cf7d2147 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -74,16 +74,9 @@ int wal_receiver_status_interval;
int wal_receiver_timeout;
bool hot_standby_feedback;
-/* libpqreceiver hooks to these when loaded */
-walrcv_connect_type walrcv_connect = NULL;
-walrcv_get_conninfo_type walrcv_get_conninfo = NULL;
-walrcv_identify_system_type walrcv_identify_system = NULL;
-walrcv_startstreaming_type walrcv_startstreaming = NULL;
-walrcv_endstreaming_type walrcv_endstreaming = NULL;
-walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile = NULL;
-walrcv_receive_type walrcv_receive = NULL;
-walrcv_send_type walrcv_send = NULL;
-walrcv_disconnect_type walrcv_disconnect = NULL;
+/* libpqwalreceiver connection */
+static WalReceiverConn *wrconn = NULL;
+WalReceiverFunctionsType *WalReceiverFunctions = NULL;
#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */
@@ -286,14 +279,7 @@ WalReceiverMain(void)
/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);
- if (walrcv_connect == NULL ||
- walrcv_get_conninfo == NULL ||
- walrcv_startstreaming == NULL ||
- walrcv_endstreaming == NULL ||
- walrcv_identify_system == NULL ||
- walrcv_readtimelinehistoryfile == NULL ||
- walrcv_receive == NULL || walrcv_send == NULL ||
- walrcv_disconnect == NULL)
+ if (WalReceiverFunctions == NULL)
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
/*
@@ -307,14 +293,14 @@ WalReceiverMain(void)
/* Establish the connection to the primary for XLOG streaming */
EnableWalRcvImmediateExit();
- walrcv_connect(conninfo);
+ wrconn = walrcv_connect(conninfo, false, "walreceiver");
DisableWalRcvImmediateExit();
/*
* Save user-visible connection string. This clobbers the original
* conninfo, for security.
*/
- tmp_conninfo = walrcv_get_conninfo();
+ tmp_conninfo = walrcv_get_conninfo(wrconn);
SpinLockAcquire(&walrcv->mutex);
memset(walrcv->conninfo, 0, MAXCONNINFO);
if (tmp_conninfo)
@@ -328,12 +314,25 @@ WalReceiverMain(void)
first_stream = true;
for (;;)
{
+ char *primary_sysid;
+ char standby_sysid[32];
+
/*
* Check that we're connected to a valid server using the
* IDENTIFY_SYSTEM replication command,
*/
EnableWalRcvImmediateExit();
- walrcv_identify_system(&primaryTLI);
+ primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
+
+ snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
+ GetSystemIdentifier());
+ if (strcmp(primary_sysid, standby_sysid) != 0)
+ {
+ ereport(ERROR,
+ (errmsg("database system identifier differs between the primary and standby"),
+ errdetail("The primary's identifier is %s, the standby's identifier is %s.",
+ primary_sysid, standby_sysid)));
+ }
DisableWalRcvImmediateExit();
/*
@@ -370,7 +369,7 @@ WalReceiverMain(void)
* on the new timeline.
*/
ThisTimeLineID = startpointTLI;
- if (walrcv_startstreaming(startpointTLI, startpoint,
+ if (walrcv_startstreaming(wrconn, startpointTLI, startpoint,
slotname[0] != '\0' ? slotname : NULL))
{
if (first_stream)
@@ -422,7 +421,7 @@ WalReceiverMain(void)
}
/* See if we can read data immediately */
- len = walrcv_receive(&buf, &wait_fd);
+ len = walrcv_receive(wrconn, &buf, &wait_fd);
if (len != 0)
{
/*
@@ -453,7 +452,7 @@ WalReceiverMain(void)
endofwal = true;
break;
}
- len = walrcv_receive(&buf, &wait_fd);
+ len = walrcv_receive(wrconn, &buf, &wait_fd);
}
/* Let the master know that we received some data. */
@@ -570,7 +569,7 @@ WalReceiverMain(void)
* our side, too.
*/
EnableWalRcvImmediateExit();
- walrcv_endstreaming(&primaryTLI);
+ walrcv_endstreaming(wrconn, &primaryTLI);
DisableWalRcvImmediateExit();
/*
@@ -726,7 +725,7 @@ WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
tli)));
EnableWalRcvImmediateExit();
- walrcv_readtimelinehistoryfile(tli, &fname, &content, &len);
+ walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
DisableWalRcvImmediateExit();
/*
@@ -778,8 +777,8 @@ WalRcvDie(int code, Datum arg)
SpinLockRelease(&walrcv->mutex);
/* Terminate the connection gracefully. */
- if (walrcv_disconnect != NULL)
- walrcv_disconnect();
+ if (wrconn != NULL)
+ walrcv_disconnect(wrconn);
/* Wake up the startup process to notice promptly that we're gone */
WakeupRecovery();
@@ -1150,7 +1149,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
(uint32) (applyPtr >> 32), (uint32) applyPtr,
requestReply ? " (reply requested)" : "");
- walrcv_send(reply_message.data, reply_message.len);
+ walrcv_send(wrconn, reply_message.data, reply_message.len);
}
/*
@@ -1228,7 +1227,7 @@ XLogWalRcvSendHSFeedback(bool immed)
pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
pq_sendint(&reply_message, xmin, 4);
pq_sendint(&reply_message, nextEpoch, 4);
- walrcv_send(reply_message.data, reply_message.len);
+ walrcv_send(wrconn, reply_message.data, reply_message.len);
if (TransactionIdIsValid(xmin))
master_has_standby_xmin = true;
else