diff options
author | Robert Haas <rhaas@postgresql.org> | 2023-12-20 08:41:09 -0500 |
---|---|---|
committer | Robert Haas <rhaas@postgresql.org> | 2023-12-20 08:42:28 -0500 |
commit | 174c480508ac25568561443e6d4a82d5c1103487 (patch) | |
tree | f42caba7a5f9a468e927107a58406a28a9f28ef2 | |
parent | 00498b718564cee3530b76d860b328718aed672b (diff) | |
download | postgresql-174c480508ac25568561443e6d4a82d5c1103487.tar.gz postgresql-174c480508ac25568561443e6d4a82d5c1103487.zip |
Add a new WAL summarizer process.
When active, this process writes WAL summary files to
$PGDATA/pg_wal/summaries. Each summary file contains information for a
certain range of LSNs on a certain TLI. For each relation, it stores a
"limit block" which is 0 if a relation is created or destroyed within
a certain range of WAL records, or otherwise the shortest length to
which the relation was truncated during that range of WAL records, or
otherwise InvalidBlockNumber. In addition, it stores a list of blocks
which have been modified during that range of WAL records, but
excluding blocks which were removed by truncation after they were
modified and never subsequently modified again.
In other words, it tells us which blocks need to copied in case of an
incremental backup covering that range of WAL records. But this
doesn't yet add the capability to actually perform an incremental
backup; the next patch will do that.
A new parameter summarize_wal enables or disables this new background
process. The background process also automatically deletes summary
files that are older than wal_summarize_keep_time, if that parameter
has a non-zero value and the summarizer is configured to run.
Patch by me, with some design help from Dilip Kumar and Andres Freund.
Reviewed by Matthias van de Meent, Dilip Kumar, Jakub Wartak, Peter
Eisentraut, and Álvaro Herrera.
Discussion: http://postgr.es/m/CA+TgmoYOYZfMCyOXFyC-P+-mdrZqm5pP2N7S-r0z3_402h9rsA@mail.gmail.com
30 files changed, 3743 insertions, 11 deletions
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 44cada2b403..ee985850275 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4150,6 +4150,67 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows </variablelist> </sect2> + <sect2 id="runtime-config-wal-summarization"> + <title>WAL Summarization</title> + + <!-- + <para> + These settings control WAL summarization, a feature which must be + enabled in order to perform an + <link linkend="backup-incremental-backup">incremental backup</link>. + </para> + --> + + <variablelist> + <varlistentry id="guc-summarize-wal" xreflabel="summarize_wal"> + <term><varname>summarize_wal</varname> (<type>boolean</type>) + <indexterm> + <primary><varname>summarize_wal</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + Enables the WAL summarizer process. Note that WAL summarization can + be enabled either on a primary or on a standby. WAL summarization + cannot be enabled when <varname>wal_level</varname> is set to + <literal>minimal</literal>. This parameter can only be set in the + <filename>postgresql.conf</filename> file or on the server command line. + The default is <literal>off</literal>. + </para> + </listitem> + </varlistentry> + + <varlistentry id="guc-wal-summary-keep-time" xreflabel="wal_summary_keep_time"> + <term><varname>wal_summary_keep_time</varname> (<type>boolean</type>) + <indexterm> + <primary><varname>wal_summary_keep_time</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + Configures the amount of time after which the WAL summarizer + automatically removes old WAL summaries. The file timestamp is used to + determine which files are old enough to remove. Typically, you should set + this comfortably higher than the time that could pass between a backup + and a later incremental backup that depends on it. WAL summaries must + be available for the entire range of WAL records between the preceding + backup and the new one being taken; if not, the incremental backup will + fail. If this parameter is set to zero, WAL summaries will not be + automatically deleted, but it is safe to manually remove files that you + know will not be required for future incremental backups. + This parameter can only be set in the + <filename>postgresql.conf</filename> file or on the server command line. + The default is 10 days. If <literal>summarize_wal = off</literal>, + existing WAL summaries will not be removed regardless of the value of + this parameter, because the WAL summarizer will not run. + </para> + </listitem> + </varlistentry> + + </variablelist> + + </sect2> + </sect1> <sect1 id="runtime-config-replication"> diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 56e4d6fb022..1e9019156a5 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -77,6 +77,7 @@ #include "port/pg_iovec.h" #include "postmaster/bgwriter.h" #include "postmaster/startup.h" +#include "postmaster/walsummarizer.h" #include "postmaster/walwriter.h" #include "replication/logical.h" #include "replication/origin.h" @@ -3592,6 +3593,43 @@ XLogGetLastRemovedSegno(void) return lastRemovedSegNo; } +/* + * Return the oldest WAL segment on the given TLI that still exists in + * XLOGDIR, or 0 if none. + */ +XLogSegNo +XLogGetOldestSegno(TimeLineID tli) +{ + DIR *xldir; + struct dirent *xlde; + XLogSegNo oldest_segno = 0; + + xldir = AllocateDir(XLOGDIR); + while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL) + { + TimeLineID file_tli; + XLogSegNo file_segno; + + /* Ignore files that are not XLOG segments. */ + if (!IsXLogFileName(xlde->d_name)) + continue; + + /* Parse filename to get TLI and segno. */ + XLogFromFileName(xlde->d_name, &file_tli, &file_segno, + wal_segment_size); + + /* Ignore anything that's not from the TLI of interest. */ + if (tli != file_tli) + continue; + + /* If it's the oldest so far, update oldest_segno. */ + if (oldest_segno == 0 || file_segno < oldest_segno) + oldest_segno = file_segno; + } + + FreeDir(xldir); + return oldest_segno; +} /* * Update the last removed segno pointer in shared memory, to reflect that the @@ -3872,8 +3910,8 @@ RemoveXlogFile(const struct dirent *segment_de, } /* - * Verify whether pg_wal and pg_wal/archive_status exist. - * If the latter does not exist, recreate it. + * Verify whether pg_wal, pg_wal/archive_status, and pg_wal/summaries exist. + * If the latter do not exist, recreate them. * * It is not the goal of this function to verify the contents of these * directories, but to help in cases where someone has performed a cluster @@ -3916,6 +3954,26 @@ ValidateXLOGDirectoryStructure(void) (errmsg("could not create missing directory \"%s\": %m", path))); } + + /* Check for summaries */ + snprintf(path, MAXPGPATH, XLOGDIR "/summaries"); + if (stat(path, &stat_buf) == 0) + { + /* Check for weird cases where it exists but isn't a directory */ + if (!S_ISDIR(stat_buf.st_mode)) + ereport(FATAL, + (errmsg("required WAL directory \"%s\" does not exist", + path))); + } + else + { + ereport(LOG, + (errmsg("creating missing WAL directory \"%s\"", path))); + if (MakePGDirectory(path) < 0) + ereport(FATAL, + (errmsg("could not create missing directory \"%s\": %m", + path))); + } } /* @@ -5243,9 +5301,9 @@ StartupXLOG(void) #endif /* - * Verify that pg_wal and pg_wal/archive_status exist. In cases where - * someone has performed a copy for PITR, these directories may have been - * excluded and need to be re-created. + * Verify that pg_wal, pg_wal/archive_status, and pg_wal/summaries exist. + * In cases where someone has performed a copy for PITR, these directories + * may have been excluded and need to be re-created. */ ValidateXLOGDirectoryStructure(); @@ -6963,6 +7021,25 @@ CreateCheckPoint(int flags) END_CRIT_SECTION(); /* + * WAL summaries end when the next XLOG_CHECKPOINT_REDO or + * XLOG_CHECKPOINT_SHUTDOWN record is reached. This is the first point + * where (a) we're not inside of a critical section and (b) we can be + * certain that the relevant record has been flushed to disk, which must + * happen before it can be summarized. + * + * If this is a shutdown checkpoint, then this happens reasonably + * promptly: we've only just inserted and flushed the + * XLOG_CHECKPOINT_SHUTDOWN record. If this is not a shutdown checkpoint, + * then this might not be very prompt at all: the XLOG_CHECKPOINT_REDO + * record was written before we began flushing data to disk, and that + * could be many minutes ago at this point. However, we don't XLogFlush() + * after inserting that record, so we're not guaranteed that it's on disk + * until after the above call that flushes the XLOG_CHECKPOINT_ONLINE + * record. + */ + SetWalSummarizerLatch(); + + /* * Let smgr do post-checkpoint cleanup (eg, deleting old files). */ SyncPostCheckpoint(); @@ -7636,6 +7713,20 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo) } } + /* + * If WAL summarization is in use, don't remove WAL that has yet to be + * summarized. + */ + keep = GetOldestUnsummarizedLSN(NULL, NULL, false); + if (keep != InvalidXLogRecPtr) + { + XLogSegNo unsummarized_segno; + + XLByteToSeg(keep, unsummarized_segno, wal_segment_size); + if (unsummarized_segno < segno) + segno = unsummarized_segno; + } + /* but, keep at least wal_keep_size if that's set */ if (wal_keep_size_mb > 0) { diff --git a/src/backend/backup/Makefile b/src/backend/backup/Makefile index b21bd8ff436..a67b3c58d47 100644 --- a/src/backend/backup/Makefile +++ b/src/backend/backup/Makefile @@ -25,6 +25,8 @@ OBJS = \ basebackup_server.o \ basebackup_sink.o \ basebackup_target.o \ - basebackup_throttle.o + basebackup_throttle.o \ + walsummary.o \ + walsummaryfuncs.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/backup/meson.build b/src/backend/backup/meson.build index 11a79bbf803..5d4ebe3ebed 100644 --- a/src/backend/backup/meson.build +++ b/src/backend/backup/meson.build @@ -12,4 +12,6 @@ backend_sources += files( 'basebackup_target.c', 'basebackup_throttle.c', 'basebackup_zstd.c', + 'walsummary.c', + 'walsummaryfuncs.c', ) diff --git a/src/backend/backup/walsummary.c b/src/backend/backup/walsummary.c new file mode 100644 index 00000000000..271d199874b --- /dev/null +++ b/src/backend/backup/walsummary.c @@ -0,0 +1,356 @@ +/*------------------------------------------------------------------------- + * + * walsummary.c + * Functions for accessing and managing WAL summary data. + * + * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group + * + * src/backend/backup/walsummary.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include <sys/stat.h> +#include <unistd.h> + +#include "access/xlog_internal.h" +#include "backup/walsummary.h" +#include "utils/wait_event.h" + +static bool IsWalSummaryFilename(char *filename); +static int ListComparatorForWalSummaryFiles(const ListCell *a, + const ListCell *b); + +/* + * Get a list of WAL summaries. + * + * If tli != 0, only WAL summaries with the indicated TLI will be included. + * + * If start_lsn != InvalidXLogRecPtr, only summaries that end after the + * indicated LSN will be included. + * + * If end_lsn != InvalidXLogRecPtr, only summaries that start before the + * indicated LSN will be included. + * + * The intent is that you can call GetWalSummaries(tli, start_lsn, end_lsn) + * to get all WAL summaries on the indicated timeline that overlap the + * specified LSN range. + */ +List * +GetWalSummaries(TimeLineID tli, XLogRecPtr start_lsn, XLogRecPtr end_lsn) +{ + DIR *sdir; + struct dirent *dent; + List *result = NIL; + + sdir = AllocateDir(XLOGDIR "/summaries"); + while ((dent = ReadDir(sdir, XLOGDIR "/summaries")) != NULL) + { + WalSummaryFile *ws; + uint32 tmp[5]; + TimeLineID file_tli; + XLogRecPtr file_start_lsn; + XLogRecPtr file_end_lsn; + + /* Decode filename, or skip if it's not in the expected format. */ + if (!IsWalSummaryFilename(dent->d_name)) + continue; + sscanf(dent->d_name, "%08X%08X%08X%08X%08X", + &tmp[0], &tmp[1], &tmp[2], &tmp[3], &tmp[4]); + file_tli = tmp[0]; + file_start_lsn = ((uint64) tmp[1]) << 32 | tmp[2]; + file_end_lsn = ((uint64) tmp[3]) << 32 | tmp[4]; + + /* Skip if it doesn't match the filter criteria. */ + if (tli != 0 && tli != file_tli) + continue; + if (!XLogRecPtrIsInvalid(start_lsn) && start_lsn >= file_end_lsn) + continue; + if (!XLogRecPtrIsInvalid(end_lsn) && end_lsn <= file_start_lsn) + continue; + + /* Add it to the list. */ + ws = palloc(sizeof(WalSummaryFile)); + ws->tli = file_tli; + ws->start_lsn = file_start_lsn; + ws->end_lsn = file_end_lsn; + result = lappend(result, ws); + } + FreeDir(sdir); + + return result; +} + +/* + * Build a new list of WAL summaries based on an existing list, but filtering + * out summaries that don't match the search parameters. + * + * If tli != 0, only WAL summaries with the indicated TLI will be included. + * + * If start_lsn != InvalidXLogRecPtr, only summaries that end after the + * indicated LSN will be included. + * + * If end_lsn != InvalidXLogRecPtr, only summaries that start before the + * indicated LSN will be included. + */ +List * +FilterWalSummaries(List *wslist, TimeLineID tli, + XLogRecPtr start_lsn, XLogRecPtr end_lsn) +{ + List *result = NIL; + ListCell *lc; + + /* Loop over input. */ + foreach(lc, wslist) + { + WalSummaryFile *ws = lfirst(lc); + + /* Skip if it doesn't match the filter criteria. */ + if (tli != 0 && tli != ws->tli) + continue; + if (!XLogRecPtrIsInvalid(start_lsn) && start_lsn > ws->end_lsn) + continue; + if (!XLogRecPtrIsInvalid(end_lsn) && end_lsn < ws->start_lsn) + continue; + + /* Add it to the result list. */ + result = lappend(result, ws); + } + + return result; +} + +/* + * Check whether the supplied list of WalSummaryFile objects covers the + * whole range of LSNs from start_lsn to end_lsn. This function ignores + * timelines, so the caller should probably filter using the appropriate + * timeline before calling this. + * + * If the whole range of LSNs is covered, returns true, otherwise false. + * If false is returned, *missing_lsn is set either to InvalidXLogRecPtr + * if there are no WAL summary files in the input list, or to the first LSN + * in the range that is not covered by a WAL summary file in the input list. + */ +bool +WalSummariesAreComplete(List *wslist, XLogRecPtr start_lsn, + XLogRecPtr end_lsn, XLogRecPtr *missing_lsn) +{ + XLogRecPtr current_lsn = start_lsn; + ListCell *lc; + + /* Special case for empty list. */ + if (wslist == NIL) + { + *missing_lsn = InvalidXLogRecPtr; + return false; + } + + /* Make a private copy of the list and sort it by start LSN. */ + wslist = list_copy(wslist); + list_sort(wslist, ListComparatorForWalSummaryFiles); + + /* + * Consider summary files in order of increasing start_lsn, advancing the + * known-summarized range from start_lsn toward end_lsn. + * + * Normally, the summary files should cover non-overlapping WAL ranges, + * but this algorithm is intended to be correct even in case of overlap. + */ + foreach(lc, wslist) + { + WalSummaryFile *ws = lfirst(lc); + + if (ws->start_lsn > current_lsn) + { + /* We found a gap. */ + break; + } + if (ws->end_lsn > current_lsn) + { + /* + * Next summary extends beyond end of previous summary, so extend + * the end of the range known to be summarized. + */ + current_lsn = ws->end_lsn; + + /* + * If the range we know to be summarized has reached the required + * end LSN, we have proved completeness. + */ + if (current_lsn >= end_lsn) + return true; + } + } + + /* + * We either ran out of summary files without reaching the end LSN, or we + * hit a gap in the sequence that resulted in us bailing out of the loop + * above. + */ + *missing_lsn = current_lsn; + return false; +} + +/* + * Open a WAL summary file. + * + * This will throw an error in case of trouble. As an exception, if + * missing_ok = true and the trouble is specifically that the file does + * not exist, it will not throw an error and will return a value less than 0. + */ +File +OpenWalSummaryFile(WalSummaryFile *ws, bool missing_ok) +{ + char path[MAXPGPATH]; + File file; + + snprintf(path, MAXPGPATH, + XLOGDIR "/summaries/%08X%08X%08X%08X%08X.summary", + ws->tli, + LSN_FORMAT_ARGS(ws->start_lsn), + LSN_FORMAT_ARGS(ws->end_lsn)); + + file = PathNameOpenFile(path, O_RDONLY); + if (file < 0 && (errno != EEXIST || !missing_ok)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", path))); + + return file; +} + +/* + * Remove a WAL summary file if the last modification time precedes the + * cutoff time. + */ +void +RemoveWalSummaryIfOlderThan(WalSummaryFile *ws, time_t cutoff_time) +{ + char path[MAXPGPATH]; + struct stat statbuf; + + snprintf(path, MAXPGPATH, + XLOGDIR "/summaries/%08X%08X%08X%08X%08X.summary", + ws->tli, + LSN_FORMAT_ARGS(ws->start_lsn), + LSN_FORMAT_ARGS(ws->end_lsn)); + + if (lstat(path, &statbuf) != 0) + { + if (errno == ENOENT) + return; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", path))); + } + if (statbuf.st_mtime >= cutoff_time) + return; + if (unlink(path) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", path))); + ereport(DEBUG2, + (errmsg_internal("removing file \"%s\"", path))); +} + +/* + * Test whether a filename looks like a WAL summary file. + */ +static bool +IsWalSummaryFilename(char *filename) +{ + return strspn(filename, "0123456789ABCDEF") == 40 && + strcmp(filename + 40, ".summary") == 0; +} + +/* + * Data read callback for use with CreateBlockRefTableReader. + */ +int +ReadWalSummary(void *wal_summary_io, void *data, int length) +{ + WalSummaryIO *io = wal_summary_io; + int nbytes; + + nbytes = FileRead(io->file, data, length, io->filepos, + WAIT_EVENT_WAL_SUMMARY_READ); + if (nbytes < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", + FilePathName(io->file)))); + + io->filepos += nbytes; + return nbytes; +} + +/* + * Data write callback for use with WriteBlockRefTable. + */ +int +WriteWalSummary(void *wal_summary_io, void *data, int length) +{ + WalSummaryIO *io = wal_summary_io; + int nbytes; + + nbytes = FileWrite(io->file, data, length, io->filepos, + WAIT_EVENT_WAL_SUMMARY_WRITE); + if (nbytes < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write file \"%s\": %m", + FilePathName(io->file)))); + if (nbytes != length) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write file \"%s\": wrote only %d of %d bytes at offset %u", + FilePathName(io->file), nbytes, + length, (unsigned) io->filepos), + errhint("Check free disk space."))); + + io->filepos += nbytes; + return nbytes; +} + +/* + * Error-reporting callback for use with CreateBlockRefTableReader. + */ +void +ReportWalSummaryError(void *callback_arg, char *fmt,...) +{ + StringInfoData buf; + va_list ap; + int needed; + + initStringInfo(&buf); + for (;;) + { + va_start(ap, fmt); + needed = appendStringInfoVA(&buf, fmt, ap); + va_end(ap); + if (needed == 0) + break; + enlargeStringInfo(&buf, needed); + } + ereport(ERROR, + errcode(ERRCODE_DATA_CORRUPTED), + errmsg_internal("%s", buf.data)); +} + +/* + * Comparator to sort a List of WalSummaryFile objects by start_lsn. + */ +static int +ListComparatorForWalSummaryFiles(const ListCell *a, const ListCell *b) +{ + WalSummaryFile *ws1 = lfirst(a); + WalSummaryFile *ws2 = lfirst(b); + + if (ws1->start_lsn < ws2->start_lsn) + return -1; + if (ws1->start_lsn > ws2->start_lsn) + return 1; + return 0; +} diff --git a/src/backend/backup/walsummaryfuncs.c b/src/backend/backup/walsummaryfuncs.c new file mode 100644 index 00000000000..a1f69ad4bac --- /dev/null +++ b/src/backend/backup/walsummaryfuncs.c @@ -0,0 +1,169 @@ +/*------------------------------------------------------------------------- + * + * walsummaryfuncs.c + * SQL-callable functions for accessing WAL summary data. + * + * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group + * + * src/backend/backup/walsummaryfuncs.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "backup/walsummary.h" +#include "common/blkreftable.h" +#include "funcapi.h" +#include "miscadmin.h" +#include "utils/fmgrprotos.h" +#include "utils/pg_lsn.h" + +#define NUM_WS_ATTS 3 +#define NUM_SUMMARY_ATTS 6 +#define MAX_BLOCKS_PER_CALL 256 + +/* + * List the WAL summary files available in pg_wal/summaries. + */ +Datum +pg_available_wal_summaries(PG_FUNCTION_ARGS) +{ + ReturnSetInfo *rsi; + List *wslist; + ListCell *lc; + Datum values[NUM_WS_ATTS]; + bool nulls[NUM_WS_ATTS]; + + InitMaterializedSRF(fcinfo, 0); + rsi = (ReturnSetInfo *) fcinfo->resultinfo; + + memset(nulls, 0, sizeof(nulls)); + + wslist = GetWalSummaries(0, InvalidXLogRecPtr, InvalidXLogRecPtr); + foreach(lc, wslist) + { + WalSummaryFile *ws = (WalSummaryFile *) lfirst(lc); + HeapTuple tuple; + + CHECK_FOR_INTERRUPTS(); + + values[0] = Int64GetDatum((int64) ws->tli); + values[1] = LSNGetDatum(ws->start_lsn); + values[2] = LSNGetDatum(ws->end_lsn); + + tuple = heap_form_tuple(rsi->setDesc, values, nulls); + tuplestore_puttuple(rsi->setResult, tuple); + } + + return (Datum) 0; +} + +/* + * List the contents of a WAL summary file identified by TLI, start LSN, + * and end LSN. + */ +Datum +pg_wal_summary_contents(PG_FUNCTION_ARGS) +{ + ReturnSetInfo *rsi; + Datum values[NUM_SUMMARY_ATTS]; + bool nulls[NUM_SUMMARY_ATTS]; + WalSummaryFile ws; + WalSummaryIO io; + BlockRefTableReader *reader; + int64 raw_tli; + RelFileLocator rlocator; + ForkNumber forknum; + BlockNumber limit_block; + + InitMaterializedSRF(fcinfo, 0); + rsi = (ReturnSetInfo *) fcinfo->resultinfo; + memset(nulls, 0, sizeof(nulls)); + + /* + * Since the timeline could at least in theory be more than 2^31, and + * since we don't have unsigned types at the SQL level, it is passed as a + * 64-bit integer. Test whether it's out of range. + */ + raw_tli = PG_GETARG_INT64(0); + if (raw_tli < 1 || raw_tli > PG_INT32_MAX) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid timeline %lld", (long long) raw_tli)); + + /* Prepare to read the specified WAL summry file. */ + ws.tli = (TimeLineID) raw_tli; + ws.start_lsn = PG_GETARG_LSN(1); + ws.end_lsn = PG_GETARG_LSN(2); + io.filepos = 0; + io.file = OpenWalSummaryFile(&ws, false); + reader = CreateBlockRefTableReader(ReadWalSummary, &io, + FilePathName(io.file), + ReportWalSummaryError, NULL); + + /* Loop over relation forks. */ + while (BlockRefTableReaderNextRelation(reader, &rlocator, &forknum, + &limit_block)) + { + BlockNumber blocks[MAX_BLOCKS_PER_CALL]; + HeapTuple tuple; + + CHECK_FOR_INTERRUPTS(); + + values[0] = ObjectIdGetDatum(rlocator.relNumber); + values[1] = ObjectIdGetDatum(rlocator.spcOid); + values[2] = ObjectIdGetDatum(rlocator.dbOid); + values[3] = Int16GetDatum((int16) forknum); + + /* Loop over blocks within the current relation fork. */ + while (1) + { + unsigned nblocks; + unsigned i; + + CHECK_FOR_INTERRUPTS(); + + nblocks = BlockRefTableReaderGetBlocks(reader, blocks, + MAX_BLOCKS_PER_CALL); + if (nblocks == 0) + break; + + /* + * For each block that we specifically know to have been modified, + * emit a row with that block number and limit_block = false. + */ + values[5] = BoolGetDatum(false); + for (i = 0; i < nblocks; ++i) + { + values[4] = Int64GetDatum((int64) blocks[i]); + + tuple = heap_form_tuple(rsi->setDesc, values, nulls); + tuplestore_puttuple(rsi->setResult, tuple); + } + + /* + * If the limit block is not InvalidBlockNumber, emit an exta row + * with that block number and limit_block = true. + * + * There is no point in doing this when the limit_block is + * InvalidBlockNumber, because no block with that number or any + * higher number can ever exist. + */ + if (BlockNumberIsValid(limit_block)) + { + values[4] = Int64GetDatum((int64) limit_block); + values[5] = BoolGetDatum(true); + + tuple = heap_form_tuple(rsi->setDesc, values, nulls); + tuplestore_puttuple(rsi->setResult, tuple); + } + } + } + + /* Cleanup */ + DestroyBlockRefTableReader(reader); + FileClose(io.file); + + return (Datum) 0; +} diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile index 047448b34eb..367a46c6177 100644 --- a/src/backend/postmaster/Makefile +++ b/src/backend/postmaster/Makefile @@ -24,6 +24,7 @@ OBJS = \ postmaster.o \ startup.o \ syslogger.o \ + walsummarizer.o \ walwriter.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/postmaster/auxprocess.c b/src/backend/postmaster/auxprocess.c index bae6f68c402..5f244216a63 100644 --- a/src/backend/postmaster/auxprocess.c +++ b/src/backend/postmaster/auxprocess.c @@ -21,6 +21,7 @@ #include "postmaster/auxprocess.h" #include "postmaster/bgwriter.h" #include "postmaster/startup.h" +#include "postmaster/walsummarizer.h" #include "postmaster/walwriter.h" #include "replication/walreceiver.h" #include "storage/bufmgr.h" @@ -80,6 +81,9 @@ AuxiliaryProcessMain(AuxProcType auxtype) case WalReceiverProcess: MyBackendType = B_WAL_RECEIVER; break; + case WalSummarizerProcess: + MyBackendType = B_WAL_SUMMARIZER; + break; default: elog(PANIC, "unrecognized process type: %d", (int) MyAuxProcType); MyBackendType = B_INVALID; @@ -158,6 +162,10 @@ AuxiliaryProcessMain(AuxProcType auxtype) WalReceiverMain(); proc_exit(1); + case WalSummarizerProcess: + WalSummarizerMain(); + proc_exit(1); + default: elog(PANIC, "unrecognized process type: %d", (int) MyAuxProcType); proc_exit(1); diff --git a/src/backend/postmaster/meson.build b/src/backend/postmaster/meson.build index cda921fd10b..a30eb6692f9 100644 --- a/src/backend/postmaster/meson.build +++ b/src/backend/postmaster/meson.build @@ -12,5 +12,6 @@ backend_sources += files( 'postmaster.c', 'startup.c', 'syslogger.c', + 'walsummarizer.c', 'walwriter.c', ) diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 651b85ea746..b163e89cbb5 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -113,6 +113,7 @@ #include "postmaster/pgarch.h" #include "postmaster/postmaster.h" #include "postmaster/syslogger.h" +#include "postmaster/walsummarizer.h" #include "replication/logicallauncher.h" #include "replication/walsender.h" #include "storage/fd.h" @@ -250,6 +251,7 @@ static pid_t StartupPID = 0, CheckpointerPID = 0, WalWriterPID = 0, WalReceiverPID = 0, + WalSummarizerPID = 0, AutoVacPID = 0, PgArchPID = 0, SysLoggerPID = 0; @@ -441,6 +443,7 @@ static bool CreateOptsFile(int argc, char *argv[], char *fullprogname); static pid_t StartChildProcess(AuxProcType type); static void StartAutovacuumWorker(void); static void MaybeStartWalReceiver(void); +static void MaybeStartWalSummarizer(void); static void InitPostmasterDeathWatchHandle(void); /* @@ -564,6 +567,7 @@ static void ShmemBackendArrayRemove(Backend *bn); #define StartCheckpointer() StartChildProcess(CheckpointerProcess) #define StartWalWriter() StartChildProcess(WalWriterProcess) #define StartWalReceiver() StartChildProcess(WalReceiverProcess) +#define StartWalSummarizer() StartChildProcess(WalSummarizerProcess) /* Macros to check exit status of a child process */ #define EXIT_STATUS_0(st) ((st) == 0) @@ -933,6 +937,9 @@ PostmasterMain(int argc, char *argv[]) if (max_wal_senders > 0 && wal_level == WAL_LEVEL_MINIMAL) ereport(ERROR, (errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"replica\" or \"logical\""))); + if (summarize_wal && wal_level == WAL_LEVEL_MINIMAL) + ereport(ERROR, + (errmsg("WAL cannot be summarized when wal_level is \"minimal\""))); /* * Other one-time internal sanity checks can go here, if they are fast. @@ -1835,6 +1842,9 @@ ServerLoop(void) if (WalReceiverRequested) MaybeStartWalReceiver(); + /* If we need to start a WAL summarizer, try to do that now */ + MaybeStartWalSummarizer(); + /* Get other worker processes running, if needed */ if (StartWorkerNeeded || HaveCrashedWorker) maybe_start_bgworkers(); @@ -2659,6 +2669,8 @@ process_pm_reload_request(void) signal_child(WalWriterPID, SIGHUP); if (WalReceiverPID != 0) signal_child(WalReceiverPID, SIGHUP); + if (WalSummarizerPID != 0) + signal_child(WalSummarizerPID, SIGHUP); if (AutoVacPID != 0) signal_child(AutoVacPID, SIGHUP); if (PgArchPID != 0) @@ -3012,6 +3024,7 @@ process_pm_child_exit(void) BgWriterPID = StartBackgroundWriter(); if (WalWriterPID == 0) WalWriterPID = StartWalWriter(); + MaybeStartWalSummarizer(); /* * Likewise, start other special children as needed. In a restart @@ -3131,6 +3144,20 @@ process_pm_child_exit(void) } /* + * Was it the wal summarizer? Normal exit can be ignored; we'll start + * a new one at the next iteration of the postmaster's main loop, if + * necessary. Any other exit condition is treated as a crash. + */ + if (pid == WalSummarizerPID) + { + WalSummarizerPID = 0; + if (!EXIT_STATUS_0(exitstatus)) + HandleChildCrash(pid, exitstatus, + _("WAL summarizer process")); + continue; + } + + /* * Was it the autovacuum launcher? Normal exit can be ignored; we'll * start a new one at the next iteration of the postmaster's main * loop, if necessary. Any other exit condition is treated as a @@ -3525,6 +3552,12 @@ HandleChildCrash(int pid, int exitstatus, const char *procname) else if (WalReceiverPID != 0 && take_action) sigquit_child(WalReceiverPID); + /* Take care of the walsummarizer too */ + if (pid == WalSummarizerPID) + WalSummarizerPID = 0; + else if (WalSummarizerPID != 0 && take_action) + sigquit_child(WalSummarizerPID); + /* Take care of the autovacuum launcher too */ if (pid == AutoVacPID) AutoVacPID = 0; @@ -3675,6 +3708,8 @@ PostmasterStateMachine(void) signal_child(StartupPID, SIGTERM); if (WalReceiverPID != 0) signal_child(WalReceiverPID, SIGTERM); + if (WalSummarizerPID != 0) + signal_child(WalSummarizerPID, SIGTERM); /* checkpointer, archiver, stats, and syslogger may continue for now */ /* Now transition to PM_WAIT_BACKENDS state to wait for them to die */ @@ -3701,6 +3736,7 @@ PostmasterStateMachine(void) if (CountChildren(BACKEND_TYPE_ALL - BACKEND_TYPE_WALSND) == 0 && StartupPID == 0 && WalReceiverPID == 0 && + WalSummarizerPID == 0 && BgWriterPID == 0 && (CheckpointerPID == 0 || (!FatalError && Shutdown < ImmediateShutdown)) && @@ -3798,6 +3834,7 @@ PostmasterStateMachine(void) /* These other guys should be dead already */ Assert(StartupPID == 0); Assert(WalReceiverPID == 0); + Assert(WalSummarizerPID == 0); Assert(BgWriterPID == 0); Assert(CheckpointerPID == 0); Assert(WalWriterPID == 0); @@ -4019,6 +4056,8 @@ TerminateChildren(int signal) signal_child(WalWriterPID, signal); if (WalReceiverPID != 0) signal_child(WalReceiverPID, signal); + if (WalSummarizerPID != 0) + signal_child(WalSummarizerPID, signal); if (AutoVacPID != 0) signal_child(AutoVacPID, signal); if (PgArchPID != 0) @@ -5326,6 +5365,10 @@ StartChildProcess(AuxProcType type) ereport(LOG, (errmsg("could not fork WAL receiver process: %m"))); break; + case WalSummarizerProcess: + ereport(LOG, + (errmsg("could not fork WAL summarizer process: %m"))); + break; default: ereport(LOG, (errmsg("could not fork process: %m"))); @@ -5462,6 +5505,19 @@ MaybeStartWalReceiver(void) } } +/* + * MaybeStartWalSummarizer + * Start the WAL summarizer process, if not running and our state allows. + */ +static void +MaybeStartWalSummarizer(void) +{ + if (summarize_wal && WalSummarizerPID == 0 && + (pmState == PM_RUN || pmState == PM_HOT_STANDBY) && + Shutdown <= SmartShutdown) + WalSummarizerPID = StartWalSummarizer(); +} + /* * Create the opts file diff --git a/src/backend/postmaster/walsummarizer.c b/src/backend/postmaster/walsummarizer.c new file mode 100644 index 00000000000..9fa155349e8 --- /dev/null +++ b/src/backend/postmaster/walsummarizer.c @@ -0,0 +1,1398 @@ +/*------------------------------------------------------------------------- + * + * walsummarizer.c + * + * Background process to perform WAL summarization, if it is enabled. + * It continuously scans the write-ahead log and periodically emits a + * summary file which indicates which blocks in which relation forks + * were modified by WAL records in the LSN range covered by the summary + * file. See walsummary.c and blkreftable.c for more details on the + * naming and contents of WAL summary files. + * + * If configured to do, this background process will also remove WAL + * summary files when the file timestamp is older than a configurable + * threshold (but only if the WAL has been removed first). + * + * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/postmaster/walsummarizer.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/timeline.h" +#include "access/xlog.h" +#include "access/xlog_internal.h" +#include "access/xlogrecovery.h" +#include "access/xlogutils.h" +#include "backup/walsummary.h" +#include "catalog/storage_xlog.h" +#include "common/blkreftable.h" +#include "libpq/pqsignal.h" +#include "miscadmin.h" +#include "postmaster/bgwriter.h" +#include "postmaster/interrupt.h" +#include "postmaster/walsummarizer.h" +#include "replication/walreceiver.h" +#include "storage/fd.h" +#include "storage/ipc.h" +#include "storage/lwlock.h" +#include "storage/latch.h" +#include "storage/proc.h" +#include "storage/procsignal.h" +#include "storage/shmem.h" +#include "storage/spin.h" +#include "utils/guc.h" +#include "utils/memutils.h" +#include "utils/wait_event.h" + +/* + * Data in shared memory related to WAL summarization. + */ +typedef struct +{ + /* + * These fields are protected by WALSummarizerLock. + * + * Until we've discovered what summary files already exist on disk and + * stored that information in shared memory, initialized is false and the + * other fields here contain no meaningful information. After that has + * been done, initialized is true. + * + * summarized_tli and summarized_lsn indicate the last LSN and TLI at + * which the next summary file will start. Normally, these are the LSN and + * TLI at which the last file ended; in such case, lsn_is_exact is true. + * If, however, the LSN is just an approximation, then lsn_is_exact is + * false. This can happen if, for example, there are no existing WAL + * summary files at startup. In that case, we have to derive the position + * at which to start summarizing from the WAL files that exist on disk, + * and so the LSN might point to the start of the next file even though + * that might happen to be in the middle of a WAL record. + * + * summarizer_pgprocno is the pgprocno value for the summarizer process, + * if one is running, or else INVALID_PGPROCNO. + * + * pending_lsn is used by the summarizer to advertise the ending LSN of a + * record it has recently read. It shouldn't ever be less than + * summarized_lsn, but might be greater, because the summarizer buffers + * data for a range of LSNs in memory before writing out a new file. + */ + bool initialized; + TimeLineID summarized_tli; + XLogRecPtr summarized_lsn; + bool lsn_is_exact; + int summarizer_pgprocno; + XLogRecPtr pending_lsn; + + /* + * This field handles its own synchronizaton. + */ + ConditionVariable summary_file_cv; +} WalSummarizerData; + +/* + * Private data for our xlogreader's page read callback. + */ +typedef struct +{ + TimeLineID tli; + bool historic; + XLogRecPtr read_upto; + bool end_of_wal; +} SummarizerReadLocalXLogPrivate; + +/* Pointer to shared memory state. */ +static WalSummarizerData *WalSummarizerCtl; + +/* + * When we reach end of WAL and need to read more, we sleep for a number of + * milliseconds that is a integer multiple of MS_PER_SLEEP_QUANTUM. This is + * the multiplier. It should vary between 1 and MAX_SLEEP_QUANTA, depending + * on system activity. See summarizer_wait_for_wal() for how we adjust this. + */ +static long sleep_quanta = 1; + +/* + * The sleep time will always be a multiple of 200ms and will not exceed + * thirty seconds (150 * 200 = 30 * 1000). Note that the timeout here needs + * to be substntially less than the maximum amount of time for which an + * incremental backup will wait for this process to catch up. Otherwise, an + * incremental backup might time out on an idle system just because we sleep + * for too long. + */ +#define MAX_SLEEP_QUANTA 150 +#define MS_PER_SLEEP_QUANTUM 200 + +/* + * This is a count of the number of pages of WAL that we've read since the + * last time we waited for more WAL to appear. + */ +static long pages_read_since_last_sleep = 0; + +/* + * Most recent RedoRecPtr value observed by MaybeRemoveOldWalSummaries. + */ +static XLogRecPtr redo_pointer_at_last_summary_removal = InvalidXLogRecPtr; + +/* + * GUC parameters + */ +bool summarize_wal = false; +int wal_summary_keep_time = 10 * 24 * 60; + +static XLogRecPtr GetLatestLSN(TimeLineID *tli); +static void HandleWalSummarizerInterrupts(void); +static XLogRecPtr SummarizeWAL(TimeLineID tli, XLogRecPtr start_lsn, + bool exact, XLogRecPtr switch_lsn, + XLogRecPtr maximum_lsn); +static void SummarizeSmgrRecord(XLogReaderState *xlogreader, + BlockRefTable *brtab); +static void SummarizeXactRecord(XLogReaderState *xlogreader, + BlockRefTable *brtab); +static bool SummarizeXlogRecord(XLogReaderState *xlogreader); +static int summarizer_read_local_xlog_page(XLogReaderState *state, + XLogRecPtr targetPagePtr, + int reqLen, + XLogRecPtr targetRecPtr, + char *cur_page); +static void summarizer_wait_for_wal(void); +static void MaybeRemoveOldWalSummaries(void); + +/* + * Amount of shared memory required for this module. + */ +Size +WalSummarizerShmemSize(void) +{ + return sizeof(WalSummarizerData); +} + +/* + * Create or attach to shared memory segment for this module. + */ +void +WalSummarizerShmemInit(void) +{ + bool found; + + WalSummarizerCtl = (WalSummarizerData *) + ShmemInitStruct("Wal Summarizer Ctl", WalSummarizerShmemSize(), + &found); + + if (!found) + { + /* + * First time through, so initialize. + * + * We're just filling in dummy values here -- the real initialization + * will happen when GetOldestUnsummarizedLSN() is called for the first + * time. + */ + WalSummarizerCtl->initialized = false; + WalSummarizerCtl->summarized_tli = 0; + WalSummarizerCtl->summarized_lsn = InvalidXLogRecPtr; + WalSummarizerCtl->lsn_is_exact = false; + WalSummarizerCtl->summarizer_pgprocno = INVALID_PGPROCNO; + WalSummarizerCtl->pending_lsn = InvalidXLogRecPtr; + ConditionVariableInit(&WalSummarizerCtl->summary_file_cv); + } +} + +/* + * Entry point for walsummarizer process. + */ +void +WalSummarizerMain(void) +{ + sigjmp_buf local_sigjmp_buf; + MemoryContext context; + + /* + * Within this function, 'current_lsn' and 'current_tli' refer to the + * point from which the next WAL summary file should start. 'exact' is + * true if 'current_lsn' is known to be the start of a WAL recod or WAL + * segment, and false if it might be in the middle of a record someplace. + * + * 'switch_lsn' and 'switch_tli', if set, are the LSN at which we need to + * switch to a new timeline and the timeline to which we need to switch. + * If not set, we either haven't figured out the answers yet or we're + * already on the latest timeline. + */ + XLogRecPtr current_lsn; + TimeLineID current_tli; + bool exact; + XLogRecPtr switch_lsn = InvalidXLogRecPtr; + TimeLineID switch_tli = 0; + + ereport(DEBUG1, + (errmsg_internal("WAL summarizer started"))); + + /* + * Properly accept or ignore signals the postmaster might send us + * + * We have no particular use for SIGINT at the moment, but seems + * reasonable to treat like SIGTERM. + */ + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGINT, SignalHandlerForShutdownRequest); + pqsignal(SIGTERM, SignalHandlerForShutdownRequest); + /* SIGQUIT handler was already set up by InitPostmasterChild */ + pqsignal(SIGALRM, SIG_IGN); + pqsignal(SIGPIPE, SIG_IGN); + pqsignal(SIGUSR1, procsignal_sigusr1_handler); + pqsignal(SIGUSR2, SIG_IGN); /* not used */ + + /* Advertise ourselves. */ + LWLockAcquire(WALSummarizerLock, LW_EXCLUSIVE); + WalSummarizerCtl->summarizer_pgprocno = MyProc->pgprocno; + LWLockRelease(WALSummarizerLock); + + /* Create and switch to a memory context that we can reset on error. */ + context = AllocSetContextCreate(TopMemoryContext, + "Wal Summarizer", + ALLOCSET_DEFAULT_SIZES); + MemoryContextSwitchTo(context); + + /* + * Reset some signals that are accepted by postmaster but not here + */ + pqsignal(SIGCHLD, SIG_DFL); + + /* + * If an exception is encountered, processing resumes here. + */ + if (sigsetjmp(local_sigjmp_buf, 1) != 0) + { + /* Since not using PG_TRY, must reset error stack by hand */ + error_context_stack = NULL; + + /* Prevent interrupts while cleaning up */ + HOLD_INTERRUPTS(); + + /* Report the error to the server log */ + EmitErrorReport(); + + /* Release resources we might have acquired. */ + LWLockReleaseAll(); + ConditionVariableCancelSleep(); + pgstat_report_wait_end(); + ReleaseAuxProcessResources(false); + AtEOXact_Files(false); + AtEOXact_HashTables(false); + + /* + * Now return to normal top-level context and clear ErrorContext for + * next time. + */ + MemoryContextSwitchTo(context); + FlushErrorState(); + + /* Flush any leaked data in the top-level context */ + MemoryContextReset(context); + + /* Now we can allow interrupts again */ + RESUME_INTERRUPTS(); + + /* + * Sleep for 10 seconds before attempting to resume operations in + * order to avoid excessing logging. + * + * Many of the likely error conditions are things that will repeat + * every time. For example, if the WAL can't be read or the summary + * can't be written, only administrator action will cure the problem. + * So a really fast retry time doesn't seem to be especially + * beneficial, and it will clutter the logs. + */ + (void) WaitLatch(MyLatch, + WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 10000, + WAIT_EVENT_WAL_SUMMARIZER_ERROR); + } + + /* We can now handle ereport(ERROR) */ + PG_exception_stack = &local_sigjmp_buf; + + /* + * Unblock signals (they were blocked when the postmaster forked us) + */ + sigprocmask(SIG_SETMASK, &UnBlockSig, NULL); + + /* + * Fetch information about previous progress from shared memory, and ask + * GetOldestUnsummarizedLSN to reset pending_lsn to summarized_lsn. We + * might be recovering from an error, and if so, pending_lsn might have + * advanced past summarized_lsn, but any WAL we read previously has been + * lost and will need to be reread. + * + * If we discover that WAL summarization is not enabled, just exit. + */ + current_lsn = GetOldestUnsummarizedLSN(¤t_tli, &exact, true); + if (XLogRecPtrIsInvalid(current_lsn)) + proc_exit(0); + + /* + * Loop forever + */ + for (;;) + { + XLogRecPtr latest_lsn; + TimeLineID latest_tli; + XLogRecPtr end_of_summary_lsn; + + /* Flush any leaked data in the top-level context */ + MemoryContextReset(context); + + /* Process any signals received recently. */ + HandleWalSummarizerInterrupts(); + + /* If it's time to remove any old WAL summaries, do that now. */ + MaybeRemoveOldWalSummaries(); + + /* Find the LSN and TLI up to which we can safely summarize. */ + latest_lsn = GetLatestLSN(&latest_tli); + + /* + * If we're summarizing a historic timeline and we haven't yet + * computed the point at which to switch to the next timeline, do that + * now. + * + * Note that if this is a standby, what was previously the current + * timeline could become historic at any time. + * + * We could try to make this more efficient by caching the results of + * readTimeLineHistory when latest_tli has not changed, but since we + * only have to do this once per timeline switch, we probably wouldn't + * save any significant amount of work in practice. + */ + if (current_tli != latest_tli && XLogRecPtrIsInvalid(switch_lsn)) + { + List *tles = readTimeLineHistory(latest_tli); + + switch_lsn = tliSwitchPoint(current_tli, tles, &switch_tli); + ereport(DEBUG1, + errmsg("switch point from TLI %u to TLI %u is at %X/%X", + current_tli, switch_tli, LSN_FORMAT_ARGS(switch_lsn))); + } + + /* + * If we've reached the switch LSN, we can't summarize anything else + * on this timeline. Switch to the next timeline and go around again. + */ + if (!XLogRecPtrIsInvalid(switch_lsn) && current_lsn >= switch_lsn) + { + current_tli = switch_tli; + switch_lsn = InvalidXLogRecPtr; + switch_tli = 0; + continue; + } + + /* Summarize WAL. */ + end_of_summary_lsn = SummarizeWAL(current_tli, + current_lsn, exact, + switch_lsn, latest_lsn); + Assert(!XLogRecPtrIsInvalid(end_of_summary_lsn)); + Assert(end_of_summary_lsn >= current_lsn); + + /* + * Update state for next loop iteration. + * + * Next summary file should start from exactly where this one ended. + */ + current_lsn = end_of_summary_lsn; + exact = true; + + /* Update state in shared memory. */ + LWLockAcquire(WALSummarizerLock, LW_EXCLUSIVE); + Assert(WalSummarizerCtl->pending_lsn <= end_of_summary_lsn); + WalSummarizerCtl->summarized_lsn = end_of_summary_lsn; + WalSummarizerCtl->summarized_tli = current_tli; + WalSummarizerCtl->lsn_is_exact = true; + WalSummarizerCtl->pending_lsn = end_of_summary_lsn; + LWLockRelease(WALSummarizerLock); + + /* Wake up anyone waiting for more summary files to be written. */ + ConditionVariableBroadcast(&WalSummarizerCtl->summary_file_cv); + } +} + +/* + * Get the oldest LSN in this server's timeline history that has not yet been + * summarized. + * + * If *tli != NULL, it will be set to the TLI for the LSN that is returned. + * + * If *lsn_is_exact != NULL, it will be set to true if the returned LSN is + * necessarily the start of a WAL record and false if it's just the beginning + * of a WAL segment. + * + * If reset_pending_lsn is true, resets the pending_lsn in shared memory to + * be equal to the summarized_lsn. + */ +XLogRecPtr +GetOldestUnsummarizedLSN(TimeLineID *tli, bool *lsn_is_exact, + bool reset_pending_lsn) +{ + TimeLineID latest_tli; + LWLockMode mode = reset_pending_lsn ? LW_EXCLUSIVE : LW_SHARED; + int n; + List *tles; + XLogRecPtr unsummarized_lsn; + TimeLineID unsummarized_tli = 0; + bool should_make_exact = false; + List *existing_summaries; + ListCell *lc; + + /* If not summarizing WAL, do nothing. */ + if (!summarize_wal) + return InvalidXLogRecPtr; + + /* + * Unless we need to reset the pending_lsn, we initally acquire the lock + * in shared mode and try to fetch the required information. If we acquire + * in shared mode and find that the data structure hasn't been + * initialized, we reacquire the lock in exclusive mode so that we can + * initialize it. However, if someone else does that first before we get + * the lock, then we can just return the requested information after all. + */ + while (1) + { + LWLockAcquire(WALSummarizerLock, mode); + + if (WalSummarizerCtl->initialized) + { + unsummarized_lsn = WalSummarizerCtl->summarized_lsn; + if (tli != NULL) + *tli = WalSummarizerCtl->summarized_tli; + if (lsn_is_exact != NULL) + *lsn_is_exact = WalSummarizerCtl->lsn_is_exact; + if (reset_pending_lsn) + WalSummarizerCtl->pending_lsn = + WalSummarizerCtl->summarized_lsn; + LWLockRelease(WALSummarizerLock); + return unsummarized_lsn; + } + + if (mode == LW_EXCLUSIVE) + break; + + LWLockRelease(WALSummarizerLock); + mode = LW_EXCLUSIVE; + } + + /* + * The data structure needs to be initialized, and we are the first to + * obtain the lock in exclusive mode, so it's our job to do that + * initialization. + * + * So, find the oldest timeline on which WAL still exists, and the + * earliest segment for which it exists. + */ + (void) GetLatestLSN(&latest_tli); + tles = readTimeLineHistory(latest_tli); + for (n = list_length(tles) - 1; n >= 0; --n) + { + TimeLineHistoryEntry *tle = list_nth(tles, n); + XLogSegNo oldest_segno; + + oldest_segno = XLogGetOldestSegno(tle->tli); + if (oldest_segno != 0) + { + /* Compute oldest LSN that still exists on disk. */ + XLogSegNoOffsetToRecPtr(oldest_segno, 0, wal_segment_size, + unsummarized_lsn); + + unsummarized_tli = tle->tli; + break; + } + } + + /* It really should not be possible for us to find no WAL. */ + if (unsummarized_tli == 0) + ereport(ERROR, + errcode(ERRCODE_INTERNAL_ERROR), + errmsg_internal("no WAL found on timeline %d", latest_tli)); + + /* + * Don't try to summarize anything older than the end LSN of the newest + * summary file that exists for this timeline. + */ + existing_summaries = + GetWalSummaries(unsummarized_tli, + InvalidXLogRecPtr, InvalidXLogRecPtr); + foreach(lc, existing_summaries) + { + WalSummaryFile *ws = lfirst(lc); + + if (ws->end_lsn > unsummarized_lsn) + { + unsummarized_lsn = ws->end_lsn; + should_make_exact = true; + } + } + + /* Update shared memory with the discovered values. */ + WalSummarizerCtl->initialized = true; + WalSummarizerCtl->summarized_lsn = unsummarized_lsn; + WalSummarizerCtl->summarized_tli = unsummarized_tli; + WalSummarizerCtl->lsn_is_exact = should_make_exact; + WalSummarizerCtl->pending_lsn = unsummarized_lsn; + + /* Also return the to the caller as required. */ + if (tli != NULL) + *tli = WalSummarizerCtl->summarized_tli; + if (lsn_is_exact != NULL) + *lsn_is_exact = WalSummarizerCtl->lsn_is_exact; + LWLockRelease(WALSummarizerLock); + + return unsummarized_lsn; +} + +/* + * Attempt to set the WAL summarizer's latch. + * + * This might not work, because there's no guarantee that the WAL summarizer + * process was successfully started, and it also might have started but + * subsequently terminated. So, under normal circumstances, this will get the + * latch set, but there's no guarantee. + */ +void +SetWalSummarizerLatch(void) +{ + int pgprocno; + + if (WalSummarizerCtl == NULL) + return; + + LWLockAcquire(WALSummarizerLock, LW_EXCLUSIVE); + pgprocno = WalSummarizerCtl->summarizer_pgprocno; + LWLockRelease(WALSummarizerLock); + + if (pgprocno != INVALID_PGPROCNO) + SetLatch(&ProcGlobal->allProcs[pgprocno].procLatch); +} + +/* + * Wait until WAL summarization reaches the given LSN, but not longer than + * the given timeout. + * + * The return value is the first still-unsummarized LSN. If it's greater than + * or equal to the passed LSN, then that LSN was reached. If not, we timed out. + * + * Either way, *pending_lsn is set to the value taken from WalSummarizerCtl. + */ +XLogRecPtr +WaitForWalSummarization(XLogRecPtr lsn, long timeout, XLogRecPtr *pending_lsn) +{ + TimestampTz start_time = GetCurrentTimestamp(); + TimestampTz deadline = TimestampTzPlusMilliseconds(start_time, timeout); + XLogRecPtr summarized_lsn; + + Assert(!XLogRecPtrIsInvalid(lsn)); + Assert(timeout > 0); + + while (1) + { + TimestampTz now; + long remaining_timeout; + + /* + * If the LSN summarized on disk has reached the target value, stop. + */ + LWLockAcquire(WALSummarizerLock, LW_EXCLUSIVE); + summarized_lsn = WalSummarizerCtl->summarized_lsn; + *pending_lsn = WalSummarizerCtl->pending_lsn; + LWLockRelease(WALSummarizerLock); + if (summarized_lsn >= lsn) + break; + + /* Timeout reached? If yes, stop. */ + now = GetCurrentTimestamp(); + remaining_timeout = TimestampDifferenceMilliseconds(now, deadline); + if (remaining_timeout <= 0) + break; + + /* Wait and see. */ + ConditionVariableTimedSleep(&WalSummarizerCtl->summary_file_cv, + remaining_timeout, + WAIT_EVENT_WAL_SUMMARY_READY); + } + + return summarized_lsn; +} + +/* + * Get the latest LSN that is eligible to be summarized, and set *tli to the + * corresponding timeline. + */ +static XLogRecPtr +GetLatestLSN(TimeLineID *tli) +{ + if (!RecoveryInProgress()) + { + /* Don't summarize WAL before it's flushed. */ + return GetFlushRecPtr(tli); + } + else + { + XLogRecPtr flush_lsn; + TimeLineID flush_tli; + XLogRecPtr replay_lsn; + TimeLineID replay_tli; + + /* + * What we really want to know is how much WAL has been flushed to + * disk, but the only flush position available is the one provided by + * the walreceiver, which may not be running, because this could be + * crash recovery or recovery via restore_command. So use either the + * WAL receiver's flush position or the replay position, whichever is + * further ahead, on the theory that if the WAL has been replayed then + * it must also have been flushed to disk. + */ + flush_lsn = GetWalRcvFlushRecPtr(NULL, &flush_tli); + replay_lsn = GetXLogReplayRecPtr(&replay_tli); + if (flush_lsn > replay_lsn) + { + *tli = flush_tli; + return flush_lsn; + } + else + { + *tli = replay_tli; + return replay_lsn; + } + } +} + +/* + * Interrupt handler for main loop of WAL summarizer process. + */ +static void +HandleWalSummarizerInterrupts(void) +{ + if (ProcSignalBarrierPending) + ProcessProcSignalBarrier(); + + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } + + if (ShutdownRequestPending || !summarize_wal) + { + ereport(DEBUG1, + errmsg_internal("WAL summarizer shutting down")); + proc_exit(0); + } + + /* Perform logging of memory contexts of this process */ + if (LogMemoryContextPending) + ProcessLogMemoryContextInterrupt(); +} + +/* + * Summarize a range of WAL records on a single timeline. + * + * 'tli' is the timeline to be summarized. + * + * 'start_lsn' is the point at which we should start summarizing. If this + * value comes from the end LSN of the previous record as returned by the + * xlograder machinery, 'exact' should be true; otherwise, 'exact' should + * be false, and this function will search forward for the start of a valid + * WAL record. + * + * 'switch_lsn' is the point at which we should switch to a later timeline, + * if we're summarizing a historic timeline. + * + * 'maximum_lsn' identifies the point beyond which we can't count on being + * able to read any more WAL. It should be the switch point when reading a + * historic timeline, or the most-recently-measured end of WAL when reading + * the current timeline. + * + * The return value is the LSN at which the WAL summary actually ends. Most + * often, a summary file ends because we notice that a checkpoint has + * occurred and reach the redo pointer of that checkpoint, but sometimes + * we stop for other reasons, such as a timeline switch. + */ +static XLogRecPtr +SummarizeWAL(TimeLineID tli, XLogRecPtr start_lsn, bool exact, + XLogRecPtr switch_lsn, XLogRecPtr maximum_lsn) +{ + SummarizerReadLocalXLogPrivate *private_data; + XLogReaderState *xlogreader; + XLogRecPtr summary_start_lsn; + XLogRecPtr summary_end_lsn = switch_lsn; + char temp_path[MAXPGPATH]; + char final_path[MAXPGPATH]; + WalSummaryIO io; + BlockRefTable *brtab = CreateEmptyBlockRefTable(); + + /* Initialize private data for xlogreader. */ + private_data = (SummarizerReadLocalXLogPrivate *) + palloc0(sizeof(SummarizerReadLocalXLogPrivate)); + private_data->tli = tli; + private_data->historic = !XLogRecPtrIsInvalid(switch_lsn); + private_data->read_upto = maximum_lsn; + + /* Create xlogreader. */ + xlogreader = XLogReaderAllocate(wal_segment_size, NULL, + XL_ROUTINE(.page_read = &summarizer_read_local_xlog_page, + .segment_open = &wal_segment_open, + .segment_close = &wal_segment_close), + private_data); + if (xlogreader == NULL) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"), + errdetail("Failed while allocating a WAL reading processor."))); + + /* + * When exact = false, we're starting from an arbitrary point in the WAL + * and must search forward for the start of the next record. + * + * When exact = true, start_lsn should be either the LSN where a record + * begins, or the LSN of a page where the page header is immediately + * followed by the start of a new record. XLogBeginRead should tolerate + * either case. + * + * We need to allow for both cases because the behavior of xlogreader + * varies. When a record spans two or more xlog pages, the ending LSN + * reported by xlogreader will be the starting LSN of the following + * record, but when an xlog page boundary falls between two records, the + * end LSN for the first will be reported as the first byte of the + * following page. We can't know until we read that page how large the + * header will be, but we'll have to skip over it to find the next record. + */ + if (exact) + { + /* + * Even if start_lsn is the beginning of a page rather than the + * beginning of the first record on that page, we should still use it + * as the start LSN for the summary file. That's because we detect + * missing summary files by looking for cases where the end LSN of one + * file is less than the start LSN of the next file. When only a page + * header is skipped, nothing has been missed. + */ + XLogBeginRead(xlogreader, start_lsn); + summary_start_lsn = start_lsn; + } + else + { + summary_start_lsn = XLogFindNextRecord(xlogreader, start_lsn); + if (XLogRecPtrIsInvalid(summary_start_lsn)) + { + /* + * If we hit end-of-WAL while trying to find the next valid + * record, we must be on a historic timeline that has no valid + * records that begin after start_lsn and before end of WAL. + */ + if (private_data->end_of_wal) + { + ereport(DEBUG1, + errmsg_internal("could not read WAL from timeline %u at %X/%X: end of WAL at %X/%X", + tli, + LSN_FORMAT_ARGS(start_lsn), + LSN_FORMAT_ARGS(private_data->read_upto))); + + /* + * The timeline ends at or after start_lsn, without containing + * any records. Thus, we must make sure the main loop does not + * iterate. If start_lsn is the end of the timeline, then we + * won't actually emit an empty summary file, but otherwise, + * we must, to capture the fact that the LSN range in question + * contains no interesting WAL records. + */ + summary_start_lsn = start_lsn; + summary_end_lsn = private_data->read_upto; + switch_lsn = xlogreader->EndRecPtr; + } + else + ereport(ERROR, + (errmsg("could not find a valid record after %X/%X", + LSN_FORMAT_ARGS(start_lsn)))); + } + + /* We shouldn't go backward. */ + Assert(summary_start_lsn >= start_lsn); + } + + /* + * Main loop: read xlog records one by one. + */ + while (1) + { + int block_id; + char *errormsg; + XLogRecord *record; + bool stop_requested = false; + + HandleWalSummarizerInterrupts(); + + /* We shouldn't go backward. */ + Assert(summary_start_lsn <= xlogreader->EndRecPtr); + + /* Now read the next record. */ + record = XLogReadRecord(xlogreader, &errormsg); + if (record == NULL) + { + if (private_data->end_of_wal) + { + /* + * This timeline must be historic and must end before we were + * able to read a complete record. + */ + ereport(DEBUG1, + errmsg_internal("could not read WAL from timeline %d at %X/%X: end of WAL at %X/%X", + tli, + LSN_FORMAT_ARGS(xlogreader->EndRecPtr), + LSN_FORMAT_ARGS(private_data->read_upto))); + /* Summary ends at end of WAL. */ + summary_end_lsn = private_data->read_upto; + break; + } + if (errormsg) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read WAL from timeline %u at %X/%X: %s", + tli, LSN_FORMAT_ARGS(xlogreader->EndRecPtr), + errormsg))); + else + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read WAL from timeline %u at %X/%X", + tli, LSN_FORMAT_ARGS(xlogreader->EndRecPtr)))); + } + + /* We shouldn't go backward. */ + Assert(summary_start_lsn <= xlogreader->EndRecPtr); + + if (!XLogRecPtrIsInvalid(switch_lsn) && + xlogreader->ReadRecPtr >= switch_lsn) + { + /* + * Woops! We've read a record that *starts* after the switch LSN, + * contrary to our goal of reading only until we hit the first + * record that ends at or after the switch LSN. Pretend we didn't + * read it after all by bailing out of this loop right here, + * before we do anything with this record. + * + * This can happen because the last record before the switch LSN + * might be continued across multiple pages, and then we might + * come to a page with XLP_FIRST_IS_OVERWRITE_CONTRECORD set. In + * that case, the record that was continued across multiple pages + * is incomplete and will be disregarded, and the read will + * restart from the beginning of the page that is flagged + * XLP_FIRST_IS_OVERWRITE_CONTRECORD. + * + * If this case occurs, we can fairly say that the current summary + * file ends at the switch LSN exactly. The first record on the + * page marked XLP_FIRST_IS_OVERWRITE_CONTRECORD will be + * discovered when generating the next summary file. + */ + summary_end_lsn = switch_lsn; + break; + } + + /* Special handling for particular types of WAL records. */ + switch (XLogRecGetRmid(xlogreader)) + { + case RM_SMGR_ID: + SummarizeSmgrRecord(xlogreader, brtab); + break; + case RM_XACT_ID: + SummarizeXactRecord(xlogreader, brtab); + break; + case RM_XLOG_ID: + stop_requested = SummarizeXlogRecord(xlogreader); + break; + default: + break; + } + + /* + * If we've been told that it's time to end this WAL summary file, do + * so. As an exception, if there's nothing included in this WAL + * summary file yet, then stopping doesn't make any sense, and we + * should wait until the next stop point instead. + */ + if (stop_requested && xlogreader->ReadRecPtr > summary_start_lsn) + { + summary_end_lsn = xlogreader->ReadRecPtr; + break; + } + + /* Feed block references from xlog record to block reference table. */ + for (block_id = 0; block_id <= XLogRecMaxBlockId(xlogreader); + block_id++) + { + RelFileLocator rlocator; + ForkNumber forknum; + BlockNumber blocknum; + + if (!XLogRecGetBlockTagExtended(xlogreader, block_id, &rlocator, + &forknum, &blocknum, NULL)) + continue; + + /* + * As we do elsewhere, ignore the FSM fork, because it's not fully + * WAL-logged. + */ + if (forknum != FSM_FORKNUM) + BlockRefTableMarkBlockModified(brtab, &rlocator, forknum, + blocknum); + } + + /* Update our notion of where this summary file ends. */ + summary_end_lsn = xlogreader->EndRecPtr; + + /* Also update shared memory. */ + LWLockAcquire(WALSummarizerLock, LW_EXCLUSIVE); + Assert(summary_end_lsn >= WalSummarizerCtl->pending_lsn); + Assert(summary_end_lsn >= WalSummarizerCtl->summarized_lsn); + WalSummarizerCtl->pending_lsn = summary_end_lsn; + LWLockRelease(WALSummarizerLock); + + /* + * If we have a switch LSN and have reached it, stop before reading + * the next record. + */ + if (!XLogRecPtrIsInvalid(switch_lsn) && + xlogreader->EndRecPtr >= switch_lsn) + break; + } + + /* Destroy xlogreader. */ + pfree(xlogreader->private_data); + XLogReaderFree(xlogreader); + + /* + * If a timeline switch occurs, we may fail to make any progress at all + * before exiting the loop above. If that happens, we don't write a WAL + * summary file at all. + */ + if (summary_end_lsn > summary_start_lsn) + { + /* Generate temporary and final path name. */ + snprintf(temp_path, MAXPGPATH, + XLOGDIR "/summaries/temp.summary"); + snprintf(final_path, MAXPGPATH, + XLOGDIR "/summaries/%08X%08X%08X%08X%08X.summary", + tli, + LSN_FORMAT_ARGS(summary_start_lsn), + LSN_FORMAT_ARGS(summary_end_lsn)); + + /* Open the temporary file for writing. */ + io.filepos = 0; + io.file = PathNameOpenFile(temp_path, O_WRONLY | O_CREAT | O_TRUNC); + if (io.file < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create file \"%s\": %m", temp_path))); + + /* Write the data. */ + WriteBlockRefTable(brtab, WriteWalSummary, &io); + + /* Close temporary file and shut down xlogreader. */ + FileClose(io.file); + + /* Tell the user what we did. */ + ereport(DEBUG1, + errmsg("summarized WAL on TLI %d from %X/%X to %X/%X", + tli, + LSN_FORMAT_ARGS(summary_start_lsn), + LSN_FORMAT_ARGS(summary_end_lsn))); + + /* Durably rename the new summary into place. */ + durable_rename(temp_path, final_path, ERROR); + } + + return summary_end_lsn; +} + +/* + * Special handling for WAL records with RM_SMGR_ID. + */ +static void +SummarizeSmgrRecord(XLogReaderState *xlogreader, BlockRefTable *brtab) +{ + uint8 info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK; + + if (info == XLOG_SMGR_CREATE) + { + xl_smgr_create *xlrec; + + /* + * If a new relation fork is created on disk, there is no point + * tracking anything about which blocks have been modified, because + * the whole thing will be new. Hence, set the limit block for this + * fork to 0. + * + * Ignore the FSM fork, which is not fully WAL-logged. + */ + xlrec = (xl_smgr_create *) XLogRecGetData(xlogreader); + + if (xlrec->forkNum != FSM_FORKNUM) + BlockRefTableSetLimitBlock(brtab, &xlrec->rlocator, + xlrec->forkNum, 0); + } + else if (info == XLOG_SMGR_TRUNCATE) + { + xl_smgr_truncate *xlrec; + + xlrec = (xl_smgr_truncate *) XLogRecGetData(xlogreader); + + /* + * If a relation fork is truncated on disk, there is no point in + * tracking anything about block modifications beyond the truncation + * point. + * + * We ignore SMGR_TRUNCATE_FSM here because the FSM isn't fully + * WAL-logged and thus we can't track modified blocks for it anyway. + */ + if ((xlrec->flags & SMGR_TRUNCATE_HEAP) != 0) + BlockRefTableSetLimitBlock(brtab, &xlrec->rlocator, + MAIN_FORKNUM, xlrec->blkno); + if ((xlrec->flags & SMGR_TRUNCATE_VM) != 0) + BlockRefTableSetLimitBlock(brtab, &xlrec->rlocator, + VISIBILITYMAP_FORKNUM, xlrec->blkno); + } +} + +/* + * Special handling for WAL recods with RM_XACT_ID. + */ +static void +SummarizeXactRecord(XLogReaderState *xlogreader, BlockRefTable *brtab) +{ + uint8 info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK; + uint8 xact_info = info & XLOG_XACT_OPMASK; + + if (xact_info == XLOG_XACT_COMMIT || + xact_info == XLOG_XACT_COMMIT_PREPARED) + { + xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(xlogreader); + xl_xact_parsed_commit parsed; + int i; + + /* + * Don't track modified blocks for any relations that were removed on + * commit. + */ + ParseCommitRecord(XLogRecGetInfo(xlogreader), xlrec, &parsed); + for (i = 0; i < parsed.nrels; ++i) + { + ForkNumber forknum; + + for (forknum = 0; forknum <= MAX_FORKNUM; ++forknum) + if (forknum != FSM_FORKNUM) + BlockRefTableSetLimitBlock(brtab, &parsed.xlocators[i], + forknum, 0); + } + } + else if (xact_info == XLOG_XACT_ABORT || + xact_info == XLOG_XACT_ABORT_PREPARED) + { + xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(xlogreader); + xl_xact_parsed_abort parsed; + int i; + + /* + * Don't track modified blocks for any relations that were removed on + * abort. + */ + ParseAbortRecord(XLogRecGetInfo(xlogreader), xlrec, &parsed); + for (i = 0; i < parsed.nrels; ++i) + { + ForkNumber forknum; + + for (forknum = 0; forknum <= MAX_FORKNUM; ++forknum) + if (forknum != FSM_FORKNUM) + BlockRefTableSetLimitBlock(brtab, &parsed.xlocators[i], + forknum, 0); + } + } +} + +/* + * Special handling for WAL recods with RM_XLOG_ID. + */ +static bool +SummarizeXlogRecord(XLogReaderState *xlogreader) +{ + uint8 info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK; + + if (info == XLOG_CHECKPOINT_REDO || info == XLOG_CHECKPOINT_SHUTDOWN) + { + /* + * This is an LSN at which redo might begin, so we'd like + * summarization to stop just before this WAL record. + */ + return true; + } + + return false; +} + +/* + * Similar to read_local_xlog_page, but limited to read from one particular + * timeline. If the end of WAL is reached, it will wait for more if reading + * from the current timeline, or give up if reading from a historic timeline. + * In the latter case, it will also set private_data->end_of_wal = true. + * + * Caller must set private_data->tli to the TLI of interest, + * private_data->read_upto to the lowest LSN that is not known to be safe + * to read on that timeline, and private_data->historic to true if and only + * if the timeline is not the current timeline. This function will update + * private_data->read_upto and private_data->historic if more WAL appears + * on the current timeline or if the current timeline becomes historic. + */ +static int +summarizer_read_local_xlog_page(XLogReaderState *state, + XLogRecPtr targetPagePtr, int reqLen, + XLogRecPtr targetRecPtr, char *cur_page) +{ + int count; + WALReadError errinfo; + SummarizerReadLocalXLogPrivate *private_data; + + HandleWalSummarizerInterrupts(); + + private_data = (SummarizerReadLocalXLogPrivate *) + state->private_data; + + while (1) + { + if (targetPagePtr + XLOG_BLCKSZ <= private_data->read_upto) + { + /* + * more than one block available; read only that block, have + * caller come back if they need more. + */ + count = XLOG_BLCKSZ; + break; + } + else if (targetPagePtr + reqLen > private_data->read_upto) + { + /* We don't seem to have enough data. */ + if (private_data->historic) + { + /* + * This is a historic timeline, so there will never be any + * more data than we have currently. + */ + private_data->end_of_wal = true; + return -1; + } + else + { + XLogRecPtr latest_lsn; + TimeLineID latest_tli; + + /* + * This is - or at least was up until very recently - the + * current timeline, so more data might show up. Delay here + * so we don't tight-loop. + */ + HandleWalSummarizerInterrupts(); + summarizer_wait_for_wal(); + + /* Recheck end-of-WAL. */ + latest_lsn = GetLatestLSN(&latest_tli); + if (private_data->tli == latest_tli) + { + /* Still the current timeline, update max LSN. */ + Assert(latest_lsn >= private_data->read_upto); + private_data->read_upto = latest_lsn; + } + else + { + List *tles = readTimeLineHistory(latest_tli); + XLogRecPtr switchpoint; + + /* + * The timeline we're scanning is no longer the latest + * one. Figure out when it ended. + */ + private_data->historic = true; + switchpoint = tliSwitchPoint(private_data->tli, tles, + NULL); + + /* + * Allow reads up to exactly the switch point. + * + * It's possible that this will cause read_upto to move + * backwards, because walreceiver might have read a + * partial record and flushed it to disk, and we'd view + * that data as safe to read. However, the + * XLOG_END_OF_RECOVERY record will be written at the end + * of the last complete WAL record, not at the end of the + * WAL that we've flushed to disk. + * + * So switchpoint < private->read_upto is possible here, + * but switchpoint < state->EndRecPtr should not be. + */ + Assert(switchpoint >= state->EndRecPtr); + private_data->read_upto = switchpoint; + + /* Debugging output. */ + ereport(DEBUG1, + errmsg("timeline %u became historic, can read up to %X/%X", + private_data->tli, LSN_FORMAT_ARGS(private_data->read_upto))); + } + + /* Go around and try again. */ + } + } + else + { + /* enough bytes available to satisfy the request */ + count = private_data->read_upto - targetPagePtr; + break; + } + } + + /* + * Even though we just determined how much of the page can be validly read + * as 'count', read the whole page anyway. It's guaranteed to be + * zero-padded up to the page boundary if it's incomplete. + */ + if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, + private_data->tli, &errinfo)) + WALReadRaiseError(&errinfo); + + /* Track that we read a page, for sleep time calculation. */ + ++pages_read_since_last_sleep; + + /* number of valid bytes in the buffer */ + return count; +} + +/* + * Sleep for long enough that we believe it's likely that more WAL will + * be available afterwards. + */ +static void +summarizer_wait_for_wal(void) +{ + if (pages_read_since_last_sleep == 0) + { + /* + * No pages were read since the last sleep, so double the sleep time, + * but not beyond the maximum allowable value. + */ + sleep_quanta = Min(sleep_quanta * 2, MAX_SLEEP_QUANTA); + } + else if (pages_read_since_last_sleep > 1) + { + /* + * Multiple pages were read since the last sleep, so reduce the sleep + * time. + * + * A large burst of activity should be able to quickly reduce the + * sleep time to the minimum, but we don't want a handful of extra WAL + * records to provoke a strong reaction. We choose to reduce the sleep + * time by 1 quantum for each page read beyond the first, which is a + * fairly arbitrary way of trying to be reactive without + * overrreacting. + */ + if (pages_read_since_last_sleep > sleep_quanta - 1) + sleep_quanta = 1; + else + sleep_quanta -= pages_read_since_last_sleep; + } + + /* OK, now sleep. */ + (void) WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + sleep_quanta * MS_PER_SLEEP_QUANTUM, + WAIT_EVENT_WAL_SUMMARIZER_WAL); + ResetLatch(MyLatch); + + /* Reset count of pages read. */ + pages_read_since_last_sleep = 0; +} + +/* + * Most recent RedoRecPtr value observed by RemoveOldWalSummaries. + */ +static void +MaybeRemoveOldWalSummaries(void) +{ + XLogRecPtr redo_pointer = GetRedoRecPtr(); + List *wslist; + time_t cutoff_time; + + /* If WAL summary removal is disabled, don't do anything. */ + if (wal_summary_keep_time == 0) + return; + + /* + * If the redo pointer has not advanced, don't do anything. + * + * This has the effect that we only try to remove old WAL summary files + * once per checkpoint cycle. + */ + if (redo_pointer == redo_pointer_at_last_summary_removal) + return; + redo_pointer_at_last_summary_removal = redo_pointer; + + /* + * Files should only be removed if the last modification time precedes the + * cutoff time we compute here. + */ + cutoff_time = time(NULL) - 60 * wal_summary_keep_time; + + /* Get all the summaries that currently exist. */ + wslist = GetWalSummaries(0, InvalidXLogRecPtr, InvalidXLogRecPtr); + + /* Loop until all summaries have been considered for removal. */ + while (wslist != NIL) + { + ListCell *lc; + XLogSegNo oldest_segno; + XLogRecPtr oldest_lsn = InvalidXLogRecPtr; + TimeLineID selected_tli; + + HandleWalSummarizerInterrupts(); + + /* + * Pick a timeline for which some summary files still exist on disk, + * and find the oldest LSN that still exists on disk for that + * timeline. + */ + selected_tli = ((WalSummaryFile *) linitial(wslist))->tli; + oldest_segno = XLogGetOldestSegno(selected_tli); + if (oldest_segno != 0) + XLogSegNoOffsetToRecPtr(oldest_segno, 0, wal_segment_size, + oldest_lsn); + + + /* Consider each WAL file on the selected timeline in turn. */ + foreach(lc, wslist) + { + WalSummaryFile *ws = lfirst(lc); + + HandleWalSummarizerInterrupts(); + + /* If it's not on this timeline, it's not time to consider it. */ + if (selected_tli != ws->tli) + continue; + + /* + * If the WAL doesn't exist any more, we can remove it if the file + * modification time is old enough. + */ + if (XLogRecPtrIsInvalid(oldest_lsn) || ws->end_lsn <= oldest_lsn) + RemoveWalSummaryIfOlderThan(ws, cutoff_time); + + /* + * Whether we removed the file or not, we need not consider it + * again. + */ + wslist = foreach_delete_current(wslist, lc); + pfree(ws); + } + } +} diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt index f72f2906cea..d621f5507f3 100644 --- a/src/backend/storage/lmgr/lwlocknames.txt +++ b/src/backend/storage/lmgr/lwlocknames.txt @@ -54,3 +54,4 @@ XactTruncationLock 44 WrapLimitsVacuumLock 46 NotifyQueueTailLock 47 WaitEventExtensionLock 48 +WALSummarizerLock 49 diff --git a/src/backend/utils/activity/pgstat_io.c b/src/backend/utils/activity/pgstat_io.c index d99ecdd4d8e..0dd9b98b3e4 100644 --- a/src/backend/utils/activity/pgstat_io.c +++ b/src/backend/utils/activity/pgstat_io.c @@ -306,7 +306,8 @@ pgstat_io_snapshot_cb(void) * - Syslogger because it is not connected to shared memory * - Archiver because most relevant archiving IO is delegated to a * specialized command or module -* - WAL Receiver and WAL Writer IO is not tracked in pg_stat_io for now +* - WAL Receiver, WAL Writer, and WAL Summarizer IO are not tracked in +* pg_stat_io for now * * Function returns true if BackendType participates in the cumulative stats * subsystem for IO and false if it does not. @@ -328,6 +329,7 @@ pgstat_tracks_io_bktype(BackendType bktype) case B_LOGGER: case B_WAL_RECEIVER: case B_WAL_WRITER: + case B_WAL_SUMMARIZER: return false; case B_AUTOVAC_LAUNCHER: diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index d7995931bd4..7e791634666 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -56,6 +56,7 @@ RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, SYSLOGGER_MAIN "Waiting in main loop of syslogger process." WAL_RECEIVER_MAIN "Waiting in main loop of WAL receiver process." WAL_SENDER_MAIN "Waiting in main loop of WAL sender process." +WAL_SUMMARIZER_WAL "Waiting in WAL summarizer for more WAL to be generated." WAL_WRITER_MAIN "Waiting in main loop of WAL writer process." @@ -142,6 +143,7 @@ SAFE_SNAPSHOT "Waiting to obtain a valid snapshot for a <literal>READ ONLY DEFER SYNC_REP "Waiting for confirmation from a remote server during synchronous replication." WAL_RECEIVER_EXIT "Waiting for the WAL receiver to exit." WAL_RECEIVER_WAIT_START "Waiting for startup process to send initial data for streaming replication." +WAL_SUMMARY_READY "Waiting for a new WAL summary to be generated." XACT_GROUP_UPDATE "Waiting for the group leader to update transaction status at end of a parallel operation." @@ -162,6 +164,7 @@ REGISTER_SYNC_REQUEST "Waiting while sending synchronization requests to the che SPIN_DELAY "Waiting while acquiring a contended spinlock." VACUUM_DELAY "Waiting in a cost-based vacuum delay point." VACUUM_TRUNCATE "Waiting to acquire an exclusive lock to truncate off any empty pages at the end of a table vacuumed." +WAL_SUMMARIZER_ERROR "Waiting after a WAL summarizer error." # @@ -243,6 +246,8 @@ WAL_COPY_WRITE "Waiting for a write when creating a new WAL segment by copying a WAL_INIT_SYNC "Waiting for a newly initialized WAL file to reach durable storage." WAL_INIT_WRITE "Waiting for a write while initializing a new WAL file." WAL_READ "Waiting for a read from a WAL file." +WAL_SUMMARY_READ "Waiting for a read from a WAL summary file." +WAL_SUMMARY_WRITE "Waiting for a write to a WAL summary file." WAL_SYNC "Waiting for a WAL file to reach durable storage." WAL_SYNC_METHOD_ASSIGN "Waiting for data to reach durable storage while assigning a new WAL sync method." WAL_WRITE "Waiting for a write to a WAL file." diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c index 819936ec024..5c9b6f991e0 100644 --- a/src/backend/utils/init/miscinit.c +++ b/src/backend/utils/init/miscinit.c @@ -305,6 +305,9 @@ GetBackendTypeDesc(BackendType backendType) case B_WAL_SENDER: backendDesc = "walsender"; break; + case B_WAL_SUMMARIZER: + backendDesc = "walsummarizer"; + break; case B_WAL_WRITER: backendDesc = "walwriter"; break; diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index f7c9882f7c5..9f59440526f 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -63,6 +63,7 @@ #include "postmaster/postmaster.h" #include "postmaster/startup.h" #include "postmaster/syslogger.h" +#include "postmaster/walsummarizer.h" #include "postmaster/walwriter.h" #include "replication/logicallauncher.h" #include "replication/slot.h" @@ -703,6 +704,8 @@ const char *const config_group_names[] = gettext_noop("Write-Ahead Log / Archive Recovery"), /* WAL_RECOVERY_TARGET */ gettext_noop("Write-Ahead Log / Recovery Target"), + /* WAL_SUMMARIZATION */ + gettext_noop("Write-Ahead Log / Summarization"), /* REPLICATION_SENDING */ gettext_noop("Replication / Sending Servers"), /* REPLICATION_PRIMARY */ @@ -1787,6 +1790,16 @@ struct config_bool ConfigureNamesBool[] = }, { + {"summarize_wal", PGC_SIGHUP, WAL_SUMMARIZATION, + gettext_noop("Starts the WAL summarizer process to enable incremental backup."), + NULL + }, + &summarize_wal, + false, + NULL, NULL, NULL + }, + + { {"hot_standby", PGC_POSTMASTER, REPLICATION_STANDBY, gettext_noop("Allows connections and queries during recovery."), NULL @@ -3201,6 +3214,19 @@ struct config_int ConfigureNamesInt[] = }, { + {"wal_summary_keep_time", PGC_SIGHUP, WAL_SUMMARIZATION, + gettext_noop("Time for which WAL summary files should be kept."), + NULL, + GUC_UNIT_MIN, + }, + &wal_summary_keep_time, + 10 * 24 * 60, /* 10 days */ + 0, + INT_MAX, + NULL, NULL, NULL + }, + + { {"autovacuum_naptime", PGC_SIGHUP, AUTOVACUUM, gettext_noop("Time to sleep between autovacuum runs."), NULL, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index cf9f283cfee..b2809c711a1 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -302,6 +302,11 @@ #recovery_target_action = 'pause' # 'pause', 'promote', 'shutdown' # (change requires restart) +# - WAL Summarization - + +#summarize_wal = off # run WAL summarizer process? +#wal_summary_keep_time = '10d' # when to remove old summary files, 0 = never + #------------------------------------------------------------------------------ # REPLICATION diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c index 0c6f5ceb0ac..e68b40d2b59 100644 --- a/src/bin/initdb/initdb.c +++ b/src/bin/initdb/initdb.c @@ -227,6 +227,7 @@ static char *extra_options = ""; static const char *const subdirs[] = { "global", "pg_wal/archive_status", + "pg_wal/summaries", "pg_commit_ts", "pg_dynshmem", "pg_notify", diff --git a/src/common/Makefile b/src/common/Makefile index 3f9067e0a2f..2ba5069dca4 100644 --- a/src/common/Makefile +++ b/src/common/Makefile @@ -47,6 +47,7 @@ OBJS_COMMON = \ archive.o \ base64.o \ binaryheap.o \ + blkreftable.o \ checksum_helper.o \ compression.o \ config_info.o \ diff --git a/src/common/blkreftable.c b/src/common/blkreftable.c new file mode 100644 index 00000000000..21ee6f5968f --- /dev/null +++ b/src/common/blkreftable.c @@ -0,0 +1,1308 @@ +/*------------------------------------------------------------------------- + * + * blkreftable.c + * Block reference tables. + * + * A block reference table is used to keep track of which blocks have + * been modified by WAL records within a certain LSN range. + * + * For each relation fork, we keep track of all blocks that have appeared + * in block reference in the WAL. We also keep track of the "limit block", + * which is the smallest relation length in blocks known to have occurred + * during that range of WAL records. This should be set to 0 if the relation + * fork is created or destroyed, and to the post-truncation length if + * truncated. + * + * Whenever we set the limit block, we also forget about any modified blocks + * beyond that point. Those blocks don't exist any more. Such blocks can + * later be marked as modified again; if that happens, it means the relation + * was re-extended. + * + * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group + * + * src/common/blkreftable.c + * + *------------------------------------------------------------------------- + */ + + +#ifndef FRONTEND +#include "postgres.h" +#else +#include "postgres_fe.h" +#endif + +#ifdef FRONTEND +#include "common/logging.h" +#endif + +#include "common/blkreftable.h" +#include "common/hashfn.h" +#include "port/pg_crc32c.h" + +/* + * A block reference table keeps track of the status of each relation + * fork individually. + */ +typedef struct BlockRefTableKey +{ + RelFileLocator rlocator; + ForkNumber forknum; +} BlockRefTableKey; + +/* + * We could need to store data either for a relation in which only a + * tiny fraction of the blocks have been modified or for a relation in + * which nearly every block has been modified, and we want a + * space-efficient representation in both cases. To accomplish this, + * we divide the relation into chunks of 2^16 blocks and choose between + * an array representation and a bitmap representation for each chunk. + * + * When the number of modified blocks in a given chunk is small, we + * essentially store an array of block numbers, but we need not store the + * entire block number: instead, we store each block number as a 2-byte + * offset from the start of the chunk. + * + * When the number of modified blocks in a given chunk is large, we switch + * to a bitmap representation. + * + * These same basic representational choices are used both when a block + * reference table is stored in memory and when it is serialized to disk. + * + * In the in-memory representation, we initially allocate each chunk with + * space for a number of entries given by INITIAL_ENTRIES_PER_CHUNK and + * increase that as necessary until we reach MAX_ENTRIES_PER_CHUNK. + * Any chunk whose allocated size reaches MAX_ENTRIES_PER_CHUNK is converted + * to a bitmap, and thus never needs to grow further. + */ +#define BLOCKS_PER_CHUNK (1 << 16) +#define BLOCKS_PER_ENTRY (BITS_PER_BYTE * sizeof(uint16)) +#define MAX_ENTRIES_PER_CHUNK (BLOCKS_PER_CHUNK / BLOCKS_PER_ENTRY) +#define INITIAL_ENTRIES_PER_CHUNK 16 +typedef uint16 *BlockRefTableChunk; + +/* + * State for one relation fork. + * + * 'rlocator' and 'forknum' identify the relation fork to which this entry + * pertains. + * + * 'limit_block' is the shortest known length of the relation in blocks + * within the LSN range covered by a particular block reference table. + * It should be set to 0 if the relation fork is created or dropped. If the + * relation fork is truncated, it should be set to the number of blocks that + * remain after truncation. + * + * 'nchunks' is the allocated length of each of the three arrays that follow. + * We can only represent the status of block numbers less than nchunks * + * BLOCKS_PER_CHUNK. + * + * 'chunk_size' is an array storing the allocated size of each chunk. + * + * 'chunk_usage' is an array storing the number of elements used in each + * chunk. If that value is less than MAX_ENTRIES_PER_CHUNK, the corresonding + * chunk is used as an array; else the corresponding chunk is used as a bitmap. + * When used as a bitmap, the least significant bit of the first array element + * is the status of the lowest-numbered block covered by this chunk. + * + * 'chunk_data' is the array of chunks. + */ +struct BlockRefTableEntry +{ + BlockRefTableKey key; + BlockNumber limit_block; + char status; + uint32 nchunks; + uint16 *chunk_size; + uint16 *chunk_usage; + BlockRefTableChunk *chunk_data; +}; + +/* Declare and define a hash table over type BlockRefTableEntry. */ +#define SH_PREFIX blockreftable +#define SH_ELEMENT_TYPE BlockRefTableEntry +#define SH_KEY_TYPE BlockRefTableKey +#define SH_KEY key +#define SH_HASH_KEY(tb, key) \ + hash_bytes((const unsigned char *) &key, sizeof(BlockRefTableKey)) +#define SH_EQUAL(tb, a, b) (memcmp(&a, &b, sizeof(BlockRefTableKey)) == 0) +#define SH_SCOPE static inline +#ifdef FRONTEND +#define SH_RAW_ALLOCATOR pg_malloc0 +#endif +#define SH_DEFINE +#define SH_DECLARE +#include "lib/simplehash.h" + +/* + * A block reference table is basically just the hash table, but we don't + * want to expose that to outside callers. + * + * We keep track of the memory context in use explicitly too, so that it's + * easy to place all of our allocations in the same context. + */ +struct BlockRefTable +{ + blockreftable_hash *hash; +#ifndef FRONTEND + MemoryContext mcxt; +#endif +}; + +/* + * On-disk serialization format for block reference table entries. + */ +typedef struct BlockRefTableSerializedEntry +{ + RelFileLocator rlocator; + ForkNumber forknum; + BlockNumber limit_block; + uint32 nchunks; +} BlockRefTableSerializedEntry; + +/* + * Buffer size, so that we avoid doing many small I/Os. + */ +#define BUFSIZE 65536 + +/* + * Ad-hoc buffer for file I/O. + */ +typedef struct BlockRefTableBuffer +{ + io_callback_fn io_callback; + void *io_callback_arg; + char data[BUFSIZE]; + int used; + int cursor; + pg_crc32c crc; +} BlockRefTableBuffer; + +/* + * State for keeping track of progress while incrementally reading a block + * table reference file from disk. + * + * total_chunks means the number of chunks for the RelFileLocator/ForkNumber + * combination that is curently being read, and consumed_chunks is the number + * of those that have been read. (We always read all the information for + * a single chunk at one time, so we don't need to be able to represent the + * state where a chunk has been partially read.) + * + * chunk_size is the array of chunk sizes. The length is given by total_chunks. + * + * chunk_data holds the current chunk. + * + * chunk_position helps us figure out how much progress we've made in returning + * the block numbers for the current chunk to the caller. If the chunk is a + * bitmap, it's the number of bits we've scanned; otherwise, it's the number + * of chunk entries we've scanned. + */ +struct BlockRefTableReader +{ + BlockRefTableBuffer buffer; + char *error_filename; + report_error_fn error_callback; + void *error_callback_arg; + uint32 total_chunks; + uint32 consumed_chunks; + uint16 *chunk_size; + uint16 chunk_data[MAX_ENTRIES_PER_CHUNK]; + uint32 chunk_position; +}; + +/* + * State for keeping track of progress while incrementally writing a block + * reference table file to disk. + */ +struct BlockRefTableWriter +{ + BlockRefTableBuffer buffer; +}; + +/* Function prototypes. */ +static int BlockRefTableComparator(const void *a, const void *b); +static void BlockRefTableFlush(BlockRefTableBuffer *buffer); +static void BlockRefTableRead(BlockRefTableReader *reader, void *data, + int length); +static void BlockRefTableWrite(BlockRefTableBuffer *buffer, void *data, + int length); +static void BlockRefTableFileTerminate(BlockRefTableBuffer *buffer); + +/* + * Create an empty block reference table. + */ +BlockRefTable * +CreateEmptyBlockRefTable(void) +{ + BlockRefTable *brtab = palloc(sizeof(BlockRefTable)); + + /* + * Even completely empty database has a few hundred relation forks, so it + * seems best to size the hash on the assumption that we're going to have + * at least a few thousand entries. + */ +#ifdef FRONTEND + brtab->hash = blockreftable_create(4096, NULL); +#else + brtab->mcxt = CurrentMemoryContext; + brtab->hash = blockreftable_create(brtab->mcxt, 4096, NULL); +#endif + + return brtab; +} + +/* + * Set the "limit block" for a relation fork and forget any modified blocks + * with equal or higher block numbers. + * + * The "limit block" is the shortest known length of the relation within the + * range of WAL records covered by this block reference table. + */ +void +BlockRefTableSetLimitBlock(BlockRefTable *brtab, + const RelFileLocator *rlocator, + ForkNumber forknum, + BlockNumber limit_block) +{ + BlockRefTableEntry *brtentry; + BlockRefTableKey key = {0}; /* make sure any padding is zero */ + bool found; + + memcpy(&key.rlocator, rlocator, sizeof(RelFileLocator)); + key.forknum = forknum; + brtentry = blockreftable_insert(brtab->hash, key, &found); + + if (!found) + { + /* + * We have no existing data about this relation fork, so just record + * the limit_block value supplied by the caller, and make sure other + * parts of the entry are properly initialized. + */ + brtentry->limit_block = limit_block; + brtentry->nchunks = 0; + brtentry->chunk_size = NULL; + brtentry->chunk_usage = NULL; + brtentry->chunk_data = NULL; + return; + } + + BlockRefTableEntrySetLimitBlock(brtentry, limit_block); +} + +/* + * Mark a block in a given relation fork as known to have been modified. + */ +void +BlockRefTableMarkBlockModified(BlockRefTable *brtab, + const RelFileLocator *rlocator, + ForkNumber forknum, + BlockNumber blknum) +{ + BlockRefTableEntry *brtentry; + BlockRefTableKey key = {0}; /* make sure any padding is zero */ + bool found; +#ifndef FRONTEND + MemoryContext oldcontext = MemoryContextSwitchTo(brtab->mcxt); +#endif + + memcpy(&key.rlocator, rlocator, sizeof(RelFileLocator)); + key.forknum = forknum; + brtentry = blockreftable_insert(brtab->hash, key, &found); + + if (!found) + { + /* + * We want to set the initial limit block value to something higher + * than any legal block number. InvalidBlockNumber fits the bill. + */ + brtentry->limit_block = InvalidBlockNumber; + brtentry->nchunks = 0; + brtentry->chunk_size = NULL; + brtentry->chunk_usage = NULL; + brtentry->chunk_data = NULL; + } + + BlockRefTableEntryMarkBlockModified(brtentry, forknum, blknum); + +#ifndef FRONTEND + MemoryContextSwitchTo(oldcontext); +#endif +} + +/* + * Get an entry from a block reference table. + * + * If the entry does not exist, this function returns NULL. Otherwise, it + * returns the entry and sets *limit_block to the value from the entry. + */ +BlockRefTableEntry * +BlockRefTableGetEntry(BlockRefTable *brtab, const RelFileLocator *rlocator, + ForkNumber forknum, BlockNumber *limit_block) +{ + BlockRefTableKey key = {0}; /* make sure any padding is zero */ + BlockRefTableEntry *entry; + + Assert(limit_block != NULL); + + memcpy(&key.rlocator, rlocator, sizeof(RelFileLocator)); + key.forknum = forknum; + entry = blockreftable_lookup(brtab->hash, key); + + if (entry != NULL) + *limit_block = entry->limit_block; + + return entry; +} + +/* + * Get block numbers from a table entry. + * + * 'blocks' must point to enough space to hold at least 'nblocks' block + * numbers, and any block numbers we manage to get will be written there. + * The return value is the number of block numbers actually written. + * + * We do not return block numbers unless they are greater than or equal to + * start_blkno and strictly less than stop_blkno. + */ +int +BlockRefTableEntryGetBlocks(BlockRefTableEntry *entry, + BlockNumber start_blkno, + BlockNumber stop_blkno, + BlockNumber *blocks, + int nblocks) +{ + uint32 start_chunkno; + uint32 stop_chunkno; + uint32 chunkno; + int nresults = 0; + + Assert(entry != NULL); + + /* + * Figure out which chunks could potentially contain blocks of interest. + * + * We need to be careful about overflow here, because stop_blkno could be + * InvalidBlockNumber or something very close to it. + */ + start_chunkno = start_blkno / BLOCKS_PER_CHUNK; + stop_chunkno = stop_blkno / BLOCKS_PER_CHUNK; + if ((stop_blkno % BLOCKS_PER_CHUNK) != 0) + ++stop_chunkno; + if (stop_chunkno > entry->nchunks) + stop_chunkno = entry->nchunks; + + /* + * Loop over chunks. + */ + for (chunkno = start_chunkno; chunkno < stop_chunkno; ++chunkno) + { + uint16 chunk_usage = entry->chunk_usage[chunkno]; + BlockRefTableChunk chunk_data = entry->chunk_data[chunkno]; + unsigned start_offset = 0; + unsigned stop_offset = BLOCKS_PER_CHUNK; + + /* + * If the start and/or stop block number falls within this chunk, the + * whole chunk may not be of interest. Figure out which portion we + * care about, if it's not the whole thing. + */ + if (chunkno == start_chunkno) + start_offset = start_blkno % BLOCKS_PER_CHUNK; + if (chunkno == stop_chunkno - 1) + stop_offset = stop_blkno % BLOCKS_PER_CHUNK; + + /* + * Handling differs depending on whether this is an array of offsets + * or a bitmap. + */ + if (chunk_usage == MAX_ENTRIES_PER_CHUNK) + { + unsigned i; + + /* It's a bitmap, so test every relevant bit. */ + for (i = start_offset; i < stop_offset; ++i) + { + uint16 w = chunk_data[i / BLOCKS_PER_ENTRY]; + + if ((w & (1 << (i % BLOCKS_PER_ENTRY))) != 0) + { + BlockNumber blkno = chunkno * BLOCKS_PER_CHUNK + i; + + blocks[nresults++] = blkno; + + /* Early exit if we run out of output space. */ + if (nresults == nblocks) + return nresults; + } + } + } + else + { + unsigned i; + + /* It's an array of offsets, so check each one. */ + for (i = 0; i < chunk_usage; ++i) + { + uint16 offset = chunk_data[i]; + + if (offset >= start_offset && offset < stop_offset) + { + BlockNumber blkno = chunkno * BLOCKS_PER_CHUNK + offset; + + blocks[nresults++] = blkno; + + /* Early exit if we run out of output space. */ + if (nresults == nblocks) + return nresults; + } + } + } + } + + return nresults; +} + +/* + * Serialize a block reference table to a file. + */ +void +WriteBlockRefTable(BlockRefTable *brtab, + io_callback_fn write_callback, + void *write_callback_arg) +{ + BlockRefTableSerializedEntry *sdata = NULL; + BlockRefTableBuffer buffer; + uint32 magic = BLOCKREFTABLE_MAGIC; + + /* Prepare buffer. */ + memset(&buffer, 0, sizeof(BlockRefTableBuffer)); + buffer.io_callback = write_callback; + buffer.io_callback_arg = write_callback_arg; + INIT_CRC32C(buffer.crc); + + /* Write magic number. */ + BlockRefTableWrite(&buffer, &magic, sizeof(uint32)); + + /* Write the entries, assuming there are some. */ + if (brtab->hash->members > 0) + { + unsigned i = 0; + blockreftable_iterator it; + BlockRefTableEntry *brtentry; + + /* Extract entries into serializable format and sort them. */ + sdata = + palloc(brtab->hash->members * sizeof(BlockRefTableSerializedEntry)); + blockreftable_start_iterate(brtab->hash, &it); + while ((brtentry = blockreftable_iterate(brtab->hash, &it)) != NULL) + { + BlockRefTableSerializedEntry *sentry = &sdata[i++]; + + sentry->rlocator = brtentry->key.rlocator; + sentry->forknum = brtentry->key.forknum; + sentry->limit_block = brtentry->limit_block; + sentry->nchunks = brtentry->nchunks; + + /* trim trailing zero entries */ + while (sentry->nchunks > 0 && + brtentry->chunk_usage[sentry->nchunks - 1] == 0) + sentry->nchunks--; + } + Assert(i == brtab->hash->members); + qsort(sdata, i, sizeof(BlockRefTableSerializedEntry), + BlockRefTableComparator); + + /* Loop over entries in sorted order and serialize each one. */ + for (i = 0; i < brtab->hash->members; ++i) + { + BlockRefTableSerializedEntry *sentry = &sdata[i]; + BlockRefTableKey key = {0}; /* make sure any padding is zero */ + unsigned j; + + /* Write the serialized entry itself. */ + BlockRefTableWrite(&buffer, sentry, + sizeof(BlockRefTableSerializedEntry)); + + /* Look up the original entry so we can access the chunks. */ + memcpy(&key.rlocator, &sentry->rlocator, sizeof(RelFileLocator)); + key.forknum = sentry->forknum; + brtentry = blockreftable_lookup(brtab->hash, key); + Assert(brtentry != NULL); + + /* Write the untruncated portion of the chunk length array. */ + if (sentry->nchunks != 0) + BlockRefTableWrite(&buffer, brtentry->chunk_usage, + sentry->nchunks * sizeof(uint16)); + + /* Write the contents of each chunk. */ + for (j = 0; j < brtentry->nchunks; ++j) + { + if (brtentry->chunk_usage[j] == 0) + continue; + BlockRefTableWrite(&buffer, brtentry->chunk_data[j], + brtentry->chunk_usage[j] * sizeof(uint16)); + } + } + } + + /* Write out appropriate terminator and CRC and flush buffer. */ + BlockRefTableFileTerminate(&buffer); +} + +/* + * Prepare to incrementally read a block reference table file. + * + * 'read_callback' is a function that can be called to read data from the + * underlying file (or other data source) into our internal buffer. + * + * 'read_callback_arg' is an opaque argument to be passed to read_callback. + * + * 'error_filename' is the filename that should be included in error messages + * if the file is found to be malformed. The value is not copied, so the + * caller should ensure that it remains valid until done with this + * BlockRefTableReader. + * + * 'error_callback' is a function to be called if the file is found to be + * malformed. This is not used for I/O errors, which must be handled internally + * by read_callback. + * + * 'error_callback_arg' is an opaque arguent to be passed to error_callback. + */ +BlockRefTableReader * +CreateBlockRefTableReader(io_callback_fn read_callback, + void *read_callback_arg, + char *error_filename, + report_error_fn error_callback, + void *error_callback_arg) +{ + BlockRefTableReader *reader; + uint32 magic; + + /* Initialize data structure. */ + reader = palloc0(sizeof(BlockRefTableReader)); + reader->buffer.io_callback = read_callback; + reader->buffer.io_callback_arg = read_callback_arg; + reader->error_filename = error_filename; + reader->error_callback = error_callback; + reader->error_callback_arg = error_callback_arg; + INIT_CRC32C(reader->buffer.crc); + + /* Verify magic number. */ + BlockRefTableRead(reader, &magic, sizeof(uint32)); + if (magic != BLOCKREFTABLE_MAGIC) + error_callback(error_callback_arg, + "file \"%s\" has wrong magic number: expected %u, found %u", + error_filename, + BLOCKREFTABLE_MAGIC, magic); + + return reader; +} + +/* + * Read next relation fork covered by this block reference table file. + * + * After calling this function, you must call BlockRefTableReaderGetBlocks + * until it returns 0 before calling it again. + */ +bool +BlockRefTableReaderNextRelation(BlockRefTableReader *reader, + RelFileLocator *rlocator, + ForkNumber *forknum, + BlockNumber *limit_block) +{ + BlockRefTableSerializedEntry sentry; + BlockRefTableSerializedEntry zentry = {{0}}; + + /* + * Sanity check: caller must read all blocks from all chunks before moving + * on to the next relation. + */ + Assert(reader->total_chunks == reader->consumed_chunks); + + /* Read serialized entry. */ + BlockRefTableRead(reader, &sentry, + sizeof(BlockRefTableSerializedEntry)); + + /* + * If we just read the sentinel entry indicating that we've reached the + * end, read and check the CRC. + */ + if (memcmp(&sentry, &zentry, sizeof(BlockRefTableSerializedEntry)) == 0) + { + pg_crc32c expected_crc; + pg_crc32c actual_crc; + + /* + * We want to know the CRC of the file excluding the 4-byte CRC + * itself, so copy the current value of the CRC accumulator before + * reading those bytes, and use the copy to finalize the calculation. + */ + expected_crc = reader->buffer.crc; + FIN_CRC32C(expected_crc); + + /* Now we can read the actual value. */ + BlockRefTableRead(reader, &actual_crc, sizeof(pg_crc32c)); + + /* Throw an error if there is a mismatch. */ + if (!EQ_CRC32C(expected_crc, actual_crc)) + reader->error_callback(reader->error_callback_arg, + "file \"%s\" has wrong checksum: expected %08X, found %08X", + reader->error_filename, expected_crc, actual_crc); + + return false; + } + + /* Read chunk size array. */ + if (reader->chunk_size != NULL) + pfree(reader->chunk_size); + reader->chunk_size = palloc(sentry.nchunks * sizeof(uint16)); + BlockRefTableRead(reader, reader->chunk_size, + sentry.nchunks * sizeof(uint16)); + + /* Set up for chunk scan. */ + reader->total_chunks = sentry.nchunks; + reader->consumed_chunks = 0; + + /* Return data to caller. */ + memcpy(rlocator, &sentry.rlocator, sizeof(RelFileLocator)); + *forknum = sentry.forknum; + *limit_block = sentry.limit_block; + return true; +} + +/* + * Get modified blocks associated with the relation fork returned by + * the most recent call to BlockRefTableReaderNextRelation. + * + * On return, block numbers will be written into the 'blocks' array, whose + * length should be passed via 'nblocks'. The return value is the number of + * entries actually written into the 'blocks' array, which may be less than + * 'nblocks' if we run out of modified blocks in the relation fork before + * we run out of room in the array. + */ +unsigned +BlockRefTableReaderGetBlocks(BlockRefTableReader *reader, + BlockNumber *blocks, + int nblocks) +{ + unsigned blocks_found = 0; + + /* Must provide space for at least one block number to be returned. */ + Assert(nblocks > 0); + + /* Loop collecting blocks to return to caller. */ + for (;;) + { + uint16 next_chunk_size; + + /* + * If we've read at least one chunk, maybe it contains some block + * numbers that could satisfy caller's request. + */ + if (reader->consumed_chunks > 0) + { + uint32 chunkno = reader->consumed_chunks - 1; + uint16 chunk_size = reader->chunk_size[chunkno]; + + if (chunk_size == MAX_ENTRIES_PER_CHUNK) + { + /* Bitmap format, so search for bits that are set. */ + while (reader->chunk_position < BLOCKS_PER_CHUNK && + blocks_found < nblocks) + { + uint16 chunkoffset = reader->chunk_position; + uint16 w; + + w = reader->chunk_data[chunkoffset / BLOCKS_PER_ENTRY]; + if ((w & (1u << (chunkoffset % BLOCKS_PER_ENTRY))) != 0) + blocks[blocks_found++] = + chunkno * BLOCKS_PER_CHUNK + chunkoffset; + ++reader->chunk_position; + } + } + else + { + /* Not in bitmap format, so each entry is a 2-byte offset. */ + while (reader->chunk_position < chunk_size && + blocks_found < nblocks) + { + blocks[blocks_found++] = chunkno * BLOCKS_PER_CHUNK + + reader->chunk_data[reader->chunk_position]; + ++reader->chunk_position; + } + } + } + + /* We found enough blocks, so we're done. */ + if (blocks_found >= nblocks) + break; + + /* + * We didn't find enough blocks, so we must need the next chunk. If + * there are none left, though, then we're done anyway. + */ + if (reader->consumed_chunks == reader->total_chunks) + break; + + /* + * Read data for next chunk and reset scan position to beginning of + * chunk. Note that the next chunk might be empty, in which case we + * consume the chunk without actually consuming any bytes from the + * underlying file. + */ + next_chunk_size = reader->chunk_size[reader->consumed_chunks]; + if (next_chunk_size > 0) + BlockRefTableRead(reader, reader->chunk_data, + next_chunk_size * sizeof(uint16)); + ++reader->consumed_chunks; + reader->chunk_position = 0; + } + + return blocks_found; +} + +/* + * Release memory used while reading a block reference table from a file. + */ +void +DestroyBlockRefTableReader(BlockRefTableReader *reader) +{ + if (reader->chunk_size != NULL) + { + pfree(reader->chunk_size); + reader->chunk_size = NULL; + } + pfree(reader); +} + +/* + * Prepare to write a block reference table file incrementally. + * + * Caller must be able to supply BlockRefTableEntry objects sorted in the + * appropriate order. + */ +BlockRefTableWriter * +CreateBlockRefTableWriter(io_callback_fn write_callback, + void *write_callback_arg) +{ + BlockRefTableWriter *writer; + uint32 magic = BLOCKREFTABLE_MAGIC; + + /* Prepare buffer and CRC check and save callbacks. */ + writer = palloc0(sizeof(BlockRefTableWriter)); + writer->buffer.io_callback = write_callback; + writer->buffer.io_callback_arg = write_callback_arg; + INIT_CRC32C(writer->buffer.crc); + + /* Write magic number. */ + BlockRefTableWrite(&writer->buffer, &magic, sizeof(uint32)); + + return writer; +} + +/* + * Append one entry to a block reference table file. + * + * Note that entries must be written in the proper order, that is, sorted by + * tablespace, then database, then relfilenumber, then fork number. Caller + * is responsible for supplying data in the correct order. If that seems hard, + * use an in-memory BlockRefTable instead. + */ +void +BlockRefTableWriteEntry(BlockRefTableWriter *writer, BlockRefTableEntry *entry) +{ + BlockRefTableSerializedEntry sentry; + unsigned j; + + /* Convert to serialized entry format. */ + sentry.rlocator = entry->key.rlocator; + sentry.forknum = entry->key.forknum; + sentry.limit_block = entry->limit_block; + sentry.nchunks = entry->nchunks; + + /* Trim trailing zero entries. */ + while (sentry.nchunks > 0 && entry->chunk_usage[sentry.nchunks - 1] == 0) + sentry.nchunks--; + + /* Write the serialized entry itself. */ + BlockRefTableWrite(&writer->buffer, &sentry, + sizeof(BlockRefTableSerializedEntry)); + + /* Write the untruncated portion of the chunk length array. */ + if (sentry.nchunks != 0) + BlockRefTableWrite(&writer->buffer, entry->chunk_usage, + sentry.nchunks * sizeof(uint16)); + + /* Write the contents of each chunk. */ + for (j = 0; j < entry->nchunks; ++j) + { + if (entry->chunk_usage[j] == 0) + continue; + BlockRefTableWrite(&writer->buffer, entry->chunk_data[j], + entry->chunk_usage[j] * sizeof(uint16)); + } +} + +/* + * Finalize an incremental write of a block reference table file. + */ +void +DestroyBlockRefTableWriter(BlockRefTableWriter *writer) +{ + BlockRefTableFileTerminate(&writer->buffer); + pfree(writer); +} + +/* + * Allocate a standalone BlockRefTableEntry. + * + * When we're manipulating a full in-memory BlockRefTable, the entries are + * part of the hash table and are allocated by simplehash. This routine is + * used by callers that want to write out a BlockRefTable to a file without + * needing to store the whole thing in memory at once. + * + * Entries allocated by this function can be manipulated using the functions + * BlockRefTableEntrySetLimitBlock and BlockRefTableEntryMarkBlockModified + * and then written using BlockRefTableWriteEntry and freed using + * BlockRefTableFreeEntry. + */ +BlockRefTableEntry * +CreateBlockRefTableEntry(RelFileLocator rlocator, ForkNumber forknum) +{ + BlockRefTableEntry *entry = palloc0(sizeof(BlockRefTableEntry)); + + memcpy(&entry->key.rlocator, &rlocator, sizeof(RelFileLocator)); + entry->key.forknum = forknum; + entry->limit_block = InvalidBlockNumber; + + return entry; +} + +/* + * Update a BlockRefTableEntry with a new value for the "limit block" and + * forget any equal-or-higher-numbered modified blocks. + * + * The "limit block" is the shortest known length of the relation within the + * range of WAL records covered by this block reference table. + */ +void +BlockRefTableEntrySetLimitBlock(BlockRefTableEntry *entry, + BlockNumber limit_block) +{ + unsigned chunkno; + unsigned limit_chunkno; + unsigned limit_chunkoffset; + BlockRefTableChunk limit_chunk; + + /* If we already have an equal or lower limit block, do nothing. */ + if (limit_block >= entry->limit_block) + return; + + /* Record the new limit block value. */ + entry->limit_block = limit_block; + + /* + * Figure out which chunk would store the state of the new limit block, + * and which offset within that chunk. + */ + limit_chunkno = limit_block / BLOCKS_PER_CHUNK; + limit_chunkoffset = limit_block % BLOCKS_PER_CHUNK; + + /* + * If the number of chunks is not large enough for any blocks with equal + * or higher block numbers to exist, then there is nothing further to do. + */ + if (limit_chunkno >= entry->nchunks) + return; + + /* Discard entire contents of any higher-numbered chunks. */ + for (chunkno = limit_chunkno + 1; chunkno < entry->nchunks; ++chunkno) + entry->chunk_usage[chunkno] = 0; + + /* + * Next, we need to discard any offsets within the chunk that would + * contain the limit_block. We must handle this differenly depending on + * whether the chunk that would contain limit_block is a bitmap or an + * array of offsets. + */ + limit_chunk = entry->chunk_data[limit_chunkno]; + if (entry->chunk_usage[limit_chunkno] == MAX_ENTRIES_PER_CHUNK) + { + unsigned chunkoffset; + + /* It's a bitmap. Unset bits. */ + for (chunkoffset = limit_chunkoffset; chunkoffset < BLOCKS_PER_CHUNK; + ++chunkoffset) + limit_chunk[chunkoffset / BLOCKS_PER_ENTRY] &= + ~(1 << (chunkoffset % BLOCKS_PER_ENTRY)); + } + else + { + unsigned i, + j = 0; + + /* It's an offset array. Filter out large offsets. */ + for (i = 0; i < entry->chunk_usage[limit_chunkno]; ++i) + { + Assert(j <= i); + if (limit_chunk[i] < limit_chunkoffset) + limit_chunk[j++] = limit_chunk[i]; + } + Assert(j <= entry->chunk_usage[limit_chunkno]); + entry->chunk_usage[limit_chunkno] = j; + } +} + +/* + * Mark a block in a given BlkRefTableEntry as known to have been modified. + */ +void +BlockRefTableEntryMarkBlockModified(BlockRefTableEntry *entry, + ForkNumber forknum, + BlockNumber blknum) +{ + unsigned chunkno; + unsigned chunkoffset; + unsigned i; + + /* + * Which chunk should store the state of this block? And what is the + * offset of this block relative to the start of that chunk? + */ + chunkno = blknum / BLOCKS_PER_CHUNK; + chunkoffset = blknum % BLOCKS_PER_CHUNK; + + /* + * If 'nchunks' isn't big enough for us to be able to represent the state + * of this block, we need to enlarge our arrays. + */ + if (chunkno >= entry->nchunks) + { + unsigned max_chunks; + unsigned extra_chunks; + + /* + * New array size is a power of 2, at least 16, big enough so that + * chunkno will be a valid array index. + */ + max_chunks = Max(16, entry->nchunks); + while (max_chunks < chunkno + 1) + chunkno *= 2; + Assert(max_chunks > chunkno); + extra_chunks = max_chunks - entry->nchunks; + + if (entry->nchunks == 0) + { + entry->chunk_size = palloc0(sizeof(uint16) * max_chunks); + entry->chunk_usage = palloc0(sizeof(uint16) * max_chunks); + entry->chunk_data = + palloc0(sizeof(BlockRefTableChunk) * max_chunks); + } + else + { + entry->chunk_size = repalloc(entry->chunk_size, + sizeof(uint16) * max_chunks); + memset(&entry->chunk_size[entry->nchunks], 0, + extra_chunks * sizeof(uint16)); + entry->chunk_usage = repalloc(entry->chunk_usage, + sizeof(uint16) * max_chunks); + memset(&entry->chunk_usage[entry->nchunks], 0, + extra_chunks * sizeof(uint16)); + entry->chunk_data = repalloc(entry->chunk_data, + sizeof(BlockRefTableChunk) * max_chunks); + memset(&entry->chunk_data[entry->nchunks], 0, + extra_chunks * sizeof(BlockRefTableChunk)); + } + entry->nchunks = max_chunks; + } + + /* + * If the chunk that covers this block number doesn't exist yet, create it + * as an array and add the appropriate offset to it. We make it pretty + * small initially, because there might only be 1 or a few block + * references in this chunk and we don't want to use up too much memory. + */ + if (entry->chunk_size[chunkno] == 0) + { + entry->chunk_data[chunkno] = + palloc(sizeof(uint16) * INITIAL_ENTRIES_PER_CHUNK); + entry->chunk_size[chunkno] = INITIAL_ENTRIES_PER_CHUNK; + entry->chunk_data[chunkno][0] = chunkoffset; + entry->chunk_usage[chunkno] = 1; + return; + } + + /* + * If the number of entries in this chunk is already maximum, it must be a + * bitmap. Just set the appropriate bit. + */ + if (entry->chunk_usage[chunkno] == MAX_ENTRIES_PER_CHUNK) + { + BlockRefTableChunk chunk = entry->chunk_data[chunkno]; + + chunk[chunkoffset / BLOCKS_PER_ENTRY] |= + 1 << (chunkoffset % BLOCKS_PER_ENTRY); + return; + } + + /* + * There is an existing chunk and it's in array format. Let's find out + * whether it already has an entry for this block. If so, we do not need + * to do anything. + */ + for (i = 0; i < entry->chunk_usage[chunkno]; ++i) + { + if (entry->chunk_data[chunkno][i] == chunkoffset) + return; + } + + /* + * If the number of entries currently used is one less than the maximum, + * it's time to convert to bitmap format. + */ + if (entry->chunk_usage[chunkno] == MAX_ENTRIES_PER_CHUNK - 1) + { + BlockRefTableChunk newchunk; + unsigned j; + + /* Allocate a new chunk. */ + newchunk = palloc0(MAX_ENTRIES_PER_CHUNK * sizeof(uint16)); + + /* Set the bit for each existing entry. */ + for (j = 0; j < entry->chunk_usage[chunkno]; ++j) + { + unsigned coff = entry->chunk_data[chunkno][j]; + + newchunk[coff / BLOCKS_PER_ENTRY] |= + 1 << (coff % BLOCKS_PER_ENTRY); + } + + /* Set the bit for the new entry. */ + newchunk[chunkoffset / BLOCKS_PER_ENTRY] |= + 1 << (chunkoffset % BLOCKS_PER_ENTRY); + + /* Swap the new chunk into place and update metadata. */ + pfree(entry->chunk_data[chunkno]); + entry->chunk_data[chunkno] = newchunk; + entry->chunk_size[chunkno] = MAX_ENTRIES_PER_CHUNK; + entry->chunk_usage[chunkno] = MAX_ENTRIES_PER_CHUNK; + return; + } + + /* + * OK, we currently have an array, and we don't need to convert to a + * bitmap, but we do need to add a new element. If there's not enough + * room, we'll have to expand the array. + */ + if (entry->chunk_usage[chunkno] == entry->chunk_size[chunkno]) + { + unsigned newsize = entry->chunk_size[chunkno] * 2; + + Assert(newsize <= MAX_ENTRIES_PER_CHUNK); + entry->chunk_data[chunkno] = repalloc(entry->chunk_data[chunkno], + newsize * sizeof(uint16)); + entry->chunk_size[chunkno] = newsize; + } + + /* Now we can add the new entry. */ + entry->chunk_data[chunkno][entry->chunk_usage[chunkno]] = + chunkoffset; + entry->chunk_usage[chunkno]++; +} + +/* + * Release memory for a BlockRefTablEntry that was created by + * CreateBlockRefTableEntry. + */ +void +BlockRefTableFreeEntry(BlockRefTableEntry *entry) +{ + if (entry->chunk_size != NULL) + { + pfree(entry->chunk_size); + entry->chunk_size = NULL; + } + + if (entry->chunk_usage != NULL) + { + pfree(entry->chunk_usage); + entry->chunk_usage = NULL; + } + + if (entry->chunk_data != NULL) + { + pfree(entry->chunk_data); + entry->chunk_data = NULL; + } + + pfree(entry); +} + +/* + * Comparator for BlockRefTableSerializedEntry objects. + * + * We make the tablespace OID the first column of the sort key to match + * the on-disk tree structure. + */ +static int +BlockRefTableComparator(const void *a, const void *b) +{ + const BlockRefTableSerializedEntry *sa = a; + const BlockRefTableSerializedEntry *sb = b; + + if (sa->rlocator.spcOid > sb->rlocator.spcOid) + return 1; + if (sa->rlocator.spcOid < sb->rlocator.spcOid) + return -1; + + if (sa->rlocator.dbOid > sb->rlocator.dbOid) + return 1; + if (sa->rlocator.dbOid < sb->rlocator.dbOid) + return -1; + + if (sa->rlocator.relNumber > sb->rlocator.relNumber) + return 1; + if (sa->rlocator.relNumber < sb->rlocator.relNumber) + return -1; + + if (sa->forknum > sb->forknum) + return 1; + if (sa->forknum < sb->forknum) + return -1; + + return 0; +} + +/* + * Flush any buffered data out of a BlockRefTableBuffer. + */ +static void +BlockRefTableFlush(BlockRefTableBuffer *buffer) +{ + buffer->io_callback(buffer->io_callback_arg, buffer->data, buffer->used); + buffer->used = 0; +} + +/* + * Read data from a BlockRefTableBuffer, and update the running CRC + * calculation for the returned data (but not any data that we may have + * buffered but not yet actually returned). + */ +static void +BlockRefTableRead(BlockRefTableReader *reader, void *data, int length) +{ + BlockRefTableBuffer *buffer = &reader->buffer; + + /* Loop until read is fully satisfied. */ + while (length > 0) + { + if (buffer->cursor < buffer->used) + { + /* + * If any buffered data is available, use that to satisfy as much + * of the request as possible. + */ + int bytes_to_copy = Min(length, buffer->used - buffer->cursor); + + memcpy(data, &buffer->data[buffer->cursor], bytes_to_copy); + COMP_CRC32C(buffer->crc, &buffer->data[buffer->cursor], + bytes_to_copy); + buffer->cursor += bytes_to_copy; + data = ((char *) data) + bytes_to_copy; + length -= bytes_to_copy; + } + else if (length >= BUFSIZE) + { + /* + * If the request length is long, read directly into caller's + * buffer. + */ + int bytes_read; + + bytes_read = buffer->io_callback(buffer->io_callback_arg, + data, length); + COMP_CRC32C(buffer->crc, data, bytes_read); + data = ((char *) data) + bytes_read; + length -= bytes_read; + + /* If we didn't get anything, that's bad. */ + if (bytes_read == 0) + reader->error_callback(reader->error_callback_arg, + "file \"%s\" ends unexpectedly", + reader->error_filename); + } + else + { + /* + * Refill our buffer. + */ + buffer->used = buffer->io_callback(buffer->io_callback_arg, + buffer->data, BUFSIZE); + buffer->cursor = 0; + + /* If we didn't get anything, that's bad. */ + if (buffer->used == 0) + reader->error_callback(reader->error_callback_arg, + "file \"%s\" ends unexpectedly", + reader->error_filename); + } + } +} + +/* + * Supply data to a BlockRefTableBuffer for write to the underlying File, + * and update the running CRC calculation for that data. + */ +static void +BlockRefTableWrite(BlockRefTableBuffer *buffer, void *data, int length) +{ + /* Update running CRC calculation. */ + COMP_CRC32C(buffer->crc, data, length); + + /* If the new data can't fit into the buffer, flush the buffer. */ + if (buffer->used + length > BUFSIZE) + { + buffer->io_callback(buffer->io_callback_arg, buffer->data, + buffer->used); + buffer->used = 0; + } + + /* If the new data would fill the buffer, or more, write it directly. */ + if (length >= BUFSIZE) + { + buffer->io_callback(buffer->io_callback_arg, data, length); + return; + } + + /* Otherwise, copy the new data into the buffer. */ + memcpy(&buffer->data[buffer->used], data, length); + buffer->used += length; + Assert(buffer->used <= BUFSIZE); +} + +/* + * Generate the sentinel and CRC required at the end of a block reference + * table file and flush them out of our internal buffer. + */ +static void +BlockRefTableFileTerminate(BlockRefTableBuffer *buffer) +{ + BlockRefTableSerializedEntry zentry = {{0}}; + pg_crc32c crc; + + /* Write a sentinel indicating that there are no more entries. */ + BlockRefTableWrite(buffer, &zentry, + sizeof(BlockRefTableSerializedEntry)); + + /* + * Writing the checksum will perturb the ongoing checksum calculation, so + * copy the state first and finalize the computation using the copy. + */ + crc = buffer->crc; + FIN_CRC32C(crc); + BlockRefTableWrite(buffer, &crc, sizeof(pg_crc32c)); + + /* Flush any leftover data out of our buffer. */ + BlockRefTableFlush(buffer); +} diff --git a/src/common/meson.build b/src/common/meson.build index 84eb100f048..12fd43e87fd 100644 --- a/src/common/meson.build +++ b/src/common/meson.build @@ -4,6 +4,7 @@ common_sources = files( 'archive.c', 'base64.c', 'binaryheap.c', + 'blkreftable.c', 'checksum_helper.c', 'compression.c', 'controldata_utils.c', diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index a14126d164f..da715803644 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -209,6 +209,7 @@ extern int XLogFileOpen(XLogSegNo segno, TimeLineID tli); extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli); extern XLogSegNo XLogGetLastRemovedSegno(void); +extern XLogSegNo XLogGetOldestSegno(TimeLineID tli); extern void XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN); extern void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn); diff --git a/src/include/backup/walsummary.h b/src/include/backup/walsummary.h new file mode 100644 index 00000000000..8e3dc7b837b --- /dev/null +++ b/src/include/backup/walsummary.h @@ -0,0 +1,49 @@ +/*------------------------------------------------------------------------- + * + * walsummary.h + * WAL summary management + * + * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group + * + * src/include/backup/walsummary.h + * + *------------------------------------------------------------------------- + */ +#ifndef WALSUMMARY_H +#define WALSUMMARY_H + +#include <time.h> + +#include "access/xlogdefs.h" +#include "nodes/pg_list.h" +#include "storage/fd.h" + +typedef struct WalSummaryIO +{ + File file; + off_t filepos; +} WalSummaryIO; + +typedef struct WalSummaryFile +{ + XLogRecPtr start_lsn; + XLogRecPtr end_lsn; + TimeLineID tli; +} WalSummaryFile; + +extern List *GetWalSummaries(TimeLineID tli, XLogRecPtr start_lsn, + XLogRecPtr end_lsn); +extern List *FilterWalSummaries(List *wslist, TimeLineID tli, + XLogRecPtr start_lsn, XLogRecPtr end_lsn); +extern bool WalSummariesAreComplete(List *wslist, + XLogRecPtr start_lsn, XLogRecPtr end_lsn, + XLogRecPtr *missing_lsn); +extern File OpenWalSummaryFile(WalSummaryFile *ws, bool missing_ok); +extern void RemoveWalSummaryIfOlderThan(WalSummaryFile *ws, + time_t cutoff_time); + +extern int ReadWalSummary(void *wal_summary_io, void *data, int length); +extern int WriteWalSummary(void *wal_summary_io, void *data, int length); +extern void ReportWalSummaryError(void *callback_arg, char *fmt,...) pg_attribute_printf(2, 3); + +#endif /* WALSUMMARY_H */ diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 77e8b137649..916c8ec8d0e 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -12099,4 +12099,23 @@ proname => 'any_value_transfn', prorettype => 'anyelement', proargtypes => 'anyelement anyelement', prosrc => 'any_value_transfn' }, +{ oid => '8436', + descr => 'list of available WAL summary files', + proname => 'pg_available_wal_summaries', prorows => '100', + proretset => 't', provolatile => 'v', proparallel => 's', + prorettype => 'record', proargtypes => '', + proallargtypes => '{int8,pg_lsn,pg_lsn}', + proargmodes => '{o,o,o}', + proargnames => '{tli,start_lsn,end_lsn}', + prosrc => 'pg_available_wal_summaries' }, +{ oid => '8437', + descr => 'contents of a WAL sumamry file', + proname => 'pg_wal_summary_contents', prorows => '100', + proretset => 't', provolatile => 'v', proparallel => 's', + prorettype => 'record', proargtypes => 'int8 pg_lsn pg_lsn', + proallargtypes => '{int8,pg_lsn,pg_lsn,oid,oid,oid,int2,int8,bool}', + proargmodes => '{i,i,i,o,o,o,o,o,o}', + proargnames => '{tli,start_lsn,end_lsn,relfilenode,reltablespace,reldatabase,relforknumber,relblocknumber,is_limit_block}', + prosrc => 'pg_wal_summary_contents' }, + ] diff --git a/src/include/common/blkreftable.h b/src/include/common/blkreftable.h new file mode 100644 index 00000000000..5141f3acd50 --- /dev/null +++ b/src/include/common/blkreftable.h @@ -0,0 +1,116 @@ +/*------------------------------------------------------------------------- + * + * blkreftable.h + * Block reference tables. + * + * A block reference table is used to keep track of which blocks have + * been modified by WAL records within a certain LSN range. + * + * For each relation fork, there is a "limit block number". All existing + * blocks greater than or equal to the limit block number must be + * considered modified; for those less than the limit block number, + * we maintain a bitmap. When a relation fork is created or dropped, + * the limit block number should be set to 0. When it's truncated, + * the limit block number should be set to the length in blocks to + * which it was truncated. + * + * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group + * + * src/include/common/blkreftable.h + * + *------------------------------------------------------------------------- + */ +#ifndef BLKREFTABLE_H +#define BLKREFTABLE_H + +#include "storage/block.h" +#include "storage/relfilelocator.h" + +/* Magic number for serialization file format. */ +#define BLOCKREFTABLE_MAGIC 0x652b137b + +typedef struct BlockRefTable BlockRefTable; +typedef struct BlockRefTableEntry BlockRefTableEntry; +typedef struct BlockRefTableReader BlockRefTableReader; +typedef struct BlockRefTableWriter BlockRefTableWriter; + +/* + * The return value of io_callback_fn should be the number of bytes read + * or written. If an error occurs, the functions should report it and + * not return. When used as a write callback, short writes should be retried + * or treated as errors, so that if the callback returns, the return value + * is always the request length. + * + * report_error_fn should not return. + */ +typedef int (*io_callback_fn) (void *callback_arg, void *data, int length); +typedef void (*report_error_fn) (void *calblack_arg, char *msg,...) pg_attribute_printf(2, 3); + + +/* + * Functions for manipulating an entire in-memory block reference table. + */ +extern BlockRefTable *CreateEmptyBlockRefTable(void); +extern void BlockRefTableSetLimitBlock(BlockRefTable *brtab, + const RelFileLocator *rlocator, + ForkNumber forknum, + BlockNumber limit_block); +extern void BlockRefTableMarkBlockModified(BlockRefTable *brtab, + const RelFileLocator *rlocator, + ForkNumber forknum, + BlockNumber blknum); +extern void WriteBlockRefTable(BlockRefTable *brtab, + io_callback_fn write_callback, + void *write_callback_arg); + +extern BlockRefTableEntry *BlockRefTableGetEntry(BlockRefTable *brtab, + const RelFileLocator *rlocator, + ForkNumber forknum, + BlockNumber *limit_block); +extern int BlockRefTableEntryGetBlocks(BlockRefTableEntry *entry, + BlockNumber start_blkno, + BlockNumber stop_blkno, + BlockNumber *blocks, + int nblocks); + +/* + * Functions for reading a block reference table incrementally from disk. + */ +extern BlockRefTableReader *CreateBlockRefTableReader(io_callback_fn read_callback, + void *read_callback_arg, + char *error_filename, + report_error_fn error_callback, + void *error_callback_arg); +extern bool BlockRefTableReaderNextRelation(BlockRefTableReader *reader, + RelFileLocator *rlocator, + ForkNumber *forknum, + BlockNumber *limit_block); +extern unsigned BlockRefTableReaderGetBlocks(BlockRefTableReader *reader, + BlockNumber *blocks, + int nblocks); +extern void DestroyBlockRefTableReader(BlockRefTableReader *reader); + +/* + * Functions for writing a block reference table incrementally to disk. + * + * Note that entries must be written in the proper order, that is, sorted by + * database, then tablespace, then relfilenumber, then fork number. Caller + * is responsible for supplying data in the correct order. If that seems hard, + * use an in-memory BlockRefTable instead. + */ +extern BlockRefTableWriter *CreateBlockRefTableWriter(io_callback_fn write_callback, + void *write_callback_arg); +extern void BlockRefTableWriteEntry(BlockRefTableWriter *writer, + BlockRefTableEntry *entry); +extern void DestroyBlockRefTableWriter(BlockRefTableWriter *writer); + +extern BlockRefTableEntry *CreateBlockRefTableEntry(RelFileLocator rlocator, + ForkNumber forknum); +extern void BlockRefTableEntrySetLimitBlock(BlockRefTableEntry *entry, + BlockNumber limit_block); +extern void BlockRefTableEntryMarkBlockModified(BlockRefTableEntry *entry, + ForkNumber forknum, + BlockNumber blknum); +extern void BlockRefTableFreeEntry(BlockRefTableEntry *entry); + +#endif /* BLKREFTABLE_H */ diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index 1043a4d7821..74bc2f97cbd 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -336,6 +336,7 @@ typedef enum BackendType B_STARTUP, B_WAL_RECEIVER, B_WAL_SENDER, + B_WAL_SUMMARIZER, B_WAL_WRITER, } BackendType; @@ -442,6 +443,7 @@ typedef enum CheckpointerProcess, WalWriterProcess, WalReceiverProcess, + WalSummarizerProcess, NUM_AUXPROCTYPES /* Must be last! */ } AuxProcType; @@ -454,6 +456,7 @@ extern PGDLLIMPORT AuxProcType MyAuxProcType; #define AmCheckpointerProcess() (MyAuxProcType == CheckpointerProcess) #define AmWalWriterProcess() (MyAuxProcType == WalWriterProcess) #define AmWalReceiverProcess() (MyAuxProcType == WalReceiverProcess) +#define AmWalSummarizerProcess() (MyAuxProcType == WalSummarizerProcess) /***************************************************************************** diff --git a/src/include/postmaster/walsummarizer.h b/src/include/postmaster/walsummarizer.h new file mode 100644 index 00000000000..180d3f34b9b --- /dev/null +++ b/src/include/postmaster/walsummarizer.h @@ -0,0 +1,33 @@ +/*------------------------------------------------------------------------- + * + * walsummarizer.h + * + * Header file for background WAL summarization process. + * + * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/include/postmaster/walsummarizer.h + * + *------------------------------------------------------------------------- + */ +#ifndef WALSUMMARIZER_H +#define WALSUMMARIZER_H + +#include "access/xlogdefs.h" + +extern bool summarize_wal; +extern int wal_summary_keep_time; + +extern Size WalSummarizerShmemSize(void); +extern void WalSummarizerShmemInit(void); +extern void WalSummarizerMain(void) pg_attribute_noreturn(); + +extern XLogRecPtr GetOldestUnsummarizedLSN(TimeLineID *tli, + bool *lsn_is_exact, + bool reset_pending_lsn); +extern void SetWalSummarizerLatch(void); +extern XLogRecPtr WaitForWalSummarization(XLogRecPtr lsn, long timeout, + XLogRecPtr *pending_lsn); + +#endif diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 4b259612495..e87fd25d643 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -417,11 +417,12 @@ extern PGDLLIMPORT PGPROC *PreparedXactProcs; * We set aside some extra PGPROC structures for auxiliary processes, * ie things that aren't full-fledged backends but need shmem access. * - * Background writer, checkpointer, WAL writer and archiver run during normal - * operation. Startup process and WAL receiver also consume 2 slots, but WAL - * writer is launched only after startup has exited, so we only need 5 slots. + * Background writer, checkpointer, WAL writer, WAL summarizer, and archiver + * run during normal operation. Startup process and WAL receiver also consume + * 2 slots, but WAL writer is launched only after startup has exited, so we + * only need 6 slots. */ -#define NUM_AUXILIARY_PROCS 5 +#define NUM_AUXILIARY_PROCS 6 /* configurable options */ extern PGDLLIMPORT int DeadlockTimeout; diff --git a/src/include/utils/guc_tables.h b/src/include/utils/guc_tables.h index 0c382559617..eaa8c46ddac 100644 --- a/src/include/utils/guc_tables.h +++ b/src/include/utils/guc_tables.h @@ -72,6 +72,7 @@ enum config_group WAL_RECOVERY, WAL_ARCHIVE_RECOVERY, WAL_RECOVERY_TARGET, + WAL_SUMMARIZATION, REPLICATION_SENDING, REPLICATION_PRIMARY, REPLICATION_STANDBY, diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index ba41149b881..93900493142 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -4012,3 +4012,14 @@ yyscan_t z_stream z_streamp zic_t +BlockRefTable +BlockRefTableBuffer +BlockRefTableEntry +BlockRefTableKey +BlockRefTableReader +BlockRefTableSerializedEntry +BlockRefTableWriter +SummarizerReadLocalXLogPrivate +WalSummarizerData +WalSummaryFile +WalSummaryIO |