author     Amit Kapila <akapila@postgresql.org>  2024-04-03 14:04:59 +0530
committer  Amit Kapila <akapila@postgresql.org>  2024-04-03 14:04:59 +0530
commit     2ec005b4e29740f0d36e6646d149af192328b2ff (patch)
tree       666945f7acefb7bf88adb1a84ef22ce368581ae6 /src/backend/replication/logical
parent     e37662f22158c29bc55eda4eda1757f444cf701a (diff)
Ensure that the sync slots reach a consistent state after promotion without losing data.
We were directly copying the LSN locations while syncing the slots on the
standby. Now, it is possible that at some particular restart_lsn there are
running xacts, which means that if we start reading the WAL from that
location after promotion, we won't reach a consistent snapshot state at that
point. However, on the primary, we would already have been in a consistent
snapshot state at that restart_lsn, so we would have just serialized the
existing snapshot.

To avoid this problem, we will use the advance_slot functionality unless the
snapshot already exists at the synced restart_lsn location. This helps
ensure that snapbuilder/slot statuses are updated properly without
generating any changes. Note that a synced slot remains RS_TEMPORARY until
decoding from its restart_lsn can reach a consistent snapshot state, after
which it is marked RS_PERSISTENT.

Per buildfarm

Author: Hou Zhijie
Reviewed-by: Bertrand Drouvot, Shveta Malik, Bharath Rupireddy, Amit Kapila
Discussion: https://postgr.es/m/OS0PR01MB5716B3942AE49F3F725ACA92943B2@OS0PR01MB5716.jpnprd01.prod.outlook.com
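The decision described above reduces to one branch in the sync path. The
following is an illustrative sketch only, not part of the patch: the helper
name sync_slot_sketch is hypothetical, while SnapBuildSnapshotExists(),
LogicalSlotAdvanceAndCheckSnapState() and RemoteSlot are the functions and
structure used in the diff below; the local synced slot is assumed to be
already acquired in MyReplicationSlot.

static bool
sync_slot_sketch(RemoteSlot *remote_slot)	/* hypothetical helper */
{
	bool		consistent = false;

	if (SnapBuildSnapshotExists(remote_slot->restart_lsn))
	{
		/*
		 * A serialized snapshot exists at the synced restart_lsn, so the
		 * slot can restore it and reach consistency immediately; the
		 * remote LSNs and xmin can be copied directly.
		 */
		consistent = true;
	}
	else
	{
		/*
		 * Otherwise, advance the slot by decoding in fast_forward mode so
		 * that the snapbuilder/slot statuses are updated properly without
		 * generating any changes.
		 */
		LogicalSlotAdvanceAndCheckSnapState(remote_slot->confirmed_lsn,
											&consistent);
	}

	/* The synced slot stays RS_TEMPORARY until this returns true. */
	return consistent;
}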
Diffstat (limited to 'src/backend/replication/logical')
-rw-r--r--  src/backend/replication/logical/logical.c    147
-rw-r--r--  src/backend/replication/logical/slotsync.c   133
-rw-r--r--  src/backend/replication/logical/snapbuild.c   23
3 files changed, 262 insertions(+), 41 deletions(-)
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 51ffb623c01..97a4d99c4e7 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -36,6 +36,7 @@
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
+#include "replication/slotsync.h"
#include "replication/snapbuild.h"
#include "storage/proc.h"
#include "storage/procarray.h"
@@ -516,17 +517,23 @@ CreateDecodingContext(XLogRecPtr start_lsn,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot use physical replication slot for logical decoding")));
- if (slot->data.database != MyDatabaseId)
+ /*
+ * We need to access the system tables during decoding to build the
+ * logical changes unless we are in fast_forward mode where no changes are
+ * generated.
+ */
+ if (slot->data.database != MyDatabaseId && !fast_forward)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("replication slot \"%s\" was not created in this database",
NameStr(slot->data.name))));
/*
- * Do not allow consumption of a "synchronized" slot until the standby
- * gets promoted.
+ * The slots being synced from the primary can't be used for decoding as
+ * they are used after failover. However, we do allow advancing the LSNs
+ * during the synchronization of slots. See update_local_synced_slot.
*/
- if (RecoveryInProgress() && slot->data.synced)
+ if (RecoveryInProgress() && slot->data.synced && !IsSyncingReplicationSlots())
ereport(ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot use replication slot \"%s\" for logical decoding",
@@ -2034,3 +2041,135 @@ LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
return has_pending_wal;
}
+
+/*
+ * Helper function for advancing our logical replication slot forward.
+ *
+ * The slot's restart_lsn is used as start point for reading records, while
+ * confirmed_flush is used as base point for the decoding context.
+ *
+ * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
+ * because we need to digest WAL to advance restart_lsn, which allows WAL to
+ * be recycled and old catalog tuples to be removed. As decoding is done in
+ * fast_forward mode, no changes are generated anyway.
+ *
+ * *found_consistent_snapshot will be true if the initial decoding snapshot has
+ * been built; otherwise, it will be false.
+ */
+XLogRecPtr
+LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto,
+ bool *found_consistent_snapshot)
+{
+ LogicalDecodingContext *ctx;
+ ResourceOwner old_resowner = CurrentResourceOwner;
+ XLogRecPtr retlsn;
+
+ Assert(moveto != InvalidXLogRecPtr);
+
+ if (found_consistent_snapshot)
+ *found_consistent_snapshot = false;
+
+ PG_TRY();
+ {
+ /*
+ * Create our decoding context in fast_forward mode, passing start_lsn
+ * as InvalidXLogRecPtr, so that we start processing from the slot's
+ * confirmed_flush.
+ */
+ ctx = CreateDecodingContext(InvalidXLogRecPtr,
+ NIL,
+ true, /* fast_forward */
+ XL_ROUTINE(.page_read = read_local_xlog_page,
+ .segment_open = wal_segment_open,
+ .segment_close = wal_segment_close),
+ NULL, NULL, NULL);
+
+ /*
+ * Wait for specified streaming replication standby servers (if any)
+ * to confirm receipt of WAL up to moveto lsn.
+ */
+ WaitForStandbyConfirmation(moveto);
+
+ /*
+ * Start reading at the slot's restart_lsn, which we know to point to
+ * a valid record.
+ */
+ XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
+
+ /* invalidate non-timetravel entries */
+ InvalidateSystemCaches();
+
+ /* Decode records until we reach the requested target */
+ while (ctx->reader->EndRecPtr < moveto)
+ {
+ char *errm = NULL;
+ XLogRecord *record;
+
+ /*
+ * Read records. No changes are generated in fast_forward mode,
+ * but snapbuilder/slot statuses are updated properly.
+ */
+ record = XLogReadRecord(ctx->reader, &errm);
+ if (errm)
+ elog(ERROR, "could not find record while advancing replication slot: %s",
+ errm);
+
+ /*
+ * Process the record. Storage-level changes are ignored in
+ * fast_forward mode, but other modules (such as snapbuilder)
+ * might still have critical updates to do.
+ */
+ if (record)
+ LogicalDecodingProcessRecord(ctx, ctx->reader);
+
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ if (found_consistent_snapshot && DecodingContextReady(ctx))
+ *found_consistent_snapshot = true;
+
+ /*
+ * Logical decoding could have clobbered CurrentResourceOwner during
+ * transaction management, so restore the executor's value. (This is
+ * a kluge, but it's not worth cleaning up right now.)
+ */
+ CurrentResourceOwner = old_resowner;
+
+ if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
+ {
+ LogicalConfirmReceivedLocation(moveto);
+
+ /*
+ * If only the confirmed_flush LSN has changed, the slot won't get
+ * marked as dirty by the above. Callers on the walsender
+ * interface are expected to keep track of their own progress and
+ * don't need it written out. But SQL-interface users cannot
+ * specify their own start positions and it's harder for them to
+ * keep track of their progress, so we should make more of an
+ * effort to save it for them.
+ *
+ * Dirty the slot so it is written out at the next checkpoint. The
+ * LSN position advanced to may still be lost on a crash but this
+ * makes the data consistent after a clean shutdown.
+ */
+ ReplicationSlotMarkDirty();
+ }
+
+ retlsn = MyReplicationSlot->data.confirmed_flush;
+
+ /* free context, call shutdown callback */
+ FreeDecodingContext(ctx);
+
+ InvalidateSystemCaches();
+ }
+ PG_CATCH();
+ {
+ /* clear all timetravel entries */
+ InvalidateSystemCaches();
+
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+
+ return retlsn;
+}
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 30480960c55..9ac847b7806 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -25,6 +25,15 @@
* which slot sync worker can perform the sync periodically or user can call
* pg_sync_replication_slots() periodically to perform the syncs.
*
+ * If synchronized slots fail to build a consistent snapshot from the
+ * restart_lsn before reaching confirmed_flush_lsn, they would become
+ * unreliable after promotion due to potential data loss from changes
+ * before reaching a consistent point. This can happen because the slots can
+ * be synced at some random time and we may not reach the consistent point
+ * at the same WAL location as the primary. So, we mark such slots as
+ * RS_TEMPORARY. Once the decoding from corresponding LSNs can reach a
+ * consistent point, they will be marked as RS_PERSISTENT.
+ *
* The slot sync worker waits for some time before the next synchronization,
* with the duration varying based on whether any slots were updated during
* the last cycle. Refer to the comments above wait_for_slot_activity() for
@@ -49,8 +58,9 @@
#include "postmaster/fork_process.h"
#include "postmaster/interrupt.h"
#include "postmaster/postmaster.h"
-#include "replication/slot.h"
+#include "replication/logical.h"
#include "replication/slotsync.h"
+#include "replication/snapbuild.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
@@ -147,50 +157,85 @@ static void slotsync_failure_callback(int code, Datum arg);
*
* If no update was needed (the data of the remote slot is the same as the
* local slot) return false, otherwise true.
+ *
+ * *found_consistent_snapshot will be true iff the remote slot's LSN or xmin is
+ * modified, and decoding from the corresponding LSNs can reach a
+ * consistent snapshot.
*/
static bool
-update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
+ bool *found_consistent_snapshot)
{
ReplicationSlot *slot = MyReplicationSlot;
- bool xmin_changed;
- bool restart_lsn_changed;
- NameData plugin_name;
+ bool slot_updated = false;
Assert(slot->data.invalidated == RS_INVAL_NONE);
- xmin_changed = (remote_slot->catalog_xmin != slot->data.catalog_xmin);
- restart_lsn_changed = (remote_slot->restart_lsn != slot->data.restart_lsn);
+ if (found_consistent_snapshot)
+ *found_consistent_snapshot = false;
- if (!xmin_changed &&
- !restart_lsn_changed &&
- remote_dbid == slot->data.database &&
- remote_slot->two_phase == slot->data.two_phase &&
- remote_slot->failover == slot->data.failover &&
- remote_slot->confirmed_lsn == slot->data.confirmed_flush &&
- strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0)
- return false;
+ if (remote_slot->confirmed_lsn != slot->data.confirmed_flush ||
+ remote_slot->restart_lsn != slot->data.restart_lsn ||
+ remote_slot->catalog_xmin != slot->data.catalog_xmin)
+ {
+ /*
+ * We can't directly copy the remote slot's LSN or xmin unless there
+ * exists a consistent snapshot at that point. Otherwise, after
+ * promotion, the slots may not reach a consistent point before the
+ * confirmed_flush_lsn, which can lead to data loss. To avoid data
+ * loss, we let slot machinery advance the slot which ensures that
+ * snapbuilder/slot statuses are updated properly.
+ */
+ if (SnapBuildSnapshotExists(remote_slot->restart_lsn))
+ {
+ /*
+ * Update the slot info directly if there is a serialized snapshot
+ * at the restart_lsn, as the slot can quickly reach consistency
+ * at restart_lsn by restoring the snapshot.
+ */
+ SpinLockAcquire(&slot->mutex);
+ slot->data.restart_lsn = remote_slot->restart_lsn;
+ slot->data.confirmed_flush = remote_slot->confirmed_lsn;
+ slot->data.catalog_xmin = remote_slot->catalog_xmin;
+ slot->effective_catalog_xmin = remote_slot->catalog_xmin;
+ SpinLockRelease(&slot->mutex);
- /* Avoid expensive operations while holding a spinlock. */
- namestrcpy(&plugin_name, remote_slot->plugin);
-
- SpinLockAcquire(&slot->mutex);
- slot->data.plugin = plugin_name;
- slot->data.database = remote_dbid;
- slot->data.two_phase = remote_slot->two_phase;
- slot->data.failover = remote_slot->failover;
- slot->data.restart_lsn = remote_slot->restart_lsn;
- slot->data.confirmed_flush = remote_slot->confirmed_lsn;
- slot->data.catalog_xmin = remote_slot->catalog_xmin;
- slot->effective_catalog_xmin = remote_slot->catalog_xmin;
- SpinLockRelease(&slot->mutex);
-
- if (xmin_changed)
- ReplicationSlotsComputeRequiredXmin(false);
+ if (found_consistent_snapshot)
+ *found_consistent_snapshot = true;
+ }
+ else
+ {
+ LogicalSlotAdvanceAndCheckSnapState(remote_slot->confirmed_lsn,
+ found_consistent_snapshot);
+ }
- if (restart_lsn_changed)
+ ReplicationSlotsComputeRequiredXmin(false);
ReplicationSlotsComputeRequiredLSN();
- return true;
+ slot_updated = true;
+ }
+
+ if (remote_dbid != slot->data.database ||
+ remote_slot->two_phase != slot->data.two_phase ||
+ remote_slot->failover != slot->data.failover ||
+ strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0)
+ {
+ NameData plugin_name;
+
+ /* Avoid expensive operations while holding a spinlock. */
+ namestrcpy(&plugin_name, remote_slot->plugin);
+
+ SpinLockAcquire(&slot->mutex);
+ slot->data.plugin = plugin_name;
+ slot->data.database = remote_dbid;
+ slot->data.two_phase = remote_slot->two_phase;
+ slot->data.failover = remote_slot->failover;
+ SpinLockRelease(&slot->mutex);
+
+ slot_updated = true;
+ }
+
+ return slot_updated;
}
/*
@@ -413,6 +458,7 @@ static bool
update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
{
ReplicationSlot *slot = MyReplicationSlot;
+ bool found_consistent_snapshot = false;
/*
* Check if the primary server has caught up. Refer to the comment atop
@@ -443,9 +489,22 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
return false;
}
- /* First time slot update, the function must return true */
- if (!update_local_synced_slot(remote_slot, remote_dbid))
- elog(ERROR, "failed to update slot");
+ (void) update_local_synced_slot(remote_slot, remote_dbid,
+ &found_consistent_snapshot);
+
+ /*
+ * Don't persist the slot if it cannot reach the consistent point from the
+ * restart_lsn. See comments atop this file.
+ */
+ if (!found_consistent_snapshot)
+ {
+ ereport(LOG,
+ errmsg("could not sync slot \"%s\"", remote_slot->name),
+ errdetail("Logical decoding cannot find consistent point from local slot's LSN %X/%X.",
+ LSN_FORMAT_ARGS(slot->data.restart_lsn)));
+
+ return false;
+ }
ReplicationSlotPersist();
@@ -578,7 +637,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
LSN_FORMAT_ARGS(remote_slot->restart_lsn));
/* Make sure the slot changes persist across server restart */
- if (update_local_synced_slot(remote_slot, remote_dbid))
+ if (update_local_synced_slot(remote_slot, remote_dbid, NULL))
{
ReplicationSlotMarkDirty();
ReplicationSlotSave();
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index ac24b518603..e37e22f4417 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -2134,3 +2134,26 @@ CheckPointSnapBuild(void)
}
FreeDir(snap_dir);
}
+
+/*
+ * Check if a logical snapshot at the specified point has been serialized.
+ */
+bool
+SnapBuildSnapshotExists(XLogRecPtr lsn)
+{
+ char path[MAXPGPATH];
+ int ret;
+ struct stat stat_buf;
+
+ sprintf(path, "pg_logical/snapshots/%X-%X.snap",
+ LSN_FORMAT_ARGS(lsn));
+
+ ret = stat(path, &stat_buf);
+
+ if (ret != 0 && errno != ENOENT)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat file \"%s\": %m", path)));
+
+ return ret == 0;
+}
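Usage note (an illustrative sketch, not part of the patch): a caller that has
already acquired a slot in MyReplicationSlot could use the new
LogicalSlotAdvanceAndCheckSnapState() as below, where target_lsn is a
hypothetical caller-supplied position.

	bool		consistent;
	XLogRecPtr	end_lsn;

	/*
	 * Advance the acquired slot in fast_forward mode and check whether
	 * decoding reached a consistent snapshot while catching up to
	 * target_lsn.
	 */
	end_lsn = LogicalSlotAdvanceAndCheckSnapState(target_lsn, &consistent);
	if (!consistent)
		elog(LOG, "slot \"%s\" has not reached a consistent point at %X/%X",
			 NameStr(MyReplicationSlot->data.name),
			 LSN_FORMAT_ARGS(end_lsn));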