Diffstat (limited to 'src')
-rw-r--r--  src/backend/access/heap/vacuumlazy.c   | 1256
-rw-r--r--  src/backend/access/transam/parallel.c  |   26
-rw-r--r--  src/backend/commands/vacuum.c          |  135
-rw-r--r--  src/backend/postmaster/autovacuum.c    |    2
-rw-r--r--  src/bin/psql/tab-complete.c            |    2
-rw-r--r--  src/include/access/heapam.h            |    3
-rw-r--r--  src/include/access/parallel.h          |    4
-rw-r--r--  src/include/commands/vacuum.h          |   12
-rw-r--r--  src/test/regress/expected/vacuum.out   |   34
-rw-r--r--  src/test/regress/sql/vacuum.sql        |   31
-rw-r--r--  src/tools/pgindent/typedefs.list       |    4
11 files changed, 1387 insertions, 122 deletions
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index a5fe90485fa..b331f4c279b 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -22,6 +22,20 @@
* of index scans performed. So we don't use maintenance_work_mem memory for
* the TID array, just enough to hold as many heap tuples as fit on one page.
*
+ * Lazy vacuum supports parallel execution with parallel worker processes. In
+ * a parallel vacuum, we perform both index vacuum and index cleanup with
+ * parallel worker processes. Individual indexes are processed by one vacuum
+ * process. At the beginning of a lazy vacuum (at lazy_scan_heap) we prepare
+ * the parallel context and initialize the DSM segment that contains shared
+ * information as well as the memory space for storing dead tuples. When
+ * starting either index vacuum or index cleanup, we launch parallel worker
+ * processes. Once all indexes are processed the parallel worker processes
+ * exit. After that, the leader process re-initializes the parallel context
+ * so that it can use the same DSM for multiple passes of index vacuum and
+ * for performing index cleanup. Updating the index statistics requires
+ * writing to a system catalog, and since such updates are not allowed while
+ * in parallel mode, we update the index statistics only after exiting
+ * parallel mode.
*
* Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
@@ -36,25 +50,30 @@
#include <math.h>
+#include "access/amapi.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/heapam_xlog.h"
#include "access/htup_details.h"
#include "access/multixact.h"
+#include "access/parallel.h"
#include "access/transam.h"
#include "access/visibilitymap.h"
+#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/storage.h"
#include "commands/dbcommands.h"
#include "commands/progress.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
+#include "optimizer/paths.h"
#include "pgstat.h"
#include "portability/instr_time.h"
#include "postmaster/autovacuum.h"
#include "storage/bufmgr.h"
#include "storage/freespace.h"
#include "storage/lmgr.h"
+#include "tcop/tcopprot.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_rusage.h"
@@ -110,6 +129,142 @@
*/
#define PREFETCH_SIZE ((BlockNumber) 32)
+/*
+ * DSM keys for parallel vacuum. Unlike other parallel execution code, we
+ * don't need to worry about DSM keys conflicting with plan_node_id here, so
+ * we can use small integers.
+ */
+#define PARALLEL_VACUUM_KEY_SHARED 1
+#define PARALLEL_VACUUM_KEY_DEAD_TUPLES 2
+#define PARALLEL_VACUUM_KEY_QUERY_TEXT 3
+
+/*
+ * Macro to check if we are in a parallel vacuum. If true, we are in
+ * parallel mode and the DSM segment has been initialized.
+ */
+#define ParallelVacuumIsActive(lps) PointerIsValid(lps)
+
+/*
+ * LVDeadTuples stores the dead tuple TIDs collected during the heap scan.
+ * This is allocated in the DSM segment in parallel mode and in local memory
+ * in non-parallel mode.
+ */
+typedef struct LVDeadTuples
+{
+ int max_tuples; /* # slots allocated in array */
+ int num_tuples; /* current # of entries */
+ /* List of TIDs of tuples we intend to delete */
+ /* NB: this list is ordered by TID address */
+ ItemPointerData itemptrs[FLEXIBLE_ARRAY_MEMBER]; /* array of
+ * ItemPointerData */
+} LVDeadTuples;
+
+#define SizeOfLVDeadTuples(cnt) \
+ add_size((offsetof(LVDeadTuples, itemptrs)), \
+ mul_size(sizeof(ItemPointerData), cnt))
+
+/*
+ * Shared information among parallel workers, so this is allocated in the DSM
+ * segment.
+ */
+typedef struct LVShared
+{
+ /*
+ * Target table relid and log level. These fields are not modified during
+ * the lazy vacuum.
+ */
+ Oid relid;
+ int elevel;
+
+ /*
+ * An indication for vacuum workers to perform either index vacuum or
+ * index cleanup. first_time is true only if for_cleanup is true and
+ * bulk-deletion has not been performed yet.
+ */
+ bool for_cleanup;
+ bool first_time;
+
+ /*
+ * Fields for both index vacuum and cleanup.
+ *
+ * reltuples is the total number of input heap tuples. It is set to the old
+ * live tuples in the index vacuuming case and to the new live tuples in the
+ * index cleanup case.
+ *
+ * estimated_count is true if the reltuples is an estimated value.
+ */
+ double reltuples;
+ bool estimated_count;
+
+ /*
+ * In a single-process lazy vacuum, index vacuuming or cleanup can consume
+ * memory beyond what is used for heap scanning. In a parallel vacuum, each
+ * worker could consume up to maintenance_work_mem on its own, so the
+ * maintenance_work_mem given to each worker is reduced such that the
+ * parallel operation doesn't consume more memory than a single-process lazy
+ * vacuum would.
+ */
+ int maintenance_work_mem_worker;
+
+ /*
+ * Shared vacuum cost balance. During parallel vacuum,
+ * VacuumSharedCostBalance points to this value and it accumulates the
+ * balance of each parallel vacuum worker.
+ */
+ pg_atomic_uint32 cost_balance;
+
+ /*
+ * Number of active parallel workers. This is used for computing the
+ * minimum threshold of the vacuum cost balance before a worker sleeps for
+ * the cost-based delay.
+ */
+ pg_atomic_uint32 active_nworkers;
+
+ /*
+ * Variables to control parallel vacuum. We have a bitmap to indicate
+ * which indexes have a stats slot in shared memory. A set bit in the map
+ * indicates that the corresponding index supports parallel vacuum.
+ */
+ pg_atomic_uint32 idx; /* counter for vacuuming and clean up */
+ uint32 offset; /* sizeof header incl. bitmap */
+ bits8 bitmap[FLEXIBLE_ARRAY_MEMBER]; /* bit map of NULLs */
+
+ /* Shared index statistics data follows at end of struct */
+} LVShared;
+
+#define SizeOfLVShared (offsetof(LVShared, bitmap) + sizeof(bits8))
+#define GetSharedIndStats(s) \
+ ((LVSharedIndStats *)((char *)(s) + ((LVShared *)(s))->offset))
+#define IndStatsIsNull(s, i) \
+ (!(((LVShared *)(s))->bitmap[(i) >> 3] & (1 << ((i) & 0x07))))
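
/*
 * Worked example (illustration only, not part of the patch): with 10 indexes,
 * BITMAPLEN(10) is 2, so begin_parallel_vacuum() below sets LVShared->offset
 * to MAXALIGN(SizeOfLVShared + 2).  GetSharedIndStats() then points just past
 * that header, and the LVSharedIndStats entries follow in index order, one
 * entry per index whose bit is set in the bitmap.  IndStatsIsNull(s, 4) tests
 * bit 4 of bitmap[0]; if that bit is clear, index 4 has no shared slot and is
 * processed with a locally allocated result instead.
 */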
+
+/*
+ * Struct for index bulk-deletion statistics used for parallel vacuum. This
+ * is allocated in the DSM segment.
+ */
+typedef struct LVSharedIndStats
+{
+ bool updated; /* are the stats updated? */
+ IndexBulkDeleteResult stats;
+} LVSharedIndStats;
+
+/* Struct for maintaining a parallel vacuum state. */
+typedef struct LVParallelState
+{
+ ParallelContext *pcxt;
+
+ /* Shared information among parallel vacuum workers */
+ LVShared *lvshared;
+
+ /*
+ * The number of indexes that support parallel index bulk-deletion,
+ * parallel index cleanup, and conditional parallel index cleanup,
+ * respectively.
+ */
+ int nindexes_parallel_bulkdel;
+ int nindexes_parallel_cleanup;
+ int nindexes_parallel_condcleanup;
+} LVParallelState;
+
typedef struct LVRelStats
{
/* useindex = true means two-pass strategy; false means one-pass */
@@ -128,11 +283,7 @@ typedef struct LVRelStats
BlockNumber pages_removed;
double tuples_deleted;
BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
- /* List of TIDs of tuples we intend to delete */
- /* NB: this list is ordered by TID address */
- int num_dead_tuples; /* current # of entries */
- int max_dead_tuples; /* # slots allocated in array */
- ItemPointer dead_tuples; /* array of ItemPointerData */
+ LVDeadTuples *dead_tuples;
int num_index_scans;
TransactionId latestRemovedXid;
bool lock_waiter_detected;
@@ -155,15 +306,15 @@ static void lazy_scan_heap(Relation onerel, VacuumParams *params,
bool aggressive);
static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
static bool lazy_check_needs_freeze(Buffer buf, bool *hastup);
-static void lazy_vacuum_index(Relation indrel,
- IndexBulkDeleteResult **stats,
- LVRelStats *vacrelstats);
-static void lazy_vacuum_all_indexes(Relation onerel, LVRelStats *vacrelstats,
- Relation *Irel, int nindexes,
- IndexBulkDeleteResult **indstats);
+static void lazy_vacuum_all_indexes(Relation onerel, Relation *Irel,
+ IndexBulkDeleteResult **stats,
+ LVRelStats *vacrelstats, LVParallelState *lps,
+ int nindexes);
+static void lazy_vacuum_index(Relation indrel, IndexBulkDeleteResult **stats,
+ LVDeadTuples *dead_tuples, double reltuples);
static void lazy_cleanup_index(Relation indrel,
- IndexBulkDeleteResult *stats,
- LVRelStats *vacrelstats);
+ IndexBulkDeleteResult **stats,
+ double reltuples, bool estimated_count);
static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer);
static bool should_attempt_truncation(VacuumParams *params,
@@ -172,12 +323,41 @@ static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats);
static BlockNumber count_nondeletable_pages(Relation onerel,
LVRelStats *vacrelstats);
static void lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks);
-static void lazy_record_dead_tuple(LVRelStats *vacrelstats,
+static void lazy_record_dead_tuple(LVDeadTuples *dead_tuples,
ItemPointer itemptr);
static bool lazy_tid_reaped(ItemPointer itemptr, void *state);
static int vac_cmp_itemptr(const void *left, const void *right);
static bool heap_page_is_all_visible(Relation rel, Buffer buf,
TransactionId *visibility_cutoff_xid, bool *all_frozen);
+static void lazy_parallel_vacuum_indexes(Relation *Irel, IndexBulkDeleteResult **stats,
+ LVRelStats *vacrelstats, LVParallelState *lps,
+ int nindexes);
+static void parallel_vacuum_index(Relation *Irel, IndexBulkDeleteResult **stats,
+ LVShared *lvshared, LVDeadTuples *dead_tuples,
+ int nindexes);
+static void vacuum_indexes_leader(Relation *Irel, IndexBulkDeleteResult **stats,
+ LVRelStats *vacrelstats, LVParallelState *lps,
+ int nindexes);
+static void vacuum_one_index(Relation indrel, IndexBulkDeleteResult **stats,
+ LVShared *lvshared, LVSharedIndStats *shared_indstats,
+ LVDeadTuples *dead_tuples);
+static void lazy_cleanup_all_indexes(Relation *Irel, IndexBulkDeleteResult **stats,
+ LVRelStats *vacrelstats, LVParallelState *lps,
+ int nindexes);
+static long compute_max_dead_tuples(BlockNumber relblocks, bool hasindex);
+static int compute_parallel_vacuum_workers(Relation *Irel, int nindexes, int nrequested,
+ bool *can_parallel_vacuum);
+static void prepare_index_statistics(LVShared *lvshared, bool *can_parallel_vacuum,
+ int nindexes);
+static void update_index_statistics(Relation *Irel, IndexBulkDeleteResult **stats,
+ int nindexes);
+static LVParallelState *begin_parallel_vacuum(Oid relid, Relation *Irel,
+ LVRelStats *vacrelstats, BlockNumber nblocks,
+ int nindexes, int nrequested);
+static void end_parallel_vacuum(Relation *Irel, IndexBulkDeleteResult **stats,
+ LVParallelState *lps, int nindexes);
+static LVSharedIndStats *get_indstats(LVShared *lvshared, int n);
+static bool skip_parallel_vacuum_index(Relation indrel, LVShared *lvshared);
/*
@@ -491,6 +671,18 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats)
* dead-tuple TIDs, invoke vacuuming of indexes and call lazy_vacuum_heap
* to reclaim dead line pointers.
*
+ * If the table has at least two indexes, we execute both index vacuum
+ * and index cleanup with parallel workers unless the parallel vacuum is
+ * disabled. In a parallel vacuum, we enter parallel mode and then
+ * create both the parallel context and the DSM segment before starting
+ * heap scan so that we can record dead tuples to the DSM segment. All
+ * parallel workers are launched at the beginning of index vacuuming and
+ * index cleanup, and they exit once done with all indexes. At the end of
+ * this function we exit from parallel mode. Index bulk-deletion results
+ * are stored in the DSM segment and we update index statistics for all
+ * the indexes after exiting from parallel mode since writes are not
+ * allowed during parallel mode.
+ *
* If there are no indexes then we can reclaim line pointers on the fly;
* dead line pointers need only be retained until all index pointers that
* reference them have been killed.
@@ -499,6 +691,8 @@ static void
lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
Relation *Irel, int nindexes, bool aggressive)
{
+ LVParallelState *lps = NULL;
+ LVDeadTuples *dead_tuples;
BlockNumber nblocks,
blkno;
HeapTupleData tuple;
@@ -556,13 +750,48 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
vacrelstats->nonempty_pages = 0;
vacrelstats->latestRemovedXid = InvalidTransactionId;
- lazy_space_alloc(vacrelstats, nblocks);
+ /*
+ * Initialize the state for a parallel vacuum. As of now, only one worker
+ * can be used for an index, so we invoke parallelism only if there are at
+ * least two indexes on a table.
+ */
+ if (params->nworkers >= 0 && vacrelstats->useindex && nindexes > 1)
+ {
+ /*
+ * Since parallel workers cannot access data in temporary tables, we
+ * can't perform parallel vacuum on them.
+ */
+ if (RelationUsesLocalBuffers(onerel))
+ {
+ /*
+ * Give warning only if the user explicitly tries to perform a
+ * parallel vacuum on the temporary table.
+ */
+ if (params->nworkers > 0)
+ ereport(WARNING,
+ (errmsg("disabling parallel option of vacuum on \"%s\" --- cannot vacuum temporary tables in parallel",
+ RelationGetRelationName(onerel))));
+ }
+ else
+ lps = begin_parallel_vacuum(RelationGetRelid(onerel), Irel,
+ vacrelstats, nblocks, nindexes,
+ params->nworkers);
+ }
+
+ /*
+ * Allocate the space for dead tuples if parallel vacuum is not active.
+ */
+ if (!ParallelVacuumIsActive(lps))
+ lazy_space_alloc(vacrelstats, nblocks);
+
+ dead_tuples = vacrelstats->dead_tuples;
frozen = palloc(sizeof(xl_heap_freeze_tuple) * MaxHeapTuplesPerPage);
/* Report that we're scanning the heap, advertising total # of blocks */
initprog_val[0] = PROGRESS_VACUUM_PHASE_SCAN_HEAP;
initprog_val[1] = nblocks;
- initprog_val[2] = vacrelstats->max_dead_tuples;
+ initprog_val[2] = dead_tuples->max_tuples;
pgstat_progress_update_multi_param(3, initprog_index, initprog_val);
/*
@@ -740,8 +969,8 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
* If we are close to overrunning the available space for dead-tuple
* TIDs, pause and do a cycle of vacuuming before we tackle this page.
*/
- if ((vacrelstats->max_dead_tuples - vacrelstats->num_dead_tuples) < MaxHeapTuplesPerPage &&
- vacrelstats->num_dead_tuples > 0)
+ if ((dead_tuples->max_tuples - dead_tuples->num_tuples) < MaxHeapTuplesPerPage &&
+ dead_tuples->num_tuples > 0)
{
/*
* Before beginning index vacuuming, we release any pin we may
@@ -756,8 +985,8 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
}
/* Work on all the indexes, then the heap */
- lazy_vacuum_all_indexes(onerel, vacrelstats, Irel,
- nindexes, indstats);
+ lazy_vacuum_all_indexes(onerel, Irel, indstats,
+ vacrelstats, lps, nindexes);
/* Remove tuples from heap */
lazy_vacuum_heap(onerel, vacrelstats);
@@ -767,7 +996,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
* not to reset latestRemovedXid since we want that value to be
* valid.
*/
- vacrelstats->num_dead_tuples = 0;
+ dead_tuples->num_tuples = 0;
/*
* Vacuum the Free Space Map to make newly-freed space visible on
@@ -962,7 +1191,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
has_dead_tuples = false;
nfrozen = 0;
hastup = false;
- prev_dead_count = vacrelstats->num_dead_tuples;
+ prev_dead_count = dead_tuples->num_tuples;
maxoff = PageGetMaxOffsetNumber(page);
/*
@@ -1001,7 +1230,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
*/
if (ItemIdIsDead(itemid))
{
- lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
+ lazy_record_dead_tuple(dead_tuples, &(tuple.t_self));
all_visible = false;
continue;
}
@@ -1147,7 +1376,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
if (tupgone)
{
- lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
+ lazy_record_dead_tuple(dead_tuples, &(tuple.t_self));
HeapTupleHeaderAdvanceLatestRemovedXid(tuple.t_data,
&vacrelstats->latestRemovedXid);
tups_vacuumed += 1;
@@ -1217,7 +1446,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
* doing a second scan. Also we don't do that but forget dead tuples
* when index cleanup is disabled.
*/
- if (!vacrelstats->useindex && vacrelstats->num_dead_tuples > 0)
+ if (!vacrelstats->useindex && dead_tuples->num_tuples > 0)
{
if (nindexes == 0)
{
@@ -1246,7 +1475,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
* not to reset latestRemovedXid since we want that value to be
* valid.
*/
- vacrelstats->num_dead_tuples = 0;
+ dead_tuples->num_tuples = 0;
/*
* Periodically do incremental FSM vacuuming to make newly-freed
@@ -1361,7 +1590,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
* page, so remember its free space as-is. (This path will always be
* taken if there are no indexes.)
*/
- if (vacrelstats->num_dead_tuples == prev_dead_count)
+ if (dead_tuples->num_tuples == prev_dead_count)
RecordPageWithFreeSpace(onerel, blkno, freespace);
}
@@ -1395,11 +1624,11 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
/* If any tuples need to be deleted, perform final vacuum cycle */
/* XXX put a threshold on min number of tuples here? */
- if (vacrelstats->num_dead_tuples > 0)
+ if (dead_tuples->num_tuples > 0)
{
/* Work on all the indexes, and then the heap */
- lazy_vacuum_all_indexes(onerel, vacrelstats, Irel, nindexes,
- indstats);
+ lazy_vacuum_all_indexes(onerel, Irel, indstats, vacrelstats,
+ lps, nindexes);
/* Remove tuples from heap */
lazy_vacuum_heap(onerel, vacrelstats);
@@ -1412,17 +1641,22 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
if (blkno > next_fsm_block_to_vacuum)
FreeSpaceMapVacuumRange(onerel, next_fsm_block_to_vacuum, blkno);
- /* report all blocks vacuumed; and that we're cleaning up */
+ /* report all blocks vacuumed */
pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, blkno);
- pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
- PROGRESS_VACUUM_PHASE_INDEX_CLEANUP);
- /* Do post-vacuum cleanup and statistics update for each index */
+ /* Do post-vacuum cleanup */
if (vacrelstats->useindex)
- {
- for (i = 0; i < nindexes; i++)
- lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
- }
+ lazy_cleanup_all_indexes(Irel, indstats, vacrelstats, lps, nindexes);
+
+ /*
+ * End parallel mode before updating index statistics as we cannot write
+ * during parallel mode.
+ */
+ if (ParallelVacuumIsActive(lps))
+ end_parallel_vacuum(Irel, indstats, lps, nindexes);
+
+ /* Update index statistics */
+ update_index_statistics(Irel, indstats, nindexes);
/* If no indexes, make log report that lazy_vacuum_heap would've made */
if (vacuumed_pages)
@@ -1467,15 +1701,16 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
/*
* lazy_vacuum_all_indexes() -- vacuum all indexes of relation.
*
- * This is a utility wrapper for lazy_vacuum_index(), able to do
- * progress reporting.
+ * We process the indexes serially unless we are doing parallel vacuum.
*/
static void
-lazy_vacuum_all_indexes(Relation onerel, LVRelStats *vacrelstats,
- Relation *Irel, int nindexes,
- IndexBulkDeleteResult **indstats)
+lazy_vacuum_all_indexes(Relation onerel, Relation *Irel,
+ IndexBulkDeleteResult **stats,
+ LVRelStats *vacrelstats, LVParallelState *lps,
+ int nindexes)
{
- int i;
+ Assert(!IsParallelWorker());
+ Assert(nindexes > 0);
/* Log cleanup info before we touch indexes */
vacuum_log_cleanup_info(onerel, vacrelstats);
@@ -1484,9 +1719,30 @@ lazy_vacuum_all_indexes(Relation onerel, LVRelStats *vacrelstats,
pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
PROGRESS_VACUUM_PHASE_VACUUM_INDEX);
- /* Remove index entries */
- for (i = 0; i < nindexes; i++)
- lazy_vacuum_index(Irel[i], &indstats[i], vacrelstats);
+ /* Perform index vacuuming with parallel workers for parallel vacuum. */
+ if (ParallelVacuumIsActive(lps))
+ {
+ /* Tell parallel workers to do index vacuuming */
+ lps->lvshared->for_cleanup = false;
+ lps->lvshared->first_time = false;
+
+ /*
+ * We can only provide an approximate value of num_heap_tuples in
+ * vacuum cases.
+ */
+ lps->lvshared->reltuples = vacrelstats->old_live_tuples;
+ lps->lvshared->estimated_count = true;
+
+ lazy_parallel_vacuum_indexes(Irel, stats, vacrelstats, lps, nindexes);
+ }
+ else
+ {
+ int idx;
+
+ for (idx = 0; idx < nindexes; idx++)
+ lazy_vacuum_index(Irel[idx], &stats[idx], vacrelstats->dead_tuples,
+ vacrelstats->old_live_tuples);
+ }
/* Increase and report the number of index scans */
vacrelstats->num_index_scans++;
@@ -1522,7 +1778,7 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
npages = 0;
tupindex = 0;
- while (tupindex < vacrelstats->num_dead_tuples)
+ while (tupindex < vacrelstats->dead_tuples->num_tuples)
{
BlockNumber tblk;
Buffer buf;
@@ -1531,7 +1787,7 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
vacuum_delay_point();
- tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
+ tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples->itemptrs[tupindex]);
buf = ReadBufferExtended(onerel, MAIN_FORKNUM, tblk, RBM_NORMAL,
vac_strategy);
if (!ConditionalLockBufferForCleanup(buf))
@@ -1579,6 +1835,7 @@ static int
lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer)
{
+ LVDeadTuples *dead_tuples = vacrelstats->dead_tuples;
Page page = BufferGetPage(buffer);
OffsetNumber unused[MaxOffsetNumber];
int uncnt = 0;
@@ -1589,16 +1846,16 @@ lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
START_CRIT_SECTION();
- for (; tupindex < vacrelstats->num_dead_tuples; tupindex++)
+ for (; tupindex < dead_tuples->num_tuples; tupindex++)
{
BlockNumber tblk;
OffsetNumber toff;
ItemId itemid;
- tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
+ tblk = ItemPointerGetBlockNumber(&dead_tuples->itemptrs[tupindex]);
if (tblk != blkno)
break; /* past end of tuples for this block */
- toff = ItemPointerGetOffsetNumber(&vacrelstats->dead_tuples[tupindex]);
+ toff = ItemPointerGetOffsetNumber(&dead_tuples->itemptrs[tupindex]);
itemid = PageGetItemId(page, toff);
ItemIdSetUnused(itemid);
unused[uncnt++] = toff;
@@ -1719,19 +1976,344 @@ lazy_check_needs_freeze(Buffer buf, bool *hastup)
return false;
}
+/*
+ * Perform index vacuum or index cleanup with parallel workers. This function
+ * must be used by the parallel vacuum leader process. The caller must set
+ * lps->lvshared->for_cleanup to indicate whether to perform vacuum or
+ * cleanup.
+ */
+static void
+lazy_parallel_vacuum_indexes(Relation *Irel, IndexBulkDeleteResult **stats,
+ LVRelStats *vacrelstats, LVParallelState *lps,
+ int nindexes)
+{
+ int nworkers;
+
+ Assert(!IsParallelWorker());
+ Assert(ParallelVacuumIsActive(lps));
+ Assert(nindexes > 0);
+
+ /* Determine the number of parallel workers to launch */
+ if (lps->lvshared->for_cleanup)
+ {
+ if (lps->lvshared->first_time)
+ nworkers = lps->nindexes_parallel_cleanup +
+ lps->nindexes_parallel_condcleanup;
+ else
+ nworkers = lps->nindexes_parallel_cleanup;
+ }
+ else
+ nworkers = lps->nindexes_parallel_bulkdel;
+
+ /* The leader process will participate */
+ nworkers--;
+
+ /*
+ * The parallel context may have been initialized with fewer workers than
+ * the number of indexes that need a separate worker in the current phase,
+ * so cap the worker count accordingly. See compute_parallel_vacuum_workers.
+ */
+ nworkers = Min(nworkers, lps->pcxt->nworkers);
+
+ /* Setup the shared cost-based vacuum delay and launch workers */
+ if (nworkers > 0)
+ {
+ if (vacrelstats->num_index_scans > 0)
+ {
+ /* Reset the parallel index processing counter */
+ pg_atomic_write_u32(&(lps->lvshared->idx), 0);
+
+ /* Reinitialize the parallel context to relaunch parallel workers */
+ ReinitializeParallelDSM(lps->pcxt);
+ }
+
+ /*
+ * Set up shared cost balance and the number of active workers for
+ * vacuum delay. We need to do this before launching workers;
+ * otherwise, they might not see the updated values for these
+ * parameters.
+ */
+ pg_atomic_write_u32(&(lps->lvshared->cost_balance), VacuumCostBalance);
+ pg_atomic_write_u32(&(lps->lvshared->active_nworkers), 0);
+
+ /*
+ * The number of workers can vary between bulkdelete and cleanup
+ * phase.
+ */
+ ReinitializeParallelWorkers(lps->pcxt, nworkers);
+
+ LaunchParallelWorkers(lps->pcxt);
+
+ if (lps->pcxt->nworkers_launched > 0)
+ {
+ /*
+ * Reset the local cost values for the leader backend, as its
+ * remaining balance from the heap scan has already been moved
+ * into the shared balance.
+ */
+ VacuumCostBalance = 0;
+ VacuumCostBalanceLocal = 0;
+
+ /* Enable shared cost balance for leader backend */
+ VacuumSharedCostBalance = &(lps->lvshared->cost_balance);
+ VacuumActiveNWorkers = &(lps->lvshared->active_nworkers);
+ }
+
+ if (lps->lvshared->for_cleanup)
+ ereport(elevel,
+ (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
+ "launched %d parallel vacuum workers for index cleanup (planned: %d)",
+ lps->pcxt->nworkers_launched),
+ lps->pcxt->nworkers_launched, nworkers)));
+ else
+ ereport(elevel,
+ (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
+ "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
+ lps->pcxt->nworkers_launched),
+ lps->pcxt->nworkers_launched, nworkers)));
+ }
+
+ /* Process the indexes that can be processed only by the leader process */
+ vacuum_indexes_leader(Irel, stats, vacrelstats, lps, nindexes);
+
+ /*
+ * Join as a parallel worker. If no workers were launched, the leader
+ * process alone processes all the indexes.
+ */
+ parallel_vacuum_index(Irel, stats, lps->lvshared,
+ vacrelstats->dead_tuples, nindexes);
+
+ /* Wait for all vacuum workers to finish */
+ WaitForParallelWorkersToFinish(lps->pcxt);
+
+ /*
+ * Carry the shared balance value back to the heap scan and disable shared
+ * costing.
+ */
+ if (VacuumSharedCostBalance)
+ {
+ VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
+ VacuumSharedCostBalance = NULL;
+ VacuumActiveNWorkers = NULL;
+ }
+}
+
+/*
+ * Index vacuum/cleanup routine used by the leader process and parallel
+ * vacuum worker processes to process the indexes in parallel.
+ */
+static void
+parallel_vacuum_index(Relation *Irel, IndexBulkDeleteResult **stats,
+ LVShared *lvshared, LVDeadTuples *dead_tuples,
+ int nindexes)
+{
+ /*
+ * Increment the active worker count if we are able to launch any worker.
+ */
+ if (VacuumActiveNWorkers)
+ pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
+
+ /* Loop until all indexes are vacuumed */
+ for (;;)
+ {
+ int idx;
+ LVSharedIndStats *shared_indstats;
+
+ /* Get an index number to process */
+ idx = pg_atomic_fetch_add_u32(&(lvshared->idx), 1);
+
+ /* Done for all indexes? */
+ if (idx >= nindexes)
+ break;
+
+ /* Get the index statistics of this index from DSM */
+ shared_indstats = get_indstats(lvshared, idx);
+
+ /*
+ * Skip processing indexes that don't participate in parallel
+ * operation
+ */
+ if (shared_indstats == NULL ||
+ skip_parallel_vacuum_index(Irel[idx], lvshared))
+ continue;
+
+ /* Do vacuum or cleanup of the index */
+ vacuum_one_index(Irel[idx], &(stats[idx]), lvshared, shared_indstats,
+ dead_tuples);
+ }
+
+ /*
+ * We have completed the index vacuum so decrement the active worker
+ * count.
+ */
+ if (VacuumActiveNWorkers)
+ pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
+}
+
+/*
+ * Vacuum or cleanup indexes that can be processed only by the leader process
+ * because these indexes don't support parallel operation in the current phase.
+ */
+static void
+vacuum_indexes_leader(Relation *Irel, IndexBulkDeleteResult **stats,
+ LVRelStats *vacrelstats, LVParallelState *lps,
+ int nindexes)
+{
+ int i;
+
+ Assert(!IsParallelWorker());
+
+ /*
+ * Increment the active worker count if we are able to launch any worker.
+ */
+ if (VacuumActiveNWorkers)
+ pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
+
+ for (i = 0; i < nindexes; i++)
+ {
+ LVSharedIndStats *shared_indstats;
+
+ shared_indstats = get_indstats(lps->lvshared, i);
+
+ /* Process the indexes skipped by parallel workers */
+ if (shared_indstats == NULL ||
+ skip_parallel_vacuum_index(Irel[i], lps->lvshared))
+ vacuum_one_index(Irel[i], &(stats[i]), lps->lvshared,
+ shared_indstats, vacrelstats->dead_tuples);
+ }
+
+ /*
+ * We have completed the index vacuum so decrement the active worker
+ * count.
+ */
+ if (VacuumActiveNWorkers)
+ pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
+}
+
+/*
+ * Vacuum or cleanup one index, either in the leader process or in one of the
+ * worker processes. After processing the index, this function copies the
+ * index statistics returned by ambulkdelete or amvacuumcleanup to the DSM
+ * segment.
+ */
+static void
+vacuum_one_index(Relation indrel, IndexBulkDeleteResult **stats,
+ LVShared *lvshared, LVSharedIndStats *shared_indstats,
+ LVDeadTuples *dead_tuples)
+{
+ IndexBulkDeleteResult *bulkdelete_res = NULL;
+
+ if (shared_indstats)
+ {
+ /* Get the space for IndexBulkDeleteResult */
+ bulkdelete_res = &(shared_indstats->stats);
+
+ /*
+ * Point to the bulk-deletion result stored in the DSM segment if a
+ * previous pass has already filled it in.
+ */
+ if (shared_indstats->updated && *stats == NULL)
+ *stats = bulkdelete_res;
+ }
+
+ /* Do vacuum or cleanup of the index */
+ if (lvshared->for_cleanup)
+ lazy_cleanup_index(indrel, stats, lvshared->reltuples,
+ lvshared->estimated_count);
+ else
+ lazy_vacuum_index(indrel, stats, dead_tuples,
+ lvshared->reltuples);
+
+ /*
+ * Copy the index bulk-deletion result returned from ambulkdelete or
+ * amvacuumcleanup to the DSM segment the first time we obtain it, because
+ * the access method allocates it locally and a different vacuum process
+ * might process this index in a later pass. This copy normally happens
+ * only after the first index vacuuming pass; from then on we pass in the
+ * DSM-resident result, which the access method updates directly.
+ *
+ * Since all vacuum workers write their bulk-deletion results to different
+ * slots, we can write them without locking.
+ */
+ if (shared_indstats && !shared_indstats->updated && *stats != NULL)
+ {
+ memcpy(bulkdelete_res, *stats, sizeof(IndexBulkDeleteResult));
+ shared_indstats->updated = true;
+
+ /*
+ * Now that stats[idx] points into the DSM segment, we don't need the
+ * locally allocated result anymore.
+ */
+ pfree(*stats);
+ *stats = bulkdelete_res;
+ }
+}
+
+/*
+ * lazy_cleanup_all_indexes() -- cleanup all indexes of relation.
+ *
+ * We process the indexes serially unless we are doing parallel vacuum.
+ */
+static void
+lazy_cleanup_all_indexes(Relation *Irel, IndexBulkDeleteResult **stats,
+ LVRelStats *vacrelstats, LVParallelState *lps,
+ int nindexes)
+{
+ int idx;
+
+ Assert(!IsParallelWorker());
+ Assert(nindexes > 0);
+
+ /* Report that we are now cleaning up indexes */
+ pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
+ PROGRESS_VACUUM_PHASE_INDEX_CLEANUP);
+
+ /*
+ * If parallel vacuum is active we perform index cleanup with parallel
+ * workers.
+ */
+ if (ParallelVacuumIsActive(lps))
+ {
+ /* Tell parallel workers to do index cleanup */
+ lps->lvshared->for_cleanup = true;
+ lps->lvshared->first_time =
+ (vacrelstats->num_index_scans == 0);
+
+ /*
+ * Now we can provide a better estimate of total number of surviving
+ * tuples (we assume indexes are more interested in that than in the
+ * number of nominally live tuples).
+ */
+ lps->lvshared->reltuples = vacrelstats->new_rel_tuples;
+ lps->lvshared->estimated_count =
+ (vacrelstats->tupcount_pages < vacrelstats->rel_pages);
+
+ lazy_parallel_vacuum_indexes(Irel, stats, vacrelstats, lps, nindexes);
+ }
+ else
+ {
+ for (idx = 0; idx < nindexes; idx++)
+ lazy_cleanup_index(Irel[idx], &stats[idx],
+ vacrelstats->new_rel_tuples,
+ vacrelstats->tupcount_pages < vacrelstats->rel_pages);
+ }
+}
/*
* lazy_vacuum_index() -- vacuum one index relation.
*
* Delete all the index entries pointing to tuples listed in
- * vacrelstats->dead_tuples, and update running statistics.
+ * dead_tuples, and update running statistics.
+ *
+ * reltuples is the number of heap tuples to be passed to the
+ * bulkdelete callback.
*/
static void
-lazy_vacuum_index(Relation indrel,
- IndexBulkDeleteResult **stats,
- LVRelStats *vacrelstats)
+lazy_vacuum_index(Relation indrel, IndexBulkDeleteResult **stats,
+ LVDeadTuples *dead_tuples, double reltuples)
{
IndexVacuumInfo ivinfo;
+ const char *msg;
PGRUsage ru0;
pg_rusage_init(&ru0);
@@ -1741,30 +2323,38 @@ lazy_vacuum_index(Relation indrel,
ivinfo.report_progress = false;
ivinfo.estimated_count = true;
ivinfo.message_level = elevel;
- /* We can only provide an approximate value of num_heap_tuples here */
- ivinfo.num_heap_tuples = vacrelstats->old_live_tuples;
+ ivinfo.num_heap_tuples = reltuples;
ivinfo.strategy = vac_strategy;
/* Do bulk deletion */
*stats = index_bulk_delete(&ivinfo, *stats,
- lazy_tid_reaped, (void *) vacrelstats);
+ lazy_tid_reaped, (void *) dead_tuples);
+
+ if (IsParallelWorker())
+ msg = gettext_noop("scanned index \"%s\" to remove %d row versions by parallel vacuum worker");
+ else
+ msg = gettext_noop("scanned index \"%s\" to remove %d row versions");
ereport(elevel,
- (errmsg("scanned index \"%s\" to remove %d row versions",
+ (errmsg(msg,
RelationGetRelationName(indrel),
- vacrelstats->num_dead_tuples),
+ dead_tuples->num_tuples),
errdetail_internal("%s", pg_rusage_show(&ru0))));
}
/*
* lazy_cleanup_index() -- do post-vacuum cleanup for one index relation.
+ *
+ * reltuples is the number of heap tuples, and estimated_count is true
+ * if reltuples is an estimated value.
*/
static void
lazy_cleanup_index(Relation indrel,
- IndexBulkDeleteResult *stats,
- LVRelStats *vacrelstats)
+ IndexBulkDeleteResult **stats,
+ double reltuples, bool estimated_count)
{
IndexVacuumInfo ivinfo;
+ const char *msg;
PGRUsage ru0;
pg_rusage_init(&ru0);
@@ -1772,49 +2362,33 @@ lazy_cleanup_index(Relation indrel,
ivinfo.index = indrel;
ivinfo.analyze_only = false;
ivinfo.report_progress = false;
- ivinfo.estimated_count = (vacrelstats->tupcount_pages < vacrelstats->rel_pages);
+ ivinfo.estimated_count = estimated_count;
ivinfo.message_level = elevel;
- /*
- * Now we can provide a better estimate of total number of surviving
- * tuples (we assume indexes are more interested in that than in the
- * number of nominally live tuples).
- */
- ivinfo.num_heap_tuples = vacrelstats->new_rel_tuples;
+ ivinfo.num_heap_tuples = reltuples;
ivinfo.strategy = vac_strategy;
- stats = index_vacuum_cleanup(&ivinfo, stats);
+ *stats = index_vacuum_cleanup(&ivinfo, *stats);
- if (!stats)
+ if (!(*stats))
return;
- /*
- * Now update statistics in pg_class, but only if the index says the count
- * is accurate.
- */
- if (!stats->estimated_count)
- vac_update_relstats(indrel,
- stats->num_pages,
- stats->num_index_tuples,
- 0,
- false,
- InvalidTransactionId,
- InvalidMultiXactId,
- false);
+ if (IsParallelWorker())
+ msg = gettext_noop("index \"%s\" now contains %.0f row versions in %u pages as reported by parallel vacuum worker");
+ else
+ msg = gettext_noop("index \"%s\" now contains %.0f row versions in %u pages");
ereport(elevel,
- (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
+ (errmsg(msg,
RelationGetRelationName(indrel),
- stats->num_index_tuples,
- stats->num_pages),
+ (*stats)->num_index_tuples,
+ (*stats)->num_pages),
errdetail("%.0f index row versions were removed.\n"
"%u index pages have been deleted, %u are currently reusable.\n"
"%s.",
- stats->tuples_removed,
- stats->pages_deleted, stats->pages_free,
+ (*stats)->tuples_removed,
+ (*stats)->pages_deleted, (*stats)->pages_free,
pg_rusage_show(&ru0))));
-
- pfree(stats);
}
/*
@@ -2122,19 +2696,17 @@ count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats)
}
/*
- * lazy_space_alloc - space allocation decisions for lazy vacuum
- *
- * See the comments at the head of this file for rationale.
+ * Return the maximum number of dead tuples we can record.
*/
-static void
-lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks)
+static long
+compute_max_dead_tuples(BlockNumber relblocks, bool useindex)
{
long maxtuples;
int vac_work_mem = IsAutoVacuumWorkerProcess() &&
autovacuum_work_mem != -1 ?
autovacuum_work_mem : maintenance_work_mem;
- if (vacrelstats->useindex)
+ if (useindex)
{
maxtuples = (vac_work_mem * 1024L) / sizeof(ItemPointerData);
maxtuples = Min(maxtuples, INT_MAX);
@@ -2148,34 +2720,48 @@ lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks)
maxtuples = Max(maxtuples, MaxHeapTuplesPerPage);
}
else
- {
maxtuples = MaxHeapTuplesPerPage;
- }
- vacrelstats->num_dead_tuples = 0;
- vacrelstats->max_dead_tuples = (int) maxtuples;
- vacrelstats->dead_tuples = (ItemPointer)
- palloc(maxtuples * sizeof(ItemPointerData));
+ return maxtuples;
+}
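
/*
 * Rough arithmetic for illustration (not part of the patch): ItemPointerData
 * is 6 bytes, so with the default maintenance_work_mem of 64MB the formula
 * above allows roughly 64 * 1024 * 1024 / 6, i.e. about 11 million dead-tuple
 * TIDs, further clamped so it never exceeds what the relation could possibly
 * contain and never drops below one page's worth (MaxHeapTuplesPerPage).
 */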
+
+/*
+ * lazy_space_alloc - space allocation decisions for lazy vacuum
+ *
+ * See the comments at the head of this file for rationale.
+ */
+static void
+lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks)
+{
+ LVDeadTuples *dead_tuples = NULL;
+ long maxtuples;
+
+ maxtuples = compute_max_dead_tuples(relblocks, vacrelstats->useindex);
+
+ dead_tuples = (LVDeadTuples *) palloc(SizeOfLVDeadTuples(maxtuples));
+ dead_tuples->num_tuples = 0;
+ dead_tuples->max_tuples = (int) maxtuples;
+
+ vacrelstats->dead_tuples = dead_tuples;
}
/*
* lazy_record_dead_tuple - remember one deletable tuple
*/
static void
-lazy_record_dead_tuple(LVRelStats *vacrelstats,
- ItemPointer itemptr)
+lazy_record_dead_tuple(LVDeadTuples *dead_tuples, ItemPointer itemptr)
{
/*
* The array shouldn't overflow under normal behavior, but perhaps it
* could if we are given a really small maintenance_work_mem. In that
* case, just forget the last few tuples (we'll get 'em next time).
*/
- if (vacrelstats->num_dead_tuples < vacrelstats->max_dead_tuples)
+ if (dead_tuples->num_tuples < dead_tuples->max_tuples)
{
- vacrelstats->dead_tuples[vacrelstats->num_dead_tuples] = *itemptr;
- vacrelstats->num_dead_tuples++;
+ dead_tuples->itemptrs[dead_tuples->num_tuples] = *itemptr;
+ dead_tuples->num_tuples++;
pgstat_progress_update_param(PROGRESS_VACUUM_NUM_DEAD_TUPLES,
- vacrelstats->num_dead_tuples);
+ dead_tuples->num_tuples);
}
}
@@ -2189,12 +2775,12 @@ lazy_record_dead_tuple(LVRelStats *vacrelstats,
static bool
lazy_tid_reaped(ItemPointer itemptr, void *state)
{
- LVRelStats *vacrelstats = (LVRelStats *) state;
+ LVDeadTuples *dead_tuples = (LVDeadTuples *) state;
ItemPointer res;
res = (ItemPointer) bsearch((void *) itemptr,
- (void *) vacrelstats->dead_tuples,
- vacrelstats->num_dead_tuples,
+ (void *) dead_tuples->itemptrs,
+ dead_tuples->num_tuples,
sizeof(ItemPointerData),
vac_cmp_itemptr);
@@ -2342,3 +2928,447 @@ heap_page_is_all_visible(Relation rel, Buffer buf,
return all_visible;
}
+
+/*
+ * Compute the number of parallel worker processes to request. Both index
+ * vacuum and index cleanup can be executed with parallel workers. The index
+ * is eligible for parallel vacuum iff its size is greater than
+ * min_parallel_index_scan_size, as invoking workers for very small indexes
+ * can hurt performance.
+ *
+ * nrequested is the number of parallel workers that the user requested. If
+ * nrequested is 0, we compute the parallel degree based on nindexes, that is,
+ * the number of indexes that support parallel vacuum. This function also
+ * sets can_parallel_vacuum to remember indexes that participate in parallel
+ * vacuum.
+ */
+static int
+compute_parallel_vacuum_workers(Relation *Irel, int nindexes, int nrequested,
+ bool *can_parallel_vacuum)
+{
+ int nindexes_parallel = 0;
+ int nindexes_parallel_bulkdel = 0;
+ int nindexes_parallel_cleanup = 0;
+ int parallel_workers;
+ int i;
+
+ /*
+ * We don't allow performing parallel operations in standalone backends or
+ * when parallelism is disabled.
+ */
+ if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
+ return 0;
+
+ /*
+ * Compute the number of indexes that can participate in parallel vacuum.
+ */
+ for (i = 0; i < nindexes; i++)
+ {
+ uint8 vacoptions = Irel[i]->rd_indam->amparallelvacuumoptions;
+
+ if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
+ RelationGetNumberOfBlocks(Irel[i]) < min_parallel_index_scan_size)
+ continue;
+
+ can_parallel_vacuum[i] = true;
+
+ if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
+ nindexes_parallel_bulkdel++;
+ if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
+ ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
+ nindexes_parallel_cleanup++;
+ }
+
+ nindexes_parallel = Max(nindexes_parallel_bulkdel,
+ nindexes_parallel_cleanup);
+
+ /* The leader process takes one index */
+ nindexes_parallel--;
+
+ /* No index supports parallel vacuum */
+ if (nindexes_parallel <= 0)
+ return 0;
+
+ /* Compute the parallel degree */
+ parallel_workers = (nrequested > 0) ?
+ Min(nrequested, nindexes_parallel) : nindexes_parallel;
+
+ /* Cap by max_parallel_maintenance_workers */
+ parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
+
+ return parallel_workers;
+}
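
/*
 * Worked example (illustration only, not part of the patch): for a table with
 * four indexes, three of which exceed min_parallel_index_scan_size and
 * support both VACUUM_OPTION_PARALLEL_BULKDEL and
 * VACUUM_OPTION_PARALLEL_CLEANUP, nindexes_parallel is 3.  The leader takes
 * one index, leaving 2; with no explicit PARALLEL request (nrequested = 0)
 * and the default max_parallel_maintenance_workers of 2, this function
 * returns 2.
 */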
+
+/*
+ * Initialize the shared index statistics area: clear the NULL bitmap and set
+ * the bit for each index that participates in parallel vacuum.
+ */
+static void
+prepare_index_statistics(LVShared *lvshared, bool *can_parallel_vacuum,
+ int nindexes)
+{
+ int i;
+
+ /* Currently, we don't support parallel vacuum for autovacuum */
+ Assert(!IsAutoVacuumWorkerProcess());
+
+ /* Set NULL for all indexes */
+ memset(lvshared->bitmap, 0x00, BITMAPLEN(nindexes));
+
+ for (i = 0; i < nindexes; i++)
+ {
+ if (!can_parallel_vacuum[i])
+ continue;
+
+ /* Set NOT NULL as this index does support parallelism */
+ lvshared->bitmap[i >> 3] |= 1 << (i & 0x07);
+ }
+}
+
+/*
+ * Update index statistics in pg_class if the statistics are accurate.
+ */
+static void
+update_index_statistics(Relation *Irel, IndexBulkDeleteResult **stats,
+ int nindexes)
+{
+ int i;
+
+ Assert(!IsInParallelMode());
+
+ for (i = 0; i < nindexes; i++)
+ {
+ if (stats[i] == NULL || stats[i]->estimated_count)
+ continue;
+
+ /* Update index statistics */
+ vac_update_relstats(Irel[i],
+ stats[i]->num_pages,
+ stats[i]->num_index_tuples,
+ 0,
+ false,
+ InvalidTransactionId,
+ InvalidMultiXactId,
+ false);
+ pfree(stats[i]);
+ }
+}
+
+/*
+ * This function prepares and returns parallel vacuum state if we can launch
+ * at least one worker. It is responsible for entering parallel mode,
+ * creating a parallel context, and initializing the DSM segment.
+ */
+static LVParallelState *
+begin_parallel_vacuum(Oid relid, Relation *Irel, LVRelStats *vacrelstats,
+ BlockNumber nblocks, int nindexes, int nrequested)
+{
+ LVParallelState *lps = NULL;
+ ParallelContext *pcxt;
+ LVShared *shared;
+ LVDeadTuples *dead_tuples;
+ bool *can_parallel_vacuum;
+ long maxtuples;
+ char *sharedquery;
+ Size est_shared;
+ Size est_deadtuples;
+ int nindexes_mwm = 0;
+ int parallel_workers = 0;
+ int querylen;
+ int i;
+
+ /*
+ * A parallel vacuum must be requested and there must be indexes on the
+ * relation
+ */
+ Assert(nrequested >= 0);
+ Assert(nindexes > 0);
+
+ /*
+ * Compute the number of parallel vacuum workers to launch
+ */
+ can_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
+ parallel_workers = compute_parallel_vacuum_workers(Irel, nindexes,
+ nrequested,
+ can_parallel_vacuum);
+
+ /* Can't perform vacuum in parallel */
+ if (parallel_workers <= 0)
+ {
+ pfree(can_parallel_vacuum);
+ return lps;
+ }
+
+ lps = (LVParallelState *) palloc0(sizeof(LVParallelState));
+
+ EnterParallelMode();
+ pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
+ parallel_workers);
+ Assert(pcxt->nworkers > 0);
+ lps->pcxt = pcxt;
+
+ /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
+ est_shared = MAXALIGN(add_size(SizeOfLVShared, BITMAPLEN(nindexes)));
+ for (i = 0; i < nindexes; i++)
+ {
+ uint8 vacoptions = Irel[i]->rd_indam->amparallelvacuumoptions;
+
+ /*
+ * The cleanup option should be either disabled, always performed in
+ * parallel, or conditionally performed in parallel.
+ */
+ Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
+ ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
+ Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
+
+ /* Skip indexes that don't participate in parallel vacuum */
+ if (!can_parallel_vacuum[i])
+ continue;
+
+ if (Irel[i]->rd_indam->amusemaintenanceworkmem)
+ nindexes_mwm++;
+
+ est_shared = add_size(est_shared, sizeof(LVSharedIndStats));
+
+ /*
+ * Remember the number of indexes that support parallel operation for
+ * each phase.
+ */
+ if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
+ lps->nindexes_parallel_bulkdel++;
+ if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
+ lps->nindexes_parallel_cleanup++;
+ if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
+ lps->nindexes_parallel_condcleanup++;
+ }
+ shm_toc_estimate_chunk(&pcxt->estimator, est_shared);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+ /* Estimate size for dead tuples -- PARALLEL_VACUUM_KEY_DEAD_TUPLES */
+ maxtuples = compute_max_dead_tuples(nblocks, true);
+ est_deadtuples = MAXALIGN(SizeOfLVDeadTuples(maxtuples));
+ shm_toc_estimate_chunk(&pcxt->estimator, est_deadtuples);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+ /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
+ querylen = strlen(debug_query_string);
+ shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+ InitializeParallelDSM(pcxt);
+
+ /* Prepare shared information */
+ shared = (LVShared *) shm_toc_allocate(pcxt->toc, est_shared);
+ MemSet(shared, 0, est_shared);
+ shared->relid = relid;
+ shared->elevel = elevel;
+ shared->maintenance_work_mem_worker =
+ (nindexes_mwm > 0) ?
+ maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
+ maintenance_work_mem;
+
+ pg_atomic_init_u32(&(shared->cost_balance), 0);
+ pg_atomic_init_u32(&(shared->active_nworkers), 0);
+ pg_atomic_init_u32(&(shared->idx), 0);
+ shared->offset = MAXALIGN(add_size(SizeOfLVShared, BITMAPLEN(nindexes)));
+ prepare_index_statistics(shared, can_parallel_vacuum, nindexes);
+
+ shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
+ lps->lvshared = shared;
+
+ /* Prepare the dead tuple space */
+ dead_tuples = (LVDeadTuples *) shm_toc_allocate(pcxt->toc, est_deadtuples);
+ dead_tuples->max_tuples = maxtuples;
+ dead_tuples->num_tuples = 0;
+ MemSet(dead_tuples->itemptrs, 0, sizeof(ItemPointerData) * maxtuples);
+ shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_TUPLES, dead_tuples);
+ vacrelstats->dead_tuples = dead_tuples;
+
+ /* Store query string for workers */
+ sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
+ memcpy(sharedquery, debug_query_string, querylen + 1);
+ sharedquery[querylen] = '\0';
+ shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
+
+ pfree(can_parallel_vacuum);
+ return lps;
+}
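
/*
 * Worked example (illustration only, not part of the patch) for the
 * maintenance_work_mem_worker computation above: with maintenance_work_mem of
 * 256MB, parallel_workers = 4, and two indexes whose AM sets
 * amusemaintenanceworkmem (nindexes_mwm = 2), each worker's
 * maintenance_work_mem is set to 256MB / Min(4, 2) = 128MB, keeping the
 * parallel vacuum within the memory budget a single-process vacuum would use.
 */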
+
+/*
+ * Destroy the parallel context, and end parallel mode.
+ *
+ * Since writes are not allowed while in parallel mode, we copy the updated
+ * index statistics from the DSM segment into local memory and later use them
+ * to update the index statistics. One might think that we could exit from
+ * parallel mode, update the index statistics, and then destroy the parallel
+ * context, but that wouldn't be safe (see ExitParallelMode).
+ */
+static void
+end_parallel_vacuum(Relation *Irel, IndexBulkDeleteResult **stats,
+ LVParallelState *lps, int nindexes)
+{
+ int i;
+
+ Assert(!IsParallelWorker());
+
+ /* Copy the updated statistics */
+ for (i = 0; i < nindexes; i++)
+ {
+ LVSharedIndStats *indstats = get_indstats(lps->lvshared, i);
+
+ /*
+ * Skip unused slot. The statistics of this index are already stored
+ * in local memory.
+ */
+ if (indstats == NULL)
+ continue;
+
+ if (indstats->updated)
+ {
+ stats[i] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
+ memcpy(stats[i], &(indstats->stats), sizeof(IndexBulkDeleteResult));
+ }
+ else
+ stats[i] = NULL;
+ }
+
+ DestroyParallelContext(lps->pcxt);
+ ExitParallelMode();
+
+ /* Deactivate parallel vacuum */
+ pfree(lps);
+ lps = NULL;
+}
+
+/* Return the Nth index statistics or NULL */
+static LVSharedIndStats *
+get_indstats(LVShared *lvshared, int n)
+{
+ int i;
+ char *p;
+
+ if (IndStatsIsNull(lvshared, n))
+ return NULL;
+
+ p = (char *) GetSharedIndStats(lvshared);
+ for (i = 0; i < n; i++)
+ {
+ if (IndStatsIsNull(lvshared, i))
+ continue;
+
+ p += sizeof(LVSharedIndStats);
+ }
+
+ return (LVSharedIndStats *) p;
+}
+
+/*
+ * Returns true if the given index can't participate in parallel index vacuum
+ * or parallel index cleanup; false otherwise.
+ */
+static bool
+skip_parallel_vacuum_index(Relation indrel, LVShared *lvshared)
+{
+ uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
+
+ /* first_time must be true only if for_cleanup is true */
+ Assert(lvshared->for_cleanup || !lvshared->first_time);
+
+ if (lvshared->for_cleanup)
+ {
+ /* Skip, if the index does not support parallel cleanup */
+ if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
+ ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
+ return true;
+
+ /*
+ * Skip, if the index supports parallel cleanup conditionally, but we
+ * have already processed the index (for bulkdelete). See the
+ * comments for option VACUUM_OPTION_PARALLEL_COND_CLEANUP to know
+ * when indexes support parallel cleanup conditionally.
+ */
+ if (!lvshared->first_time &&
+ ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
+ return true;
+ }
+ else if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) == 0)
+ {
+ /* Skip if the index does not support parallel bulk deletion */
+ return true;
+ }
+
+ return false;
+}
+
+/*
+ * Perform work within a launched parallel process.
+ *
+ * Since parallel vacuum workers perform only index vacuum or index cleanup,
+ * we don't need to report the progress information.
+ */
+void
+parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
+{
+ Relation onerel;
+ Relation *indrels;
+ LVShared *lvshared;
+ LVDeadTuples *dead_tuples;
+ int nindexes;
+ char *sharedquery;
+ IndexBulkDeleteResult **stats;
+
+ lvshared = (LVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED,
+ false);
+ elevel = lvshared->elevel;
+
+ ereport(DEBUG1,
+ (errmsg("starting parallel vacuum worker for %s",
+ lvshared->for_cleanup ? "cleanup" : "bulk delete")));
+
+ /* Set debug_query_string for individual workers */
+ sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, false);
+ debug_query_string = sharedquery;
+ pgstat_report_activity(STATE_RUNNING, debug_query_string);
+
+ /*
+ * Open the table. The lock mode is the same as the one the leader process
+ * uses. This is okay because this lock mode does not conflict among the
+ * parallel workers.
+ */
+ onerel = table_open(lvshared->relid, ShareUpdateExclusiveLock);
+
+ /*
+ * Open all indexes. indrels are sorted in order by OID, which should
+ * match the leader's ordering.
+ */
+ vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &indrels);
+ Assert(nindexes > 0);
+
+ /* Set dead tuple space */
+ dead_tuples = (LVDeadTuples *) shm_toc_lookup(toc,
+ PARALLEL_VACUUM_KEY_DEAD_TUPLES,
+ false);
+
+ /* Set cost-based vacuum delay */
+ VacuumCostActive = (VacuumCostDelay > 0);
+ VacuumCostBalance = 0;
+ VacuumPageHit = 0;
+ VacuumPageMiss = 0;
+ VacuumPageDirty = 0;
+ VacuumCostBalanceLocal = 0;
+ VacuumSharedCostBalance = &(lvshared->cost_balance);
+ VacuumActiveNWorkers = &(lvshared->active_nworkers);
+
+ stats = (IndexBulkDeleteResult **)
+ palloc0(nindexes * sizeof(IndexBulkDeleteResult *));
+
+ if (lvshared->maintenance_work_mem_worker > 0)
+ maintenance_work_mem = lvshared->maintenance_work_mem_worker;
+
+ /* Process indexes to perform vacuum/cleanup */
+ parallel_vacuum_index(indrels, stats, lvshared, dead_tuples, nindexes);
+
+ vac_close_indexes(nindexes, indrels, RowExclusiveLock);
+ table_close(onerel, ShareUpdateExclusiveLock);
+ pfree(stats);
+}
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index f3e22549543..df06e7d1743 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -14,6 +14,7 @@
#include "postgres.h"
+#include "access/heapam.h"
#include "access/nbtree.h"
#include "access/parallel.h"
#include "access/session.h"
@@ -139,6 +140,9 @@ static const struct
},
{
"_bt_parallel_build_main", _bt_parallel_build_main
+ },
+ {
+ "parallel_vacuum_main", parallel_vacuum_main
}
};
@@ -174,6 +178,7 @@ CreateParallelContext(const char *library_name, const char *function_name,
pcxt = palloc0(sizeof(ParallelContext));
pcxt->subid = GetCurrentSubTransactionId();
pcxt->nworkers = nworkers;
+ pcxt->nworkers_to_launch = nworkers;
pcxt->library_name = pstrdup(library_name);
pcxt->function_name = pstrdup(function_name);
pcxt->error_context_stack = error_context_stack;
@@ -487,6 +492,23 @@ ReinitializeParallelDSM(ParallelContext *pcxt)
}
/*
+ * Reinitialize parallel workers for a parallel context so that we can launch
+ * a different number of workers. This is required for cases where we need to
+ * reuse the same DSM segment, but the number of workers can vary from
+ * run to run.
+ */
+void
+ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
+{
+ /*
+ * The number of workers to be launched must not exceed the number of
+ * workers with which the parallel context was initialized.
+ */
+ Assert(pcxt->nworkers >= nworkers_to_launch);
+ pcxt->nworkers_to_launch = nworkers_to_launch;
+}
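
/*
 * Usage sketch (illustration only, not part of the patch), mirroring
 * lazy_parallel_vacuum_indexes() in vacuumlazy.c: when the same parallel
 * context is reused for another pass with a different worker count, the
 * caller does roughly the following:
 *
 *		ReinitializeParallelDSM(pcxt);
 *		ReinitializeParallelWorkers(pcxt, nworkers);
 *		LaunchParallelWorkers(pcxt);
 *		...
 *		WaitForParallelWorkersToFinish(pcxt);
 */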
+
+/*
* Launch parallel workers.
*/
void
@@ -498,7 +520,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
bool any_registrations_failed = false;
/* Skip this if we have no workers. */
- if (pcxt->nworkers == 0)
+ if (pcxt->nworkers == 0 || pcxt->nworkers_to_launch == 0)
return;
/* We need to be a lock group leader. */
@@ -533,7 +555,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
* fails. It wouldn't help much anyway, because registering the worker in
* no way guarantees that it will start up and initialize successfully.
*/
- for (i = 0; i < pcxt->nworkers; ++i)
+ for (i = 0; i < pcxt->nworkers_to_launch; ++i)
{
memcpy(worker.bgw_extra, &i, sizeof(int));
if (!any_registrations_failed &&
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index bb34e252e45..d625d17bf46 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -42,6 +42,7 @@
#include "nodes/makefuncs.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
+#include "postmaster/bgworker_internals.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
@@ -68,6 +69,14 @@ static MemoryContext vac_context = NULL;
static BufferAccessStrategy vac_strategy;
+/*
+ * Variables for cost-based parallel vacuum. See comments atop
+ * compute_parallel_delay to understand how it works.
+ */
+pg_atomic_uint32 *VacuumSharedCostBalance = NULL;
+pg_atomic_uint32 *VacuumActiveNWorkers = NULL;
+int VacuumCostBalanceLocal = 0;
+
/* non-export function prototypes */
static List *expand_vacuum_rel(VacuumRelation *vrel, int options);
static List *get_all_vacuum_rels(int options);
@@ -76,6 +85,7 @@ static void vac_truncate_clog(TransactionId frozenXID,
TransactionId lastSaneFrozenXid,
MultiXactId lastSaneMinMulti);
static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params);
+static double compute_parallel_delay(void);
static VacOptTernaryValue get_vacopt_ternary_value(DefElem *def);
/*
@@ -94,12 +104,16 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
bool freeze = false;
bool full = false;
bool disable_page_skipping = false;
+ bool parallel_option = false;
ListCell *lc;
/* Set default value */
params.index_cleanup = VACOPT_TERNARY_DEFAULT;
params.truncate = VACOPT_TERNARY_DEFAULT;
+ /* By default parallel vacuum is enabled */
+ params.nworkers = 0;
+
/* Parse options list */
foreach(lc, vacstmt->options)
{
@@ -129,6 +143,39 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
params.index_cleanup = get_vacopt_ternary_value(opt);
else if (strcmp(opt->defname, "truncate") == 0)
params.truncate = get_vacopt_ternary_value(opt);
+ else if (strcmp(opt->defname, "parallel") == 0)
+ {
+ parallel_option = true;
+ if (opt->arg == NULL)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("parallel option requires a value between 0 and %d",
+ MAX_PARALLEL_WORKER_LIMIT),
+ parser_errposition(pstate, opt->location)));
+ }
+ else
+ {
+ int nworkers;
+
+ nworkers = defGetInt32(opt);
+ if (nworkers < 0 || nworkers > MAX_PARALLEL_WORKER_LIMIT)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("parallel vacuum degree must be between 0 and %d",
+ MAX_PARALLEL_WORKER_LIMIT),
+ parser_errposition(pstate, opt->location)));
+
+ /*
+ * Disable parallel vacuum if the user has specified a parallel
+ * degree of zero.
+ */
+ if (nworkers == 0)
+ params.nworkers = -1;
+ else
+ params.nworkers = nworkers;
+ }
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -152,6 +199,11 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
!(params.options & (VACOPT_FULL | VACOPT_FREEZE)));
Assert(!(params.options & VACOPT_SKIPTOAST));
+ if ((params.options & VACOPT_FULL) && parallel_option)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot specify both FULL and PARALLEL options")));
+
/*
* Make sure VACOPT_ANALYZE is specified if any column lists are present.
*/
@@ -383,6 +435,9 @@ vacuum(List *relations, VacuumParams *params,
VacuumPageHit = 0;
VacuumPageMiss = 0;
VacuumPageDirty = 0;
+ VacuumCostBalanceLocal = 0;
+ VacuumSharedCostBalance = NULL;
+ VacuumActiveNWorkers = NULL;
/*
* Loop to process each selected relation.
@@ -1941,16 +1996,26 @@ vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
void
vacuum_delay_point(void)
{
+ double msec = 0;
+
/* Always check for interrupts */
CHECK_FOR_INTERRUPTS();
- /* Nap if appropriate */
- if (VacuumCostActive && !InterruptPending &&
- VacuumCostBalance >= VacuumCostLimit)
- {
- double msec;
+ if (!VacuumCostActive || InterruptPending)
+ return;
+ /*
+ * For parallel vacuum, the delay is computed based on the shared cost
+ * balance. See compute_parallel_delay.
+ */
+ if (VacuumSharedCostBalance != NULL)
+ msec = compute_parallel_delay();
+ else if (VacuumCostBalance >= VacuumCostLimit)
msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit;
+
+ /* Nap if appropriate */
+ if (msec > 0)
+ {
if (msec > VacuumCostDelay * 4)
msec = VacuumCostDelay * 4;
@@ -1967,6 +2032,66 @@ vacuum_delay_point(void)
}
/*
+ * Computes the vacuum delay for parallel workers.
+ *
+ * The basic idea of cost-based delay for parallel vacuum is to let each
+ * worker sleep in proportion to the work it has done. We achieve this by
+ * giving all parallel vacuum workers, including the leader process, a
+ * shared view of the cost-related parameters (mainly VacuumCostBalance).
+ * Each worker updates the shared balance whenever it incurs a cost, and
+ * based on that balance decides whether it needs to sleep. The sleep time
+ * for a worker is computed from the cost it has itself incurred
+ * (VacuumCostBalanceLocal), and VacuumSharedCostBalance is then reduced by
+ * that amount. This avoids putting to sleep workers that have done little
+ * or no I/O compared to the others, and thereby ensures that workers doing
+ * more I/O are throttled more.
+ *
+ * A worker is allowed to sleep only if it has performed I/O above a certain
+ * threshold, which is calculated based on the number of active workers
+ * (VacuumActiveNWorkers), and only if the overall cost balance exceeds the
+ * VacuumCostLimit set by the system. Testing reveals that we achieve the
+ * required throttling if we allow a worker that has done more than 50% of
+ * its share of work to sleep.
+ */
+static double
+compute_parallel_delay(void)
+{
+ double msec = 0;
+ uint32 shared_balance;
+ int nworkers;
+
+ /* Parallel vacuum must be active */
+ Assert(VacuumSharedCostBalance);
+
+ nworkers = pg_atomic_read_u32(VacuumActiveNWorkers);
+
+ /* At least the current process itself must be counted */
+ Assert(nworkers >= 1);
+
+ /* Update the shared cost balance value atomically */
+ shared_balance = pg_atomic_add_fetch_u32(VacuumSharedCostBalance, VacuumCostBalance);
+
+ /* Compute the total local balance for the current worker */
+ VacuumCostBalanceLocal += VacuumCostBalance;
+
+ if ((shared_balance >= VacuumCostLimit) &&
+ (VacuumCostBalanceLocal > 0.5 * (VacuumCostLimit / nworkers)))
+ {
+ /* Compute sleep time based on the local cost balance */
+ msec = VacuumCostDelay * VacuumCostBalanceLocal / VacuumCostLimit;
+ pg_atomic_sub_fetch_u32(VacuumSharedCostBalance, VacuumCostBalanceLocal);
+ VacuumCostBalanceLocal = 0;
+ }
+
+ /*
+ * Reset the process-local VacuumCostBalance, as it has been accumulated
+ * into the shared balance.
+ */
+ VacuumCostBalance = 0;
+
+ return msec;
+}
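As a concrete illustration of the throttling rule above (the numbers are illustrative, not taken from the patch): with VacuumCostLimit = 200, VacuumCostDelay = 2 ms and two active workers, the per-worker threshold is 0.5 * (200 / 2) = 50. A worker whose VacuumCostBalanceLocal has reached, say, 120 while the shared balance is at or above 200 sleeps for 2 * 120 / 200 = 1.2 ms, subtracts 120 from VacuumSharedCostBalance, and resets its local balance to zero; a worker that has accumulated only 30 keeps running, so lightly loaded workers are not penalized for the I/O of the busy ones.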
+
+/*
* A wrapper function of defGetBoolean().
*
* This function returns VACOPT_TERNARY_ENABLED and VACOPT_TERNARY_DISABLED
diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c
index f0e40e36af6..6d1f28c3276 100644
--- a/src/backend/postmaster/autovacuum.c
+++ b/src/backend/postmaster/autovacuum.c
@@ -2886,6 +2886,8 @@ table_recheck_autovac(Oid relid, HTAB *table_toast_map,
(!wraparound ? VACOPT_SKIP_LOCKED : 0);
tab->at_params.index_cleanup = VACOPT_TERNARY_DEFAULT;
tab->at_params.truncate = VACOPT_TERNARY_DEFAULT;
+ /* As of now, we don't support parallel vacuum for autovacuum */
+ tab->at_params.nworkers = -1;
tab->at_params.freeze_min_age = freeze_min_age;
tab->at_params.freeze_table_age = freeze_table_age;
tab->at_params.multixact_freeze_min_age = multixact_freeze_min_age;
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index b52396c17a4..052d98b5c08 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -3597,7 +3597,7 @@ psql_completion(const char *text, int start, int end)
if (ends_with(prev_wd, '(') || ends_with(prev_wd, ','))
COMPLETE_WITH("FULL", "FREEZE", "ANALYZE", "VERBOSE",
"DISABLE_PAGE_SKIPPING", "SKIP_LOCKED",
- "INDEX_CLEANUP", "TRUNCATE");
+ "INDEX_CLEANUP", "TRUNCATE", "PARALLEL");
else if (TailMatches("FULL|FREEZE|ANALYZE|VERBOSE|DISABLE_PAGE_SKIPPING|SKIP_LOCKED|INDEX_CLEANUP|TRUNCATE"))
COMPLETE_WITH("ON", "OFF");
}
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 580b4caef7f..00a17f5f717 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -23,7 +23,9 @@
#include "nodes/lockoptions.h"
#include "nodes/primnodes.h"
#include "storage/bufpage.h"
+#include "storage/dsm.h"
#include "storage/lockdefs.h"
+#include "storage/shm_toc.h"
#include "utils/relcache.h"
#include "utils/snapshot.h"
@@ -193,6 +195,7 @@ extern Size SyncScanShmemSize(void);
struct VacuumParams;
extern void heap_vacuum_rel(Relation onerel,
struct VacuumParams *params, BufferAccessStrategy bstrategy);
+extern void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc);
/* in heap/heapam_visibility.c */
extern bool HeapTupleSatisfiesVisibility(HeapTuple stup, Snapshot snapshot,
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index 646708ba5fb..fc6a5603bb6 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -33,7 +33,8 @@ typedef struct ParallelContext
{
dlist_node node;
SubTransactionId subid;
- int nworkers;
+ int nworkers; /* Maximum number of workers to launch */
+ int nworkers_to_launch; /* Actual number of workers to launch */
int nworkers_launched;
char *library_name;
char *function_name;
@@ -63,6 +64,7 @@ extern ParallelContext *CreateParallelContext(const char *library_name,
const char *function_name, int nworkers);
extern void InitializeParallelDSM(ParallelContext *pcxt);
extern void ReinitializeParallelDSM(ParallelContext *pcxt);
+extern void ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch);
extern void LaunchParallelWorkers(ParallelContext *pcxt);
extern void WaitForParallelWorkersToAttach(ParallelContext *pcxt);
extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt);
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index b3351ad4062..c27d255d8da 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -222,6 +222,13 @@ typedef struct VacuumParams
* default value depends on reloptions */
VacOptTernaryValue truncate; /* Truncate empty pages at the end,
* default value depends on reloptions */
+
+ /*
+ * The number of parallel vacuum workers. 0 by default, which means the
+ * number is chosen based on the number of indexes; -1 indicates that
+ * parallel vacuum is disabled.
+ */
+ int nworkers;
} VacuumParams;
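For reference, the three states of this field could be exercised as in the following hypothetical snippet (not part of the patch); in the patch itself the assignments are made by ExecVacuum and by table_recheck_autovac:

    VacuumParams params;

    params.nworkers = 0;    /* default: worker count chosen from the number of indexes */
    params.nworkers = -1;   /* PARALLEL 0 specified (or autovacuum): parallel vacuum disabled */
    params.nworkers = 4;    /* PARALLEL 4 specified: request up to 4 workers */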
/* GUC parameters */
@@ -231,6 +238,11 @@ extern int vacuum_freeze_table_age;
extern int vacuum_multixact_freeze_min_age;
extern int vacuum_multixact_freeze_table_age;
+/* Variables for cost-based parallel vacuum */
+extern pg_atomic_uint32 *VacuumSharedCostBalance;
+extern pg_atomic_uint32 *VacuumActiveNWorkers;
+extern int VacuumCostBalanceLocal;
+
/* in commands/vacuum.c */
extern void ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel);
diff --git a/src/test/regress/expected/vacuum.out b/src/test/regress/expected/vacuum.out
index 9996d882d16..f4250a433ae 100644
--- a/src/test/regress/expected/vacuum.out
+++ b/src/test/regress/expected/vacuum.out
@@ -92,6 +92,40 @@ CONTEXT: SQL function "do_analyze" statement 1
SQL function "wrap_do_analyze" statement 1
VACUUM FULL vactst;
VACUUM (DISABLE_PAGE_SKIPPING) vaccluster;
+-- PARALLEL option
+CREATE TABLE pvactst (i INT, a INT[], p POINT) with (autovacuum_enabled = off);
+INSERT INTO pvactst SELECT i, array[1,2,3], point(i, i+1) FROM generate_series(1,1000) i;
+CREATE INDEX btree_pvactst ON pvactst USING btree (i);
+CREATE INDEX hash_pvactst ON pvactst USING hash (i);
+CREATE INDEX brin_pvactst ON pvactst USING brin (i);
+CREATE INDEX gin_pvactst ON pvactst USING gin (a);
+CREATE INDEX gist_pvactst ON pvactst USING gist (p);
+CREATE INDEX spgist_pvactst ON pvactst USING spgist (p);
+-- VACUUM invokes parallel index cleanup
+SET min_parallel_index_scan_size to 0;
+VACUUM (PARALLEL 2) pvactst;
+-- VACUUM invokes parallel bulk-deletion
+UPDATE pvactst SET i = i WHERE i < 1000;
+VACUUM (PARALLEL 2) pvactst;
+UPDATE pvactst SET i = i WHERE i < 1000;
+VACUUM (PARALLEL 0) pvactst; -- disable parallel vacuum
+VACUUM (PARALLEL -1) pvactst; -- error
+ERROR: parallel vacuum degree must be between 0 and 1024
+LINE 1: VACUUM (PARALLEL -1) pvactst;
+ ^
+VACUUM (PARALLEL 2, INDEX_CLEANUP FALSE) pvactst;
+VACUUM (PARALLEL 2, FULL TRUE) pvactst; -- error, cannot use both PARALLEL and FULL
+ERROR: cannot specify both FULL and PARALLEL options
+VACUUM (PARALLEL) pvactst; -- error, cannot use PARALLEL option without parallel degree
+ERROR: parallel option requires a value between 0 and 1024
+LINE 1: VACUUM (PARALLEL) pvactst;
+ ^
+CREATE TEMPORARY TABLE tmp (a int PRIMARY KEY);
+CREATE INDEX tmp_idx1 ON tmp (a);
+VACUUM (PARALLEL 1) tmp; -- disables parallel vacuum option
+WARNING: disabling parallel option of vacuum on "tmp" --- cannot vacuum temporary tables in parallel
+RESET min_parallel_index_scan_size;
+DROP TABLE pvactst;
-- INDEX_CLEANUP option
CREATE TABLE no_index_cleanup (i INT PRIMARY KEY, t TEXT);
-- Use uncompressed data stored in toast.
diff --git a/src/test/regress/sql/vacuum.sql b/src/test/regress/sql/vacuum.sql
index 69987f75e9b..cf741f7b114 100644
--- a/src/test/regress/sql/vacuum.sql
+++ b/src/test/regress/sql/vacuum.sql
@@ -75,6 +75,37 @@ VACUUM FULL vactst;
VACUUM (DISABLE_PAGE_SKIPPING) vaccluster;
+-- PARALLEL option
+CREATE TABLE pvactst (i INT, a INT[], p POINT) with (autovacuum_enabled = off);
+INSERT INTO pvactst SELECT i, array[1,2,3], point(i, i+1) FROM generate_series(1,1000) i;
+CREATE INDEX btree_pvactst ON pvactst USING btree (i);
+CREATE INDEX hash_pvactst ON pvactst USING hash (i);
+CREATE INDEX brin_pvactst ON pvactst USING brin (i);
+CREATE INDEX gin_pvactst ON pvactst USING gin (a);
+CREATE INDEX gist_pvactst ON pvactst USING gist (p);
+CREATE INDEX spgist_pvactst ON pvactst USING spgist (p);
+
+-- VACUUM invokes parallel index cleanup
+SET min_parallel_index_scan_size to 0;
+VACUUM (PARALLEL 2) pvactst;
+
+-- VACUUM invokes parallel bulk-deletion
+UPDATE pvactst SET i = i WHERE i < 1000;
+VACUUM (PARALLEL 2) pvactst;
+
+UPDATE pvactst SET i = i WHERE i < 1000;
+VACUUM (PARALLEL 0) pvactst; -- disable parallel vacuum
+
+VACUUM (PARALLEL -1) pvactst; -- error
+VACUUM (PARALLEL 2, INDEX_CLEANUP FALSE) pvactst;
+VACUUM (PARALLEL 2, FULL TRUE) pvactst; -- error, cannot use both PARALLEL and FULL
+VACUUM (PARALLEL) pvactst; -- error, cannot use PARALLEL option without parallel degree
+CREATE TEMPORARY TABLE tmp (a int PRIMARY KEY);
+CREATE INDEX tmp_idx1 ON tmp (a);
+VACUUM (PARALLEL 1) tmp; -- disables parallel vacuum option
+RESET min_parallel_index_scan_size;
+DROP TABLE pvactst;
+
-- INDEX_CLEANUP option
CREATE TABLE no_index_cleanup (i INT PRIMARY KEY, t TEXT);
-- Use uncompressed data stored in toast.
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index caf6b86f92a..e216de9570b 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1216,7 +1216,11 @@ LPVOID
LPWSTR
LSEG
LUID
+LVDeadTuples
+LVParallelState
LVRelStats
+LVShared
+LVSharedIndStats
LWLock
LWLockHandle
LWLockMinimallyPadded