diff options
author | Robert Haas <rhaas@postgresql.org> | 2014-01-31 22:45:17 -0500 |
---|---|---|
committer | Robert Haas <rhaas@postgresql.org> | 2014-01-31 22:45:36 -0500 |
commit | 858ec11858a914d4c380971985709b6d6b7dd6fc (patch) | |
tree | 59eb508185cd8544c3485919a25dee15f3818c21 /src/backend | |
parent | 5bdef38b8917cfbe206d14969c61a5d38fc822b6 (diff) | |
download | postgresql-858ec11858a914d4c380971985709b6d6b7dd6fc.tar.gz postgresql-858ec11858a914d4c380971985709b6d6b7dd6fc.zip |
Introduce replication slots.
Replication slots are a crash-safe data structure which can be created
on either a master or a standby to prevent premature removal of
write-ahead log segments needed by a standby, as well as (with
hot_standby_feedback=on) pruning of tuples whose removal would cause
replication conflicts. Slots have some advantages over existing
techniques, as explained in the documentation.
In a few places, we refer to the type of replication slots introduced
by this patch as "physical" slots, because forthcoming patches for
logical decoding will also have slots, but with somewhat different
properties.
Andres Freund and Robert Haas
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/access/transam/xlog.c | 95 | ||||
-rw-r--r-- | src/backend/catalog/system_views.sql | 12 | ||||
-rw-r--r-- | src/backend/replication/Makefile | 2 | ||||
-rw-r--r-- | src/backend/replication/README | 5 | ||||
-rw-r--r-- | src/backend/replication/basebackup.c | 4 | ||||
-rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 16 | ||||
-rw-r--r-- | src/backend/replication/repl_gram.y | 54 | ||||
-rw-r--r-- | src/backend/replication/repl_scanner.l | 57 | ||||
-rw-r--r-- | src/backend/replication/slot.c | 1066 | ||||
-rw-r--r-- | src/backend/replication/slotfuncs.c | 193 | ||||
-rw-r--r-- | src/backend/replication/walreceiver.c | 5 | ||||
-rw-r--r-- | src/backend/replication/walreceiverfuncs.c | 13 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 197 | ||||
-rw-r--r-- | src/backend/storage/ipc/ipci.c | 3 | ||||
-rw-r--r-- | src/backend/storage/ipc/procarray.c | 42 | ||||
-rw-r--r-- | src/backend/storage/lmgr/lwlock.c | 4 | ||||
-rw-r--r-- | src/backend/storage/lmgr/proc.c | 5 | ||||
-rw-r--r-- | src/backend/utils/misc/guc.c | 12 | ||||
-rw-r--r-- | src/backend/utils/misc/postgresql.conf.sample | 3 |
19 files changed, 1750 insertions, 38 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index b333d820c72..7f63185b1cc 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -39,6 +39,7 @@ #include "pgstat.h" #include "postmaster/bgwriter.h" #include "postmaster/startup.h" +#include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/barrier.h" @@ -225,6 +226,7 @@ static TimestampTz recoveryDelayUntilTime; /* options taken from recovery.conf for XLOG streaming */ static bool StandbyModeRequested = false; static char *PrimaryConnInfo = NULL; +static char *PrimarySlotName = NULL; static char *TriggerFile = NULL; /* are we currently in standby mode? */ @@ -485,6 +487,8 @@ typedef struct XLogCtlData uint32 ckptXidEpoch; /* nextXID & epoch of latest checkpoint */ TransactionId ckptXid; XLogRecPtr asyncXactLSN; /* LSN of newest async commit/abort */ + XLogRecPtr replicationSlotMinLSN; /* oldest LSN needed by any slot */ + XLogSegNo lastRemovedSegNo; /* latest removed/recycled XLOG * segment */ @@ -748,6 +752,7 @@ static void LocalSetXLogInsertAllowed(void); static void CreateEndOfRecoveryRecord(void); static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags); static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo); +static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void); static bool XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock, XLogRecPtr *lsn, BkpBlock *bkpb); @@ -2909,6 +2914,39 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN) } /* + * Record the LSN up to which we can remove WAL because it's not required by + * any replication slot. + */ +void +XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + + SpinLockAcquire(&xlogctl->info_lck); + xlogctl->replicationSlotMinLSN = lsn; + SpinLockRelease(&xlogctl->info_lck); +} + + +/* + * Return the oldest LSN we must retain to satisfy the needs of some + * replication slot. + */ +static XLogRecPtr +XLogGetReplicationSlotMinimumLSN(void) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + XLogRecPtr retval; + SpinLockAcquire(&xlogctl->info_lck); + retval = xlogctl->replicationSlotMinLSN; + SpinLockRelease(&xlogctl->info_lck); + + return retval; +} + +/* * Advance minRecoveryPoint in control file. * * If we crash during recovery, we must reach this point again before the @@ -5478,6 +5516,14 @@ readRecoveryCommandFile(void) (errmsg_internal("primary_conninfo = '%s'", PrimaryConnInfo))); } + else if (strcmp(item->name, "primary_slotname") == 0) + { + ReplicationSlotValidateName(item->value, ERROR); + PrimarySlotName = pstrdup(item->value); + ereport(DEBUG2, + (errmsg_internal("primary_slotname = '%s'", + PrimarySlotName))); + } else if (strcmp(item->name, "trigger_file") == 0) { TriggerFile = pstrdup(item->value); @@ -6506,6 +6552,12 @@ StartupXLOG(void) XLogCtl->ckptXid = checkPoint.nextXid; /* + * Initialize replication slots, before there's a chance to remove + * required resources. + */ + StartupReplicationSlots(checkPoint.redo); + + /* * Startup MultiXact. We need to do this early for two reasons: one * is that we might try to access multixacts when we do tuple freezing, * and the other is we need its state initialized because we attempt @@ -8620,6 +8672,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags) CheckPointMultiXact(); CheckPointPredicate(); CheckPointRelationMap(); + CheckPointReplicationSlots(); CheckPointBuffers(flags); /* performs all required fsyncs */ /* We deliberately delay 2PC checkpointing as long as possible */ CheckPointTwoPhase(checkPointRedo); @@ -8938,24 +8991,43 @@ CreateRestartPoint(int flags) /* * Retreat *logSegNo to the last segment that we need to retain because of - * wal_keep_segments. This is calculated by subtracting wal_keep_segments - * from the given xlog location, recptr. + * either wal_keep_segments or replication slots. + * + * This is calculated by subtracting wal_keep_segments from the given xlog + * location, recptr and by making sure that that result is below the + * requirement of replication slots. */ static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo) { XLogSegNo segno; - - if (wal_keep_segments == 0) - return; + XLogRecPtr keep; XLByteToSeg(recptr, segno); + keep = XLogGetReplicationSlotMinimumLSN(); - /* avoid underflow, don't go below 1 */ - if (segno <= wal_keep_segments) - segno = 1; - else - segno = segno - wal_keep_segments; + /* compute limit for wal_keep_segments first */ + if (wal_keep_segments > 0) + { + /* avoid underflow, don't go below 1 */ + if (segno <= wal_keep_segments) + segno = 1; + else + segno = segno - wal_keep_segments; + } + + /* then check whether slots limit removal further */ + if (max_replication_slots > 0 && keep != InvalidXLogRecPtr) + { + XLogRecPtr slotSegNo; + + XLByteToSeg(keep, slotSegNo); + + if (slotSegNo <= 0) + segno = 1; + else if (slotSegNo < segno) + segno = slotSegNo; + } /* don't delete WAL segments newer than the calculated segment */ if (segno < *logSegNo) @@ -11026,7 +11098,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, tli, curFileTLI); } curFileTLI = tli; - RequestXLogStreaming(tli, ptr, PrimaryConnInfo); + RequestXLogStreaming(tli, ptr, PrimaryConnInfo, + PrimarySlotName); receivedUpto = 0; } diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 277af61f9da..f02efeca974 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -613,6 +613,18 @@ CREATE VIEW pg_stat_replication AS WHERE S.usesysid = U.oid AND S.pid = W.pid; +CREATE VIEW pg_replication_slots AS + SELECT + L.slot_name, + L.slot_type, + L.datoid, + D.datname AS database, + L.active, + L.xmin, + L.restart_lsn + FROM pg_get_replication_slots() AS L + LEFT JOIN pg_database D ON (L.datoid = D.oid); + CREATE VIEW pg_stat_database AS SELECT D.oid AS datid, diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile index 2dde0118a47..7941cb8d5e7 100644 --- a/src/backend/replication/Makefile +++ b/src/backend/replication/Makefile @@ -15,7 +15,7 @@ include $(top_builddir)/src/Makefile.global override CPPFLAGS := -I$(srcdir) $(CPPFLAGS) OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \ - repl_gram.o syncrep.o + repl_gram.o slot.o slotfuncs.o syncrep.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/replication/README b/src/backend/replication/README index 60120ede29c..2f5df49de67 100644 --- a/src/backend/replication/README +++ b/src/backend/replication/README @@ -47,8 +47,9 @@ to fetch more WAL (if streaming replication is configured). Walreceiver is a postmaster subprocess, so the startup process can't fork it directly. Instead, it sends a signal to postmaster, asking postmaster to launch -it. Before that, however, startup process fills in WalRcvData->conninfo, -and initializes the starting point in WalRcvData->receiveStart. +it. Before that, however, startup process fills in WalRcvData->conninfo +and WalRcvData->slotname, and initializes the starting point in +WalRcvData->receiveStart. As walreceiver receives WAL from the master server, and writes and flushes it to disk (in pg_xlog), it updates WalRcvData->receivedUpto and signals diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index 7d0ed9ce4c8..781f678097d 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -847,6 +847,10 @@ sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces) if (strcmp(de->d_name, BACKUP_LABEL_FILE) == 0) continue; + /* Skip pg_replslot, not useful to copy */ + if (strcmp(de->d_name, "pg_replslot") == 0) + continue; + /* * Check if the postmaster has signaled us to exit, and abort with an * error in that case. The error handler further up will call diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 2e057b8969f..ecec8b34563 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -49,7 +49,8 @@ static char *recvBuf = NULL; static void libpqrcv_connect(char *conninfo); static void libpqrcv_identify_system(TimeLineID *primary_tli); static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len); -static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint); +static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, + char *slotname); static void libpqrcv_endstreaming(TimeLineID *next_tli); static int libpqrcv_receive(int timeout, char **buffer); static void libpqrcv_send(const char *buffer, int nbytes); @@ -171,15 +172,20 @@ libpqrcv_identify_system(TimeLineID *primary_tli) * throws an ERROR. */ static bool -libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint) +libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, char *slotname) { char cmd[64]; PGresult *res; /* Start streaming from the point requested by startup process */ - snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X TIMELINE %u", - (uint32) (startpoint >> 32), (uint32) startpoint, - tli); + if (slotname != NULL) + snprintf(cmd, sizeof(cmd), + "START_REPLICATION SLOT \"%s\" %X/%X TIMELINE %u", slotname, + (uint32) (startpoint >> 32), (uint32) startpoint, tli); + else + snprintf(cmd, sizeof(cmd), + "START_REPLICATION %X/%X TIMELINE %u", + (uint32) (startpoint >> 32), (uint32) startpoint, tli); res = libpqrcv_PQexec(cmd); if (PQresultStatus(res) == PGRES_COMMAND_OK) diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index 015aa44d89c..d4bd59bab24 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -65,7 +65,7 @@ Node *replication_parse_result; } /* Non-keyword tokens */ -%token <str> SCONST +%token <str> SCONST IDENT %token <uintval> UCONST %token <recptr> RECPTR @@ -73,6 +73,8 @@ Node *replication_parse_result; %token K_BASE_BACKUP %token K_IDENTIFY_SYSTEM %token K_START_REPLICATION +%token K_CREATE_REPLICATION_SLOT +%token K_DROP_REPLICATION_SLOT %token K_TIMELINE_HISTORY %token K_LABEL %token K_PROGRESS @@ -80,12 +82,15 @@ Node *replication_parse_result; %token K_NOWAIT %token K_WAL %token K_TIMELINE +%token K_PHYSICAL +%token K_SLOT %type <node> command -%type <node> base_backup start_replication identify_system timeline_history +%type <node> base_backup start_replication create_replication_slot drop_replication_slot identify_system timeline_history %type <list> base_backup_opt_list %type <defelt> base_backup_opt %type <uintval> opt_timeline +%type <str> opt_slot %% firstcmd: command opt_semicolon @@ -102,6 +107,8 @@ command: identify_system | base_backup | start_replication + | create_replication_slot + | drop_replication_slot | timeline_history ; @@ -158,18 +165,42 @@ base_backup_opt: } ; +/* CREATE_REPLICATION_SLOT SLOT slot PHYSICAL */ +create_replication_slot: + K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL + { + CreateReplicationSlotCmd *cmd; + cmd = makeNode(CreateReplicationSlotCmd); + cmd->kind = REPLICATION_KIND_PHYSICAL; + cmd->slotname = $2; + $$ = (Node *) cmd; + } + ; + +/* DROP_REPLICATION_SLOT SLOT slot */ +drop_replication_slot: + K_DROP_REPLICATION_SLOT IDENT + { + DropReplicationSlotCmd *cmd; + cmd = makeNode(DropReplicationSlotCmd); + cmd->slotname = $2; + $$ = (Node *) cmd; + } + ; + /* - * START_REPLICATION %X/%X [TIMELINE %d] + * START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d] */ start_replication: - K_START_REPLICATION RECPTR opt_timeline + K_START_REPLICATION opt_slot opt_physical RECPTR opt_timeline { StartReplicationCmd *cmd; cmd = makeNode(StartReplicationCmd); - cmd->startpoint = $2; - cmd->timeline = $3; - + cmd->kind = REPLICATION_KIND_PHYSICAL; + cmd->slotname = $2; + cmd->startpoint = $4; + cmd->timeline = $5; $$ = (Node *) cmd; } ; @@ -205,6 +236,15 @@ timeline_history: $$ = (Node *) cmd; } ; + +opt_physical : K_PHYSICAL | /* EMPTY */; + + +opt_slot : K_SLOT IDENT + { + $$ = $2; + } + | /* nothing */ { $$ = NULL; } %% #include "repl_scanner.c" diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index 01e5ac6efb0..24195a59719 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -16,6 +16,7 @@ #include "postgres.h" #include "utils/builtins.h" +#include "parser/scansup.h" /* Avoid exit() on fatal scanner errors (a bit ugly -- see yy_fatal_error) */ #undef fprintf @@ -48,7 +49,7 @@ static void addlitchar(unsigned char ychar); %option warn %option prefix="replication_yy" -%x xq +%x xq xd /* Extended quote * xqdouble implements embedded quote, '''' @@ -57,12 +58,26 @@ xqstart {quote} xqdouble {quote}{quote} xqinside [^']+ +/* Double quote + * Allows embedded spaces and other special characters into identifiers. + */ +dquote \" +xdstart {dquote} +xdstop {dquote} +xddouble {dquote}{dquote} +xdinside [^"]+ + digit [0-9]+ hexdigit [0-9A-Za-z]+ quote ' quotestop {quote} +ident_start [A-Za-z\200-\377_] +ident_cont [A-Za-z\200-\377_0-9\$] + +identifier {ident_start}{ident_cont}* + %% BASE_BACKUP { return K_BASE_BACKUP; } @@ -74,9 +89,16 @@ PROGRESS { return K_PROGRESS; } WAL { return K_WAL; } TIMELINE { return K_TIMELINE; } START_REPLICATION { return K_START_REPLICATION; } +CREATE_REPLICATION_SLOT { return K_CREATE_REPLICATION_SLOT; } +DROP_REPLICATION_SLOT { return K_DROP_REPLICATION_SLOT; } TIMELINE_HISTORY { return K_TIMELINE_HISTORY; } +PHYSICAL { return K_PHYSICAL; } +SLOT { return K_SLOT; } + "," { return ','; } ";" { return ';'; } +"(" { return '('; } +")" { return ')'; } [\n] ; [\t] ; @@ -100,20 +122,49 @@ TIMELINE_HISTORY { return K_TIMELINE_HISTORY; } BEGIN(xq); startlit(); } + <xq>{quotestop} { yyless(1); BEGIN(INITIAL); yylval.str = litbufdup(); return SCONST; } -<xq>{xqdouble} { + +<xq>{xqdouble} { addlitchar('\''); } + <xq>{xqinside} { addlit(yytext, yyleng); } -<xq><<EOF>> { yyerror("unterminated quoted string"); } +{xdstart} { + BEGIN(xd); + startlit(); + } + +<xd>{xdstop} { + int len; + yyless(1); + BEGIN(INITIAL); + yylval.str = litbufdup(); + len = strlen(yylval.str); + truncate_identifier(yylval.str, len, true); + return IDENT; + } + +<xd>{xdinside} { + addlit(yytext, yyleng); + } + +{identifier} { + int len = strlen(yytext); + + yylval.str = downcase_truncate_identifier(yytext, len, true); + return IDENT; + } + +<xq,xd><<EOF>> { yyerror("unterminated quoted string"); } <<EOF>> { diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c new file mode 100644 index 00000000000..30aff5f5e36 --- /dev/null +++ b/src/backend/replication/slot.c @@ -0,0 +1,1066 @@ +/*------------------------------------------------------------------------- + * + * slot.c + * Replication slot management. + * + * + * Copyright (c) 2012-2014, PostgreSQL Global Development Group + * + * + * IDENTIFICATION + * src/backend/replication/slot.c + * + * NOTES + * + * Replication slots are used to keep state about replication streams + * originating from this cluster. Their primary purpose is to prevent the + * premature removal of WAL or of old tuple versions in a manner that would + * interfere with replication; they also useful for monitoring purposes. + * Slots need to be permanent (to allow restarts), crash-safe, and allocatable + * on standbys (to support cascading setups). The requirement that slots be + * usable on standbys precludes storing them in the system catalogs. + * + * Each replication slot gets its own directory inside the $PGDATA/pg_replslot + * directory. Inside that directory the state file will contain the slot's + * own data. Additional data can be stored alongside that file if required. + * While the server is running, the state data is also cached in memory for + * efficiency. + * + * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate + * or free a slot. ReplicationSlotControlLock must be taken in shared mode + * to iterate over the slots, and in exclusive mode to change the in_use flag + * of a slot. The remaining data in each slot is protected by its mutex. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include <unistd.h> +#include <sys/stat.h> + +#include "access/transam.h" +#include "miscadmin.h" +#include "replication/slot.h" +#include "storage/fd.h" +#include "storage/procarray.h" + +/* + * Replication slot on-disk data structure. + */ +typedef struct ReplicationSlotOnDisk +{ + /* first part of this struct needs to be version independent */ + + /* data not covered by checksum */ + uint32 magic; + pg_crc32 checksum; + + /* data covered by checksum */ + uint32 version; + uint32 length; + + ReplicationSlotPersistentData slotdata; +} ReplicationSlotOnDisk; + +/* size of the part of the slot that is version independent */ +#define ReplicationSlotOnDiskConstantSize \ + offsetof(ReplicationSlotOnDisk, slotdata) +/* size of the slots that is not version indepenent */ +#define ReplicationSlotOnDiskDynamicSize \ + sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize + +#define SLOT_MAGIC 0x1051CA1 /* format identifier */ +#define SLOT_VERSION 1 /* version for new files */ + +/* Control array for replication slot management */ +ReplicationSlotCtlData *ReplicationSlotCtl = NULL; + +/* My backend's replication slot in the shared memory array */ +ReplicationSlot *MyReplicationSlot = NULL; + +/* GUCs */ +int max_replication_slots = 0; /* the maximum number of replication slots */ + +/* internal persistency functions */ +static void RestoreSlotFromDisk(const char *name); +static void CreateSlotOnDisk(ReplicationSlot *slot); +static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel); + +/* + * Report shared-memory space needed by ReplicationSlotShmemInit. + */ +Size +ReplicationSlotsShmemSize(void) +{ + Size size = 0; + + if (max_replication_slots == 0) + return size; + + size = offsetof(ReplicationSlotCtlData, replication_slots); + size = add_size(size, + mul_size(max_replication_slots, sizeof(ReplicationSlot))); + + return size; +} + +/* + * Allocate and initialize walsender-related shared memory. + */ +void +ReplicationSlotsShmemInit(void) +{ + bool found; + + if (max_replication_slots == 0) + return; + + ReplicationSlotCtl = (ReplicationSlotCtlData *) + ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(), + &found); + + if (!found) + { + int i; + + /* First time through, so initialize */ + MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize()); + + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i]; + + /* everything else is zeroed by the memset above */ + SpinLockInit(&slot->mutex); + slot->io_in_progress_lock = LWLockAssign(); + } + } +} + +/* + * Check whether the passed slot name is valid and report errors at elevel. + * + * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow + * the name to be uses as a directory name on every supported OS. + * + * Returns whether the directory name is valid or not if elevel < ERROR. + */ +bool +ReplicationSlotValidateName(const char *name, int elevel) +{ + const char *cp; + + if (strlen(name) == 0) + { + ereport(elevel, + (errcode(ERRCODE_INVALID_NAME), + errmsg("replication slot name \"%s\" is too short", + name))); + return false; + } + + if (strlen(name) >= NAMEDATALEN) + { + ereport(elevel, + (errcode(ERRCODE_NAME_TOO_LONG), + errmsg("replication slot name \"%s\" is too long", + name))); + return false; + } + + for (cp = name; *cp; cp++) + { + if (!((*cp >= 'a' && *cp <= 'z') + || (*cp >= '0' && *cp <= '9') + || (*cp == '_'))) + { + ereport(elevel, + (errcode(ERRCODE_INVALID_NAME), + errmsg("replication slot name \"%s\" contains invalid character", + name), + errhint("Replication slot names may only contain letters, numbers and the underscore character."))); + return false; + } + } + return true; +} + +/* + * Create a new replication slot and mark it as used by this backend. + * + * name: Name of the slot + * db_specific: changeset extraction is db specific, if the slot is going to + * be used for that pass true, otherwise false. + */ +void +ReplicationSlotCreate(const char *name, bool db_specific) +{ + ReplicationSlot *slot = NULL; + int i; + + Assert(MyReplicationSlot == NULL); + + ReplicationSlotValidateName(name, ERROR); + + /* + * If some other backend ran this code currently with us, we'd likely + * both allocate the same slot, and that would be bad. We'd also be + * at risk of missing a name collision. Also, we don't want to try to + * create a new slot while somebody's busy cleaning up an old one, because + * we might both be monkeying with the same directory. + */ + LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE); + + /* + * Check for name collision, and identify an allocatable slot. We need + * to hold ReplicationSlotControlLock in shared mode for this, so that + * nobody else can change the in_use flags while we're looking at them. + */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("replication slot \"%s\" already exists", name))); + if (!s->in_use && slot == NULL) + slot = s; + } + LWLockRelease(ReplicationSlotControlLock); + + /* If all slots are in use, we're out of luck. */ + if (slot == NULL) + ereport(ERROR, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("all replication slots are in use"), + errhint("Free one or increase max_replication_slots."))); + + /* + * Since this slot is not in use, nobody should be looking at any + * part of it other than the in_use field unless they're trying to allocate + * it. And since we hold ReplicationSlotAllocationLock, nobody except us + * can be doing that. So it's safe to initialize the slot. + */ + Assert(!slot->in_use); + Assert(!slot->active); + slot->data.xmin = InvalidTransactionId; + slot->effective_xmin = InvalidTransactionId; + strncpy(NameStr(slot->data.name), name, NAMEDATALEN); + NameStr(slot->data.name)[NAMEDATALEN - 1] = '\0'; + slot->data.database = db_specific ? MyDatabaseId : InvalidOid; + slot->data.restart_lsn = InvalidXLogRecPtr; + + /* + * Create the slot on disk. We haven't actually marked the slot allocated + * yet, so no special cleanup is required if this errors out. + */ + CreateSlotOnDisk(slot); + + /* + * We need to briefly prevent any other backend from iterating over the + * slots while we flip the in_use flag. We also need to set the active + * flag while holding the ControlLock as otherwise a concurrent + * SlotAcquire() could acquire the slot as well. + */ + LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE); + + slot->in_use = true; + + /* We can now mark the slot active, and that makes it our slot. */ + { + volatile ReplicationSlot *vslot = slot; + + SpinLockAcquire(&slot->mutex); + Assert(!vslot->active); + vslot->active = true; + SpinLockRelease(&slot->mutex); + MyReplicationSlot = slot; + } + + LWLockRelease(ReplicationSlotControlLock); + + /* + * Now that the slot has been marked as in_use and in_active, it's safe to + * let somebody else try to allocate a slot. + */ + LWLockRelease(ReplicationSlotAllocationLock); +} + +/* + * Find an previously created slot and mark it as used by this backend. + */ +void +ReplicationSlotAcquire(const char *name) +{ + ReplicationSlot *slot = NULL; + int i; + bool active = false; + + Assert(MyReplicationSlot == NULL); + + ReplicationSlotValidateName(name, ERROR); + + /* Search for the named slot and mark it active if we find it. */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0) + { + volatile ReplicationSlot *vslot = s; + + SpinLockAcquire(&s->mutex); + active = vslot->active; + vslot->active = true; + SpinLockRelease(&s->mutex); + slot = s; + break; + } + } + LWLockRelease(ReplicationSlotControlLock); + + /* If we did not find the slot or it was already active, error out. */ + if (slot == NULL) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("replication slot \"%s\" does not exist", name))); + if (active) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_IN_USE), + errmsg("replication slot \"%s\" is already active", name))); + + /* We made this slot active, so it's ours now. */ + MyReplicationSlot = slot; +} + +/* + * Release a replication slot, this or another backend can ReAcquire it + * later. Resources this slot requires will be preserved. + */ +void +ReplicationSlotRelease(void) +{ + ReplicationSlot *slot = MyReplicationSlot; + + Assert(slot != NULL && slot->active); + + /* Mark slot inactive. We're not freeing it, just disconnecting. */ + { + volatile ReplicationSlot *vslot = slot; + SpinLockAcquire(&slot->mutex); + vslot->active = false; + SpinLockRelease(&slot->mutex); + MyReplicationSlot = NULL; + } +} + +/* + * Permanently drop replication slot identified by the passed in name. + */ +void +ReplicationSlotDrop(const char *name) +{ + ReplicationSlot *slot = NULL; + int i; + bool active; + char path[MAXPGPATH]; + char tmppath[MAXPGPATH]; + + ReplicationSlotValidateName(name, ERROR); + + /* + * If some other backend ran this code currently with us, we might both + * try to free the same slot at the same time. Or we might try to delete + * a slot with a certain name while someone else was trying to create a + * slot with the same name. + */ + LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE); + + /* Search for the named slot and mark it active if we find it. */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0) + { + volatile ReplicationSlot *vslot = s; + + SpinLockAcquire(&s->mutex); + active = vslot->active; + vslot->active = true; + SpinLockRelease(&s->mutex); + slot = s; + break; + } + } + LWLockRelease(ReplicationSlotControlLock); + + /* If we did not find the slot or it was already active, error out. */ + if (slot == NULL) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("replication slot \"%s\" does not exist", name))); + if (active) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_IN_USE), + errmsg("replication slot \"%s\" is already active", name))); + + /* Generate pathnames. */ + sprintf(path, "pg_replslot/%s", NameStr(slot->data.name)); + sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name)); + + /* + * Rename the slot directory on disk, so that we'll no longer recognize + * this as a valid slot. Note that if this fails, we've got to mark the + * slot inactive again before bailing out. + */ + if (rename(path, tmppath) != 0) + { + volatile ReplicationSlot *vslot = slot; + + SpinLockAcquire(&slot->mutex); + vslot->active = false; + SpinLockRelease(&slot->mutex); + + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not rename \"%s\" to \"%s\": %m", + path, tmppath))); + } + + /* + * We need to fsync() the directory we just renamed and its parent to make + * sure that our changes are on disk in a crash-safe fashion. If fsync() + * fails, we can't be sure whether the changes are on disk or not. For + * now, we handle that by panicking; StartupReplicationSlots() will + * try to straighten it out after restart. + */ + START_CRIT_SECTION(); + fsync_fname(tmppath, true); + fsync_fname("pg_replslot", true); + END_CRIT_SECTION(); + + /* + * The slot is definitely gone. Lock out concurrent scans of the array + * long enough to kill it. It's OK to clear the active flag here without + * grabbing the mutex because nobody else can be scanning the array here, + * and nobody can be attached to this slot and thus access it without + * scanning the array. + */ + LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE); + slot->active = false; + slot->in_use = false; + LWLockRelease(ReplicationSlotControlLock); + + /* + * Slot is dead and doesn't prevent resource removal anymore, recompute + * limits. + */ + ReplicationSlotsComputeRequiredXmin(); + ReplicationSlotsComputeRequiredLSN(); + + /* + * If removing the directory fails, the worst thing that will happen is + * that the user won't be able to create a new slot with the same name + * until the next server restart. We warn about it, but that's all. + */ + if (!rmtree(tmppath, true)) + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not remove directory \"%s\"", tmppath))); + + /* + * We release this at the very end, so that nobody starts trying to create + * a slot while we're still cleaning up the detritus of the old one. + */ + LWLockRelease(ReplicationSlotAllocationLock); +} + +/* + * Serialize the currently acquired slot's state from memory to disk, thereby + * guaranteeing the current state will survive a crash. + */ +void +ReplicationSlotSave(void) +{ + char path[MAXPGPATH]; + + Assert(MyReplicationSlot != NULL); + + sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name)); + SaveSlotToPath(MyReplicationSlot, path, ERROR); +} + +/* + * Signal that it would be useful if the currently acquired slot would be + * flushed out to disk. + * + * Note that the actual flush to disk can be delayed for a long time, if + * required for correctness explicitly do a ReplicationSlotSave(). + */ +void +ReplicationSlotMarkDirty(void) +{ + Assert(MyReplicationSlot != NULL); + + { + volatile ReplicationSlot *vslot = MyReplicationSlot; + + SpinLockAcquire(&vslot->mutex); + MyReplicationSlot->just_dirtied = true; + MyReplicationSlot->dirty = true; + SpinLockRelease(&vslot->mutex); + } +} + +/* + * Compute the oldest xmin across all slots and store it in the ProcArray. + */ +void +ReplicationSlotsComputeRequiredXmin(void) +{ + int i; + TransactionId agg_xmin = InvalidTransactionId; + + Assert(ReplicationSlotCtl != NULL); + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + TransactionId effective_xmin; + + if (!s->in_use) + continue; + + { + volatile ReplicationSlot *vslot = s; + + SpinLockAcquire(&s->mutex); + effective_xmin = vslot->effective_xmin; + SpinLockRelease(&s->mutex); + } + + /* check the data xmin */ + if (TransactionIdIsValid(effective_xmin) && + (!TransactionIdIsValid(agg_xmin) || + TransactionIdPrecedes(effective_xmin, agg_xmin))) + agg_xmin = effective_xmin; + } + LWLockRelease(ReplicationSlotControlLock); + + ProcArraySetReplicationSlotXmin(agg_xmin); +} + +/* + * Compute the oldest restart LSN across all slots and inform xlog module. + */ +void +ReplicationSlotsComputeRequiredLSN(void) +{ + int i; + XLogRecPtr min_required = InvalidXLogRecPtr; + + Assert(ReplicationSlotCtl != NULL); + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + XLogRecPtr restart_lsn; + + if (!s->in_use) + continue; + + { + volatile ReplicationSlot *vslot = s; + + SpinLockAcquire(&s->mutex); + restart_lsn = vslot->data.restart_lsn; + SpinLockRelease(&s->mutex); + } + + if (restart_lsn != InvalidXLogRecPtr && + (min_required == InvalidXLogRecPtr || + restart_lsn < min_required)) + min_required = restart_lsn; + } + LWLockRelease(ReplicationSlotControlLock); + + XLogSetReplicationSlotMinimumLSN(min_required); +} + +/* + * Check whether the server's configuration supports using replication + * slots. + */ +void +CheckSlotRequirements(void) +{ + if (max_replication_slots == 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + (errmsg("replication slots can only be used if max_replication_slots > 0")))); + + if (wal_level < WAL_LEVEL_ARCHIVE) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slots can only be used if wal_level >= archive"))); +} + +/* + * Returns whether the string `str' has the postfix `end'. + */ +static bool +string_endswith(const char *str, const char *end) +{ + size_t slen = strlen(str); + size_t elen = strlen(end); + + /* can't be a postfix if longer */ + if (elen > slen) + return false; + + /* compare the end of the strings */ + str += slen - elen; + return strcmp(str, end) == 0; +} + +/* + * Flush all replication slots to disk. + * + * This needn't actually be part of a checkpoint, but it's a convenient + * location. + */ +void +CheckPointReplicationSlots(void) +{ + int i; + + ereport(DEBUG1, + (errmsg("performing replication slot checkpoint"))); + + /* + * Prevent any slot from being created/dropped while we're active. As we + * explicitly do *not* want to block iterating over replication_slots or + * acquiring a slot we cannot take the control lock - but that's OK, + * because holding ReplicationSlotAllocationLock is strictly stronger, + * and enough to guarantee that nobody can change the in_use bits on us. + */ + LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED); + + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + char path[MAXPGPATH]; + + if (!s->in_use) + continue; + + /* save the slot to disk, locking is handled in SaveSlotToPath() */ + sprintf(path, "pg_replslot/%s", NameStr(s->data.name)); + SaveSlotToPath(s, path, LOG); + } + LWLockRelease(ReplicationSlotAllocationLock); +} + +/* + * Load all replication slots from disk into memory at server startup. This + * needs to be run before we start crash recovery. + */ +void +StartupReplicationSlots(XLogRecPtr checkPointRedo) +{ + DIR *replication_dir; + struct dirent *replication_de; + + ereport(DEBUG1, + (errmsg("starting up replication slots"))); + + /* restore all slots by iterating over all on-disk entries */ + replication_dir = AllocateDir("pg_replslot"); + while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL) + { + struct stat statbuf; + char path[MAXPGPATH]; + + if (strcmp(replication_de->d_name, ".") == 0 || + strcmp(replication_de->d_name, "..") == 0) + continue; + + snprintf(path, MAXPGPATH, "pg_replslot/%s", replication_de->d_name); + + /* we're only creating directories here, skip if it's not our's */ + if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode)) + continue; + + /* we crashed while a slot was being setup or deleted, clean up */ + if (string_endswith(replication_de->d_name, ".tmp")) + { + if (!rmtree(path, true)) + { + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not remove directory \"%s\"", path))); + continue; + } + fsync_fname("pg_replslot", true); + continue; + } + + /* looks like a slot in a normal state, restore */ + RestoreSlotFromDisk(replication_de->d_name); + } + FreeDir(replication_dir); + + /* currently no slots exist, we're done. */ + if (max_replication_slots <= 0) + return; + + /* Now that we have recovered all the data, compute replication xmin */ + ReplicationSlotsComputeRequiredXmin(); + ReplicationSlotsComputeRequiredLSN(); +} + +/* ---- + * Manipulation of ondisk state of replication slots + * + * NB: none of the routines below should take any notice whether a slot is the + * current one or not, that's all handled a layer above. + * ---- + */ +static void +CreateSlotOnDisk(ReplicationSlot *slot) +{ + char tmppath[MAXPGPATH]; + char path[MAXPGPATH]; + struct stat st; + + /* + * No need to take out the io_in_progress_lock, nobody else can see this + * slot yet, so nobody else wil write. We're reusing SaveSlotToPath which + * takes out the lock, if we'd take the lock here, we'd deadlock. + */ + + sprintf(path, "pg_replslot/%s", NameStr(slot->data.name)); + sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name)); + + /* + * It's just barely possible that some previous effort to create or + * drop a slot with this name left a temp directory lying around. + * If that seems to be the case, try to remove it. If the rmtree() + * fails, we'll error out at the mkdir() below, so we don't bother + * checking success. + */ + if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode)) + rmtree(tmppath, true); + + /* Create and fsync the temporary slot directory. */ + if (mkdir(tmppath, S_IRWXU) < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create directory \"%s\": %m", + tmppath))); + fsync_fname(tmppath, true); + + /* Write the actual state file. */ + slot->dirty = true; /* signal that we really need to write */ + SaveSlotToPath(slot, tmppath, ERROR); + + /* Rename the directory into place. */ + if (rename(tmppath, path) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not rename file \"%s\" to \"%s\": %m", + tmppath, path))); + + /* + * If we'd now fail - really unlikely - we wouldn't know wether this slot + * would persist after an OS crash or not - so, force a restart. The + * restart would try to fysnc this again till it works. + */ + START_CRIT_SECTION(); + + fsync_fname(path, true); + fsync_fname("pg_replslot", true); + + END_CRIT_SECTION(); +} + +/* + * Shared functionality between saving and creating a replication slot. + */ +static void +SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel) +{ + char tmppath[MAXPGPATH]; + char path[MAXPGPATH]; + int fd; + ReplicationSlotOnDisk cp; + bool was_dirty; + + /* first check whether there's something to write out */ + { + volatile ReplicationSlot *vslot = slot; + + SpinLockAcquire(&vslot->mutex); + was_dirty = vslot->dirty; + vslot->just_dirtied = false; + SpinLockRelease(&vslot->mutex); + } + + /* and don't do anything if there's nothing to write */ + if (!was_dirty) + return; + + LWLockAcquire(slot->io_in_progress_lock, LW_EXCLUSIVE); + + /* silence valgrind :( */ + memset(&cp, 0, sizeof(ReplicationSlotOnDisk)); + + sprintf(tmppath, "%s/state.tmp", dir); + sprintf(path, "%s/state", dir); + + fd = OpenTransientFile(tmppath, + O_CREAT | O_EXCL | O_WRONLY | PG_BINARY, + S_IRUSR | S_IWUSR); + if (fd < 0) + { + ereport(elevel, + (errcode_for_file_access(), + errmsg("could not create file \"%s\": %m", + tmppath))); + return; + } + + cp.magic = SLOT_MAGIC; + INIT_CRC32(cp.checksum); + cp.version = 1; + cp.length = ReplicationSlotOnDiskDynamicSize; + + SpinLockAcquire(&slot->mutex); + + memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData)); + + SpinLockRelease(&slot->mutex); + + COMP_CRC32(cp.checksum, + (char *)(&cp) + ReplicationSlotOnDiskConstantSize, + ReplicationSlotOnDiskDynamicSize); + + if ((write(fd, &cp, sizeof(cp))) != sizeof(cp)) + { + int save_errno = errno; + CloseTransientFile(fd); + errno = save_errno; + ereport(elevel, + (errcode_for_file_access(), + errmsg("could not write to file \"%s\": %m", + tmppath))); + return; + } + + /* fsync the temporary file */ + if (pg_fsync(fd) != 0) + { + int save_errno = errno; + CloseTransientFile(fd); + errno = save_errno; + ereport(elevel, + (errcode_for_file_access(), + errmsg("could not fsync file \"%s\": %m", + tmppath))); + return; + } + + CloseTransientFile(fd); + + /* rename to permanent file, fsync file and directory */ + if (rename(tmppath, path) != 0) + { + ereport(elevel, + (errcode_for_file_access(), + errmsg("could not rename \"%s\" to \"%s\": %m", + tmppath, path))); + return; + } + + /* Check CreateSlot() for the reasoning of using a crit. section. */ + START_CRIT_SECTION(); + + fsync_fname(path, false); + fsync_fname((char *) dir, true); + fsync_fname("pg_replslot", true); + + END_CRIT_SECTION(); + + /* + * Successfully wrote, unset dirty bit, unless somebody dirtied again + * already. + */ + { + volatile ReplicationSlot *vslot = slot; + + SpinLockAcquire(&vslot->mutex); + if (!vslot->just_dirtied) + vslot->dirty = false; + SpinLockRelease(&vslot->mutex); + } + + LWLockRelease(slot->io_in_progress_lock); +} + +/* + * Load a single slot from disk into memory. + */ +static void +RestoreSlotFromDisk(const char *name) +{ + ReplicationSlotOnDisk cp; + int i; + char path[MAXPGPATH]; + int fd; + bool restored = false; + int readBytes; + pg_crc32 checksum; + + /* no need to lock here, no concurrent access allowed yet */ + + /* delete temp file if it exists */ + sprintf(path, "pg_replslot/%s/state.tmp", name); + if (unlink(path) < 0 && errno != ENOENT) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not unlink file \"%s\": %m", path))); + + sprintf(path, "pg_replslot/%s/state", name); + + elog(DEBUG1, "restoring replication slot from \"%s\"", path); + + fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0); + + /* + * We do not need to handle this as we are rename()ing the directory into + * place only after we fsync()ed the state file. + */ + if (fd < 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", path))); + + /* + * Sync state file before we're reading from it. We might have crashed + * while it wasn't synced yet and we shouldn't continue on that basis. + */ + if (pg_fsync(fd) != 0) + { + CloseTransientFile(fd); + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not fsync file \"%s\": %m", + path))); + } + + /* Also sync the parent directory */ + START_CRIT_SECTION(); + fsync_fname(path, true); + END_CRIT_SECTION(); + + /* read part of statefile that's guaranteed to be version independent */ + readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize); + if (readBytes != ReplicationSlotOnDiskConstantSize) + { + int saved_errno = errno; + + CloseTransientFile(fd); + errno = saved_errno; + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not read file \"%s\", read %d of %u: %m", + path, readBytes, + (uint32) ReplicationSlotOnDiskConstantSize))); + } + + /* verify magic */ + if (cp.magic != SLOT_MAGIC) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("replication slot file \"%s\" has wrong magic %u instead of %u", + path, cp.magic, SLOT_MAGIC))); + + /* verify version */ + if (cp.version != SLOT_VERSION) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("replication slot file \"%s\" has unsupported version %u", + path, cp.version))); + + /* boundary check on length */ + if (cp.length != ReplicationSlotOnDiskDynamicSize) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("replication slot file \"%s\" has corrupted length %u", + path, cp.length))); + + /* Now that we know the size, read the entire file */ + readBytes = read(fd, + (char *)&cp + ReplicationSlotOnDiskConstantSize, + cp.length); + if (readBytes != cp.length) + { + int saved_errno = errno; + + CloseTransientFile(fd); + errno = saved_errno; + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not read file \"%s\", read %d of %u: %m", + path, readBytes, cp.length))); + } + + CloseTransientFile(fd); + + /* now verify the CRC32 */ + INIT_CRC32(checksum); + COMP_CRC32(checksum, + (char *)&cp + ReplicationSlotOnDiskConstantSize, + ReplicationSlotOnDiskDynamicSize); + + if (!EQ_CRC32(checksum, cp.checksum)) + ereport(PANIC, + (errmsg("replication slot file %s: checksum mismatch, is %u, should be %u", + path, checksum, cp.checksum))); + + /* nothing can be active yet, don't lock anything */ + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *slot; + + slot = &ReplicationSlotCtl->replication_slots[i]; + + if (slot->in_use) + continue; + + /* restore the entire set of persistent data */ + memcpy(&slot->data, &cp.slotdata, + sizeof(ReplicationSlotPersistentData)); + + /* initialize in memory state */ + slot->effective_xmin = cp.slotdata.xmin; + slot->in_use = true; + slot->active = false; + + restored = true; + break; + } + + if (!restored) + ereport(PANIC, + (errmsg("too many replication slots active before shutdown"), + errhint("Increase max_replication_slots and try again."))); +} diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c new file mode 100644 index 00000000000..98a860e5288 --- /dev/null +++ b/src/backend/replication/slotfuncs.c @@ -0,0 +1,193 @@ +/*------------------------------------------------------------------------- + * + * slotfuncs.c + * Support functions for replication slots + * + * Copyright (c) 2012-2014, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/slotfuncs.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "funcapi.h" +#include "miscadmin.h" +#include "access/htup_details.h" +#include "utils/builtins.h" +#include "replication/slot.h" + +Datum pg_create_physical_replication_slot(PG_FUNCTION_ARGS); +Datum pg_drop_replication_slot(PG_FUNCTION_ARGS); + +static void +check_permissions(void) +{ + if (!superuser() && !has_rolreplication(GetUserId())) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + (errmsg("must be superuser or replication role to use replication slots")))); +} + +/* + * SQL function for creating a new physical (streaming replication) + * replication slot. + */ +Datum +pg_create_physical_replication_slot(PG_FUNCTION_ARGS) +{ + Name name = PG_GETARG_NAME(0); + Datum values[2]; + bool nulls[2]; + TupleDesc tupdesc; + HeapTuple tuple; + Datum result; + + check_permissions(); + + CheckSlotRequirements(); + + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + /* acquire replication slot, this will check for conflicting names*/ + ReplicationSlotCreate(NameStr(*name), false); + + values[0] = CStringGetTextDatum(NameStr(MyReplicationSlot->data.name)); + + nulls[0] = false; + nulls[1] = true; + + tuple = heap_form_tuple(tupdesc, values, nulls); + result = HeapTupleGetDatum(tuple); + + ReplicationSlotRelease(); + + PG_RETURN_DATUM(result); +} + +/* + * SQL function for dropping a replication slot. + */ +Datum +pg_drop_replication_slot(PG_FUNCTION_ARGS) +{ + Name name = PG_GETARG_NAME(0); + + check_permissions(); + + CheckSlotRequirements(); + + ReplicationSlotDrop(NameStr(*name)); + + PG_RETURN_VOID(); +} + +/* + * pg_get_replication_slots - SQL SRF showing active replication slots. + */ +Datum +pg_get_replication_slots(PG_FUNCTION_ARGS) +{ +#define PG_STAT_GET_REPLICATION_SLOTS_COLS 6 + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + Tuplestorestate *tupstore; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + int slotno; + + /* check to see if caller supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not " \ + "allowed in this context"))); + + /* Build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + /* + * We don't require any special permission to see this function's data + * because nothing should be sensitive. The most critical being the slot + * name, which shouldn't contain anything particularly sensitive. + */ + + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + + MemoryContextSwitchTo(oldcontext); + + for (slotno = 0; slotno < max_replication_slots; slotno++) + { + ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno]; + Datum values[PG_STAT_GET_REPLICATION_SLOTS_COLS]; + bool nulls[PG_STAT_GET_REPLICATION_SLOTS_COLS]; + + TransactionId xmin; + XLogRecPtr restart_lsn; + bool active; + Oid database; + const char *slot_name; + + char restart_lsn_s[MAXFNAMELEN]; + int i; + + SpinLockAcquire(&slot->mutex); + if (!slot->in_use) + { + SpinLockRelease(&slot->mutex); + continue; + } + else + { + xmin = slot->data.xmin; + database = slot->data.database; + restart_lsn = slot->data.restart_lsn; + slot_name = pstrdup(NameStr(slot->data.name)); + + active = slot->active; + } + SpinLockRelease(&slot->mutex); + + memset(nulls, 0, sizeof(nulls)); + + snprintf(restart_lsn_s, sizeof(restart_lsn_s), "%X/%X", + (uint32) (restart_lsn >> 32), (uint32) restart_lsn); + + i = 0; + values[i++] = CStringGetTextDatum(slot_name); + if (database == InvalidOid) + values[i++] = CStringGetTextDatum("physical"); + else + values[i++] = CStringGetTextDatum("logical"); + values[i++] = database; + values[i++] = BoolGetDatum(active); + if (xmin != InvalidTransactionId) + values[i++] = TransactionIdGetDatum(xmin); + else + nulls[i++] = true; + if (restart_lsn != InvalidTransactionId) + values[i++] = CStringGetTextDatum(restart_lsn_s); + else + nulls[i++] = true; + + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + } + + tuplestore_donestoring(tupstore); + + return (Datum) 0; +} diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 1fbd33ef61b..cc3d7753074 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -187,6 +187,7 @@ void WalReceiverMain(void) { char conninfo[MAXCONNINFO]; + char slotname[NAMEDATALEN]; XLogRecPtr startpoint; TimeLineID startpointTLI; TimeLineID primaryTLI; @@ -241,6 +242,7 @@ WalReceiverMain(void) /* Fetch information required to start streaming */ strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO); + strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN); startpoint = walrcv->receiveStart; startpointTLI = walrcv->receiveStartTLI; @@ -355,7 +357,8 @@ WalReceiverMain(void) * on the new timeline. */ ThisTimeLineID = startpointTLI; - if (walrcv_startstreaming(startpointTLI, startpoint)) + if (walrcv_startstreaming(startpointTLI, startpoint, + slotname[0] != '\0' ? slotname : NULL)) { bool endofwal = false; diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index cc96d7c2f8a..acadec57f5a 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -219,11 +219,13 @@ ShutdownWalRcv(void) /* * Request postmaster to start walreceiver. * - * recptr indicates the position where streaming should begin, and conninfo - * is a libpq connection string to use. + * recptr indicates the position where streaming should begin, conninfo + * is a libpq connection string to use, and slotname is, optionally, the name + * of a replication slot to acquire. */ void -RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo) +RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, + const char *slotname) { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; @@ -250,6 +252,11 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo) else walrcv->conninfo[0] = '\0'; + if (slotname != NULL) + strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN); + else + walrcv->slotname[0] = '\0'; + if (walrcv->walRcvState == WALRCV_STOPPED) { launch = true; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 652487e3de7..119a920af21 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -53,6 +53,7 @@ #include "miscadmin.h" #include "nodes/replnodes.h" #include "replication/basebackup.h" +#include "replication/slot.h" #include "replication/syncrep.h" #include "replication/walreceiver.h" #include "replication/walsender.h" @@ -218,12 +219,17 @@ InitWalSender(void) void WalSndErrorCleanup() { + LWLockReleaseAll(); + if (sendFile >= 0) { close(sendFile); sendFile = -1; } + if (MyReplicationSlot != NULL) + ReplicationSlotRelease(); + replication_active = false; if (walsender_ready_to_stop) proc_exit(0); @@ -421,6 +427,15 @@ StartReplication(StartReplicationCmd *cmd) * written at wal_level='minimal'. */ + if (cmd->slotname) + { + ReplicationSlotAcquire(cmd->slotname); + if (MyReplicationSlot->data.database != InvalidOid) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + (errmsg("cannot use a replication slot created for changeset extraction for streaming replication")))); + } + /* * Select the timeline. If it was given explicitly by the client, use * that. Otherwise use the timeline of the last replayed record, which is @@ -565,6 +580,9 @@ StartReplication(StartReplicationCmd *cmd) Assert(streamingDoneSending && streamingDoneReceiving); } + if (cmd->slotname) + ReplicationSlotRelease(); + /* * Copy is finished now. Send a single-row result set indicating the next * timeline. @@ -623,6 +641,75 @@ StartReplication(StartReplicationCmd *cmd) } /* + * Create a new replication slot. + */ +static void +CreateReplicationSlot(CreateReplicationSlotCmd *cmd) +{ + const char *slot_name; + StringInfoData buf; + + Assert(!MyReplicationSlot); + + /* setup state for XLogReadPage */ + sendTimeLineIsHistoric = false; + sendTimeLine = ThisTimeLineID; + + ReplicationSlotCreate(cmd->slotname, cmd->kind == REPLICATION_KIND_LOGICAL); + + initStringInfo(&output_message); + + slot_name = NameStr(MyReplicationSlot->data.name); + + /* + * It may seem somewhat pointless to send back the same slot name the + * client just requested and nothing else, but logical replication + * will add more fields here. (We could consider removing the slot + * name from what's sent back, though, since the client has specified + * that.) + */ + + pq_beginmessage(&buf, 'T'); + pq_sendint(&buf, 1, 2); /* 1 field */ + + /* first field: slot name */ + pq_sendstring(&buf, "slot_name"); /* col name */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, TEXTOID, 4); /* type oid */ + pq_sendint(&buf, -1, 2); /* typlen */ + pq_sendint(&buf, 0, 4); /* typmod */ + pq_sendint(&buf, 0, 2); /* format code */ + + pq_endmessage(&buf); + + /* Send a DataRow message */ + pq_beginmessage(&buf, 'D'); + pq_sendint(&buf, 1, 2); /* # of columns */ + + /* slot_name */ + pq_sendint(&buf, strlen(slot_name), 4); /* col1 len */ + pq_sendbytes(&buf, slot_name, strlen(slot_name)); + + pq_endmessage(&buf); + + /* + * release active status again, START_REPLICATION will reacquire it + */ + ReplicationSlotRelease(); +} + +/* + * Get rid of a replication slot that is no longer wanted. + */ +static void +DropReplicationSlot(DropReplicationSlotCmd *cmd) +{ + ReplicationSlotDrop(cmd->slotname); + EndCommand("DROP_REPLICATION_SLOT", DestRemote); +} + +/* * Execute an incoming replication command. */ void @@ -660,14 +747,28 @@ exec_replication_command(const char *cmd_string) IdentifySystem(); break; - case T_StartReplicationCmd: - StartReplication((StartReplicationCmd *) cmd_node); - break; - case T_BaseBackupCmd: SendBaseBackup((BaseBackupCmd *) cmd_node); break; + case T_CreateReplicationSlotCmd: + CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node); + break; + + case T_DropReplicationSlotCmd: + DropReplicationSlot((DropReplicationSlotCmd *) cmd_node); + break; + + case T_StartReplicationCmd: + { + StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node; + if (cmd->kind == REPLICATION_KIND_PHYSICAL) + StartReplication(cmd); + else + elog(ERROR, "cannot handle changeset extraction yet"); + break; + } + case T_TimeLineHistoryCmd: SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node); break; @@ -831,6 +932,39 @@ ProcessStandbyMessage(void) } /* + * Remember that a walreceiver just confirmed receipt of lsn `lsn`. + */ +static void +PhysicalConfirmReceivedLocation(XLogRecPtr lsn) +{ + bool changed = false; + /* use volatile pointer to prevent code rearrangement */ + volatile ReplicationSlot *slot = MyReplicationSlot; + + Assert(lsn != InvalidXLogRecPtr); + SpinLockAcquire(&slot->mutex); + if (slot->data.restart_lsn != lsn) + { + changed = true; + slot->data.restart_lsn = lsn; + } + SpinLockRelease(&slot->mutex); + + if (changed) + { + ReplicationSlotMarkDirty(); + ReplicationSlotsComputeRequiredLSN(); + } + + /* + * One could argue that the slot should saved to disk now, but that'd be + * energy wasted - the worst lost information can do here is give us wrong + * information in a statistics view - we'll just potentially be more + * conservative in removing files. + */ +} + +/* * Regular reply from standby advising of WAL positions on standby server. */ static void @@ -875,6 +1009,48 @@ ProcessStandbyReplyMessage(void) if (!am_cascading_walsender) SyncRepReleaseWaiters(); + + /* + * Advance our local xmin horizon when the client confirmed a flush. + */ + if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr) + { + if (MyReplicationSlot->data.database != InvalidOid) + elog(ERROR, "cannot handle changeset extraction yet"); + else + PhysicalConfirmReceivedLocation(flushPtr); + } +} + +/* compute new replication slot xmin horizon if needed */ +static void +PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin) +{ + bool changed = false; + volatile ReplicationSlot *slot = MyReplicationSlot; + + SpinLockAcquire(&slot->mutex); + MyPgXact->xmin = InvalidTransactionId; + /* + * For physical replication we don't need the the interlock provided + * by xmin and effective_xmin since the consequences of a missed increase + * are limited to query cancellations, so set both at once. + */ + if (!TransactionIdIsNormal(slot->data.xmin) || + !TransactionIdIsNormal(feedbackXmin) || + TransactionIdPrecedes(slot->data.xmin, feedbackXmin)) + { + changed = true; + slot->data.xmin = feedbackXmin; + slot->effective_xmin = feedbackXmin; + } + SpinLockRelease(&slot->mutex); + + if (changed) + { + ReplicationSlotMarkDirty(); + ReplicationSlotsComputeRequiredXmin(); + } } /* @@ -904,6 +1080,8 @@ ProcessStandbyHSFeedbackMessage(void) if (!TransactionIdIsNormal(feedbackXmin)) { MyPgXact->xmin = InvalidTransactionId; + if (MyReplicationSlot != NULL) + PhysicalReplicationSlotNewXmin(feedbackXmin); return; } @@ -951,8 +1129,17 @@ ProcessStandbyHSFeedbackMessage(void) * GetOldestXmin. (If we're moving our xmin forward, this is obviously * safe, and if we're moving it backwards, well, the data is at risk * already since a VACUUM could have just finished calling GetOldestXmin.) + * + * If we're using a replication slot we reserve the xmin via that, + * otherwise via the walsender's PGXACT entry. + + * XXX: It might make sense to introduce ephemeral slots and always use + * the slot mechanism. */ - MyPgXact->xmin = feedbackXmin; + if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */ + PhysicalReplicationSlotNewXmin(feedbackXmin); + else + MyPgXact->xmin = feedbackXmin; } /* Main loop of walsender process that streams the WAL over Copy messages. */ diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 2e717457b12..c392d4fa228 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -27,6 +27,7 @@ #include "postmaster/bgworker_internals.h" #include "postmaster/bgwriter.h" #include "postmaster/postmaster.h" +#include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/bufmgr.h" @@ -126,6 +127,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) size = add_size(size, ProcSignalShmemSize()); size = add_size(size, CheckpointerShmemSize()); size = add_size(size, AutoVacuumShmemSize()); + size = add_size(size, ReplicationSlotsShmemSize()); size = add_size(size, WalSndShmemSize()); size = add_size(size, WalRcvShmemSize()); size = add_size(size, BTreeShmemSize()); @@ -230,6 +232,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) ProcSignalShmemInit(); CheckpointerShmemInit(); AutoVacuumShmemInit(); + ReplicationSlotsShmemInit(); WalSndShmemInit(); WalRcvShmemInit(); diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index b68c95612c5..082115b4fff 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -82,6 +82,9 @@ typedef struct ProcArrayStruct */ TransactionId lastOverflowedXid; + /* oldest xmin of any replication slot */ + TransactionId replication_slot_xmin; + /* * We declare pgprocnos[] as 1 entry because C wants a fixed-size array, * but actually it is maxProcs entries long. @@ -228,6 +231,7 @@ CreateSharedProcArray(void) */ procArray->numProcs = 0; procArray->maxProcs = PROCARRAY_MAXPROCS; + procArray->replication_slot_xmin = InvalidTransactionId; procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS; procArray->numKnownAssignedXids = 0; procArray->tailKnownAssignedXids = 0; @@ -1153,6 +1157,7 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum) ProcArrayStruct *arrayP = procArray; TransactionId result; int index; + volatile TransactionId replication_slot_xmin = InvalidTransactionId; /* Cannot look for individual databases during recovery */ Assert(allDbs || !RecoveryInProgress()); @@ -1204,6 +1209,9 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum) } } + /* fetch into volatile var while ProcArrayLock is held */ + replication_slot_xmin = procArray->replication_slot_xmin; + if (RecoveryInProgress()) { /* @@ -1244,6 +1252,13 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum) result = FirstNormalTransactionId; } + /* + * Check whether there are replication slots requiring an older xmin. + */ + if (TransactionIdIsValid(replication_slot_xmin) && + NormalTransactionIdPrecedes(replication_slot_xmin, result)) + result = replication_slot_xmin; + return result; } @@ -1313,6 +1328,7 @@ GetSnapshotData(Snapshot snapshot) int count = 0; int subcount = 0; bool suboverflowed = false; + volatile TransactionId replication_slot_xmin = InvalidTransactionId; Assert(snapshot != NULL); @@ -1490,8 +1506,13 @@ GetSnapshotData(Snapshot snapshot) suboverflowed = true; } + + /* fetch into volatile var while ProcArrayLock is held */ + replication_slot_xmin = procArray->replication_slot_xmin; + if (!TransactionIdIsValid(MyPgXact->xmin)) MyPgXact->xmin = TransactionXmin = xmin; + LWLockRelease(ProcArrayLock); /* @@ -1506,6 +1527,12 @@ GetSnapshotData(Snapshot snapshot) RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age; if (!TransactionIdIsNormal(RecentGlobalXmin)) RecentGlobalXmin = FirstNormalTransactionId; + + /* Check whether there's a replication slot requiring an older xmin. */ + if (TransactionIdIsValid(replication_slot_xmin) && + NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin)) + RecentGlobalXmin = replication_slot_xmin; + RecentXmin = xmin; snapshot->xmin = xmin; @@ -2491,6 +2518,21 @@ CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared) return true; /* timed out, still conflicts */ } +/* + * ProcArraySetReplicationSlotXmin + * + * Install limits to future computations of the xmin horizon to prevent vacuum + * and HOT pruning from removing affected rows still needed by clients with + * replicaton slots. + */ +void +ProcArraySetReplicationSlotXmin(TransactionId xmin) +{ + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + procArray->replication_slot_xmin = xmin; + LWLockRelease(ProcArrayLock); +} + #define XidCacheRemove(i) \ do { \ diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 55d9d7837ca..82ef4409494 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -27,6 +27,7 @@ #include "commands/async.h" #include "miscadmin.h" #include "pg_trace.h" +#include "replication/slot.h" #include "storage/ipc.h" #include "storage/predicate.h" #include "storage/proc.h" @@ -238,6 +239,9 @@ NumLWLocks(void) /* predicate.c needs one per old serializable xid buffer */ numLocks += NUM_OLDSERXID_BUFFERS; + /* slot.c needs one for each slot */ + numLocks += max_replication_slots; + /* * Add any requested by loadable modules; for backwards-compatibility * reasons, allocate at least NUM_USER_DEFINED_LWLOCKS of them even if diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 9d32f9405d5..fb449a88204 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -40,6 +40,7 @@ #include "access/xact.h" #include "miscadmin.h" #include "postmaster/autovacuum.h" +#include "replication/slot.h" #include "replication/syncrep.h" #include "storage/ipc.h" #include "storage/lmgr.h" @@ -780,6 +781,10 @@ ProcKill(int code, Datum arg) /* Make sure we're out of the sync rep lists */ SyncRepCleanupAtProcExit(); + /* Make sure active replication slots are released */ + if (MyReplicationSlot != NULL) + ReplicationSlotRelease(); + #ifdef USE_ASSERT_CHECKING if (assert_enabled) { diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index a9b9794965b..70d73d9898e 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -57,6 +57,7 @@ #include "postmaster/postmaster.h" #include "postmaster/syslogger.h" #include "postmaster/walwriter.h" +#include "replication/slot.h" #include "replication/syncrep.h" #include "replication/walreceiver.h" #include "replication/walsender.h" @@ -2124,6 +2125,17 @@ static struct config_int ConfigureNamesInt[] = }, { + /* see max_connections */ + {"max_replication_slots", PGC_POSTMASTER, REPLICATION_SENDING, + gettext_noop("Sets the maximum number of simultaneously defined replication slots."), + NULL + }, + &max_replication_slots, + 0, 0, MAX_BACKENDS /* XXX?*/, + NULL, NULL, NULL + }, + + { {"wal_sender_timeout", PGC_SIGHUP, REPLICATION_SENDING, gettext_noop("Sets the maximum time to wait for WAL replication."), NULL, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index c8673b382da..d10e8a5783a 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -226,6 +226,9 @@ #wal_keep_segments = 0 # in logfile segments, 16MB each; 0 disables #wal_sender_timeout = 60s # in milliseconds; 0 disables +#max_replication_slots = 0 # max number of replication slots. + # (change requires restart) + # - Master Server - # These settings are ignored on a standby server. |