diff options
Diffstat (limited to 'src/backend/replication/logical/origin.c')
-rw-r--r-- | src/backend/replication/logical/origin.c | 66 |
1 files changed, 31 insertions, 35 deletions
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index c3c1d7a2a51..6583dd497da 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -90,6 +90,7 @@ #include "storage/lmgr.h" #include "utils/builtins.h" #include "utils/fmgroids.h" +#include "utils/guc.h" #include "utils/pg_lsn.h" #include "utils/rel.h" #include "utils/snapmgr.h" @@ -99,6 +100,9 @@ #define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint" #define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp" +/* GUC variables */ +int max_active_replication_origins = 10; + /* * Replay progress of a single remote node. */ @@ -151,7 +155,7 @@ typedef struct ReplicationStateCtl { /* Tranche to use for per-origin LWLocks */ int tranche_id; - /* Array of length max_replication_slots */ + /* Array of length max_active_replication_origins */ ReplicationState states[FLEXIBLE_ARRAY_MEMBER]; } ReplicationStateCtl; @@ -162,10 +166,7 @@ TimestampTz replorigin_session_origin_timestamp = 0; /* * Base address into a shared memory array of replication states of size - * max_replication_slots. - * - * XXX: Should we use a separate variable to size this rather than - * max_replication_slots? + * max_active_replication_origins. */ static ReplicationState *replication_states; @@ -186,12 +187,12 @@ static ReplicationState *session_replication_state = NULL; #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE) static void -replorigin_check_prerequisites(bool check_slots, bool recoveryOK) +replorigin_check_prerequisites(bool check_origins, bool recoveryOK) { - if (check_slots && max_replication_slots == 0) + if (check_origins && max_active_replication_origins == 0) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("cannot query or manipulate replication origin when \"max_replication_slots\" is 0"))); + errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0"))); if (!recoveryOK && RecoveryInProgress()) ereport(ERROR, @@ -352,7 +353,7 @@ replorigin_state_clear(RepOriginId roident, bool nowait) restart: LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); - for (i = 0; i < max_replication_slots; i++) + for (i = 0; i < max_active_replication_origins; i++) { ReplicationState *state = &replication_states[i]; @@ -511,18 +512,13 @@ ReplicationOriginShmemSize(void) { Size size = 0; - /* - * XXX: max_replication_slots is arguably the wrong thing to use, as here - * we keep the replay state of *remote* transactions. But for now it seems - * sufficient to reuse it, rather than introduce a separate GUC. - */ - if (max_replication_slots == 0) + if (max_active_replication_origins == 0) return size; size = add_size(size, offsetof(ReplicationStateCtl, states)); size = add_size(size, - mul_size(max_replication_slots, sizeof(ReplicationState))); + mul_size(max_active_replication_origins, sizeof(ReplicationState))); return size; } @@ -531,7 +527,7 @@ ReplicationOriginShmemInit(void) { bool found; - if (max_replication_slots == 0) + if (max_active_replication_origins == 0) return; replication_states_ctl = (ReplicationStateCtl *) @@ -548,7 +544,7 @@ ReplicationOriginShmemInit(void) replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE; - for (i = 0; i < max_replication_slots; i++) + for (i = 0; i < max_active_replication_origins; i++) { LWLockInitialize(&replication_states[i].lock, replication_states_ctl->tranche_id); @@ -570,7 +566,7 @@ ReplicationOriginShmemInit(void) * * So its just the magic, followed by the statically sized * ReplicationStateOnDisk structs. Note that the maximum number of - * ReplicationState is determined by max_replication_slots. + * ReplicationState is determined by max_active_replication_origins. * --------------------------------------------------------------------------- */ void @@ -583,7 +579,7 @@ CheckPointReplicationOrigin(void) uint32 magic = REPLICATION_STATE_MAGIC; pg_crc32c crc; - if (max_replication_slots == 0) + if (max_active_replication_origins == 0) return; INIT_CRC32C(crc); @@ -625,7 +621,7 @@ CheckPointReplicationOrigin(void) LWLockAcquire(ReplicationOriginLock, LW_SHARED); /* write actual data */ - for (i = 0; i < max_replication_slots; i++) + for (i = 0; i < max_active_replication_origins; i++) { ReplicationStateOnDisk disk_state; ReplicationState *curstate = &replication_states[i]; @@ -718,7 +714,7 @@ StartupReplicationOrigin(void) already_started = true; #endif - if (max_replication_slots == 0) + if (max_active_replication_origins == 0) return; INIT_CRC32C(crc); @@ -728,8 +724,8 @@ StartupReplicationOrigin(void) fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); /* - * might have had max_replication_slots == 0 last run, or we just brought - * up a standby. + * might have had max_active_replication_origins == 0 last run, or we just + * brought up a standby. */ if (fd < 0 && errno == ENOENT) return; @@ -796,10 +792,10 @@ StartupReplicationOrigin(void) COMP_CRC32C(crc, &disk_state, sizeof(disk_state)); - if (last_state == max_replication_slots) + if (last_state == max_active_replication_origins) ereport(PANIC, (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), - errmsg("could not find free replication state, increase \"max_replication_slots\""))); + errmsg("could not find free replication state, increase \"max_active_replication_origins\""))); /* copy data to shared memory */ replication_states[last_state].roident = disk_state.roident; @@ -852,7 +848,7 @@ replorigin_redo(XLogReaderState *record) xlrec = (xl_replorigin_drop *) XLogRecGetData(record); - for (i = 0; i < max_replication_slots; i++) + for (i = 0; i < max_active_replication_origins; i++) { ReplicationState *state = &replication_states[i]; @@ -917,7 +913,7 @@ replorigin_advance(RepOriginId node, * Search for either an existing slot for the origin, or a free one we can * use. */ - for (i = 0; i < max_replication_slots; i++) + for (i = 0; i < max_active_replication_origins; i++) { ReplicationState *curstate = &replication_states[i]; @@ -958,7 +954,7 @@ replorigin_advance(RepOriginId node, (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), errmsg("could not find free replication state slot for replication origin with ID %d", node), - errhint("Increase \"max_replication_slots\" and try again."))); + errhint("Increase \"max_active_replication_origins\" and try again."))); if (replication_state == NULL) { @@ -1024,7 +1020,7 @@ replorigin_get_progress(RepOriginId node, bool flush) /* prevent slots from being concurrently dropped */ LWLockAcquire(ReplicationOriginLock, LW_SHARED); - for (i = 0; i < max_replication_slots; i++) + for (i = 0; i < max_active_replication_origins; i++) { ReplicationState *state; @@ -1110,7 +1106,7 @@ replorigin_session_setup(RepOriginId node, int acquired_by) registered_cleanup = true; } - Assert(max_replication_slots > 0); + Assert(max_active_replication_origins > 0); if (session_replication_state != NULL) ereport(ERROR, @@ -1124,7 +1120,7 @@ replorigin_session_setup(RepOriginId node, int acquired_by) * Search for either an existing slot for the origin, or a free one we can * use. */ - for (i = 0; i < max_replication_slots; i++) + for (i = 0; i < max_active_replication_origins; i++) { ReplicationState *curstate = &replication_states[i]; @@ -1159,7 +1155,7 @@ replorigin_session_setup(RepOriginId node, int acquired_by) (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), errmsg("could not find free replication state slot for replication origin with ID %d", node), - errhint("Increase \"max_replication_slots\" and try again."))); + errhint("Increase \"max_active_replication_origins\" and try again."))); else if (session_replication_state == NULL) { /* initialize new slot */ @@ -1195,7 +1191,7 @@ replorigin_session_reset(void) { ConditionVariable *cv; - Assert(max_replication_slots != 0); + Assert(max_active_replication_origins != 0); if (session_replication_state == NULL) ereport(ERROR, @@ -1536,7 +1532,7 @@ pg_show_replication_origin_status(PG_FUNCTION_ARGS) * filled. Note that we do not take any locks, so slightly corrupted/out * of date values are a possibility. */ - for (i = 0; i < max_replication_slots; i++) + for (i = 0; i < max_active_replication_origins; i++) { ReplicationState *state; Datum values[REPLICATION_ORIGIN_PROGRESS_COLS]; |