aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
authorJeff Davis <jdavis@postgresql.org>2022-04-06 22:26:43 -0700
committerJeff Davis <jdavis@postgresql.org>2022-04-06 23:06:46 -0700
commit5c279a6d350205cc98f91fb8e1d3e4442a6b25d1 (patch)
tree4165add040730afa0e116ab5be1db5dc6fa93aea /src/backend
parenta8cfb0c1a964ebbe830c5138d389b0d2627ec298 (diff)
downloadpostgresql-5c279a6d350205cc98f91fb8e1d3e4442a6b25d1.tar.gz
postgresql-5c279a6d350205cc98f91fb8e1d3e4442a6b25d1.zip
Custom WAL Resource Managers.
Allow extensions to specify a new custom resource manager (rmgr), which allows specialized WAL. This is meant to be used by a Table Access Method or Index Access Method. Prior to this commit, only Generic WAL was available, which offers support for recovery and physical replication but not logical replication. Reviewed-by: Julien Rouhaud, Bharath Rupireddy, Andres Freund Discussion: https://postgr.es/m/ed1fb2e22d15d3563ae0eb610f7b61bb15999c0a.camel%40j-davis.com
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/access/transam/rmgr.c124
-rw-r--r--src/backend/access/transam/xlogreader.c2
-rw-r--r--src/backend/access/transam/xlogrecovery.c33
-rw-r--r--src/backend/postmaster/postmaster.c6
-rw-r--r--src/backend/replication/logical/decode.c8
-rw-r--r--src/backend/utils/init/miscinit.c2
-rw-r--r--src/backend/utils/misc/guc.c77
7 files changed, 215 insertions, 37 deletions
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index f8847d5aebf..3c2dc1000db 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -24,16 +24,138 @@
#include "commands/dbcommands_xlog.h"
#include "commands/sequence.h"
#include "commands/tablespace.h"
+#include "fmgr.h"
+#include "funcapi.h"
+#include "miscadmin.h"
#include "replication/decode.h"
#include "replication/message.h"
#include "replication/origin.h"
#include "storage/standby.h"
+#include "utils/builtins.h"
#include "utils/relmapper.h"
/* must be kept in sync with RmgrData definition in xlog_internal.h */
#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
{ name, redo, desc, identify, startup, cleanup, mask, decode },
-const RmgrData RmgrTable[RM_MAX_ID + 1] = {
+RmgrData RmgrTable[RM_MAX_ID + 1] = {
#include "access/rmgrlist.h"
};
+
+/*
+ * Start up all resource managers.
+ */
+void
+RmgrStartup(void)
+{
+ for (int rmid = 0; rmid <= RM_MAX_ID; rmid++)
+ {
+ if (!RmgrIdExists(rmid))
+ continue;
+
+ if (RmgrTable[rmid].rm_startup != NULL)
+ RmgrTable[rmid].rm_startup();
+ }
+}
+
+/*
+ * Clean up all resource managers.
+ */
+void
+RmgrCleanup(void)
+{
+ for (int rmid = 0; rmid <= RM_MAX_ID; rmid++)
+ {
+ if (!RmgrIdExists(rmid))
+ continue;
+
+ if (RmgrTable[rmid].rm_cleanup != NULL)
+ RmgrTable[rmid].rm_cleanup();
+ }
+}
+
+/*
+ * Emit ERROR when we encounter a record with an RmgrId we don't
+ * recognize.
+ */
+void
+RmgrNotFound(RmgrId rmid)
+{
+ ereport(ERROR, (errmsg("resource manager with ID %d not registered", rmid),
+ errhint("Include the extension module that implements this resource manager in shared_preload_libraries.")));
+}
+
+/*
+ * Register a new custom WAL resource manager.
+ *
+ * Resource manager IDs must be globally unique across all extensions. Refer
+ * to https://wiki.postgresql.org/wiki/CustomWALResourceManager to reserve a
+ * unique RmgrId for your extension, to avoid conflicts with other extension
+ * developers. During development, use RM_EXPERIMENTAL_ID to avoid needlessly
+ * reserving a new ID.
+ */
+void
+RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr)
+{
+ if (rmgr->rm_name == NULL || strlen(rmgr->rm_name) == 0)
+ ereport(ERROR, (errmsg("custom resource manager name is invalid"),
+ errhint("Provide a non-empty name for the custom resource manager.")));
+
+ if (!RMID_IS_CUSTOM(rmid))
+ ereport(ERROR, (errmsg("custom resource manager ID %d is out of range", rmid),
+ errhint("Provide a custom resource manager ID between %d and %d.",
+ RM_MIN_CUSTOM_ID, RM_MAX_CUSTOM_ID)));
+
+ if (!process_shared_preload_libraries_in_progress)
+ ereport(ERROR,
+ (errmsg("failed to register custom resource manager \"%s\" with ID %d", rmgr->rm_name, rmid),
+ errdetail("Custom resource manager must be registered while initializing modules in shared_preload_libraries.")));
+
+ if (RmgrTable[rmid].rm_name != NULL)
+ ereport(ERROR,
+ (errmsg("failed to register custom resource manager \"%s\" with ID %d", rmgr->rm_name, rmid),
+ errdetail("Custom resource manager \"%s\" already registered with the same ID.",
+ RmgrTable[rmid].rm_name)));
+
+ /* check for existing rmgr with the same name */
+ for (int existing_rmid = 0; existing_rmid <= RM_MAX_ID; existing_rmid++)
+ {
+ if (!RmgrIdExists(existing_rmid))
+ continue;
+
+ if (!pg_strcasecmp(RmgrTable[existing_rmid].rm_name, rmgr->rm_name))
+ ereport(ERROR,
+ (errmsg("failed to register custom resource manager \"%s\" with ID %d", rmgr->rm_name, rmid),
+ errdetail("Existing resource manager with ID %d has the same name.", existing_rmid)));
+ }
+
+ /* register it */
+ RmgrTable[rmid] = *rmgr;
+ ereport(LOG,
+ (errmsg("registered custom resource manager \"%s\" with ID %d",
+ rmgr->rm_name, rmid)));
+}
+
+/* SQL SRF showing loaded resource managers */
+Datum
+pg_get_wal_resource_managers(PG_FUNCTION_ARGS)
+{
+#define PG_GET_RESOURCE_MANAGERS_COLS 3
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ Datum values[PG_GET_RESOURCE_MANAGERS_COLS];
+ bool nulls[PG_GET_RESOURCE_MANAGERS_COLS] = {0};
+
+ SetSingleFuncCall(fcinfo, 0);
+
+ for (int rmid = 0; rmid <= RM_MAX_ID; rmid++)
+ {
+ if (!RmgrIdExists(rmid))
+ continue;
+ values[0] = Int32GetDatum(rmid);
+ values[1] = CStringGetTextDatum(GetRmgr(rmid).rm_name);
+ values[2] = BoolGetDatum(RMID_IS_BUILTIN(rmid));
+ tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+ }
+
+ return (Datum) 0;
+}
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index e437c429920..161cf13fed2 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1102,7 +1102,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
(uint32) SizeOfXLogRecord, record->xl_tot_len);
return false;
}
- if (record->xl_rmid > RM_MAX_ID)
+ if (!RMID_IS_VALID(record->xl_rmid))
{
report_invalid_record(state,
"invalid resource manager ID %u at %X/%X",
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 1b7bae387a0..55391921679 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -1541,7 +1541,6 @@ ShutdownWalRecovery(void)
void
PerformWalRecovery(void)
{
- int rmid;
XLogRecord *record;
bool reachedRecoveryTarget = false;
TimeLineID replayTLI;
@@ -1614,12 +1613,7 @@ PerformWalRecovery(void)
InRedo = true;
- /* Initialize resource managers */
- for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
- {
- if (RmgrTable[rmid].rm_startup != NULL)
- RmgrTable[rmid].rm_startup();
- }
+ RmgrStartup();
ereport(LOG,
(errmsg("redo starts at %X/%X",
@@ -1756,12 +1750,7 @@ PerformWalRecovery(void)
}
}
- /* Allow resource managers to do any required cleanup. */
- for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
- {
- if (RmgrTable[rmid].rm_cleanup != NULL)
- RmgrTable[rmid].rm_cleanup();
- }
+ RmgrCleanup();
ereport(LOG,
(errmsg("redo done at %X/%X system usage: %s",
@@ -1881,7 +1870,7 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
xlogrecovery_redo(xlogreader, *replayTLI);
/* Now apply the WAL record itself */
- RmgrTable[record->xl_rmid].rm_redo(xlogreader);
+ GetRmgr(record->xl_rmid).rm_redo(xlogreader);
/*
* After redo, check whether the backup pages associated with the WAL
@@ -2111,20 +2100,20 @@ rm_redo_error_callback(void *arg)
void
xlog_outdesc(StringInfo buf, XLogReaderState *record)
{
- RmgrId rmid = XLogRecGetRmid(record);
+ RmgrData rmgr = GetRmgr(XLogRecGetRmid(record));
uint8 info = XLogRecGetInfo(record);
const char *id;
- appendStringInfoString(buf, RmgrTable[rmid].rm_name);
+ appendStringInfoString(buf, rmgr.rm_name);
appendStringInfoChar(buf, '/');
- id = RmgrTable[rmid].rm_identify(info);
+ id = rmgr.rm_identify(info);
if (id == NULL)
appendStringInfo(buf, "UNKNOWN (%X): ", info & ~XLR_INFO_MASK);
else
appendStringInfo(buf, "%s: ", id);
- RmgrTable[rmid].rm_desc(buf, record);
+ rmgr.rm_desc(buf, record);
}
#ifdef WAL_DEBUG
@@ -2273,7 +2262,7 @@ getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime)
static void
verifyBackupPageConsistency(XLogReaderState *record)
{
- RmgrId rmid = XLogRecGetRmid(record);
+ RmgrData rmgr = GetRmgr(XLogRecGetRmid(record));
RelFileNode rnode;
ForkNumber forknum;
BlockNumber blkno;
@@ -2353,10 +2342,10 @@ verifyBackupPageConsistency(XLogReaderState *record)
* If masking function is defined, mask both the primary and replay
* images
*/
- if (RmgrTable[rmid].rm_mask != NULL)
+ if (rmgr.rm_mask != NULL)
{
- RmgrTable[rmid].rm_mask(replay_image_masked, blkno);
- RmgrTable[rmid].rm_mask(primary_image_masked, blkno);
+ rmgr.rm_mask(replay_image_masked, blkno);
+ rmgr.rm_mask(primary_image_masked, blkno);
}
/* Time to compare the primary and replay images. */
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index d5551e0af68..3535e9e47d2 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -1040,6 +1040,12 @@ PostmasterMain(int argc, char *argv[])
InitializeShmemGUCs();
/*
+ * Now that modules have been loaded, we can process any custom resource
+ * managers specified in the wal_consistency_checking GUC.
+ */
+ InitializeWalConsistencyChecking();
+
+ /*
* If -C was specified with a runtime-computed GUC, we held off printing
* the value earlier, as the GUC was not yet initialized. We handle -C
* for most GUCs before we lock the data directory so that the option may
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 77bc7aea7a0..c6ea7c98e15 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -94,7 +94,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
{
XLogRecordBuffer buf;
TransactionId txid;
- RmgrId rmid;
+ RmgrData rmgr;
buf.origptr = ctx->reader->ReadRecPtr;
buf.endptr = ctx->reader->EndRecPtr;
@@ -115,10 +115,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
buf.origptr);
}
- rmid = XLogRecGetRmid(record);
+ rmgr = GetRmgr(XLogRecGetRmid(record));
- if (RmgrTable[rmid].rm_decode != NULL)
- RmgrTable[rmid].rm_decode(ctx, &buf);
+ if (rmgr.rm_decode != NULL)
+ rmgr.rm_decode(ctx, &buf);
else
{
/* just deal with xid, and done */
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index 0d3cfe8240b..30f0f19dd53 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -1610,6 +1610,7 @@ char *local_preload_libraries_string = NULL;
/* Flag telling that we are loading shared_preload_libraries */
bool process_shared_preload_libraries_in_progress = false;
+bool process_shared_preload_libraries_done = false;
/*
* load the shared libraries listed in 'libraries'
@@ -1677,6 +1678,7 @@ process_shared_preload_libraries(void)
"shared_preload_libraries",
false);
process_shared_preload_libraries_in_progress = false;
+ process_shared_preload_libraries_done = true;
}
/*
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 998b8a94c45..89f8259bac5 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -245,6 +245,11 @@ static bool check_default_with_oids(bool *newval, void **extra, GucSource source
static ConfigVariable *ProcessConfigFileInternal(GucContext context,
bool applySettings, int elevel);
+/*
+ * Track whether there were any deferred checks for custom resource managers
+ * specified in wal_consistency_checking.
+ */
+static bool check_wal_consistency_checking_deferred = false;
/*
* Options for enum values defined in this module.
@@ -5836,6 +5841,36 @@ InitializeGUCOptions(void)
}
/*
+ * If any custom resource managers were specified in the
+ * wal_consistency_checking GUC, processing was deferred. Now that
+ * shared_preload_libraries have been loaded, process wal_consistency_checking
+ * again.
+ */
+void
+InitializeWalConsistencyChecking(void)
+{
+ Assert(process_shared_preload_libraries_done);
+
+ if (check_wal_consistency_checking_deferred)
+ {
+ struct config_generic *guc;
+
+ guc = find_option("wal_consistency_checking", false, false, ERROR);
+
+ check_wal_consistency_checking_deferred = false;
+
+ set_config_option("wal_consistency_checking",
+ wal_consistency_checking_string,
+ PGC_POSTMASTER, guc->source,
+ GUC_ACTION_SET, true, ERROR, false);
+
+ /* checking should not be deferred again */
+ Assert(!check_wal_consistency_checking_deferred);
+ }
+
+}
+
+/*
* Assign any GUC values that can come from the server's environment.
*
* This is called from InitializeGUCOptions, and also from ProcessConfigFile
@@ -11882,13 +11917,13 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
{
char *tok = (char *) lfirst(l);
bool found = false;
- RmgrId rmid;
+ int rmid;
/* Check for 'all'. */
if (pg_strcasecmp(tok, "all") == 0)
{
for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
- if (RmgrTable[rmid].rm_mask != NULL)
+ if (RmgrIdExists(rmid) && GetRmgr(rmid).rm_mask != NULL)
newwalconsistency[rmid] = true;
found = true;
}
@@ -11900,8 +11935,8 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
*/
for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
{
- if (pg_strcasecmp(tok, RmgrTable[rmid].rm_name) == 0 &&
- RmgrTable[rmid].rm_mask != NULL)
+ if (RmgrIdExists(rmid) && GetRmgr(rmid).rm_mask != NULL &&
+ pg_strcasecmp(tok, GetRmgr(rmid).rm_name) == 0)
{
newwalconsistency[rmid] = true;
found = true;
@@ -11912,10 +11947,21 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
/* If a valid resource manager is found, check for the next one. */
if (!found)
{
- GUC_check_errdetail("Unrecognized key word: \"%s\".", tok);
- pfree(rawstring);
- list_free(elemlist);
- return false;
+ /*
+ * Perhaps it's a custom resource manager. If so, defer checking
+ * until InitializeWalConsistencyChecking().
+ */
+ if (!process_shared_preload_libraries_done)
+ {
+ check_wal_consistency_checking_deferred = true;
+ }
+ else
+ {
+ GUC_check_errdetail("Unrecognized key word: \"%s\".", tok);
+ pfree(rawstring);
+ list_free(elemlist);
+ return false;
+ }
}
}
@@ -11931,7 +11977,20 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
static void
assign_wal_consistency_checking(const char *newval, void *extra)
{
- wal_consistency_checking = (bool *) extra;
+ /*
+ * If some checks were deferred, it's possible that the checks will fail
+ * later during InitializeWalConsistencyChecking(). But in that case, the
+ * postmaster will exit anyway, so it's safe to proceed with the
+ * assignment.
+ *
+ * Any built-in resource managers specified are assigned immediately,
+ * which affects WAL created before shared_preload_libraries are
+ * processed. Any custom resource managers specified won't be assigned
+ * until after shared_preload_libraries are processed, but that's OK
+ * because WAL for a custom resource manager can't be written before the
+ * module is loaded anyway.
+ */
+ wal_consistency_checking = extra;
}
static bool