diff options
Diffstat (limited to 'src/backend/storage/ipc/procsignal.c')
-rw-r--r-- | src/backend/storage/ipc/procsignal.c | 301 |
1 file changed, 290 insertions, 11 deletions
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index fde97a1036b..06e00eae15b 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -20,6 +20,7 @@ #include "access/parallel.h" #include "commands/async.h" #include "miscadmin.h" +#include "pgstat.h" #include "replication/walsender.h" #include "storage/ipc.h" #include "storage/latch.h" @@ -45,25 +46,53 @@ * The flags are actually declared as "volatile sig_atomic_t" for maximum * portability. This should ensure that loads and stores of the flag * values are atomic, allowing us to dispense with any explicit locking. + * + * pss_signalFlags are intended to be set in cases where we don't need to + * keep track of whether or not the target process has handled the signal, + * but sometimes we need confirmation, as when making a global state change + * that cannot be considered complete until all backends have taken notice + * of it. For such use cases, we set a bit in pss_barrierCheckMask and then + * increment the current "barrier generation"; when the new barrier generation + * (or greater) appears in the pss_barrierGeneration flag of every process, + * we know that the message has been received everywhere. */ typedef struct { pid_t pss_pid; sig_atomic_t pss_signalFlags[NUM_PROCSIGNALS]; + pg_atomic_uint64 pss_barrierGeneration; + pg_atomic_uint32 pss_barrierCheckMask; } ProcSignalSlot; /* + * Information that is global to the entire ProcSignal system can be stored + * here. + * + * psh_barrierGeneration is the highest barrier generation in existence. + */ +typedef struct +{ + pg_atomic_uint64 psh_barrierGeneration; + ProcSignalSlot psh_slot[FLEXIBLE_ARRAY_MEMBER]; +} ProcSignalHeader; + +/* * We reserve a slot for each possible BackendId, plus one for each * possible auxiliary process type. (This scheme assumes there is not * more than one of any auxiliary process type at a time.) 
*/ #define NumProcSignalSlots (MaxBackends + NUM_AUXPROCTYPES) -static ProcSignalSlot *ProcSignalSlots = NULL; +/* Check whether the relevant type bit is set in the flags. */ +#define BARRIER_SHOULD_CHECK(flags, type) \ + (((flags) & (((uint32) 1) << (uint32) (type))) != 0) + +static ProcSignalHeader *ProcSignal = NULL; static volatile ProcSignalSlot *MyProcSignalSlot = NULL; static bool CheckProcSignal(ProcSignalReason reason); static void CleanupProcSignalState(int status, Datum arg); +static void ProcessBarrierPlaceholder(void); /* * ProcSignalShmemSize @@ -72,7 +101,11 @@ static void CleanupProcSignalState(int status, Datum arg); Size ProcSignalShmemSize(void) { - return NumProcSignalSlots * sizeof(ProcSignalSlot); + Size size; + + size = mul_size(NumProcSignalSlots, sizeof(ProcSignalSlot)); + size = add_size(size, offsetof(ProcSignalHeader, psh_slot)); + return size; } /* @@ -85,12 +118,26 @@ ProcSignalShmemInit(void) Size size = ProcSignalShmemSize(); bool found; - ProcSignalSlots = (ProcSignalSlot *) - ShmemInitStruct("ProcSignalSlots", size, &found); + ProcSignal = (ProcSignalHeader *) + ShmemInitStruct("ProcSignal", size, &found); - /* If we're first, set everything to zeroes */ + /* If we're first, initialize. 
*/ if (!found) - MemSet(ProcSignalSlots, 0, size); + { + int i; + + pg_atomic_init_u64(&ProcSignal->psh_barrierGeneration, 0); + + for (i = 0; i < NumProcSignalSlots; ++i) + { + ProcSignalSlot *slot = &ProcSignal->psh_slot[i]; + + slot->pss_pid = 0; + MemSet(slot->pss_signalFlags, 0, sizeof(slot->pss_signalFlags)); + pg_atomic_init_u64(&slot->pss_barrierGeneration, PG_UINT64_MAX); + pg_atomic_init_u32(&slot->pss_barrierCheckMask, 0); + } + } } /* @@ -104,10 +151,11 @@ void ProcSignalInit(int pss_idx) { volatile ProcSignalSlot *slot; + uint64 barrier_generation; Assert(pss_idx >= 1 && pss_idx <= NumProcSignalSlots); - slot = &ProcSignalSlots[pss_idx - 1]; + slot = &ProcSignal->psh_slot[pss_idx - 1]; /* sanity check */ if (slot->pss_pid != 0) @@ -117,6 +165,23 @@ ProcSignalInit(int pss_idx) /* Clear out any leftover signal reasons */ MemSet(slot->pss_signalFlags, 0, NUM_PROCSIGNALS * sizeof(sig_atomic_t)); + /* + * Initialize barrier state. Since we're a brand-new process, there + * shouldn't be any leftover backend-private state that needs to be + * updated. Therefore, we can broadcast the latest barrier generation + * and disregard any previously-set check bits. + * + * NB: This only works if this initialization happens early enough in the + * startup sequence that we haven't yet cached any state that might need + * to be invalidated. That's also why we have a memory barrier here, to + * be sure that any later reads of memory happen strictly after this. 
+ */ + pg_atomic_write_u32(&slot->pss_barrierCheckMask, 0); + barrier_generation = + pg_atomic_read_u64(&ProcSignal->psh_barrierGeneration); + pg_atomic_write_u64(&slot->pss_barrierGeneration, barrier_generation); + pg_memory_barrier(); + /* Mark slot with my PID */ slot->pss_pid = MyProcPid; @@ -129,7 +194,7 @@ ProcSignalInit(int pss_idx) /* * CleanupProcSignalState - * Remove current process from ProcSignalSlots + * Remove current process from ProcSignal mechanism * * This function is called via on_shmem_exit() during backend shutdown. */ @@ -139,7 +204,7 @@ CleanupProcSignalState(int status, Datum arg) int pss_idx = DatumGetInt32(arg); volatile ProcSignalSlot *slot; - slot = &ProcSignalSlots[pss_idx - 1]; + slot = &ProcSignal->psh_slot[pss_idx - 1]; Assert(slot == MyProcSignalSlot); /* @@ -161,6 +226,12 @@ CleanupProcSignalState(int status, Datum arg) return; /* XXX better to zero the slot anyway? */ } + /* + * Make this slot look like it's absorbed all possible barriers, so that + * no barrier waits block on it. + */ + pg_atomic_write_u64(&slot->pss_barrierGeneration, PG_UINT64_MAX); + slot->pss_pid = 0; } @@ -182,7 +253,7 @@ SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId) if (backendId != InvalidBackendId) { - slot = &ProcSignalSlots[backendId - 1]; + slot = &ProcSignal->psh_slot[backendId - 1]; /* * Note: Since there's no locking, it's possible that the target @@ -212,7 +283,7 @@ SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId) for (i = NumProcSignalSlots - 1; i >= 0; i--) { - slot = &ProcSignalSlots[i]; + slot = &ProcSignal->psh_slot[i]; if (slot->pss_pid == pid) { @@ -231,6 +302,187 @@ SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId) } /* + * EmitProcSignalBarrier + * Send a signal to every Postgres process + * + * The return value of this function is the barrier "generation" created + * by this operation. 
This value can be passed to WaitForProcSignalBarrier + * to wait until it is known that every participant in the ProcSignal + * mechanism has absorbed the signal (or started afterwards). + * + * Note that it would be a bad idea to use this for anything that happens + * frequently, as interrupting every backend could cause a noticeable + * performance hit. + * + * Callers are entitled to assume that this function will not throw ERROR + * or FATAL. + */ +uint64 +EmitProcSignalBarrier(ProcSignalBarrierType type) +{ + uint64 flagbit = UINT64CONST(1) << (uint64) type; + uint64 generation; + + /* + * Set all the flags. + * + * Note that pg_atomic_fetch_or_u32 has full barrier semantics, so this + * is totally ordered with respect to anything the caller did before, and + * anything that we do afterwards. (This is also true of the later call + * to pg_atomic_add_fetch_u64.) + */ + for (int i = 0; i < NumProcSignalSlots; i++) + { + volatile ProcSignalSlot *slot = &ProcSignal->psh_slot[i]; + + pg_atomic_fetch_or_u32(&slot->pss_barrierCheckMask, flagbit); + } + + /* + * Increment the generation counter. + */ + generation = + pg_atomic_add_fetch_u64(&ProcSignal->psh_barrierGeneration, 1); + + /* + * Signal all the processes, so that they update their advertised barrier + * generation. + * + * Concurrency is not a problem here. Backends that have exited don't + * matter, and new backends that have joined since we entered this function + * must already have current state, since the caller is responsible for + * making sure that the relevant state is entirely visible before calling + * this function in the first place. We still have to wake them up - + * because we can't distinguish between such backends and older backends + * that need to update state - but they won't actually need to change + * any state. 
+ */ + for (int i = NumProcSignalSlots - 1; i >= 0; i--) + { + volatile ProcSignalSlot *slot = &ProcSignal->psh_slot[i]; + pid_t pid = slot->pss_pid; + + if (pid != 0) + kill(pid, SIGUSR1); + } + + return generation; +} + +/* + * WaitForProcSignalBarrier - wait until it is guaranteed that all changes + * requested by a specific call to EmitProcSignalBarrier() have taken effect. + * + * We expect that the barrier will normally be absorbed very quickly by other + * backends, so we start by waiting just 1/8 of a second and then back off + * by a factor of two every time we time out, to a maximum wait time of + * 1 second. + */ +void +WaitForProcSignalBarrier(uint64 generation) +{ + long timeout = 125L; + + for (int i = NumProcSignalSlots - 1; i >= 0; i--) + { + volatile ProcSignalSlot *slot = &ProcSignal->psh_slot[i]; + uint64 oldval; + + oldval = pg_atomic_read_u64(&slot->pss_barrierGeneration); + while (oldval < generation) + { + int events; + + CHECK_FOR_INTERRUPTS(); + + events = + WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + timeout, WAIT_EVENT_PROC_SIGNAL_BARRIER); + ResetLatch(MyLatch); + + oldval = pg_atomic_read_u64(&slot->pss_barrierGeneration); + if (events & WL_TIMEOUT) + timeout = Min(timeout * 2, 1000L); + } + } + + /* + * The caller is probably calling this function because it wants to + * read the shared state or perform further writes to shared state once + * all backends are known to have absorbed the barrier. However, the + * read of pss_barrierGeneration was performed unlocked; insert a memory + * barrier to separate it from whatever follows. + */ + pg_memory_barrier(); +} + +/* + * Perform global barrier related interrupt checking. + * + * Any backend that participates in ProcSignal signalling must arrange to + * call this function periodically. It is called from CHECK_FOR_INTERRUPTS(), + * which is enough for normal backends, but not necessarily for all types of + * background processes. 
+ */ +void +ProcessProcSignalBarrier(void) +{ + uint64 generation; + uint32 flags; + + /* Exit quickly if there's no work to do. */ + if (!ProcSignalBarrierPending) + return; + ProcSignalBarrierPending = false; + + /* + * Read the current barrier generation, and then get the flags that + * are set for this backend. Note that pg_atomic_exchange_u32 is a full + * barrier, so we're guaranteed that the read of the barrier generation + * happens before we atomically extract the flags, and that any subsequent + * state changes happen afterward. + */ + generation = pg_atomic_read_u64(&ProcSignal->psh_barrierGeneration); + flags = pg_atomic_exchange_u32(&MyProcSignalSlot->pss_barrierCheckMask, 0); + + /* + * Process each type of barrier. It's important that nothing we call from + * here throws an error, because pss_barrierCheckMask has already been + * cleared. If we jumped out of here before processing all barrier types, + * then we'd forget about the need to do so later. + * + * NB: It ought to be OK to call the barrier-processing functions + * unconditionally, but it's more efficient to call only the ones that + * might need us to do something based on the flags. + */ + if (BARRIER_SHOULD_CHECK(flags, PROCSIGNAL_BARRIER_PLACEHOLDER)) + ProcessBarrierPlaceholder(); + + /* + * State changes related to all types of barriers that might have been + * emitted have now been handled, so we can update our notion of the + * generation to the one we observed before beginning the updates. If + * things have changed further, it'll get fixed up when this function is + * next called. + */ + pg_atomic_write_u64(&MyProcSignalSlot->pss_barrierGeneration, generation); +} + +static void +ProcessBarrierPlaceholder(void) +{ + /* + * XXX. This is just a placeholder until the first real user of this + * machinery gets committed. Rename PROCSIGNAL_BARRIER_PLACEHOLDER to + * PROCSIGNAL_BARRIER_SOMETHING_ELSE where SOMETHING_ELSE is something + * appropriately descriptive. 
Get rid of this function and instead have + * ProcessBarrierSomethingElse. Most likely, that function should live + * in the file pertaining to that subsystem, rather than here. + */ +} + +/* * CheckProcSignal - check to see if a particular reason has been * signaled, and clear the signal flag. Should be called after receiving * SIGUSR1. @@ -254,6 +506,27 @@ CheckProcSignal(ProcSignalReason reason) } /* + * CheckProcSignalBarrier - check for new barriers we need to absorb + */ +static bool +CheckProcSignalBarrier(void) +{ + volatile ProcSignalSlot *slot = MyProcSignalSlot; + + if (slot != NULL) + { + uint64 mygen; + uint64 curgen; + + mygen = pg_atomic_read_u64(&slot->pss_barrierGeneration); + curgen = pg_atomic_read_u64(&ProcSignal->psh_barrierGeneration); + return (mygen != curgen); + } + + return false; +} + +/* * procsignal_sigusr1_handler - handle SIGUSR1 signal. */ void @@ -291,6 +564,12 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN); + if (CheckProcSignalBarrier()) + { + InterruptPending = true; + ProcSignalBarrierPending = true; + } + SetLatch(MyLatch); latch_sigusr1_handler(); |