author     Amit Kapila <akapila@postgresql.org>  2024-04-03 14:04:59 +0530
committer  Amit Kapila <akapila@postgresql.org>  2024-04-03 14:04:59 +0530
commit     2ec005b4e29740f0d36e6646d149af192328b2ff
tree       666945f7acefb7bf88adb1a84ef22ce368581ae6 /src/backend/replication/logical
parent     e37662f22158c29bc55eda4eda1757f444cf701a
Ensure that the sync slots reach a consistent state after promotion without losing data.
We were directly copying the LSN locations while syncing the slots on the
standby. However, it is possible that at a particular restart_lsn there are
running xacts, which means that if we start reading the WAL from that
location after promotion, we won't reach a consistent snapshot state at
that point. On the primary, by contrast, we would already have reached a
consistent snapshot state at that restart_lsn and would simply have
serialized the existing snapshot.
To avoid this problem, we now use the advance_slot functionality unless a
serialized snapshot already exists at the synced restart_lsn location. This
ensures that the snapbuilder/slot statuses are updated properly without
generating any changes. Note that a synced slot will remain RS_TEMPORARY
until decoding from the corresponding restart_lsn can reach a consistent
snapshot state, after which it will be marked RS_PERSISTENT. A condensed
sketch of this flow follows the commit trailer below.
Per buildfarm
Author: Hou Zhijie
Reviewed-by: Bertrand Drouvot, Shveta Malik, Bharath Rupireddy, Amit Kapila
Discussion: https://postgr.es/m/OS0PR01MB5716B3942AE49F3F725ACA92943B2@OS0PR01MB5716.jpnprd01.prod.outlook.com
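
To make the new control flow easier to follow before reading the hunks, here is a condensed, illustrative sketch of the decision described above. The helper name sync_slot_consistency_sketch and its simplified signature are hypothetical, and the listed headers are only indicative; the actual logic is in update_local_synced_slot() and update_and_persist_local_synced_slot() in the slotsync.c hunk below, which additionally handles locking, xmin/LSN recomputation, and retrying on a later sync cycle.

```c
/*
 * Illustrative sketch only (hypothetical helper): a condensed view of the
 * decision this commit adds to slot synchronization.  The real code is
 * update_local_synced_slot() / update_and_persist_local_synced_slot().
 */
#include "postgres.h"

#include "replication/logical.h"	/* LogicalSlotAdvanceAndCheckSnapState() */
#include "replication/slot.h"		/* ReplicationSlotPersist() */
#include "replication/snapbuild.h"	/* SnapBuildSnapshotExists() */

static bool
sync_slot_consistency_sketch(XLogRecPtr remote_restart_lsn,
							 XLogRecPtr remote_confirmed_lsn)
{
	bool		found_consistent_snapshot = false;

	if (SnapBuildSnapshotExists(remote_restart_lsn))
	{
		/*
		 * A serialized snapshot exists at the synced restart_lsn, so the
		 * slot can restore it and reach consistency right there; the remote
		 * positions can be copied directly (the real code does this while
		 * holding the slot's spinlock).
		 */
		found_consistent_snapshot = true;
	}
	else
	{
		/*
		 * No serialized snapshot: advance the slot via fast-forward logical
		 * decoding, which updates the snapbuilder/slot state without
		 * generating changes and reports whether a consistent point was
		 * reached.
		 */
		LogicalSlotAdvanceAndCheckSnapState(remote_confirmed_lsn,
											&found_consistent_snapshot);
	}

	/*
	 * Persist the slot (RS_TEMPORARY -> RS_PERSISTENT) only once decoding
	 * from its restart_lsn can reach a consistent snapshot; otherwise leave
	 * it temporary and retry on a later sync cycle.
	 */
	if (!found_consistent_snapshot)
		return false;

	ReplicationSlotPersist();
	return true;
}
```

The key design point is that copying the remote LSNs is safe only when a serialized snapshot at restart_lsn lets the slot reach consistency immediately; in every other case the slot must be advanced through fast-forward decoding before it may be persisted.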
Diffstat (limited to 'src/backend/replication/logical')

 src/backend/replication/logical/logical.c   | 147
 src/backend/replication/logical/slotsync.c  | 133
 src/backend/replication/logical/snapbuild.c |  23

 3 files changed, 262 insertions(+), 41 deletions(-)
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 51ffb623c01..97a4d99c4e7 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -36,6 +36,7 @@
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/reorderbuffer.h"
+#include "replication/slotsync.h"
 #include "replication/snapbuild.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
@@ -516,17 +517,23 @@ CreateDecodingContext(XLogRecPtr start_lsn,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("cannot use physical replication slot for logical decoding")));
 
-    if (slot->data.database != MyDatabaseId)
+    /*
+     * We need to access the system tables during decoding to build the
+     * logical changes unless we are in fast_forward mode where no changes are
+     * generated.
+     */
+    if (slot->data.database != MyDatabaseId && !fast_forward)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("replication slot \"%s\" was not created in this database",
                        NameStr(slot->data.name))));
 
     /*
-     * Do not allow consumption of a "synchronized" slot until the standby
-     * gets promoted.
+     * The slots being synced from the primary can't be used for decoding as
+     * they are used after failover. However, we do allow advancing the LSNs
+     * during the synchronization of slots. See update_local_synced_slot.
      */
-    if (RecoveryInProgress() && slot->data.synced)
+    if (RecoveryInProgress() && slot->data.synced && !IsSyncingReplicationSlots())
        ereport(ERROR,
                errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                errmsg("cannot use replication slot \"%s\" for logical decoding",
@@ -2034,3 +2041,135 @@ LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
 
     return has_pending_wal;
 }
+
+/*
+ * Helper function for advancing our logical replication slot forward.
+ *
+ * The slot's restart_lsn is used as start point for reading records, while
+ * confirmed_flush is used as base point for the decoding context.
+ *
+ * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
+ * because we need to digest WAL to advance restart_lsn allowing to recycle
+ * WAL and removal of old catalog tuples. As decoding is done in fast_forward
+ * mode, no changes are generated anyway.
+ *
+ * *found_consistent_snapshot will be true if the initial decoding snapshot has
+ * been built; Otherwise, it will be false.
+ */
+XLogRecPtr
+LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto,
+                                    bool *found_consistent_snapshot)
+{
+    LogicalDecodingContext *ctx;
+    ResourceOwner old_resowner = CurrentResourceOwner;
+    XLogRecPtr  retlsn;
+
+    Assert(moveto != InvalidXLogRecPtr);
+
+    if (found_consistent_snapshot)
+        *found_consistent_snapshot = false;
+
+    PG_TRY();
+    {
+        /*
+         * Create our decoding context in fast_forward mode, passing start_lsn
+         * as InvalidXLogRecPtr, so that we start processing from my slot's
+         * confirmed_flush.
+         */
+        ctx = CreateDecodingContext(InvalidXLogRecPtr,
+                                    NIL,
+                                    true,   /* fast_forward */
+                                    XL_ROUTINE(.page_read = read_local_xlog_page,
+                                               .segment_open = wal_segment_open,
+                                               .segment_close = wal_segment_close),
+                                    NULL, NULL, NULL);
+
+        /*
+         * Wait for specified streaming replication standby servers (if any)
+         * to confirm receipt of WAL up to moveto lsn.
+         */
+        WaitForStandbyConfirmation(moveto);
+
+        /*
+         * Start reading at the slot's restart_lsn, which we know to point to
+         * a valid record.
+         */
+        XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
+
+        /* invalidate non-timetravel entries */
+        InvalidateSystemCaches();
+
+        /* Decode records until we reach the requested target */
+        while (ctx->reader->EndRecPtr < moveto)
+        {
+            char       *errm = NULL;
+            XLogRecord *record;
+
+            /*
+             * Read records. No changes are generated in fast_forward mode,
+             * but snapbuilder/slot statuses are updated properly.
+             */
+            record = XLogReadRecord(ctx->reader, &errm);
+            if (errm)
+                elog(ERROR, "could not find record while advancing replication slot: %s",
+                     errm);
+
+            /*
+             * Process the record. Storage-level changes are ignored in
+             * fast_forward mode, but other modules (such as snapbuilder)
+             * might still have critical updates to do.
+             */
+            if (record)
+                LogicalDecodingProcessRecord(ctx, ctx->reader);
+
+            CHECK_FOR_INTERRUPTS();
+        }
+
+        if (found_consistent_snapshot && DecodingContextReady(ctx))
+            *found_consistent_snapshot = true;
+
+        /*
+         * Logical decoding could have clobbered CurrentResourceOwner during
+         * transaction management, so restore the executor's value. (This is
+         * a kluge, but it's not worth cleaning up right now.)
+         */
+        CurrentResourceOwner = old_resowner;
+
+        if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
+        {
+            LogicalConfirmReceivedLocation(moveto);
+
+            /*
+             * If only the confirmed_flush LSN has changed the slot won't get
+             * marked as dirty by the above. Callers on the walsender
+             * interface are expected to keep track of their own progress and
+             * don't need it written out. But SQL-interface users cannot
+             * specify their own start positions and it's harder for them to
+             * keep track of their progress, so we should make more of an
+             * effort to save it for them.
+             *
+             * Dirty the slot so it is written out at the next checkpoint. The
+             * LSN position advanced to may still be lost on a crash but this
+             * makes the data consistent after a clean shutdown.
+             */
+            ReplicationSlotMarkDirty();
+        }
+
+        retlsn = MyReplicationSlot->data.confirmed_flush;
+
+        /* free context, call shutdown callback */
+        FreeDecodingContext(ctx);
+
+        InvalidateSystemCaches();
+    }
+    PG_CATCH();
+    {
+        /* clear all timetravel entries */
+        InvalidateSystemCaches();
+
+        PG_RE_THROW();
+    }
+    PG_END_TRY();
+
+    return retlsn;
+}
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 30480960c55..9ac847b7806 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -25,6 +25,15 @@
  * which slot sync worker can perform the sync periodically or user can call
  * pg_sync_replication_slots() periodically to perform the syncs.
  *
+ * If synchronized slots fail to build a consistent snapshot from the
+ * restart_lsn before reaching confirmed_flush_lsn, they would become
+ * unreliable after promotion due to potential data loss from changes
+ * before reaching a consistent point. This can happen because the slots can
+ * be synced at some random time and we may not reach the consistent point
+ * at the same WAL location as the primary. So, we mark such slots as
+ * RS_TEMPORARY. Once the decoding from corresponding LSNs can reach a
+ * consistent point, they will be marked as RS_PERSISTENT.
+ *
  * The slot sync worker waits for some time before the next synchronization,
  * with the duration varying based on whether any slots were updated during
  * the last cycle.  Refer to the comments above wait_for_slot_activity() for
@@ -49,8 +58,9 @@
 #include "postmaster/fork_process.h"
 #include "postmaster/interrupt.h"
 #include "postmaster/postmaster.h"
-#include "replication/slot.h"
+#include "replication/logical.h"
 #include "replication/slotsync.h"
+#include "replication/snapbuild.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
@@ -147,50 +157,85 @@ static void slotsync_failure_callback(int code, Datum arg);
  *
  * If no update was needed (the data of the remote slot is the same as the
  * local slot) return false, otherwise true.
+ *
+ * *found_consistent_snapshot will be true iff the remote slot's LSN or xmin is
+ * modified, and decoding from the corresponding LSN's can reach a
+ * consistent snapshot.
  */
 static bool
-update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
+                         bool *found_consistent_snapshot)
 {
     ReplicationSlot *slot = MyReplicationSlot;
-    bool        xmin_changed;
-    bool        restart_lsn_changed;
-    NameData    plugin_name;
+    bool        slot_updated = false;
 
     Assert(slot->data.invalidated == RS_INVAL_NONE);
 
-    xmin_changed = (remote_slot->catalog_xmin != slot->data.catalog_xmin);
-    restart_lsn_changed = (remote_slot->restart_lsn != slot->data.restart_lsn);
+    if (found_consistent_snapshot)
+        *found_consistent_snapshot = false;
 
-    if (!xmin_changed &&
-        !restart_lsn_changed &&
-        remote_dbid == slot->data.database &&
-        remote_slot->two_phase == slot->data.two_phase &&
-        remote_slot->failover == slot->data.failover &&
-        remote_slot->confirmed_lsn == slot->data.confirmed_flush &&
-        strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0)
-        return false;
+    if (remote_slot->confirmed_lsn != slot->data.confirmed_flush ||
+        remote_slot->restart_lsn != slot->data.restart_lsn ||
+        remote_slot->catalog_xmin != slot->data.catalog_xmin)
+    {
+        /*
+         * We can't directly copy the remote slot's LSN or xmin unless there
+         * exists a consistent snapshot at that point. Otherwise, after
+         * promotion, the slots may not reach a consistent point before the
+         * confirmed_flush_lsn which can lead to a data loss. To avoid data
+         * loss, we let slot machinery advance the slot which ensures that
+         * snapbuilder/slot statuses are updated properly.
+         */
+        if (SnapBuildSnapshotExists(remote_slot->restart_lsn))
+        {
+            /*
+             * Update the slot info directly if there is a serialized snapshot
+             * at the restart_lsn, as the slot can quickly reach consistency
+             * at restart_lsn by restoring the snapshot.
+             */
+            SpinLockAcquire(&slot->mutex);
+            slot->data.restart_lsn = remote_slot->restart_lsn;
+            slot->data.confirmed_flush = remote_slot->confirmed_lsn;
+            slot->data.catalog_xmin = remote_slot->catalog_xmin;
+            slot->effective_catalog_xmin = remote_slot->catalog_xmin;
+            SpinLockRelease(&slot->mutex);
 
-    /* Avoid expensive operations while holding a spinlock. */
-    namestrcpy(&plugin_name, remote_slot->plugin);
-
-    SpinLockAcquire(&slot->mutex);
-    slot->data.plugin = plugin_name;
-    slot->data.database = remote_dbid;
-    slot->data.two_phase = remote_slot->two_phase;
-    slot->data.failover = remote_slot->failover;
-    slot->data.restart_lsn = remote_slot->restart_lsn;
-    slot->data.confirmed_flush = remote_slot->confirmed_lsn;
-    slot->data.catalog_xmin = remote_slot->catalog_xmin;
-    slot->effective_catalog_xmin = remote_slot->catalog_xmin;
-    SpinLockRelease(&slot->mutex);
-
-    if (xmin_changed)
-        ReplicationSlotsComputeRequiredXmin(false);
+            if (found_consistent_snapshot)
+                *found_consistent_snapshot = true;
+        }
+        else
+        {
+            LogicalSlotAdvanceAndCheckSnapState(remote_slot->confirmed_lsn,
+                                                found_consistent_snapshot);
+        }
 
-    if (restart_lsn_changed)
+        ReplicationSlotsComputeRequiredXmin(false);
         ReplicationSlotsComputeRequiredLSN();
 
-    return true;
+        slot_updated = true;
+    }
+
+    if (remote_dbid != slot->data.database ||
+        remote_slot->two_phase != slot->data.two_phase ||
+        remote_slot->failover != slot->data.failover ||
+        strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0)
+    {
+        NameData    plugin_name;
+
+        /* Avoid expensive operations while holding a spinlock. */
+        namestrcpy(&plugin_name, remote_slot->plugin);
+
+        SpinLockAcquire(&slot->mutex);
+        slot->data.plugin = plugin_name;
+        slot->data.database = remote_dbid;
+        slot->data.two_phase = remote_slot->two_phase;
+        slot->data.failover = remote_slot->failover;
+        SpinLockRelease(&slot->mutex);
+
+        slot_updated = true;
+    }
+
+    return slot_updated;
 }
 
 /*
@@ -413,6 +458,7 @@ static bool
 update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 {
     ReplicationSlot *slot = MyReplicationSlot;
+    bool        found_consistent_snapshot = false;
 
     /*
      * Check if the primary server has caught up. Refer to the comment atop
@@ -443,9 +489,22 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
         return false;
     }
 
-    /* First time slot update, the function must return true */
-    if (!update_local_synced_slot(remote_slot, remote_dbid))
-        elog(ERROR, "failed to update slot");
+    (void) update_local_synced_slot(remote_slot, remote_dbid,
+                                    &found_consistent_snapshot);
+
+    /*
+     * Don't persist the slot if it cannot reach the consistent point from the
+     * restart_lsn. See comments atop this file.
+     */
+    if (!found_consistent_snapshot)
+    {
+        ereport(LOG,
+                errmsg("could not sync slot \"%s\"", remote_slot->name),
+                errdetail("Logical decoding cannot find consistent point from local slot's LSN %X/%X.",
+                          LSN_FORMAT_ARGS(slot->data.restart_lsn)));
+
+        return false;
+    }
 
     ReplicationSlotPersist();
 
@@ -578,7 +637,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
                        LSN_FORMAT_ARGS(remote_slot->restart_lsn));
 
        /* Make sure the slot changes persist across server restart */
-       if (update_local_synced_slot(remote_slot, remote_dbid))
+       if (update_local_synced_slot(remote_slot, remote_dbid, NULL))
        {
            ReplicationSlotMarkDirty();
            ReplicationSlotSave();
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index ac24b518603..e37e22f4417 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -2134,3 +2134,26 @@ CheckPointSnapBuild(void)
     }
     FreeDir(snap_dir);
 }
+
+/*
+ * Check if a logical snapshot at the specified point has been serialized.
+ */
+bool
+SnapBuildSnapshotExists(XLogRecPtr lsn)
+{
+    char        path[MAXPGPATH];
+    int         ret;
+    struct stat stat_buf;
+
+    sprintf(path, "pg_logical/snapshots/%X-%X.snap",
+            LSN_FORMAT_ARGS(lsn));
+
+    ret = stat(path, &stat_buf);
+
+    if (ret != 0 && errno != ENOENT)
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not stat file \"%s\": %m", path)));
+
+    return ret == 0;
+}