about | summary | refs | log | tree | commit | diff
path: root/src/backend/replication/logical
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical')
-rw-r--r-- src/backend/replication/logical/launcher.c | 44
-rw-r--r-- src/backend/replication/logical/logical.c | 32
-rw-r--r-- src/backend/replication/logical/origin.c | 6
-rw-r--r-- src/backend/replication/logical/reorderbuffer.c | 198
-rw-r--r-- src/backend/replication/logical/slotsync.c | 12
-rw-r--r-- src/backend/replication/logical/snapbuild.c | 58
-rw-r--r-- src/backend/replication/logical/tablesync.c | 21
-rw-r--r-- src/backend/replication/logical/worker.c | 51
8 files changed, 312 insertions, 110 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 10677da56b2..4aed0dfcebb 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -175,12 +175,14 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
uint16 generation,
BackgroundWorkerHandle *handle)
{
- BgwHandleStatus status;
- int rc;
+ bool result = false;
+ bool dropped_latch = false;
for (;;)
{
+ BgwHandleStatus status;
pid_t pid;
+ int rc;
CHECK_FOR_INTERRUPTS();
@@ -189,8 +191,9 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
/* Worker either died or has started. Return false if died. */
if (!worker->in_use || worker->proc)
{
+ result = worker->in_use;
LWLockRelease(LogicalRepWorkerLock);
- return worker->in_use;
+ break;
}
LWLockRelease(LogicalRepWorkerLock);
@@ -205,7 +208,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
if (generation == worker->generation)
logicalrep_worker_cleanup(worker);
LWLockRelease(LogicalRepWorkerLock);
- return false;
+ break; /* result is already false */
}
/*
@@ -220,8 +223,18 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
{
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
+ dropped_latch = true;
}
}
+
+ /*
+ * If we had to clear a latch event in order to wait, be sure to restore
+ * it before exiting. Otherwise caller may miss events.
+ */
+ if (dropped_latch)
+ SetLatch(MyLatch);
+
+ return result;
}
/*
@@ -328,7 +341,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
if (max_active_replication_origins == 0)
ereport(ERROR,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
- errmsg("cannot start logical replication workers when \"max_active_replication_origins\"=0")));
+ errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0")));
/*
* We need to do the modification of the shared memory under lock so that
@@ -1016,7 +1029,7 @@ logicalrep_launcher_attach_dshmem(void)
last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
dsa_pin_mapping(last_start_times_dsa);
last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
- LogicalRepCtx->last_start_dsh, 0);
+ LogicalRepCtx->last_start_dsh, NULL);
}
MemoryContextSwitchTo(oldcontext);
@@ -1194,10 +1207,21 @@ ApplyLauncherMain(Datum main_arg)
(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
{
ApplyLauncherSetWorkerStartTime(sub->oid, now);
- logicalrep_worker_launch(WORKERTYPE_APPLY,
- sub->dbid, sub->oid, sub->name,
- sub->owner, InvalidOid,
- DSM_HANDLE_INVALID);
+ if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
+ sub->dbid, sub->oid, sub->name,
+ sub->owner, InvalidOid,
+ DSM_HANDLE_INVALID))
+ {
+ /*
+ * We get here either if we failed to launch a worker
+ * (perhaps for resource-exhaustion reasons) or if we
+ * launched one but it immediately quit. Either way, it
+ * seems appropriate to try again after
+ * wal_retrieve_retry_interval.
+ */
+ wait_time = Min(wait_time,
+ wal_retrieve_retry_interval);
+ }
}
else
{
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1d56d0c4ef3..7e363a7c05b 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -29,6 +29,7 @@
#include "postgres.h"
#include "access/xact.h"
+#include "access/xlog_internal.h"
#include "access/xlogutils.h"
#include "fmgr.h"
#include "miscadmin.h"
@@ -41,6 +42,7 @@
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
+#include "utils/injection_point.h"
#include "utils/inval.h"
#include "utils/memutils.h"
@@ -565,7 +567,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
* kinds of client errors; so the client may wish to check that
* confirmed_flush_lsn matches its expectations.
*/
- elog(LOG, "%X/%X has been already streamed, forwarding to %X/%X",
+ elog(LOG, "%X/%08X has been already streamed, forwarding to %X/%08X",
LSN_FORMAT_ARGS(start_lsn),
LSN_FORMAT_ARGS(slot->data.confirmed_flush));
@@ -608,7 +610,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
ereport(LOG,
(errmsg("starting logical decoding for slot \"%s\"",
NameStr(slot->data.name)),
- errdetail("Streaming transactions committing after %X/%X, reading WAL from %X/%X.",
+ errdetail("Streaming transactions committing after %X/%08X, reading WAL from %X/%08X.",
LSN_FORMAT_ARGS(slot->data.confirmed_flush),
LSN_FORMAT_ARGS(slot->data.restart_lsn))));
@@ -635,7 +637,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
/* Initialize from where to start reading WAL. */
XLogBeginRead(ctx->reader, slot->data.restart_lsn);
- elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
+ elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%08X",
LSN_FORMAT_ARGS(slot->data.restart_lsn));
/* Wait for a consistent starting point */
@@ -756,7 +758,7 @@ output_plugin_error_callback(void *arg)
/* not all callbacks have an associated LSN */
if (state->report_location != InvalidXLogRecPtr)
- errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%X",
+ errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%08X",
NameStr(state->ctx->slot->data.name),
NameStr(state->ctx->slot->data.plugin),
state->callback_name,
@@ -1723,7 +1725,7 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
SpinLockRelease(&slot->mutex);
if (got_new_xmin)
- elog(DEBUG1, "got new catalog xmin %u at %X/%X", xmin,
+ elog(DEBUG1, "got new catalog xmin %u at %X/%08X", xmin,
LSN_FORMAT_ARGS(current_lsn));
/* candidate already valid with the current flush position, apply */
@@ -1783,7 +1785,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
slot->candidate_restart_lsn = restart_lsn;
SpinLockRelease(&slot->mutex);
- elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
+ elog(DEBUG1, "got new restart lsn %X/%08X at %X/%08X",
LSN_FORMAT_ARGS(restart_lsn),
LSN_FORMAT_ARGS(current_lsn));
}
@@ -1798,7 +1800,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
confirmed_flush = slot->data.confirmed_flush;
SpinLockRelease(&slot->mutex);
- elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X",
+ elog(DEBUG1, "failed to increase restart lsn: proposed %X/%08X, after %X/%08X, current candidate %X/%08X, current after %X/%08X, flushed up to %X/%08X",
LSN_FORMAT_ARGS(restart_lsn),
LSN_FORMAT_ARGS(current_lsn),
LSN_FORMAT_ARGS(candidate_restart_lsn),
@@ -1825,9 +1827,13 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
{
bool updated_xmin = false;
bool updated_restart = false;
+ XLogRecPtr restart_lsn pg_attribute_unused();
SpinLockAcquire(&MyReplicationSlot->mutex);
+ /* remember the old restart lsn */
+ restart_lsn = MyReplicationSlot->data.restart_lsn;
+
/*
* Prevent moving the confirmed_flush backwards, as this could lead to
* data duplication issues caused by replicating already replicated
@@ -1881,6 +1887,18 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
/* first write new xmin to disk, so we know what's up after a crash */
if (updated_xmin || updated_restart)
{
+#ifdef USE_INJECTION_POINTS
+ XLogSegNo seg1,
+ seg2;
+
+ XLByteToSeg(restart_lsn, seg1, wal_segment_size);
+ XLByteToSeg(MyReplicationSlot->data.restart_lsn, seg2, wal_segment_size);
+
+ /* trigger injection point, but only if segment changes */
+ if (seg1 != seg2)
+ INJECTION_POINT("logical-replication-slot-advance-segment", NULL);
+#endif
+
ReplicationSlotMarkDirty();
ReplicationSlotSave();
elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index a17bacf88e7..87f10e50dcc 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -826,9 +826,9 @@ StartupReplicationOrigin(void)
last_state++;
ereport(LOG,
- (errmsg("recovered replication state of node %d to %X/%X",
- disk_state.roident,
- LSN_FORMAT_ARGS(disk_state.remote_lsn))));
+ errmsg("recovered replication state of node %d to %X/%08X",
+ disk_state.roident,
+ LSN_FORMAT_ARGS(disk_state.remote_lsn)));
}
/* now check checksum */
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 67655111875..7b4e8629553 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -109,10 +109,22 @@
#include "storage/procarray.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
+#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenumbermap.h"
+/*
+ * Each transaction has an 8MB limit for invalidation messages distributed from
+ * other transactions. This limit is set considering scenarios with many
+ * concurrent logical decoding operations. When the distributed invalidation
+ * messages reach this threshold, the transaction is marked as
+ * RBTXN_DISTR_INVAL_OVERFLOWED to invalidate the complete cache as we have lost
+ * some inval messages and hence don't know what needs to be invalidated.
+ */
+#define MAX_DISTR_INVAL_MSG_PER_TXN \
+ ((8 * 1024 * 1024) / sizeof(SharedInvalidationMessage))
+
/* entry for a hash table we use to map from xid to our transaction state */
typedef struct ReorderBufferTXNByIdEnt
{
@@ -472,6 +484,12 @@ ReorderBufferFreeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
txn->invalidations = NULL;
}
+ if (txn->invalidations_distributed)
+ {
+ pfree(txn->invalidations_distributed);
+ txn->invalidations_distributed = NULL;
+ }
+
/* Reset the toast hash */
ReorderBufferToastReset(rb, txn);
@@ -1397,7 +1415,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
int32 off;
/* nothing there anymore */
- if (state->heap->bh_size == 0)
+ if (binaryheap_empty(state->heap))
return NULL;
off = DatumGetInt32(binaryheap_first(state->heap));
@@ -2661,7 +2679,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
AbortCurrentTransaction();
/* make sure there's no cache pollution */
- ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
+ if (rbtxn_distr_inval_overflowed(txn))
+ {
+ Assert(txn->ninvalidations_distributed == 0);
+ InvalidateSystemCaches();
+ }
+ else
+ {
+ ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
+ ReorderBufferExecuteInvalidations(txn->ninvalidations_distributed,
+ txn->invalidations_distributed);
+ }
if (using_subtxn)
RollbackAndReleaseCurrentSubTransaction();
@@ -2710,8 +2738,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
AbortCurrentTransaction();
/* make sure there's no cache pollution */
- ReorderBufferExecuteInvalidations(txn->ninvalidations,
- txn->invalidations);
+ if (rbtxn_distr_inval_overflowed(txn))
+ {
+ Assert(txn->ninvalidations_distributed == 0);
+ InvalidateSystemCaches();
+ }
+ else
+ {
+ ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
+ ReorderBufferExecuteInvalidations(txn->ninvalidations_distributed,
+ txn->invalidations_distributed);
+ }
if (using_subtxn)
RollbackAndReleaseCurrentSubTransaction();
@@ -3060,7 +3097,8 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
* We might have decoded changes for this transaction that could load
* the cache as per the current transaction's view (consider DDL's
* happened in this transaction). We don't want the decoding of future
- * transactions to use those cache entries so execute invalidations.
+ * transactions to use those cache entries so execute only the inval
+ * messages in this transaction.
*/
if (txn->ninvalidations > 0)
ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
@@ -3147,9 +3185,10 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
txn->final_lsn = lsn;
/*
- * Process cache invalidation messages if there are any. Even if we're not
- * interested in the transaction's contents, it could have manipulated the
- * catalog and we need to update the caches according to that.
+ * Process only cache invalidation messages in this transaction if there
+ * are any. Even if we're not interested in the transaction's contents, it
+ * could have manipulated the catalog and we need to update the caches
+ * according to that.
*/
if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
@@ -3422,6 +3461,57 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
}
/*
+ * Queue a copy of the given invalidation messages as one change of transaction xid.
+ */
+static void
+ReorderBufferQueueInvalidations(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr lsn, Size nmsgs,
+ SharedInvalidationMessage *msgs)
+{
+ ReorderBufferChange *change;
+
+ change = ReorderBufferAllocChange(rb);
+ change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
+ change->data.inval.ninvalidations = nmsgs;
+ change->data.inval.invalidations = (SharedInvalidationMessage *) /* the change holds its own copy; msgs itself is not retained */
+ palloc(sizeof(SharedInvalidationMessage) * nmsgs); /* allocated in the current memory context */
+ memcpy(change->data.inval.invalidations, msgs,
+ sizeof(SharedInvalidationMessage) * nmsgs);
+
+ ReorderBufferQueueChange(rb, xid, lsn, change, false); /* hand the filled-in change over to xid's queue at lsn */
+}
+
+/*
+ * Helper for ReorderBufferAddInvalidations() and
+ * ReorderBufferAddDistributedInvalidations(): append nmsgs_new messages from
+ * msgs_new to the array at *invals_out and add nmsgs_new to *ninvals_out.
+ */
+static void
+ReorderBufferAccumulateInvalidations(SharedInvalidationMessage **invals_out,
+ uint32 *ninvals_out,
+ SharedInvalidationMessage *msgs_new,
+ Size nmsgs_new)
+{
+ if (*ninvals_out == 0) /* first batch: start a new array; any old *invals_out value is overwritten */
+ {
+ *ninvals_out = nmsgs_new; /* NOTE(review): a Size value is stored into a uint32 counter here */
+ *invals_out = (SharedInvalidationMessage *)
+ palloc(sizeof(SharedInvalidationMessage) * nmsgs_new);
+ memcpy(*invals_out, msgs_new, sizeof(SharedInvalidationMessage) * nmsgs_new);
+ }
+ else
+ {
+ /* Enlarge the array of inval messages */
+ *invals_out = (SharedInvalidationMessage *)
+ repalloc(*invals_out, sizeof(SharedInvalidationMessage) *
+ (*ninvals_out + nmsgs_new));
+ memcpy(*invals_out + *ninvals_out, msgs_new, /* copy the new messages in after the existing ones */
+ nmsgs_new * sizeof(SharedInvalidationMessage));
+ *ninvals_out += nmsgs_new;
+ }
+}
+
+/*
* Accumulate the invalidations for executing them later.
*
* This needs to be called for each XLOG_XACT_INVALIDATIONS message and
@@ -3441,7 +3531,6 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
{
ReorderBufferTXN *txn;
MemoryContext oldcontext;
- ReorderBufferChange *change;
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
@@ -3456,35 +3545,76 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
Assert(nmsgs > 0);
- /* Accumulate invalidations. */
- if (txn->ninvalidations == 0)
- {
- txn->ninvalidations = nmsgs;
- txn->invalidations = (SharedInvalidationMessage *)
- palloc(sizeof(SharedInvalidationMessage) * nmsgs);
- memcpy(txn->invalidations, msgs,
- sizeof(SharedInvalidationMessage) * nmsgs);
- }
- else
+ ReorderBufferAccumulateInvalidations(&txn->invalidations,
+ &txn->ninvalidations,
+ msgs, nmsgs);
+
+ ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs);
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Accumulate the invalidations distributed by other committed transactions
+ * for executing them later.
+ *
+ * This function is similar to ReorderBufferAddInvalidations(), but it stores
+ * the given inval messages in txn->invalidations_distributed, subject to an
+ * overflow check.
+ *
+ * This needs to be called by committed transactions to distribute their
+ * inval messages to in-progress transactions.
+ */
+void
+ReorderBufferAddDistributedInvalidations(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr lsn, Size nmsgs,
+ SharedInvalidationMessage *msgs)
+{
+ ReorderBufferTXN *txn;
+ MemoryContext oldcontext;
+
+ txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+ oldcontext = MemoryContextSwitchTo(rb->context);
+
+ /*
+ * Collect all the invalidations under the top transaction, if available,
+ * so that we can execute them all together. See the comments in
+ * ReorderBufferAddInvalidations().
+ */
+ txn = rbtxn_get_toptxn(txn);
+
+ Assert(nmsgs > 0);
+
+ if (!rbtxn_distr_inval_overflowed(txn))
{
- txn->invalidations = (SharedInvalidationMessage *)
- repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
- (txn->ninvalidations + nmsgs));
+ /*
+ * Check that the transaction has enough space for storing distributed
+ * invalidation messages.
+ */
+ if (txn->ninvalidations_distributed + nmsgs >= MAX_DISTR_INVAL_MSG_PER_TXN)
+ {
+ /*
+ * Mark the transaction as overflowed and free up the distributed
+ * messages accumulated so far.
+ */
+ txn->txn_flags |= RBTXN_DISTR_INVAL_OVERFLOWED;
- memcpy(txn->invalidations + txn->ninvalidations, msgs,
- nmsgs * sizeof(SharedInvalidationMessage));
- txn->ninvalidations += nmsgs;
+ if (txn->invalidations_distributed)
+ {
+ pfree(txn->invalidations_distributed);
+ txn->invalidations_distributed = NULL;
+ txn->ninvalidations_distributed = 0;
+ }
+ }
+ else
+ ReorderBufferAccumulateInvalidations(&txn->invalidations_distributed,
+ &txn->ninvalidations_distributed,
+ msgs, nmsgs);
}
- change = ReorderBufferAllocChange(rb);
- change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
- change->data.inval.ninvalidations = nmsgs;
- change->data.inval.invalidations = (SharedInvalidationMessage *)
- palloc(sizeof(SharedInvalidationMessage) * nmsgs);
- memcpy(change->data.inval.invalidations, msgs,
- sizeof(SharedInvalidationMessage) * nmsgs);
-
- ReorderBufferQueueChange(rb, xid, lsn, change, false);
+ /* Queue the invalidation messages into the transaction */
+ ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs);
MemoryContextSwitchTo(oldcontext);
}
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 656e66e0ae0..2f0c08b8fbd 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -211,9 +211,9 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
* impact the users, so we used DEBUG1 level to log the message.
*/
ereport(slot->data.persistency == RS_TEMPORARY ? LOG : DEBUG1,
- errmsg("could not synchronize replication slot \"%s\" because remote slot precedes local slot",
+ errmsg("could not synchronize replication slot \"%s\"",
remote_slot->name),
- errdetail("The remote slot has LSN %X/%X and catalog xmin %u, but the local slot has LSN %X/%X and catalog xmin %u.",
+ errdetail("Synchronization could lead to data loss, because the remote slot needs WAL at LSN %X/%08X and catalog xmin %u, but the standby has LSN %X/%08X and catalog xmin %u.",
LSN_FORMAT_ARGS(remote_slot->restart_lsn),
remote_slot->catalog_xmin,
LSN_FORMAT_ARGS(slot->data.restart_lsn),
@@ -275,7 +275,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
ereport(ERROR,
errmsg_internal("synchronized confirmed_flush for slot \"%s\" differs from remote slot",
remote_slot->name),
- errdetail_internal("Remote slot has LSN %X/%X but local slot has LSN %X/%X.",
+ errdetail_internal("Remote slot has LSN %X/%08X but local slot has LSN %X/%08X.",
LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
LSN_FORMAT_ARGS(slot->data.confirmed_flush)));
}
@@ -593,7 +593,7 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
{
ereport(LOG,
errmsg("could not synchronize replication slot \"%s\"", remote_slot->name),
- errdetail("Logical decoding could not find consistent point from local slot's LSN %X/%X.",
+ errdetail("Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X.",
LSN_FORMAT_ARGS(slot->data.restart_lsn)));
return false;
@@ -642,7 +642,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("skipping slot synchronization because the received slot sync"
- " LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X",
+ " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
remote_slot->name,
LSN_FORMAT_ARGS(latestFlushPtr)));
@@ -733,7 +733,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
ereport(ERROR,
errmsg_internal("cannot synchronize local slot \"%s\"",
remote_slot->name),
- errdetail_internal("Local slot's start streaming location LSN(%X/%X) is ahead of remote slot's LSN(%X/%X).",
+ errdetail_internal("Local slot's start streaming location LSN(%X/%08X) is ahead of remote slot's LSN(%X/%08X).",
LSN_FORMAT_ARGS(slot->data.confirmed_flush),
LSN_FORMAT_ARGS(remote_slot->confirmed_lsn)));
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 0d7bddbe4ed..8532bfd27e5 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -774,7 +774,7 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
if (rbtxn_is_prepared(txn))
continue;
- elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%X",
+ elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%08X",
txn->xid, LSN_FORMAT_ARGS(lsn));
/*
@@ -794,6 +794,13 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
* contents built by the current transaction even after its decoding,
* which should have been invalidated due to concurrent catalog
* changing transaction.
+ *
+ * Distribute only the invalidation messages generated by the current
+ * committed transaction. Invalidation messages received from other
+ * transactions would have already been propagated to the relevant
+ * in-progress transactions. This transaction would have processed
+ * those invalidations, ensuring that subsequent transactions observe
+ * a consistent cache state.
*/
if (txn->xid != xid)
{
@@ -807,8 +814,9 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
{
Assert(msgs != NULL);
- ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
- ninvalidations, msgs);
+ ReorderBufferAddDistributedInvalidations(builder->reorder,
+ txn->xid, lsn,
+ ninvalidations, msgs);
}
}
}
@@ -1263,10 +1271,10 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
builder->initial_xmin_horizon))
{
ereport(DEBUG1,
- (errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
- LSN_FORMAT_ARGS(lsn)),
- errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
- builder->initial_xmin_horizon, running->oldestRunningXid)));
+ errmsg_internal("skipping snapshot at %X/%08X while building logical decoding snapshot, xmin horizon too low",
+ LSN_FORMAT_ARGS(lsn)),
+ errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
+ builder->initial_xmin_horizon, running->oldestRunningXid));
SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
@@ -1302,9 +1310,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
builder->next_phase_at = InvalidTransactionId;
ereport(LOG,
- (errmsg("logical decoding found consistent point at %X/%X",
- LSN_FORMAT_ARGS(lsn)),
- errdetail("There are no running transactions.")));
+ errmsg("logical decoding found consistent point at %X/%08X",
+ LSN_FORMAT_ARGS(lsn)),
+ errdetail("There are no running transactions."));
return false;
}
@@ -1351,10 +1359,10 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
Assert(TransactionIdIsNormal(builder->xmax));
ereport(LOG,
- (errmsg("logical decoding found initial starting point at %X/%X",
- LSN_FORMAT_ARGS(lsn)),
- errdetail("Waiting for transactions (approximately %d) older than %u to end.",
- running->xcnt, running->nextXid)));
+ errmsg("logical decoding found initial starting point at %X/%08X",
+ LSN_FORMAT_ARGS(lsn)),
+ errdetail("Waiting for transactions (approximately %d) older than %u to end.",
+ running->xcnt, running->nextXid));
SnapBuildWaitSnapshot(running, running->nextXid);
}
@@ -1375,10 +1383,10 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
builder->next_phase_at = running->nextXid;
ereport(LOG,
- (errmsg("logical decoding found initial consistent point at %X/%X",
- LSN_FORMAT_ARGS(lsn)),
- errdetail("Waiting for transactions (approximately %d) older than %u to end.",
- running->xcnt, running->nextXid)));
+ errmsg("logical decoding found initial consistent point at %X/%08X",
+ LSN_FORMAT_ARGS(lsn)),
+ errdetail("Waiting for transactions (approximately %d) older than %u to end.",
+ running->xcnt, running->nextXid));
SnapBuildWaitSnapshot(running, running->nextXid);
}
@@ -1399,9 +1407,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
builder->next_phase_at = InvalidTransactionId;
ereport(LOG,
- (errmsg("logical decoding found consistent point at %X/%X",
- LSN_FORMAT_ARGS(lsn)),
- errdetail("There are no old transactions anymore.")));
+ errmsg("logical decoding found consistent point at %X/%08X",
+ LSN_FORMAT_ARGS(lsn)),
+ errdetail("There are no old transactions anymore."));
}
/*
@@ -1905,9 +1913,9 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
Assert(builder->state == SNAPBUILD_CONSISTENT);
ereport(LOG,
- (errmsg("logical decoding found consistent point at %X/%X",
- LSN_FORMAT_ARGS(lsn)),
- errdetail("Logical decoding will begin using saved snapshot.")));
+ errmsg("logical decoding found consistent point at %X/%08X",
+ LSN_FORMAT_ARGS(lsn)),
+ errdetail("Logical decoding will begin using saved snapshot."));
return true;
snapshot_not_interesting:
@@ -2053,7 +2061,7 @@ SnapBuildSnapshotExists(XLogRecPtr lsn)
int ret;
struct stat stat_buf;
- sprintf(path, "%s/%X-%X.snap",
+ sprintf(path, "%s/%08X-%08X.snap",
PG_LOGICAL_SNAPSHOTS_DIR,
LSN_FORMAT_ARGS(lsn));
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 8e1e8762f62..e4fd6347fd1 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -603,14 +603,19 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
TimestampDifferenceExceeds(hentry->last_start_time, now,
wal_retrieve_retry_interval))
{
- logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
- MyLogicalRepWorker->dbid,
- MySubscription->oid,
- MySubscription->name,
- MyLogicalRepWorker->userid,
- rstate->relid,
- DSM_HANDLE_INVALID);
+ /*
+ * Set the last_start_time even if we fail to start
+ * the worker, so that we won't retry until
+ * wal_retrieve_retry_interval has elapsed.
+ */
hentry->last_start_time = now;
+ (void) logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
+ MyLogicalRepWorker->dbid,
+ MySubscription->oid,
+ MySubscription->name,
+ MyLogicalRepWorker->userid,
+ rstate->relid,
+ DSM_HANDLE_INVALID);
}
}
}
@@ -1548,7 +1553,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
copy_table_done:
elog(DEBUG1,
- "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
+ "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%08X",
originname, LSN_FORMAT_ARGS(*origin_startpos));
/*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 4151a4b2a96..c5fb627aa56 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -109,13 +109,6 @@
* If ever a user needs to be aware of the tri-state value, they can fetch it
* from the pg_subscription catalog (see column subtwophasestate).
*
- * We don't allow to toggle two_phase option of a subscription because it can
- * lead to an inconsistent replica. Consider, initially, it was on and we have
- * received some prepare then we turn it off, now at commit time the server
- * will send the entire transaction data along with the commit. With some more
- * analysis, we can allow changing this option from off to on but not sure if
- * that alone would be useful.
- *
* Finally, to avoid problems mentioned in previous paragraphs from any
* subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
* to 'off' and then again back to 'on') there is a restriction for
@@ -1023,7 +1016,7 @@ apply_handle_commit(StringInfo s)
if (commit_data.commit_lsn != remote_final_lsn)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
+ errmsg_internal("incorrect commit LSN %X/%08X in commit message (expected %X/%08X)",
LSN_FORMAT_ARGS(commit_data.commit_lsn),
LSN_FORMAT_ARGS(remote_final_lsn))));
@@ -1115,7 +1108,7 @@ apply_handle_prepare(StringInfo s)
if (prepare_data.prepare_lsn != remote_final_lsn)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
+ errmsg_internal("incorrect prepare LSN %X/%08X in prepare message (expected %X/%08X)",
LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
LSN_FORMAT_ARGS(remote_final_lsn))));
@@ -3910,7 +3903,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
pq_sendint64(reply_message, now); /* sendTime */
pq_sendbyte(reply_message, requestReply); /* replyRequested */
- elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
+ elog(DEBUG2, "sending feedback (force %d) to recv %X/%08X, write %X/%08X, flush %X/%08X",
force,
LSN_FORMAT_ARGS(recvpos),
LSN_FORMAT_ARGS(writepos),
@@ -4626,8 +4619,16 @@ run_apply_worker()
walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
StartTransactionCommand();
+
+ /*
+ * Updating pg_subscription might involve TOAST table access, so
+ * ensure we have a valid snapshot.
+ */
+ PushActiveSnapshot(GetTransactionSnapshot());
+
UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
+ PopActiveSnapshot();
CommitTransactionCommand();
}
else
@@ -4843,7 +4844,15 @@ DisableSubscriptionAndExit(void)
/* Disable the subscription */
StartTransactionCommand();
+
+ /*
+ * Updating pg_subscription might involve TOAST table access, so ensure we
+ * have a valid snapshot.
+ */
+ PushActiveSnapshot(GetTransactionSnapshot());
+
DisableSubscription(MySubscription->oid);
+ PopActiveSnapshot();
CommitTransactionCommand();
/* Ensure we remove no-longer-useful entry for worker's start time */
@@ -4900,7 +4909,7 @@ maybe_start_skipping_changes(XLogRecPtr finish_lsn)
skip_xact_finish_lsn = finish_lsn;
ereport(LOG,
- errmsg("logical replication starts skipping transaction at LSN %X/%X",
+ errmsg("logical replication starts skipping transaction at LSN %X/%08X",
LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
}
@@ -4914,8 +4923,8 @@ stop_skipping_changes(void)
return;
ereport(LOG,
- (errmsg("logical replication completed skipping transaction at LSN %X/%X",
- LSN_FORMAT_ARGS(skip_xact_finish_lsn))));
+ errmsg("logical replication completed skipping transaction at LSN %X/%08X",
+ LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
/* Stop skipping changes */
skip_xact_finish_lsn = InvalidXLogRecPtr;
@@ -4948,6 +4957,12 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
}
/*
+ * Updating pg_subscription might involve TOAST table access, so ensure we
+ * have a valid snapshot.
+ */
+ PushActiveSnapshot(GetTransactionSnapshot());
+
+ /*
* Protect subskiplsn of pg_subscription from being concurrently updated
* while clearing it.
*/
@@ -4997,7 +5012,7 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
if (myskiplsn != finish_lsn)
ereport(WARNING,
errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
- errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.",
+ errdetail("Remote transaction's finish WAL location (LSN) %X/%08X did not match skip-LSN %X/%08X.",
LSN_FORMAT_ARGS(finish_lsn),
LSN_FORMAT_ARGS(myskiplsn)));
}
@@ -5005,6 +5020,8 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
heap_freetuple(tup);
table_close(rel, NoLock);
+ PopActiveSnapshot();
+
if (started_tx)
CommitTransactionCommand();
}
@@ -5032,7 +5049,7 @@ apply_error_callback(void *arg)
logicalrep_message_type(errarg->command),
errarg->remote_xid);
else
- errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
+ errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%08X",
errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->remote_xid,
@@ -5050,7 +5067,7 @@ apply_error_callback(void *arg)
errarg->rel->remoterel.relname,
errarg->remote_xid);
else
- errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
+ errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%08X",
errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,
@@ -5069,7 +5086,7 @@ apply_error_callback(void *arg)
errarg->rel->remoterel.attnames[errarg->remote_attnum],
errarg->remote_xid);
else
- errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
+ errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%08X",
errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,