diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/replication/slot.c | 28 | ||||
-rw-r--r-- | src/backend/replication/slotfuncs.c | 8 |
2 files changed, 27 insertions, 9 deletions
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 2293c0c6fc3..f969f7c083f 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -89,7 +89,7 @@ typedef struct ReplicationSlotOnDisk sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize #define SLOT_MAGIC 0x1051CA1 /* format identifier */ -#define SLOT_VERSION 2 /* version for new files */ +#define SLOT_VERSION 3 /* version for new files */ /* Control array for replication slot management */ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; @@ -855,8 +855,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked) SpinLockAcquire(&s->mutex); effective_xmin = s->effective_xmin; effective_catalog_xmin = s->effective_catalog_xmin; - invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) && - XLogRecPtrIsInvalid(s->data.restart_lsn)); + invalidated = s->data.invalidated != RS_INVAL_NONE; SpinLockRelease(&s->mutex); /* invalidated slots need not apply */ @@ -901,14 +900,20 @@ ReplicationSlotsComputeRequiredLSN(void) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; XLogRecPtr restart_lsn; + bool invalidated; if (!s->in_use) continue; SpinLockAcquire(&s->mutex); restart_lsn = s->data.restart_lsn; + invalidated = s->data.invalidated != RS_INVAL_NONE; SpinLockRelease(&s->mutex); + /* invalidated slots need not apply */ + if (invalidated) + continue; + if (restart_lsn != InvalidXLogRecPtr && (min_required == InvalidXLogRecPtr || restart_lsn < min_required)) @@ -946,6 +951,7 @@ ReplicationSlotsComputeLogicalRestartLSN(void) { ReplicationSlot *s; XLogRecPtr restart_lsn; + bool invalidated; s = &ReplicationSlotCtl->replication_slots[i]; @@ -960,8 +966,13 @@ ReplicationSlotsComputeLogicalRestartLSN(void) /* read once, it's ok if it increases while we're checking */ SpinLockAcquire(&s->mutex); restart_lsn = s->data.restart_lsn; + invalidated = s->data.invalidated != RS_INVAL_NONE; SpinLockRelease(&s->mutex); + /* invalidated slots need not apply */ + if (invalidated) + continue; + if (restart_lsn == InvalidXLogRecPtr) continue; @@ -1012,6 +1023,8 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive) if (s->data.database != dboid) continue; + /* NB: intentionally counting invalidated slots */ + /* count slots with spinlock held */ SpinLockAcquire(&s->mutex); (*nslots)++; @@ -1069,6 +1082,8 @@ restart: if (s->data.database != dboid) continue; + /* NB: intentionally including invalidated slots */ + /* acquire slot, so ReplicationSlotDropAcquired can be reused */ SpinLockAcquire(&s->mutex); /* can't change while ReplicationSlotControlLock is held */ @@ -1294,7 +1309,12 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, { MyReplicationSlot = s; s->active_pid = MyProcPid; - s->data.invalidated_at = restart_lsn; + s->data.invalidated = RS_INVAL_WAL_REMOVED; + + /* + * XXX: We should consider not overwriting restart_lsn and instead + * just rely on .invalidated. + */ s->data.restart_lsn = InvalidXLogRecPtr; /* Let caller know */ diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 2f3c9648241..ad3e72be5ee 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -315,12 +315,10 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) nulls[i++] = true; /* - * If invalidated_at is valid and restart_lsn is invalid, we know for - * certain that the slot has been invalidated. Otherwise, test - * availability from restart_lsn. + * If the slot has not been invalidated, test availability from + * restart_lsn. */ - if (XLogRecPtrIsInvalid(slot_contents.data.restart_lsn) && - !XLogRecPtrIsInvalid(slot_contents.data.invalidated_at)) + if (slot_contents.data.invalidated != RS_INVAL_NONE) walstate = WALAVAIL_REMOVED; else walstate = GetWALAvailability(slot_contents.data.restart_lsn); |