aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c2
-rw-r--r--src/backend/replication/logical/launcher.c44
-rw-r--r--src/backend/replication/logical/logical.c32
-rw-r--r--src/backend/replication/logical/origin.c6
-rw-r--r--src/backend/replication/logical/reorderbuffer.c198
-rw-r--r--src/backend/replication/logical/slotsync.c12
-rw-r--r--src/backend/replication/logical/snapbuild.c58
-rw-r--r--src/backend/replication/logical/tablesync.c21
-rw-r--r--src/backend/replication/logical/worker.c51
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c2
-rw-r--r--src/backend/replication/repl_gram.y4
-rw-r--r--src/backend/replication/repl_scanner.l2
-rw-r--r--src/backend/replication/slot.c68
-rw-r--r--src/backend/replication/slotfuncs.c2
-rw-r--r--src/backend/replication/syncrep.c4
-rw-r--r--src/backend/replication/walreceiver.c16
-rw-r--r--src/backend/replication/walsender.c42
17 files changed, 419 insertions, 145 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 7b4ddf7a8f5..f7b5d093681 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -534,7 +534,7 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
if (options->logical)
appendStringInfoString(&cmd, " LOGICAL");
- appendStringInfo(&cmd, " %X/%X", LSN_FORMAT_ARGS(options->startpoint));
+ appendStringInfo(&cmd, " %X/%08X", LSN_FORMAT_ARGS(options->startpoint));
/*
* Additional options are different depending on if we are doing logical
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 10677da56b2..4aed0dfcebb 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -175,12 +175,14 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
uint16 generation,
BackgroundWorkerHandle *handle)
{
- BgwHandleStatus status;
- int rc;
+ bool result = false;
+ bool dropped_latch = false;
for (;;)
{
+ BgwHandleStatus status;
pid_t pid;
+ int rc;
CHECK_FOR_INTERRUPTS();
@@ -189,8 +191,9 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
/* Worker either died or has started. Return false if died. */
if (!worker->in_use || worker->proc)
{
+ result = worker->in_use;
LWLockRelease(LogicalRepWorkerLock);
- return worker->in_use;
+ break;
}
LWLockRelease(LogicalRepWorkerLock);
@@ -205,7 +208,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
if (generation == worker->generation)
logicalrep_worker_cleanup(worker);
LWLockRelease(LogicalRepWorkerLock);
- return false;
+ break; /* result is already false */
}
/*
@@ -220,8 +223,18 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
{
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
+ dropped_latch = true;
}
}
+
+ /*
+ * If we had to clear a latch event in order to wait, be sure to restore
+ * it before exiting. Otherwise caller may miss events.
+ */
+ if (dropped_latch)
+ SetLatch(MyLatch);
+
+ return result;
}
/*
@@ -328,7 +341,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
if (max_active_replication_origins == 0)
ereport(ERROR,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
- errmsg("cannot start logical replication workers when \"max_active_replication_origins\"=0")));
+ errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0")));
/*
* We need to do the modification of the shared memory under lock so that
@@ -1016,7 +1029,7 @@ logicalrep_launcher_attach_dshmem(void)
last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
dsa_pin_mapping(last_start_times_dsa);
last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
- LogicalRepCtx->last_start_dsh, 0);
+ LogicalRepCtx->last_start_dsh, NULL);
}
MemoryContextSwitchTo(oldcontext);
@@ -1194,10 +1207,21 @@ ApplyLauncherMain(Datum main_arg)
(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
{
ApplyLauncherSetWorkerStartTime(sub->oid, now);
- logicalrep_worker_launch(WORKERTYPE_APPLY,
- sub->dbid, sub->oid, sub->name,
- sub->owner, InvalidOid,
- DSM_HANDLE_INVALID);
+ if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
+ sub->dbid, sub->oid, sub->name,
+ sub->owner, InvalidOid,
+ DSM_HANDLE_INVALID))
+ {
+ /*
+ * We get here either if we failed to launch a worker
+ * (perhaps for resource-exhaustion reasons) or if we
+ * launched one but it immediately quit. Either way, it
+ * seems appropriate to try again after
+ * wal_retrieve_retry_interval.
+ */
+ wait_time = Min(wait_time,
+ wal_retrieve_retry_interval);
+ }
}
else
{
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1d56d0c4ef3..7e363a7c05b 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -29,6 +29,7 @@
#include "postgres.h"
#include "access/xact.h"
+#include "access/xlog_internal.h"
#include "access/xlogutils.h"
#include "fmgr.h"
#include "miscadmin.h"
@@ -41,6 +42,7 @@
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
+#include "utils/injection_point.h"
#include "utils/inval.h"
#include "utils/memutils.h"
@@ -565,7 +567,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
* kinds of client errors; so the client may wish to check that
* confirmed_flush_lsn matches its expectations.
*/
- elog(LOG, "%X/%X has been already streamed, forwarding to %X/%X",
+ elog(LOG, "%X/%08X has been already streamed, forwarding to %X/%08X",
LSN_FORMAT_ARGS(start_lsn),
LSN_FORMAT_ARGS(slot->data.confirmed_flush));
@@ -608,7 +610,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
ereport(LOG,
(errmsg("starting logical decoding for slot \"%s\"",
NameStr(slot->data.name)),
- errdetail("Streaming transactions committing after %X/%X, reading WAL from %X/%X.",
+ errdetail("Streaming transactions committing after %X/%08X, reading WAL from %X/%08X.",
LSN_FORMAT_ARGS(slot->data.confirmed_flush),
LSN_FORMAT_ARGS(slot->data.restart_lsn))));
@@ -635,7 +637,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
/* Initialize from where to start reading WAL. */
XLogBeginRead(ctx->reader, slot->data.restart_lsn);
- elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
+ elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%08X",
LSN_FORMAT_ARGS(slot->data.restart_lsn));
/* Wait for a consistent starting point */
@@ -756,7 +758,7 @@ output_plugin_error_callback(void *arg)
/* not all callbacks have an associated LSN */
if (state->report_location != InvalidXLogRecPtr)
- errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%X",
+ errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%08X",
NameStr(state->ctx->slot->data.name),
NameStr(state->ctx->slot->data.plugin),
state->callback_name,
@@ -1723,7 +1725,7 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
SpinLockRelease(&slot->mutex);
if (got_new_xmin)
- elog(DEBUG1, "got new catalog xmin %u at %X/%X", xmin,
+ elog(DEBUG1, "got new catalog xmin %u at %X/%08X", xmin,
LSN_FORMAT_ARGS(current_lsn));
/* candidate already valid with the current flush position, apply */
@@ -1783,7 +1785,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
slot->candidate_restart_lsn = restart_lsn;
SpinLockRelease(&slot->mutex);
- elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
+ elog(DEBUG1, "got new restart lsn %X/%08X at %X/%08X",
LSN_FORMAT_ARGS(restart_lsn),
LSN_FORMAT_ARGS(current_lsn));
}
@@ -1798,7 +1800,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
confirmed_flush = slot->data.confirmed_flush;
SpinLockRelease(&slot->mutex);
- elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X",
+ elog(DEBUG1, "failed to increase restart lsn: proposed %X/%08X, after %X/%08X, current candidate %X/%08X, current after %X/%08X, flushed up to %X/%08X",
LSN_FORMAT_ARGS(restart_lsn),
LSN_FORMAT_ARGS(current_lsn),
LSN_FORMAT_ARGS(candidate_restart_lsn),
@@ -1825,9 +1827,13 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
{
bool updated_xmin = false;
bool updated_restart = false;
+ XLogRecPtr restart_lsn pg_attribute_unused();
SpinLockAcquire(&MyReplicationSlot->mutex);
+ /* remember the old restart lsn */
+ restart_lsn = MyReplicationSlot->data.restart_lsn;
+
/*
* Prevent moving the confirmed_flush backwards, as this could lead to
* data duplication issues caused by replicating already replicated
@@ -1881,6 +1887,18 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
/* first write new xmin to disk, so we know what's up after a crash */
if (updated_xmin || updated_restart)
{
+#ifdef USE_INJECTION_POINTS
+ XLogSegNo seg1,
+ seg2;
+
+ XLByteToSeg(restart_lsn, seg1, wal_segment_size);
+ XLByteToSeg(MyReplicationSlot->data.restart_lsn, seg2, wal_segment_size);
+
+ /* trigger injection point, but only if segment changes */
+ if (seg1 != seg2)
+ INJECTION_POINT("logical-replication-slot-advance-segment", NULL);
+#endif
+
ReplicationSlotMarkDirty();
ReplicationSlotSave();
elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index a17bacf88e7..87f10e50dcc 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -826,9 +826,9 @@ StartupReplicationOrigin(void)
last_state++;
ereport(LOG,
- (errmsg("recovered replication state of node %d to %X/%X",
- disk_state.roident,
- LSN_FORMAT_ARGS(disk_state.remote_lsn))));
+ errmsg("recovered replication state of node %d to %X/%08X",
+ disk_state.roident,
+ LSN_FORMAT_ARGS(disk_state.remote_lsn)));
}
/* now check checksum */
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 67655111875..7b4e8629553 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -109,10 +109,22 @@
#include "storage/procarray.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
+#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenumbermap.h"
+/*
+ * Each transaction has an 8MB limit for invalidation messages distributed from
+ * other transactions. This limit is set considering scenarios with many
+ * concurrent logical decoding operations. When the distributed invalidation
+ * messages reach this threshold, the transaction is marked as
+ * RBTXN_DISTR_INVAL_OVERFLOWED to invalidate the complete cache as we have lost
+ * some inval messages and hence don't know what needs to be invalidated.
+ */
+#define MAX_DISTR_INVAL_MSG_PER_TXN \
+ ((8 * 1024 * 1024) / sizeof(SharedInvalidationMessage))
+
/* entry for a hash table we use to map from xid to our transaction state */
typedef struct ReorderBufferTXNByIdEnt
{
@@ -472,6 +484,12 @@ ReorderBufferFreeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
txn->invalidations = NULL;
}
+ if (txn->invalidations_distributed)
+ {
+ pfree(txn->invalidations_distributed);
+ txn->invalidations_distributed = NULL;
+ }
+
/* Reset the toast hash */
ReorderBufferToastReset(rb, txn);
@@ -1397,7 +1415,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
int32 off;
/* nothing there anymore */
- if (state->heap->bh_size == 0)
+ if (binaryheap_empty(state->heap))
return NULL;
off = DatumGetInt32(binaryheap_first(state->heap));
@@ -2661,7 +2679,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
AbortCurrentTransaction();
/* make sure there's no cache pollution */
- ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
+ if (rbtxn_distr_inval_overflowed(txn))
+ {
+ Assert(txn->ninvalidations_distributed == 0);
+ InvalidateSystemCaches();
+ }
+ else
+ {
+ ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
+ ReorderBufferExecuteInvalidations(txn->ninvalidations_distributed,
+ txn->invalidations_distributed);
+ }
if (using_subtxn)
RollbackAndReleaseCurrentSubTransaction();
@@ -2710,8 +2738,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
AbortCurrentTransaction();
/* make sure there's no cache pollution */
- ReorderBufferExecuteInvalidations(txn->ninvalidations,
- txn->invalidations);
+ if (rbtxn_distr_inval_overflowed(txn))
+ {
+ Assert(txn->ninvalidations_distributed == 0);
+ InvalidateSystemCaches();
+ }
+ else
+ {
+ ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
+ ReorderBufferExecuteInvalidations(txn->ninvalidations_distributed,
+ txn->invalidations_distributed);
+ }
if (using_subtxn)
RollbackAndReleaseCurrentSubTransaction();
@@ -3060,7 +3097,8 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
* We might have decoded changes for this transaction that could load
* the cache as per the current transaction's view (consider DDL's
* happened in this transaction). We don't want the decoding of future
- * transactions to use those cache entries so execute invalidations.
+ * transactions to use those cache entries so execute only the inval
+ * messages in this transaction.
*/
if (txn->ninvalidations > 0)
ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
@@ -3147,9 +3185,10 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
txn->final_lsn = lsn;
/*
- * Process cache invalidation messages if there are any. Even if we're not
- * interested in the transaction's contents, it could have manipulated the
- * catalog and we need to update the caches according to that.
+ * Process only cache invalidation messages in this transaction if there
+ * are any. Even if we're not interested in the transaction's contents, it
+ * could have manipulated the catalog and we need to update the caches
+ * according to that.
*/
if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
@@ -3422,6 +3461,57 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
}
/*
+ * Add new invalidation messages to the reorder buffer queue.
+ */
+static void
+ReorderBufferQueueInvalidations(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr lsn, Size nmsgs,
+ SharedInvalidationMessage *msgs)
+{
+ ReorderBufferChange *change;
+
+ change = ReorderBufferAllocChange(rb);
+ change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
+ change->data.inval.ninvalidations = nmsgs;
+ change->data.inval.invalidations = (SharedInvalidationMessage *)
+ palloc(sizeof(SharedInvalidationMessage) * nmsgs);
+ memcpy(change->data.inval.invalidations, msgs,
+ sizeof(SharedInvalidationMessage) * nmsgs);
+
+ ReorderBufferQueueChange(rb, xid, lsn, change, false);
+}
+
+/*
+ * A helper function for ReorderBufferAddInvalidations() and
+ * ReorderBufferAddDistributedInvalidations() to accumulate the invalidation
+ * messages to the **invals_out.
+ */
+static void
+ReorderBufferAccumulateInvalidations(SharedInvalidationMessage **invals_out,
+ uint32 *ninvals_out,
+ SharedInvalidationMessage *msgs_new,
+ Size nmsgs_new)
+{
+ if (*ninvals_out == 0)
+ {
+ *ninvals_out = nmsgs_new;
+ *invals_out = (SharedInvalidationMessage *)
+ palloc(sizeof(SharedInvalidationMessage) * nmsgs_new);
+ memcpy(*invals_out, msgs_new, sizeof(SharedInvalidationMessage) * nmsgs_new);
+ }
+ else
+ {
+ /* Enlarge the array of inval messages */
+ *invals_out = (SharedInvalidationMessage *)
+ repalloc(*invals_out, sizeof(SharedInvalidationMessage) *
+ (*ninvals_out + nmsgs_new));
+ memcpy(*invals_out + *ninvals_out, msgs_new,
+ nmsgs_new * sizeof(SharedInvalidationMessage));
+ *ninvals_out += nmsgs_new;
+ }
+}
+
+/*
* Accumulate the invalidations for executing them later.
*
* This needs to be called for each XLOG_XACT_INVALIDATIONS message and
@@ -3441,7 +3531,6 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
{
ReorderBufferTXN *txn;
MemoryContext oldcontext;
- ReorderBufferChange *change;
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
@@ -3456,35 +3545,76 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
Assert(nmsgs > 0);
- /* Accumulate invalidations. */
- if (txn->ninvalidations == 0)
- {
- txn->ninvalidations = nmsgs;
- txn->invalidations = (SharedInvalidationMessage *)
- palloc(sizeof(SharedInvalidationMessage) * nmsgs);
- memcpy(txn->invalidations, msgs,
- sizeof(SharedInvalidationMessage) * nmsgs);
- }
- else
+ ReorderBufferAccumulateInvalidations(&txn->invalidations,
+ &txn->ninvalidations,
+ msgs, nmsgs);
+
+ ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs);
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Accumulate the invalidations distributed by other committed transactions
+ * for executing them later.
+ *
+ * This function is similar to ReorderBufferAddInvalidations() but stores
+ * the given inval messages to the txn->invalidations_distributed with the
+ * overflow check.
+ *
+ * This needs to be called by committed transactions to distribute their
+ * inval messages to in-progress transactions.
+ */
+void
+ReorderBufferAddDistributedInvalidations(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr lsn, Size nmsgs,
+ SharedInvalidationMessage *msgs)
+{
+ ReorderBufferTXN *txn;
+ MemoryContext oldcontext;
+
+ txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+ oldcontext = MemoryContextSwitchTo(rb->context);
+
+ /*
+ * Collect all the invalidations under the top transaction, if available,
+ * so that we can execute them all together. See comments
+ * ReorderBufferAddInvalidations.
+ */
+ txn = rbtxn_get_toptxn(txn);
+
+ Assert(nmsgs > 0);
+
+ if (!rbtxn_distr_inval_overflowed(txn))
{
- txn->invalidations = (SharedInvalidationMessage *)
- repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
- (txn->ninvalidations + nmsgs));
+ /*
+ * Check the transaction has enough space for storing distributed
+ * invalidation messages.
+ */
+ if (txn->ninvalidations_distributed + nmsgs >= MAX_DISTR_INVAL_MSG_PER_TXN)
+ {
+ /*
+ * Mark the invalidation message as overflowed and free up the
+ * messages accumulated so far.
+ */
+ txn->txn_flags |= RBTXN_DISTR_INVAL_OVERFLOWED;
- memcpy(txn->invalidations + txn->ninvalidations, msgs,
- nmsgs * sizeof(SharedInvalidationMessage));
- txn->ninvalidations += nmsgs;
+ if (txn->invalidations_distributed)
+ {
+ pfree(txn->invalidations_distributed);
+ txn->invalidations_distributed = NULL;
+ txn->ninvalidations_distributed = 0;
+ }
+ }
+ else
+ ReorderBufferAccumulateInvalidations(&txn->invalidations_distributed,
+ &txn->ninvalidations_distributed,
+ msgs, nmsgs);
}
- change = ReorderBufferAllocChange(rb);
- change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
- change->data.inval.ninvalidations = nmsgs;
- change->data.inval.invalidations = (SharedInvalidationMessage *)
- palloc(sizeof(SharedInvalidationMessage) * nmsgs);
- memcpy(change->data.inval.invalidations, msgs,
- sizeof(SharedInvalidationMessage) * nmsgs);
-
- ReorderBufferQueueChange(rb, xid, lsn, change, false);
+ /* Queue the invalidation messages into the transaction */
+ ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs);
MemoryContextSwitchTo(oldcontext);
}
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 656e66e0ae0..2f0c08b8fbd 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -211,9 +211,9 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
* impact the users, so we used DEBUG1 level to log the message.
*/
ereport(slot->data.persistency == RS_TEMPORARY ? LOG : DEBUG1,
- errmsg("could not synchronize replication slot \"%s\" because remote slot precedes local slot",
+ errmsg("could not synchronize replication slot \"%s\"",
remote_slot->name),
- errdetail("The remote slot has LSN %X/%X and catalog xmin %u, but the local slot has LSN %X/%X and catalog xmin %u.",
+ errdetail("Synchronization could lead to data loss, because the remote slot needs WAL at LSN %X/%08X and catalog xmin %u, but the standby has LSN %X/%08X and catalog xmin %u.",
LSN_FORMAT_ARGS(remote_slot->restart_lsn),
remote_slot->catalog_xmin,
LSN_FORMAT_ARGS(slot->data.restart_lsn),
@@ -275,7 +275,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
ereport(ERROR,
errmsg_internal("synchronized confirmed_flush for slot \"%s\" differs from remote slot",
remote_slot->name),
- errdetail_internal("Remote slot has LSN %X/%X but local slot has LSN %X/%X.",
+ errdetail_internal("Remote slot has LSN %X/%08X but local slot has LSN %X/%08X.",
LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
LSN_FORMAT_ARGS(slot->data.confirmed_flush)));
}
@@ -593,7 +593,7 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
{
ereport(LOG,
errmsg("could not synchronize replication slot \"%s\"", remote_slot->name),
- errdetail("Logical decoding could not find consistent point from local slot's LSN %X/%X.",
+ errdetail("Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X.",
LSN_FORMAT_ARGS(slot->data.restart_lsn)));
return false;
@@ -642,7 +642,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("skipping slot synchronization because the received slot sync"
- " LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X",
+ " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
remote_slot->name,
LSN_FORMAT_ARGS(latestFlushPtr)));
@@ -733,7 +733,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
ereport(ERROR,
errmsg_internal("cannot synchronize local slot \"%s\"",
remote_slot->name),
- errdetail_internal("Local slot's start streaming location LSN(%X/%X) is ahead of remote slot's LSN(%X/%X).",
+ errdetail_internal("Local slot's start streaming location LSN(%X/%08X) is ahead of remote slot's LSN(%X/%08X).",
LSN_FORMAT_ARGS(slot->data.confirmed_flush),
LSN_FORMAT_ARGS(remote_slot->confirmed_lsn)));
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 0d7bddbe4ed..8532bfd27e5 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -774,7 +774,7 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
if (rbtxn_is_prepared(txn))
continue;
- elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%X",
+ elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%08X",
txn->xid, LSN_FORMAT_ARGS(lsn));
/*
@@ -794,6 +794,13 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
* contents built by the current transaction even after its decoding,
* which should have been invalidated due to concurrent catalog
* changing transaction.
+ *
+ * Distribute only the invalidation messages generated by the current
+ * committed transaction. Invalidation messages received from other
+ * transactions would have already been propagated to the relevant
+ * in-progress transactions. This transaction would have processed
+ * those invalidations, ensuring that subsequent transactions observe
+ * a consistent cache state.
*/
if (txn->xid != xid)
{
@@ -807,8 +814,9 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
{
Assert(msgs != NULL);
- ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
- ninvalidations, msgs);
+ ReorderBufferAddDistributedInvalidations(builder->reorder,
+ txn->xid, lsn,
+ ninvalidations, msgs);
}
}
}
@@ -1263,10 +1271,10 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
builder->initial_xmin_horizon))
{
ereport(DEBUG1,
- (errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
- LSN_FORMAT_ARGS(lsn)),
- errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
- builder->initial_xmin_horizon, running->oldestRunningXid)));
+ errmsg_internal("skipping snapshot at %X/%08X while building logical decoding snapshot, xmin horizon too low",
+ LSN_FORMAT_ARGS(lsn)),
+ errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
+ builder->initial_xmin_horizon, running->oldestRunningXid));
SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
@@ -1302,9 +1310,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
builder->next_phase_at = InvalidTransactionId;
ereport(LOG,
- (errmsg("logical decoding found consistent point at %X/%X",
- LSN_FORMAT_ARGS(lsn)),
- errdetail("There are no running transactions.")));
+ errmsg("logical decoding found consistent point at %X/%08X",
+ LSN_FORMAT_ARGS(lsn)),
+ errdetail("There are no running transactions."));
return false;
}
@@ -1351,10 +1359,10 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
Assert(TransactionIdIsNormal(builder->xmax));
ereport(LOG,
- (errmsg("logical decoding found initial starting point at %X/%X",
- LSN_FORMAT_ARGS(lsn)),
- errdetail("Waiting for transactions (approximately %d) older than %u to end.",
- running->xcnt, running->nextXid)));
+ errmsg("logical decoding found initial starting point at %X/%08X",
+ LSN_FORMAT_ARGS(lsn)),
+ errdetail("Waiting for transactions (approximately %d) older than %u to end.",
+ running->xcnt, running->nextXid));
SnapBuildWaitSnapshot(running, running->nextXid);
}
@@ -1375,10 +1383,10 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
builder->next_phase_at = running->nextXid;
ereport(LOG,
- (errmsg("logical decoding found initial consistent point at %X/%X",
- LSN_FORMAT_ARGS(lsn)),
- errdetail("Waiting for transactions (approximately %d) older than %u to end.",
- running->xcnt, running->nextXid)));
+ errmsg("logical decoding found initial consistent point at %X/%08X",
+ LSN_FORMAT_ARGS(lsn)),
+ errdetail("Waiting for transactions (approximately %d) older than %u to end.",
+ running->xcnt, running->nextXid));
SnapBuildWaitSnapshot(running, running->nextXid);
}
@@ -1399,9 +1407,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
builder->next_phase_at = InvalidTransactionId;
ereport(LOG,
- (errmsg("logical decoding found consistent point at %X/%X",
- LSN_FORMAT_ARGS(lsn)),
- errdetail("There are no old transactions anymore.")));
+ errmsg("logical decoding found consistent point at %X/%08X",
+ LSN_FORMAT_ARGS(lsn)),
+ errdetail("There are no old transactions anymore."));
}
/*
@@ -1905,9 +1913,9 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
Assert(builder->state == SNAPBUILD_CONSISTENT);
ereport(LOG,
- (errmsg("logical decoding found consistent point at %X/%X",
- LSN_FORMAT_ARGS(lsn)),
- errdetail("Logical decoding will begin using saved snapshot.")));
+ errmsg("logical decoding found consistent point at %X/%08X",
+ LSN_FORMAT_ARGS(lsn)),
+ errdetail("Logical decoding will begin using saved snapshot."));
return true;
snapshot_not_interesting:
@@ -2053,7 +2061,7 @@ SnapBuildSnapshotExists(XLogRecPtr lsn)
int ret;
struct stat stat_buf;
- sprintf(path, "%s/%X-%X.snap",
+ sprintf(path, "%s/%08X-%08X.snap",
PG_LOGICAL_SNAPSHOTS_DIR,
LSN_FORMAT_ARGS(lsn));
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 8e1e8762f62..e4fd6347fd1 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -603,14 +603,19 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
TimestampDifferenceExceeds(hentry->last_start_time, now,
wal_retrieve_retry_interval))
{
- logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
- MyLogicalRepWorker->dbid,
- MySubscription->oid,
- MySubscription->name,
- MyLogicalRepWorker->userid,
- rstate->relid,
- DSM_HANDLE_INVALID);
+ /*
+ * Set the last_start_time even if we fail to start
+ * the worker, so that we won't retry until
+ * wal_retrieve_retry_interval has elapsed.
+ */
hentry->last_start_time = now;
+ (void) logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
+ MyLogicalRepWorker->dbid,
+ MySubscription->oid,
+ MySubscription->name,
+ MyLogicalRepWorker->userid,
+ rstate->relid,
+ DSM_HANDLE_INVALID);
}
}
}
@@ -1548,7 +1553,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
copy_table_done:
elog(DEBUG1,
- "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
+ "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%08X",
originname, LSN_FORMAT_ARGS(*origin_startpos));
/*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 4151a4b2a96..c5fb627aa56 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -109,13 +109,6 @@
* If ever a user needs to be aware of the tri-state value, they can fetch it
* from the pg_subscription catalog (see column subtwophasestate).
*
- * We don't allow to toggle two_phase option of a subscription because it can
- * lead to an inconsistent replica. Consider, initially, it was on and we have
- * received some prepare then we turn it off, now at commit time the server
- * will send the entire transaction data along with the commit. With some more
- * analysis, we can allow changing this option from off to on but not sure if
- * that alone would be useful.
- *
* Finally, to avoid problems mentioned in previous paragraphs from any
* subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
* to 'off' and then again back to 'on') there is a restriction for
@@ -1023,7 +1016,7 @@ apply_handle_commit(StringInfo s)
if (commit_data.commit_lsn != remote_final_lsn)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
+ errmsg_internal("incorrect commit LSN %X/%08X in commit message (expected %X/%08X)",
LSN_FORMAT_ARGS(commit_data.commit_lsn),
LSN_FORMAT_ARGS(remote_final_lsn))));
@@ -1115,7 +1108,7 @@ apply_handle_prepare(StringInfo s)
if (prepare_data.prepare_lsn != remote_final_lsn)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
+ errmsg_internal("incorrect prepare LSN %X/%08X in prepare message (expected %X/%08X)",
LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
LSN_FORMAT_ARGS(remote_final_lsn))));
@@ -3910,7 +3903,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
pq_sendint64(reply_message, now); /* sendTime */
pq_sendbyte(reply_message, requestReply); /* replyRequested */
- elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
+ elog(DEBUG2, "sending feedback (force %d) to recv %X/%08X, write %X/%08X, flush %X/%08X",
force,
LSN_FORMAT_ARGS(recvpos),
LSN_FORMAT_ARGS(writepos),
@@ -4626,8 +4619,16 @@ run_apply_worker()
walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
StartTransactionCommand();
+
+ /*
+ * Updating pg_subscription might involve TOAST table access, so
+ * ensure we have a valid snapshot.
+ */
+ PushActiveSnapshot(GetTransactionSnapshot());
+
UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
+ PopActiveSnapshot();
CommitTransactionCommand();
}
else
@@ -4843,7 +4844,15 @@ DisableSubscriptionAndExit(void)
/* Disable the subscription */
StartTransactionCommand();
+
+ /*
+ * Updating pg_subscription might involve TOAST table access, so ensure we
+ * have a valid snapshot.
+ */
+ PushActiveSnapshot(GetTransactionSnapshot());
+
DisableSubscription(MySubscription->oid);
+ PopActiveSnapshot();
CommitTransactionCommand();
/* Ensure we remove no-longer-useful entry for worker's start time */
@@ -4900,7 +4909,7 @@ maybe_start_skipping_changes(XLogRecPtr finish_lsn)
skip_xact_finish_lsn = finish_lsn;
ereport(LOG,
- errmsg("logical replication starts skipping transaction at LSN %X/%X",
+ errmsg("logical replication starts skipping transaction at LSN %X/%08X",
LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
}
@@ -4914,8 +4923,8 @@ stop_skipping_changes(void)
return;
ereport(LOG,
- (errmsg("logical replication completed skipping transaction at LSN %X/%X",
- LSN_FORMAT_ARGS(skip_xact_finish_lsn))));
+ errmsg("logical replication completed skipping transaction at LSN %X/%08X",
+ LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
/* Stop skipping changes */
skip_xact_finish_lsn = InvalidXLogRecPtr;
@@ -4948,6 +4957,12 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
}
/*
+ * Updating pg_subscription might involve TOAST table access, so ensure we
+ * have a valid snapshot.
+ */
+ PushActiveSnapshot(GetTransactionSnapshot());
+
+ /*
* Protect subskiplsn of pg_subscription from being concurrently updated
* while clearing it.
*/
@@ -4997,7 +5012,7 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
if (myskiplsn != finish_lsn)
ereport(WARNING,
errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
- errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.",
+ errdetail("Remote transaction's finish WAL location (LSN) %X/%08X did not match skip-LSN %X/%08X.",
LSN_FORMAT_ARGS(finish_lsn),
LSN_FORMAT_ARGS(myskiplsn)));
}
@@ -5005,6 +5020,8 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
heap_freetuple(tup);
table_close(rel, NoLock);
+ PopActiveSnapshot();
+
if (started_tx)
CommitTransactionCommand();
}
@@ -5032,7 +5049,7 @@ apply_error_callback(void *arg)
logicalrep_message_type(errarg->command),
errarg->remote_xid);
else
- errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
+ errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%08X",
errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->remote_xid,
@@ -5050,7 +5067,7 @@ apply_error_callback(void *arg)
errarg->rel->remoterel.relname,
errarg->remote_xid);
else
- errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
+ errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%08X",
errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,
@@ -5069,7 +5086,7 @@ apply_error_callback(void *arg)
errarg->rel->remoterel.attnames[errarg->remote_attnum],
errarg->remote_xid);
else
- errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
+ errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%08X",
errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 693a766e6d7..082b4d9d327 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1789,7 +1789,7 @@ LoadPublications(List *pubnames)
else
ereport(WARNING,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("skipped loading publication: %s", pubname),
+ errmsg("skipped loading publication \"%s\"", pubname),
errdetail("The publication does not exist at this point in the WAL."),
errhint("Create the publication if it does not exist."));
}
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 7440aae5a1a..8a649199ec6 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -279,7 +279,7 @@ alter_replication_slot:
;
/*
- * START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %u]
+ * START_REPLICATION [SLOT slot] [PHYSICAL] %X/%08X [TIMELINE %u]
*/
start_replication:
K_START_REPLICATION opt_slot opt_physical RECPTR opt_timeline
@@ -295,7 +295,7 @@ start_replication:
}
;
-/* START_REPLICATION SLOT slot LOGICAL %X/%X options */
+/* START_REPLICATION SLOT slot LOGICAL %X/%08X options */
start_logical_replication:
K_START_REPLICATION K_SLOT IDENT K_LOGICAL RECPTR plugin_options
{
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 014ea8d25c6..b6930e28659 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -155,7 +155,7 @@ UPLOAD_MANIFEST { return K_UPLOAD_MANIFEST; }
{hexdigit}+\/{hexdigit}+ {
uint32 hi,
lo;
- if (sscanf(yytext, "%X/%X", &hi, &lo) != 2)
+ if (sscanf(yytext, "%X/%08X", &hi, &lo) != 2)
replication_yyerror(NULL, yyscanner, "invalid streaming start location");
yylval->recptr = ((uint64) hi) << 32 | lo;
return RECPTR;
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 600b87fa9cb..f369fce2485 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -424,6 +424,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
slot->candidate_restart_valid = InvalidXLogRecPtr;
slot->candidate_restart_lsn = InvalidXLogRecPtr;
slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
+ slot->last_saved_restart_lsn = InvalidXLogRecPtr;
slot->inactive_since = 0;
/*
@@ -1165,20 +1166,41 @@ ReplicationSlotsComputeRequiredLSN(void)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
XLogRecPtr restart_lsn;
+ XLogRecPtr last_saved_restart_lsn;
bool invalidated;
+ ReplicationSlotPersistency persistency;
if (!s->in_use)
continue;
SpinLockAcquire(&s->mutex);
+ persistency = s->data.persistency;
restart_lsn = s->data.restart_lsn;
invalidated = s->data.invalidated != RS_INVAL_NONE;
+ last_saved_restart_lsn = s->last_saved_restart_lsn;
SpinLockRelease(&s->mutex);
/* invalidated slots need not apply */
if (invalidated)
continue;
+ /*
+ * For persistent slot use last_saved_restart_lsn to compute the
+ * oldest LSN for removal of WAL segments. The segments between
+ * last_saved_restart_lsn and restart_lsn might be needed by a
+ * persistent slot in the case of database crash. Non-persistent
+ * slots can't survive the database crash, so we don't care about
+ * last_saved_restart_lsn for them.
+ */
+ if (persistency == RS_PERSISTENT)
+ {
+ if (last_saved_restart_lsn != InvalidXLogRecPtr &&
+ restart_lsn > last_saved_restart_lsn)
+ {
+ restart_lsn = last_saved_restart_lsn;
+ }
+ }
+
if (restart_lsn != InvalidXLogRecPtr &&
(min_required == InvalidXLogRecPtr ||
restart_lsn < min_required))
@@ -1216,7 +1238,9 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
{
ReplicationSlot *s;
XLogRecPtr restart_lsn;
+ XLogRecPtr last_saved_restart_lsn;
bool invalidated;
+ ReplicationSlotPersistency persistency;
s = &ReplicationSlotCtl->replication_slots[i];
@@ -1230,14 +1254,33 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
/* read once, it's ok if it increases while we're checking */
SpinLockAcquire(&s->mutex);
+ persistency = s->data.persistency;
restart_lsn = s->data.restart_lsn;
invalidated = s->data.invalidated != RS_INVAL_NONE;
+ last_saved_restart_lsn = s->last_saved_restart_lsn;
SpinLockRelease(&s->mutex);
/* invalidated slots need not apply */
if (invalidated)
continue;
+ /*
+ * For persistent slot use last_saved_restart_lsn to compute the
+ * oldest LSN for removal of WAL segments. The segments between
+ * last_saved_restart_lsn and restart_lsn might be needed by a
+ * persistent slot in the case of database crash. Non-persistent
+ * slots can't survive the database crash, so we don't care about
+ * last_saved_restart_lsn for them.
+ */
+ if (persistency == RS_PERSISTENT)
+ {
+ if (last_saved_restart_lsn != InvalidXLogRecPtr &&
+ restart_lsn > last_saved_restart_lsn)
+ {
+ restart_lsn = last_saved_restart_lsn;
+ }
+ }
+
if (restart_lsn == InvalidXLogRecPtr)
continue;
@@ -1455,6 +1498,7 @@ ReplicationSlotReserveWal(void)
Assert(slot != NULL);
Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
+ Assert(slot->last_saved_restart_lsn == InvalidXLogRecPtr);
/*
* The replication slot mechanism is used to prevent removal of required
@@ -1547,8 +1591,8 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
uint64 ex = oldestLSN - restart_lsn;
appendStringInfo(&err_detail,
- ngettext("The slot's restart_lsn %X/%X exceeds the limit by %" PRIu64 " byte.",
- "The slot's restart_lsn %X/%X exceeds the limit by %" PRIu64 " bytes.",
+ ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
+ "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
ex),
LSN_FORMAT_ARGS(restart_lsn),
ex);
@@ -1835,7 +1879,10 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
* just rely on .invalidated.
*/
if (invalidation_cause == RS_INVAL_WAL_REMOVED)
+ {
s->data.restart_lsn = InvalidXLogRecPtr;
+ s->last_saved_restart_lsn = InvalidXLogRecPtr;
+ }
/* Let caller know */
*invalidated = true;
@@ -2032,6 +2079,7 @@ void
CheckPointReplicationSlots(bool is_shutdown)
{
int i;
+ bool last_saved_restart_lsn_updated = false;
elog(DEBUG1, "performing replication slot checkpoint");
@@ -2076,9 +2124,23 @@ CheckPointReplicationSlots(bool is_shutdown)
SpinLockRelease(&s->mutex);
}
+ /*
+ * Track if we're going to update slot's last_saved_restart_lsn. We
+ * need this to know if we need to recompute the required LSN.
+ */
+ if (s->last_saved_restart_lsn != s->data.restart_lsn)
+ last_saved_restart_lsn_updated = true;
+
SaveSlotToPath(s, path, LOG);
}
LWLockRelease(ReplicationSlotAllocationLock);
+
+ /*
+ * Recompute the required LSN if SaveSlotToPath() updated
+ * last_saved_restart_lsn for any slot.
+ */
+ if (last_saved_restart_lsn_updated)
+ ReplicationSlotsComputeRequiredLSN();
}
/*
@@ -2354,6 +2416,7 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
if (!slot->just_dirtied)
slot->dirty = false;
slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
+ slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
SpinLockRelease(&slot->mutex);
LWLockRelease(&slot->io_in_progress_lock);
@@ -2569,6 +2632,7 @@ RestoreSlotFromDisk(const char *name)
slot->effective_xmin = cp.slotdata.xmin;
slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
+ slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
slot->candidate_catalog_xmin = InvalidTransactionId;
slot->candidate_xmin_lsn = InvalidXLogRecPtr;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 36cc2ed4e44..69f4c6157c5 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -566,7 +566,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
if (moveto < minlsn)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("cannot advance replication slot to %X/%X, minimum is %X/%X",
+ errmsg("cannot advance replication slot to %X/%08X, minimum is %X/%08X",
LSN_FORMAT_ARGS(moveto), LSN_FORMAT_ARGS(minlsn))));
/* Do the actual slot update, depending on the slot type */
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index cc35984ad00..32cf3a48b89 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -258,7 +258,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
{
char buffer[32];
- sprintf(buffer, "waiting for %X/%X", LSN_FORMAT_ARGS(lsn));
+ sprintf(buffer, "waiting for %X/%08X", LSN_FORMAT_ARGS(lsn));
set_ps_display_suffix(buffer);
}
@@ -566,7 +566,7 @@ SyncRepReleaseWaiters(void)
LWLockRelease(SyncRepLock);
- elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
+ elog(DEBUG3, "released %d procs up to write %X/%08X, %d procs up to flush %X/%08X, %d procs up to apply %X/%08X",
numwrite, LSN_FORMAT_ARGS(writePtr),
numflush, LSN_FORMAT_ARGS(flushPtr),
numapply, LSN_FORMAT_ARGS(applyPtr));
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 8c4d0fd9aed..b6281101711 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -386,12 +386,12 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
{
if (first_stream)
ereport(LOG,
- (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
- LSN_FORMAT_ARGS(startpoint), startpointTLI)));
+ errmsg("started streaming WAL from primary at %X/%08X on timeline %u",
+ LSN_FORMAT_ARGS(startpoint), startpointTLI));
else
ereport(LOG,
- (errmsg("restarted WAL streaming at %X/%X on timeline %u",
- LSN_FORMAT_ARGS(startpoint), startpointTLI)));
+ errmsg("restarted WAL streaming at %X/%08X on timeline %u",
+ LSN_FORMAT_ARGS(startpoint), startpointTLI));
first_stream = false;
/* Initialize LogstreamResult and buffers for processing messages */
@@ -470,7 +470,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
{
ereport(LOG,
(errmsg("replication terminated by primary server"),
- errdetail("End of WAL reached on timeline %u at %X/%X.",
+ errdetail("End of WAL reached on timeline %u at %X/%08X.",
startpointTLI,
LSN_FORMAT_ARGS(LogstreamResult.Write))));
endofwal = true;
@@ -711,7 +711,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
{
char activitymsg[50];
- snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
+ snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%08X",
LSN_FORMAT_ARGS(*startpoint));
set_ps_display(activitymsg);
}
@@ -1014,7 +1014,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
{
char activitymsg[50];
- snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
+ snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%08X",
LSN_FORMAT_ARGS(LogstreamResult.Write));
set_ps_display(activitymsg);
}
@@ -1138,7 +1138,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
pq_sendbyte(&reply_message, requestReply ? 1 : 0);
/* Send it */
- elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
+ elog(DEBUG2, "sending write %X/%08X flush %X/%08X apply %X/%08X%s",
LSN_FORMAT_ARGS(writePtr),
LSN_FORMAT_ARGS(flushPtr),
LSN_FORMAT_ARGS(applyPtr),
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9fa8beb6103..28b8591efa5 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -408,7 +408,7 @@ IdentifySystem(void)
else
logptr = GetFlushRecPtr(&currTLI);
- snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
+ snprintf(xloc, sizeof(xloc), "%X/%08X", LSN_FORMAT_ARGS(logptr));
if (MyDatabaseId != InvalidOid)
{
@@ -515,7 +515,7 @@ ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
{
char xloc[64];
- snprintf(xloc, sizeof(xloc), "%X/%X",
+ snprintf(xloc, sizeof(xloc), "%X/%08X",
LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
values[i] = CStringGetTextDatum(xloc);
nulls[i] = false;
@@ -892,12 +892,12 @@ StartReplication(StartReplicationCmd *cmd)
switchpoint < cmd->startpoint)
{
ereport(ERROR,
- (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
- LSN_FORMAT_ARGS(cmd->startpoint),
- cmd->timeline),
- errdetail("This server's history forked from timeline %u at %X/%X.",
- cmd->timeline,
- LSN_FORMAT_ARGS(switchpoint))));
+ errmsg("requested starting point %X/%08X on timeline %u is not in this server's history",
+ LSN_FORMAT_ARGS(cmd->startpoint),
+ cmd->timeline),
+ errdetail("This server's history forked from timeline %u at %X/%08X.",
+ cmd->timeline,
+ LSN_FORMAT_ARGS(switchpoint)));
}
sendTimeLineValidUpto = switchpoint;
}
@@ -939,9 +939,9 @@ StartReplication(StartReplicationCmd *cmd)
if (FlushPtr < cmd->startpoint)
{
ereport(ERROR,
- (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
- LSN_FORMAT_ARGS(cmd->startpoint),
- LSN_FORMAT_ARGS(FlushPtr))));
+ errmsg("requested starting point %X/%08X is ahead of the WAL flush position of this server %X/%08X",
+ LSN_FORMAT_ARGS(cmd->startpoint),
+ LSN_FORMAT_ARGS(FlushPtr)));
}
/* Start streaming from the requested point */
@@ -983,7 +983,7 @@ StartReplication(StartReplicationCmd *cmd)
Datum values[2];
bool nulls[2] = {0};
- snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
+ snprintf(startpos_str, sizeof(startpos_str), "%X/%08X",
LSN_FORMAT_ARGS(sendTimeLineValidUpto));
dest = CreateDestReceiver(DestRemoteSimple);
@@ -1324,7 +1324,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
ReplicationSlotPersist();
}
- snprintf(xloc, sizeof(xloc), "%X/%X",
+ snprintf(xloc, sizeof(xloc), "%X/%08X",
LSN_FORMAT_ARGS(MyReplicationSlot->data.confirmed_flush));
dest = CreateDestReceiver(DestRemoteSimple);
@@ -2429,7 +2429,7 @@ ProcessStandbyReplyMessage(void)
/* Copy because timestamptz_to_str returns a static buffer */
replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
- elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
+ elog(DEBUG2, "write %X/%08X flush %X/%08X apply %X/%08X%s reply_time %s",
LSN_FORMAT_ARGS(writePtr),
LSN_FORMAT_ARGS(flushPtr),
LSN_FORMAT_ARGS(applyPtr),
@@ -3251,7 +3251,7 @@ XLogSendPhysical(void)
WalSndCaughtUp = true;
- elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
+ elog(DEBUG1, "walsender reached end of timeline at %X/%08X (sent up to %X/%08X)",
LSN_FORMAT_ARGS(sendTimeLineValidUpto),
LSN_FORMAT_ARGS(sentPtr));
return;
@@ -3392,7 +3392,7 @@ retry:
{
char activitymsg[50];
- snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
+ snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%08X",
LSN_FORMAT_ARGS(sentPtr));
set_ps_display(activitymsg);
}
@@ -3449,8 +3449,16 @@ XLogSendLogical(void)
if (flushPtr == InvalidXLogRecPtr ||
logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
{
+ /*
+ * For cascading logical WAL senders, we use the replay LSN instead of
+ * the flush LSN, since logical decoding on a standby only processes
+ * WAL that has been replayed. This distinction becomes particularly
+ * important during shutdown, as new WAL is no longer replayed and the
+ * last replayed LSN marks the furthest point up to which decoding can
+ * proceed.
+ */
if (am_cascading_walsender)
- flushPtr = GetStandbyFlushRecPtr(NULL);
+ flushPtr = GetXLogReplayRecPtr(NULL);
else
flushPtr = GetFlushRecPtr(NULL);
}