aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/logical/slotsync.c163
1 files changed, 119 insertions, 44 deletions
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 97440cb6bf0..bda0de52db9 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -162,22 +162,78 @@ static void update_synced_slots_inactive_since(void);
* *found_consistent_snapshot will be true iff the remote slot's LSN or xmin is
* modified, and decoding from the corresponding LSN's can reach a
* consistent snapshot.
+ *
+ * *remote_slot_precedes will be true if the remote slot's LSN or xmin
+ * precedes locally reserved position.
*/
static bool
update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
- bool *found_consistent_snapshot)
+ bool *found_consistent_snapshot,
+ bool *remote_slot_precedes)
{
ReplicationSlot *slot = MyReplicationSlot;
- bool slot_updated = false;
+ bool updated_xmin_or_lsn = false;
+ bool updated_config = false;
Assert(slot->data.invalidated == RS_INVAL_NONE);
if (found_consistent_snapshot)
*found_consistent_snapshot = false;
- if (remote_slot->confirmed_lsn != slot->data.confirmed_flush ||
- remote_slot->restart_lsn != slot->data.restart_lsn ||
- remote_slot->catalog_xmin != slot->data.catalog_xmin)
+ if (remote_slot_precedes)
+ *remote_slot_precedes = false;
+
+ /*
+ * Don't overwrite if we already have a newer catalog_xmin and
+ * restart_lsn.
+ */
+ if (remote_slot->restart_lsn < slot->data.restart_lsn ||
+ TransactionIdPrecedes(remote_slot->catalog_xmin,
+ slot->data.catalog_xmin))
+ {
+ /*
+ * This can happen in following situations:
+ *
+ * If the slot is temporary, it means either the initial WAL location
+ * reserved for the local slot is ahead of the remote slot's
+ * restart_lsn or the initial xmin_horizon computed for the local slot
+ * is ahead of the remote slot.
+ *
+ * If the slot is persistent, restart_lsn of the synced slot could
+ * still be ahead of the remote slot. Since we use slot advance
+ * functionality to keep snapbuild/slot updated, it is possible that
+ * the restart_lsn is advanced to a later position than it has on the
+ * primary. This can happen when slot advancing machinery finds
+ * running xacts record after reaching the consistent state at a later
+ * point than the primary where it serializes the snapshot and updates
+ * the restart_lsn.
+ *
+ * We LOG the message if the slot is temporary as it can help the user
+ * to understand why the slot is not sync-ready. In the case of a
+ * persistent slot, it would be a more common case and won't directly
+ * impact the users, so we used DEBUG1 level to log the message.
+ */
+ ereport(slot->data.persistency == RS_TEMPORARY ? LOG : DEBUG1,
+ errmsg("could not sync slot \"%s\" as remote slot precedes local slot",
+ remote_slot->name),
+ errdetail("Remote slot has LSN %X/%X and catalog xmin %u, but local slot has LSN %X/%X and catalog xmin %u.",
+ LSN_FORMAT_ARGS(remote_slot->restart_lsn),
+ remote_slot->catalog_xmin,
+ LSN_FORMAT_ARGS(slot->data.restart_lsn),
+ slot->data.catalog_xmin));
+
+ if (remote_slot_precedes)
+ *remote_slot_precedes = true;
+ }
+
+ /*
+ * Attempt to sync LSNs and xmins only if remote slot is ahead of local
+ * slot.
+ */
+ else if (remote_slot->confirmed_lsn > slot->data.confirmed_flush ||
+ remote_slot->restart_lsn > slot->data.restart_lsn ||
+ TransactionIdFollows(remote_slot->catalog_xmin,
+ slot->data.catalog_xmin))
{
/*
* We can't directly copy the remote slot's LSN or xmin unless there
@@ -198,7 +254,6 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
slot->data.restart_lsn = remote_slot->restart_lsn;
slot->data.confirmed_flush = remote_slot->confirmed_lsn;
slot->data.catalog_xmin = remote_slot->catalog_xmin;
- slot->effective_catalog_xmin = remote_slot->catalog_xmin;
SpinLockRelease(&slot->mutex);
if (found_consistent_snapshot)
@@ -208,12 +263,18 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
{
LogicalSlotAdvanceAndCheckSnapState(remote_slot->confirmed_lsn,
found_consistent_snapshot);
- }
- ReplicationSlotsComputeRequiredXmin(false);
- ReplicationSlotsComputeRequiredLSN();
+ /* Sanity check */
+ if (slot->data.confirmed_flush != remote_slot->confirmed_lsn)
+ ereport(ERROR,
+ errmsg_internal("synchronized confirmed_flush for slot \"%s\" differs from remote slot",
+ remote_slot->name),
+ errdetail_internal("Remote slot has LSN %X/%X but local slot has LSN %X/%X.",
+ LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
+ LSN_FORMAT_ARGS(slot->data.confirmed_flush)));
+ }
- slot_updated = true;
+ updated_xmin_or_lsn = true;
}
if (remote_dbid != slot->data.database ||
@@ -233,10 +294,37 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
slot->data.failover = remote_slot->failover;
SpinLockRelease(&slot->mutex);
- slot_updated = true;
+ updated_config = true;
}
- return slot_updated;
+ /*
+ * We have to write the changed xmin to disk *before* we change the
+ * in-memory value, otherwise after a crash we wouldn't know that some
+ * catalog tuples might have been removed already.
+ */
+ if (updated_config || updated_xmin_or_lsn)
+ {
+ ReplicationSlotMarkDirty();
+ ReplicationSlotSave();
+ }
+
+ /*
+ * Now the new xmin is safely on disk, we can let the global value
+ * advance. We do not take ProcArrayLock or similar since we only advance
+ * xmin here and there's not much harm done by a concurrent computation
+ * missing that.
+ */
+ if (updated_xmin_or_lsn)
+ {
+ SpinLockAcquire(&slot->mutex);
+ slot->effective_catalog_xmin = remote_slot->catalog_xmin;
+ SpinLockRelease(&slot->mutex);
+
+ ReplicationSlotsComputeRequiredXmin(false);
+ ReplicationSlotsComputeRequiredLSN();
+ }
+
+ return updated_config || updated_xmin_or_lsn;
}
/*
@@ -460,14 +548,17 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
{
ReplicationSlot *slot = MyReplicationSlot;
bool found_consistent_snapshot = false;
+ bool remote_slot_precedes = false;
+
+ (void) update_local_synced_slot(remote_slot, remote_dbid,
+ &found_consistent_snapshot,
+ &remote_slot_precedes);
/*
* Check if the primary server has caught up. Refer to the comment atop
* the file for details on this check.
*/
- if (remote_slot->restart_lsn < slot->data.restart_lsn ||
- TransactionIdPrecedes(remote_slot->catalog_xmin,
- slot->data.catalog_xmin))
+ if (remote_slot_precedes)
{
/*
* The remote slot didn't catch up to locally reserved position.
@@ -476,23 +567,10 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
* current location when recreating the slot in the next cycle. It may
* take more time to create such a slot. Therefore, we keep this slot
* and attempt the synchronization in the next cycle.
- *
- * XXX should this be changed to elog(DEBUG1) perhaps?
*/
- ereport(LOG,
- errmsg("could not sync slot \"%s\" as remote slot precedes local slot",
- remote_slot->name),
- errdetail("Remote slot has LSN %X/%X and catalog xmin %u, but local slot has LSN %X/%X and catalog xmin %u.",
- LSN_FORMAT_ARGS(remote_slot->restart_lsn),
- remote_slot->catalog_xmin,
- LSN_FORMAT_ARGS(slot->data.restart_lsn),
- slot->data.catalog_xmin));
return false;
}
- (void) update_local_synced_slot(remote_slot, remote_dbid,
- &found_consistent_snapshot);
-
/*
* Don't persist the slot if it cannot reach the consistent point from the
* restart_lsn. See comments atop this file.
@@ -633,23 +711,20 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
/*
* Sanity check: As long as the invalidations are handled
* appropriately as above, this should never happen.
+ *
+ * We don't need to check restart_lsn here. See the comments in
+ * update_local_synced_slot() for details.
*/
- if (remote_slot->restart_lsn < slot->data.restart_lsn)
- elog(ERROR,
- "cannot synchronize local slot \"%s\" LSN(%X/%X)"
- " to remote slot's LSN(%X/%X) as synchronization"
- " would move it backwards", remote_slot->name,
- LSN_FORMAT_ARGS(slot->data.restart_lsn),
- LSN_FORMAT_ARGS(remote_slot->restart_lsn));
-
- /* Make sure the slot changes persist across server restart */
- if (update_local_synced_slot(remote_slot, remote_dbid, NULL))
- {
- ReplicationSlotMarkDirty();
- ReplicationSlotSave();
-
- slot_updated = true;
- }
+ if (remote_slot->confirmed_lsn < slot->data.confirmed_flush)
+ ereport(ERROR,
+ errmsg_internal("cannot synchronize local slot \"%s\"",
+ remote_slot->name),
+ errdetail_internal("Local slot's start streaming location LSN(%X/%X) is ahead of remote slot's LSN(%X/%X).",
+ LSN_FORMAT_ARGS(slot->data.confirmed_flush),
+ LSN_FORMAT_ARGS(remote_slot->confirmed_lsn)));
+
+ slot_updated = update_local_synced_slot(remote_slot, remote_dbid,
+ NULL, NULL);
}
}
/* Otherwise create the slot first. */