aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/origin.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/origin.c')
-rw-r--r--src/backend/replication/logical/origin.c66
1 files changed, 31 insertions, 35 deletions
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index c3c1d7a2a51..6583dd497da 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -90,6 +90,7 @@
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
+#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
@@ -99,6 +100,9 @@
#define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
+/* GUC variables */
+int max_active_replication_origins = 10;
+
/*
* Replay progress of a single remote node.
*/
@@ -151,7 +155,7 @@ typedef struct ReplicationStateCtl
{
/* Tranche to use for per-origin LWLocks */
int tranche_id;
- /* Array of length max_replication_slots */
+ /* Array of length max_active_replication_origins */
ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
} ReplicationStateCtl;
@@ -162,10 +166,7 @@ TimestampTz replorigin_session_origin_timestamp = 0;
/*
* Base address into a shared memory array of replication states of size
- * max_replication_slots.
- *
- * XXX: Should we use a separate variable to size this rather than
- * max_replication_slots?
+ * max_active_replication_origins.
*/
static ReplicationState *replication_states;
@@ -186,12 +187,12 @@ static ReplicationState *session_replication_state = NULL;
#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
static void
-replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
+replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
{
- if (check_slots && max_replication_slots == 0)
+ if (check_origins && max_active_replication_origins == 0)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("cannot query or manipulate replication origin when \"max_replication_slots\" is 0")));
+ errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
if (!recoveryOK && RecoveryInProgress())
ereport(ERROR,
@@ -352,7 +353,7 @@ replorigin_state_clear(RepOriginId roident, bool nowait)
restart:
LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
- for (i = 0; i < max_replication_slots; i++)
+ for (i = 0; i < max_active_replication_origins; i++)
{
ReplicationState *state = &replication_states[i];
@@ -511,18 +512,13 @@ ReplicationOriginShmemSize(void)
{
Size size = 0;
- /*
- * XXX: max_replication_slots is arguably the wrong thing to use, as here
- * we keep the replay state of *remote* transactions. But for now it seems
- * sufficient to reuse it, rather than introduce a separate GUC.
- */
- if (max_replication_slots == 0)
+ if (max_active_replication_origins == 0)
return size;
size = add_size(size, offsetof(ReplicationStateCtl, states));
size = add_size(size,
- mul_size(max_replication_slots, sizeof(ReplicationState)));
+ mul_size(max_active_replication_origins, sizeof(ReplicationState)));
return size;
}
@@ -531,7 +527,7 @@ ReplicationOriginShmemInit(void)
{
bool found;
- if (max_replication_slots == 0)
+ if (max_active_replication_origins == 0)
return;
replication_states_ctl = (ReplicationStateCtl *)
@@ -548,7 +544,7 @@ ReplicationOriginShmemInit(void)
replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
- for (i = 0; i < max_replication_slots; i++)
+ for (i = 0; i < max_active_replication_origins; i++)
{
LWLockInitialize(&replication_states[i].lock,
replication_states_ctl->tranche_id);
@@ -570,7 +566,7 @@ ReplicationOriginShmemInit(void)
*
* So its just the magic, followed by the statically sized
* ReplicationStateOnDisk structs. Note that the maximum number of
- * ReplicationState is determined by max_replication_slots.
+ * ReplicationState is determined by max_active_replication_origins.
* ---------------------------------------------------------------------------
*/
void
@@ -583,7 +579,7 @@ CheckPointReplicationOrigin(void)
uint32 magic = REPLICATION_STATE_MAGIC;
pg_crc32c crc;
- if (max_replication_slots == 0)
+ if (max_active_replication_origins == 0)
return;
INIT_CRC32C(crc);
@@ -625,7 +621,7 @@ CheckPointReplicationOrigin(void)
LWLockAcquire(ReplicationOriginLock, LW_SHARED);
/* write actual data */
- for (i = 0; i < max_replication_slots; i++)
+ for (i = 0; i < max_active_replication_origins; i++)
{
ReplicationStateOnDisk disk_state;
ReplicationState *curstate = &replication_states[i];
@@ -718,7 +714,7 @@ StartupReplicationOrigin(void)
already_started = true;
#endif
- if (max_replication_slots == 0)
+ if (max_active_replication_origins == 0)
return;
INIT_CRC32C(crc);
@@ -728,8 +724,8 @@ StartupReplicationOrigin(void)
fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
/*
- * might have had max_replication_slots == 0 last run, or we just brought
- * up a standby.
+ * might have had max_active_replication_origins == 0 last run, or we just
+ * brought up a standby.
*/
if (fd < 0 && errno == ENOENT)
return;
@@ -796,10 +792,10 @@ StartupReplicationOrigin(void)
COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
- if (last_state == max_replication_slots)
+ if (last_state == max_active_replication_origins)
ereport(PANIC,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
- errmsg("could not find free replication state, increase \"max_replication_slots\"")));
+ errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
/* copy data to shared memory */
replication_states[last_state].roident = disk_state.roident;
@@ -852,7 +848,7 @@ replorigin_redo(XLogReaderState *record)
xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
- for (i = 0; i < max_replication_slots; i++)
+ for (i = 0; i < max_active_replication_origins; i++)
{
ReplicationState *state = &replication_states[i];
@@ -917,7 +913,7 @@ replorigin_advance(RepOriginId node,
* Search for either an existing slot for the origin, or a free one we can
* use.
*/
- for (i = 0; i < max_replication_slots; i++)
+ for (i = 0; i < max_active_replication_origins; i++)
{
ReplicationState *curstate = &replication_states[i];
@@ -958,7 +954,7 @@ replorigin_advance(RepOriginId node,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("could not find free replication state slot for replication origin with ID %d",
node),
- errhint("Increase \"max_replication_slots\" and try again.")));
+ errhint("Increase \"max_active_replication_origins\" and try again.")));
if (replication_state == NULL)
{
@@ -1024,7 +1020,7 @@ replorigin_get_progress(RepOriginId node, bool flush)
/* prevent slots from being concurrently dropped */
LWLockAcquire(ReplicationOriginLock, LW_SHARED);
- for (i = 0; i < max_replication_slots; i++)
+ for (i = 0; i < max_active_replication_origins; i++)
{
ReplicationState *state;
@@ -1110,7 +1106,7 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
registered_cleanup = true;
}
- Assert(max_replication_slots > 0);
+ Assert(max_active_replication_origins > 0);
if (session_replication_state != NULL)
ereport(ERROR,
@@ -1124,7 +1120,7 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
* Search for either an existing slot for the origin, or a free one we can
* use.
*/
- for (i = 0; i < max_replication_slots; i++)
+ for (i = 0; i < max_active_replication_origins; i++)
{
ReplicationState *curstate = &replication_states[i];
@@ -1159,7 +1155,7 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("could not find free replication state slot for replication origin with ID %d",
node),
- errhint("Increase \"max_replication_slots\" and try again.")));
+ errhint("Increase \"max_active_replication_origins\" and try again.")));
else if (session_replication_state == NULL)
{
/* initialize new slot */
@@ -1195,7 +1191,7 @@ replorigin_session_reset(void)
{
ConditionVariable *cv;
- Assert(max_replication_slots != 0);
+ Assert(max_active_replication_origins != 0);
if (session_replication_state == NULL)
ereport(ERROR,
@@ -1536,7 +1532,7 @@ pg_show_replication_origin_status(PG_FUNCTION_ARGS)
* filled. Note that we do not take any locks, so slightly corrupted/out
* of date values are a possibility.
*/
- for (i = 0; i < max_replication_slots; i++)
+ for (i = 0; i < max_active_replication_origins; i++)
{
ReplicationState *state;
Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];