Diffstat (limited to 'src/backend/access')
-rw-r--r--  src/backend/access/heap/vacuumlazy.c   | 1002
-rw-r--r--  src/backend/access/transam/parallel.c  |    2
2 files changed, 29 insertions, 975 deletions
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index d8f12175040..cd603e6aa41 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -40,7 +40,6 @@
 #include "access/heapam_xlog.h"
 #include "access/htup_details.h"
 #include "access/multixact.h"
-#include "access/parallel.h"
 #include "access/transam.h"
 #include "access/visibilitymap.h"
 #include "access/xact.h"
@@ -121,22 +120,10 @@
 #define PREFETCH_SIZE           ((BlockNumber) 32)
 
 /*
- * DSM keys for parallel vacuum. Unlike other parallel execution code, since
- * we don't need to worry about DSM keys conflicting with plan_node_id we can
- * use small integers.
- */
-#define PARALLEL_VACUUM_KEY_SHARED          1
-#define PARALLEL_VACUUM_KEY_DEAD_ITEMS      2
-#define PARALLEL_VACUUM_KEY_QUERY_TEXT      3
-#define PARALLEL_VACUUM_KEY_BUFFER_USAGE    4
-#define PARALLEL_VACUUM_KEY_WAL_USAGE       5
-#define PARALLEL_VACUUM_KEY_INDEX_STATS     6
-
-/*
  * Macro to check if we are in a parallel vacuum. If true, we are in the
  * parallel mode and the DSM segment is initialized.
  */
-#define ParallelVacuumIsActive(vacrel) ((vacrel)->lps != NULL)
+#define ParallelVacuumIsActive(vacrel) ((vacrel)->pvs != NULL)
 
 /* Phases of vacuum during which we report error context. */
 typedef enum
@@ -149,135 +136,6 @@ typedef enum
     VACUUM_ERRCB_PHASE_TRUNCATE
 } VacErrPhase;
 
-/*
- * Shared information among parallel workers. So this is allocated in the DSM
- * segment.
- */
-typedef struct LVShared
-{
-    /*
-     * Target table relid and log level. These fields are not modified during
-     * the lazy vacuum.
-     */
-    Oid         relid;
-    int         elevel;
-
-    /*
-     * Fields for both index vacuum and cleanup.
-     *
-     * reltuples is the total number of input heap tuples. We set either old
-     * live tuples in the index vacuum case or the new live tuples in the
-     * index cleanup case.
-     *
-     * estimated_count is true if reltuples is an estimated value. (Note that
-     * reltuples could be -1 in this case, indicating we have no idea.)
-     */
-    double      reltuples;
-    bool        estimated_count;
-
-    /*
-     * In single process lazy vacuum we could consume more memory during index
-     * vacuuming or cleanup apart from the memory for heap scanning. In
-     * parallel vacuum, since individual vacuum workers can consume memory
-     * equal to maintenance_work_mem, the new maintenance_work_mem for each
-     * worker is set such that the parallel operation doesn't consume more
-     * memory than single process lazy vacuum.
-     */
-    int         maintenance_work_mem_worker;
-
-    /*
-     * Shared vacuum cost balance. During parallel vacuum,
-     * VacuumSharedCostBalance points to this value and it accumulates the
-     * balance of each parallel vacuum worker.
-     */
-    pg_atomic_uint32 cost_balance;
-
-    /*
-     * Number of active parallel workers. This is used for computing the
-     * minimum threshold of the vacuum cost balance before a worker sleeps for
-     * cost-based delay.
-     */
-    pg_atomic_uint32 active_nworkers;
-
-    /* Counter for vacuuming and cleanup */
-    pg_atomic_uint32 idx;
-} LVShared;
-
-/* Status used during parallel index vacuum or cleanup */
-typedef enum LVParallelIndVacStatus
-{
-    PARALLEL_INDVAC_STATUS_INITIAL = 0,
-    PARALLEL_INDVAC_STATUS_NEED_BULKDELETE,
-    PARALLEL_INDVAC_STATUS_NEED_CLEANUP,
-    PARALLEL_INDVAC_STATUS_COMPLETED
-} LVParallelIndVacStatus;
-
-/*
- * Struct for index vacuum statistics of an index that is used for parallel vacuum.
- * This includes the status of parallel index vacuum as well as index statistics.
- */
-typedef struct LVParallelIndStats
-{
-    /*
-     * The following two fields are set by leader process before executing
-     * parallel index vacuum or parallel index cleanup. These fields are not
-     * fixed for the entire VACUUM operation. They are only fixed for an
-     * individual parallel index vacuum and cleanup.
-     *
-     * parallel_workers_can_process is true if both leader and worker can
-     * process the index, otherwise only leader can process it.
-     */
-    LVParallelIndVacStatus status;
-    bool        parallel_workers_can_process;
-
-    /*
-     * Individual worker or leader stores the result of index vacuum or
-     * cleanup.
-     */
-    bool        istat_updated;  /* are the stats updated? */
-    IndexBulkDeleteResult istat;
-} LVParallelIndStats;
-
-/* Struct for maintaining a parallel vacuum state. */
-typedef struct LVParallelState
-{
-    ParallelContext *pcxt;
-
-    /* Shared information among parallel vacuum workers */
-    LVShared   *lvshared;
-
-    /*
-     * Shared index statistics among parallel vacuum workers. The array
-     * element is allocated for every index, even those indexes where parallel
-     * index vacuuming is unsafe or not worthwhile (e.g.,
-     * will_parallel_vacuum[] is false). During parallel vacuum,
-     * IndexBulkDeleteResult of each index is kept in DSM and is copied into
-     * local memory at the end of parallel vacuum.
-     */
-    LVParallelIndStats *lvpindstats;
-
-    /* Points to buffer usage area in DSM */
-    BufferUsage *buffer_usage;
-
-    /* Points to WAL usage area in DSM */
-    WalUsage   *wal_usage;
-
-    /*
-     * False if the index is totally unsuitable target for all parallel
-     * processing. For example, the index could be <
-     * min_parallel_index_scan_size cutoff.
-     */
-    bool       *will_parallel_vacuum;
-
-    /*
-     * The number of indexes that support parallel index bulk-deletion and
-     * parallel index cleanup respectively.
-     */
-    int         nindexes_parallel_bulkdel;
-    int         nindexes_parallel_cleanup;
-    int         nindexes_parallel_condcleanup;
-} LVParallelState;
-
 typedef struct LVRelState
 {
     /* Target heap relation and its indexes */
@@ -295,9 +153,9 @@ typedef struct LVRelState
     bool        do_index_cleanup;
     bool        do_rel_truncate;
 
-    /* Buffer access strategy and parallel state */
+    /* Buffer access strategy and parallel vacuum state */
     BufferAccessStrategy bstrategy;
-    LVParallelState *lps;
+    ParallelVacuumState *pvs;
 
     /* rel's initial relfrozenxid and relminmxid */
     TransactionId relfrozenxid;
@@ -399,13 +257,6 @@ static bool lazy_check_needs_freeze(Buffer buf, bool *hastup,
                                     LVRelState *vacrel);
 static bool lazy_check_wraparound_failsafe(LVRelState *vacrel);
 static void lazy_cleanup_all_indexes(LVRelState *vacrel);
-static void parallel_vacuum_process_all_indexes(LVRelState *vacrel, bool vacuum);
-static void parallel_vacuum_process_safe_indexes(LVRelState *vacrel, LVShared *shared,
-                                                 LVParallelIndStats *pindstats);
-static void parallel_vacuum_process_unsafe_indexes(LVRelState *vacrel);
-static void parallel_vacuum_process_one_index(LVRelState *vacrel, Relation indrel,
-                                              LVShared *shared,
-                                              LVParallelIndStats *pindstats);
 static IndexBulkDeleteResult *lazy_vacuum_one_index(Relation indrel,
                                                     IndexBulkDeleteResult *istat,
                                                     double reltuples,
@@ -419,18 +270,11 @@ static bool should_attempt_truncation(LVRelState *vacrel);
 static void lazy_truncate_heap(LVRelState *vacrel);
 static BlockNumber count_nondeletable_pages(LVRelState *vacrel,
                                             bool *lock_waiter_detected);
-static int  dead_items_max_items(LVRelState *vacrel);
 static void dead_items_alloc(LVRelState *vacrel, int nworkers);
 static void dead_items_cleanup(LVRelState *vacrel);
 static bool heap_page_is_all_visible(LVRelState *vacrel, Buffer buf,
                                      TransactionId *visibility_cutoff_xid,
                                      bool *all_frozen);
-static int  parallel_vacuum_compute_workers(LVRelState *vacrel, int nrequested,
-                                            bool *will_parallel_vacuum);
 static void update_index_statistics(LVRelState *vacrel);
-static void parallel_vacuum_begin(LVRelState *vacrel, int nrequested);
-static void parallel_vacuum_end(LVRelState *vacrel);
-static bool parallel_vacuum_index_is_parallel_safe(LVRelState *vacrel, Relation indrel,
-                                                   bool vacuum);
 static void vacuum_error_callback(void *arg);
 static void update_vacuum_error_info(LVRelState *vacrel,
                                      LVSavedErrInfo *saved_vacrel,
@@ -1601,7 +1445,8 @@ lazy_scan_heap(LVRelState *vacrel, VacuumParams *params, bool aggressive)
 
     /*
      * Free resources managed by dead_items_alloc. This will end parallel
-     * mode when needed (it must end before we update index statistics).
+     * mode when needed (it must end before updating index statistics as we
+     * can't write in parallel mode).
      */
     dead_items_cleanup(vacrel);
 
@@ -2066,7 +1911,6 @@ lazy_vacuum(LVRelState *vacrel)
 
     /* Should not end up here with no indexes */
     Assert(vacrel->nindexes > 0);
-    Assert(!IsParallelWorker());
     Assert(vacrel->lpdead_item_pages > 0);
 
     if (!vacrel->do_index_vacuuming)
@@ -2195,7 +2039,6 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 {
     bool        allindexes = true;
 
-    Assert(!IsParallelWorker());
     Assert(vacrel->nindexes > 0);
     Assert(vacrel->do_index_vacuuming);
     Assert(vacrel->do_index_cleanup);
@@ -2235,7 +2078,8 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
     else
     {
         /* Outsource everything to parallel variant */
-        parallel_vacuum_process_all_indexes(vacrel, true);
+        parallel_vacuum_bulkdel_all_indexes(vacrel->pvs, vacrel->old_live_tuples,
+                                            vacrel->num_index_scans);
 
         /*
          * Do a postcheck to consider applying wraparound failsafe now. Note
@@ -2609,352 +2453,11 @@ lazy_check_wraparound_failsafe(LVRelState *vacrel)
 }
 
 /*
- * Perform index vacuum or index cleanup with parallel workers. This function
- * must be used by the parallel vacuum leader process.
- */
-static void
-parallel_vacuum_process_all_indexes(LVRelState *vacrel, bool vacuum)
-{
-    LVParallelState *lps = vacrel->lps;
-    LVParallelIndVacStatus new_status;
-    int         nworkers;
-
-    Assert(!IsParallelWorker());
-    Assert(ParallelVacuumIsActive(vacrel));
-    Assert(vacrel->nindexes > 0);
-
-    if (vacuum)
-    {
-        /*
-         * We can only provide an approximate value of num_heap_tuples, at
-         * least for now. Matches serial VACUUM case.
-         */
-        vacrel->lps->lvshared->reltuples = vacrel->old_live_tuples;
-        vacrel->lps->lvshared->estimated_count = true;
-
-        new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;
-
-        /* Determine the number of parallel workers to launch */
-        nworkers = vacrel->lps->nindexes_parallel_bulkdel;
-    }
-    else
-    {
-        /*
-         * We can provide a better estimate of total number of surviving
-         * tuples (we assume indexes are more interested in that than in the
-         * number of nominally live tuples).
-         */
-        vacrel->lps->lvshared->reltuples = vacrel->new_rel_tuples;
-        vacrel->lps->lvshared->estimated_count =
-            (vacrel->tupcount_pages < vacrel->rel_pages);
-
-        new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;
-
-        /* Determine the number of parallel workers to launch */
-        nworkers = vacrel->lps->nindexes_parallel_cleanup;
-
-        /* Add conditionally parallel-aware indexes if in the first time call */
-        if (vacrel->num_index_scans == 0)
-            nworkers += vacrel->lps->nindexes_parallel_condcleanup;
-    }
-
-    /* The leader process will participate */
-    nworkers--;
-
-    /*
-     * It is possible that parallel context is initialized with fewer workers
-     * than the number of indexes that need a separate worker in the current
-     * phase, so we need to consider it. See
-     * parallel_vacuum_compute_workers().
-     */
-    nworkers = Min(nworkers, lps->pcxt->nworkers);
-
-    /*
-     * Set index vacuum status and mark whether parallel vacuum worker can
-     * process it.
-     */
-    for (int i = 0; i < vacrel->nindexes; i++)
-    {
-        LVParallelIndStats *pindstats = &(vacrel->lps->lvpindstats[i]);
-
-        Assert(pindstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
-        pindstats->status = new_status;
-        pindstats->parallel_workers_can_process =
-            (lps->will_parallel_vacuum[i] &
-             parallel_vacuum_index_is_parallel_safe(vacrel, vacrel->indrels[i],
-                                                    vacuum));
-    }
-
-    /* Reset the parallel index processing counter */
-    pg_atomic_write_u32(&(lps->lvshared->idx), 0);
-
-    /* Setup the shared cost-based vacuum delay and launch workers */
-    if (nworkers > 0)
-    {
-        /* Reinitialize parallel context to relaunch parallel workers */
-        if (vacrel->num_index_scans > 0)
-            ReinitializeParallelDSM(lps->pcxt);
-
-        /*
-         * Set up shared cost balance and the number of active workers for
-         * vacuum delay. We need to do this before launching workers as
-         * otherwise, they might not see the updated values for these
-         * parameters.
-         */
-        pg_atomic_write_u32(&(lps->lvshared->cost_balance), VacuumCostBalance);
-        pg_atomic_write_u32(&(lps->lvshared->active_nworkers), 0);
-
-        /*
-         * The number of workers can vary between bulkdelete and cleanup
-         * phase.
-         */
-        ReinitializeParallelWorkers(lps->pcxt, nworkers);
-
-        LaunchParallelWorkers(lps->pcxt);
-
-        if (lps->pcxt->nworkers_launched > 0)
-        {
-            /*
-             * Reset the local cost values for leader backend as we have
-             * already accumulated the remaining balance of heap.
-             */
-            VacuumCostBalance = 0;
-            VacuumCostBalanceLocal = 0;
-
-            /* Enable shared cost balance for leader backend */
-            VacuumSharedCostBalance = &(lps->lvshared->cost_balance);
-            VacuumActiveNWorkers = &(lps->lvshared->active_nworkers);
-        }
-
-        if (vacuum)
-            ereport(elevel,
-                    (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
-                                     "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
-                                     lps->pcxt->nworkers_launched),
-                            lps->pcxt->nworkers_launched, nworkers)));
-        else
-            ereport(elevel,
-                    (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
-                                     "launched %d parallel vacuum workers for index cleanup (planned: %d)",
-                                     lps->pcxt->nworkers_launched),
-                            lps->pcxt->nworkers_launched, nworkers)));
-    }
-
-    /* Process the indexes that can be processed by only leader process */
-    parallel_vacuum_process_unsafe_indexes(vacrel);
-
-    /*
-     * Join as a parallel worker. The leader process alone processes all
-     * parallel-safe indexes in the case where no workers are launched.
-     */
-    parallel_vacuum_process_safe_indexes(vacrel, lps->lvshared, lps->lvpindstats);
-
-    /*
-     * Next, accumulate buffer and WAL usage. (This must wait for the workers
-     * to finish, or we might get incomplete data.)
-     */
-    if (nworkers > 0)
-    {
-        /* Wait for all vacuum workers to finish */
-        WaitForParallelWorkersToFinish(lps->pcxt);
-
-        for (int i = 0; i < lps->pcxt->nworkers_launched; i++)
-            InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i]);
-    }
-
-    /*
-     * Reset all index status back to initial (while checking that we have
-     * processed all indexes).
-     */
-    for (int i = 0; i < vacrel->nindexes; i++)
-    {
-        LVParallelIndStats *pindstats = &(lps->lvpindstats[i]);
-
-        if (pindstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
-            elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
-                 RelationGetRelationName(vacrel->indrels[i]));
-
-        pindstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
-    }
-
-    /*
-     * Carry the shared balance value to heap scan and disable shared costing
-     */
-    if (VacuumSharedCostBalance)
-    {
-        VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
-        VacuumSharedCostBalance = NULL;
-        VacuumActiveNWorkers = NULL;
-    }
-}
-
-/*
- * Index vacuum/cleanup routine used by the leader process and parallel
- * vacuum worker processes to process the indexes in parallel.
- */
-static void
-parallel_vacuum_process_safe_indexes(LVRelState *vacrel, LVShared *shared,
                                      LVParallelIndStats *pindstats)
-{
-    /*
-     * Increment the active worker count if we are able to launch any worker.
-     */
-    if (VacuumActiveNWorkers)
-        pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
-
-    /* Loop until all indexes are vacuumed */
-    for (;;)
-    {
-        int         idx;
-        LVParallelIndStats *pis;
-
-        /* Get an index number to process */
-        idx = pg_atomic_fetch_add_u32(&(shared->idx), 1);
-
-        /* Done for all indexes? */
-        if (idx >= vacrel->nindexes)
-            break;
-
-        pis = &(pindstats[idx]);
-
-        /*
-         * Skip processing index that is unsafe for workers or has an
-         * unsuitable target for parallel index vacuum (this is processed in
-         * parallel_vacuum_process_unsafe_indexes() by the leader).
-         */
-        if (!pis->parallel_workers_can_process)
-            continue;
-
-        /* Do vacuum or cleanup of the index */
-        parallel_vacuum_process_one_index(vacrel, vacrel->indrels[idx],
-                                          shared, pis);
-    }
-
-    /*
-     * We have completed the index vacuum so decrement the active worker
-     * count.
-     */
-    if (VacuumActiveNWorkers)
-        pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
-}
-
-/*
- * Perform parallel processing of indexes in leader process.
- *
- * Handles index vacuuming (or index cleanup) for indexes that are not
- * parallel safe. It's possible that this will vary for a given index, based
- * on details like whether we're performing index cleanup right now.
- *
- * Also performs processing of smaller indexes that fell under the size cutoff
- * enforced by parallel_vacuum_compute_workers().
- */
-static void
-parallel_vacuum_process_unsafe_indexes(LVRelState *vacrel)
-{
-    LVParallelState *lps = vacrel->lps;
-
-    Assert(!IsParallelWorker());
-
-    /*
-     * Increment the active worker count if we are able to launch any worker.
-     */
-    if (VacuumActiveNWorkers)
-        pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
-
-    for (int idx = 0; idx < vacrel->nindexes; idx++)
-    {
-        LVParallelIndStats *pindstats = &(lps->lvpindstats[idx]);
-
-        /* Skip, indexes that are safe for workers */
-        if (pindstats->parallel_workers_can_process)
-            continue;
-
-        /* Do vacuum or cleanup of the index */
-        parallel_vacuum_process_one_index(vacrel, vacrel->indrels[idx],
-                                          lps->lvshared, pindstats);
-    }
-
-    /*
-     * We have completed the index vacuum so decrement the active worker
-     * count.
-     */
-    if (VacuumActiveNWorkers)
-        pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
-}
-
-/*
- * Vacuum or cleanup index either by leader process or by one of the worker
- * process. After processing the index this function copies the index
- * statistics returned from ambulkdelete and amvacuumcleanup to the DSM
- * segment.
- */
-static void
-parallel_vacuum_process_one_index(LVRelState *vacrel, Relation indrel,
-                                  LVShared *shared, LVParallelIndStats *pindstats)
-{
-    IndexBulkDeleteResult *istat = NULL;
-    IndexBulkDeleteResult *istat_res;
-
-    /*
-     * Update the pointer to the corresponding bulk-deletion result if someone
-     * has already updated it
-     */
-    if (pindstats->istat_updated)
-        istat = &(pindstats->istat);
-
-    switch (pindstats->status)
-    {
-        case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
-            istat_res = lazy_vacuum_one_index(indrel, istat,
-                                              shared->reltuples, vacrel);
-            break;
-        case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
-            istat_res = lazy_cleanup_one_index(indrel, istat,
-                                               shared->reltuples,
-                                               shared->estimated_count,
-                                               vacrel);
-            break;
-        default:
-            elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
-                 pindstats->status,
-                 RelationGetRelationName(indrel));
-    }
-
-    /*
-     * Copy the index bulk-deletion result returned from ambulkdelete and
-     * amvacuumcleanup to the DSM segment if it's the first cycle because they
-     * allocate locally and it's possible that an index will be vacuumed by a
-     * different vacuum process the next cycle. Copying the result normally
-     * happens only the first time an index is vacuumed. For any additional
-     * vacuum pass, we directly point to the result on the DSM segment and
-     * pass it to vacuum index APIs so that workers can update it directly.
-     *
-     * Since all vacuum workers write the bulk-deletion result at different
-     * slots we can write them without locking.
-     */
-    if (!pindstats->istat_updated && istat_res != NULL)
-    {
-        memcpy(&(pindstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
-        pindstats->istat_updated = true;
-
-        /* Free the locally-allocated bulk-deletion result */
-        pfree(istat_res);
-    }
-
-    /*
-     * Update the status to completed. No need to lock here since each worker
-     * touches different indexes.
-     */
-    pindstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;
-}
-
-/*
  * lazy_cleanup_all_indexes() -- cleanup all indexes of relation.
  */
 static void
 lazy_cleanup_all_indexes(LVRelState *vacrel)
 {
-    Assert(!IsParallelWorker());
     Assert(vacrel->nindexes > 0);
 
     /* Report that we are now cleaning up indexes */
@@ -2980,7 +2483,9 @@ lazy_cleanup_all_indexes(LVRelState *vacrel)
     else
     {
         /* Outsource everything to parallel variant */
-        parallel_vacuum_process_all_indexes(vacrel, false);
+        parallel_vacuum_cleanup_all_indexes(vacrel->pvs, vacrel->new_rel_tuples,
+                                            vacrel->num_index_scans,
+                                            (vacrel->tupcount_pages < vacrel->rel_pages));
     }
 }
 
@@ -3409,8 +2914,6 @@ dead_items_max_items(LVRelState *vacrel)
     autovacuum_work_mem != -1 ?
     autovacuum_work_mem : maintenance_work_mem;
 
-    Assert(!IsParallelWorker());
-
     if (vacrel->nindexes > 0)
     {
         BlockNumber rel_pages = vacrel->rel_pages;
@@ -3448,6 +2951,9 @@ dead_items_alloc(LVRelState *vacrel, int nworkers)
     VacDeadItems *dead_items;
     int         max_items;
 
+    max_items = dead_items_max_items(vacrel);
+    Assert(max_items >= MaxHeapTuplesPerPage);
+
     /*
      * Initialize state for a parallel vacuum. As of now, only one worker can
      * be used for an index, so we invoke parallelism only if there are at
@@ -3471,15 +2977,20 @@ dead_items_alloc(LVRelState *vacrel, int nworkers)
                                 vacrel->relname)));
     }
     else
-        parallel_vacuum_begin(vacrel, nworkers);
+        vacrel->pvs = parallel_vacuum_init(vacrel->rel, vacrel->indrels,
+                                           vacrel->nindexes, nworkers,
+                                           max_items, elevel,
+                                           vacrel->bstrategy);
 
-    /* If parallel mode started, vacrel->dead_items allocated in DSM */
+    /* If parallel mode started, dead_items space is allocated in DSM */
     if (ParallelVacuumIsActive(vacrel))
+    {
+        vacrel->dead_items = parallel_vacuum_get_dead_items(vacrel->pvs);
        return;
+    }
    }
 
     /* Serial VACUUM case */
-    max_items = dead_items_max_items(vacrel);
     dead_items = (VacDeadItems *) palloc(vac_max_items_to_alloc_size(max_items));
     dead_items->max_items = max_items;
     dead_items->num_items = 0;
@@ -3499,11 +3010,9 @@ dead_items_cleanup(LVRelState *vacrel)
         return;
     }
 
-    /*
-     * End parallel mode before updating index statistics as we cannot write
-     * during parallel mode.
-     */
-    parallel_vacuum_end(vacrel);
+    /* End parallel mode */
+    parallel_vacuum_end(vacrel->pvs, vacrel->indstats);
+    vacrel->pvs = NULL;
 }
 
 /*
@@ -3628,77 +3137,6 @@ heap_page_is_all_visible(LVRelState *vacrel, Buffer buf,
 }
 
 /*
- * Compute the number of parallel worker processes to request. Both index
- * vacuum and index cleanup can be executed with parallel workers. The index
- * is eligible for parallel vacuum iff its size is greater than
- * min_parallel_index_scan_size as invoking workers for very small indexes
- * can hurt performance.
- *
- * nrequested is the number of parallel workers that user requested. If
- * nrequested is 0, we compute the parallel degree based on nindexes, that is
- * the number of indexes that support parallel vacuum. This function also
- * sets will_parallel_vacuum to remember indexes that participate in parallel
- * vacuum.
- */
-static int
-parallel_vacuum_compute_workers(LVRelState *vacrel, int nrequested,
-                                bool *will_parallel_vacuum)
-{
-    int         nindexes_parallel = 0;
-    int         nindexes_parallel_bulkdel = 0;
-    int         nindexes_parallel_cleanup = 0;
-    int         parallel_workers;
-
-    /*
-     * We don't allow performing parallel operation in standalone backend or
-     * when parallelism is disabled.
-     */
-    if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
-        return 0;
-
-    /*
-     * Compute the number of indexes that can participate in parallel vacuum.
-     */
-    for (int idx = 0; idx < vacrel->nindexes; idx++)
-    {
-        Relation    indrel = vacrel->indrels[idx];
-        uint8       vacoptions = indrel->rd_indam->amparallelvacuumoptions;
-
-        /* Skip index that is not a suitable target for parallel index vacuum */
-        if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
-            RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
-            continue;
-
-        will_parallel_vacuum[idx] = true;
-
-        if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
-            nindexes_parallel_bulkdel++;
-        if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
-            ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
-            nindexes_parallel_cleanup++;
-    }
-
-    nindexes_parallel = Max(nindexes_parallel_bulkdel,
-                            nindexes_parallel_cleanup);
-
-    /* The leader process takes one index */
-    nindexes_parallel--;
-
-    /* No index supports parallel vacuum */
-    if (nindexes_parallel <= 0)
-        return 0;
-
-    /* Compute the parallel degree */
-    parallel_workers = (nrequested > 0) ?
-        Min(nrequested, nindexes_parallel) : nindexes_parallel;
-
-    /* Cap by max_parallel_maintenance_workers */
-    parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
-
-    return parallel_workers;
-}
-
-/*
  * Update index statistics in pg_class if the statistics are accurate.
  */
 static void
@@ -3731,394 +3169,10 @@ update_index_statistics(LVRelState *vacrel)
 }
 
 /*
- * Try to enter parallel mode and create a parallel context. Then initialize
- * shared memory state.
- *
- * On success (when we can launch one or more workers), will set dead_items and
- * lps in vacrel for caller. A set lps in vacrel state indicates that parallel
- * VACUUM is currently active.
- */
-static void
-parallel_vacuum_begin(LVRelState *vacrel, int nrequested)
-{
-    LVParallelState *lps;
-    Relation   *indrels = vacrel->indrels;
-    int         nindexes = vacrel->nindexes;
-    ParallelContext *pcxt;
-    LVShared   *shared;
-    VacDeadItems *dead_items;
-    LVParallelIndStats *pindstats;
-    BufferUsage *buffer_usage;
-    WalUsage   *wal_usage;
-    bool       *will_parallel_vacuum;
-    int         max_items;
-    Size        est_pindstats_len;
-    Size        est_shared_len;
-    Size        est_dead_items_len;
-    int         nindexes_mwm = 0;
-    int         parallel_workers = 0;
-    int         querylen;
-
-    /*
-     * A parallel vacuum must be requested and there must be indexes on the
-     * relation
-     */
-    Assert(nrequested >= 0);
-    Assert(nindexes > 0);
-
-    /*
-     * Compute the number of parallel vacuum workers to launch
-     */
-    will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
-    parallel_workers = parallel_vacuum_compute_workers(vacrel, nrequested,
-                                                       will_parallel_vacuum);
-    if (parallel_workers <= 0)
-    {
-        /* Can't perform vacuum in parallel -- lps not set in vacrel */
-        pfree(will_parallel_vacuum);
-        return;
-    }
-
-    lps = (LVParallelState *) palloc0(sizeof(LVParallelState));
-
-    EnterParallelMode();
-    pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
-                                 parallel_workers);
-    Assert(pcxt->nworkers > 0);
-    lps->pcxt = pcxt;
-    lps->will_parallel_vacuum = will_parallel_vacuum;
-
-    /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_STATS */
-    est_pindstats_len = mul_size(sizeof(LVParallelIndStats), nindexes);
-    shm_toc_estimate_chunk(&pcxt->estimator, est_pindstats_len);
-    shm_toc_estimate_keys(&pcxt->estimator, 1);
-
-    /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
-    est_shared_len = sizeof(LVShared);
-    shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
-    shm_toc_estimate_keys(&pcxt->estimator, 1);
-
-    /* Estimate size for dead_items -- PARALLEL_VACUUM_KEY_DEAD_ITEMS */
-    max_items = dead_items_max_items(vacrel);
-    est_dead_items_len = vac_max_items_to_alloc_size(max_items);
-    shm_toc_estimate_chunk(&pcxt->estimator, est_dead_items_len);
-    shm_toc_estimate_keys(&pcxt->estimator, 1);
-
-    /*
-     * Estimate space for BufferUsage and WalUsage --
-     * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
-     *
-     * If there are no extensions loaded that care, we could skip this. We
-     * have no way of knowing whether anyone's looking at pgBufferUsage or
-     * pgWalUsage, so do it unconditionally.
-     */
-    shm_toc_estimate_chunk(&pcxt->estimator,
-                           mul_size(sizeof(BufferUsage), pcxt->nworkers));
-    shm_toc_estimate_keys(&pcxt->estimator, 1);
-    shm_toc_estimate_chunk(&pcxt->estimator,
-                           mul_size(sizeof(WalUsage), pcxt->nworkers));
-    shm_toc_estimate_keys(&pcxt->estimator, 1);
-
-    /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
-    if (debug_query_string)
-    {
-        querylen = strlen(debug_query_string);
-        shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
-        shm_toc_estimate_keys(&pcxt->estimator, 1);
-    }
-    else
-        querylen = 0;           /* keep compiler quiet */
-
-    InitializeParallelDSM(pcxt);
-
-    /* Prepare index vacuum stats */
-    pindstats = (LVParallelIndStats *) shm_toc_allocate(pcxt->toc, est_pindstats_len);
-    for (int idx = 0; idx < nindexes; idx++)
-    {
-        Relation    indrel = indrels[idx];
-        uint8       vacoptions = indrel->rd_indam->amparallelvacuumoptions;
-
-        /*
-         * Cleanup option should be either disabled, always performing in
-         * parallel or conditionally performing in parallel.
-         */
-        Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
-               ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
-        Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
-
-        if (!will_parallel_vacuum[idx])
-            continue;
-
-        if (indrel->rd_indam->amusemaintenanceworkmem)
-            nindexes_mwm++;
-
-        /*
-         * Remember the number of indexes that support parallel operation for
-         * each phase.
-         */
-        if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
-            lps->nindexes_parallel_bulkdel++;
-        if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
-            lps->nindexes_parallel_cleanup++;
-        if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
-            lps->nindexes_parallel_condcleanup++;
-    }
-    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, pindstats);
-    lps->lvpindstats = pindstats;
-
-    /* Prepare shared information */
-    shared = (LVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
-    MemSet(shared, 0, est_shared_len);
-    shared->relid = RelationGetRelid(vacrel->rel);
-    shared->elevel = elevel;
-    shared->maintenance_work_mem_worker =
-        (nindexes_mwm > 0) ?
-        maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
-        maintenance_work_mem;
-
-    pg_atomic_init_u32(&(shared->cost_balance), 0);
-    pg_atomic_init_u32(&(shared->active_nworkers), 0);
-    pg_atomic_init_u32(&(shared->idx), 0);
-
-    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
-    lps->lvshared = shared;
-
-    /* Prepare the dead_items space */
-    dead_items = (VacDeadItems *) shm_toc_allocate(pcxt->toc,
-                                                   est_dead_items_len);
-    dead_items->max_items = max_items;
-    dead_items->num_items = 0;
-    MemSet(dead_items->items, 0, sizeof(ItemPointerData) * max_items);
-    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_ITEMS, dead_items);
-
-    /*
-     * Allocate space for each worker's BufferUsage and WalUsage; no need to
-     * initialize
-     */
-    buffer_usage = shm_toc_allocate(pcxt->toc,
-                                    mul_size(sizeof(BufferUsage), pcxt->nworkers));
-    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
-    lps->buffer_usage = buffer_usage;
-    wal_usage = shm_toc_allocate(pcxt->toc,
-                                 mul_size(sizeof(WalUsage), pcxt->nworkers));
-    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
-    lps->wal_usage = wal_usage;
-
-    /* Store query string for workers */
-    if (debug_query_string)
-    {
-        char       *sharedquery;
-
-        sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
-        memcpy(sharedquery, debug_query_string, querylen + 1);
-        sharedquery[querylen] = '\0';
-        shm_toc_insert(pcxt->toc,
-                       PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
-    }
-
-    /* Success -- set dead_items and lps in leader's vacrel state */
-    vacrel->dead_items = dead_items;
-    vacrel->lps = lps;
-}
-
-/*
- * Destroy the parallel context, and end parallel mode.
- *
- * Since writes are not allowed during parallel mode, copy the
- * updated index statistics from DSM into local memory and then later use that
- * to update the index statistics. One might think that we can exit from
- * parallel mode, update the index statistics and then destroy parallel
- * context, but that won't be safe (see ExitParallelMode).
- */
-static void
-parallel_vacuum_end(LVRelState *vacrel)
-{
-    IndexBulkDeleteResult **indstats = vacrel->indstats;
-    LVParallelState *lps = vacrel->lps;
-    int         nindexes = vacrel->nindexes;
-
-    Assert(!IsParallelWorker());
-
-    /* Copy the updated statistics */
-    for (int idx = 0; idx < nindexes; idx++)
-    {
-        LVParallelIndStats *pindstats = &(lps->lvpindstats[idx]);
-
-        if (pindstats->istat_updated)
-        {
-            indstats[idx] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
-            memcpy(indstats[idx], &pindstats->istat, sizeof(IndexBulkDeleteResult));
-        }
-        else
-            indstats[idx] = NULL;
-    }
-
-    DestroyParallelContext(lps->pcxt);
-    ExitParallelMode();
-
-    /* Deactivate parallel vacuum */
-    pfree(lps->will_parallel_vacuum);
-    pfree(lps);
-    vacrel->lps = NULL;
-}
-
-/*
- * Returns false, if the given index can't participate in the next execution of
- * parallel index vacuum or parallel index cleanup.
- */
-static bool
-parallel_vacuum_index_is_parallel_safe(LVRelState *vacrel, Relation indrel,
-                                       bool vacuum)
-{
-    uint8       vacoptions;
-
-    vacoptions = indrel->rd_indam->amparallelvacuumoptions;
-
-    /* In parallel vacuum case, check if it supports parallel bulk-deletion */
-    if (vacuum)
-        return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
-
-    /* Not safe, if the index does not support parallel cleanup */
-    if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
-        ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
-        return false;
-
-    /*
-     * Not safe, if the index supports parallel cleanup conditionally, but we
-     * have already processed the index (for bulkdelete). We do this to avoid
-     * the need to invoke workers when parallel index cleanup doesn't need to
-     * scan the index. See the comments for option
-     * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
-     * parallel cleanup conditionally.
-     */
-    if (vacrel->num_index_scans > 0 &&
-        ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
-        return false;
-
-    return true;
-}
-
-/*
- * Perform work within a launched parallel process.
- *
- * Since parallel vacuum workers perform only index vacuum or index cleanup,
- * we don't need to report progress information.
- */
-void
-parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
-{
-    Relation    rel;
-    Relation   *indrels;
-    LVParallelIndStats *lvpindstats;
-    LVShared   *lvshared;
-    VacDeadItems *dead_items;
-    BufferUsage *buffer_usage;
-    WalUsage   *wal_usage;
-    int         nindexes;
-    char       *sharedquery;
-    LVRelState  vacrel;
-    ErrorContextCallback errcallback;
-
-    /*
-     * A parallel vacuum worker must have only PROC_IN_VACUUM flag since we
-     * don't support parallel vacuum for autovacuum as of now.
-     */
-    Assert(MyProc->statusFlags == PROC_IN_VACUUM);
-
-    lvshared = (LVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED,
-                                           false);
-    elevel = lvshared->elevel;
-
-    elog(DEBUG1, "starting parallel vacuum worker");
-
-    /* Set debug_query_string for individual workers */
-    sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
-    debug_query_string = sharedquery;
-    pgstat_report_activity(STATE_RUNNING, debug_query_string);
-
-    /*
-     * Open table. The lock mode is the same as the leader process. It's
-     * okay because the lock mode does not conflict among the parallel
-     * workers.
-     */
-    rel = table_open(lvshared->relid, ShareUpdateExclusiveLock);
-
-    /*
-     * Open all indexes. indrels are sorted in order by OID, which should be
-     * matched to the leader's one.
-     */
-    vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
-    Assert(nindexes > 0);
-
-    /* Set index statistics */
-    lvpindstats = (LVParallelIndStats *) shm_toc_lookup(toc,
-                                                        PARALLEL_VACUUM_KEY_INDEX_STATS,
-                                                        false);
-
-    /* Set dead_items space (set as worker's vacrel dead_items below) */
-    dead_items = (VacDeadItems *) shm_toc_lookup(toc,
-                                                 PARALLEL_VACUUM_KEY_DEAD_ITEMS,
-                                                 false);
-
-    /* Set cost-based vacuum delay */
-    VacuumCostActive = (VacuumCostDelay > 0);
-    VacuumCostBalance = 0;
-    VacuumPageHit = 0;
-    VacuumPageMiss = 0;
-    VacuumPageDirty = 0;
-    VacuumCostBalanceLocal = 0;
-    VacuumSharedCostBalance = &(lvshared->cost_balance);
-    VacuumActiveNWorkers = &(lvshared->active_nworkers);
-
-    vacrel.rel = rel;
-    vacrel.indrels = indrels;
-    vacrel.nindexes = nindexes;
-    /* Each parallel VACUUM worker gets its own access strategy */
-    vacrel.bstrategy = GetAccessStrategy(BAS_VACUUM);
-    vacrel.indstats = (IndexBulkDeleteResult **)
-        palloc0(nindexes * sizeof(IndexBulkDeleteResult *));
-
-    if (lvshared->maintenance_work_mem_worker > 0)
-        maintenance_work_mem = lvshared->maintenance_work_mem_worker;
-
-    /*
-     * Initialize vacrel for use as error callback arg by parallel worker.
-     */
-    vacrel.relnamespace = get_namespace_name(RelationGetNamespace(rel));
-    vacrel.relname = pstrdup(RelationGetRelationName(rel));
-    vacrel.indname = NULL;
-    vacrel.phase = VACUUM_ERRCB_PHASE_UNKNOWN;  /* Not yet processing */
-    vacrel.dead_items = dead_items;
-
-    /* Setup error traceback support for ereport() */
-    errcallback.callback = vacuum_error_callback;
-    errcallback.arg = &vacrel;
-    errcallback.previous = error_context_stack;
-    error_context_stack = &errcallback;
-
-    /* Prepare to track buffer usage during parallel execution */
-    InstrStartParallelQuery();
-
-    /* Process indexes to perform vacuum/cleanup */
-    parallel_vacuum_process_safe_indexes(&vacrel, lvshared, lvpindstats);
-
-    /* Report buffer/WAL usage during parallel execution */
-    buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
-    wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
-    InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
-                          &wal_usage[ParallelWorkerNumber]);
-
-    /* Pop the error context stack */
-    error_context_stack = errcallback.previous;
-
-    vac_close_indexes(nindexes, indrels, RowExclusiveLock);
-    table_close(rel, ShareUpdateExclusiveLock);
-    FreeAccessStrategy(vacrel.bstrategy);
-    pfree(vacrel.indstats);
-}
-
-/*
- * Error context callback for errors occurring during vacuum.
+ * Error context callback for errors occurring during vacuum. The error
+ * context messages for index phases should match the messages set in parallel
+ * vacuum. If you change this function for those phases, change
+ * parallel_vacuum_error_callback() as well.
  */
 static void
 vacuum_error_callback(void *arg)
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index bb1881f5736..ae7c7133dd4 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -14,7 +14,6 @@
 
 #include "postgres.h"
 
-#include "access/heapam.h"
 #include "access/nbtree.h"
 #include "access/parallel.h"
 #include "access/session.h"
@@ -25,6 +24,7 @@
 #include "catalog/pg_enum.h"
 #include "catalog/storage.h"
 #include "commands/async.h"
+#include "commands/vacuum.h"
 #include "executor/execParallel.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
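Taken together, the hunks above shrink vacuumlazy.c's parallel-vacuum surface to a
handful of calls into the new ParallelVacuumState API (declared, judging by the new
include added to parallel.c, via commands/vacuum.h). The following excerpt collects
the four call sites as they appear in the diff context, with the surrounding function
bodies elided; it is a condensed reading aid, not complete code.

/*
 * dead_items_alloc(): the parallel path now delegates all setup to
 * parallel_vacuum_init(); on success, the dead_items space lives in DSM.
 */
vacrel->pvs = parallel_vacuum_init(vacrel->rel, vacrel->indrels,
                                   vacrel->nindexes, nworkers,
                                   max_items, elevel,
                                   vacrel->bstrategy);
if (ParallelVacuumIsActive(vacrel))     /* i.e., vacrel->pvs != NULL */
    vacrel->dead_items = parallel_vacuum_get_dead_items(vacrel->pvs);

/* lazy_vacuum_all_indexes(): the bulk-delete phase is outsourced wholesale. */
parallel_vacuum_bulkdel_all_indexes(vacrel->pvs, vacrel->old_live_tuples,
                                    vacrel->num_index_scans);

/* lazy_cleanup_all_indexes(): likewise for the cleanup phase. */
parallel_vacuum_cleanup_all_indexes(vacrel->pvs, vacrel->new_rel_tuples,
                                    vacrel->num_index_scans,
                                    (vacrel->tupcount_pages < vacrel->rel_pages));

/* dead_items_cleanup(): ends parallel mode and hands back the index stats. */
parallel_vacuum_end(vacrel->pvs, vacrel->indstats);
vacrel->pvs = NULL;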