aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c36
-rw-r--r--src/backend/replication/logical/applyparallelworker.c3
-rw-r--r--src/backend/replication/logical/conflict.c22
-rw-r--r--src/backend/replication/logical/launcher.c270
-rw-r--r--src/backend/replication/logical/logical.c32
-rw-r--r--src/backend/replication/logical/origin.c6
-rw-r--r--src/backend/replication/logical/reorderbuffer.c202
-rw-r--r--src/backend/replication/logical/slotsync.c20
-rw-r--r--src/backend/replication/logical/snapbuild.c58
-rw-r--r--src/backend/replication/logical/tablesync.c56
-rw-r--r--src/backend/replication/logical/worker.c810
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c4
-rw-r--r--src/backend/replication/repl_gram.y4
-rw-r--r--src/backend/replication/repl_scanner.l2
-rw-r--r--src/backend/replication/slot.c169
-rw-r--r--src/backend/replication/slotfuncs.c2
-rw-r--r--src/backend/replication/syncrep.c4
-rw-r--r--src/backend/replication/syncrep_scanner.l11
-rw-r--r--src/backend/replication/walreceiver.c16
-rw-r--r--src/backend/replication/walsender.c123
20 files changed, 1590 insertions, 260 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 7b4ddf7a8f5..239641bfbb6 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -232,6 +232,9 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
errhint("Target server's authentication method must be changed, or set password_required=false in the subscription parameters.")));
}
+ PQsetNoticeReceiver(conn->streamConn, libpqsrv_notice_receiver,
+ "received message via replication");
+
/*
* Set always-secure search path for the cases where the connection is
* used to run SQL queries, so malicious users can't get control.
@@ -418,31 +421,22 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
"IDENTIFY_SYSTEM",
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
- {
- PQclear(res);
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("could not receive database system identifier and timeline ID from "
"the primary server: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
- }
/*
* IDENTIFY_SYSTEM returns 3 columns in 9.3 and earlier, and 4 columns in
* 9.4 and onwards.
*/
if (PQnfields(res) < 3 || PQntuples(res) != 1)
- {
- int ntuples = PQntuples(res);
- int nfields = PQnfields(res);
-
- PQclear(res);
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid response from primary server"),
errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
- ntuples, nfields, 1, 3)));
- }
+ PQntuples(res), PQnfields(res), 1, 3)));
primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
*primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
PQclear(res);
@@ -534,7 +528,7 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
if (options->logical)
appendStringInfoString(&cmd, " LOGICAL");
- appendStringInfo(&cmd, " %X/%X", LSN_FORMAT_ARGS(options->startpoint));
+ appendStringInfo(&cmd, " %X/%08X", LSN_FORMAT_ARGS(options->startpoint));
/*
* Additional options are different depending on if we are doing logical
@@ -604,13 +598,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
return false;
}
else if (PQresultStatus(res) != PGRES_COPY_BOTH)
- {
- PQclear(res);
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("could not start WAL streaming: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
- }
PQclear(res);
return true;
}
@@ -718,26 +709,17 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
cmd,
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
- {
- PQclear(res);
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("could not receive timeline history file from "
"the primary server: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
- }
if (PQnfields(res) != 2 || PQntuples(res) != 1)
- {
- int ntuples = PQntuples(res);
- int nfields = PQnfields(res);
-
- PQclear(res);
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid response from primary server"),
errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
- ntuples, nfields)));
- }
+ PQntuples(res), PQnfields(res))));
*filename = pstrdup(PQgetvalue(res, 0, 0));
*len = PQgetlength(res, 0, 1);
@@ -841,13 +823,10 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
return -1;
}
else
- {
- PQclear(res);
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("could not receive data from WAL stream: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
- }
}
if (rawlen < -1)
ereport(ERROR,
@@ -971,13 +950,10 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
pfree(cmd.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
- {
- PQclear(res);
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("could not create replication slot \"%s\": %s",
slotname, pchomp(PQerrorMessage(conn->streamConn)))));
- }
if (lsn)
*lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid,
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index d25085d3515..1fa931a7422 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -441,7 +441,8 @@ pa_launch_parallel_worker(void)
MySubscription->name,
MyLogicalRepWorker->userid,
InvalidOid,
- dsm_segment_handle(winfo->dsm_seg));
+ dsm_segment_handle(winfo->dsm_seg),
+ false);
if (launched)
{
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 97c4e26b586..2fd3e8bbda5 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -29,6 +29,7 @@ static const char *const ConflictTypeNames[] = {
[CT_UPDATE_EXISTS] = "update_exists",
[CT_UPDATE_MISSING] = "update_missing",
[CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
+ [CT_UPDATE_DELETED] = "update_deleted",
[CT_DELETE_MISSING] = "delete_missing",
[CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
};
@@ -176,6 +177,7 @@ errcode_apply_conflict(ConflictType type)
case CT_UPDATE_ORIGIN_DIFFERS:
case CT_UPDATE_MISSING:
case CT_DELETE_ORIGIN_DIFFERS:
+ case CT_UPDATE_DELETED:
case CT_DELETE_MISSING:
return errcode(ERRCODE_T_R_SERIALIZATION_FAILURE);
}
@@ -261,6 +263,26 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
break;
+ case CT_UPDATE_DELETED:
+ if (localts)
+ {
+ if (localorigin == InvalidRepOriginId)
+ appendStringInfo(&err_detail, _("The row to be updated was deleted locally in transaction %u at %s."),
+ localxmin, timestamptz_to_str(localts));
+ else if (replorigin_by_oid(localorigin, true, &origin_name))
+ appendStringInfo(&err_detail, _("The row to be updated was deleted by a different origin \"%s\" in transaction %u at %s."),
+ origin_name, localxmin, timestamptz_to_str(localts));
+
+ /* The origin that modified this row has been removed. */
+ else
+ appendStringInfo(&err_detail, _("The row to be updated was deleted by a non-existent origin in transaction %u at %s."),
+ localxmin, timestamptz_to_str(localts));
+ }
+ else
+ appendStringInfo(&err_detail, _("The row to be updated was deleted."));
+
+ break;
+
case CT_UPDATE_MISSING:
appendStringInfoString(&err_detail, _("Could not find the row to be updated."));
break;
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 1c3c051403d..37377f7eb63 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -32,6 +32,7 @@
#include "postmaster/interrupt.h"
#include "replication/logicallauncher.h"
#include "replication/origin.h"
+#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
@@ -91,7 +92,6 @@ static dshash_table *last_start_times = NULL;
static bool on_commit_launcher_wakeup = false;
-static void ApplyLauncherWakeup(void);
static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit(int code, Datum arg);
static void logicalrep_worker_detach(void);
@@ -100,6 +100,9 @@ static int logicalrep_pa_worker_count(Oid subid);
static void logicalrep_launcher_attach_dshmem(void);
static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
+static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
+static bool acquire_conflict_slot_if_exists(void);
+static void advance_conflict_slot_xmin(TransactionId new_xmin);
/*
@@ -148,6 +151,7 @@ get_subscription_list(void)
sub->owner = subform->subowner;
sub->enabled = subform->subenabled;
sub->name = pstrdup(NameStr(subform->subname));
+ sub->retaindeadtuples = subform->subretaindeadtuples;
/* We don't fill fields we are not interested in. */
res = lappend(res, sub);
@@ -175,12 +179,14 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
uint16 generation,
BackgroundWorkerHandle *handle)
{
- BgwHandleStatus status;
- int rc;
+ bool result = false;
+ bool dropped_latch = false;
for (;;)
{
+ BgwHandleStatus status;
pid_t pid;
+ int rc;
CHECK_FOR_INTERRUPTS();
@@ -189,8 +195,9 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
/* Worker either died or has started. Return false if died. */
if (!worker->in_use || worker->proc)
{
+ result = worker->in_use;
LWLockRelease(LogicalRepWorkerLock);
- return worker->in_use;
+ break;
}
LWLockRelease(LogicalRepWorkerLock);
@@ -205,7 +212,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
if (generation == worker->generation)
logicalrep_worker_cleanup(worker);
LWLockRelease(LogicalRepWorkerLock);
- return false;
+ break; /* result is already false */
}
/*
@@ -220,8 +227,18 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
{
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
+ dropped_latch = true;
}
}
+
+ /*
+ * If we had to clear a latch event in order to wait, be sure to restore
+ * it before exiting. Otherwise caller may miss events.
+ */
+ if (dropped_latch)
+ SetLatch(MyLatch);
+
+ return result;
}
/*
@@ -296,7 +313,8 @@ logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
bool
logicalrep_worker_launch(LogicalRepWorkerType wtype,
Oid dbid, Oid subid, const char *subname, Oid userid,
- Oid relid, dsm_handle subworker_dsm)
+ Oid relid, dsm_handle subworker_dsm,
+ bool retain_dead_tuples)
{
BackgroundWorker bgw;
BackgroundWorkerHandle *bgw_handle;
@@ -315,10 +333,13 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
* - must be valid worker type
* - tablesync workers are only ones to have relid
* - parallel apply worker is the only kind of subworker
+ * - The replication slot used in conflict detection is created when
+ * retain_dead_tuples is enabled
*/
Assert(wtype != WORKERTYPE_UNKNOWN);
Assert(is_tablesync_worker == OidIsValid(relid));
Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
+ Assert(!retain_dead_tuples || MyReplicationSlot);
ereport(DEBUG1,
(errmsg_internal("starting logical replication worker for subscription \"%s\"",
@@ -328,7 +349,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
if (max_active_replication_origins == 0)
ereport(ERROR,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
- errmsg("cannot start logical replication workers when \"max_active_replication_origins\"=0")));
+ errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0")));
/*
* We need to do the modification of the shared memory under lock so that
@@ -441,6 +462,9 @@ retry:
worker->stream_fileset = NULL;
worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
worker->parallel_apply = is_parallel_apply_worker;
+ worker->oldest_nonremovable_xid = retain_dead_tuples
+ ? MyReplicationSlot->data.xmin
+ : InvalidTransactionId;
worker->last_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->last_send_time);
TIMESTAMP_NOBEGIN(worker->last_recv_time);
@@ -766,6 +790,8 @@ logicalrep_worker_detach(void)
}
LWLockRelease(LogicalRepWorkerLock);
+
+ list_free(workers);
}
/* Block concurrent access. */
@@ -1105,7 +1131,10 @@ ApplyLauncherWakeupAtCommit(void)
on_commit_launcher_wakeup = true;
}
-static void
+/*
+ * Wakeup the launcher immediately.
+ */
+void
ApplyLauncherWakeup(void)
{
if (LogicalRepCtx->launcher_pid != 0)
@@ -1137,6 +1166,12 @@ ApplyLauncherMain(Datum main_arg)
*/
BackgroundWorkerInitializeConnection(NULL, NULL, 0);
+ /*
+ * Acquire the conflict detection slot at startup to ensure it can be
+ * dropped if no longer needed after a restart.
+ */
+ acquire_conflict_slot_if_exists();
+
/* Enter main loop */
for (;;)
{
@@ -1146,6 +1181,9 @@ ApplyLauncherMain(Datum main_arg)
MemoryContext subctx;
MemoryContext oldctx;
long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
+ bool can_advance_xmin = true;
+ bool retain_dead_tuples = false;
+ TransactionId xmin = InvalidTransactionId;
CHECK_FOR_INTERRUPTS();
@@ -1155,7 +1193,14 @@ ApplyLauncherMain(Datum main_arg)
ALLOCSET_DEFAULT_SIZES);
oldctx = MemoryContextSwitchTo(subctx);
- /* Start any missing workers for enabled subscriptions. */
+ /*
+ * Start any missing workers for enabled subscriptions.
+ *
+ * Also, while iterating through all subscriptions, we compute the
+ * minimum XID required to protect deleted tuples for conflict
+ * detection if any subscription enables the retain_dead_tuples
+ * option.
+ */
sublist = get_subscription_list();
foreach(lc, sublist)
{
@@ -1165,6 +1210,38 @@ ApplyLauncherMain(Datum main_arg)
TimestampTz now;
long elapsed;
+ if (sub->retaindeadtuples)
+ {
+ retain_dead_tuples = true;
+
+ /*
+ * Can't advance xmin of the slot unless all the subscriptions
+ * with retain_dead_tuples are enabled. This is required to
+ * ensure that we don't advance the xmin of
+ * CONFLICT_DETECTION_SLOT if one of the subscriptions is not
+ * enabled. Otherwise, we won't be able to detect conflicts
+ * reliably for such a subscription even though it has set the
+ * retain_dead_tuples option.
+ */
+ can_advance_xmin &= sub->enabled;
+
+ /*
+ * Create a replication slot to retain information necessary
+ * for conflict detection such as dead tuples, commit
+ * timestamps, and origins.
+ *
+ * The slot is created before starting the apply worker to
+ * prevent it from unnecessarily maintaining its
+ * oldest_nonremovable_xid.
+ *
+ * The slot is created even for a disabled subscription to
+ * ensure that conflict-related information is available when
+ * applying remote changes that occurred before the
+ * subscription was enabled.
+ */
+ CreateConflictDetectionSlot();
+ }
+
if (!sub->enabled)
continue;
@@ -1173,7 +1250,27 @@ ApplyLauncherMain(Datum main_arg)
LWLockRelease(LogicalRepWorkerLock);
if (w != NULL)
- continue; /* worker is running already */
+ {
+ /*
+ * Compute the minimum xmin needed to protect the dead tuples
+ * required for conflict detection among all running apply
+ * workers whose subscriptions enable retain_dead_tuples.
+ */
+ if (sub->retaindeadtuples && can_advance_xmin)
+ compute_min_nonremovable_xid(w, &xmin);
+
+ /* worker is running already */
+ continue;
+ }
+
+ /*
+ * Can't advance xmin of the slot unless all the workers
+ * corresponding to subscriptions with retain_dead_tuples are
+ * running, disabling the further computation of the minimum
+ * nonremovable xid.
+ */
+ if (sub->retaindeadtuples)
+ can_advance_xmin = false;
/*
* If the worker is eligible to start now, launch it. Otherwise,
@@ -1194,10 +1291,22 @@ ApplyLauncherMain(Datum main_arg)
(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
{
ApplyLauncherSetWorkerStartTime(sub->oid, now);
- logicalrep_worker_launch(WORKERTYPE_APPLY,
- sub->dbid, sub->oid, sub->name,
- sub->owner, InvalidOid,
- DSM_HANDLE_INVALID);
+ if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
+ sub->dbid, sub->oid, sub->name,
+ sub->owner, InvalidOid,
+ DSM_HANDLE_INVALID,
+ sub->retaindeadtuples))
+ {
+ /*
+ * We get here either if we failed to launch a worker
+ * (perhaps for resource-exhaustion reasons) or if we
+ * launched one but it immediately quit. Either way, it
+ * seems appropriate to try again after
+ * wal_retrieve_retry_interval.
+ */
+ wait_time = Min(wait_time,
+ wal_retrieve_retry_interval);
+ }
}
else
{
@@ -1206,6 +1315,20 @@ ApplyLauncherMain(Datum main_arg)
}
}
+ /*
+ * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription
+ * that requires us to retain dead tuples. Otherwise, if required,
+ * advance the slot's xmin to protect dead tuples required for the
+ * conflict detection.
+ */
+ if (MyReplicationSlot)
+ {
+ if (!retain_dead_tuples)
+ ReplicationSlotDropAcquired();
+ else if (can_advance_xmin)
+ advance_conflict_slot_xmin(xmin);
+ }
+
/* Switch back to original memory context. */
MemoryContextSwitchTo(oldctx);
/* Clean the temporary memory. */
@@ -1234,6 +1357,125 @@ ApplyLauncherMain(Datum main_arg)
}
/*
+ * Fold one apply worker's oldest_nonremovable_xid into the running minimum
+ * kept in *xmin.
+ *
+ * ApplyLauncherMain() calls this for each running apply worker whose
+ * subscription has retain_dead_tuples enabled, so that once all of them have
+ * been visited *xmin holds the minimum non-removable transaction ID across
+ * those workers.
+ *
+ * worker - apply worker to read from; must not be NULL. Its
+ *   oldest_nonremovable_xid is read while holding worker->relmutex and is
+ *   expected to be valid.
+ * xmin - in/out running minimum. It is overwritten with the worker's value
+ *   when it is still invalid or when the worker's value precedes it.
+ */
+static void
+compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
+{
+ TransactionId nonremovable_xid;
+
+ Assert(worker != NULL);
+
+ /*
+ * The replication slot for conflict detection must be created before the
+ * worker starts.
+ */
+ Assert(MyReplicationSlot);
+
+ SpinLockAcquire(&worker->relmutex);
+ nonremovable_xid = worker->oldest_nonremovable_xid;
+ SpinLockRelease(&worker->relmutex);
+
+ Assert(TransactionIdIsValid(nonremovable_xid));
+
+ if (!TransactionIdIsValid(*xmin) ||
+ TransactionIdPrecedes(nonremovable_xid, *xmin))
+ *xmin = nonremovable_xid;
+}
+
+/*
+ * Acquire the replication slot used to retain information for conflict
+ * detection, if it exists.
+ *
+ * Looks up CONFLICT_DETECTION_SLOT with SearchNamedReplicationSlot(); if no
+ * such slot is found, nothing is acquired. Otherwise the slot is acquired
+ * with ReplicationSlotAcquire().
+ *
+ * Return true if the slot exists and was acquired, otherwise return false.
+ * ApplyLauncherMain() calls this once at startup and ignores the result.
+ *
+ * NOTE(review): the meaning of the boolean arguments passed to the two slot
+ * routines is not visible here; confirm against their definitions.
+ */
+static bool
+acquire_conflict_slot_if_exists(void)
+{
+ if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
+ return false;
+
+ ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
+ return true;
+}
+
+/*
+ * Advance the xmin of the replication slot used to retain information
+ * required for conflict detection.
+ *
+ * new_xmin must be a valid transaction ID that does not precede the slot's
+ * current data.xmin. If it equals the current value, nothing is done.
+ * Otherwise both effective_xmin and data.xmin of MyReplicationSlot are set
+ * while holding the slot's mutex, the slot is marked dirty, and
+ * ReplicationSlotsComputeRequiredXmin() is called. The slot is not saved to
+ * disk here; see the comment at the end of the function.
+ */
+static void
+advance_conflict_slot_xmin(TransactionId new_xmin)
+{
+ Assert(MyReplicationSlot);
+ Assert(TransactionIdIsValid(new_xmin));
+ Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
+
+ /* Return if the xmin value of the slot cannot be advanced */
+ if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
+ return;
+
+ SpinLockAcquire(&MyReplicationSlot->mutex);
+ MyReplicationSlot->effective_xmin = new_xmin;
+ MyReplicationSlot->data.xmin = new_xmin;
+ SpinLockRelease(&MyReplicationSlot->mutex);
+
+ elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
+
+ ReplicationSlotMarkDirty();
+ ReplicationSlotsComputeRequiredXmin(false);
+
+ /*
+ * Like PhysicalConfirmReceivedLocation(), do not save slot information
+ * each time. This is acceptable because all concurrent transactions on
+ * the publisher that require the data preceding the slot's xmin should
+ * have already been applied and flushed on the subscriber before the xmin
+ * is advanced. So, even if the slot's xmin regresses after a restart, it
+ * will be advanced again in the next cycle. Therefore, no data required
+ * for conflict detection will be prematurely removed.
+ */
+ return;
+}
+
+/*
+ * Create and acquire the replication slot used to retain information for
+ * conflict detection, if that has not been done yet.
+ *
+ * Does nothing when MyReplicationSlot is already set. Otherwise a
+ * persistent slot named CONFLICT_DETECTION_SLOT is created, its xmin and
+ * effective_xmin are initialized from GetOldestSafeDecodingTransactionId()
+ * while ProcArrayLock is held exclusively, and the slot is then marked dirty
+ * and saved to disk.
+ */
+void
+CreateConflictDetectionSlot(void)
+{
+ TransactionId xmin_horizon;
+
+ /* Exit early, if the replication slot is already created and acquired */
+ if (MyReplicationSlot)
+ return;
+
+ ereport(LOG,
+ errmsg("creating replication conflict detection slot"));
+
+ ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
+ false, false);
+
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+ xmin_horizon = GetOldestSafeDecodingTransactionId(false);
+
+ SpinLockAcquire(&MyReplicationSlot->mutex);
+ MyReplicationSlot->effective_xmin = xmin_horizon;
+ MyReplicationSlot->data.xmin = xmin_horizon;
+ SpinLockRelease(&MyReplicationSlot->mutex);
+
+ ReplicationSlotsComputeRequiredXmin(true);
+
+ LWLockRelease(ProcArrayLock);
+
+ /* Write this slot to disk */
+ ReplicationSlotMarkDirty();
+ ReplicationSlotSave();
+}
+
+/*
* Is current process the logical replication launcher?
*/
bool
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1d56d0c4ef3..7e363a7c05b 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -29,6 +29,7 @@
#include "postgres.h"
#include "access/xact.h"
+#include "access/xlog_internal.h"
#include "access/xlogutils.h"
#include "fmgr.h"
#include "miscadmin.h"
@@ -41,6 +42,7 @@
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
+#include "utils/injection_point.h"
#include "utils/inval.h"
#include "utils/memutils.h"
@@ -565,7 +567,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
* kinds of client errors; so the client may wish to check that
* confirmed_flush_lsn matches its expectations.
*/
- elog(LOG, "%X/%X has been already streamed, forwarding to %X/%X",
+ elog(LOG, "%X/%08X has been already streamed, forwarding to %X/%08X",
LSN_FORMAT_ARGS(start_lsn),
LSN_FORMAT_ARGS(slot->data.confirmed_flush));
@@ -608,7 +610,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
ereport(LOG,
(errmsg("starting logical decoding for slot \"%s\"",
NameStr(slot->data.name)),
- errdetail("Streaming transactions committing after %X/%X, reading WAL from %X/%X.",
+ errdetail("Streaming transactions committing after %X/%08X, reading WAL from %X/%08X.",
LSN_FORMAT_ARGS(slot->data.confirmed_flush),
LSN_FORMAT_ARGS(slot->data.restart_lsn))));
@@ -635,7 +637,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
/* Initialize from where to start reading WAL. */
XLogBeginRead(ctx->reader, slot->data.restart_lsn);
- elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
+ elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%08X",
LSN_FORMAT_ARGS(slot->data.restart_lsn));
/* Wait for a consistent starting point */
@@ -756,7 +758,7 @@ output_plugin_error_callback(void *arg)
/* not all callbacks have an associated LSN */
if (state->report_location != InvalidXLogRecPtr)
- errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%X",
+ errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%08X",
NameStr(state->ctx->slot->data.name),
NameStr(state->ctx->slot->data.plugin),
state->callback_name,
@@ -1723,7 +1725,7 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
SpinLockRelease(&slot->mutex);
if (got_new_xmin)
- elog(DEBUG1, "got new catalog xmin %u at %X/%X", xmin,
+ elog(DEBUG1, "got new catalog xmin %u at %X/%08X", xmin,
LSN_FORMAT_ARGS(current_lsn));
/* candidate already valid with the current flush position, apply */
@@ -1783,7 +1785,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
slot->candidate_restart_lsn = restart_lsn;
SpinLockRelease(&slot->mutex);
- elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
+ elog(DEBUG1, "got new restart lsn %X/%08X at %X/%08X",
LSN_FORMAT_ARGS(restart_lsn),
LSN_FORMAT_ARGS(current_lsn));
}
@@ -1798,7 +1800,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
confirmed_flush = slot->data.confirmed_flush;
SpinLockRelease(&slot->mutex);
- elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X",
+ elog(DEBUG1, "failed to increase restart lsn: proposed %X/%08X, after %X/%08X, current candidate %X/%08X, current after %X/%08X, flushed up to %X/%08X",
LSN_FORMAT_ARGS(restart_lsn),
LSN_FORMAT_ARGS(current_lsn),
LSN_FORMAT_ARGS(candidate_restart_lsn),
@@ -1825,9 +1827,13 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
{
bool updated_xmin = false;
bool updated_restart = false;
+ XLogRecPtr restart_lsn pg_attribute_unused();
SpinLockAcquire(&MyReplicationSlot->mutex);
+ /* remember the old restart lsn */
+ restart_lsn = MyReplicationSlot->data.restart_lsn;
+
/*
* Prevent moving the confirmed_flush backwards, as this could lead to
* data duplication issues caused by replicating already replicated
@@ -1881,6 +1887,18 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
/* first write new xmin to disk, so we know what's up after a crash */
if (updated_xmin || updated_restart)
{
+#ifdef USE_INJECTION_POINTS
+ XLogSegNo seg1,
+ seg2;
+
+ XLByteToSeg(restart_lsn, seg1, wal_segment_size);
+ XLByteToSeg(MyReplicationSlot->data.restart_lsn, seg2, wal_segment_size);
+
+ /* trigger injection point, but only if segment changes */
+ if (seg1 != seg2)
+ INJECTION_POINT("logical-replication-slot-advance-segment", NULL);
+#endif
+
ReplicationSlotMarkDirty();
ReplicationSlotSave();
elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index a17bacf88e7..87f10e50dcc 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -826,9 +826,9 @@ StartupReplicationOrigin(void)
last_state++;
ereport(LOG,
- (errmsg("recovered replication state of node %d to %X/%X",
- disk_state.roident,
- LSN_FORMAT_ARGS(disk_state.remote_lsn))));
+ errmsg("recovered replication state of node %d to %X/%08X",
+ disk_state.roident,
+ LSN_FORMAT_ARGS(disk_state.remote_lsn)));
}
/* now check checksum */
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 67655111875..34cf05668ae 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -109,10 +109,22 @@
#include "storage/procarray.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
+#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenumbermap.h"
+/*
+ * Each transaction has an 8MB limit for invalidation messages distributed from
+ * other transactions. This limit is set considering scenarios with many
+ * concurrent logical decoding operations. When the distributed invalidation
+ * messages reach this threshold, the transaction is marked as
+ * RBTXN_DISTR_INVAL_OVERFLOWED to invalidate the complete cache as we have lost
+ * some inval messages and hence don't know what needs to be invalidated.
+ */
+#define MAX_DISTR_INVAL_MSG_PER_TXN \
+ ((8 * 1024 * 1024) / sizeof(SharedInvalidationMessage))
+
/* entry for a hash table we use to map from xid to our transaction state */
typedef struct ReorderBufferTXNByIdEnt
{
@@ -472,6 +484,12 @@ ReorderBufferFreeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
txn->invalidations = NULL;
}
+ if (txn->invalidations_distributed)
+ {
+ pfree(txn->invalidations_distributed);
+ txn->invalidations_distributed = NULL;
+ }
+
/* Reset the toast hash */
ReorderBufferToastReset(rb, txn);
@@ -1397,7 +1415,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
int32 off;
/* nothing there anymore */
- if (state->heap->bh_size == 0)
+ if (binaryheap_empty(state->heap))
return NULL;
off = DatumGetInt32(binaryheap_first(state->heap));
@@ -2581,7 +2599,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
if (++changes_count >= CHANGES_THRESHOLD)
{
- rb->update_progress_txn(rb, txn, change->lsn);
+ rb->update_progress_txn(rb, txn, prev_lsn);
changes_count = 0;
}
}
@@ -2661,7 +2679,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
AbortCurrentTransaction();
/* make sure there's no cache pollution */
- ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
+ if (rbtxn_distr_inval_overflowed(txn))
+ {
+ Assert(txn->ninvalidations_distributed == 0);
+ InvalidateSystemCaches();
+ }
+ else
+ {
+ ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
+ ReorderBufferExecuteInvalidations(txn->ninvalidations_distributed,
+ txn->invalidations_distributed);
+ }
if (using_subtxn)
RollbackAndReleaseCurrentSubTransaction();
@@ -2710,8 +2738,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
AbortCurrentTransaction();
/* make sure there's no cache pollution */
- ReorderBufferExecuteInvalidations(txn->ninvalidations,
- txn->invalidations);
+ if (rbtxn_distr_inval_overflowed(txn))
+ {
+ Assert(txn->ninvalidations_distributed == 0);
+ InvalidateSystemCaches();
+ }
+ else
+ {
+ ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
+ ReorderBufferExecuteInvalidations(txn->ninvalidations_distributed,
+ txn->invalidations_distributed);
+ }
if (using_subtxn)
RollbackAndReleaseCurrentSubTransaction();
@@ -3060,7 +3097,8 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
* We might have decoded changes for this transaction that could load
* the cache as per the current transaction's view (consider DDL's
* happened in this transaction). We don't want the decoding of future
- * transactions to use those cache entries so execute invalidations.
+ * transactions to use those cache entries so execute only the inval
+ * messages in this transaction.
*/
if (txn->ninvalidations > 0)
ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
@@ -3147,9 +3185,10 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
txn->final_lsn = lsn;
/*
- * Process cache invalidation messages if there are any. Even if we're not
- * interested in the transaction's contents, it could have manipulated the
- * catalog and we need to update the caches according to that.
+ * Process only cache invalidation messages in this transaction if there
+ * are any. Even if we're not interested in the transaction's contents, it
+ * could have manipulated the catalog and we need to update the caches
+ * according to that.
*/
if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
@@ -3422,6 +3461,57 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
}
/*
+ * Add new invalidation messages to the reorder buffer queue.
+ *
+ * Allocates a REORDER_BUFFER_CHANGE_INVALIDATION change holding a palloc'd
+ * copy of the nmsgs messages in msgs, and queues it for transaction xid at
+ * lsn via ReorderBufferQueueChange(). The caller's msgs array is only read.
+ */
+static void
+ReorderBufferQueueInvalidations(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr lsn, Size nmsgs,
+ SharedInvalidationMessage *msgs)
+{
+ ReorderBufferChange *change;
+
+ change = ReorderBufferAllocChange(rb);
+ change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
+ change->data.inval.ninvalidations = nmsgs;
+ change->data.inval.invalidations = (SharedInvalidationMessage *)
+ palloc(sizeof(SharedInvalidationMessage) * nmsgs);
+ memcpy(change->data.inval.invalidations, msgs,
+ sizeof(SharedInvalidationMessage) * nmsgs);
+
+ ReorderBufferQueueChange(rb, xid, lsn, change, false);
+}
+
+/*
+ * A helper function for ReorderBufferAddInvalidations() and
+ * ReorderBufferAddDistributedInvalidations() to accumulate the invalidation
+ * messages into *invals_out.
+ *
+ * invals_out / ninvals_out - in/out array of accumulated messages and its
+ *   length. When *ninvals_out is 0 a new array is palloc'd (the previous
+ *   *invals_out value is not consulted); otherwise the existing array is
+ *   enlarged with repalloc() and the new messages are appended.
+ * msgs_new / nmsgs_new - messages to copy in; the array is only read.
+ *
+ * NOTE(review): *ninvals_out is a uint32 while nmsgs_new is a Size, and the
+ * sum is not range-checked here; presumably callers bound it -- confirm.
+ */
+static void
+ReorderBufferAccumulateInvalidations(SharedInvalidationMessage **invals_out,
+ uint32 *ninvals_out,
+ SharedInvalidationMessage *msgs_new,
+ Size nmsgs_new)
+{
+ if (*ninvals_out == 0)
+ {
+ *ninvals_out = nmsgs_new;
+ *invals_out = (SharedInvalidationMessage *)
+ palloc(sizeof(SharedInvalidationMessage) * nmsgs_new);
+ memcpy(*invals_out, msgs_new, sizeof(SharedInvalidationMessage) * nmsgs_new);
+ }
+ else
+ {
+ /* Enlarge the array of inval messages */
+ *invals_out = (SharedInvalidationMessage *)
+ repalloc(*invals_out, sizeof(SharedInvalidationMessage) *
+ (*ninvals_out + nmsgs_new));
+ memcpy(*invals_out + *ninvals_out, msgs_new,
+ nmsgs_new * sizeof(SharedInvalidationMessage));
+ *ninvals_out += nmsgs_new;
+ }
+}
+
+/*
* Accumulate the invalidations for executing them later.
*
* This needs to be called for each XLOG_XACT_INVALIDATIONS message and
@@ -3441,7 +3531,6 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
{
ReorderBufferTXN *txn;
MemoryContext oldcontext;
- ReorderBufferChange *change;
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
@@ -3456,35 +3545,76 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
Assert(nmsgs > 0);
- /* Accumulate invalidations. */
- if (txn->ninvalidations == 0)
- {
- txn->ninvalidations = nmsgs;
- txn->invalidations = (SharedInvalidationMessage *)
- palloc(sizeof(SharedInvalidationMessage) * nmsgs);
- memcpy(txn->invalidations, msgs,
- sizeof(SharedInvalidationMessage) * nmsgs);
- }
- else
+ ReorderBufferAccumulateInvalidations(&txn->invalidations,
+ &txn->ninvalidations,
+ msgs, nmsgs);
+
+ ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs);
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Accumulate the invalidations distributed by other committed transactions
+ * for executing them later.
+ *
+ * This function is similar to ReorderBufferAddInvalidations() but stores
+ * the given inval messages to the txn->invalidations_distributed with the
+ * overflow check.
+ *
+ * This needs to be called by committed transactions to distribute their
+ * inval messages to in-progress transactions.
+ */
+void
+ReorderBufferAddDistributedInvalidations(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr lsn, Size nmsgs,
+ SharedInvalidationMessage *msgs)
+{
+ ReorderBufferTXN *txn;
+ MemoryContext oldcontext;
+
+ txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+ oldcontext = MemoryContextSwitchTo(rb->context);
+
+ /*
+ * Collect all the invalidations under the top transaction, if available,
+ * so that we can execute them all together. See the comments for
+ * ReorderBufferAddInvalidations().
+ */
+ txn = rbtxn_get_toptxn(txn);
+
+ Assert(nmsgs > 0);
+
+ if (!rbtxn_distr_inval_overflowed(txn))
{
- txn->invalidations = (SharedInvalidationMessage *)
- repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
- (txn->ninvalidations + nmsgs));
+ /*
+ * Check that the transaction has enough space for storing distributed
+ * invalidation messages.
+ */
+ if (txn->ninvalidations_distributed + nmsgs >= MAX_DISTR_INVAL_MSG_PER_TXN)
+ {
+ /*
+ * Mark the invalidation message as overflowed and free up the
+ * messages accumulated so far.
+ */
+ txn->txn_flags |= RBTXN_DISTR_INVAL_OVERFLOWED;
- memcpy(txn->invalidations + txn->ninvalidations, msgs,
- nmsgs * sizeof(SharedInvalidationMessage));
- txn->ninvalidations += nmsgs;
+ if (txn->invalidations_distributed)
+ {
+ pfree(txn->invalidations_distributed);
+ txn->invalidations_distributed = NULL;
+ txn->ninvalidations_distributed = 0;
+ }
+ }
+ else
+ ReorderBufferAccumulateInvalidations(&txn->invalidations_distributed,
+ &txn->ninvalidations_distributed,
+ msgs, nmsgs);
}
- change = ReorderBufferAllocChange(rb);
- change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
- change->data.inval.ninvalidations = nmsgs;
- change->data.inval.invalidations = (SharedInvalidationMessage *)
- palloc(sizeof(SharedInvalidationMessage) * nmsgs);
- memcpy(change->data.inval.invalidations, msgs,
- sizeof(SharedInvalidationMessage) * nmsgs);
-
- ReorderBufferQueueChange(rb, xid, lsn, change, false);
+ /* Queue the invalidation messages into the transaction */
+ ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs);
MemoryContextSwitchTo(oldcontext);
}
@@ -4787,7 +4917,7 @@ StartupReorderBuffer(void)
continue;
/* if it cannot be a slot, skip the directory */
- if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
+ if (!ReplicationSlotValidateName(logical_de->d_name, true, DEBUG2))
continue;
/*
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 656e66e0ae0..37738440113 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -211,9 +211,9 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
* impact the users, so we used DEBUG1 level to log the message.
*/
ereport(slot->data.persistency == RS_TEMPORARY ? LOG : DEBUG1,
- errmsg("could not synchronize replication slot \"%s\" because remote slot precedes local slot",
+ errmsg("could not synchronize replication slot \"%s\"",
remote_slot->name),
- errdetail("The remote slot has LSN %X/%X and catalog xmin %u, but the local slot has LSN %X/%X and catalog xmin %u.",
+ errdetail("Synchronization could lead to data loss, because the remote slot needs WAL at LSN %X/%08X and catalog xmin %u, but the standby has LSN %X/%08X and catalog xmin %u.",
LSN_FORMAT_ARGS(remote_slot->restart_lsn),
remote_slot->catalog_xmin,
LSN_FORMAT_ARGS(slot->data.restart_lsn),
@@ -275,7 +275,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
ereport(ERROR,
errmsg_internal("synchronized confirmed_flush for slot \"%s\" differs from remote slot",
remote_slot->name),
- errdetail_internal("Remote slot has LSN %X/%X but local slot has LSN %X/%X.",
+ errdetail_internal("Remote slot has LSN %X/%08X but local slot has LSN %X/%08X.",
LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
LSN_FORMAT_ARGS(slot->data.confirmed_flush)));
}
@@ -593,7 +593,7 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
{
ereport(LOG,
errmsg("could not synchronize replication slot \"%s\"", remote_slot->name),
- errdetail("Logical decoding could not find consistent point from local slot's LSN %X/%X.",
+ errdetail("Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X.",
LSN_FORMAT_ARGS(slot->data.restart_lsn)));
return false;
@@ -642,7 +642,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("skipping slot synchronization because the received slot sync"
- " LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X",
+ " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
remote_slot->name,
LSN_FORMAT_ARGS(latestFlushPtr)));
@@ -733,7 +733,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
ereport(ERROR,
errmsg_internal("cannot synchronize local slot \"%s\"",
remote_slot->name),
- errdetail_internal("Local slot's start streaming location LSN(%X/%X) is ahead of remote slot's LSN(%X/%X).",
+ errdetail_internal("Local slot's start streaming location LSN(%X/%08X) is ahead of remote slot's LSN(%X/%08X).",
LSN_FORMAT_ARGS(slot->data.confirmed_flush),
LSN_FORMAT_ARGS(remote_slot->confirmed_lsn)));
@@ -1059,14 +1059,14 @@ ValidateSlotSyncParams(int elevel)
{
/*
* Logical slot sync/creation requires wal_level >= logical.
- *
- * Since altering the wal_level requires a server restart, so error out in
- * this case regardless of elevel provided by caller.
*/
if (wal_level < WAL_LEVEL_LOGICAL)
- ereport(ERROR,
+ {
+ ereport(elevel,
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("replication slot synchronization requires \"wal_level\" >= \"logical\""));
+ return false;
+ }
/*
* A physical replication slot(primary_slot_name) is required on the
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 0d7bddbe4ed..8532bfd27e5 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -774,7 +774,7 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
if (rbtxn_is_prepared(txn))
continue;
- elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%X",
+ elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%08X",
txn->xid, LSN_FORMAT_ARGS(lsn));
/*
@@ -794,6 +794,13 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
* contents built by the current transaction even after its decoding,
* which should have been invalidated due to concurrent catalog
* changing transaction.
+ *
+ * Distribute only the invalidation messages generated by the current
+ * committed transaction. Invalidation messages received from other
+ * transactions would have already been propagated to the relevant
+ * in-progress transactions. This transaction would have processed
+ * those invalidations, ensuring that subsequent transactions observe
+ * a consistent cache state.
*/
if (txn->xid != xid)
{
@@ -807,8 +814,9 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
{
Assert(msgs != NULL);
- ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
- ninvalidations, msgs);
+ ReorderBufferAddDistributedInvalidations(builder->reorder,
+ txn->xid, lsn,
+ ninvalidations, msgs);
}
}
}
@@ -1263,10 +1271,10 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
builder->initial_xmin_horizon))
{
ereport(DEBUG1,
- (errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
- LSN_FORMAT_ARGS(lsn)),
- errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
- builder->initial_xmin_horizon, running->oldestRunningXid)));
+ errmsg_internal("skipping snapshot at %X/%08X while building logical decoding snapshot, xmin horizon too low",
+ LSN_FORMAT_ARGS(lsn)),
+ errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
+ builder->initial_xmin_horizon, running->oldestRunningXid));
SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
@@ -1302,9 +1310,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
builder->next_phase_at = InvalidTransactionId;
ereport(LOG,
- (errmsg("logical decoding found consistent point at %X/%X",
- LSN_FORMAT_ARGS(lsn)),
- errdetail("There are no running transactions.")));
+ errmsg("logical decoding found consistent point at %X/%08X",
+ LSN_FORMAT_ARGS(lsn)),
+ errdetail("There are no running transactions."));
return false;
}
@@ -1351,10 +1359,10 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
Assert(TransactionIdIsNormal(builder->xmax));
ereport(LOG,
- (errmsg("logical decoding found initial starting point at %X/%X",
- LSN_FORMAT_ARGS(lsn)),
- errdetail("Waiting for transactions (approximately %d) older than %u to end.",
- running->xcnt, running->nextXid)));
+ errmsg("logical decoding found initial starting point at %X/%08X",
+ LSN_FORMAT_ARGS(lsn)),
+ errdetail("Waiting for transactions (approximately %d) older than %u to end.",
+ running->xcnt, running->nextXid));
SnapBuildWaitSnapshot(running, running->nextXid);
}
@@ -1375,10 +1383,10 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
builder->next_phase_at = running->nextXid;
ereport(LOG,
- (errmsg("logical decoding found initial consistent point at %X/%X",
- LSN_FORMAT_ARGS(lsn)),
- errdetail("Waiting for transactions (approximately %d) older than %u to end.",
- running->xcnt, running->nextXid)));
+ errmsg("logical decoding found initial consistent point at %X/%08X",
+ LSN_FORMAT_ARGS(lsn)),
+ errdetail("Waiting for transactions (approximately %d) older than %u to end.",
+ running->xcnt, running->nextXid));
SnapBuildWaitSnapshot(running, running->nextXid);
}
@@ -1399,9 +1407,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
builder->next_phase_at = InvalidTransactionId;
ereport(LOG,
- (errmsg("logical decoding found consistent point at %X/%X",
- LSN_FORMAT_ARGS(lsn)),
- errdetail("There are no old transactions anymore.")));
+ errmsg("logical decoding found consistent point at %X/%08X",
+ LSN_FORMAT_ARGS(lsn)),
+ errdetail("There are no old transactions anymore."));
}
/*
@@ -1905,9 +1913,9 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
Assert(builder->state == SNAPBUILD_CONSISTENT);
ereport(LOG,
- (errmsg("logical decoding found consistent point at %X/%X",
- LSN_FORMAT_ARGS(lsn)),
- errdetail("Logical decoding will begin using saved snapshot.")));
+ errmsg("logical decoding found consistent point at %X/%08X",
+ LSN_FORMAT_ARGS(lsn)),
+ errdetail("Logical decoding will begin using saved snapshot."));
return true;
snapshot_not_interesting:
@@ -2053,7 +2061,7 @@ SnapBuildSnapshotExists(XLogRecPtr lsn)
int ret;
struct stat stat_buf;
- sprintf(path, "%s/%X-%X.snap",
+ sprintf(path, "%s/%08X-%08X.snap",
PG_LOGICAL_SNAPSHOTS_DIR,
LSN_FORMAT_ARGS(lsn));
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 8e1e8762f62..d3356bc84ee 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -316,7 +316,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
MyLogicalRepWorker->relstate,
- MyLogicalRepWorker->relstate_lsn);
+ MyLogicalRepWorker->relstate_lsn,
+ false);
/*
* End streaming so that LogRepWorkerWalRcvConn can be used to drop
@@ -425,6 +426,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
ListCell *lc;
bool started_tx = false;
bool should_exit = false;
+ Relation rel = NULL;
Assert(!IsTransactionState());
@@ -492,7 +494,17 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
* worker to remove the origin tracking as if there is any
* error while dropping we won't restart it to drop the
* origin. So passing missing_ok = true.
+ *
+ * Lock the subscription and origin in the same order as we
+ * are doing during DDL commands to avoid deadlocks. See
+ * AlterSubscription_refresh.
*/
+ LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid,
+ 0, AccessShareLock);
+
+ if (!rel)
+ rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+
ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
rstate->relid,
originname,
@@ -504,7 +516,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
*/
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
rstate->relid, rstate->state,
- rstate->lsn);
+ rstate->lsn, true);
}
}
else
@@ -555,7 +567,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
* This is required to avoid any undetected deadlocks
* due to any existing lock as deadlock detector won't
* be able to detect the waits on the latch.
+ *
+ * Also close any tables prior to the commit.
*/
+ if (rel)
+ {
+ table_close(rel, NoLock);
+ rel = NULL;
+ }
CommitTransactionCommand();
pgstat_report_stat(false);
}
@@ -603,20 +622,31 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
TimestampDifferenceExceeds(hentry->last_start_time, now,
wal_retrieve_retry_interval))
{
- logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
- MyLogicalRepWorker->dbid,
- MySubscription->oid,
- MySubscription->name,
- MyLogicalRepWorker->userid,
- rstate->relid,
- DSM_HANDLE_INVALID);
+ /*
+ * Set the last_start_time even if we fail to start
+ * the worker, so that we won't retry until
+ * wal_retrieve_retry_interval has elapsed.
+ */
hentry->last_start_time = now;
+ (void) logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
+ MyLogicalRepWorker->dbid,
+ MySubscription->oid,
+ MySubscription->name,
+ MyLogicalRepWorker->userid,
+ rstate->relid,
+ DSM_HANDLE_INVALID,
+ false);
}
}
}
}
}
+ /* Close table if opened */
+ if (rel)
+ table_close(rel, NoLock);
+
+
if (started_tx)
{
/*
@@ -1408,7 +1438,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
MyLogicalRepWorker->relstate,
- MyLogicalRepWorker->relstate_lsn);
+ MyLogicalRepWorker->relstate_lsn,
+ false);
CommitTransactionCommand();
pgstat_report_stat(true);
@@ -1541,14 +1572,15 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
SUBREL_STATE_FINISHEDCOPY,
- MyLogicalRepWorker->relstate_lsn);
+ MyLogicalRepWorker->relstate_lsn,
+ false);
CommitTransactionCommand();
copy_table_done:
elog(DEBUG1,
- "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
+ "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%08X",
originname, LSN_FORMAT_ARGS(*origin_startpos));
/*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index a23262957ac..89e241c8392 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -109,13 +109,6 @@
* If ever a user needs to be aware of the tri-state value, they can fetch it
* from the pg_subscription catalog (see column subtwophasestate).
*
- * We don't allow to toggle two_phase option of a subscription because it can
- * lead to an inconsistent replica. Consider, initially, it was on and we have
- * received some prepare then we turn it off, now at commit time the server
- * will send the entire transaction data along with the commit. With some more
- * analysis, we can allow changing this option from off to on but not sure if
- * that alone would be useful.
- *
* Finally, to avoid problems mentioned in previous paragraphs from any
* subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
* to 'off' and then again back to 'on') there is a restriction for
@@ -139,6 +132,96 @@
* failover = true when creating the subscription. Enabling failover allows us
* to smoothly transition to the promoted standby, ensuring that we can
* subscribe to the new primary without losing any data.
+ *
+ * RETAIN DEAD TUPLES
+ * ----------------------
+ * Each apply worker with the retain_dead_tuples option enabled maintains a
+ * non-removable transaction ID (oldest_nonremovable_xid) in shared memory to
+ * prevent dead rows from being removed prematurely when the apply worker still
+ * needs them to detect update_deleted conflicts. Additionally, this helps to
+ * retain the required commit_ts module information, which further helps to
+ * detect update_origin_differs and delete_origin_differs conflicts reliably, as
+ * otherwise, vacuum freeze could remove the required information.
+ *
+ * The logical replication launcher manages an internal replication slot named
+ * "pg_conflict_detection". It asynchronously aggregates the non-removable
+ * transaction ID from all apply workers to determine the appropriate xmin for
+ * the slot, thereby retaining necessary tuples.
+ *
+ * The non-removable transaction ID in the apply worker is advanced to the
+ * oldest running transaction ID once all concurrent transactions on the
+ * publisher have been applied and flushed locally. The process involves:
+ *
+ * - RDT_GET_CANDIDATE_XID:
+ * Call GetOldestActiveTransactionId() to take oldestRunningXid as the
+ * candidate xid.
+ *
+ * - RDT_REQUEST_PUBLISHER_STATUS:
+ * Send a message to the walsender requesting the publisher status, which
+ * includes the latest WAL write position and information about transactions
+ * that are in the commit phase.
+ *
+ * - RDT_WAIT_FOR_PUBLISHER_STATUS:
+ * Wait for the status from the walsender. After receiving the first status,
+ * do not proceed if there are concurrent remote transactions that are still
+ * in the commit phase. These transactions might have been assigned an
+ * earlier commit timestamp but have not yet written the commit WAL record.
+ * Continue to request the publisher status (RDT_REQUEST_PUBLISHER_STATUS)
+ * until all these transactions have completed.
+ *
+ * - RDT_WAIT_FOR_LOCAL_FLUSH:
+ * Advance the non-removable transaction ID if the current flush location has
+ * reached or surpassed the last received WAL position.
+ *
+ * The overall state progression is: GET_CANDIDATE_XID ->
+ * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
+ * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
+ * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID.
+ *
+ * Retaining the dead tuples for this period is sufficient for ensuring
+ * eventual consistency using last-update-wins strategy, as dead tuples are
+ * useful for detecting conflicts only during the application of concurrent
+ * transactions from remote nodes. After applying and flushing all remote
+ * transactions that occurred concurrently with the tuple DELETE, any
+ * subsequent UPDATE from a remote node should have a later timestamp. In such
+ * cases, it is acceptable to detect an update_missing scenario and convert the
+ * UPDATE to an INSERT when applying it. But, for concurrent remote
+ * transactions with earlier timestamps than the DELETE, detecting
+ * update_deleted is necessary, as the UPDATEs in remote transactions should be
+ * ignored if their timestamp is earlier than that of the dead tuples.
+ *
+ * Note that advancing the non-removable transaction ID is not supported if the
+ * publisher is also a physical standby. This is because the logical walsender
+ * on the standby can only get the WAL replay position but there may be more
+ * WALs that are being replicated from the primary and those WALs could have
+ * earlier commit timestamps.
+ *
+ * Similarly, when the publisher has subscribed to another publisher,
+ * information necessary for conflict detection cannot be retained for
+ * changes from origins other than the publisher. This is because the publisher
+ * lacks the information on concurrent transactions of other publishers to
+ * which it subscribes. As the information on concurrent transactions is
+ * unavailable beyond subscriber's immediate publishers, the non-removable
+ * transaction ID might be advanced prematurely before changes from other
+ * origins have been fully applied.
+ *
+ * XXX Retaining information for changes from other origins might be possible
+ * by requesting the subscription on that origin to enable retain_dead_tuples
+ * and fetching the conflict detection slot.xmin along with the publisher's
+ * status. In the RDT_WAIT_FOR_PUBLISHER_STATUS phase, the apply worker could
+ * wait for the remote slot's xmin to reach the oldest active transaction ID,
+ * ensuring that all transactions from other origins have been applied on the
+ * publisher, thereby getting the latest WAL position that includes all
+ * concurrent changes. However, this approach may impact performance, so it
+ * might not be worth the effort.
+ *
+ * XXX It seems feasible to get the latest commit's WAL location from the
+ * publisher and wait till that is applied. However, we can't do that
+ * because commit timestamps can regress as a commit with a later LSN is not
+ * guaranteed to have a later timestamp than those with earlier LSNs. Having
+ * said that, even if that is possible, it won't improve performance much as
+ * the apply always lags and moves slowly as compared with the transactions
+ * on the publisher.
*-------------------------------------------------------------------------
*/
@@ -147,6 +230,7 @@
#include <sys/stat.h>
#include <unistd.h>
+#include "access/commit_ts.h"
#include "access/table.h"
#include "access/tableam.h"
#include "access/twophase.h"
@@ -155,6 +239,7 @@
#include "catalog/pg_inherits.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
+#include "commands/subscriptioncmds.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
@@ -173,12 +258,14 @@
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
+#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/buffile.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
+#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/dynahash.h"
@@ -275,6 +362,78 @@ typedef enum
TRANS_PARALLEL_APPLY,
} TransApplyAction;
+/*
+ * The phases involved in advancing the non-removable transaction ID.
+ *
+ * See comments atop worker.c for details of the transition between these
+ * phases.
+ */
+typedef enum
+{
+ RDT_GET_CANDIDATE_XID,
+ RDT_REQUEST_PUBLISHER_STATUS,
+ RDT_WAIT_FOR_PUBLISHER_STATUS,
+ RDT_WAIT_FOR_LOCAL_FLUSH
+} RetainDeadTuplesPhase;
+
+/*
+ * Critical information for managing phase transitions within the
+ * RetainDeadTuplesPhase.
+ */
+typedef struct RetainDeadTuplesData
+{
+ RetainDeadTuplesPhase phase; /* current phase */
+ XLogRecPtr remote_lsn; /* WAL write position on the publisher */
+
+ /*
+ * Oldest transaction ID that was in the commit phase on the publisher.
+ * Use FullTransactionId to prevent issues with transaction ID wraparound,
+ * where a new remote_oldestxid could falsely appear to originate from the
+ * past and block advancement.
+ */
+ FullTransactionId remote_oldestxid;
+
+ /*
+ * Next transaction ID to be assigned on the publisher. Use
+ * FullTransactionId for consistency and to allow straightforward
+ * comparisons with remote_oldestxid.
+ */
+ FullTransactionId remote_nextxid;
+
+ TimestampTz reply_time; /* when the publisher responds with status */
+
+ /*
+ * Publisher transaction ID that must be awaited to complete before
+ * entering the final phase (RDT_WAIT_FOR_LOCAL_FLUSH). Use
+ * FullTransactionId for the same reason as remote_nextxid.
+ */
+ FullTransactionId remote_wait_for;
+
+ TransactionId candidate_xid; /* candidate for the non-removable
+ * transaction ID */
+ TimestampTz flushpos_update_time; /* when the remote flush position was
+ * updated in final phase
+ * (RDT_WAIT_FOR_LOCAL_FLUSH) */
+
+ /*
+ * The following fields are used to determine the timing for the next
+ * round of transaction ID advancement.
+ */
+ TimestampTz last_recv_time; /* when the last message was received */
+ TimestampTz candidate_xid_time; /* when the candidate_xid is decided */
+ int xid_advance_interval; /* how much time (ms) to wait before
+ * attempting to advance the
+ * non-removable transaction ID */
+} RetainDeadTuplesData;
+
+/*
+ * The minimum (100ms) and maximum (3 minutes) intervals for advancing
+ * non-removable transaction IDs. The maximum interval is a bit arbitrary but
+ * is sufficient to not cause any undue network traffic.
+ */
+#define MIN_XID_ADVANCE_INTERVAL 100
+#define MAX_XID_ADVANCE_INTERVAL 180000
+
/* errcontext tracker */
static ApplyErrorCallbackArg apply_error_callback_arg =
{
@@ -339,6 +498,13 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
/* BufFile handle of the current streaming file */
static BufFile *stream_fd = NULL;
+/*
+ * The remote WAL position that has been applied and flushed locally. We record
+ * and use this information both while sending feedback to the server and
+ * advancing oldest_nonremovable_xid.
+ */
+static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
+
typedef struct SubXactInfo
{
TransactionId xid; /* XID of the subxact */
@@ -379,6 +545,19 @@ static void stream_close_file(void);
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
+static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
+ bool status_received);
+static bool can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data);
+static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
+ bool status_received);
+static void get_candidate_xid(RetainDeadTuplesData *rdt_data);
+static void request_publisher_status(RetainDeadTuplesData *rdt_data);
+static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
+ bool status_received);
+static void wait_for_local_flush(RetainDeadTuplesData *rdt_data);
+static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data,
+ bool new_xid_found);
+
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
static void apply_handle_insert_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
@@ -397,6 +576,12 @@ static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel
Oid localidxoid,
TupleTableSlot *remoteslot,
TupleTableSlot **localslot);
+static bool FindDeletedTupleInLocalRel(Relation localrel,
+ Oid localidxoid,
+ TupleTableSlot *remoteslot,
+ TransactionId *delete_xid,
+ RepOriginId *delete_origin,
+ TimestampTz *delete_time);
static void apply_handle_tuple_routing(ApplyExecutionData *edata,
TupleTableSlot *remoteslot,
LogicalRepTupleData *newtup,
@@ -1023,7 +1208,7 @@ apply_handle_commit(StringInfo s)
if (commit_data.commit_lsn != remote_final_lsn)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
+ errmsg_internal("incorrect commit LSN %X/%08X in commit message (expected %X/%08X)",
LSN_FORMAT_ARGS(commit_data.commit_lsn),
LSN_FORMAT_ARGS(remote_final_lsn))));
@@ -1115,7 +1300,7 @@ apply_handle_prepare(StringInfo s)
if (prepare_data.prepare_lsn != remote_final_lsn)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
+ errmsg_internal("incorrect prepare LSN %X/%08X in prepare message (expected %X/%08X)",
LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
LSN_FORMAT_ARGS(remote_final_lsn))));
@@ -2733,17 +2918,31 @@ apply_handle_update_internal(ApplyExecutionData *edata,
}
else
{
+ ConflictType type;
TupleTableSlot *newslot = localslot;
+ /*
+ * Detecting whether the tuple was recently deleted or never existed
+ * is crucial to avoid misleading the user during conflict handling.
+ */
+ if (FindDeletedTupleInLocalRel(localrel, localindexoid, remoteslot,
+ &conflicttuple.xmin,
+ &conflicttuple.origin,
+ &conflicttuple.ts) &&
+ conflicttuple.origin != replorigin_session_origin)
+ type = CT_UPDATE_DELETED;
+ else
+ type = CT_UPDATE_MISSING;
+
/* Store the new tuple for conflict reporting */
slot_store_data(newslot, relmapentry, newtup);
/*
- * The tuple to be updated could not be found. Do nothing except for
- * emitting a log message.
+ * The tuple to be updated could not be found or was deleted. Do
+ * nothing except for emitting a log message.
*/
- ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
- remoteslot, newslot, list_make1(&conflicttuple));
+ ReportApplyConflict(estate, relinfo, LOG, type, remoteslot, newslot,
+ list_make1(&conflicttuple));
}
/* Cleanup. */
@@ -2964,6 +3163,112 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
}
/*
+ * Determine whether the index can reliably locate the deleted tuple in the
+ * local relation.
+ *
+ * An index may exclude deleted tuples if it was re-indexed or re-created during
+ * change application. Therefore, an index is considered usable only if the
+ * conflict detection slot.xmin (conflict_detection_xmin) is greater than the
+ * index tuple's xmin. This ensures that any tuples deleted prior to the index
+ * creation or re-indexing are not relevant for conflict detection in the
+ * current apply worker.
+ *
+ * Note that indexes may also be excluded if they were modified by other DDL
+ * operations, such as ALTER INDEX. However, this is acceptable, as the
+ * likelihood of such DDL changes coinciding with the need to scan dead
+ * tuples for the update_deleted is low.
+ */
+static bool
+IsIndexUsableForFindingDeletedTuple(Oid localindexoid,
+ TransactionId conflict_detection_xmin)
+{
+ HeapTuple index_tuple;
+ TransactionId index_xmin;
+
+ index_tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(localindexoid));
+
+ if (!HeapTupleIsValid(index_tuple)) /* should not happen */
+ elog(ERROR, "cache lookup failed for index %u", localindexoid);
+
+ /*
+ * No need to check for a frozen transaction ID, as
+ * TransactionIdPrecedes() manages it internally, treating it as falling
+ * behind the conflict_detection_xmin.
+ */
+ index_xmin = HeapTupleHeaderGetXmin(index_tuple->t_data);
+
+ ReleaseSysCache(index_tuple);
+
+ return TransactionIdPrecedes(index_xmin, conflict_detection_xmin);
+}
+
+/*
+ * Attempts to locate a deleted tuple in the local relation that matches the
+ * values of the tuple received from the publication side (in 'remoteslot').
+ * The search is performed using either the replica identity index, primary
+ * key, other available index, or a sequential scan if necessary.
+ *
+ * Returns true if the deleted tuple is found. If found, the transaction ID,
+ * origin, and commit timestamp of the deletion are stored in '*delete_xid',
+ * '*delete_origin', and '*delete_time' respectively.
+ */
+static bool
+FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,
+ TupleTableSlot *remoteslot,
+ TransactionId *delete_xid, RepOriginId *delete_origin,
+ TimestampTz *delete_time)
+{
+ TransactionId oldestxmin;
+ ReplicationSlot *slot;
+
+ /*
+ * Return false if either dead tuples are not retained or commit timestamp
+ * data is not available.
+ */
+ if (!MySubscription->retaindeadtuples || !track_commit_timestamp)
+ return false;
+
+ /*
+ * For conflict detection, we use the conflict slot's xmin value instead
+ * of invoking GetOldestNonRemovableTransactionId(). The slot.xmin acts as
+ * a threshold to identify tuples that were recently deleted. These tuples
+ * are not visible to concurrent transactions, but we log an
+ * update_deleted conflict if such a tuple matches the remote update being
+ * applied.
+ *
+ * Although GetOldestNonRemovableTransactionId() can return a value older
+ * than the slot's xmin, for our current purpose it is acceptable to treat
+ * tuples deleted by transactions prior to slot.xmin as update_missing
+ * conflicts.
+ *
+ * Ideally, we would use oldest_nonremovable_xid, which is directly
+ * maintained by the leader apply worker. However, this value is not
+ * available to table synchronization or parallel apply workers, making
+ * slot.xmin a practical alternative in those contexts.
+ */
+ slot = SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true);
+
+ Assert(slot);
+
+ SpinLockAcquire(&slot->mutex);
+ oldestxmin = slot->data.xmin;
+ SpinLockRelease(&slot->mutex);
+
+ Assert(TransactionIdIsValid(oldestxmin));
+
+ if (OidIsValid(localidxoid) &&
+ IsIndexUsableForFindingDeletedTuple(localidxoid, oldestxmin))
+ return RelationFindDeletedTupleInfoByIndex(localrel, localidxoid,
+ remoteslot, oldestxmin,
+ delete_xid, delete_origin,
+ delete_time);
+ else
+ return RelationFindDeletedTupleInfoSeq(localrel, remoteslot,
+ oldestxmin, delete_xid,
+ delete_origin, delete_time);
+}
+
+/*
* This handles insert, update, delete on a partitioned table.
*/
static void
@@ -3081,18 +3386,35 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
remoteslot_part, &localslot);
if (!found)
{
+ ConflictType type;
TupleTableSlot *newslot = localslot;
+ /*
+ * Detecting whether the tuple was recently deleted or
+ * never existed is crucial to avoid misleading the user
+ * during conflict handling.
+ */
+ if (FindDeletedTupleInLocalRel(partrel,
+ part_entry->localindexoid,
+ remoteslot_part,
+ &conflicttuple.xmin,
+ &conflicttuple.origin,
+ &conflicttuple.ts) &&
+ conflicttuple.origin != replorigin_session_origin)
+ type = CT_UPDATE_DELETED;
+ else
+ type = CT_UPDATE_MISSING;
+
/* Store the new tuple for conflict reporting */
slot_store_data(newslot, part_entry, newtup);
/*
- * The tuple to be updated could not be found. Do nothing
- * except for emitting a log message.
+ * The tuple to be updated could not be found or was
+ * deleted. Do nothing except for emitting a log message.
*/
ReportApplyConflict(estate, partrelinfo, LOG,
- CT_UPDATE_MISSING, remoteslot_part,
- newslot, list_make1(&conflicttuple));
+ type, remoteslot_part, newslot,
+ list_make1(&conflicttuple));
return;
}
@@ -3584,6 +3906,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
bool ping_sent = false;
TimeLineID tli;
ErrorContextCallback errcallback;
+ RetainDeadTuplesData rdt_data = {0};
/*
* Init the ApplyMessageContext which we clean up after each replication
@@ -3662,6 +3985,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
last_recv_timestamp = GetCurrentTimestamp();
ping_sent = false;
+ rdt_data.last_recv_time = last_recv_timestamp;
+
/* Ensure we are reading the data into our memory context. */
MemoryContextSwitchTo(ApplyMessageContext);
@@ -3688,6 +4013,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
UpdateWorkerStats(last_received, send_time, false);
apply_dispatch(&s);
+
+ maybe_advance_nonremovable_xid(&rdt_data, false);
}
else if (c == 'k')
{
@@ -3703,8 +4030,31 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
last_received = end_lsn;
send_feedback(last_received, reply_requested, false);
+
+ maybe_advance_nonremovable_xid(&rdt_data, false);
+
UpdateWorkerStats(last_received, timestamp, true);
}
+ else if (c == 's') /* Primary status update */
+ {
+ rdt_data.remote_lsn = pq_getmsgint64(&s);
+ rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
+ rdt_data.remote_nextxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
+ rdt_data.reply_time = pq_getmsgint64(&s);
+
+ /*
+ * This should never happen, see
+ * ProcessStandbyPSRequestMessage. But if it happens
+ * due to a bug, we don't want to proceed as it can
+ * incorrectly advance oldest_nonremovable_xid.
+ */
+ if (XLogRecPtrIsInvalid(rdt_data.remote_lsn))
+ elog(ERROR, "cannot get the latest WAL position from the publisher");
+
+ maybe_advance_nonremovable_xid(&rdt_data, true);
+
+ UpdateWorkerStats(last_received, rdt_data.reply_time, false);
+ }
/* other message types are purposefully ignored */
MemoryContextReset(ApplyMessageContext);
@@ -3717,6 +4067,11 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
/* confirm all writes so far */
send_feedback(last_received, false, false);
+ /* Reset the timestamp if no message was received */
+ rdt_data.last_recv_time = 0;
+
+ maybe_advance_nonremovable_xid(&rdt_data, false);
+
if (!in_remote_transaction && !in_streamed_transaction)
{
/*
@@ -3751,6 +4106,14 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
else
wait_time = NAPTIME_PER_CYCLE;
+ /*
+ * Ensure to wake up when it's possible to advance the non-removable
+ * transaction ID.
+ */
+ if (rdt_data.phase == RDT_GET_CANDIDATE_XID &&
+ rdt_data.xid_advance_interval)
+ wait_time = Min(wait_time, rdt_data.xid_advance_interval);
+
rc = WaitLatchOrSocket(MyLatch,
WL_SOCKET_READABLE | WL_LATCH_SET |
WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
@@ -3814,6 +4177,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
send_feedback(last_received, requestReply, requestReply);
+ maybe_advance_nonremovable_xid(&rdt_data, false);
+
/*
* Force reporting to ensure long idle periods don't lead to
* arbitrarily delayed stats. Stats can only be reported outside
@@ -3849,7 +4214,6 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
static XLogRecPtr last_writepos = InvalidXLogRecPtr;
- static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
XLogRecPtr writepos;
XLogRecPtr flushpos;
@@ -3910,7 +4274,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
pq_sendint64(reply_message, now); /* sendTime */
pq_sendbyte(reply_message, requestReply); /* replyRequested */
- elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
+ elog(DEBUG2, "sending feedback (force %d) to recv %X/%08X, write %X/%08X, flush %X/%08X",
force,
LSN_FORMAT_ARGS(recvpos),
LSN_FORMAT_ARGS(writepos),
@@ -3928,6 +4292,368 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
}
/*
+ * Attempt to advance the non-removable transaction ID.
+ *
+ * See comments atop worker.c for details.
+ */
+static void
+maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
+ bool status_received)
+{
+ if (!can_advance_nonremovable_xid(rdt_data))
+ return;
+
+ process_rdt_phase_transition(rdt_data, status_received);
+}
+
+/*
+ * Preliminary check to determine if advancing the non-removable transaction ID
+ * is allowed.
+ */
+static bool
+can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
+{
+ /*
+ * It is sufficient to manage non-removable transaction ID for a
+ * subscription by the main apply worker to detect update_deleted reliably
+ * even for table sync or parallel apply workers.
+ */
+ if (!am_leader_apply_worker())
+ return false;
+
+ /* No need to advance if retaining dead tuples is not required */
+ if (!MySubscription->retaindeadtuples)
+ return false;
+
+ return true;
+}
+
+/*
+ * Process phase transitions during the non-removable transaction ID
+ * advancement. See comments atop worker.c for details of the transition.
+ */
+static void
+process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
+ bool status_received)
+{
+ switch (rdt_data->phase)
+ {
+ case RDT_GET_CANDIDATE_XID:
+ get_candidate_xid(rdt_data);
+ break;
+ case RDT_REQUEST_PUBLISHER_STATUS:
+ request_publisher_status(rdt_data);
+ break;
+ case RDT_WAIT_FOR_PUBLISHER_STATUS:
+ wait_for_publisher_status(rdt_data, status_received);
+ break;
+ case RDT_WAIT_FOR_LOCAL_FLUSH:
+ wait_for_local_flush(rdt_data);
+ break;
+ }
+}
+
+/*
+ * Workhorse for the RDT_GET_CANDIDATE_XID phase.
+ */
+static void
+get_candidate_xid(RetainDeadTuplesData *rdt_data)
+{
+ TransactionId oldest_running_xid;
+ TimestampTz now;
+
+ /*
+ * Use last_recv_time when applying changes in the loop to avoid
+ * unnecessary system time retrieval. If last_recv_time is not available,
+ * obtain the current timestamp.
+ */
+ now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
+
+ /*
+ * Compute the candidate_xid and request the publisher status at most once
+ * per xid_advance_interval. Refer to adjust_xid_advance_interval() for
+ * details on how this value is dynamically adjusted. This is to avoid
+ * using CPU and network resources without making much progress.
+ */
+ if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
+ rdt_data->xid_advance_interval))
+ return;
+
+ /*
+ * Immediately update the timer, even if the function returns later
+ * without setting candidate_xid due to inactivity on the subscriber. This
+ * avoids frequent calls to GetOldestActiveTransactionId.
+ */
+ rdt_data->candidate_xid_time = now;
+
+ /*
+ * Consider transactions in the current database, as only dead tuples from
+ * this database are required for conflict detection.
+ */
+ oldest_running_xid = GetOldestActiveTransactionId(false, false);
+
+ /*
+ * Oldest active transaction ID (oldest_running_xid) can't be behind any
+ * of its previously computed value.
+ */
+ Assert(TransactionIdPrecedesOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
+ oldest_running_xid));
+
+ /* Return if the oldest_nonremovable_xid cannot be advanced */
+ if (TransactionIdEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
+ oldest_running_xid))
+ {
+ adjust_xid_advance_interval(rdt_data, false);
+ return;
+ }
+
+ adjust_xid_advance_interval(rdt_data, true);
+
+ rdt_data->candidate_xid = oldest_running_xid;
+ rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;
+
+ /* process the next phase */
+ process_rdt_phase_transition(rdt_data, false);
+}
+
+/*
+ * Workhorse for the RDT_REQUEST_PUBLISHER_STATUS phase.
+ */
+static void
+request_publisher_status(RetainDeadTuplesData *rdt_data)
+{
+ static StringInfo request_message = NULL;
+
+ if (!request_message)
+ {
+ MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
+
+ request_message = makeStringInfo();
+ MemoryContextSwitchTo(oldctx);
+ }
+ else
+ resetStringInfo(request_message);
+
+ /*
+ * Send the current time to update the remote walsender's latest reply
+ * message received time.
+ */
+ pq_sendbyte(request_message, 'p');
+ pq_sendint64(request_message, GetCurrentTimestamp());
+
+ elog(DEBUG2, "sending publisher status request message");
+
+ /* Send a request for the publisher status */
+ walrcv_send(LogRepWorkerWalRcvConn,
+ request_message->data, request_message->len);
+
+ rdt_data->phase = RDT_WAIT_FOR_PUBLISHER_STATUS;
+
+ /*
+ * Skip calling maybe_advance_nonremovable_xid() since further transition
+ * is possible only once we receive the publisher status message.
+ */
+}
+
+/*
+ * Workhorse for the RDT_WAIT_FOR_PUBLISHER_STATUS phase.
+ */
+static void
+wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
+ bool status_received)
+{
+ /*
+ * Return if we have requested but not yet received the publisher status.
+ */
+ if (!status_received)
+ return;
+
+ if (!FullTransactionIdIsValid(rdt_data->remote_wait_for))
+ rdt_data->remote_wait_for = rdt_data->remote_nextxid;
+
+ /*
+ * Check if all remote concurrent transactions that were active at the
+ * first status request have now completed. If completed, proceed to the
+ * next phase; otherwise, continue checking the publisher status until
+ * these transactions finish.
+ *
+ * It's possible that transactions in the commit phase during the last
+ * cycle have now finished committing, but remote_oldestxid remains older
+ * than remote_wait_for. This can happen if some old transaction came in
+ * the commit phase when we requested status in this cycle. We do not
+ * handle this case explicitly as it's rare and the benefit doesn't
+ * justify the required complexity. Tracking would require either caching
+ * all xids at the publisher or sending them to subscribers. The condition
+ * will resolve naturally once the remaining transactions are finished.
+ *
+ * Directly advancing the non-removable transaction ID is possible if
+ * there are no activities on the publisher since the last advancement
+ * cycle. However, it requires maintaining two fields, last_remote_nextxid
+ * and last_remote_lsn, within the structure for comparison with the
+ * current cycle's values. Considering the minimal cost of continuing in
+ * RDT_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to
+ * advance the transaction ID here.
+ */
+ if (FullTransactionIdPrecedesOrEquals(rdt_data->remote_wait_for,
+ rdt_data->remote_oldestxid))
+ rdt_data->phase = RDT_WAIT_FOR_LOCAL_FLUSH;
+ else
+ rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;
+
+ /* process the next phase */
+ process_rdt_phase_transition(rdt_data, false);
+}
+
+/*
+ * Workhorse for the RDT_WAIT_FOR_LOCAL_FLUSH phase.
+ */
+static void
+wait_for_local_flush(RetainDeadTuplesData *rdt_data)
+{
+ Assert(!XLogRecPtrIsInvalid(rdt_data->remote_lsn) &&
+ TransactionIdIsValid(rdt_data->candidate_xid));
+
+ /*
+ * We expect the publisher and subscriber clocks to be in sync using a time
+ * sync service like NTP. Otherwise, we will advance this worker's
+ * oldest_nonremovable_xid prematurely, leading to the removal of rows
+ * required to detect update_deleted reliably. This check primarily
+ * addresses scenarios where the publisher's clock falls behind; if the
+ * publisher's clock is ahead, subsequent transactions will naturally bear
+ * later commit timestamps, conforming to the design outlined atop
+ * worker.c.
+ *
+ * XXX Consider waiting for the publisher's clock to catch up with the
+ * subscriber's before proceeding to the next phase.
+ */
+ if (TimestampDifferenceExceeds(rdt_data->reply_time,
+ rdt_data->candidate_xid_time, 0))
+ ereport(ERROR,
+ errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"),
+ errdetail_internal("The clock on the publisher is behind that of the subscriber."));
+
+ /*
+ * Do not attempt to advance the non-removable transaction ID when table
+ * sync is in progress. During this time, changes from a single
+ * transaction may be applied by multiple table sync workers corresponding
+ * to the target tables. So, it's necessary for all table sync workers to
+ * apply and flush the corresponding changes before advancing the
+ * transaction ID, otherwise, dead tuples that are still needed for
+ * conflict detection in table sync workers could be removed prematurely.
+ * However, confirming the apply and flush progress across all table sync
+ * workers is complex and not worth the effort, so we simply return if not
+ * all tables are in the READY state.
+ *
+ * It is safe to add new tables with initial states to the subscription
+ * after this check because any changes applied to these tables should
+ * have a WAL position greater than the rdt_data->remote_lsn.
+ */
+ if (!AllTablesyncsReady())
+ return;
+
+ /*
+ * Update and check the remote flush position if we are applying changes
+ * in a loop. This is done at most once per WalWriterDelay to avoid
+ * performing costly operations in get_flush_position() too frequently
+ * during change application.
+ */
+ if (last_flushpos < rdt_data->remote_lsn && rdt_data->last_recv_time &&
+ TimestampDifferenceExceeds(rdt_data->flushpos_update_time,
+ rdt_data->last_recv_time, WalWriterDelay))
+ {
+ XLogRecPtr writepos;
+ XLogRecPtr flushpos;
+ bool have_pending_txes;
+
+ /* Fetch the latest remote flush position */
+ get_flush_position(&writepos, &flushpos, &have_pending_txes);
+
+ if (flushpos > last_flushpos)
+ last_flushpos = flushpos;
+
+ rdt_data->flushpos_update_time = rdt_data->last_recv_time;
+ }
+
+ /* Return to wait for the changes to be applied */
+ if (last_flushpos < rdt_data->remote_lsn)
+ return;
+
+ /*
+ * Reaching here means the remote WAL position has been received, and all
+ * transactions up to that position on the publisher have been applied and
+ * flushed locally. So, we can advance the non-removable transaction ID.
+ */
+ SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+ MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid;
+ SpinLockRelease(&MyLogicalRepWorker->relmutex);
+
+ elog(DEBUG2, "confirmed flush up to remote lsn %X/%08X: new oldest_nonremovable_xid %u",
+ LSN_FORMAT_ARGS(rdt_data->remote_lsn),
+ rdt_data->candidate_xid);
+
+ /* Notify launcher to update the xmin of the conflict slot */
+ ApplyLauncherWakeup();
+
+ /*
+ * Reset all data fields except those used to determine the timing for the
+ * next round of transaction ID advancement. We can even use
+ * flushpos_update_time in the next round to decide whether to get the
+ * latest flush position.
+ */
+ rdt_data->phase = RDT_GET_CANDIDATE_XID;
+ rdt_data->remote_lsn = InvalidXLogRecPtr;
+ rdt_data->remote_oldestxid = InvalidFullTransactionId;
+ rdt_data->remote_nextxid = InvalidFullTransactionId;
+ rdt_data->reply_time = 0;
+ rdt_data->remote_wait_for = InvalidFullTransactionId;
+ rdt_data->candidate_xid = InvalidTransactionId;
+
+ /* process the next phase */
+ process_rdt_phase_transition(rdt_data, false);
+}
+
+/*
+ * Adjust the interval for advancing non-removable transaction IDs.
+ *
+ * We double the interval to try advancing the non-removable transaction IDs
+ * if there is no activity on the node. The maximum value of the interval is
+ * capped by wal_receiver_status_interval if it is not zero, otherwise at
+ * 3 minutes, which should be sufficient to avoid using CPU or network
+ * resources without much benefit.
+ *
+ * The interval is reset to a minimum value of 100ms once there is some
+ * activity on the node.
+ *
+ * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
+ * consider the other interval or a separate GUC if the need arises.
+ */
+static void
+adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
+{
+ if (!new_xid_found && rdt_data->xid_advance_interval)
+ {
+ int max_interval = wal_receiver_status_interval
+ ? wal_receiver_status_interval * 1000
+ : MAX_XID_ADVANCE_INTERVAL;
+
+ /*
+ * No new transaction ID has been assigned since the last check, so
+ * double the interval, but not beyond the maximum allowable value.
+ */
+ rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
+ max_interval);
+ }
+ else
+ {
+ /*
+ * A new transaction ID was found or the interval is not yet
+ * initialized, so set the interval to the minimum value.
+ */
+ rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
+ }
+}
+
+/*
* Exit routine for apply workers due to subscription parameter changes.
*/
static void
@@ -4715,6 +5441,30 @@ InitializeLogRepWorker(void)
apply_worker_exit();
}
+ /*
+ * Restart the worker if retain_dead_tuples was enabled during startup.
+ *
+ * At this point, the replication slot used for conflict detection might
+ * not exist yet, or could be dropped soon if the launcher perceives
+ * retain_dead_tuples as disabled. To avoid unnecessary tracking of
+ * oldest_nonremovable_xid when the slot is absent or at risk of being
+ * dropped, a restart is initiated.
+ *
+ * The oldest_nonremovable_xid should be initialized only when the
+ * retain_dead_tuples is enabled before launching the worker. See
+ * logicalrep_worker_launch.
+ */
+ if (am_leader_apply_worker() &&
+ MySubscription->retaindeadtuples &&
+ !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid))
+ {
+ ereport(LOG,
+ errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup",
+ MySubscription->name, "retain_dead_tuples"));
+
+ apply_worker_exit();
+ }
+
/* Setup synchronous commit according to the user's wishes */
SetConfigOption("synchronous_commit", MySubscription->synccommit,
PGC_BACKEND, PGC_S_OVERRIDE);
@@ -4871,6 +5621,14 @@ DisableSubscriptionAndExit(void)
errmsg("subscription \"%s\" has been disabled because of an error",
MySubscription->name));
+ /*
+ * Skip the track_commit_timestamp check when disabling the worker due to
+ * an error, as verifying commit timestamps is unnecessary in this
+ * context.
+ */
+ if (MySubscription->retaindeadtuples)
+ CheckSubDeadTupleRetention(false, true, WARNING);
+
proc_exit(0);
}
@@ -4916,7 +5674,7 @@ maybe_start_skipping_changes(XLogRecPtr finish_lsn)
skip_xact_finish_lsn = finish_lsn;
ereport(LOG,
- errmsg("logical replication starts skipping transaction at LSN %X/%X",
+ errmsg("logical replication starts skipping transaction at LSN %X/%08X",
LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
}
@@ -4930,8 +5688,8 @@ stop_skipping_changes(void)
return;
ereport(LOG,
- (errmsg("logical replication completed skipping transaction at LSN %X/%X",
- LSN_FORMAT_ARGS(skip_xact_finish_lsn))));
+ errmsg("logical replication completed skipping transaction at LSN %X/%08X",
+ LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
/* Stop skipping changes */
skip_xact_finish_lsn = InvalidXLogRecPtr;
@@ -5019,7 +5777,7 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
if (myskiplsn != finish_lsn)
ereport(WARNING,
errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
- errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.",
+ errdetail("Remote transaction's finish WAL location (LSN) %X/%08X did not match skip-LSN %X/%08X.",
LSN_FORMAT_ARGS(finish_lsn),
LSN_FORMAT_ARGS(myskiplsn)));
}
@@ -5056,7 +5814,7 @@ apply_error_callback(void *arg)
logicalrep_message_type(errarg->command),
errarg->remote_xid);
else
- errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
+ errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%08X",
errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->remote_xid,
@@ -5074,7 +5832,7 @@ apply_error_callback(void *arg)
errarg->rel->remoterel.relname,
errarg->remote_xid);
else
- errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
+ errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%08X",
errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,
@@ -5093,7 +5851,7 @@ apply_error_callback(void *arg)
errarg->rel->remoterel.attnames[errarg->remote_attnum],
errarg->remote_xid);
else
- errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
+ errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%08X",
errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 693a766e6d7..f4c977262c5 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -297,10 +297,12 @@ parse_output_parameters(List *options, PGOutputData *data)
bool two_phase_option_given = false;
bool origin_option_given = false;
+ /* Initialize optional parameters to defaults */
data->binary = false;
data->streaming = LOGICALREP_STREAM_OFF;
data->messages = false;
data->two_phase = false;
+ data->publish_no_origin = false;
foreach(lc, options)
{
@@ -1789,7 +1791,7 @@ LoadPublications(List *pubnames)
else
ereport(WARNING,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("skipped loading publication: %s", pubname),
+ errmsg("skipped loading publication \"%s\"", pubname),
errdetail("The publication does not exist at this point in the WAL."),
errhint("Create the publication if it does not exist."));
}
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 7440aae5a1a..8a649199ec6 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -279,7 +279,7 @@ alter_replication_slot:
;
/*
- * START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %u]
+ * START_REPLICATION [SLOT slot] [PHYSICAL] %X/%08X [TIMELINE %u]
*/
start_replication:
K_START_REPLICATION opt_slot opt_physical RECPTR opt_timeline
@@ -295,7 +295,7 @@ start_replication:
}
;
-/* START_REPLICATION SLOT slot LOGICAL %X/%X options */
+/* START_REPLICATION SLOT slot LOGICAL %X/%08X options */
start_logical_replication:
K_START_REPLICATION K_SLOT IDENT K_LOGICAL RECPTR plugin_options
{
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 014ea8d25c6..b6930e28659 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -155,7 +155,7 @@ UPLOAD_MANIFEST { return K_UPLOAD_MANIFEST; }
{hexdigit}+\/{hexdigit}+ {
uint32 hi,
lo;
- if (sscanf(yytext, "%X/%X", &hi, &lo) != 2)
+ if (sscanf(yytext, "%X/%08X", &hi, &lo) != 2)
replication_yyerror(NULL, yyscanner, "invalid streaming start location");
yylval->recptr = ((uint64) hi) << 32 | lo;
return RECPTR;
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 600b87fa9cb..8605776ad86 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -47,6 +47,7 @@
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
+#include "replication/logicallauncher.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
#include "replication/walsender_private.h"
@@ -154,7 +155,7 @@ int max_replication_slots = 10; /* the maximum number of replication
* Invalidate replication slots that have remained idle longer than this
* duration; '0' disables it.
*/
-int idle_replication_slot_timeout_mins = 0;
+int idle_replication_slot_timeout_secs = 0;
/*
* This GUC lists streaming replication standby server slot names that
@@ -172,6 +173,7 @@ static SyncStandbySlotsConfigData *synchronized_standby_slots_config;
static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
static void ReplicationSlotShmemExit(int code, Datum arg);
+static bool IsSlotForConflictCheck(const char *name);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);
/* internal persistency functions */
@@ -258,13 +260,17 @@ ReplicationSlotShmemExit(int code, Datum arg)
/*
* Check whether the passed slot name is valid and report errors at elevel.
*
+ * An error will be reported for a reserved replication slot name if
+ * allow_reserved_name is set to false.
+ *
* Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
* the name to be used as a directory name on every supported OS.
*
* Returns whether the directory name is valid or not if elevel < ERROR.
*/
bool
-ReplicationSlotValidateName(const char *name, int elevel)
+ReplicationSlotValidateName(const char *name, bool allow_reserved_name,
+ int elevel)
{
const char *cp;
@@ -300,10 +306,32 @@ ReplicationSlotValidateName(const char *name, int elevel)
return false;
}
}
+
+ if (!allow_reserved_name && IsSlotForConflictCheck(name))
+ {
+ ereport(elevel,
+ errcode(ERRCODE_RESERVED_NAME),
+ errmsg("replication slot name \"%s\" is reserved",
+ name),
+ errdetail("The name \"%s\" is reserved for the conflict detection slot.",
+ CONFLICT_DETECTION_SLOT));
+
+ return false;
+ }
+
return true;
}
/*
+ * Return true if the replication slot name is "pg_conflict_detection".
+ */
+static bool
+IsSlotForConflictCheck(const char *name)
+{
+ return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
+}
+
+/*
* Create a new replication slot and mark it as used by this backend.
*
* name: Name of the slot
@@ -330,7 +358,12 @@ ReplicationSlotCreate(const char *name, bool db_specific,
Assert(MyReplicationSlot == NULL);
- ReplicationSlotValidateName(name, ERROR);
+ /*
+ * The logical launcher or pg_upgrade may create or migrate an internal
+ * slot, so using a reserved name is allowed in these cases.
+ */
+ ReplicationSlotValidateName(name, IsBinaryUpgrade || IsLogicalLauncher(),
+ ERROR);
if (failover)
{
@@ -424,6 +457,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
slot->candidate_restart_valid = InvalidXLogRecPtr;
slot->candidate_restart_lsn = InvalidXLogRecPtr;
slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
+ slot->last_saved_restart_lsn = InvalidXLogRecPtr;
slot->inactive_since = 0;
/*
@@ -581,6 +615,17 @@ retry:
}
/*
+ * Do not allow users to acquire the reserved slot. This scenario may
+ * occur if the launcher that owns the slot has terminated unexpectedly
+ * due to an error, and a backend process attempts to reuse the slot.
+ */
+ if (!IsLogicalLauncher() && IsSlotForConflictCheck(name))
+ ereport(ERROR,
+ errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("cannot acquire replication slot \"%s\"", name),
+ errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
+
+ /*
* This is the slot we want; check if it's active under some other
* process. In single user mode, we don't need this check.
*/
@@ -1165,20 +1210,41 @@ ReplicationSlotsComputeRequiredLSN(void)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
XLogRecPtr restart_lsn;
+ XLogRecPtr last_saved_restart_lsn;
bool invalidated;
+ ReplicationSlotPersistency persistency;
if (!s->in_use)
continue;
SpinLockAcquire(&s->mutex);
+ persistency = s->data.persistency;
restart_lsn = s->data.restart_lsn;
invalidated = s->data.invalidated != RS_INVAL_NONE;
+ last_saved_restart_lsn = s->last_saved_restart_lsn;
SpinLockRelease(&s->mutex);
/* invalidated slots need not apply */
if (invalidated)
continue;
+ /*
+ * For persistent slots, use last_saved_restart_lsn to compute the
+ * oldest LSN for removal of WAL segments. The segments between
+ * last_saved_restart_lsn and restart_lsn might be needed by a
+ * persistent slot in the case of a database crash. Non-persistent
+ * slots can't survive a database crash, so we don't care about
+ * last_saved_restart_lsn for them.
+ */
+ if (persistency == RS_PERSISTENT)
+ {
+ if (last_saved_restart_lsn != InvalidXLogRecPtr &&
+ restart_lsn > last_saved_restart_lsn)
+ {
+ restart_lsn = last_saved_restart_lsn;
+ }
+ }
+
if (restart_lsn != InvalidXLogRecPtr &&
(min_required == InvalidXLogRecPtr ||
restart_lsn < min_required))
@@ -1216,7 +1282,9 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
{
ReplicationSlot *s;
XLogRecPtr restart_lsn;
+ XLogRecPtr last_saved_restart_lsn;
bool invalidated;
+ ReplicationSlotPersistency persistency;
s = &ReplicationSlotCtl->replication_slots[i];
@@ -1230,14 +1298,33 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
/* read once, it's ok if it increases while we're checking */
SpinLockAcquire(&s->mutex);
+ persistency = s->data.persistency;
restart_lsn = s->data.restart_lsn;
invalidated = s->data.invalidated != RS_INVAL_NONE;
+ last_saved_restart_lsn = s->last_saved_restart_lsn;
SpinLockRelease(&s->mutex);
/* invalidated slots need not apply */
if (invalidated)
continue;
+ /*
+ * For persistent slots, use last_saved_restart_lsn to compute the
+ * oldest LSN for removal of WAL segments. The segments between
+ * last_saved_restart_lsn and restart_lsn might be needed by a
+ * persistent slot in the case of a database crash. Non-persistent
+ * slots can't survive a database crash, so we don't care about
+ * last_saved_restart_lsn for them.
+ */
+ if (persistency == RS_PERSISTENT)
+ {
+ if (last_saved_restart_lsn != InvalidXLogRecPtr &&
+ restart_lsn > last_saved_restart_lsn)
+ {
+ restart_lsn = last_saved_restart_lsn;
+ }
+ }
+
if (restart_lsn == InvalidXLogRecPtr)
continue;
@@ -1455,6 +1542,7 @@ ReplicationSlotReserveWal(void)
Assert(slot != NULL);
Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
+ Assert(slot->last_saved_restart_lsn == InvalidXLogRecPtr);
/*
* The replication slot mechanism is used to prevent removal of required
@@ -1547,8 +1635,8 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
uint64 ex = oldestLSN - restart_lsn;
appendStringInfo(&err_detail,
- ngettext("The slot's restart_lsn %X/%X exceeds the limit by %" PRIu64 " byte.",
- "The slot's restart_lsn %X/%X exceeds the limit by %" PRIu64 " bytes.",
+ ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
+ "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
ex),
LSN_FORMAT_ARGS(restart_lsn),
ex);
@@ -1568,13 +1656,10 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
case RS_INVAL_IDLE_TIMEOUT:
{
- int minutes = slot_idle_seconds / SECS_PER_MINUTE;
- int secs = slot_idle_seconds % SECS_PER_MINUTE;
-
/* translator: %s is a GUC variable name */
- appendStringInfo(&err_detail, _("The slot's idle time of %dmin %02ds exceeds the configured \"%s\" duration of %dmin."),
- minutes, secs, "idle_replication_slot_timeout",
- idle_replication_slot_timeout_mins);
+ appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
+ slot_idle_seconds, "idle_replication_slot_timeout",
+ idle_replication_slot_timeout_secs);
/* translator: %s is a GUC variable name */
appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
"idle_replication_slot_timeout");
@@ -1612,7 +1697,7 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
static inline bool
CanInvalidateIdleSlot(ReplicationSlot *s)
{
- return (idle_replication_slot_timeout_mins != 0 &&
+ return (idle_replication_slot_timeout_secs != 0 &&
!XLogRecPtrIsInvalid(s->data.restart_lsn) &&
s->inactive_since > 0 &&
!(RecoveryInProgress() && s->data.synced));
@@ -1673,9 +1758,9 @@ DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
if (CanInvalidateIdleSlot(s))
{
/*
- * We simulate the invalidation due to idle_timeout as the minimum
- * time idle time is one minute which makes tests take a long
- * time.
+ * Simulate the invalidation due to idle_timeout to test the
+ * timeout behavior promptly, without waiting for it to trigger
+ * naturally.
*/
#ifdef USE_INJECTION_POINTS
if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
@@ -1690,7 +1775,7 @@ DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
* idle_replication_slot_timeout GUC.
*/
if (TimestampDifferenceExceedsSeconds(s->inactive_since, now,
- idle_replication_slot_timeout_mins * SECS_PER_MINUTE))
+ idle_replication_slot_timeout_secs))
{
*inactive_since = s->inactive_since;
return RS_INVAL_IDLE_TIMEOUT;
@@ -1835,7 +1920,10 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
* just rely on .invalidated.
*/
if (invalidation_cause == RS_INVAL_WAL_REMOVED)
+ {
s->data.restart_lsn = InvalidXLogRecPtr;
+ s->last_saved_restart_lsn = InvalidXLogRecPtr;
+ }
/* Let caller know */
*invalidated = true;
@@ -1844,15 +1932,6 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
SpinLockRelease(&s->mutex);
/*
- * The logical replication slots shouldn't be invalidated as GUC
- * max_slot_wal_keep_size is set to -1 and
- * idle_replication_slot_timeout is set to 0 during the binary
- * upgrade. See check_old_cluster_for_valid_slots() where we ensure
- * that no invalidated before the upgrade.
- */
- Assert(!(*invalidated && SlotIsLogical(s) && IsBinaryUpgrade));
-
- /*
* Calculate the idle time duration of the slot if slot is marked
* invalidated with RS_INVAL_IDLE_TIMEOUT.
*/
@@ -1998,6 +2077,10 @@ restart:
if (!s->in_use)
continue;
+ /* Prevent invalidation of logical slots during binary upgrade */
+ if (SlotIsLogical(s) && IsBinaryUpgrade)
+ continue;
+
if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN, dboid,
snapshotConflictHorizon,
&invalidated))
@@ -2032,6 +2115,7 @@ void
CheckPointReplicationSlots(bool is_shutdown)
{
int i;
+ bool last_saved_restart_lsn_updated = false;
elog(DEBUG1, "performing replication slot checkpoint");
@@ -2076,9 +2160,23 @@ CheckPointReplicationSlots(bool is_shutdown)
SpinLockRelease(&s->mutex);
}
+ /*
+ * Track if we're going to update slot's last_saved_restart_lsn. We
+ * need this to know if we need to recompute the required LSN.
+ */
+ if (s->last_saved_restart_lsn != s->data.restart_lsn)
+ last_saved_restart_lsn_updated = true;
+
SaveSlotToPath(s, path, LOG);
}
LWLockRelease(ReplicationSlotAllocationLock);
+
+ /*
+ * Recompute the required LSN if SaveSlotToPath() updated
+ * last_saved_restart_lsn for any slot.
+ */
+ if (last_saved_restart_lsn_updated)
+ ReplicationSlotsComputeRequiredLSN();
}
/*
@@ -2354,6 +2452,7 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
if (!slot->just_dirtied)
slot->dirty = false;
slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
+ slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
SpinLockRelease(&slot->mutex);
LWLockRelease(&slot->io_in_progress_lock);
@@ -2569,6 +2668,7 @@ RestoreSlotFromDisk(const char *name)
slot->effective_xmin = cp.slotdata.xmin;
slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
+ slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
slot->candidate_catalog_xmin = InvalidTransactionId;
slot->candidate_xmin_lsn = InvalidXLogRecPtr;
@@ -2993,22 +3093,3 @@ WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
ConditionVariableCancelSleep();
}
-
-/*
- * GUC check_hook for idle_replication_slot_timeout
- *
- * The value of idle_replication_slot_timeout must be set to 0 during
- * a binary upgrade. See start_postmaster() in pg_upgrade for more details.
- */
-bool
-check_idle_replication_slot_timeout(int *newval, void **extra, GucSource source)
-{
- if (IsBinaryUpgrade && *newval != 0)
- {
- GUC_check_errdetail("\"%s\" must be set to 0 during binary upgrade mode.",
- "idle_replication_slot_timeout");
- return false;
- }
-
- return true;
-}
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 36cc2ed4e44..69f4c6157c5 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -566,7 +566,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
if (moveto < minlsn)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("cannot advance replication slot to %X/%X, minimum is %X/%X",
+ errmsg("cannot advance replication slot to %X/%08X, minimum is %X/%08X",
LSN_FORMAT_ARGS(moveto), LSN_FORMAT_ARGS(minlsn))));
/* Do the actual slot update, depending on the slot type */
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index cc35984ad00..32cf3a48b89 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -258,7 +258,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
{
char buffer[32];
- sprintf(buffer, "waiting for %X/%X", LSN_FORMAT_ARGS(lsn));
+ sprintf(buffer, "waiting for %X/%08X", LSN_FORMAT_ARGS(lsn));
set_ps_display_suffix(buffer);
}
@@ -566,7 +566,7 @@ SyncRepReleaseWaiters(void)
LWLockRelease(SyncRepLock);
- elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
+ elog(DEBUG3, "released %d procs up to write %X/%08X, %d procs up to flush %X/%08X, %d procs up to apply %X/%08X",
numwrite, LSN_FORMAT_ARGS(writePtr),
numflush, LSN_FORMAT_ARGS(flushPtr),
numapply, LSN_FORMAT_ARGS(applyPtr));
diff --git a/src/backend/replication/syncrep_scanner.l b/src/backend/replication/syncrep_scanner.l
index 7dec1f869c7..02004d621e7 100644
--- a/src/backend/replication/syncrep_scanner.l
+++ b/src/backend/replication/syncrep_scanner.l
@@ -157,17 +157,16 @@ syncrep_yyerror(SyncRepConfigData **syncrep_parse_result_p, char **syncrep_parse
{
struct yyguts_t *yyg = (struct yyguts_t *) yyscanner; /* needed for yytext
* macro */
- char *syncrep_parse_error_msg = *syncrep_parse_error_msg_p;
/* report only the first error in a parse operation */
- if (syncrep_parse_error_msg)
+ if (*syncrep_parse_error_msg_p)
return;
if (yytext[0])
- syncrep_parse_error_msg = psprintf("%s at or near \"%s\"",
- message, yytext);
+ *syncrep_parse_error_msg_p = psprintf("%s at or near \"%s\"",
+ message, yytext);
else
- syncrep_parse_error_msg = psprintf("%s at end of input",
- message);
+ *syncrep_parse_error_msg_p = psprintf("%s at end of input",
+ message);
}
void
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 8c4d0fd9aed..b6281101711 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -386,12 +386,12 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
{
if (first_stream)
ereport(LOG,
- (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
- LSN_FORMAT_ARGS(startpoint), startpointTLI)));
+ errmsg("started streaming WAL from primary at %X/%08X on timeline %u",
+ LSN_FORMAT_ARGS(startpoint), startpointTLI));
else
ereport(LOG,
- (errmsg("restarted WAL streaming at %X/%X on timeline %u",
- LSN_FORMAT_ARGS(startpoint), startpointTLI)));
+ errmsg("restarted WAL streaming at %X/%08X on timeline %u",
+ LSN_FORMAT_ARGS(startpoint), startpointTLI));
first_stream = false;
/* Initialize LogstreamResult and buffers for processing messages */
@@ -470,7 +470,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
{
ereport(LOG,
(errmsg("replication terminated by primary server"),
- errdetail("End of WAL reached on timeline %u at %X/%X.",
+ errdetail("End of WAL reached on timeline %u at %X/%08X.",
startpointTLI,
LSN_FORMAT_ARGS(LogstreamResult.Write))));
endofwal = true;
@@ -711,7 +711,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
{
char activitymsg[50];
- snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
+ snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%08X",
LSN_FORMAT_ARGS(*startpoint));
set_ps_display(activitymsg);
}
@@ -1014,7 +1014,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
{
char activitymsg[50];
- snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
+ snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%08X",
LSN_FORMAT_ARGS(LogstreamResult.Write));
set_ps_display(activitymsg);
}
@@ -1138,7 +1138,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
pq_sendbyte(&reply_message, requestReply ? 1 : 0);
/* Send it */
- elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
+ elog(DEBUG2, "sending write %X/%08X flush %X/%08X apply %X/%08X%s",
LSN_FORMAT_ARGS(writePtr),
LSN_FORMAT_ARGS(flushPtr),
LSN_FORMAT_ARGS(applyPtr),
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index f2c33250e8b..ee911394a23 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -65,6 +65,7 @@
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
+#include "libpq/protocol.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
@@ -84,6 +85,7 @@
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
+#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
@@ -258,6 +260,7 @@ static void StartLogicalReplication(StartReplicationCmd *cmd);
static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
+static void ProcessStandbyPSRequestMessage(void);
static void ProcessRepliesIfAny(void);
static void ProcessPendingWrites(void);
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr);
@@ -408,7 +411,7 @@ IdentifySystem(void)
else
logptr = GetFlushRecPtr(&currTLI);
- snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
+ snprintf(xloc, sizeof(xloc), "%X/%08X", LSN_FORMAT_ARGS(logptr));
if (MyDatabaseId != InvalidOid)
{
@@ -515,7 +518,7 @@ ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
{
char xloc[64];
- snprintf(xloc, sizeof(xloc), "%X/%X",
+ snprintf(xloc, sizeof(xloc), "%X/%08X",
LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
values[i] = CStringGetTextDatum(xloc);
nulls[i] = false;
@@ -733,13 +736,13 @@ HandleUploadManifestPacket(StringInfo buf, off_t *offset,
switch (mtype)
{
- case 'd': /* CopyData */
+ case PqMsg_CopyData:
maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
break;
- case 'c': /* CopyDone */
- case 'f': /* CopyFail */
- case 'H': /* Flush */
- case 'S': /* Sync */
+ case PqMsg_CopyDone:
+ case PqMsg_CopyFail:
+ case PqMsg_Flush:
+ case PqMsg_Sync:
maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
break;
default:
@@ -761,19 +764,19 @@ HandleUploadManifestPacket(StringInfo buf, off_t *offset,
/* Process the message */
switch (mtype)
{
- case 'd': /* CopyData */
+ case PqMsg_CopyData:
AppendIncrementalManifestData(ib, buf->data, buf->len);
return true;
- case 'c': /* CopyDone */
+ case PqMsg_CopyDone:
return false;
- case 'H': /* Sync */
- case 'S': /* Flush */
+ case PqMsg_Sync:
+ case PqMsg_Flush:
/* Ignore these while in CopyOut mode as we do elsewhere. */
return true;
- case 'f':
+ case PqMsg_CopyFail:
ereport(ERROR,
(errcode(ERRCODE_QUERY_CANCELED),
errmsg("COPY from stdin failed: %s",
@@ -892,12 +895,12 @@ StartReplication(StartReplicationCmd *cmd)
switchpoint < cmd->startpoint)
{
ereport(ERROR,
- (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
- LSN_FORMAT_ARGS(cmd->startpoint),
- cmd->timeline),
- errdetail("This server's history forked from timeline %u at %X/%X.",
- cmd->timeline,
- LSN_FORMAT_ARGS(switchpoint))));
+ errmsg("requested starting point %X/%08X on timeline %u is not in this server's history",
+ LSN_FORMAT_ARGS(cmd->startpoint),
+ cmd->timeline),
+ errdetail("This server's history forked from timeline %u at %X/%08X.",
+ cmd->timeline,
+ LSN_FORMAT_ARGS(switchpoint)));
}
sendTimeLineValidUpto = switchpoint;
}
@@ -939,9 +942,9 @@ StartReplication(StartReplicationCmd *cmd)
if (FlushPtr < cmd->startpoint)
{
ereport(ERROR,
- (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
- LSN_FORMAT_ARGS(cmd->startpoint),
- LSN_FORMAT_ARGS(FlushPtr))));
+ errmsg("requested starting point %X/%08X is ahead of the WAL flush position of this server %X/%08X",
+ LSN_FORMAT_ARGS(cmd->startpoint),
+ LSN_FORMAT_ARGS(FlushPtr)));
}
/* Start streaming from the requested point */
@@ -983,7 +986,7 @@ StartReplication(StartReplicationCmd *cmd)
Datum values[2];
bool nulls[2] = {0};
- snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
+ snprintf(startpos_str, sizeof(startpos_str), "%X/%08X",
LSN_FORMAT_ARGS(sendTimeLineValidUpto));
dest = CreateDestReceiver(DestRemoteSimple);
@@ -1324,7 +1327,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
ReplicationSlotPersist();
}
- snprintf(xloc, sizeof(xloc), "%X/%X",
+ snprintf(xloc, sizeof(xloc), "%X/%08X",
LSN_FORMAT_ARGS(MyReplicationSlot->data.confirmed_flush));
dest = CreateDestReceiver(DestRemoteSimple);
@@ -1567,7 +1570,7 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
tmpbuf.data, sizeof(int64));
/* output previously gathered data in a CopyData packet */
- pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
+ pq_putmessage_noblock(PqMsg_CopyData, ctx->out->data, ctx->out->len);
CHECK_FOR_INTERRUPTS();
@@ -2303,7 +2306,7 @@ ProcessRepliesIfAny(void)
case PqMsg_CopyDone:
if (!streamingDoneSending)
{
- pq_putmessage_noblock('c', NULL, 0);
+ pq_putmessage_noblock(PqMsg_CopyDone, NULL, 0);
streamingDoneSending = true;
}
@@ -2355,6 +2358,10 @@ ProcessStandbyMessage(void)
ProcessStandbyHSFeedbackMessage();
break;
+ case 'p':
+ ProcessStandbyPSRequestMessage();
+ break;
+
default:
ereport(COMMERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -2429,7 +2436,7 @@ ProcessStandbyReplyMessage(void)
/* Copy because timestamptz_to_str returns a static buffer */
replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
- elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
+ elog(DEBUG2, "write %X/%08X flush %X/%08X apply %X/%08X%s reply_time %s",
LSN_FORMAT_ARGS(writePtr),
LSN_FORMAT_ARGS(flushPtr),
LSN_FORMAT_ARGS(applyPtr),
@@ -2702,6 +2709,60 @@ ProcessStandbyHSFeedbackMessage(void)
}
/*
+ * Process the request for a primary status update message.
+ *
+ * Reads the requester's timestamp from reply_message and stores it as this
+ * walsender's replyTime. It then replies with an 's' message that carries,
+ * in order: the LSN returned by GetXLogWriteRecPtr(), the transaction ID
+ * returned by GetOldestActiveTransactionId() widened to a 64-bit full
+ * transaction ID, the next full transaction ID, and the current timestamp.
+ * The reply is queued as a CopyData message via pq_putmessage_noblock().
+ *
+ * Raises an ERROR if the server is in recovery.
+ */
+static void
+ProcessStandbyPSRequestMessage(void)
+{
+ XLogRecPtr lsn = InvalidXLogRecPtr;
+ TransactionId oldestXidInCommit;
+ FullTransactionId nextFullXid;
+ FullTransactionId fullOldestXidInCommit;
+ WalSnd *walsnd = MyWalSnd;
+ TimestampTz replyTime;
+
+ /*
+ * This shouldn't happen because we don't support getting the primary
+ * status message from a standby.
+ */
+ if (RecoveryInProgress())
+ elog(ERROR, "the primary status is unavailable during recovery");
+
+ replyTime = pq_getmsgint64(&reply_message);
+
+ /*
+ * Update shared state for this WalSender process based on reply data from
+ * standby.
+ */
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->replyTime = replyTime;
+ SpinLockRelease(&walsnd->mutex);
+
+ /*
+ * Consider transactions in the current database, as only these are the
+ * ones replicated.
+ *
+ * The 32-bit transaction ID is then converted to a FullTransactionId
+ * relative to nextFullXid so that it can be sent as a 64-bit value.
+ */
+ oldestXidInCommit = GetOldestActiveTransactionId(true, false);
+ nextFullXid = ReadNextFullTransactionId();
+ fullOldestXidInCommit = FullTransactionIdFromAllowableAt(nextFullXid,
+ oldestXidInCommit);
+ lsn = GetXLogWriteRecPtr();
+
+ elog(DEBUG2, "sending primary status");
+
+ /* construct the message... */
+ resetStringInfo(&output_message);
+ pq_sendbyte(&output_message, 's');
+ pq_sendint64(&output_message, lsn);
+ pq_sendint64(&output_message, (int64) U64FromFullTransactionId(fullOldestXidInCommit));
+ pq_sendint64(&output_message, (int64) U64FromFullTransactionId(nextFullXid));
+ pq_sendint64(&output_message, GetCurrentTimestamp());
+
+ /* ... and send it wrapped in CopyData */
+ pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len);
+}
+
+/*
* Compute how long send/receive loops should sleep.
*
* If wal_sender_timeout is enabled we want to wake up in time to send
@@ -3246,12 +3307,12 @@ XLogSendPhysical(void)
wal_segment_close(xlogreader);
/* Send CopyDone */
- pq_putmessage_noblock('c', NULL, 0);
+ pq_putmessage_noblock(PqMsg_CopyDone, NULL, 0);
streamingDoneSending = true;
WalSndCaughtUp = true;
- elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
+ elog(DEBUG1, "walsender reached end of timeline at %X/%08X (sent up to %X/%08X)",
LSN_FORMAT_ARGS(sendTimeLineValidUpto),
LSN_FORMAT_ARGS(sentPtr));
return;
@@ -3374,7 +3435,7 @@ retry:
memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
tmpbuf.data, sizeof(int64));
- pq_putmessage_noblock('d', output_message.data, output_message.len);
+ pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len);
sentPtr = endptr;
@@ -3392,7 +3453,7 @@ retry:
{
char activitymsg[50];
- snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
+ snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%08X",
LSN_FORMAT_ARGS(sentPtr));
set_ps_display(activitymsg);
}
@@ -4080,7 +4141,7 @@ WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
pq_sendbyte(&output_message, requestReply ? 1 : 0);
/* ... and send it wrapped in CopyData */
- pq_putmessage_noblock('d', output_message.data, output_message.len);
+ pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len);
/* Set local flag */
if (requestReply)