aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c34
1 files changed, 31 insertions, 3 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index ea3ba1d5b48..e31551340c9 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -72,6 +72,8 @@
#include "storage/proc.h"
#include "storage/procarray.h"
+#include "tcop/tcopprot.h"
+
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/datum.h"
@@ -118,6 +120,9 @@ static void store_flush_position(XLogRecPtr remote_lsn);
static void reread_subscription(void);
+/* Flags set by signal handlers */
+static volatile sig_atomic_t got_SIGHUP = false;
+
/*
* Should this worker apply changes for given relation.
*
@@ -1005,7 +1010,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
/* mark as idle, before starting to loop */
pgstat_report_activity(STATE_IDLE, NULL);
- while (!got_SIGTERM)
+ for (;;)
{
pgsocket fd = PGINVALID_SOCKET;
int rc;
@@ -1015,6 +1020,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
TimestampTz last_recv_timestamp = GetCurrentTimestamp();
bool ping_sent = false;
+ CHECK_FOR_INTERRUPTS();
+
MemoryContextSwitchTo(ApplyMessageContext);
len = walrcv_receive(wrconn, &buf, &fd);
@@ -1437,6 +1444,19 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
MySubscriptionValid = false;
}
+/* SIGHUP: set flag to reload configuration at next convenient time */
+static void
+logicalrep_worker_sighup(SIGNAL_ARGS)
+{
+ int save_errno = errno;
+
+ got_SIGHUP = true;
+
+ /* Waken anything waiting on the process latch */
+ SetLatch(MyLatch);
+
+ errno = save_errno;
+}
/* Logical Replication Apply worker entry point */
void
@@ -1454,7 +1474,7 @@ ApplyWorkerMain(Datum main_arg)
/* Setup signal handling */
pqsignal(SIGHUP, logicalrep_worker_sighup);
- pqsignal(SIGTERM, logicalrep_worker_sigterm);
+ pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();
/* Initialise stats to a sanish value */
@@ -1604,6 +1624,14 @@ ApplyWorkerMain(Datum main_arg)
/* Run the main loop. */
LogicalRepApplyLoop(origin_startpos);
- /* We should only get here if we received SIGTERM */
proc_exit(0);
}
+
+/*
+ * Is current process a logical replication worker?
+ */
+bool
+IsLogicalWorker(void)
+{
+ return MyLogicalRepWorker != NULL;
+}