diff options
Diffstat (limited to 'src/backend/replication/walreceiver.c')
-rw-r--r-- | src/backend/replication/walreceiver.c | 59 |
1 files changed, 29 insertions, 30 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 8bfb0415608..cc3cf7d2147 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -74,16 +74,9 @@ int wal_receiver_status_interval; int wal_receiver_timeout; bool hot_standby_feedback; -/* libpqreceiver hooks to these when loaded */ -walrcv_connect_type walrcv_connect = NULL; -walrcv_get_conninfo_type walrcv_get_conninfo = NULL; -walrcv_identify_system_type walrcv_identify_system = NULL; -walrcv_startstreaming_type walrcv_startstreaming = NULL; -walrcv_endstreaming_type walrcv_endstreaming = NULL; -walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile = NULL; -walrcv_receive_type walrcv_receive = NULL; -walrcv_send_type walrcv_send = NULL; -walrcv_disconnect_type walrcv_disconnect = NULL; +/* libpqwalreceiver connection */ +static WalReceiverConn *wrconn = NULL; +WalReceiverFunctionsType *WalReceiverFunctions = NULL; #define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */ @@ -286,14 +279,7 @@ WalReceiverMain(void) /* Load the libpq-specific functions */ load_file("libpqwalreceiver", false); - if (walrcv_connect == NULL || - walrcv_get_conninfo == NULL || - walrcv_startstreaming == NULL || - walrcv_endstreaming == NULL || - walrcv_identify_system == NULL || - walrcv_readtimelinehistoryfile == NULL || - walrcv_receive == NULL || walrcv_send == NULL || - walrcv_disconnect == NULL) + if (WalReceiverFunctions == NULL) elog(ERROR, "libpqwalreceiver didn't initialize correctly"); /* @@ -307,14 +293,14 @@ WalReceiverMain(void) /* Establish the connection to the primary for XLOG streaming */ EnableWalRcvImmediateExit(); - walrcv_connect(conninfo); + wrconn = walrcv_connect(conninfo, false, "walreceiver"); DisableWalRcvImmediateExit(); /* * Save user-visible connection string. This clobbers the original * conninfo, for security. */ - tmp_conninfo = walrcv_get_conninfo(); + tmp_conninfo = walrcv_get_conninfo(wrconn); SpinLockAcquire(&walrcv->mutex); memset(walrcv->conninfo, 0, MAXCONNINFO); if (tmp_conninfo) @@ -328,12 +314,25 @@ WalReceiverMain(void) first_stream = true; for (;;) { + char *primary_sysid; + char standby_sysid[32]; + /* * Check that we're connected to a valid server using the * IDENTIFY_SYSTEM replication command, */ EnableWalRcvImmediateExit(); - walrcv_identify_system(&primaryTLI); + primary_sysid = walrcv_identify_system(wrconn, &primaryTLI); + + snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT, + GetSystemIdentifier()); + if (strcmp(primary_sysid, standby_sysid) != 0) + { + ereport(ERROR, + (errmsg("database system identifier differs between the primary and standby"), + errdetail("The primary's identifier is %s, the standby's identifier is %s.", + primary_sysid, standby_sysid))); + } DisableWalRcvImmediateExit(); /* @@ -370,7 +369,7 @@ WalReceiverMain(void) * on the new timeline. */ ThisTimeLineID = startpointTLI; - if (walrcv_startstreaming(startpointTLI, startpoint, + if (walrcv_startstreaming(wrconn, startpointTLI, startpoint, slotname[0] != '\0' ? slotname : NULL)) { if (first_stream) @@ -422,7 +421,7 @@ WalReceiverMain(void) } /* See if we can read data immediately */ - len = walrcv_receive(&buf, &wait_fd); + len = walrcv_receive(wrconn, &buf, &wait_fd); if (len != 0) { /* @@ -453,7 +452,7 @@ WalReceiverMain(void) endofwal = true; break; } - len = walrcv_receive(&buf, &wait_fd); + len = walrcv_receive(wrconn, &buf, &wait_fd); } /* Let the master know that we received some data. */ @@ -570,7 +569,7 @@ WalReceiverMain(void) * our side, too. */ EnableWalRcvImmediateExit(); - walrcv_endstreaming(&primaryTLI); + walrcv_endstreaming(wrconn, &primaryTLI); DisableWalRcvImmediateExit(); /* @@ -726,7 +725,7 @@ WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last) tli))); EnableWalRcvImmediateExit(); - walrcv_readtimelinehistoryfile(tli, &fname, &content, &len); + walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len); DisableWalRcvImmediateExit(); /* @@ -778,8 +777,8 @@ WalRcvDie(int code, Datum arg) SpinLockRelease(&walrcv->mutex); /* Terminate the connection gracefully. */ - if (walrcv_disconnect != NULL) - walrcv_disconnect(); + if (wrconn != NULL) + walrcv_disconnect(wrconn); /* Wake up the startup process to notice promptly that we're gone */ WakeupRecovery(); @@ -1150,7 +1149,7 @@ XLogWalRcvSendReply(bool force, bool requestReply) (uint32) (applyPtr >> 32), (uint32) applyPtr, requestReply ? " (reply requested)" : ""); - walrcv_send(reply_message.data, reply_message.len); + walrcv_send(wrconn, reply_message.data, reply_message.len); } /* @@ -1228,7 +1227,7 @@ XLogWalRcvSendHSFeedback(bool immed) pq_sendint64(&reply_message, GetCurrentIntegerTimestamp()); pq_sendint(&reply_message, xmin, 4); pq_sendint(&reply_message, nextEpoch, 4); - walrcv_send(reply_message.data, reply_message.len); + walrcv_send(wrconn, reply_message.data, reply_message.len); if (TransactionIdIsValid(xmin)) master_has_standby_xmin = true; else |