diff options
Diffstat (limited to 'src/backend/postmaster/checkpointer.c')
-rw-r--r-- | src/backend/postmaster/checkpointer.c | 232 |
1 files changed, 179 insertions, 53 deletions
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c index fda91ffd1ce..8490148a47d 100644 --- a/src/backend/postmaster/checkpointer.c +++ b/src/backend/postmaster/checkpointer.c @@ -42,6 +42,8 @@ #include "access/xlog.h" #include "access/xlog_internal.h" #include "access/xlogrecovery.h" +#include "catalog/pg_authid.h" +#include "commands/defrem.h" #include "libpq/pqsignal.h" #include "miscadmin.h" #include "pgstat.h" @@ -61,6 +63,7 @@ #include "storage/shmem.h" #include "storage/smgr.h" #include "storage/spin.h" +#include "utils/acl.h" #include "utils/guc.h" #include "utils/memutils.h" #include "utils/resowner.h" @@ -127,6 +130,13 @@ typedef struct int num_requests; /* current # of requests */ int max_requests; /* allocated array size */ + + int head; /* Index of the first request in the ring + * buffer */ + int tail; /* Index of the last request in the ring + * buffer */ + + /* The ring buffer of pending checkpointer requests */ CheckpointerRequest requests[FLEXIBLE_ARRAY_MEMBER]; } CheckpointerShmemStruct; @@ -135,6 +145,12 @@ static CheckpointerShmemStruct *CheckpointerShmem; /* interval for calling AbsorbSyncRequests in CheckpointWriteDelay */ #define WRITES_PER_ABSORB 1000 +/* Maximum number of checkpointer requests to process in one batch */ +#define CKPT_REQ_BATCH_SIZE 10000 + +/* Max number of requests the checkpointer request queue can hold */ +#define MAX_CHECKPOINT_REQUESTS 10000000 + /* * GUC parameters */ @@ -161,7 +177,7 @@ static pg_time_t last_xlog_switch_time; static void ProcessCheckpointerInterrupts(void); static void CheckArchiveTimeout(void); static bool IsCheckpointOnSchedule(double progress); -static bool ImmediateCheckpointRequested(void); +static bool FastCheckpointRequested(void); static bool CompactCheckpointerRequestQueue(void); static void UpdateSharedMemoryConfig(void); @@ -734,12 +750,12 @@ CheckArchiveTimeout(void) } /* - * Returns true if an immediate checkpoint request is pending. (Note that - * this does not check the *current* checkpoint's IMMEDIATE flag, but whether - * there is one pending behind it.) + * Returns true if a fast checkpoint request is pending. (Note that this does + * not check the *current* checkpoint's FAST flag, but whether there is one + * pending behind it.) */ static bool -ImmediateCheckpointRequested(void) +FastCheckpointRequested(void) { volatile CheckpointerShmemStruct *cps = CheckpointerShmem; @@ -747,7 +763,7 @@ ImmediateCheckpointRequested(void) * We don't need to acquire the ckpt_lck in this case because we're only * looking at a single flag bit. */ - if (cps->ckpt_flags & CHECKPOINT_IMMEDIATE) + if (cps->ckpt_flags & CHECKPOINT_FAST) return true; return false; } @@ -760,7 +776,7 @@ ImmediateCheckpointRequested(void) * checkpoint_completion_target. * * The checkpoint request flags should be passed in; currently the only one - * examined is CHECKPOINT_IMMEDIATE, which disables delays between writes. + * examined is CHECKPOINT_FAST, which disables delays between writes. * * 'progress' is an estimate of how much of the work has been done, as a * fraction between 0.0 meaning none, and 1.0 meaning all done. @@ -778,10 +794,10 @@ CheckpointWriteDelay(int flags, double progress) * Perform the usual duties and take a nap, unless we're behind schedule, * in which case we just try to catch up as quickly as possible. */ - if (!(flags & CHECKPOINT_IMMEDIATE) && + if (!(flags & CHECKPOINT_FAST) && !ShutdownXLOGPending && !ShutdownRequestPending && - !ImmediateCheckpointRequested() && + !FastCheckpointRequested() && IsCheckpointOnSchedule(progress)) { if (ConfigReloadPending) @@ -970,24 +986,75 @@ CheckpointerShmemInit(void) */ MemSet(CheckpointerShmem, 0, size); SpinLockInit(&CheckpointerShmem->ckpt_lck); - CheckpointerShmem->max_requests = NBuffers; + CheckpointerShmem->max_requests = Min(NBuffers, MAX_CHECKPOINT_REQUESTS); + CheckpointerShmem->head = CheckpointerShmem->tail = 0; ConditionVariableInit(&CheckpointerShmem->start_cv); ConditionVariableInit(&CheckpointerShmem->done_cv); } } /* + * ExecCheckpoint + * Primary entry point for manual CHECKPOINT commands + * + * This is mainly a wrapper for RequestCheckpoint(). + */ +void +ExecCheckpoint(ParseState *pstate, CheckPointStmt *stmt) +{ + bool fast = true; + bool unlogged = false; + + foreach_ptr(DefElem, opt, stmt->options) + { + if (strcmp(opt->defname, "mode") == 0) + { + char *mode = defGetString(opt); + + if (strcmp(mode, "spread") == 0) + fast = false; + else if (strcmp(mode, "fast") != 0) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("unrecognized MODE option \"%s\"", mode), + parser_errposition(pstate, opt->location))); + } + else if (strcmp(opt->defname, "flush_unlogged") == 0) + unlogged = defGetBoolean(opt); + else + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("unrecognized CHECKPOINT option \"%s\"", opt->defname), + parser_errposition(pstate, opt->location))); + } + + if (!has_privs_of_role(GetUserId(), ROLE_PG_CHECKPOINT)) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + /* translator: %s is name of an SQL command (e.g., CHECKPOINT) */ + errmsg("permission denied to execute %s command", + "CHECKPOINT"), + errdetail("Only roles with privileges of the \"%s\" role may execute this command.", + "pg_checkpoint"))); + + RequestCheckpoint(CHECKPOINT_WAIT | + (fast ? CHECKPOINT_FAST : 0) | + (unlogged ? CHECKPOINT_FLUSH_UNLOGGED : 0) | + (RecoveryInProgress() ? 0 : CHECKPOINT_FORCE)); +} + +/* * RequestCheckpoint * Called in backend processes to request a checkpoint * * flags is a bitwise OR of the following: * CHECKPOINT_IS_SHUTDOWN: checkpoint is for database shutdown. * CHECKPOINT_END_OF_RECOVERY: checkpoint is for end of WAL recovery. - * CHECKPOINT_IMMEDIATE: finish the checkpoint ASAP, + * CHECKPOINT_FAST: finish the checkpoint ASAP, * ignoring checkpoint_completion_target parameter. * CHECKPOINT_FORCE: force a checkpoint even if no XLOG activity has occurred * since the last one (implied by CHECKPOINT_IS_SHUTDOWN or - * CHECKPOINT_END_OF_RECOVERY). + * CHECKPOINT_END_OF_RECOVERY, and the CHECKPOINT command). * CHECKPOINT_WAIT: wait for completion before returning (otherwise, * just signal checkpointer to do it, and return). * CHECKPOINT_CAUSE_XLOG: checkpoint is requested due to xlog filling. @@ -1009,7 +1076,7 @@ RequestCheckpoint(int flags) * There's no point in doing slow checkpoints in a standalone backend, * because there's no other backends the checkpoint could disrupt. */ - CreateCheckPoint(flags | CHECKPOINT_IMMEDIATE); + CreateCheckPoint(flags | CHECKPOINT_FAST); /* Free all smgr objects, as CheckpointerMain() normally would. */ smgrdestroyall(); @@ -1148,6 +1215,7 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type) { CheckpointerRequest *request; bool too_full; + int insert_pos; if (!IsUnderPostmaster) return false; /* probably shouldn't even get here */ @@ -1171,10 +1239,14 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type) } /* OK, insert request */ - request = &CheckpointerShmem->requests[CheckpointerShmem->num_requests++]; + insert_pos = CheckpointerShmem->tail; + request = &CheckpointerShmem->requests[insert_pos]; request->ftag = *ftag; request->type = type; + CheckpointerShmem->tail = (CheckpointerShmem->tail + 1) % CheckpointerShmem->max_requests; + CheckpointerShmem->num_requests++; + /* If queue is more than half full, nudge the checkpointer to empty it */ too_full = (CheckpointerShmem->num_requests >= CheckpointerShmem->max_requests / 2); @@ -1216,12 +1288,16 @@ CompactCheckpointerRequestQueue(void) struct CheckpointerSlotMapping { CheckpointerRequest request; - int slot; + int ring_idx; }; - int n, - preserve_count; + int n; int num_skipped = 0; + int head; + int max_requests; + int num_requests; + int read_idx, + write_idx; HASHCTL ctl; HTAB *htab; bool *skip_slot; @@ -1233,8 +1309,13 @@ CompactCheckpointerRequestQueue(void) if (CritSectionCount > 0) return false; + max_requests = CheckpointerShmem->max_requests; + num_requests = CheckpointerShmem->num_requests; + /* Initialize skip_slot array */ - skip_slot = palloc0(sizeof(bool) * CheckpointerShmem->num_requests); + skip_slot = palloc0(sizeof(bool) * max_requests); + + head = CheckpointerShmem->head; /* Initialize temporary hash table */ ctl.keysize = sizeof(CheckpointerRequest); @@ -1258,7 +1339,8 @@ CompactCheckpointerRequestQueue(void) * away preceding entries that would end up being canceled anyhow), but * it's not clear that the extra complexity would buy us anything. */ - for (n = 0; n < CheckpointerShmem->num_requests; n++) + read_idx = head; + for (n = 0; n < num_requests; n++) { CheckpointerRequest *request; struct CheckpointerSlotMapping *slotmap; @@ -1271,16 +1353,19 @@ CompactCheckpointerRequestQueue(void) * CheckpointerShmemInit. Note also that RelFileLocator had better * contain no pad bytes. */ - request = &CheckpointerShmem->requests[n]; + request = &CheckpointerShmem->requests[read_idx]; slotmap = hash_search(htab, request, HASH_ENTER, &found); if (found) { /* Duplicate, so mark the previous occurrence as skippable */ - skip_slot[slotmap->slot] = true; + skip_slot[slotmap->ring_idx] = true; num_skipped++; } /* Remember slot containing latest occurrence of this request value */ - slotmap->slot = n; + slotmap->ring_idx = read_idx; + + /* Move to the next request in the ring buffer */ + read_idx = (read_idx + 1) % max_requests; } /* Done with the hash table. */ @@ -1294,17 +1379,34 @@ CompactCheckpointerRequestQueue(void) } /* We found some duplicates; remove them. */ - preserve_count = 0; - for (n = 0; n < CheckpointerShmem->num_requests; n++) + read_idx = write_idx = head; + for (n = 0; n < num_requests; n++) { - if (skip_slot[n]) - continue; - CheckpointerShmem->requests[preserve_count++] = CheckpointerShmem->requests[n]; + /* If this slot is NOT skipped, keep it */ + if (!skip_slot[read_idx]) + { + /* If the read and write positions are different, copy the request */ + if (write_idx != read_idx) + CheckpointerShmem->requests[write_idx] = + CheckpointerShmem->requests[read_idx]; + + /* Advance the write position */ + write_idx = (write_idx + 1) % max_requests; + } + + read_idx = (read_idx + 1) % max_requests; } + + /* + * Update ring buffer state: head remains the same, tail moves, count + * decreases + */ + CheckpointerShmem->tail = write_idx; + CheckpointerShmem->num_requests -= num_skipped; + ereport(DEBUG1, (errmsg_internal("compacted fsync request queue from %d entries to %d entries", - CheckpointerShmem->num_requests, preserve_count))); - CheckpointerShmem->num_requests = preserve_count; + num_requests, CheckpointerShmem->num_requests))); /* Cleanup. */ pfree(skip_slot); @@ -1325,40 +1427,64 @@ AbsorbSyncRequests(void) { CheckpointerRequest *requests = NULL; CheckpointerRequest *request; - int n; + int n, + i; + bool loop; if (!AmCheckpointerProcess()) return; - LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE); - - /* - * We try to avoid holding the lock for a long time by copying the request - * array, and processing the requests after releasing the lock. - * - * Once we have cleared the requests from shared memory, we have to PANIC - * if we then fail to absorb them (eg, because our hashtable runs out of - * memory). This is because the system cannot run safely if we are unable - * to fsync what we have been told to fsync. Fortunately, the hashtable - * is so small that the problem is quite unlikely to arise in practice. - */ - n = CheckpointerShmem->num_requests; - if (n > 0) + do { - requests = (CheckpointerRequest *) palloc(n * sizeof(CheckpointerRequest)); - memcpy(requests, CheckpointerShmem->requests, n * sizeof(CheckpointerRequest)); - } + LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE); + + /*--- + * We try to avoid holding the lock for a long time by: + * 1. Copying the request array and processing the requests after + * releasing the lock; + * 2. Processing not the whole queue, but only batches of + * CKPT_REQ_BATCH_SIZE at once. + * + * Once we have cleared the requests from shared memory, we must + * PANIC if we then fail to absorb them (e.g., because our hashtable + * runs out of memory). This is because the system cannot run safely + * if we are unable to fsync what we have been told to fsync. + * Fortunately, the hashtable is so small that the problem is quite + * unlikely to arise in practice. + * + * Note: The maximum possible size of a ring buffer is + * MAX_CHECKPOINT_REQUESTS entries, which fit into a maximum palloc + * allocation size of 1Gb. Our maximum batch size, + * CKPT_REQ_BATCH_SIZE, is even smaller. + */ + n = Min(CheckpointerShmem->num_requests, CKPT_REQ_BATCH_SIZE); + if (n > 0) + { + if (!requests) + requests = (CheckpointerRequest *) palloc(n * sizeof(CheckpointerRequest)); - START_CRIT_SECTION(); + for (i = 0; i < n; i++) + { + requests[i] = CheckpointerShmem->requests[CheckpointerShmem->head]; + CheckpointerShmem->head = (CheckpointerShmem->head + 1) % CheckpointerShmem->max_requests; + } - CheckpointerShmem->num_requests = 0; + CheckpointerShmem->num_requests -= n; - LWLockRelease(CheckpointerCommLock); + } + + START_CRIT_SECTION(); + + /* Are there any requests in the queue? If so, keep going. */ + loop = CheckpointerShmem->num_requests != 0; + + LWLockRelease(CheckpointerCommLock); - for (request = requests; n > 0; request++, n--) - RememberSyncRequest(&request->ftag, request->type); + for (request = requests; n > 0; request++, n--) + RememberSyncRequest(&request->ftag, request->type); - END_CRIT_SECTION(); + END_CRIT_SECTION(); + } while (loop); if (requests) pfree(requests); |