diff options
Diffstat (limited to 'src/backend/storage')
-rw-r--r-- | src/backend/storage/aio/README.md | 5 | ||||
-rw-r--r-- | src/backend/storage/aio/aio.c | 17 | ||||
-rw-r--r-- | src/backend/storage/aio/aio_callback.c | 7 | ||||
-rw-r--r-- | src/backend/storage/aio/method_io_uring.c | 216 | ||||
-rw-r--r-- | src/backend/storage/aio/method_worker.c | 69 | ||||
-rw-r--r-- | src/backend/storage/buffer/bufmgr.c | 31 | ||||
-rw-r--r-- | src/backend/storage/buffer/localbuf.c | 30 | ||||
-rw-r--r-- | src/backend/storage/file/fd.c | 19 | ||||
-rw-r--r-- | src/backend/storage/ipc/dsm_registry.c | 314 | ||||
-rw-r--r-- | src/backend/storage/ipc/latch.c | 8 | ||||
-rw-r--r-- | src/backend/storage/ipc/procarray.c | 72 | ||||
-rw-r--r-- | src/backend/storage/ipc/procsignal.c | 6 | ||||
-rw-r--r-- | src/backend/storage/ipc/shmem.c | 4 | ||||
-rw-r--r-- | src/backend/storage/ipc/standby.c | 4 | ||||
-rw-r--r-- | src/backend/storage/lmgr/generate-lwlocknames.pl | 113 | ||||
-rw-r--r-- | src/backend/storage/lmgr/lock.c | 20 | ||||
-rw-r--r-- | src/backend/storage/lmgr/lwlock.c | 48 | ||||
-rw-r--r-- | src/backend/storage/lmgr/predicate.c | 11 |
18 files changed, 748 insertions, 246 deletions
diff --git a/src/backend/storage/aio/README.md b/src/backend/storage/aio/README.md index f10b5c7e31e..72ae3b3737d 100644 --- a/src/backend/storage/aio/README.md +++ b/src/backend/storage/aio/README.md @@ -94,7 +94,7 @@ pgaio_io_register_callbacks(ioh, PGAIO_HCB_SHARED_BUFFER_READV, 0); * * In this example we're reading only a single buffer, hence the 1. */ -pgaio_io_set_handle_data_32(ioh, (uint32 *) buffer, 1); +pgaio_io_set_handle_data_32(ioh, (uint32 *) &buffer, 1); /* * Pass the AIO handle to lower-level function. When operating on the level of @@ -119,8 +119,9 @@ pgaio_io_set_handle_data_32(ioh, (uint32 *) buffer, 1); * e.g. due to reaching a limit on the number of unsubmitted IOs, and even * complete before smgrstartreadv() returns. */ +void *page = BufferGetBlock(buffer); smgrstartreadv(ioh, operation->smgr, forknum, blkno, - BufferGetBlock(buffer), 1); + &page, 1); /* * To benefit from AIO, it is beneficial to perform other work, including diff --git a/src/backend/storage/aio/aio.c b/src/backend/storage/aio/aio.c index 6c6c0a908e2..3643f27ad6e 100644 --- a/src/backend/storage/aio/aio.c +++ b/src/backend/storage/aio/aio.c @@ -556,6 +556,13 @@ bool pgaio_io_was_recycled(PgAioHandle *ioh, uint64 ref_generation, PgAioHandleState *state) { *state = ioh->state; + + /* + * Ensure that we don't see an earlier state of the handle than ioh->state + * due to compiler or CPU reordering. This protects both ->generation as + * directly used here, and other fields in the handle accessed in the + * caller if the handle was not reused. + */ pg_read_barrier(); return ioh->generation != ref_generation; @@ -773,7 +780,12 @@ pgaio_io_wait_for_free(void) * Note that no interrupts are processed between the state check * and the call to reclaim - that's important as otherwise an * interrupt could have already reclaimed the handle. + * + * Need to ensure that there's no reordering, in the more common + * paths, where we wait for IO, that's done by + * pgaio_io_was_recycled(). */ + pg_read_barrier(); pgaio_io_reclaim(ioh); reclaimed++; } @@ -852,7 +864,12 @@ pgaio_io_wait_for_free(void) * check and the call to reclaim - that's important as * otherwise an interrupt could have already reclaimed the * handle. + * + * Need to ensure that there's no reordering, in the more + * common paths, where we wait for IO, that's done by + * pgaio_io_was_recycled(). */ + pg_read_barrier(); pgaio_io_reclaim(ioh); break; } diff --git a/src/backend/storage/aio/aio_callback.c b/src/backend/storage/aio/aio_callback.c index 0ad9795bb7e..03c9bba0802 100644 --- a/src/backend/storage/aio/aio_callback.c +++ b/src/backend/storage/aio/aio_callback.c @@ -256,6 +256,9 @@ pgaio_io_call_complete_shared(PgAioHandle *ioh) pgaio_result_status_string(result.status), result.id, result.error_data, result.result); result = ce->cb->complete_shared(ioh, result, cb_data); + + /* the callback should never transition to unknown */ + Assert(result.status != PGAIO_RS_UNKNOWN); } ioh->distilled_result = result; @@ -290,6 +293,7 @@ pgaio_io_call_complete_local(PgAioHandle *ioh) /* start with distilled result from shared callback */ result = ioh->distilled_result; + Assert(result.status != PGAIO_RS_UNKNOWN); for (int i = ioh->num_callbacks; i > 0; i--) { @@ -306,6 +310,9 @@ pgaio_io_call_complete_local(PgAioHandle *ioh) pgaio_result_status_string(result.status), result.id, result.error_data, result.result); result = ce->cb->complete_local(ioh, result, cb_data); + + /* the callback should never transition to unknown */ + Assert(result.status != PGAIO_RS_UNKNOWN); } /* diff --git a/src/backend/storage/aio/method_io_uring.c b/src/backend/storage/aio/method_io_uring.c index cc312b641ca..0a8c054162f 100644 --- a/src/backend/storage/aio/method_io_uring.c +++ b/src/backend/storage/aio/method_io_uring.c @@ -29,6 +29,9 @@ #ifdef IOMETHOD_IO_URING_ENABLED +#include <sys/mman.h> +#include <unistd.h> + #include <liburing.h> #include "miscadmin.h" @@ -94,12 +97,32 @@ PgAioUringContext struct io_uring io_uring_ring; } PgAioUringContext; +/* + * Information about the capabilities that io_uring has. + * + * Depending on liburing and kernel version different features are + * supported. At least for the kernel a kernel version check does not suffice + * as various vendors do backport features to older kernels :(. + */ +typedef struct PgAioUringCaps +{ + bool checked; + /* -1 if io_uring_queue_init_mem() is unsupported */ + int mem_init_size; +} PgAioUringCaps; + + /* PgAioUringContexts for all backends */ static PgAioUringContext *pgaio_uring_contexts; /* the current backend's context */ static PgAioUringContext *pgaio_my_uring_context; +static PgAioUringCaps pgaio_uring_caps = +{ + .checked = false, + .mem_init_size = -1, +}; static uint32 pgaio_uring_procs(void) @@ -111,16 +134,145 @@ pgaio_uring_procs(void) return MaxBackends + NUM_AUXILIARY_PROCS - MAX_IO_WORKERS; } -static Size +/* + * Initializes pgaio_uring_caps, unless that's already done. + */ +static void +pgaio_uring_check_capabilities(void) +{ + if (pgaio_uring_caps.checked) + return; + + /* + * By default io_uring creates a shared memory mapping for each io_uring + * instance, leading to a large number of memory mappings. Unfortunately a + * large number of memory mappings slows things down, backend exit is + * particularly affected. To address that, newer kernels (6.5) support + * using user-provided memory for the memory, by putting the relevant + * memory into shared memory we don't need any additional mappings. + * + * To know whether this is supported, we unfortunately need to probe the + * kernel by trying to create a ring with userspace-provided memory. This + * also has a secondary benefit: We can determine precisely how much + * memory we need for each io_uring instance. + */ +#if defined(HAVE_LIBURING_QUEUE_INIT_MEM) && defined(IORING_SETUP_NO_MMAP) + { + struct io_uring test_ring; + size_t ring_size; + void *ring_ptr; + struct io_uring_params p = {0}; + int ret; + + /* + * Liburing does not yet provide an API to query how much memory a + * ring will need. So we over-estimate it here. As the memory is freed + * just below that's small temporary waste of memory. + * + * 1MB is more than enough for rings within io_max_concurrency's + * range. + */ + ring_size = 1024 * 1024; + + /* + * Hard to believe a system exists where 1MB would not be a multiple + * of the page size. But it's cheap to ensure... + */ + ring_size -= ring_size % sysconf(_SC_PAGESIZE); + + ring_ptr = mmap(NULL, ring_size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); + if (ring_ptr == MAP_FAILED) + elog(ERROR, + "mmap(%zu) to determine io_uring_queue_init_mem() support failed: %m", + ring_size); + + ret = io_uring_queue_init_mem(io_max_concurrency, &test_ring, &p, ring_ptr, ring_size); + if (ret > 0) + { + pgaio_uring_caps.mem_init_size = ret; + + elog(DEBUG1, + "can use combined memory mapping for io_uring, each ring needs %d bytes", + ret); + + /* clean up the created ring, it was just for a test */ + io_uring_queue_exit(&test_ring); + } + else + { + /* + * There are different reasons for ring creation to fail, but it's + * ok to treat that just as io_uring_queue_init_mem() not being + * supported. We'll report a more detailed error in + * pgaio_uring_shmem_init(). + */ + errno = -ret; + elog(DEBUG1, + "cannot use combined memory mapping for io_uring, ring creation failed: %m"); + + } + + if (munmap(ring_ptr, ring_size) != 0) + elog(ERROR, "munmap() failed: %m"); + } +#else + { + elog(DEBUG1, + "can't use combined memory mapping for io_uring, kernel or liburing too old"); + } +#endif + + pgaio_uring_caps.checked = true; +} + +/* + * Memory for all PgAioUringContext instances + */ +static size_t pgaio_uring_context_shmem_size(void) { return mul_size(pgaio_uring_procs(), sizeof(PgAioUringContext)); } +/* + * Memory for the combined memory used by io_uring instances. Returns 0 if + * that is not supported by kernel/liburing. + */ +static size_t +pgaio_uring_ring_shmem_size(void) +{ + size_t sz = 0; + + if (pgaio_uring_caps.mem_init_size > 0) + { + /* + * Memory for rings needs to be allocated to the page boundary, + * reserve space. Luckily it does not need to be aligned to hugepage + * boundaries, even if huge pages are used. + */ + sz = add_size(sz, sysconf(_SC_PAGESIZE)); + sz = add_size(sz, mul_size(pgaio_uring_procs(), + pgaio_uring_caps.mem_init_size)); + } + + return sz; +} + static size_t pgaio_uring_shmem_size(void) { - return pgaio_uring_context_shmem_size(); + size_t sz; + + /* + * Kernel and liburing support for various features influences how much + * shmem we need, perform the necessary checks. + */ + pgaio_uring_check_capabilities(); + + sz = pgaio_uring_context_shmem_size(); + sz = add_size(sz, pgaio_uring_ring_shmem_size()); + + return sz; } static void @@ -128,13 +280,38 @@ pgaio_uring_shmem_init(bool first_time) { int TotalProcs = pgaio_uring_procs(); bool found; + char *shmem; + size_t ring_mem_remain = 0; + char *ring_mem_next = 0; - pgaio_uring_contexts = (PgAioUringContext *) - ShmemInitStruct("AioUring", pgaio_uring_shmem_size(), &found); - + /* + * We allocate memory for all PgAioUringContext instances and, if + * supported, the memory required for each of the io_uring instances, in + * one ShmemInitStruct(). + */ + shmem = ShmemInitStruct("AioUringContext", pgaio_uring_shmem_size(), &found); if (found) return; + pgaio_uring_contexts = (PgAioUringContext *) shmem; + shmem += pgaio_uring_context_shmem_size(); + + /* if supported, handle memory alignment / sizing for io_uring memory */ + if (pgaio_uring_caps.mem_init_size > 0) + { + ring_mem_remain = pgaio_uring_ring_shmem_size(); + ring_mem_next = (char *) shmem; + + /* align to page boundary, see also pgaio_uring_ring_shmem_size() */ + ring_mem_next = (char *) TYPEALIGN(sysconf(_SC_PAGESIZE), ring_mem_next); + + /* account for alignment */ + ring_mem_remain -= ring_mem_next - shmem; + shmem += ring_mem_next - shmem; + + shmem += ring_mem_remain; + } + for (int contextno = 0; contextno < TotalProcs; contextno++) { PgAioUringContext *context = &pgaio_uring_contexts[contextno]; @@ -158,7 +335,28 @@ pgaio_uring_shmem_init(bool first_time) * be worth using that - also need to evaluate if that causes * noticeable additional contention? */ - ret = io_uring_queue_init(io_max_concurrency, &context->io_uring_ring, 0); + + /* + * If supported (c.f. pgaio_uring_check_capabilities()), create ring + * with its data in shared memory. Otherwise fall back io_uring + * creating a memory mapping for each ring. + */ +#if defined(HAVE_LIBURING_QUEUE_INIT_MEM) && defined(IORING_SETUP_NO_MMAP) + if (pgaio_uring_caps.mem_init_size > 0) + { + struct io_uring_params p = {0}; + + ret = io_uring_queue_init_mem(io_max_concurrency, &context->io_uring_ring, &p, ring_mem_next, ring_mem_remain); + + ring_mem_remain -= ret; + ring_mem_next += ret; + } + else +#endif + { + ret = io_uring_queue_init(io_max_concurrency, &context->io_uring_ring, 0); + } + if (ret < 0) { char *hint = NULL; @@ -400,9 +598,9 @@ pgaio_uring_wait_one(PgAioHandle *ioh, uint64 ref_generation) while (true) { pgaio_debug_io(DEBUG3, ioh, - "wait_one io_gen: %llu, ref_gen: %llu, cycle %d", - (long long unsigned) ioh->generation, - (long long unsigned) ref_generation, + "wait_one io_gen: %" PRIu64 ", ref_gen: %" PRIu64 ", cycle %d", + ioh->generation, + ref_generation, waited); if (pgaio_io_was_recycled(ioh, ref_generation, &state) || diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c index 743cccc2acd..bf8f77e6ff6 100644 --- a/src/backend/storage/aio/method_worker.c +++ b/src/backend/storage/aio/method_worker.c @@ -52,26 +52,26 @@ #define IO_WORKER_WAKEUP_FANOUT 2 -typedef struct AioWorkerSubmissionQueue +typedef struct PgAioWorkerSubmissionQueue { uint32 size; uint32 mask; uint32 head; uint32 tail; - uint32 ios[FLEXIBLE_ARRAY_MEMBER]; -} AioWorkerSubmissionQueue; + uint32 sqes[FLEXIBLE_ARRAY_MEMBER]; +} PgAioWorkerSubmissionQueue; -typedef struct AioWorkerSlot +typedef struct PgAioWorkerSlot { Latch *latch; bool in_use; -} AioWorkerSlot; +} PgAioWorkerSlot; -typedef struct AioWorkerControl +typedef struct PgAioWorkerControl { uint64 idle_worker_mask; - AioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER]; -} AioWorkerControl; + PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER]; +} PgAioWorkerControl; static size_t pgaio_worker_shmem_size(void); @@ -96,8 +96,8 @@ int io_workers = 3; static int io_worker_queue_size = 64; static int MyIoWorkerId; -static AioWorkerSubmissionQueue *io_worker_submission_queue; -static AioWorkerControl *io_worker_control; +static PgAioWorkerSubmissionQueue *io_worker_submission_queue; +static PgAioWorkerControl *io_worker_control; static size_t @@ -106,15 +106,15 @@ pgaio_worker_queue_shmem_size(int *queue_size) /* Round size up to next power of two so we can make a mask. */ *queue_size = pg_nextpower2_32(io_worker_queue_size); - return offsetof(AioWorkerSubmissionQueue, ios) + + return offsetof(PgAioWorkerSubmissionQueue, sqes) + sizeof(uint32) * *queue_size; } static size_t pgaio_worker_control_shmem_size(void) { - return offsetof(AioWorkerControl, workers) + - sizeof(AioWorkerSlot) * MAX_IO_WORKERS; + return offsetof(PgAioWorkerControl, workers) + + sizeof(PgAioWorkerSlot) * MAX_IO_WORKERS; } static size_t @@ -162,7 +162,7 @@ pgaio_worker_shmem_init(bool first_time) } static int -pgaio_choose_idle_worker(void) +pgaio_worker_choose_idle(void) { int worker; @@ -172,6 +172,7 @@ pgaio_choose_idle_worker(void) /* Find the lowest bit position, and clear it. */ worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask); io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker); + Assert(io_worker_control->workers[worker].in_use); return worker; } @@ -179,7 +180,7 @@ pgaio_choose_idle_worker(void) static bool pgaio_worker_submission_queue_insert(PgAioHandle *ioh) { - AioWorkerSubmissionQueue *queue; + PgAioWorkerSubmissionQueue *queue; uint32 new_head; queue = io_worker_submission_queue; @@ -191,7 +192,7 @@ pgaio_worker_submission_queue_insert(PgAioHandle *ioh) return false; /* full */ } - queue->ios[queue->head] = pgaio_io_get_id(ioh); + queue->sqes[queue->head] = pgaio_io_get_id(ioh); queue->head = new_head; return true; @@ -200,14 +201,14 @@ pgaio_worker_submission_queue_insert(PgAioHandle *ioh) static uint32 pgaio_worker_submission_queue_consume(void) { - AioWorkerSubmissionQueue *queue; + PgAioWorkerSubmissionQueue *queue; uint32 result; queue = io_worker_submission_queue; if (queue->tail == queue->head) return UINT32_MAX; /* empty */ - result = queue->ios[queue->tail]; + result = queue->sqes[queue->tail]; queue->tail = (queue->tail + 1) & (queue->size - 1); return result; @@ -240,37 +241,37 @@ pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh) } static void -pgaio_worker_submit_internal(int nios, PgAioHandle *ios[]) +pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios) { PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE]; int nsync = 0; Latch *wakeup = NULL; int worker; - Assert(nios <= PGAIO_SUBMIT_BATCH_SIZE); + Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE); LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE); - for (int i = 0; i < nios; ++i) + for (int i = 0; i < num_staged_ios; ++i) { - Assert(!pgaio_worker_needs_synchronous_execution(ios[i])); - if (!pgaio_worker_submission_queue_insert(ios[i])) + Assert(!pgaio_worker_needs_synchronous_execution(staged_ios[i])); + if (!pgaio_worker_submission_queue_insert(staged_ios[i])) { /* * We'll do it synchronously, but only after we've sent as many as * we can to workers, to maximize concurrency. */ - synchronous_ios[nsync++] = ios[i]; + synchronous_ios[nsync++] = staged_ios[i]; continue; } if (wakeup == NULL) { /* Choose an idle worker to wake up if we haven't already. */ - worker = pgaio_choose_idle_worker(); + worker = pgaio_worker_choose_idle(); if (worker >= 0) wakeup = io_worker_control->workers[worker].latch; - pgaio_debug_io(DEBUG4, ios[i], + pgaio_debug_io(DEBUG4, staged_ios[i], "choosing worker %d", worker); } @@ -316,6 +317,7 @@ pgaio_worker_die(int code, Datum arg) Assert(io_worker_control->workers[MyIoWorkerId].in_use); Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch); + io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId); io_worker_control->workers[MyIoWorkerId].in_use = false; io_worker_control->workers[MyIoWorkerId].latch = NULL; LWLockRelease(AioWorkerSubmissionQueueLock); @@ -461,7 +463,12 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len) int nwakeups = 0; int worker; - /* Try to get a job to do. */ + /* + * Try to get a job to do. + * + * The lwlock acquisition also provides the necessary memory barrier + * to ensure that we don't see an outdated data in the handle. + */ LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE); if ((io_index = pgaio_worker_submission_queue_consume()) == UINT32_MAX) { @@ -483,7 +490,7 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len) IO_WORKER_WAKEUP_FANOUT); for (int i = 0; i < nwakeups; ++i) { - if ((worker = pgaio_choose_idle_worker()) < 0) + if ((worker = pgaio_worker_choose_idle()) < 0) break; latches[nlatches++] = io_worker_control->workers[worker].latch; } @@ -568,6 +575,12 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len) } CHECK_FOR_INTERRUPTS(); + + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } } error_context_stack = errcallback.previous; diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 667aa0c0c78..67431208e7f 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -2743,12 +2743,10 @@ ExtendBufferedRelShared(BufferManagerRelation bmr, * because mdread doesn't complain about reads beyond EOF (when * zero_damaged_pages is ON) and so a previous attempt to read a block * beyond EOF could have left a "valid" zero-filled buffer. - * Unfortunately, we have also seen this case occurring because of - * buggy Linux kernels that sometimes return an lseek(SEEK_END) result - * that doesn't account for a recent write. In that situation, the - * pre-existing buffer would contain valid data that we don't want to - * overwrite. Since the legitimate cases should always have left a - * zero-filled buffer, complain if not PageIsNew. + * + * This has also been observed when relation was overwritten by + * external process. Since the legitimate cases should always have + * left a zero-filled buffer, complain if not PageIsNew. */ if (existing_id >= 0) { @@ -2778,8 +2776,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr, ereport(ERROR, (errmsg("unexpected data beyond EOF in block %u of relation %s", existing_hdr->tag.blockNum, - relpath(bmr.smgr->smgr_rlocator, fork).str), - errhint("This has been seen to occur with buggy kernels; consider updating your system."))); + relpath(bmr.smgr->smgr_rlocator, fork).str))); /* * We *must* do smgr[zero]extend before succeeding, else the page @@ -3339,10 +3336,10 @@ UnpinBufferNoOwner(BufferDesc *buf) * BufferSync -- Write out all dirty buffers in the pool. * * This is called at checkpoint time to write out all dirty shared buffers. - * The checkpoint request flags should be passed in. If CHECKPOINT_IMMEDIATE - * is set, we disable delays between writes; if CHECKPOINT_IS_SHUTDOWN, - * CHECKPOINT_END_OF_RECOVERY or CHECKPOINT_FLUSH_ALL is set, we write even - * unlogged buffers, which are otherwise skipped. The remaining flags + * The checkpoint request flags should be passed in. If CHECKPOINT_FAST is + * set, we disable delays between writes; if CHECKPOINT_IS_SHUTDOWN, + * CHECKPOINT_END_OF_RECOVERY or CHECKPOINT_FLUSH_UNLOGGED is set, we write + * even unlogged buffers, which are otherwise skipped. The remaining flags * currently have no effect here. */ static void @@ -3367,7 +3364,7 @@ BufferSync(int flags) * recovery, we write all dirty buffers. */ if (!((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY | - CHECKPOINT_FLUSH_ALL)))) + CHECKPOINT_FLUSH_UNLOGGED)))) mask |= BM_PERMANENT; /* @@ -4550,11 +4547,9 @@ DropRelationBuffers(SMgrRelation smgr_reln, ForkNumber *forkNum, if (RelFileLocatorBackendIsTemp(rlocator)) { if (rlocator.backend == MyProcNumber) - { - for (j = 0; j < nforks; j++) - DropRelationLocalBuffers(rlocator.locator, forkNum[j], - firstDelBlock[j]); - } + DropRelationLocalBuffers(rlocator.locator, forkNum, nforks, + firstDelBlock); + return; } diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c index ba26627f7b0..3c0d20f4659 100644 --- a/src/backend/storage/buffer/localbuf.c +++ b/src/backend/storage/buffer/localbuf.c @@ -660,10 +660,11 @@ InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced) * See DropRelationBuffers in bufmgr.c for more notes. */ void -DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber forkNum, - BlockNumber firstDelBlock) +DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber *forkNum, + int nforks, BlockNumber *firstDelBlock) { int i; + int j; for (i = 0; i < NLocBuffer; i++) { @@ -672,12 +673,18 @@ DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber forkNum, buf_state = pg_atomic_read_u32(&bufHdr->state); - if ((buf_state & BM_TAG_VALID) && - BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator) && - BufTagGetForkNum(&bufHdr->tag) == forkNum && - bufHdr->tag.blockNum >= firstDelBlock) + if (!(buf_state & BM_TAG_VALID) || + !BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator)) + continue; + + for (j = 0; j < nforks; j++) { - InvalidateLocalBuffer(bufHdr, true); + if (BufTagGetForkNum(&bufHdr->tag) == forkNum[j] && + bufHdr->tag.blockNum >= firstDelBlock[j]) + { + InvalidateLocalBuffer(bufHdr, true); + break; + } } } } @@ -925,10 +932,11 @@ GetLocalBufferStorage(void) num_bufs = Min(num_bufs, MaxAllocSize / BLCKSZ); /* Buffers should be I/O aligned. */ - cur_block = (char *) - TYPEALIGN(PG_IO_ALIGN_SIZE, - MemoryContextAlloc(LocalBufferContext, - num_bufs * BLCKSZ + PG_IO_ALIGN_SIZE)); + cur_block = MemoryContextAllocAligned(LocalBufferContext, + num_bufs * BLCKSZ, + PG_IO_ALIGN_SIZE, + 0); + next_buf_in_block = 0; num_bufs_in_block = num_bufs; } diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c index 0e8299dd556..a4ec7959f31 100644 --- a/src/backend/storage/file/fd.c +++ b/src/backend/storage/file/fd.c @@ -400,25 +400,22 @@ pg_fsync(int fd) * portable, even if it runs ok on the current system. * * We assert here that a descriptor for a file was opened with write - * permissions (either O_RDWR or O_WRONLY) and for a directory without - * write permissions (O_RDONLY). + * permissions (i.e., not O_RDONLY) and for a directory without write + * permissions (O_RDONLY). Notice that the assertion check is made even + * if fsync() is disabled. * - * Ignore any fstat errors and let the follow-up fsync() do its work. - * Doing this sanity check here counts for the case where fsync() is - * disabled. + * If fstat() fails, ignore it and let the follow-up fsync() complain. */ if (fstat(fd, &st) == 0) { int desc_flags = fcntl(fd, F_GETFL); - /* - * O_RDONLY is historically 0, so just make sure that for directories - * no write flags are used. - */ + desc_flags &= O_ACCMODE; + if (S_ISDIR(st.st_mode)) - Assert((desc_flags & (O_RDWR | O_WRONLY)) == 0); + Assert(desc_flags == O_RDONLY); else - Assert((desc_flags & (O_RDWR | O_WRONLY)) != 0); + Assert(desc_flags != O_RDONLY); } errno = 0; #endif diff --git a/src/backend/storage/ipc/dsm_registry.c b/src/backend/storage/ipc/dsm_registry.c index 1d4fd31ffed..1682cc6d34c 100644 --- a/src/backend/storage/ipc/dsm_registry.c +++ b/src/backend/storage/ipc/dsm_registry.c @@ -15,6 +15,20 @@ * current backend. This function guarantees that only one backend * initializes the segment and that all other backends just attach it. * + * A DSA can be created in or retrieved from the registry by calling + * GetNamedDSA(). As with GetNamedDSMSegment(), if a DSA with the provided + * name does not yet exist, it is created. Otherwise, GetNamedDSA() + * ensures the DSA is attached to the current backend. This function + * guarantees that only one backend initializes the DSA and that all other + * backends just attach it. + * + * A dshash table can be created in or retrieved from the registry by + * calling GetNamedDSHash(). As with GetNamedDSMSegment(), if a hash + * table with the provided name does not yet exist, it is created. + * Otherwise, GetNamedDSHash() ensures the hash table is attached to the + * current backend. This function guarantees that only one backend + * initializes the table and that all other backends just attach it. + * * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * @@ -26,12 +40,20 @@ #include "postgres.h" +#include "funcapi.h" #include "lib/dshash.h" #include "storage/dsm_registry.h" #include "storage/lwlock.h" #include "storage/shmem.h" +#include "utils/builtins.h" #include "utils/memutils.h" +#define DSMR_NAME_LEN 128 + +#define DSMR_DSA_TRANCHE_SUFFIX " DSA" +#define DSMR_DSA_TRANCHE_SUFFIX_LEN (sizeof(DSMR_DSA_TRANCHE_SUFFIX) - 1) +#define DSMR_DSA_TRANCHE_NAME_LEN (DSMR_NAME_LEN + DSMR_DSA_TRANCHE_SUFFIX_LEN) + typedef struct DSMRegistryCtxStruct { dsa_handle dsah; @@ -40,15 +62,55 @@ typedef struct DSMRegistryCtxStruct static DSMRegistryCtxStruct *DSMRegistryCtx; -typedef struct DSMRegistryEntry +typedef struct NamedDSMState { - char name[64]; dsm_handle handle; size_t size; +} NamedDSMState; + +typedef struct NamedDSAState +{ + dsa_handle handle; + int tranche; + char tranche_name[DSMR_DSA_TRANCHE_NAME_LEN]; +} NamedDSAState; + +typedef struct NamedDSHState +{ + NamedDSAState dsa; + dshash_table_handle handle; + int tranche; + char tranche_name[DSMR_NAME_LEN]; +} NamedDSHState; + +typedef enum DSMREntryType +{ + DSMR_ENTRY_TYPE_DSM, + DSMR_ENTRY_TYPE_DSA, + DSMR_ENTRY_TYPE_DSH, +} DSMREntryType; + +static const char *const DSMREntryTypeNames[] = +{ + [DSMR_ENTRY_TYPE_DSM] = "segment", + [DSMR_ENTRY_TYPE_DSA] = "area", + [DSMR_ENTRY_TYPE_DSH] = "hash", +}; + +typedef struct DSMRegistryEntry +{ + char name[DSMR_NAME_LEN]; + DSMREntryType type; + union + { + NamedDSMState dsm; + NamedDSAState dsa; + NamedDSHState dsh; + } data; } DSMRegistryEntry; static const dshash_parameters dsh_params = { - offsetof(DSMRegistryEntry, handle), + offsetof(DSMRegistryEntry, type), sizeof(DSMRegistryEntry), dshash_strcmp, dshash_strhash, @@ -141,7 +203,7 @@ GetNamedDSMSegment(const char *name, size_t size, ereport(ERROR, (errmsg("DSM segment name cannot be empty"))); - if (strlen(name) >= offsetof(DSMRegistryEntry, handle)) + if (strlen(name) >= offsetof(DSMRegistryEntry, type)) ereport(ERROR, (errmsg("DSM segment name too long"))); @@ -158,32 +220,39 @@ GetNamedDSMSegment(const char *name, size_t size, entry = dshash_find_or_insert(dsm_registry_table, name, found); if (!(*found)) { + NamedDSMState *state = &entry->data.dsm; + dsm_segment *seg; + + entry->type = DSMR_ENTRY_TYPE_DSM; + /* Initialize the segment. */ - dsm_segment *seg = dsm_create(size, 0); + seg = dsm_create(size, 0); dsm_pin_segment(seg); dsm_pin_mapping(seg); - entry->handle = dsm_segment_handle(seg); - entry->size = size; + state->handle = dsm_segment_handle(seg); + state->size = size; ret = dsm_segment_address(seg); if (init_callback) (*init_callback) (ret); } - else if (entry->size != size) - { + else if (entry->type != DSMR_ENTRY_TYPE_DSM) ereport(ERROR, - (errmsg("requested DSM segment size does not match size of " - "existing segment"))); - } + (errmsg("requested DSM segment does not match type of existing entry"))); + else if (entry->data.dsm.size != size) + ereport(ERROR, + (errmsg("requested DSM segment size does not match size of existing segment"))); else { - dsm_segment *seg = dsm_find_mapping(entry->handle); + NamedDSMState *state = &entry->data.dsm; + dsm_segment *seg; /* If the existing segment is not already attached, attach it now. */ + seg = dsm_find_mapping(state->handle); if (seg == NULL) { - seg = dsm_attach(entry->handle); + seg = dsm_attach(state->handle); if (seg == NULL) elog(ERROR, "could not map dynamic shared memory segment"); @@ -198,3 +267,220 @@ GetNamedDSMSegment(const char *name, size_t size, return ret; } + +/* + * Initialize or attach a named DSA. + * + * This routine returns a pointer to the DSA. A new LWLock tranche ID will be + * generated if needed. Note that the lock tranche will be registered with the + * provided name. Also note that this should be called at most once for a + * given DSA in each backend. + */ +dsa_area * +GetNamedDSA(const char *name, bool *found) +{ + DSMRegistryEntry *entry; + MemoryContext oldcontext; + dsa_area *ret; + + Assert(found); + + if (!name || *name == '\0') + ereport(ERROR, + (errmsg("DSA name cannot be empty"))); + + if (strlen(name) >= offsetof(DSMRegistryEntry, type)) + ereport(ERROR, + (errmsg("DSA name too long"))); + + /* Be sure any local memory allocated by DSM/DSA routines is persistent. */ + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + + /* Connect to the registry. */ + init_dsm_registry(); + + entry = dshash_find_or_insert(dsm_registry_table, name, found); + if (!(*found)) + { + NamedDSAState *state = &entry->data.dsa; + + entry->type = DSMR_ENTRY_TYPE_DSA; + + /* Initialize the LWLock tranche for the DSA. */ + state->tranche = LWLockNewTrancheId(); + strcpy(state->tranche_name, name); + LWLockRegisterTranche(state->tranche, state->tranche_name); + + /* Initialize the DSA. */ + ret = dsa_create(state->tranche); + dsa_pin(ret); + dsa_pin_mapping(ret); + + /* Store handle for other backends to use. */ + state->handle = dsa_get_handle(ret); + } + else if (entry->type != DSMR_ENTRY_TYPE_DSA) + ereport(ERROR, + (errmsg("requested DSA does not match type of existing entry"))); + else + { + NamedDSAState *state = &entry->data.dsa; + + if (dsa_is_attached(state->handle)) + ereport(ERROR, + (errmsg("requested DSA already attached to current process"))); + + /* Initialize existing LWLock tranche for the DSA. */ + LWLockRegisterTranche(state->tranche, state->tranche_name); + + /* Attach to existing DSA. */ + ret = dsa_attach(state->handle); + dsa_pin_mapping(ret); + } + + dshash_release_lock(dsm_registry_table, entry); + MemoryContextSwitchTo(oldcontext); + + return ret; +} + +/* + * Initialize or attach a named dshash table. + * + * This routine returns the address of the table. The tranche_id member of + * params is ignored; new tranche IDs will be generated if needed. Note that + * the DSA lock tranche will be registered with the provided name with " DSA" + * appended. The dshash lock tranche will be registered with the provided + * name. Also note that this should be called at most once for a given table + * in each backend. + */ +dshash_table * +GetNamedDSHash(const char *name, const dshash_parameters *params, bool *found) +{ + DSMRegistryEntry *entry; + MemoryContext oldcontext; + dshash_table *ret; + + Assert(params); + Assert(found); + + if (!name || *name == '\0') + ereport(ERROR, + (errmsg("DSHash name cannot be empty"))); + + if (strlen(name) >= offsetof(DSMRegistryEntry, type)) + ereport(ERROR, + (errmsg("DSHash name too long"))); + + /* Be sure any local memory allocated by DSM/DSA routines is persistent. */ + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + + /* Connect to the registry. */ + init_dsm_registry(); + + entry = dshash_find_or_insert(dsm_registry_table, name, found); + if (!(*found)) + { + NamedDSAState *dsa_state = &entry->data.dsh.dsa; + NamedDSHState *dsh_state = &entry->data.dsh; + dshash_parameters params_copy; + dsa_area *dsa; + + entry->type = DSMR_ENTRY_TYPE_DSH; + + /* Initialize the LWLock tranche for the DSA. */ + dsa_state->tranche = LWLockNewTrancheId(); + sprintf(dsa_state->tranche_name, "%s%s", name, DSMR_DSA_TRANCHE_SUFFIX); + LWLockRegisterTranche(dsa_state->tranche, dsa_state->tranche_name); + + /* Initialize the LWLock tranche for the dshash table. */ + dsh_state->tranche = LWLockNewTrancheId(); + strcpy(dsh_state->tranche_name, name); + LWLockRegisterTranche(dsh_state->tranche, dsh_state->tranche_name); + + /* Initialize the DSA for the hash table. */ + dsa = dsa_create(dsa_state->tranche); + dsa_pin(dsa); + dsa_pin_mapping(dsa); + + /* Initialize the dshash table. */ + memcpy(¶ms_copy, params, sizeof(dshash_parameters)); + params_copy.tranche_id = dsh_state->tranche; + ret = dshash_create(dsa, ¶ms_copy, NULL); + + /* Store handles for other backends to use. */ + dsa_state->handle = dsa_get_handle(dsa); + dsh_state->handle = dshash_get_hash_table_handle(ret); + } + else if (entry->type != DSMR_ENTRY_TYPE_DSH) + ereport(ERROR, + (errmsg("requested DSHash does not match type of existing entry"))); + else + { + NamedDSAState *dsa_state = &entry->data.dsh.dsa; + NamedDSHState *dsh_state = &entry->data.dsh; + dsa_area *dsa; + + /* XXX: Should we verify params matches what table was created with? */ + + if (dsa_is_attached(dsa_state->handle)) + ereport(ERROR, + (errmsg("requested DSHash already attached to current process"))); + + /* Initialize existing LWLock tranches for the DSA and dshash table. */ + LWLockRegisterTranche(dsa_state->tranche, dsa_state->tranche_name); + LWLockRegisterTranche(dsh_state->tranche, dsh_state->tranche_name); + + /* Attach to existing DSA for the hash table. */ + dsa = dsa_attach(dsa_state->handle); + dsa_pin_mapping(dsa); + + /* Attach to existing dshash table. */ + ret = dshash_attach(dsa, params, dsh_state->handle, NULL); + } + + dshash_release_lock(dsm_registry_table, entry); + MemoryContextSwitchTo(oldcontext); + + return ret; +} + +Datum +pg_get_dsm_registry_allocations(PG_FUNCTION_ARGS) +{ + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + DSMRegistryEntry *entry; + MemoryContext oldcontext; + dshash_seq_status status; + + InitMaterializedSRF(fcinfo, MAT_SRF_USE_EXPECTED_DESC); + + /* Be sure any local memory allocated by DSM/DSA routines is persistent. */ + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + init_dsm_registry(); + MemoryContextSwitchTo(oldcontext); + + dshash_seq_init(&status, dsm_registry_table, false); + while ((entry = dshash_seq_next(&status)) != NULL) + { + Datum vals[3]; + bool nulls[3] = {0}; + + vals[0] = CStringGetTextDatum(entry->name); + vals[1] = CStringGetTextDatum(DSMREntryTypeNames[entry->type]); + + /* + * Since we can't know the size of DSA/dshash entries without first + * attaching to them, return NULL for those. + */ + if (entry->type == DSMR_ENTRY_TYPE_DSM) + vals[2] = Int64GetDatum(entry->data.dsm.size); + else + nulls[2] = true; + + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, vals, nulls); + } + dshash_seq_term(&status); + + return (Datum) 0; +} diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c index c6aefd2f688..beadeb5e46a 100644 --- a/src/backend/storage/ipc/latch.c +++ b/src/backend/storage/ipc/latch.c @@ -187,9 +187,11 @@ WaitLatch(Latch *latch, int wakeEvents, long timeout, if (!(wakeEvents & WL_LATCH_SET)) latch = NULL; ModifyWaitEvent(LatchWaitSet, LatchWaitSetLatchPos, WL_LATCH_SET, latch); - ModifyWaitEvent(LatchWaitSet, LatchWaitSetPostmasterDeathPos, - (wakeEvents & (WL_EXIT_ON_PM_DEATH | WL_POSTMASTER_DEATH)), - NULL); + + if (IsUnderPostmaster) + ModifyWaitEvent(LatchWaitSet, LatchWaitSetPostmasterDeathPos, + (wakeEvents & (WL_EXIT_ON_PM_DEATH | WL_POSTMASTER_DEATH)), + NULL); if (WaitEventSetWait(LatchWaitSet, (wakeEvents & WL_TIMEOUT) ? timeout : -1, diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index e5b945a9ee3..bf987aed8d3 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -1622,58 +1622,6 @@ TransactionIdIsInProgress(TransactionId xid) return false; } -/* - * TransactionIdIsActive -- is xid the top-level XID of an active backend? - * - * This differs from TransactionIdIsInProgress in that it ignores prepared - * transactions, as well as transactions running on the primary if we're in - * hot standby. Also, we ignore subtransactions since that's not needed - * for current uses. - */ -bool -TransactionIdIsActive(TransactionId xid) -{ - bool result = false; - ProcArrayStruct *arrayP = procArray; - TransactionId *other_xids = ProcGlobal->xids; - int i; - - /* - * Don't bother checking a transaction older than RecentXmin; it could not - * possibly still be running. - */ - if (TransactionIdPrecedes(xid, RecentXmin)) - return false; - - LWLockAcquire(ProcArrayLock, LW_SHARED); - - for (i = 0; i < arrayP->numProcs; i++) - { - int pgprocno = arrayP->pgprocnos[i]; - PGPROC *proc = &allProcs[pgprocno]; - TransactionId pxid; - - /* Fetch xid just once - see GetNewTransactionId */ - pxid = UINT32_ACCESS_ONCE(other_xids[i]); - - if (!TransactionIdIsValid(pxid)) - continue; - - if (proc->pid == 0) - continue; /* ignore prepared transactions */ - - if (TransactionIdEquals(pxid, xid)) - { - result = true; - break; - } - } - - LWLockRelease(ProcArrayLock); - - return result; -} - /* * Determine XID horizons. @@ -2866,8 +2814,10 @@ GetRunningTransactionData(void) * * Similar to GetSnapshotData but returns just oldestActiveXid. We include * all PGPROCs with an assigned TransactionId, even VACUUM processes. - * We look at all databases, though there is no need to include WALSender - * since this has no effect on hot standby conflicts. + * + * If allDbs is true, we look at all databases, though there is no need to + * include WALSender since this has no effect on hot standby conflicts. If + * allDbs is false, skip processes attached to other databases. * * This is never executed during recovery so there is no need to look at * KnownAssignedXids. @@ -2875,9 +2825,12 @@ GetRunningTransactionData(void) * We don't worry about updating other counters, we want to keep this as * simple as possible and leave GetSnapshotData() as the primary code for * that bookkeeping. + * + * inCommitOnly indicates getting the oldestActiveXid among the transactions + * in the commit critical section. */ TransactionId -GetOldestActiveTransactionId(void) +GetOldestActiveTransactionId(bool inCommitOnly, bool allDbs) { ProcArrayStruct *arrayP = procArray; TransactionId *other_xids = ProcGlobal->xids; @@ -2904,6 +2857,8 @@ GetOldestActiveTransactionId(void) for (index = 0; index < arrayP->numProcs; index++) { TransactionId xid; + int pgprocno = arrayP->pgprocnos[index]; + PGPROC *proc = &allProcs[pgprocno]; /* Fetch xid just once - see GetNewTransactionId */ xid = UINT32_ACCESS_ONCE(other_xids[index]); @@ -2911,6 +2866,13 @@ GetOldestActiveTransactionId(void) if (!TransactionIdIsNormal(xid)) continue; + if (inCommitOnly && + (proc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0) + continue; + + if (!allDbs && proc->databaseId != MyDatabaseId) + continue; + if (TransactionIdPrecedes(xid, oldestRunningXid)) oldestRunningXid = xid; diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index a9bb540b55a..087821311cc 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -728,7 +728,11 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) void SendCancelRequest(int backendPID, const uint8 *cancel_key, int cancel_key_len) { - Assert(backendPID != 0); + if (backendPID == 0) + { + ereport(LOG, (errmsg("invalid cancel request with PID 0"))); + return; + } /* * See if we have a matching backend. Reading the pss_pid and diff --git a/src/backend/storage/ipc/shmem.c b/src/backend/storage/ipc/shmem.c index c9ae3b45b76..ca3656fc76f 100644 --- a/src/backend/storage/ipc/shmem.c +++ b/src/backend/storage/ipc/shmem.c @@ -679,12 +679,10 @@ pg_get_shmem_allocations_numa(PG_FUNCTION_ARGS) */ for (i = 0; i < shm_ent_page_count; i++) { - volatile uint64 touch pg_attribute_unused(); - page_ptrs[i] = startptr + (i * os_page_size); if (firstNumaTouch) - pg_numa_touch_mem_if_required(touch, page_ptrs[i]); + pg_numa_touch_mem_if_required(page_ptrs[i]); CHECK_FOR_INTERRUPTS(); } diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 7fa8d9247e0..4222bdab078 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -1376,7 +1376,7 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts) if (xlrec.subxid_overflow) elog(DEBUG2, - "snapshot of %d running transactions overflowed (lsn %X/%X oldest xid %u latest complete %u next xid %u)", + "snapshot of %d running transactions overflowed (lsn %X/%08X oldest xid %u latest complete %u next xid %u)", CurrRunningXacts->xcnt, LSN_FORMAT_ARGS(recptr), CurrRunningXacts->oldestRunningXid, @@ -1384,7 +1384,7 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts) CurrRunningXacts->nextXid); else elog(DEBUG2, - "snapshot of %d+%d running transaction ids (lsn %X/%X oldest xid %u latest complete %u next xid %u)", + "snapshot of %d+%d running transaction ids (lsn %X/%08X oldest xid %u latest complete %u next xid %u)", CurrRunningXacts->xcnt, CurrRunningXacts->subxcnt, LSN_FORMAT_ARGS(recptr), CurrRunningXacts->oldestRunningXid, diff --git a/src/backend/storage/lmgr/generate-lwlocknames.pl b/src/backend/storage/lmgr/generate-lwlocknames.pl index 4441b7cba0c..cd3e43c448a 100644 --- a/src/backend/storage/lmgr/generate-lwlocknames.pl +++ b/src/backend/storage/lmgr/generate-lwlocknames.pl @@ -10,7 +10,6 @@ use Getopt::Long; my $output_path = '.'; my $lastlockidx = -1; -my $continue = "\n"; GetOptions('outdir:s' => \$output_path); @@ -28,18 +27,24 @@ print $h "/* there is deliberately not an #ifndef LWLOCKNAMES_H here */\n\n"; # -# First, record the predefined LWLocks listed in wait_event_names.txt. We'll -# cross-check those with the ones in lwlocklist.h. +# First, record the predefined LWLocks and built-in tranches listed in +# wait_event_names.txt. We'll cross-check those with the ones in lwlocklist.h. # +my @wait_event_tranches; my @wait_event_lwlocks; my $record_lwlocks = 0; +my $in_tranches = 0; while (<$wait_event_names>) { chomp; # Check for end marker. - last if /^# END OF PREDEFINED LWLOCKS/; + if (/^# END OF PREDEFINED LWLOCKS/) + { + $in_tranches = 1; + next; + } # Skip comments and empty lines. next if /^#/; @@ -55,13 +60,29 @@ while (<$wait_event_names>) # Go to the next line if we are not yet recording LWLocks. next if not $record_lwlocks; + # Stop recording if we reach another section. + last if /^Section:/; + # Record the LWLock. (my $waiteventname, my $waitevendocsentence) = split(/\t/, $_); - push(@wait_event_lwlocks, $waiteventname); + + if ($in_tranches) + { + push(@wait_event_tranches, $waiteventname); + } + else + { + push(@wait_event_lwlocks, $waiteventname); + } } +# +# While gathering the list of predefined LWLocks, cross-check the lists in +# lwlocklist.h with the wait events we just recorded. +# my $in_comment = 0; -my $i = 0; +my $lwlock_count = 0; +my $tranche_count = 0; while (<$lwlocklist>) { chomp; @@ -82,40 +103,72 @@ while (<$lwlocklist>) next; } - die "unable to parse lwlocklist.h line \"$_\"" - unless /^PG_LWLOCK\((\d+),\s+(\w+)\)$/; + # + # Gather list of predefined LWLocks and cross-check with the wait events. + # + if (/^PG_LWLOCK\((\d+),\s+(\w+)\)$/) + { + my ($lockidx, $lockname) = ($1, $2); - (my $lockidx, my $lockname) = ($1, $2); + die "lwlocklist.h not in order" if $lockidx < $lastlockidx; + die "lwlocklist.h has duplicates" if $lockidx == $lastlockidx; - die "lwlocklist.h not in order" if $lockidx < $lastlockidx; - die "lwlocklist.h has duplicates" if $lockidx == $lastlockidx; + die "$lockname defined in lwlocklist.h but missing from " + . "wait_event_names.txt" + if $lwlock_count >= scalar @wait_event_lwlocks; + die "lists of predefined LWLocks do not match (first mismatch at " + . "$wait_event_lwlocks[$lwlock_count] in wait_event_names.txt and " + . "$lockname in lwlocklist.h)" + if $wait_event_lwlocks[$lwlock_count] ne $lockname; - die "$lockname defined in lwlocklist.h but missing from " - . "wait_event_names.txt" - if $i >= scalar @wait_event_lwlocks; - die "lists of predefined LWLocks do not match (first mismatch at " - . "$wait_event_lwlocks[$i] in wait_event_names.txt and $lockname in " - . "lwlocklist.h)" - if $wait_event_lwlocks[$i] ne $lockname; - $i++; + $lwlock_count++; - while ($lastlockidx < $lockidx - 1) + while ($lastlockidx < $lockidx - 1) + { + ++$lastlockidx; + } + $lastlockidx = $lockidx; + + # Add a "Lock" suffix to each lock name, as the C code depends on that. + printf $h "#define %-32s (&MainLWLockArray[$lockidx].lock)\n", + $lockname . "Lock"; + + next; + } + + # + # Cross-check the built-in LWLock tranches with the wait events. + # + if (/^PG_LWLOCKTRANCHE\((\w+),\s+(\w+)\)$/) { - ++$lastlockidx; - $continue = ",\n"; + my ($tranche_id, $tranche_name) = ($1, $2); + + die "$tranche_name defined in lwlocklist.h but missing from " + . "wait_event_names.txt" + if $tranche_count >= scalar @wait_event_tranches; + die + "lists of built-in LWLock tranches do not match (first mismatch at " + . "$wait_event_tranches[$tranche_count] in wait_event_names.txt and " + . "$tranche_name in lwlocklist.h)" + if $wait_event_tranches[$tranche_count] ne $tranche_name; + + $tranche_count++; + + next; } - $lastlockidx = $lockidx; - $continue = ",\n"; - # Add a "Lock" suffix to each lock name, as the C code depends on that - printf $h "#define %-32s (&MainLWLockArray[$lockidx].lock)\n", - $lockname . "Lock"; + die "unable to parse lwlocklist.h line \"$_\""; } die - "$wait_event_lwlocks[$i] defined in wait_event_names.txt but missing from " - . "lwlocklist.h" - if $i < scalar @wait_event_lwlocks; + "$wait_event_lwlocks[$lwlock_count] defined in wait_event_names.txt but " + . " missing from lwlocklist.h" + if $lwlock_count < scalar @wait_event_lwlocks; + +die + "$wait_event_tranches[$tranche_count] defined in wait_event_names.txt but " + . "missing from lwlocklist.h" + if $tranche_count < scalar @wait_event_tranches; print $h "\n"; printf $h "#define NUM_INDIVIDUAL_LWLOCKS %s\n", $lastlockidx + 1; diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 2776ceb295b..62f3471448e 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -3539,9 +3539,9 @@ AtPrepare_Locks(void) * but that probably costs more cycles. */ void -PostPrepare_Locks(TransactionId xid) +PostPrepare_Locks(FullTransactionId fxid) { - PGPROC *newproc = TwoPhaseGetDummyProc(xid, false); + PGPROC *newproc = TwoPhaseGetDummyProc(fxid, false); HASH_SEQ_STATUS status; LOCALLOCK *locallock; LOCK *lock; @@ -4324,11 +4324,11 @@ DumpAllLocks(void) * and PANIC anyway. */ void -lock_twophase_recover(TransactionId xid, uint16 info, +lock_twophase_recover(FullTransactionId fxid, uint16 info, void *recdata, uint32 len) { TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata; - PGPROC *proc = TwoPhaseGetDummyProc(xid, false); + PGPROC *proc = TwoPhaseGetDummyProc(fxid, false); LOCKTAG *locktag; LOCKMODE lockmode; LOCKMETHODID lockmethodid; @@ -4505,7 +4505,7 @@ lock_twophase_recover(TransactionId xid, uint16 info, * starting up into hot standby mode. */ void -lock_twophase_standby_recover(TransactionId xid, uint16 info, +lock_twophase_standby_recover(FullTransactionId fxid, uint16 info, void *recdata, uint32 len) { TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata; @@ -4524,7 +4524,7 @@ lock_twophase_standby_recover(TransactionId xid, uint16 info, if (lockmode == AccessExclusiveLock && locktag->locktag_type == LOCKTAG_RELATION) { - StandbyAcquireAccessExclusiveLock(xid, + StandbyAcquireAccessExclusiveLock(XidFromFullTransactionId(fxid), locktag->locktag_field1 /* dboid */ , locktag->locktag_field2 /* reloid */ ); } @@ -4537,11 +4537,11 @@ lock_twophase_standby_recover(TransactionId xid, uint16 info, * Find and release the lock indicated by the 2PC record. */ void -lock_twophase_postcommit(TransactionId xid, uint16 info, +lock_twophase_postcommit(FullTransactionId fxid, uint16 info, void *recdata, uint32 len) { TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata; - PGPROC *proc = TwoPhaseGetDummyProc(xid, true); + PGPROC *proc = TwoPhaseGetDummyProc(fxid, true); LOCKTAG *locktag; LOCKMETHODID lockmethodid; LockMethod lockMethodTable; @@ -4563,10 +4563,10 @@ lock_twophase_postcommit(TransactionId xid, uint16 info, * This is actually just the same as the COMMIT case. */ void -lock_twophase_postabort(TransactionId xid, uint16 info, +lock_twophase_postabort(FullTransactionId fxid, uint16 info, void *recdata, uint32 len) { - lock_twophase_postcommit(xid, info, recdata, len); + lock_twophase_postcommit(fxid, info, recdata, len); } /* diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 46f44bc4511..ec9c345ffdf 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -122,9 +122,8 @@ StaticAssertDecl((LW_VAL_EXCLUSIVE & LW_FLAG_MASK) == 0, * own tranche. We absorb the names of these tranches from there into * BuiltinTrancheNames here. * - * 2. There are some predefined tranches for built-in groups of locks. - * These are listed in enum BuiltinTrancheIds in lwlock.h, and their names - * appear in BuiltinTrancheNames[] below. + * 2. There are some predefined tranches for built-in groups of locks defined + * in lwlocklist.h. We absorb the names of these tranches, too. * * 3. Extensions can create new tranches, via either RequestNamedLWLockTranche * or LWLockRegisterTranche. The names of these that are known in the current @@ -135,49 +134,10 @@ StaticAssertDecl((LW_VAL_EXCLUSIVE & LW_FLAG_MASK) == 0, */ static const char *const BuiltinTrancheNames[] = { #define PG_LWLOCK(id, lockname) [id] = CppAsString(lockname), +#define PG_LWLOCKTRANCHE(id, lockname) [LWTRANCHE_##id] = CppAsString(lockname), #include "storage/lwlocklist.h" #undef PG_LWLOCK - [LWTRANCHE_XACT_BUFFER] = "XactBuffer", - [LWTRANCHE_COMMITTS_BUFFER] = "CommitTsBuffer", - [LWTRANCHE_SUBTRANS_BUFFER] = "SubtransBuffer", - [LWTRANCHE_MULTIXACTOFFSET_BUFFER] = "MultiXactOffsetBuffer", - [LWTRANCHE_MULTIXACTMEMBER_BUFFER] = "MultiXactMemberBuffer", - [LWTRANCHE_NOTIFY_BUFFER] = "NotifyBuffer", - [LWTRANCHE_SERIAL_BUFFER] = "SerialBuffer", - [LWTRANCHE_WAL_INSERT] = "WALInsert", - [LWTRANCHE_BUFFER_CONTENT] = "BufferContent", - [LWTRANCHE_REPLICATION_ORIGIN_STATE] = "ReplicationOriginState", - [LWTRANCHE_REPLICATION_SLOT_IO] = "ReplicationSlotIO", - [LWTRANCHE_LOCK_FASTPATH] = "LockFastPath", - [LWTRANCHE_BUFFER_MAPPING] = "BufferMapping", - [LWTRANCHE_LOCK_MANAGER] = "LockManager", - [LWTRANCHE_PREDICATE_LOCK_MANAGER] = "PredicateLockManager", - [LWTRANCHE_PARALLEL_HASH_JOIN] = "ParallelHashJoin", - [LWTRANCHE_PARALLEL_BTREE_SCAN] = "ParallelBtreeScan", - [LWTRANCHE_PARALLEL_QUERY_DSA] = "ParallelQueryDSA", - [LWTRANCHE_PER_SESSION_DSA] = "PerSessionDSA", - [LWTRANCHE_PER_SESSION_RECORD_TYPE] = "PerSessionRecordType", - [LWTRANCHE_PER_SESSION_RECORD_TYPMOD] = "PerSessionRecordTypmod", - [LWTRANCHE_SHARED_TUPLESTORE] = "SharedTupleStore", - [LWTRANCHE_SHARED_TIDBITMAP] = "SharedTidBitmap", - [LWTRANCHE_PARALLEL_APPEND] = "ParallelAppend", - [LWTRANCHE_PER_XACT_PREDICATE_LIST] = "PerXactPredicateList", - [LWTRANCHE_PGSTATS_DSA] = "PgStatsDSA", - [LWTRANCHE_PGSTATS_HASH] = "PgStatsHash", - [LWTRANCHE_PGSTATS_DATA] = "PgStatsData", - [LWTRANCHE_LAUNCHER_DSA] = "LogicalRepLauncherDSA", - [LWTRANCHE_LAUNCHER_HASH] = "LogicalRepLauncherHash", - [LWTRANCHE_DSM_REGISTRY_DSA] = "DSMRegistryDSA", - [LWTRANCHE_DSM_REGISTRY_HASH] = "DSMRegistryHash", - [LWTRANCHE_COMMITTS_SLRU] = "CommitTsSLRU", - [LWTRANCHE_MULTIXACTOFFSET_SLRU] = "MultixactOffsetSLRU", - [LWTRANCHE_MULTIXACTMEMBER_SLRU] = "MultixactMemberSLRU", - [LWTRANCHE_NOTIFY_SLRU] = "NotifySLRU", - [LWTRANCHE_SERIAL_SLRU] = "SerialSLRU", - [LWTRANCHE_SUBTRANS_SLRU] = "SubtransSLRU", - [LWTRANCHE_XACT_SLRU] = "XactSLRU", - [LWTRANCHE_PARALLEL_VACUUM_DSA] = "ParallelVacuumDSA", - [LWTRANCHE_AIO_URING_COMPLETION] = "AioUringCompletion", +#undef PG_LWLOCKTRANCHE }; StaticAssertDecl(lengthof(BuiltinTrancheNames) == diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c index d82114ffca1..c07fb588355 100644 --- a/src/backend/storage/lmgr/predicate.c +++ b/src/backend/storage/lmgr/predicate.c @@ -191,7 +191,7 @@ * AtPrepare_PredicateLocks(void); * PostPrepare_PredicateLocks(TransactionId xid); * PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit); - * predicatelock_twophase_recover(TransactionId xid, uint16 info, + * predicatelock_twophase_recover(FullTransactionId fxid, uint16 info, * void *recdata, uint32 len); */ @@ -4856,7 +4856,7 @@ AtPrepare_PredicateLocks(void) * anyway. We only need to clean up our local state. */ void -PostPrepare_PredicateLocks(TransactionId xid) +PostPrepare_PredicateLocks(FullTransactionId fxid) { if (MySerializableXact == InvalidSerializableXact) return; @@ -4879,12 +4879,12 @@ PostPrepare_PredicateLocks(TransactionId xid) * commits or aborts. */ void -PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit) +PredicateLockTwoPhaseFinish(FullTransactionId fxid, bool isCommit) { SERIALIZABLEXID *sxid; SERIALIZABLEXIDTAG sxidtag; - sxidtag.xid = xid; + sxidtag.xid = XidFromFullTransactionId(fxid); LWLockAcquire(SerializableXactHashLock, LW_SHARED); sxid = (SERIALIZABLEXID *) @@ -4906,10 +4906,11 @@ PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit) * Re-acquire a predicate lock belonging to a transaction that was prepared. */ void -predicatelock_twophase_recover(TransactionId xid, uint16 info, +predicatelock_twophase_recover(FullTransactionId fxid, uint16 info, void *recdata, uint32 len) { TwoPhasePredicateRecord *record; + TransactionId xid = XidFromFullTransactionId(fxid); Assert(len == sizeof(TwoPhasePredicateRecord)); |