aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/logical/launcher.c16
-rw-r--r--src/backend/replication/logical/tablesync.c10
-rw-r--r--src/backend/replication/logical/worker.c34
-rw-r--r--src/backend/tcop/postgres.c5
-rw-r--r--src/include/replication/logicalworker.h2
-rw-r--r--src/include/replication/worker_internal.h4
6 files changed, 50 insertions, 21 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 63d903ac021..345a4152123 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -80,8 +80,8 @@ static void logicalrep_worker_detach(void);
static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
/* Flags set by signal handlers */
-volatile sig_atomic_t got_SIGHUP = false;
-volatile sig_atomic_t got_SIGTERM = false;
+static volatile sig_atomic_t got_SIGHUP = false;
+static volatile sig_atomic_t got_SIGTERM = false;
static bool on_commit_launcher_wakeup = false;
@@ -624,8 +624,8 @@ logicalrep_worker_onexit(int code, Datum arg)
}
/* SIGTERM: set flag to exit at next convenient time */
-void
-logicalrep_worker_sigterm(SIGNAL_ARGS)
+static void
+logicalrep_launcher_sigterm(SIGNAL_ARGS)
{
int save_errno = errno;
@@ -638,8 +638,8 @@ logicalrep_worker_sigterm(SIGNAL_ARGS)
}
/* SIGHUP: set flag to reload configuration at next convenient time */
-void
-logicalrep_worker_sighup(SIGNAL_ARGS)
+static void
+logicalrep_launcher_sighup(SIGNAL_ARGS)
{
int save_errno = errno;
@@ -799,8 +799,8 @@ ApplyLauncherMain(Datum main_arg)
before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
/* Establish signal handlers. */
- pqsignal(SIGHUP, logicalrep_worker_sighup);
- pqsignal(SIGTERM, logicalrep_worker_sigterm);
+ pqsignal(SIGHUP, logicalrep_launcher_sighup);
+ pqsignal(SIGTERM, logicalrep_launcher_sigterm);
BackgroundWorkerUnblockSignals();
/* Make it easy to identify our processes. */
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 515724e1026..85e480db4bd 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -154,10 +154,12 @@ wait_for_sync_status_change(Oid relid, char origstate)
int rc;
char state = origstate;
- while (!got_SIGTERM)
+ for (;;)
{
LogicalRepWorker *worker;
+ CHECK_FOR_INTERRUPTS();
+
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
relid, false);
@@ -525,7 +527,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
bytesread += avail;
}
- while (!got_SIGTERM && maxread > 0 && bytesread < minread)
+ while (maxread > 0 && bytesread < minread)
{
pgsocket fd = PGINVALID_SOCKET;
int rc;
@@ -579,10 +581,6 @@ copy_read_data(void *outbuf, int minread, int maxread)
ResetLatch(&MyProc->procLatch);
}
- /* Check for exit condition. */
- if (got_SIGTERM)
- proc_exit(0);
-
return bytesread;
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index ea3ba1d5b48..e31551340c9 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -72,6 +72,8 @@
#include "storage/proc.h"
#include "storage/procarray.h"
+#include "tcop/tcopprot.h"
+
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/datum.h"
@@ -118,6 +120,9 @@ static void store_flush_position(XLogRecPtr remote_lsn);
static void reread_subscription(void);
+/* Flags set by signal handlers */
+static volatile sig_atomic_t got_SIGHUP = false;
+
/*
* Should this worker apply changes for given relation.
*
@@ -1005,7 +1010,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
/* mark as idle, before starting to loop */
pgstat_report_activity(STATE_IDLE, NULL);
- while (!got_SIGTERM)
+ for (;;)
{
pgsocket fd = PGINVALID_SOCKET;
int rc;
@@ -1015,6 +1020,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
TimestampTz last_recv_timestamp = GetCurrentTimestamp();
bool ping_sent = false;
+ CHECK_FOR_INTERRUPTS();
+
MemoryContextSwitchTo(ApplyMessageContext);
len = walrcv_receive(wrconn, &buf, &fd);
@@ -1437,6 +1444,19 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
MySubscriptionValid = false;
}
+/* SIGHUP: set flag to reload configuration at next convenient time */
+static void
+logicalrep_worker_sighup(SIGNAL_ARGS)
+{
+ int save_errno = errno;
+
+ got_SIGHUP = true;
+
+ /* Waken anything waiting on the process latch */
+ SetLatch(MyLatch);
+
+ errno = save_errno;
+}
/* Logical Replication Apply worker entry point */
void
@@ -1454,7 +1474,7 @@ ApplyWorkerMain(Datum main_arg)
/* Setup signal handling */
pqsignal(SIGHUP, logicalrep_worker_sighup);
- pqsignal(SIGTERM, logicalrep_worker_sigterm);
+ pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();
/* Initialise stats to a sanish value */
@@ -1604,6 +1624,14 @@ ApplyWorkerMain(Datum main_arg)
/* Run the main loop. */
LogicalRepApplyLoop(origin_startpos);
- /* We should only get here if we received SIGTERM */
proc_exit(0);
}
+
+/*
+ * Is current process a logical replication worker?
+ */
+bool
+IsLogicalWorker(void)
+{
+ return MyLogicalRepWorker != NULL;
+}
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 75c2d9a61d0..13577691505 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -55,6 +55,7 @@
#include "pg_getopt.h"
#include "postmaster/autovacuum.h"
#include "postmaster/postmaster.h"
+#include "replication/logicalworker.h"
#include "replication/slot.h"
#include "replication/walsender.h"
#include "rewrite/rewriteHandler.h"
@@ -2845,6 +2846,10 @@ ProcessInterrupts(void)
ereport(FATAL,
(errcode(ERRCODE_ADMIN_SHUTDOWN),
errmsg("terminating autovacuum process due to administrator command")));
+ else if (IsLogicalWorker())
+ ereport(FATAL,
+ (errcode(ERRCODE_ADMIN_SHUTDOWN),
+ errmsg("terminating logical replication worker due to administrator command")));
else if (RecoveryConflictPending && RecoveryConflictRetryable)
{
pgstat_report_recovery_conflict(RecoveryConflictReason);
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 3e0affa190b..5877a930f68 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -14,4 +14,6 @@
extern void ApplyWorkerMain(Datum main_arg);
+extern bool IsLogicalWorker(void);
+
#endif /* LOGICALWORKER_H */
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 0654461305b..2bfff5c1205 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -67,8 +67,6 @@ extern Subscription *MySubscription;
extern LogicalRepWorker *MyLogicalRepWorker;
extern bool in_remote_transaction;
-extern volatile sig_atomic_t got_SIGHUP;
-extern volatile sig_atomic_t got_SIGTERM;
extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
@@ -81,8 +79,6 @@ extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
extern int logicalrep_sync_worker_count(Oid subid);
-extern void logicalrep_worker_sighup(SIGNAL_ARGS);
-extern void logicalrep_worker_sigterm(SIGNAL_ARGS);
extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
void process_syncing_tables(XLogRecPtr current_lsn);
void invalidate_syncing_table_states(Datum arg, int cacheid,