diff options
author | Alexander Korotkov <akorotkov@postgresql.org> | 2024-04-11 16:30:32 +0300 |
---|---|---|
committer | Alexander Korotkov <akorotkov@postgresql.org> | 2024-04-11 17:28:15 +0300 |
commit | 772faafca1b288c4dd66b7150a7831c27b768003 (patch) | |
tree | 53ccb481bb8e8ca93feff18555734b706aa4ad7c /src/backend/commands | |
parent | 922c4c461d213a422ee7eb6c38e399607539210a (diff) | |
download | postgresql-772faafca1b288c4dd66b7150a7831c27b768003.tar.gz postgresql-772faafca1b288c4dd66b7150a7831c27b768003.zip |
Revert: Implement pg_wal_replay_wait() stored procedure
This commit reverts 06c418e163, e37662f221, bf1e650806, 25f42429e2,
ee79928441, and 74eaf66f98 per review by Heikki Linnakangas.
Discussion: https://postgr.es/m/b155606b-e744-4218-bda5-29379779da1a%40iki.fi
Diffstat (limited to 'src/backend/commands')
-rw-r--r-- | src/backend/commands/Makefile | 3 | ||||
-rw-r--r-- | src/backend/commands/meson.build | 1 | ||||
-rw-r--r-- | src/backend/commands/waitlsn.c | 337 |
3 files changed, 1 insertions, 340 deletions
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile index cede90c3b98..48f7348f91c 100644 --- a/src/backend/commands/Makefile +++ b/src/backend/commands/Makefile @@ -61,7 +61,6 @@ OBJS = \ vacuum.o \ vacuumparallel.o \ variable.o \ - view.o \ - waitlsn.o + view.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build index 7549be5dc3b..6dd00a4abde 100644 --- a/src/backend/commands/meson.build +++ b/src/backend/commands/meson.build @@ -50,5 +50,4 @@ backend_sources += files( 'vacuumparallel.c', 'variable.c', 'view.c', - 'waitlsn.c', ) diff --git a/src/backend/commands/waitlsn.c b/src/backend/commands/waitlsn.c deleted file mode 100644 index 1a83c34e09f..00000000000 --- a/src/backend/commands/waitlsn.c +++ /dev/null @@ -1,337 +0,0 @@ -/*------------------------------------------------------------------------- - * - * waitlsn.c - * Implements waiting for the given replay LSN, which is used in - * CALL pg_wal_replay_wait(target_lsn pg_lsn, timeout float8). - * - * Copyright (c) 2024, PostgreSQL Global Development Group - * - * IDENTIFICATION - * src/backend/commands/waitlsn.c - * - *------------------------------------------------------------------------- - */ - -#include "postgres.h" - -#include <float.h> -#include <math.h> - -#include "pgstat.h" -#include "access/xlog.h" -#include "access/xlogrecovery.h" -#include "commands/waitlsn.h" -#include "funcapi.h" -#include "miscadmin.h" -#include "storage/latch.h" -#include "storage/proc.h" -#include "storage/shmem.h" -#include "utils/fmgrprotos.h" -#include "utils/pg_lsn.h" -#include "utils/snapmgr.h" -#include "utils/wait_event_types.h" - -static int lsn_cmp(const pairingheap_node *a, const pairingheap_node *b, - void *arg); - -struct WaitLSNState *waitLSN = NULL; - -/* Report the amount of shared memory space needed for WaitLSNState. */ -Size -WaitLSNShmemSize(void) -{ - Size size; - - size = offsetof(WaitLSNState, procInfos); - size = add_size(size, mul_size(MaxBackends, sizeof(WaitLSNProcInfo))); - return size; -} - -/* Initialize the WaitLSNState in the shared memory. */ -void -WaitLSNShmemInit(void) -{ - bool found; - - waitLSN = (WaitLSNState *) ShmemInitStruct("WaitLSNState", - WaitLSNShmemSize(), - &found); - if (!found) - { - pg_atomic_init_u64(&waitLSN->minWaitedLSN, PG_UINT64_MAX); - pairingheap_initialize(&waitLSN->waitersHeap, lsn_cmp, NULL); - memset(&waitLSN->procInfos, 0, MaxBackends * sizeof(WaitLSNProcInfo)); - } -} - -/* - * Comparison function for waitLSN->waitersHeap heap. Waiting processes are - * ordered by lsn, so that the waiter with smallest lsn is at the top. - */ -static int -lsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg) -{ - const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, phNode, a); - const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, phNode, b); - - if (aproc->waitLSN < bproc->waitLSN) - return 1; - else if (aproc->waitLSN > bproc->waitLSN) - return -1; - else - return 0; -} - -/* - * Update waitLSN->minWaitedLSN according to the current state of - * waitLSN->waitersHeap. - */ -static void -updateMinWaitedLSN(void) -{ - XLogRecPtr minWaitedLSN = PG_UINT64_MAX; - - if (!pairingheap_is_empty(&waitLSN->waitersHeap)) - { - pairingheap_node *node = pairingheap_first(&waitLSN->waitersHeap); - - minWaitedLSN = pairingheap_container(WaitLSNProcInfo, phNode, node)->waitLSN; - } - - pg_atomic_write_u64(&waitLSN->minWaitedLSN, minWaitedLSN); -} - -/* - * Put the current process into the heap of LSN waiters. - */ -static void -addLSNWaiter(XLogRecPtr lsn) -{ - WaitLSNProcInfo *procInfo = &waitLSN->procInfos[MyProcNumber]; - - LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); - - Assert(!procInfo->inHeap); - - procInfo->procnum = MyProcNumber; - procInfo->waitLSN = lsn; - - pairingheap_add(&waitLSN->waitersHeap, &procInfo->phNode); - procInfo->inHeap = true; - updateMinWaitedLSN(); - - LWLockRelease(WaitLSNLock); -} - -/* - * Remove the current process from the heap of LSN waiters if it's there. - */ -static void -deleteLSNWaiter(void) -{ - WaitLSNProcInfo *procInfo = &waitLSN->procInfos[MyProcNumber]; - - LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); - - if (!procInfo->inHeap) - { - LWLockRelease(WaitLSNLock); - return; - } - - pairingheap_remove(&waitLSN->waitersHeap, &procInfo->phNode); - procInfo->inHeap = false; - updateMinWaitedLSN(); - - LWLockRelease(WaitLSNLock); -} - -/* - * Set latches of LSN waiters whose LSN has been replayed. Set latches of all - * LSN waiters when InvalidXLogRecPtr is given. - */ -void -WaitLSNSetLatches(XLogRecPtr currentLSN) -{ - int i; - int *wakeUpProcNums; - int numWakeUpProcs = 0; - - wakeUpProcNums = palloc(sizeof(int) * MaxBackends); - - LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); - - /* - * Iterate the pairing heap of waiting processes till we find LSN not yet - * replayed. Record the process numbers to set their latches later. - */ - while (!pairingheap_is_empty(&waitLSN->waitersHeap)) - { - pairingheap_node *node = pairingheap_first(&waitLSN->waitersHeap); - WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, phNode, node); - - if (!XLogRecPtrIsInvalid(currentLSN) && - procInfo->waitLSN > currentLSN) - break; - - wakeUpProcNums[numWakeUpProcs++] = procInfo->procnum; - (void) pairingheap_remove_first(&waitLSN->waitersHeap); - procInfo->inHeap = false; - } - - updateMinWaitedLSN(); - - LWLockRelease(WaitLSNLock); - - /* - * Set latches for processes, whose waited LSNs are already replayed. This - * involves spinlocks. So, we shouldn't do this under a spinlock. - */ - for (i = 0; i < numWakeUpProcs; i++) - { - PGPROC *backend; - - backend = GetPGProcByNumber(wakeUpProcNums[i]); - SetLatch(&backend->procLatch); - } - pfree(wakeUpProcNums); -} - -/* - * Delete our item from shmem array if any. - */ -void -WaitLSNCleanup(void) -{ - /* - * We do a fast-path check of the 'inHeap' flag without the lock. This - * flag is set to true only by the process itself. So, it's only possible - * to get a false positive. But that will be eliminated by a recheck - * inside deleteLSNWaiter(). - */ - if (waitLSN->procInfos[MyProcNumber].inHeap) - deleteLSNWaiter(); -} - -/* - * Wait using MyLatch till the given LSN is replayed, the postmaster dies or - * timeout happens. - */ -void -WaitForLSN(XLogRecPtr targetLSN, int64 timeout) -{ - XLogRecPtr currentLSN; - TimestampTz endtime = 0; - - /* Shouldn't be called when shmem isn't initialized */ - Assert(waitLSN); - - /* Should be only called by a backend */ - Assert(MyBackendType == B_BACKEND && MyProcNumber <= MaxBackends); - - if (!RecoveryInProgress()) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("recovery is not in progress"), - errhint("Waiting for LSN can only be executed during recovery."))); - - /* If target LSN is already replayed, exit immediately */ - if (targetLSN <= GetXLogReplayRecPtr(NULL)) - return; - - if (timeout > 0) - endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout); - - addLSNWaiter(targetLSN); - - for (;;) - { - int rc; - int latch_events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH; - long delay_ms = 0; - - /* Check if the waited LSN has been replayed */ - currentLSN = GetXLogReplayRecPtr(NULL); - if (targetLSN <= currentLSN) - break; - - /* Recheck that recovery is still in-progress */ - if (!RecoveryInProgress()) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("recovery is not in progress"), - errdetail("Recovery ended before replaying the target LSN %X/%X; last replay LSN %X/%X.", - LSN_FORMAT_ARGS(targetLSN), - LSN_FORMAT_ARGS(currentLSN)))); - - if (timeout > 0) - { - delay_ms = (endtime - GetCurrentTimestamp()) / 1000; - latch_events |= WL_TIMEOUT; - if (delay_ms <= 0) - break; - } - - CHECK_FOR_INTERRUPTS(); - - rc = WaitLatch(MyLatch, latch_events, delay_ms, - WAIT_EVENT_WAIT_FOR_WAL_REPLAY); - - if (rc & WL_LATCH_SET) - ResetLatch(MyLatch); - } - - if (targetLSN > currentLSN) - { - deleteLSNWaiter(); - ereport(ERROR, - (errcode(ERRCODE_QUERY_CANCELED), - errmsg("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X", - LSN_FORMAT_ARGS(targetLSN), - LSN_FORMAT_ARGS(currentLSN)))); - } -} - -Datum -pg_wal_replay_wait(PG_FUNCTION_ARGS) -{ - XLogRecPtr target_lsn = PG_GETARG_LSN(0); - int64 timeout = PG_GETARG_INT64(1); - CallContext *context = (CallContext *) fcinfo->context; - - if (timeout < 0) - ereport(ERROR, - (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), - errmsg("\"timeout\" must not be negative"))); - - /* - * We are going to wait for the LSN replay. We should first care that we - * don't hold a snapshot and correspondingly our MyProc->xmin is invalid. - * Otherwise, our snapshot could prevent the replay of WAL records - * implying a kind of self-deadlock. This is the reason why - * pg_wal_replay_wait() is a procedure, not a function. - * - * At first, we check that pg_wal_replay_wait() is called in a non-atomic - * context. That is, a procedure call isn't wrapped into a transaction, - * another procedure call, or a function call. - * - * Secondly, according to PlannedStmtRequiresSnapshot(), even in an atomic - * context, CallStmt is processed with a snapshot. Thankfully, we can pop - * this snapshot, because PortalRunUtility() can tolerate this. - */ - if (context->atomic) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("pg_wal_replay_wait() must be only called in non-atomic context"), - errdetail("Make sure pg_wal_replay_wait() isn't called within a transaction, another procedure, or a function."))); - - if (ActiveSnapshotSet()) - PopActiveSnapshot(); - Assert(!ActiveSnapshotSet()); - InvalidateCatalogSnapshot(); - Assert(MyProc->xmin == InvalidTransactionId); - - (void) WaitForLSN(target_lsn, timeout); - - PG_RETURN_VOID(); -} |