diff options
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 34 |
1 files changed, 31 insertions, 3 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ea3ba1d5b48..e31551340c9 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -72,6 +72,8 @@ #include "storage/proc.h" #include "storage/procarray.h" +#include "tcop/tcopprot.h" + #include "utils/builtins.h" #include "utils/catcache.h" #include "utils/datum.h" @@ -118,6 +120,9 @@ static void store_flush_position(XLogRecPtr remote_lsn); static void reread_subscription(void); +/* Flags set by signal handlers */ +static volatile sig_atomic_t got_SIGHUP = false; + /* * Should this worker apply changes for given relation. * @@ -1005,7 +1010,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* mark as idle, before starting to loop */ pgstat_report_activity(STATE_IDLE, NULL); - while (!got_SIGTERM) + for (;;) { pgsocket fd = PGINVALID_SOCKET; int rc; @@ -1015,6 +1020,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) TimestampTz last_recv_timestamp = GetCurrentTimestamp(); bool ping_sent = false; + CHECK_FOR_INTERRUPTS(); + MemoryContextSwitchTo(ApplyMessageContext); len = walrcv_receive(wrconn, &buf, &fd); @@ -1437,6 +1444,19 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue) MySubscriptionValid = false; } +/* SIGHUP: set flag to reload configuration at next convenient time */ +static void +logicalrep_worker_sighup(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_SIGHUP = true; + + /* Waken anything waiting on the process latch */ + SetLatch(MyLatch); + + errno = save_errno; +} /* Logical Replication Apply worker entry point */ void @@ -1454,7 +1474,7 @@ ApplyWorkerMain(Datum main_arg) /* Setup signal handling */ pqsignal(SIGHUP, logicalrep_worker_sighup); - pqsignal(SIGTERM, logicalrep_worker_sigterm); + pqsignal(SIGTERM, die); BackgroundWorkerUnblockSignals(); /* Initialise stats to a sanish value */ @@ -1604,6 +1624,14 @@ ApplyWorkerMain(Datum main_arg) /* Run the main loop. */ LogicalRepApplyLoop(origin_startpos); - /* We should only get here if we received SIGTERM */ proc_exit(0); } + +/* + * Is current process a logical replication worker? + */ +bool +IsLogicalWorker(void) +{ + return MyLogicalRepWorker != NULL; +} |