diff options
Diffstat (limited to 'src/backend/replication/slot.c')
-rw-r--r-- | src/backend/replication/slot.c | 64 |
1 files changed, 64 insertions, 0 deletions
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 600b87fa9cb..f9fec50ae88 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -424,6 +424,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->candidate_restart_valid = InvalidXLogRecPtr; slot->candidate_restart_lsn = InvalidXLogRecPtr; slot->last_saved_confirmed_flush = InvalidXLogRecPtr; + slot->last_saved_restart_lsn = InvalidXLogRecPtr; slot->inactive_since = 0; /* @@ -1165,20 +1166,41 @@ ReplicationSlotsComputeRequiredLSN(void) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; XLogRecPtr restart_lsn; + XLogRecPtr last_saved_restart_lsn; bool invalidated; + ReplicationSlotPersistency persistency; if (!s->in_use) continue; SpinLockAcquire(&s->mutex); + persistency = s->data.persistency; restart_lsn = s->data.restart_lsn; invalidated = s->data.invalidated != RS_INVAL_NONE; + last_saved_restart_lsn = s->last_saved_restart_lsn; SpinLockRelease(&s->mutex); /* invalidated slots need not apply */ if (invalidated) continue; + /* + * For persistent slot use last_saved_restart_lsn to compute the + * oldest LSN for removal of WAL segments. The segments between + * last_saved_restart_lsn and restart_lsn might be needed by a + * persistent slot in the case of database crash. Non-persistent + * slots can't survive the database crash, so we don't care about + * last_saved_restart_lsn for them. + */ + if (persistency == RS_PERSISTENT) + { + if (last_saved_restart_lsn != InvalidXLogRecPtr && + restart_lsn > last_saved_restart_lsn) + { + restart_lsn = last_saved_restart_lsn; + } + } + if (restart_lsn != InvalidXLogRecPtr && (min_required == InvalidXLogRecPtr || restart_lsn < min_required)) @@ -1216,7 +1238,9 @@ ReplicationSlotsComputeLogicalRestartLSN(void) { ReplicationSlot *s; XLogRecPtr restart_lsn; + XLogRecPtr last_saved_restart_lsn; bool invalidated; + ReplicationSlotPersistency persistency; s = &ReplicationSlotCtl->replication_slots[i]; @@ -1230,14 +1254,33 @@ ReplicationSlotsComputeLogicalRestartLSN(void) /* read once, it's ok if it increases while we're checking */ SpinLockAcquire(&s->mutex); + persistency = s->data.persistency; restart_lsn = s->data.restart_lsn; invalidated = s->data.invalidated != RS_INVAL_NONE; + last_saved_restart_lsn = s->last_saved_restart_lsn; SpinLockRelease(&s->mutex); /* invalidated slots need not apply */ if (invalidated) continue; + /* + * For persistent slot use last_saved_restart_lsn to compute the + * oldest LSN for removal of WAL segments. The segments between + * last_saved_restart_lsn and restart_lsn might be needed by a + * persistent slot in the case of database crash. Non-persistent + * slots can't survive the database crash, so we don't care about + * last_saved_restart_lsn for them. + */ + if (persistency == RS_PERSISTENT) + { + if (last_saved_restart_lsn != InvalidXLogRecPtr && + restart_lsn > last_saved_restart_lsn) + { + restart_lsn = last_saved_restart_lsn; + } + } + if (restart_lsn == InvalidXLogRecPtr) continue; @@ -1455,6 +1498,7 @@ ReplicationSlotReserveWal(void) Assert(slot != NULL); Assert(slot->data.restart_lsn == InvalidXLogRecPtr); + Assert(slot->last_saved_restart_lsn == InvalidXLogRecPtr); /* * The replication slot mechanism is used to prevent removal of required @@ -1835,7 +1879,10 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, * just rely on .invalidated. */ if (invalidation_cause == RS_INVAL_WAL_REMOVED) + { s->data.restart_lsn = InvalidXLogRecPtr; + s->last_saved_restart_lsn = InvalidXLogRecPtr; + } /* Let caller know */ *invalidated = true; @@ -2032,6 +2079,7 @@ void CheckPointReplicationSlots(bool is_shutdown) { int i; + bool last_saved_restart_lsn_updated = false; elog(DEBUG1, "performing replication slot checkpoint"); @@ -2076,9 +2124,23 @@ CheckPointReplicationSlots(bool is_shutdown) SpinLockRelease(&s->mutex); } + /* + * Track if we're going to update slot's last_saved_restart_lsn. We + * need this to know if we need to recompute the required LSN. + */ + if (s->last_saved_restart_lsn != s->data.restart_lsn) + last_saved_restart_lsn_updated = true; + SaveSlotToPath(s, path, LOG); } LWLockRelease(ReplicationSlotAllocationLock); + + /* + * Recompute the required LSN if SaveSlotToPath() updated + * last_saved_restart_lsn for any slot. + */ + if (last_saved_restart_lsn_updated) + ReplicationSlotsComputeRequiredLSN(); } /* @@ -2354,6 +2416,7 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel) if (!slot->just_dirtied) slot->dirty = false; slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush; + slot->last_saved_restart_lsn = cp.slotdata.restart_lsn; SpinLockRelease(&slot->mutex); LWLockRelease(&slot->io_in_progress_lock); @@ -2569,6 +2632,7 @@ RestoreSlotFromDisk(const char *name) slot->effective_xmin = cp.slotdata.xmin; slot->effective_catalog_xmin = cp.slotdata.catalog_xmin; slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush; + slot->last_saved_restart_lsn = cp.slotdata.restart_lsn; slot->candidate_catalog_xmin = InvalidTransactionId; slot->candidate_xmin_lsn = InvalidXLogRecPtr; |