aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical')
-rw-r--r--src/backend/replication/logical/applyparallelworker.c3
-rw-r--r--src/backend/replication/logical/conflict.c22
-rw-r--r--src/backend/replication/logical/launcher.c230
-rw-r--r--src/backend/replication/logical/reorderbuffer.c4
-rw-r--r--src/backend/replication/logical/slotsync.c8
-rw-r--r--src/backend/replication/logical/tablesync.c37
-rw-r--r--src/backend/replication/logical/worker.c783
7 files changed, 1060 insertions, 27 deletions
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index d25085d3515..1fa931a7422 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -441,7 +441,8 @@ pa_launch_parallel_worker(void)
MySubscription->name,
MyLogicalRepWorker->userid,
InvalidOid,
- dsm_segment_handle(winfo->dsm_seg));
+ dsm_segment_handle(winfo->dsm_seg),
+ false);
if (launched)
{
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 97c4e26b586..2fd3e8bbda5 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -29,6 +29,7 @@ static const char *const ConflictTypeNames[] = {
[CT_UPDATE_EXISTS] = "update_exists",
[CT_UPDATE_MISSING] = "update_missing",
[CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
+ [CT_UPDATE_DELETED] = "update_deleted",
[CT_DELETE_MISSING] = "delete_missing",
[CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
};
@@ -176,6 +177,7 @@ errcode_apply_conflict(ConflictType type)
case CT_UPDATE_ORIGIN_DIFFERS:
case CT_UPDATE_MISSING:
case CT_DELETE_ORIGIN_DIFFERS:
+ case CT_UPDATE_DELETED:
case CT_DELETE_MISSING:
return errcode(ERRCODE_T_R_SERIALIZATION_FAILURE);
}
@@ -261,6 +263,26 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
break;
+ case CT_UPDATE_DELETED:
+ if (localts)
+ {
+ if (localorigin == InvalidRepOriginId)
+ appendStringInfo(&err_detail, _("The row to be updated was deleted locally in transaction %u at %s."),
+ localxmin, timestamptz_to_str(localts));
+ else if (replorigin_by_oid(localorigin, true, &origin_name))
+ appendStringInfo(&err_detail, _("The row to be updated was deleted by a different origin \"%s\" in transaction %u at %s."),
+ origin_name, localxmin, timestamptz_to_str(localts));
+
+ /* The origin that modified this row has been removed. */
+ else
+ appendStringInfo(&err_detail, _("The row to be updated was deleted by a non-existent origin in transaction %u at %s."),
+ localxmin, timestamptz_to_str(localts));
+ }
+ else
+ appendStringInfo(&err_detail, _("The row to be updated was deleted."));
+
+ break;
+
case CT_UPDATE_MISSING:
appendStringInfoString(&err_detail, _("Could not find the row to be updated."));
break;
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 4aed0dfcebb..37377f7eb63 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -32,6 +32,7 @@
#include "postmaster/interrupt.h"
#include "replication/logicallauncher.h"
#include "replication/origin.h"
+#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
@@ -91,7 +92,6 @@ static dshash_table *last_start_times = NULL;
static bool on_commit_launcher_wakeup = false;
-static void ApplyLauncherWakeup(void);
static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit(int code, Datum arg);
static void logicalrep_worker_detach(void);
@@ -100,6 +100,9 @@ static int logicalrep_pa_worker_count(Oid subid);
static void logicalrep_launcher_attach_dshmem(void);
static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
+static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
+static bool acquire_conflict_slot_if_exists(void);
+static void advance_conflict_slot_xmin(TransactionId new_xmin);
/*
@@ -148,6 +151,7 @@ get_subscription_list(void)
sub->owner = subform->subowner;
sub->enabled = subform->subenabled;
sub->name = pstrdup(NameStr(subform->subname));
+ sub->retaindeadtuples = subform->subretaindeadtuples;
/* We don't fill fields we are not interested in. */
res = lappend(res, sub);
@@ -309,7 +313,8 @@ logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
bool
logicalrep_worker_launch(LogicalRepWorkerType wtype,
Oid dbid, Oid subid, const char *subname, Oid userid,
- Oid relid, dsm_handle subworker_dsm)
+ Oid relid, dsm_handle subworker_dsm,
+ bool retain_dead_tuples)
{
BackgroundWorker bgw;
BackgroundWorkerHandle *bgw_handle;
@@ -328,10 +333,13 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
* - must be valid worker type
* - tablesync workers are only ones to have relid
* - parallel apply worker is the only kind of subworker
+ * - The replication slot used in conflict detection is created when
+ * retain_dead_tuples is enabled
*/
Assert(wtype != WORKERTYPE_UNKNOWN);
Assert(is_tablesync_worker == OidIsValid(relid));
Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
+ Assert(!retain_dead_tuples || MyReplicationSlot);
ereport(DEBUG1,
(errmsg_internal("starting logical replication worker for subscription \"%s\"",
@@ -454,6 +462,9 @@ retry:
worker->stream_fileset = NULL;
worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
worker->parallel_apply = is_parallel_apply_worker;
+ worker->oldest_nonremovable_xid = retain_dead_tuples
+ ? MyReplicationSlot->data.xmin
+ : InvalidTransactionId;
worker->last_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->last_send_time);
TIMESTAMP_NOBEGIN(worker->last_recv_time);
@@ -779,6 +790,8 @@ logicalrep_worker_detach(void)
}
LWLockRelease(LogicalRepWorkerLock);
+
+ list_free(workers);
}
/* Block concurrent access. */
@@ -1118,7 +1131,10 @@ ApplyLauncherWakeupAtCommit(void)
on_commit_launcher_wakeup = true;
}
-static void
+/*
+ * Wakeup the launcher immediately.
+ */
+void
ApplyLauncherWakeup(void)
{
if (LogicalRepCtx->launcher_pid != 0)
@@ -1150,6 +1166,12 @@ ApplyLauncherMain(Datum main_arg)
*/
BackgroundWorkerInitializeConnection(NULL, NULL, 0);
+ /*
+ * Acquire the conflict detection slot at startup to ensure it can be
+ * dropped if no longer needed after a restart.
+ */
+ acquire_conflict_slot_if_exists();
+
/* Enter main loop */
for (;;)
{
@@ -1159,6 +1181,9 @@ ApplyLauncherMain(Datum main_arg)
MemoryContext subctx;
MemoryContext oldctx;
long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
+ bool can_advance_xmin = true;
+ bool retain_dead_tuples = false;
+ TransactionId xmin = InvalidTransactionId;
CHECK_FOR_INTERRUPTS();
@@ -1168,7 +1193,14 @@ ApplyLauncherMain(Datum main_arg)
ALLOCSET_DEFAULT_SIZES);
oldctx = MemoryContextSwitchTo(subctx);
- /* Start any missing workers for enabled subscriptions. */
+ /*
+ * Start any missing workers for enabled subscriptions.
+ *
+ * Also, during the iteration through all subscriptions, we compute
+ * the minimum XID required to protect deleted tuples for conflict
+ * detection if one of the subscription enables retain_dead_tuples
+ * option.
+ */
sublist = get_subscription_list();
foreach(lc, sublist)
{
@@ -1178,6 +1210,38 @@ ApplyLauncherMain(Datum main_arg)
TimestampTz now;
long elapsed;
+ if (sub->retaindeadtuples)
+ {
+ retain_dead_tuples = true;
+
+ /*
+ * Can't advance xmin of the slot unless all the subscriptions
+ * with retain_dead_tuples are enabled. This is required to
+ * ensure that we don't advance the xmin of
+ * CONFLICT_DETECTION_SLOT if one of the subscriptions is not
+ * enabled. Otherwise, we won't be able to detect conflicts
+ * reliably for such a subscription even though it has set the
+ * retain_dead_tuples option.
+ */
+ can_advance_xmin &= sub->enabled;
+
+ /*
+ * Create a replication slot to retain information necessary
+ * for conflict detection such as dead tuples, commit
+ * timestamps, and origins.
+ *
+ * The slot is created before starting the apply worker to
+ * prevent it from unnecessarily maintaining its
+ * oldest_nonremovable_xid.
+ *
+ * The slot is created even for a disabled subscription to
+ * ensure that conflict-related information is available when
+ * applying remote changes that occurred before the
+ * subscription was enabled.
+ */
+ CreateConflictDetectionSlot();
+ }
+
if (!sub->enabled)
continue;
@@ -1186,7 +1250,27 @@ ApplyLauncherMain(Datum main_arg)
LWLockRelease(LogicalRepWorkerLock);
if (w != NULL)
- continue; /* worker is running already */
+ {
+ /*
+ * Compute the minimum xmin required to protect dead tuples
+ * required for conflict detection among all running apply
+ * workers that enables retain_dead_tuples.
+ */
+ if (sub->retaindeadtuples && can_advance_xmin)
+ compute_min_nonremovable_xid(w, &xmin);
+
+ /* worker is running already */
+ continue;
+ }
+
+ /*
+ * Can't advance xmin of the slot unless all the workers
+ * corresponding to subscriptions with retain_dead_tuples are
+ * running, disabling the further computation of the minimum
+ * nonremovable xid.
+ */
+ if (sub->retaindeadtuples)
+ can_advance_xmin = false;
/*
* If the worker is eligible to start now, launch it. Otherwise,
@@ -1210,7 +1294,8 @@ ApplyLauncherMain(Datum main_arg)
if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
sub->dbid, sub->oid, sub->name,
sub->owner, InvalidOid,
- DSM_HANDLE_INVALID))
+ DSM_HANDLE_INVALID,
+ sub->retaindeadtuples))
{
/*
* We get here either if we failed to launch a worker
@@ -1230,6 +1315,20 @@ ApplyLauncherMain(Datum main_arg)
}
}
+ /*
+ * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription
+ * that requires us to retain dead tuples. Otherwise, if required,
+ * advance the slot's xmin to protect dead tuples required for the
+ * conflict detection.
+ */
+ if (MyReplicationSlot)
+ {
+ if (!retain_dead_tuples)
+ ReplicationSlotDropAcquired();
+ else if (can_advance_xmin)
+ advance_conflict_slot_xmin(xmin);
+ }
+
/* Switch back to original memory context. */
MemoryContextSwitchTo(oldctx);
/* Clean the temporary memory. */
@@ -1258,6 +1357,125 @@ ApplyLauncherMain(Datum main_arg)
}
/*
+ * Determine the minimum non-removable transaction ID across all apply workers
+ * for subscriptions that have retain_dead_tuples enabled. Store the result
+ * in *xmin.
+ */
+static void
+compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
+{
+ TransactionId nonremovable_xid;
+
+ Assert(worker != NULL);
+
+ /*
+ * The replication slot for conflict detection must be created before the
+ * worker starts.
+ */
+ Assert(MyReplicationSlot);
+
+ SpinLockAcquire(&worker->relmutex);
+ nonremovable_xid = worker->oldest_nonremovable_xid;
+ SpinLockRelease(&worker->relmutex);
+
+ Assert(TransactionIdIsValid(nonremovable_xid));
+
+ if (!TransactionIdIsValid(*xmin) ||
+ TransactionIdPrecedes(nonremovable_xid, *xmin))
+ *xmin = nonremovable_xid;
+}
+
+/*
+ * Acquire the replication slot used to retain information for conflict
+ * detection, if it exists.
+ *
+ * Return true if successfully acquired, otherwise return false.
+ */
+static bool
+acquire_conflict_slot_if_exists(void)
+{
+ if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
+ return false;
+
+ ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
+ return true;
+}
+
+/*
+ * Advance the xmin the replication slot used to retain information required
+ * for conflict detection.
+ */
+static void
+advance_conflict_slot_xmin(TransactionId new_xmin)
+{
+ Assert(MyReplicationSlot);
+ Assert(TransactionIdIsValid(new_xmin));
+ Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
+
+ /* Return if the xmin value of the slot cannot be advanced */
+ if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
+ return;
+
+ SpinLockAcquire(&MyReplicationSlot->mutex);
+ MyReplicationSlot->effective_xmin = new_xmin;
+ MyReplicationSlot->data.xmin = new_xmin;
+ SpinLockRelease(&MyReplicationSlot->mutex);
+
+ elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
+
+ ReplicationSlotMarkDirty();
+ ReplicationSlotsComputeRequiredXmin(false);
+
+ /*
+ * Like PhysicalConfirmReceivedLocation(), do not save slot information
+ * each time. This is acceptable because all concurrent transactions on
+ * the publisher that require the data preceding the slot's xmin should
+ * have already been applied and flushed on the subscriber before the xmin
+ * is advanced. So, even if the slot's xmin regresses after a restart, it
+ * will be advanced again in the next cycle. Therefore, no data required
+ * for conflict detection will be prematurely removed.
+ */
+ return;
+}
+
+/*
+ * Create and acquire the replication slot used to retain information for
+ * conflict detection, if not yet.
+ */
+void
+CreateConflictDetectionSlot(void)
+{
+ TransactionId xmin_horizon;
+
+ /* Exit early, if the replication slot is already created and acquired */
+ if (MyReplicationSlot)
+ return;
+
+ ereport(LOG,
+ errmsg("creating replication conflict detection slot"));
+
+ ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
+ false, false);
+
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+ xmin_horizon = GetOldestSafeDecodingTransactionId(false);
+
+ SpinLockAcquire(&MyReplicationSlot->mutex);
+ MyReplicationSlot->effective_xmin = xmin_horizon;
+ MyReplicationSlot->data.xmin = xmin_horizon;
+ SpinLockRelease(&MyReplicationSlot->mutex);
+
+ ReplicationSlotsComputeRequiredXmin(true);
+
+ LWLockRelease(ProcArrayLock);
+
+ /* Write this slot to disk */
+ ReplicationSlotMarkDirty();
+ ReplicationSlotSave();
+}
+
+/*
* Is current process the logical replication launcher?
*/
bool
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 7b4e8629553..34cf05668ae 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2599,7 +2599,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
if (++changes_count >= CHANGES_THRESHOLD)
{
- rb->update_progress_txn(rb, txn, change->lsn);
+ rb->update_progress_txn(rb, txn, prev_lsn);
changes_count = 0;
}
}
@@ -4917,7 +4917,7 @@ StartupReorderBuffer(void)
continue;
/* if it cannot be a slot, skip the directory */
- if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
+ if (!ReplicationSlotValidateName(logical_de->d_name, true, DEBUG2))
continue;
/*
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 2f0c08b8fbd..37738440113 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -1059,14 +1059,14 @@ ValidateSlotSyncParams(int elevel)
{
/*
* Logical slot sync/creation requires wal_level >= logical.
- *
- * Since altering the wal_level requires a server restart, so error out in
- * this case regardless of elevel provided by caller.
*/
if (wal_level < WAL_LEVEL_LOGICAL)
- ereport(ERROR,
+ {
+ ereport(elevel,
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("replication slot synchronization requires \"wal_level\" >= \"logical\""));
+ return false;
+ }
/*
* A physical replication slot(primary_slot_name) is required on the
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index e4fd6347fd1..d3356bc84ee 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -316,7 +316,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
MyLogicalRepWorker->relstate,
- MyLogicalRepWorker->relstate_lsn);
+ MyLogicalRepWorker->relstate_lsn,
+ false);
/*
* End streaming so that LogRepWorkerWalRcvConn can be used to drop
@@ -425,6 +426,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
ListCell *lc;
bool started_tx = false;
bool should_exit = false;
+ Relation rel = NULL;
Assert(!IsTransactionState());
@@ -492,7 +494,17 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
* worker to remove the origin tracking as if there is any
* error while dropping we won't restart it to drop the
* origin. So passing missing_ok = true.
+ *
+ * Lock the subscription and origin in the same order as we
+ * are doing during DDL commands to avoid deadlocks. See
+ * AlterSubscription_refresh.
*/
+ LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid,
+ 0, AccessShareLock);
+
+ if (!rel)
+ rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+
ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
rstate->relid,
originname,
@@ -504,7 +516,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
*/
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
rstate->relid, rstate->state,
- rstate->lsn);
+ rstate->lsn, true);
}
}
else
@@ -555,7 +567,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
* This is required to avoid any undetected deadlocks
* due to any existing lock as deadlock detector won't
* be able to detect the waits on the latch.
+ *
+ * Also close any tables prior to the commit.
*/
+ if (rel)
+ {
+ table_close(rel, NoLock);
+ rel = NULL;
+ }
CommitTransactionCommand();
pgstat_report_stat(false);
}
@@ -615,13 +634,19 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
MySubscription->name,
MyLogicalRepWorker->userid,
rstate->relid,
- DSM_HANDLE_INVALID);
+ DSM_HANDLE_INVALID,
+ false);
}
}
}
}
}
+ /* Close table if opened */
+ if (rel)
+ table_close(rel, NoLock);
+
+
if (started_tx)
{
/*
@@ -1413,7 +1438,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
MyLogicalRepWorker->relstate,
- MyLogicalRepWorker->relstate_lsn);
+ MyLogicalRepWorker->relstate_lsn,
+ false);
CommitTransactionCommand();
pgstat_report_stat(true);
@@ -1546,7 +1572,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
SUBREL_STATE_FINISHEDCOPY,
- MyLogicalRepWorker->relstate_lsn);
+ MyLogicalRepWorker->relstate_lsn,
+ false);
CommitTransactionCommand();
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index c5fb627aa56..89e241c8392 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -132,6 +132,96 @@
* failover = true when creating the subscription. Enabling failover allows us
* to smoothly transition to the promoted standby, ensuring that we can
* subscribe to the new primary without losing any data.
+ *
+ * RETAIN DEAD TUPLES
+ * ----------------------
+ * Each apply worker that enabled retain_dead_tuples option maintains a
+ * non-removable transaction ID (oldest_nonremovable_xid) in shared memory to
+ * prevent dead rows from being removed prematurely when the apply worker still
+ * needs them to detect update_deleted conflicts. Additionally, this helps to
+ * retain the required commit_ts module information, which further helps to
+ * detect update_origin_differs and delete_origin_differs conflicts reliably, as
+ * otherwise, vacuum freeze could remove the required information.
+ *
+ * The logical replication launcher manages an internal replication slot named
+ * "pg_conflict_detection". It asynchronously aggregates the non-removable
+ * transaction ID from all apply workers to determine the appropriate xmin for
+ * the slot, thereby retaining necessary tuples.
+ *
+ * The non-removable transaction ID in the apply worker is advanced to the
+ * oldest running transaction ID once all concurrent transactions on the
+ * publisher have been applied and flushed locally. The process involves:
+ *
+ * - RDT_GET_CANDIDATE_XID:
+ * Call GetOldestActiveTransactionId() to take oldestRunningXid as the
+ * candidate xid.
+ *
+ * - RDT_REQUEST_PUBLISHER_STATUS:
+ * Send a message to the walsender requesting the publisher status, which
+ * includes the latest WAL write position and information about transactions
+ * that are in the commit phase.
+ *
+ * - RDT_WAIT_FOR_PUBLISHER_STATUS:
+ * Wait for the status from the walsender. After receiving the first status,
+ * do not proceed if there are concurrent remote transactions that are still
+ * in the commit phase. These transactions might have been assigned an
+ * earlier commit timestamp but have not yet written the commit WAL record.
+ * Continue to request the publisher status (RDT_REQUEST_PUBLISHER_STATUS)
+ * until all these transactions have completed.
+ *
+ * - RDT_WAIT_FOR_LOCAL_FLUSH:
+ * Advance the non-removable transaction ID if the current flush location has
+ * reached or surpassed the last received WAL position.
+ *
+ * The overall state progression is: GET_CANDIDATE_XID ->
+ * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
+ * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
+ * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID.
+ *
+ * Retaining the dead tuples for this period is sufficient for ensuring
+ * eventual consistency using last-update-wins strategy, as dead tuples are
+ * useful for detecting conflicts only during the application of concurrent
+ * transactions from remote nodes. After applying and flushing all remote
+ * transactions that occurred concurrently with the tuple DELETE, any
+ * subsequent UPDATE from a remote node should have a later timestamp. In such
+ * cases, it is acceptable to detect an update_missing scenario and convert the
+ * UPDATE to an INSERT when applying it. But, for concurrent remote
+ * transactions with earlier timestamps than the DELETE, detecting
+ * update_deleted is necessary, as the UPDATEs in remote transactions should be
+ * ignored if their timestamp is earlier than that of the dead tuples.
+ *
+ * Note that advancing the non-removable transaction ID is not supported if the
+ * publisher is also a physical standby. This is because the logical walsender
+ * on the standby can only get the WAL replay position but there may be more
+ * WALs that are being replicated from the primary and those WALs could have
+ * earlier commit timestamp.
+ *
+ * Similarly, when the publisher has subscribed to another publisher,
+ * information necessary for conflict detection cannot be retained for
+ * changes from origins other than the publisher. This is because publisher
+ * lacks the information on concurrent transactions of other publishers to
+ * which it subscribes. As the information on concurrent transactions is
+ * unavailable beyond subscriber's immediate publishers, the non-removable
+ * transaction ID might be advanced prematurely before changes from other
+ * origins have been fully applied.
+ *
+ * XXX Retaining information for changes from other origins might be possible
+ * by requesting the subscription on that origin to enable retain_dead_tuples
+ * and fetching the conflict detection slot.xmin along with the publisher's
+ * status. In the RDT_WAIT_FOR_PUBLISHER_STATUS phase, the apply worker could
+ * wait for the remote slot's xmin to reach the oldest active transaction ID,
+ * ensuring that all transactions from other origins have been applied on the
+ * publisher, thereby getting the latest WAL position that includes all
+ * concurrent changes. However, this approach may impact performance, so it
+ * might not worth the effort.
+ *
+ * XXX It seems feasible to get the latest commit's WAL location from the
+ * publisher and wait till that is applied. However, we can't do that
+ * because commit timestamps can regress as a commit with a later LSN is not
+ * guaranteed to have a later timestamp than those with earlier LSNs. Having
+ * said that, even if that is possible, it won't improve performance much as
+ * the apply always lag and moves slowly as compared with the transactions
+ * on the publisher.
*-------------------------------------------------------------------------
*/
@@ -140,6 +230,7 @@
#include <sys/stat.h>
#include <unistd.h>
+#include "access/commit_ts.h"
#include "access/table.h"
#include "access/tableam.h"
#include "access/twophase.h"
@@ -148,6 +239,7 @@
#include "catalog/pg_inherits.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
+#include "commands/subscriptioncmds.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
@@ -166,12 +258,14 @@
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
+#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/buffile.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
+#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/dynahash.h"
@@ -268,6 +362,78 @@ typedef enum
TRANS_PARALLEL_APPLY,
} TransApplyAction;
+/*
+ * The phases involved in advancing the non-removable transaction ID.
+ *
+ * See comments atop worker.c for details of the transition between these
+ * phases.
+ */
+typedef enum
+{
+ RDT_GET_CANDIDATE_XID,
+ RDT_REQUEST_PUBLISHER_STATUS,
+ RDT_WAIT_FOR_PUBLISHER_STATUS,
+ RDT_WAIT_FOR_LOCAL_FLUSH
+} RetainDeadTuplesPhase;
+
+/*
+ * Critical information for managing phase transitions within the
+ * RetainDeadTuplesPhase.
+ */
+typedef struct RetainDeadTuplesData
+{
+ RetainDeadTuplesPhase phase; /* current phase */
+ XLogRecPtr remote_lsn; /* WAL write position on the publisher */
+
+ /*
+ * Oldest transaction ID that was in the commit phase on the publisher.
+ * Use FullTransactionId to prevent issues with transaction ID wraparound,
+ * where a new remote_oldestxid could falsely appear to originate from the
+ * past and block advancement.
+ */
+ FullTransactionId remote_oldestxid;
+
+ /*
+ * Next transaction ID to be assigned on the publisher. Use
+ * FullTransactionId for consistency and to allow straightforward
+ * comparisons with remote_oldestxid.
+ */
+ FullTransactionId remote_nextxid;
+
+ TimestampTz reply_time; /* when the publisher responds with status */
+
+ /*
+ * Publisher transaction ID that must be awaited to complete before
+ * entering the final phase (RDT_WAIT_FOR_LOCAL_FLUSH). Use
+ * FullTransactionId for the same reason as remote_nextxid.
+ */
+ FullTransactionId remote_wait_for;
+
+ TransactionId candidate_xid; /* candidate for the non-removable
+ * transaction ID */
+ TimestampTz flushpos_update_time; /* when the remote flush position was
+ * updated in final phase
+ * (RDT_WAIT_FOR_LOCAL_FLUSH) */
+
+ /*
+ * The following fields are used to determine the timing for the next
+ * round of transaction ID advancement.
+ */
+ TimestampTz last_recv_time; /* when the last message was received */
+ TimestampTz candidate_xid_time; /* when the candidate_xid is decided */
+ int xid_advance_interval; /* how much time (ms) to wait before
+ * attempting to advance the
+ * non-removable transaction ID */
+} RetainDeadTuplesData;
+
+/*
+ * The minimum (100ms) and maximum (3 minutes) intervals for advancing
+ * non-removable transaction IDs. The maximum interval is a bit arbitrary but
+ * is sufficient to not cause any undue network traffic.
+ */
+#define MIN_XID_ADVANCE_INTERVAL 100
+#define MAX_XID_ADVANCE_INTERVAL 180000
+
/* errcontext tracker */
static ApplyErrorCallbackArg apply_error_callback_arg =
{
@@ -332,6 +498,13 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
/* BufFile handle of the current streaming file */
static BufFile *stream_fd = NULL;
+/*
+ * The remote WAL position that has been applied and flushed locally. We record
+ * and use this information both while sending feedback to the server and
+ * advancing oldest_nonremovable_xid.
+ */
+static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
+
typedef struct SubXactInfo
{
TransactionId xid; /* XID of the subxact */
@@ -372,6 +545,19 @@ static void stream_close_file(void);
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
+static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
+ bool status_received);
+static bool can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data);
+static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
+ bool status_received);
+static void get_candidate_xid(RetainDeadTuplesData *rdt_data);
+static void request_publisher_status(RetainDeadTuplesData *rdt_data);
+static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
+ bool status_received);
+static void wait_for_local_flush(RetainDeadTuplesData *rdt_data);
+static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data,
+ bool new_xid_found);
+
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
static void apply_handle_insert_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
@@ -390,6 +576,12 @@ static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel
Oid localidxoid,
TupleTableSlot *remoteslot,
TupleTableSlot **localslot);
+static bool FindDeletedTupleInLocalRel(Relation localrel,
+ Oid localidxoid,
+ TupleTableSlot *remoteslot,
+ TransactionId *delete_xid,
+ RepOriginId *delete_origin,
+ TimestampTz *delete_time);
static void apply_handle_tuple_routing(ApplyExecutionData *edata,
TupleTableSlot *remoteslot,
LogicalRepTupleData *newtup,
@@ -2726,17 +2918,31 @@ apply_handle_update_internal(ApplyExecutionData *edata,
}
else
{
+ ConflictType type;
TupleTableSlot *newslot = localslot;
+ /*
+ * Detecting whether the tuple was recently deleted or never existed
+ * is crucial to avoid misleading the user during confict handling.
+ */
+ if (FindDeletedTupleInLocalRel(localrel, localindexoid, remoteslot,
+ &conflicttuple.xmin,
+ &conflicttuple.origin,
+ &conflicttuple.ts) &&
+ conflicttuple.origin != replorigin_session_origin)
+ type = CT_UPDATE_DELETED;
+ else
+ type = CT_UPDATE_MISSING;
+
/* Store the new tuple for conflict reporting */
slot_store_data(newslot, relmapentry, newtup);
/*
- * The tuple to be updated could not be found. Do nothing except for
- * emitting a log message.
+ * The tuple to be updated could not be found or was deleted. Do
+ * nothing except for emitting a log message.
*/
- ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
- remoteslot, newslot, list_make1(&conflicttuple));
+ ReportApplyConflict(estate, relinfo, LOG, type, remoteslot, newslot,
+ list_make1(&conflicttuple));
}
/* Cleanup. */
@@ -2957,6 +3163,112 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
}
/*
+ * Determine whether the index can reliably locate the deleted tuple in the
+ * local relation.
+ *
+ * An index may exclude deleted tuples if it was re-indexed or re-created during
+ * change application. Therefore, an index is considered usable only if the
+ * conflict detection slot.xmin (conflict_detection_xmin) is greater than the
+ * index tuple's xmin. This ensures that any tuples deleted prior to the index
+ * creation or re-indexing are not relevant for conflict detection in the
+ * current apply worker.
+ *
+ * Note that indexes may also be excluded if they were modified by other DDL
+ * operations, such as ALTER INDEX. However, this is acceptable, as the
+ * likelihood of such DDL changes coinciding with the need to scan dead
+ * tuples for the update_deleted is low.
+ */
+static bool
+IsIndexUsableForFindingDeletedTuple(Oid localindexoid,
+ TransactionId conflict_detection_xmin)
+{
+ HeapTuple index_tuple;
+ TransactionId index_xmin;
+
+ index_tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(localindexoid));
+
+ if (!HeapTupleIsValid(index_tuple)) /* should not happen */
+ elog(ERROR, "cache lookup failed for index %u", localindexoid);
+
+ /*
+ * No need to check for a frozen transaction ID, as
+ * TransactionIdPrecedes() manages it internally, treating it as falling
+ * behind the conflict_detection_xmin.
+ */
+ index_xmin = HeapTupleHeaderGetXmin(index_tuple->t_data);
+
+ ReleaseSysCache(index_tuple);
+
+ return TransactionIdPrecedes(index_xmin, conflict_detection_xmin);
+}
+
+/*
+ * Attempts to locate a deleted tuple in the local relation that matches the
+ * values of the tuple received from the publication side (in 'remoteslot').
+ * The search is performed using either the replica identity index, primary
+ * key, other available index, or a sequential scan if necessary.
+ *
+ * Returns true if the deleted tuple is found. If found, the transaction ID,
+ * origin, and commit timestamp of the deletion are stored in '*delete_xid',
+ * '*delete_origin', and '*delete_time' respectively.
+ */
+static bool
+FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,
+ TupleTableSlot *remoteslot,
+ TransactionId *delete_xid, RepOriginId *delete_origin,
+ TimestampTz *delete_time)
+{
+ TransactionId oldestxmin;
+ ReplicationSlot *slot;
+
+ /*
+ * Return false if either dead tuples are not retained or commit timestamp
+ * data is not available.
+ */
+ if (!MySubscription->retaindeadtuples || !track_commit_timestamp)
+ return false;
+
+ /*
+ * For conflict detection, we use the conflict slot's xmin value instead
+ * of invoking GetOldestNonRemovableTransactionId(). The slot.xmin acts as
+ * a threshold to identify tuples that were recently deleted. These tuples
+ * are not visible to concurrent transactions, but we log an
+ * update_deleted conflict if such a tuple matches the remote update being
+ * applied.
+ *
+ * Although GetOldestNonRemovableTransactionId() can return a value older
+ * than the slot's xmin, for our current purpose it is acceptable to treat
+ * tuples deleted by transactions prior to slot.xmin as update_missing
+ * conflicts.
+ *
+ * Ideally, we would use oldest_nonremovable_xid, which is directly
+ * maintained by the leader apply worker. However, this value is not
+ * available to table synchronization or parallel apply workers, making
+ * slot.xmin a practical alternative in those contexts.
+ */
+ slot = SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true);
+
+ Assert(slot);
+
+ SpinLockAcquire(&slot->mutex);
+ oldestxmin = slot->data.xmin;
+ SpinLockRelease(&slot->mutex);
+
+ Assert(TransactionIdIsValid(oldestxmin));
+
+ if (OidIsValid(localidxoid) &&
+ IsIndexUsableForFindingDeletedTuple(localidxoid, oldestxmin))
+ return RelationFindDeletedTupleInfoByIndex(localrel, localidxoid,
+ remoteslot, oldestxmin,
+ delete_xid, delete_origin,
+ delete_time);
+ else
+ return RelationFindDeletedTupleInfoSeq(localrel, remoteslot,
+ oldestxmin, delete_xid,
+ delete_origin, delete_time);
+}
+
+/*
* This handles insert, update, delete on a partitioned table.
*/
static void
@@ -3074,18 +3386,35 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
remoteslot_part, &localslot);
if (!found)
{
+ ConflictType type;
TupleTableSlot *newslot = localslot;
+ /*
+ * Detecting whether the tuple was recently deleted or
+ * never existed is crucial to avoid misleading the user
+ * during confict handling.
+ */
+ if (FindDeletedTupleInLocalRel(partrel,
+ part_entry->localindexoid,
+ remoteslot_part,
+ &conflicttuple.xmin,
+ &conflicttuple.origin,
+ &conflicttuple.ts) &&
+ conflicttuple.origin != replorigin_session_origin)
+ type = CT_UPDATE_DELETED;
+ else
+ type = CT_UPDATE_MISSING;
+
/* Store the new tuple for conflict reporting */
slot_store_data(newslot, part_entry, newtup);
/*
- * The tuple to be updated could not be found. Do nothing
- * except for emitting a log message.
+ * The tuple to be updated could not be found or was
+ * deleted. Do nothing except for emitting a log message.
*/
ReportApplyConflict(estate, partrelinfo, LOG,
- CT_UPDATE_MISSING, remoteslot_part,
- newslot, list_make1(&conflicttuple));
+ type, remoteslot_part, newslot,
+ list_make1(&conflicttuple));
return;
}
@@ -3577,6 +3906,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
bool ping_sent = false;
TimeLineID tli;
ErrorContextCallback errcallback;
+ RetainDeadTuplesData rdt_data = {0};
/*
* Init the ApplyMessageContext which we clean up after each replication
@@ -3655,6 +3985,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
last_recv_timestamp = GetCurrentTimestamp();
ping_sent = false;
+ rdt_data.last_recv_time = last_recv_timestamp;
+
/* Ensure we are reading the data into our memory context. */
MemoryContextSwitchTo(ApplyMessageContext);
@@ -3681,6 +4013,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
UpdateWorkerStats(last_received, send_time, false);
apply_dispatch(&s);
+
+ maybe_advance_nonremovable_xid(&rdt_data, false);
}
else if (c == 'k')
{
@@ -3696,8 +4030,31 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
last_received = end_lsn;
send_feedback(last_received, reply_requested, false);
+
+ maybe_advance_nonremovable_xid(&rdt_data, false);
+
UpdateWorkerStats(last_received, timestamp, true);
}
+ else if (c == 's') /* Primary status update */
+ {
+ rdt_data.remote_lsn = pq_getmsgint64(&s);
+ rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
+ rdt_data.remote_nextxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
+ rdt_data.reply_time = pq_getmsgint64(&s);
+
+ /*
+ * This should never happen, see
+ * ProcessStandbyPSRequestMessage. But if it happens
+ * due to a bug, we don't want to proceed as it can
+ * incorrectly advance oldest_nonremovable_xid.
+ */
+ if (XLogRecPtrIsInvalid(rdt_data.remote_lsn))
+ elog(ERROR, "cannot get the latest WAL position from the publisher");
+
+ maybe_advance_nonremovable_xid(&rdt_data, true);
+
+ UpdateWorkerStats(last_received, rdt_data.reply_time, false);
+ }
/* other message types are purposefully ignored */
MemoryContextReset(ApplyMessageContext);
@@ -3710,6 +4067,11 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
/* confirm all writes so far */
send_feedback(last_received, false, false);
+ /* Reset the timestamp if no message was received */
+ rdt_data.last_recv_time = 0;
+
+ maybe_advance_nonremovable_xid(&rdt_data, false);
+
if (!in_remote_transaction && !in_streamed_transaction)
{
/*
@@ -3744,6 +4106,14 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
else
wait_time = NAPTIME_PER_CYCLE;
+ /*
+ * Ensure to wake up when it's possible to advance the non-removable
+ * transaction ID.
+ */
+ if (rdt_data.phase == RDT_GET_CANDIDATE_XID &&
+ rdt_data.xid_advance_interval)
+ wait_time = Min(wait_time, rdt_data.xid_advance_interval);
+
rc = WaitLatchOrSocket(MyLatch,
WL_SOCKET_READABLE | WL_LATCH_SET |
WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
@@ -3807,6 +4177,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
send_feedback(last_received, requestReply, requestReply);
+ maybe_advance_nonremovable_xid(&rdt_data, false);
+
/*
* Force reporting to ensure long idle periods don't lead to
* arbitrarily delayed stats. Stats can only be reported outside
@@ -3842,7 +4214,6 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
static XLogRecPtr last_writepos = InvalidXLogRecPtr;
- static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
XLogRecPtr writepos;
XLogRecPtr flushpos;
@@ -3921,6 +4292,368 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
}
/*
+ * Attempt to advance the non-removable transaction ID.
+ *
+ * See comments atop worker.c for details.
+ */
+static void
+maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
+ bool status_received)
+{
+ if (!can_advance_nonremovable_xid(rdt_data))
+ return;
+
+ process_rdt_phase_transition(rdt_data, status_received);
+}
+
+/*
+ * Preliminary check to determine if advancing the non-removable transaction ID
+ * is allowed.
+ */
+static bool
+can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
+{
+ /*
+ * It is sufficient to manage non-removable transaction ID for a
+ * subscription by the main apply worker to detect update_deleted reliably
+ * even for table sync or parallel apply workers.
+ */
+ if (!am_leader_apply_worker())
+ return false;
+
+ /* No need to advance if retaining dead tuples is not required */
+ if (!MySubscription->retaindeadtuples)
+ return false;
+
+ return true;
+}
+
+/*
+ * Process phase transitions during the non-removable transaction ID
+ * advancement. See comments atop worker.c for details of the transition.
+ */
+static void
+process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
+ bool status_received)
+{
+ switch (rdt_data->phase)
+ {
+ case RDT_GET_CANDIDATE_XID:
+ get_candidate_xid(rdt_data);
+ break;
+ case RDT_REQUEST_PUBLISHER_STATUS:
+ request_publisher_status(rdt_data);
+ break;
+ case RDT_WAIT_FOR_PUBLISHER_STATUS:
+ wait_for_publisher_status(rdt_data, status_received);
+ break;
+ case RDT_WAIT_FOR_LOCAL_FLUSH:
+ wait_for_local_flush(rdt_data);
+ break;
+ }
+}
+
+/*
+ * Workhorse for the RDT_GET_CANDIDATE_XID phase.
+ */
+static void
+get_candidate_xid(RetainDeadTuplesData *rdt_data)
+{
+ TransactionId oldest_running_xid;
+ TimestampTz now;
+
+ /*
+ * Use last_recv_time when applying changes in the loop to avoid
+ * unnecessary system time retrieval. If last_recv_time is not available,
+ * obtain the current timestamp.
+ */
+ now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
+
+ /*
+ * Compute the candidate_xid and request the publisher status at most once
+ * per xid_advance_interval. Refer to adjust_xid_advance_interval() for
+ * details on how this value is dynamically adjusted. This is to avoid
+ * using CPU and network resources without making much progress.
+ */
+ if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
+ rdt_data->xid_advance_interval))
+ return;
+
+ /*
+ * Immediately update the timer, even if the function returns later
+ * without setting candidate_xid due to inactivity on the subscriber. This
+ * avoids frequent calls to GetOldestActiveTransactionId.
+ */
+ rdt_data->candidate_xid_time = now;
+
+ /*
+ * Consider transactions in the current database, as only dead tuples from
+ * this database are required for conflict detection.
+ */
+ oldest_running_xid = GetOldestActiveTransactionId(false, false);
+
+ /*
+ * Oldest active transaction ID (oldest_running_xid) can't be behind any
+ * of its previously computed value.
+ */
+ Assert(TransactionIdPrecedesOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
+ oldest_running_xid));
+
+ /* Return if the oldest_nonremovable_xid cannot be advanced */
+ if (TransactionIdEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
+ oldest_running_xid))
+ {
+ adjust_xid_advance_interval(rdt_data, false);
+ return;
+ }
+
+ adjust_xid_advance_interval(rdt_data, true);
+
+ rdt_data->candidate_xid = oldest_running_xid;
+ rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;
+
+ /* process the next phase */
+ process_rdt_phase_transition(rdt_data, false);
+}
+
+/*
+ * Workhorse for the RDT_REQUEST_PUBLISHER_STATUS phase.
+ */
+static void
+request_publisher_status(RetainDeadTuplesData *rdt_data)
+{
+ static StringInfo request_message = NULL;
+
+ if (!request_message)
+ {
+ MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
+
+ request_message = makeStringInfo();
+ MemoryContextSwitchTo(oldctx);
+ }
+ else
+ resetStringInfo(request_message);
+
+ /*
+ * Send the current time to update the remote walsender's latest reply
+ * message received time.
+ */
+ pq_sendbyte(request_message, 'p');
+ pq_sendint64(request_message, GetCurrentTimestamp());
+
+ elog(DEBUG2, "sending publisher status request message");
+
+ /* Send a request for the publisher status */
+ walrcv_send(LogRepWorkerWalRcvConn,
+ request_message->data, request_message->len);
+
+ rdt_data->phase = RDT_WAIT_FOR_PUBLISHER_STATUS;
+
+ /*
+ * Skip calling maybe_advance_nonremovable_xid() since further transition
+ * is possible only once we receive the publisher status message.
+ */
+}
+
+/*
+ * Workhorse for the RDT_WAIT_FOR_PUBLISHER_STATUS phase.
+ */
+static void
+wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
+ bool status_received)
+{
+ /*
+ * Return if we have requested but not yet received the publisher status.
+ */
+ if (!status_received)
+ return;
+
+ if (!FullTransactionIdIsValid(rdt_data->remote_wait_for))
+ rdt_data->remote_wait_for = rdt_data->remote_nextxid;
+
+ /*
+ * Check if all remote concurrent transactions that were active at the
+ * first status request have now completed. If completed, proceed to the
+ * next phase; otherwise, continue checking the publisher status until
+ * these transactions finish.
+ *
+ * It's possible that transactions in the commit phase during the last
+ * cycle have now finished committing, but remote_oldestxid remains older
+ * than remote_wait_for. This can happen if some old transaction came in
+ * the commit phase when we requested status in this cycle. We do not
+ * handle this case explicitly as it's rare and the benefit doesn't
+ * justify the required complexity. Tracking would require either caching
+ * all xids at the publisher or sending them to subscribers. The condition
+ * will resolve naturally once the remaining transactions are finished.
+ *
+ * Directly advancing the non-removable transaction ID is possible if
+ * there are no activities on the publisher since the last advancement
+ * cycle. However, it requires maintaining two fields, last_remote_nextxid
+ * and last_remote_lsn, within the structure for comparison with the
+ * current cycle's values. Considering the minimal cost of continuing in
+ * RDT_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to
+ * advance the transaction ID here.
+ */
+ if (FullTransactionIdPrecedesOrEquals(rdt_data->remote_wait_for,
+ rdt_data->remote_oldestxid))
+ rdt_data->phase = RDT_WAIT_FOR_LOCAL_FLUSH;
+ else
+ rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;
+
+ /* process the next phase */
+ process_rdt_phase_transition(rdt_data, false);
+}
+
+/*
+ * Workhorse for the RDT_WAIT_FOR_LOCAL_FLUSH phase.
+ */
+static void
+wait_for_local_flush(RetainDeadTuplesData *rdt_data)
+{
+ Assert(!XLogRecPtrIsInvalid(rdt_data->remote_lsn) &&
+ TransactionIdIsValid(rdt_data->candidate_xid));
+
+ /*
+ * We expect the publisher and subscriber clocks to be in sync using time
+ * sync service like NTP. Otherwise, we will advance this worker's
+ * oldest_nonremovable_xid prematurely, leading to the removal of rows
+ * required to detect update_deleted reliably. This check primarily
+ * addresses scenarios where the publisher's clock falls behind; if the
+ * publisher's clock is ahead, subsequent transactions will naturally bear
+ * later commit timestamps, conforming to the design outlined atop
+ * worker.c.
+ *
+ * XXX Consider waiting for the publisher's clock to catch up with the
+ * subscriber's before proceeding to the next phase.
+ */
+ if (TimestampDifferenceExceeds(rdt_data->reply_time,
+ rdt_data->candidate_xid_time, 0))
+ ereport(ERROR,
+ errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"),
+ errdetail_internal("The clock on the publisher is behind that of the subscriber."));
+
+ /*
+ * Do not attempt to advance the non-removable transaction ID when table
+ * sync is in progress. During this time, changes from a single
+ * transaction may be applied by multiple table sync workers corresponding
+ * to the target tables. So, it's necessary for all table sync workers to
+ * apply and flush the corresponding changes before advancing the
+ * transaction ID, otherwise, dead tuples that are still needed for
+ * conflict detection in table sync workers could be removed prematurely.
+ * However, confirming the apply and flush progress across all table sync
+ * workers is complex and not worth the effort, so we simply return if not
+ * all tables are in the READY state.
+ *
+ * It is safe to add new tables with initial states to the subscription
+ * after this check because any changes applied to these tables should
+ * have a WAL position greater than the rdt_data->remote_lsn.
+ */
+ if (!AllTablesyncsReady())
+ return;
+
+ /*
+ * Update and check the remote flush position if we are applying changes
+ * in a loop. This is done at most once per WalWriterDelay to avoid
+ * performing costly operations in get_flush_position() too frequently
+ * during change application.
+ */
+ if (last_flushpos < rdt_data->remote_lsn && rdt_data->last_recv_time &&
+ TimestampDifferenceExceeds(rdt_data->flushpos_update_time,
+ rdt_data->last_recv_time, WalWriterDelay))
+ {
+ XLogRecPtr writepos;
+ XLogRecPtr flushpos;
+ bool have_pending_txes;
+
+ /* Fetch the latest remote flush position */
+ get_flush_position(&writepos, &flushpos, &have_pending_txes);
+
+ if (flushpos > last_flushpos)
+ last_flushpos = flushpos;
+
+ rdt_data->flushpos_update_time = rdt_data->last_recv_time;
+ }
+
+ /* Return to wait for the changes to be applied */
+ if (last_flushpos < rdt_data->remote_lsn)
+ return;
+
+ /*
+ * Reaching here means the remote WAL position has been received, and all
+ * transactions up to that position on the publisher have been applied and
+ * flushed locally. So, we can advance the non-removable transaction ID.
+ */
+ SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+ MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid;
+ SpinLockRelease(&MyLogicalRepWorker->relmutex);
+
+ elog(DEBUG2, "confirmed flush up to remote lsn %X/%X: new oldest_nonremovable_xid %u",
+ LSN_FORMAT_ARGS(rdt_data->remote_lsn),
+ rdt_data->candidate_xid);
+
+ /* Notify launcher to update the xmin of the conflict slot */
+ ApplyLauncherWakeup();
+
+ /*
+ * Reset all data fields except those used to determine the timing for the
+ * next round of transaction ID advancement. We can even use
+ * flushpos_update_time in the next round to decide whether to get the
+ * latest flush position.
+ */
+ rdt_data->phase = RDT_GET_CANDIDATE_XID;
+ rdt_data->remote_lsn = InvalidXLogRecPtr;
+ rdt_data->remote_oldestxid = InvalidFullTransactionId;
+ rdt_data->remote_nextxid = InvalidFullTransactionId;
+ rdt_data->reply_time = 0;
+ rdt_data->remote_wait_for = InvalidFullTransactionId;
+ rdt_data->candidate_xid = InvalidTransactionId;
+
+ /* process the next phase */
+ process_rdt_phase_transition(rdt_data, false);
+}
+
+/*
+ * Adjust the interval for advancing non-removable transaction IDs.
+ *
+ * We double the interval to try advancing the non-removable transaction IDs
+ * if there is no activity on the node. The maximum value of the interval is
+ * capped by wal_receiver_status_interval if it is not zero, otherwise to a
+ * 3 minutes which should be sufficient to avoid using CPU or network
+ * resources without much benefit.
+ *
+ * The interval is reset to a minimum value of 100ms once there is some
+ * activity on the node.
+ *
+ * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
+ * consider the other interval or a separate GUC if the need arises.
+ */
+static void
+adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
+{
+ if (!new_xid_found && rdt_data->xid_advance_interval)
+ {
+ int max_interval = wal_receiver_status_interval
+ ? wal_receiver_status_interval * 1000
+ : MAX_XID_ADVANCE_INTERVAL;
+
+ /*
+ * No new transaction ID has been assigned since the last check, so
+ * double the interval, but not beyond the maximum allowable value.
+ */
+ rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
+ max_interval);
+ }
+ else
+ {
+ /*
+ * A new transaction ID was found or the interval is not yet
+ * initialized, so set the interval to the minimum value.
+ */
+ rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
+ }
+}
+
+/*
* Exit routine for apply workers due to subscription parameter changes.
*/
static void
@@ -4708,6 +5441,30 @@ InitializeLogRepWorker(void)
apply_worker_exit();
}
+ /*
+ * Restart the worker if retain_dead_tuples was enabled during startup.
+ *
+ * At this point, the replication slot used for conflict detection might
+ * not exist yet, or could be dropped soon if the launcher perceives
+ * retain_dead_tuples as disabled. To avoid unnecessary tracking of
+ * oldest_nonremovable_xid when the slot is absent or at risk of being
+ * dropped, a restart is initiated.
+ *
+ * The oldest_nonremovable_xid should be initialized only when the
+ * retain_dead_tuples is enabled before launching the worker. See
+ * logicalrep_worker_launch.
+ */
+ if (am_leader_apply_worker() &&
+ MySubscription->retaindeadtuples &&
+ !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid))
+ {
+ ereport(LOG,
+ errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup",
+ MySubscription->name, "retain_dead_tuples"));
+
+ apply_worker_exit();
+ }
+
/* Setup synchronous commit according to the user's wishes */
SetConfigOption("synchronous_commit", MySubscription->synccommit,
PGC_BACKEND, PGC_S_OVERRIDE);
@@ -4864,6 +5621,14 @@ DisableSubscriptionAndExit(void)
errmsg("subscription \"%s\" has been disabled because of an error",
MySubscription->name));
+ /*
+ * Skip the track_commit_timestamp check when disabling the worker due to
+ * an error, as verifying commit timestamps is unnecessary in this
+ * context.
+ */
+ if (MySubscription->retaindeadtuples)
+ CheckSubDeadTupleRetention(false, true, WARNING);
+
proc_exit(0);
}