aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands
diff options
context:
space:
mode:
authorAlexander Korotkov <akorotkov@postgresql.org>2024-04-11 16:30:32 +0300
committerAlexander Korotkov <akorotkov@postgresql.org>2024-04-11 17:28:15 +0300
commit772faafca1b288c4dd66b7150a7831c27b768003 (patch)
tree53ccb481bb8e8ca93feff18555734b706aa4ad7c /src/backend/commands
parent922c4c461d213a422ee7eb6c38e399607539210a (diff)
downloadpostgresql-772faafca1b288c4dd66b7150a7831c27b768003.tar.gz
postgresql-772faafca1b288c4dd66b7150a7831c27b768003.zip
Revert: Implement pg_wal_replay_wait() stored procedure
This commit reverts 06c418e163, e37662f221, bf1e650806, 25f42429e2, ee79928441, and 74eaf66f98 per review by Heikki Linnakangas. Discussion: https://postgr.es/m/b155606b-e744-4218-bda5-29379779da1a%40iki.fi
Diffstat (limited to 'src/backend/commands')
-rw-r--r--src/backend/commands/Makefile3
-rw-r--r--src/backend/commands/meson.build1
-rw-r--r--src/backend/commands/waitlsn.c337
3 files changed, 1 insertions, 340 deletions
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index cede90c3b98..48f7348f91c 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -61,7 +61,6 @@ OBJS = \
vacuum.o \
vacuumparallel.o \
variable.o \
- view.o \
- waitlsn.o
+ view.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build
index 7549be5dc3b..6dd00a4abde 100644
--- a/src/backend/commands/meson.build
+++ b/src/backend/commands/meson.build
@@ -50,5 +50,4 @@ backend_sources += files(
'vacuumparallel.c',
'variable.c',
'view.c',
- 'waitlsn.c',
)
diff --git a/src/backend/commands/waitlsn.c b/src/backend/commands/waitlsn.c
deleted file mode 100644
index 1a83c34e09f..00000000000
--- a/src/backend/commands/waitlsn.c
+++ /dev/null
@@ -1,337 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * waitlsn.c
- * Implements waiting for the given replay LSN, which is used in
- * CALL pg_wal_replay_wait(target_lsn pg_lsn, timeout float8).
- *
- * Copyright (c) 2024, PostgreSQL Global Development Group
- *
- * IDENTIFICATION
- * src/backend/commands/waitlsn.c
- *
- *-------------------------------------------------------------------------
- */
-
-#include "postgres.h"
-
-#include <float.h>
-#include <math.h>
-
-#include "pgstat.h"
-#include "access/xlog.h"
-#include "access/xlogrecovery.h"
-#include "commands/waitlsn.h"
-#include "funcapi.h"
-#include "miscadmin.h"
-#include "storage/latch.h"
-#include "storage/proc.h"
-#include "storage/shmem.h"
-#include "utils/fmgrprotos.h"
-#include "utils/pg_lsn.h"
-#include "utils/snapmgr.h"
-#include "utils/wait_event_types.h"
-
-static int lsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
- void *arg);
-
-struct WaitLSNState *waitLSN = NULL;
-
-/* Report the amount of shared memory space needed for WaitLSNState. */
-Size
-WaitLSNShmemSize(void)
-{
- Size size;
-
- size = offsetof(WaitLSNState, procInfos);
- size = add_size(size, mul_size(MaxBackends, sizeof(WaitLSNProcInfo)));
- return size;
-}
-
-/* Initialize the WaitLSNState in the shared memory. */
-void
-WaitLSNShmemInit(void)
-{
- bool found;
-
- waitLSN = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
- WaitLSNShmemSize(),
- &found);
- if (!found)
- {
- pg_atomic_init_u64(&waitLSN->minWaitedLSN, PG_UINT64_MAX);
- pairingheap_initialize(&waitLSN->waitersHeap, lsn_cmp, NULL);
- memset(&waitLSN->procInfos, 0, MaxBackends * sizeof(WaitLSNProcInfo));
- }
-}
-
-/*
- * Comparison function for waitLSN->waitersHeap heap. Waiting processes are
- * ordered by lsn, so that the waiter with smallest lsn is at the top.
- */
-static int
-lsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
-{
- const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, phNode, a);
- const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, phNode, b);
-
- if (aproc->waitLSN < bproc->waitLSN)
- return 1;
- else if (aproc->waitLSN > bproc->waitLSN)
- return -1;
- else
- return 0;
-}
-
-/*
- * Update waitLSN->minWaitedLSN according to the current state of
- * waitLSN->waitersHeap.
- */
-static void
-updateMinWaitedLSN(void)
-{
- XLogRecPtr minWaitedLSN = PG_UINT64_MAX;
-
- if (!pairingheap_is_empty(&waitLSN->waitersHeap))
- {
- pairingheap_node *node = pairingheap_first(&waitLSN->waitersHeap);
-
- minWaitedLSN = pairingheap_container(WaitLSNProcInfo, phNode, node)->waitLSN;
- }
-
- pg_atomic_write_u64(&waitLSN->minWaitedLSN, minWaitedLSN);
-}
-
-/*
- * Put the current process into the heap of LSN waiters.
- */
-static void
-addLSNWaiter(XLogRecPtr lsn)
-{
- WaitLSNProcInfo *procInfo = &waitLSN->procInfos[MyProcNumber];
-
- LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
-
- Assert(!procInfo->inHeap);
-
- procInfo->procnum = MyProcNumber;
- procInfo->waitLSN = lsn;
-
- pairingheap_add(&waitLSN->waitersHeap, &procInfo->phNode);
- procInfo->inHeap = true;
- updateMinWaitedLSN();
-
- LWLockRelease(WaitLSNLock);
-}
-
-/*
- * Remove the current process from the heap of LSN waiters if it's there.
- */
-static void
-deleteLSNWaiter(void)
-{
- WaitLSNProcInfo *procInfo = &waitLSN->procInfos[MyProcNumber];
-
- LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
-
- if (!procInfo->inHeap)
- {
- LWLockRelease(WaitLSNLock);
- return;
- }
-
- pairingheap_remove(&waitLSN->waitersHeap, &procInfo->phNode);
- procInfo->inHeap = false;
- updateMinWaitedLSN();
-
- LWLockRelease(WaitLSNLock);
-}
-
-/*
- * Set latches of LSN waiters whose LSN has been replayed. Set latches of all
- * LSN waiters when InvalidXLogRecPtr is given.
- */
-void
-WaitLSNSetLatches(XLogRecPtr currentLSN)
-{
- int i;
- int *wakeUpProcNums;
- int numWakeUpProcs = 0;
-
- wakeUpProcNums = palloc(sizeof(int) * MaxBackends);
-
- LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
-
- /*
- * Iterate the pairing heap of waiting processes till we find LSN not yet
- * replayed. Record the process numbers to set their latches later.
- */
- while (!pairingheap_is_empty(&waitLSN->waitersHeap))
- {
- pairingheap_node *node = pairingheap_first(&waitLSN->waitersHeap);
- WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, phNode, node);
-
- if (!XLogRecPtrIsInvalid(currentLSN) &&
- procInfo->waitLSN > currentLSN)
- break;
-
- wakeUpProcNums[numWakeUpProcs++] = procInfo->procnum;
- (void) pairingheap_remove_first(&waitLSN->waitersHeap);
- procInfo->inHeap = false;
- }
-
- updateMinWaitedLSN();
-
- LWLockRelease(WaitLSNLock);
-
- /*
- * Set latches for processes, whose waited LSNs are already replayed. This
- * involves spinlocks. So, we shouldn't do this under a spinlock.
- */
- for (i = 0; i < numWakeUpProcs; i++)
- {
- PGPROC *backend;
-
- backend = GetPGProcByNumber(wakeUpProcNums[i]);
- SetLatch(&backend->procLatch);
- }
- pfree(wakeUpProcNums);
-}
-
-/*
- * Delete our item from shmem array if any.
- */
-void
-WaitLSNCleanup(void)
-{
- /*
- * We do a fast-path check of the 'inHeap' flag without the lock. This
- * flag is set to true only by the process itself. So, it's only possible
- * to get a false positive. But that will be eliminated by a recheck
- * inside deleteLSNWaiter().
- */
- if (waitLSN->procInfos[MyProcNumber].inHeap)
- deleteLSNWaiter();
-}
-
-/*
- * Wait using MyLatch till the given LSN is replayed, the postmaster dies or
- * timeout happens.
- */
-void
-WaitForLSN(XLogRecPtr targetLSN, int64 timeout)
-{
- XLogRecPtr currentLSN;
- TimestampTz endtime = 0;
-
- /* Shouldn't be called when shmem isn't initialized */
- Assert(waitLSN);
-
- /* Should be only called by a backend */
- Assert(MyBackendType == B_BACKEND && MyProcNumber <= MaxBackends);
-
- if (!RecoveryInProgress())
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("recovery is not in progress"),
- errhint("Waiting for LSN can only be executed during recovery.")));
-
- /* If target LSN is already replayed, exit immediately */
- if (targetLSN <= GetXLogReplayRecPtr(NULL))
- return;
-
- if (timeout > 0)
- endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
-
- addLSNWaiter(targetLSN);
-
- for (;;)
- {
- int rc;
- int latch_events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
- long delay_ms = 0;
-
- /* Check if the waited LSN has been replayed */
- currentLSN = GetXLogReplayRecPtr(NULL);
- if (targetLSN <= currentLSN)
- break;
-
- /* Recheck that recovery is still in-progress */
- if (!RecoveryInProgress())
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("recovery is not in progress"),
- errdetail("Recovery ended before replaying the target LSN %X/%X; last replay LSN %X/%X.",
- LSN_FORMAT_ARGS(targetLSN),
- LSN_FORMAT_ARGS(currentLSN))));
-
- if (timeout > 0)
- {
- delay_ms = (endtime - GetCurrentTimestamp()) / 1000;
- latch_events |= WL_TIMEOUT;
- if (delay_ms <= 0)
- break;
- }
-
- CHECK_FOR_INTERRUPTS();
-
- rc = WaitLatch(MyLatch, latch_events, delay_ms,
- WAIT_EVENT_WAIT_FOR_WAL_REPLAY);
-
- if (rc & WL_LATCH_SET)
- ResetLatch(MyLatch);
- }
-
- if (targetLSN > currentLSN)
- {
- deleteLSNWaiter();
- ereport(ERROR,
- (errcode(ERRCODE_QUERY_CANCELED),
- errmsg("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X",
- LSN_FORMAT_ARGS(targetLSN),
- LSN_FORMAT_ARGS(currentLSN))));
- }
-}
-
-Datum
-pg_wal_replay_wait(PG_FUNCTION_ARGS)
-{
- XLogRecPtr target_lsn = PG_GETARG_LSN(0);
- int64 timeout = PG_GETARG_INT64(1);
- CallContext *context = (CallContext *) fcinfo->context;
-
- if (timeout < 0)
- ereport(ERROR,
- (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
- errmsg("\"timeout\" must not be negative")));
-
- /*
- * We are going to wait for the LSN replay. We should first care that we
- * don't hold a snapshot and correspondingly our MyProc->xmin is invalid.
- * Otherwise, our snapshot could prevent the replay of WAL records
- * implying a kind of self-deadlock. This is the reason why
- * pg_wal_replay_wait() is a procedure, not a function.
- *
- * At first, we check that pg_wal_replay_wait() is called in a non-atomic
- * context. That is, a procedure call isn't wrapped into a transaction,
- * another procedure call, or a function call.
- *
- * Secondly, according to PlannedStmtRequiresSnapshot(), even in an atomic
- * context, CallStmt is processed with a snapshot. Thankfully, we can pop
- * this snapshot, because PortalRunUtility() can tolerate this.
- */
- if (context->atomic)
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("pg_wal_replay_wait() must be only called in non-atomic context"),
- errdetail("Make sure pg_wal_replay_wait() isn't called within a transaction, another procedure, or a function.")));
-
- if (ActiveSnapshotSet())
- PopActiveSnapshot();
- Assert(!ActiveSnapshotSet());
- InvalidateCatalogSnapshot();
- Assert(MyProc->xmin == InvalidTransactionId);
-
- (void) WaitForLSN(target_lsn, timeout);
-
- PG_RETURN_VOID();
-}