diff options
Diffstat (limited to 'src/backend/replication')
-rw-r--r-- | src/backend/replication/logical/launcher.c | 44 | ||||
-rw-r--r-- | src/backend/replication/logical/logical.c | 41 | ||||
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 198 | ||||
-rw-r--r-- | src/backend/replication/logical/slotsync.c | 6 | ||||
-rw-r--r-- | src/backend/replication/logical/snapbuild.c | 12 | ||||
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 19 | ||||
-rw-r--r-- | src/backend/replication/logical/worker.c | 31 | ||||
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 2 | ||||
-rw-r--r-- | src/backend/replication/slot.c | 64 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 10 |
10 files changed, 360 insertions, 67 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 10677da56b2..4aed0dfcebb 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -175,12 +175,14 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, uint16 generation, BackgroundWorkerHandle *handle) { - BgwHandleStatus status; - int rc; + bool result = false; + bool dropped_latch = false; for (;;) { + BgwHandleStatus status; pid_t pid; + int rc; CHECK_FOR_INTERRUPTS(); @@ -189,8 +191,9 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, /* Worker either died or has started. Return false if died. */ if (!worker->in_use || worker->proc) { + result = worker->in_use; LWLockRelease(LogicalRepWorkerLock); - return worker->in_use; + break; } LWLockRelease(LogicalRepWorkerLock); @@ -205,7 +208,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, if (generation == worker->generation) logicalrep_worker_cleanup(worker); LWLockRelease(LogicalRepWorkerLock); - return false; + break; /* result is already false */ } /* @@ -220,8 +223,18 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, { ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); + dropped_latch = true; } } + + /* + * If we had to clear a latch event in order to wait, be sure to restore + * it before exiting. Otherwise caller may miss events. + */ + if (dropped_latch) + SetLatch(MyLatch); + + return result; } /* @@ -328,7 +341,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, if (max_active_replication_origins == 0) ereport(ERROR, (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), - errmsg("cannot start logical replication workers when \"max_active_replication_origins\"=0"))); + errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0"))); /* * We need to do the modification of the shared memory under lock so that @@ -1016,7 +1029,7 @@ logicalrep_launcher_attach_dshmem(void) last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa); dsa_pin_mapping(last_start_times_dsa); last_start_times = dshash_attach(last_start_times_dsa, &dsh_params, - LogicalRepCtx->last_start_dsh, 0); + LogicalRepCtx->last_start_dsh, NULL); } MemoryContextSwitchTo(oldcontext); @@ -1194,10 +1207,21 @@ ApplyLauncherMain(Datum main_arg) (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval) { ApplyLauncherSetWorkerStartTime(sub->oid, now); - logicalrep_worker_launch(WORKERTYPE_APPLY, - sub->dbid, sub->oid, sub->name, - sub->owner, InvalidOid, - DSM_HANDLE_INVALID); + if (!logicalrep_worker_launch(WORKERTYPE_APPLY, + sub->dbid, sub->oid, sub->name, + sub->owner, InvalidOid, + DSM_HANDLE_INVALID)) + { + /* + * We get here either if we failed to launch a worker + * (perhaps for resource-exhaustion reasons) or if we + * launched one but it immediately quit. Either way, it + * seems appropriate to try again after + * wal_retrieve_retry_interval. + */ + wait_time = Min(wait_time, + wal_retrieve_retry_interval); + } } else { diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index a8d2e024d34..f1eb798f3e9 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -29,6 +29,7 @@ #include "postgres.h" #include "access/xact.h" +#include "access/xlog_internal.h" #include "access/xlogutils.h" #include "fmgr.h" #include "miscadmin.h" @@ -41,6 +42,7 @@ #include "storage/proc.h" #include "storage/procarray.h" #include "utils/builtins.h" +#include "utils/injection_point.h" #include "utils/inval.h" #include "utils/memutils.h" @@ -1825,10 +1827,26 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) { bool updated_xmin = false; bool updated_restart = false; + XLogRecPtr restart_lsn pg_attribute_unused(); SpinLockAcquire(&MyReplicationSlot->mutex); - MyReplicationSlot->data.confirmed_flush = lsn; + /* remember the old restart lsn */ + restart_lsn = MyReplicationSlot->data.restart_lsn; + + /* + * Prevent moving the confirmed_flush backwards, as this could lead to + * data duplication issues caused by replicating already replicated + * changes. + * + * This can happen when a client acknowledges an LSN it doesn't have + * to do anything for, and thus didn't store persistently. After a + * restart, the client can send the prior LSN that it stored + * persistently as an acknowledgement, but we need to ignore such an + * LSN. See similar case handling in CreateDecodingContext. + */ + if (lsn > MyReplicationSlot->data.confirmed_flush) + MyReplicationSlot->data.confirmed_flush = lsn; /* if we're past the location required for bumping xmin, do so */ if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr && @@ -1869,6 +1887,18 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) /* first write new xmin to disk, so we know what's up after a crash */ if (updated_xmin || updated_restart) { +#ifdef USE_INJECTION_POINTS + XLogSegNo seg1, + seg2; + + XLByteToSeg(restart_lsn, seg1, wal_segment_size); + XLByteToSeg(MyReplicationSlot->data.restart_lsn, seg2, wal_segment_size); + + /* trigger injection point, but only if segment changes */ + if (seg1 != seg2) + INJECTION_POINT("logical-replication-slot-advance-segment", NULL); +#endif + ReplicationSlotMarkDirty(); ReplicationSlotSave(); elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart); @@ -1893,7 +1923,14 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) else { SpinLockAcquire(&MyReplicationSlot->mutex); - MyReplicationSlot->data.confirmed_flush = lsn; + + /* + * Prevent moving the confirmed_flush backwards. See comments above + * for the details. + */ + if (lsn > MyReplicationSlot->data.confirmed_flush) + MyReplicationSlot->data.confirmed_flush = lsn; + SpinLockRelease(&MyReplicationSlot->mutex); } } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 67655111875..7b4e8629553 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -109,10 +109,22 @@ #include "storage/procarray.h" #include "storage/sinval.h" #include "utils/builtins.h" +#include "utils/inval.h" #include "utils/memutils.h" #include "utils/rel.h" #include "utils/relfilenumbermap.h" +/* + * Each transaction has an 8MB limit for invalidation messages distributed from + * other transactions. This limit is set considering scenarios with many + * concurrent logical decoding operations. When the distributed invalidation + * messages reach this threshold, the transaction is marked as + * RBTXN_DISTR_INVAL_OVERFLOWED to invalidate the complete cache as we have lost + * some inval messages and hence don't know what needs to be invalidated. + */ +#define MAX_DISTR_INVAL_MSG_PER_TXN \ + ((8 * 1024 * 1024) / sizeof(SharedInvalidationMessage)) + /* entry for a hash table we use to map from xid to our transaction state */ typedef struct ReorderBufferTXNByIdEnt { @@ -472,6 +484,12 @@ ReorderBufferFreeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) txn->invalidations = NULL; } + if (txn->invalidations_distributed) + { + pfree(txn->invalidations_distributed); + txn->invalidations_distributed = NULL; + } + /* Reset the toast hash */ ReorderBufferToastReset(rb, txn); @@ -1397,7 +1415,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) int32 off; /* nothing there anymore */ - if (state->heap->bh_size == 0) + if (binaryheap_empty(state->heap)) return NULL; off = DatumGetInt32(binaryheap_first(state->heap)); @@ -2661,7 +2679,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, AbortCurrentTransaction(); /* make sure there's no cache pollution */ - ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations); + if (rbtxn_distr_inval_overflowed(txn)) + { + Assert(txn->ninvalidations_distributed == 0); + InvalidateSystemCaches(); + } + else + { + ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations); + ReorderBufferExecuteInvalidations(txn->ninvalidations_distributed, + txn->invalidations_distributed); + } if (using_subtxn) RollbackAndReleaseCurrentSubTransaction(); @@ -2710,8 +2738,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, AbortCurrentTransaction(); /* make sure there's no cache pollution */ - ReorderBufferExecuteInvalidations(txn->ninvalidations, - txn->invalidations); + if (rbtxn_distr_inval_overflowed(txn)) + { + Assert(txn->ninvalidations_distributed == 0); + InvalidateSystemCaches(); + } + else + { + ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations); + ReorderBufferExecuteInvalidations(txn->ninvalidations_distributed, + txn->invalidations_distributed); + } if (using_subtxn) RollbackAndReleaseCurrentSubTransaction(); @@ -3060,7 +3097,8 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, * We might have decoded changes for this transaction that could load * the cache as per the current transaction's view (consider DDL's * happened in this transaction). We don't want the decoding of future - * transactions to use those cache entries so execute invalidations. + * transactions to use those cache entries so execute only the inval + * messages in this transaction. */ if (txn->ninvalidations > 0) ReorderBufferImmediateInvalidation(rb, txn->ninvalidations, @@ -3147,9 +3185,10 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) txn->final_lsn = lsn; /* - * Process cache invalidation messages if there are any. Even if we're not - * interested in the transaction's contents, it could have manipulated the - * catalog and we need to update the caches according to that. + * Process only cache invalidation messages in this transaction if there + * are any. Even if we're not interested in the transaction's contents, it + * could have manipulated the catalog and we need to update the caches + * according to that. */ if (txn->base_snapshot != NULL && txn->ninvalidations > 0) ReorderBufferImmediateInvalidation(rb, txn->ninvalidations, @@ -3422,6 +3461,57 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, } /* + * Add new invalidation messages to the reorder buffer queue. + */ +static void +ReorderBufferQueueInvalidations(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, Size nmsgs, + SharedInvalidationMessage *msgs) +{ + ReorderBufferChange *change; + + change = ReorderBufferAllocChange(rb); + change->action = REORDER_BUFFER_CHANGE_INVALIDATION; + change->data.inval.ninvalidations = nmsgs; + change->data.inval.invalidations = (SharedInvalidationMessage *) + palloc(sizeof(SharedInvalidationMessage) * nmsgs); + memcpy(change->data.inval.invalidations, msgs, + sizeof(SharedInvalidationMessage) * nmsgs); + + ReorderBufferQueueChange(rb, xid, lsn, change, false); +} + +/* + * A helper function for ReorderBufferAddInvalidations() and + * ReorderBufferAddDistributedInvalidations() to accumulate the invalidation + * messages to the **invals_out. + */ +static void +ReorderBufferAccumulateInvalidations(SharedInvalidationMessage **invals_out, + uint32 *ninvals_out, + SharedInvalidationMessage *msgs_new, + Size nmsgs_new) +{ + if (*ninvals_out == 0) + { + *ninvals_out = nmsgs_new; + *invals_out = (SharedInvalidationMessage *) + palloc(sizeof(SharedInvalidationMessage) * nmsgs_new); + memcpy(*invals_out, msgs_new, sizeof(SharedInvalidationMessage) * nmsgs_new); + } + else + { + /* Enlarge the array of inval messages */ + *invals_out = (SharedInvalidationMessage *) + repalloc(*invals_out, sizeof(SharedInvalidationMessage) * + (*ninvals_out + nmsgs_new)); + memcpy(*invals_out + *ninvals_out, msgs_new, + nmsgs_new * sizeof(SharedInvalidationMessage)); + *ninvals_out += nmsgs_new; + } +} + +/* * Accumulate the invalidations for executing them later. * * This needs to be called for each XLOG_XACT_INVALIDATIONS message and @@ -3441,7 +3531,6 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, { ReorderBufferTXN *txn; MemoryContext oldcontext; - ReorderBufferChange *change; txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); @@ -3456,35 +3545,76 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, Assert(nmsgs > 0); - /* Accumulate invalidations. */ - if (txn->ninvalidations == 0) - { - txn->ninvalidations = nmsgs; - txn->invalidations = (SharedInvalidationMessage *) - palloc(sizeof(SharedInvalidationMessage) * nmsgs); - memcpy(txn->invalidations, msgs, - sizeof(SharedInvalidationMessage) * nmsgs); - } - else + ReorderBufferAccumulateInvalidations(&txn->invalidations, + &txn->ninvalidations, + msgs, nmsgs); + + ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs); + + MemoryContextSwitchTo(oldcontext); +} + +/* + * Accumulate the invalidations distributed by other committed transactions + * for executing them later. + * + * This function is similar to ReorderBufferAddInvalidations() but stores + * the given inval messages to the txn->invalidations_distributed with the + * overflow check. + * + * This needs to be called by committed transactions to distribute their + * inval messages to in-progress transactions. + */ +void +ReorderBufferAddDistributedInvalidations(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, Size nmsgs, + SharedInvalidationMessage *msgs) +{ + ReorderBufferTXN *txn; + MemoryContext oldcontext; + + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + + oldcontext = MemoryContextSwitchTo(rb->context); + + /* + * Collect all the invalidations under the top transaction, if available, + * so that we can execute them all together. See comments + * ReorderBufferAddInvalidations. + */ + txn = rbtxn_get_toptxn(txn); + + Assert(nmsgs > 0); + + if (!rbtxn_distr_inval_overflowed(txn)) { - txn->invalidations = (SharedInvalidationMessage *) - repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) * - (txn->ninvalidations + nmsgs)); + /* + * Check the transaction has enough space for storing distributed + * invalidation messages. + */ + if (txn->ninvalidations_distributed + nmsgs >= MAX_DISTR_INVAL_MSG_PER_TXN) + { + /* + * Mark the invalidation message as overflowed and free up the + * messages accumulated so far. + */ + txn->txn_flags |= RBTXN_DISTR_INVAL_OVERFLOWED; - memcpy(txn->invalidations + txn->ninvalidations, msgs, - nmsgs * sizeof(SharedInvalidationMessage)); - txn->ninvalidations += nmsgs; + if (txn->invalidations_distributed) + { + pfree(txn->invalidations_distributed); + txn->invalidations_distributed = NULL; + txn->ninvalidations_distributed = 0; + } + } + else + ReorderBufferAccumulateInvalidations(&txn->invalidations_distributed, + &txn->ninvalidations_distributed, + msgs, nmsgs); } - change = ReorderBufferAllocChange(rb); - change->action = REORDER_BUFFER_CHANGE_INVALIDATION; - change->data.inval.ninvalidations = nmsgs; - change->data.inval.invalidations = (SharedInvalidationMessage *) - palloc(sizeof(SharedInvalidationMessage) * nmsgs); - memcpy(change->data.inval.invalidations, msgs, - sizeof(SharedInvalidationMessage) * nmsgs); - - ReorderBufferQueueChange(rb, xid, lsn, change, false); + /* Queue the invalidation messages into the transaction */ + ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs); MemoryContextSwitchTo(oldcontext); } diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 656e66e0ae0..3ec3abfa3da 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -211,9 +211,9 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, * impact the users, so we used DEBUG1 level to log the message. */ ereport(slot->data.persistency == RS_TEMPORARY ? LOG : DEBUG1, - errmsg("could not synchronize replication slot \"%s\" because remote slot precedes local slot", + errmsg("could not synchronize replication slot \"%s\"", remote_slot->name), - errdetail("The remote slot has LSN %X/%X and catalog xmin %u, but the local slot has LSN %X/%X and catalog xmin %u.", + errdetail("Synchronization could lead to data loss, because the remote slot needs WAL at LSN %X/%X and catalog xmin %u, but the standby has LSN %X/%X and catalog xmin %u.", LSN_FORMAT_ARGS(remote_slot->restart_lsn), remote_slot->catalog_xmin, LSN_FORMAT_ARGS(slot->data.restart_lsn), @@ -593,7 +593,7 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) { ereport(LOG, errmsg("could not synchronize replication slot \"%s\"", remote_slot->name), - errdetail("Logical decoding could not find consistent point from local slot's LSN %X/%X.", + errdetail("Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%X.", LSN_FORMAT_ARGS(slot->data.restart_lsn))); return false; diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 0d7bddbe4ed..adf18c397db 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -794,6 +794,13 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact * contents built by the current transaction even after its decoding, * which should have been invalidated due to concurrent catalog * changing transaction. + * + * Distribute only the invalidation messages generated by the current + * committed transaction. Invalidation messages received from other + * transactions would have already been propagated to the relevant + * in-progress transactions. This transaction would have processed + * those invalidations, ensuring that subsequent transactions observe + * a consistent cache state. */ if (txn->xid != xid) { @@ -807,8 +814,9 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact { Assert(msgs != NULL); - ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn, - ninvalidations, msgs); + ReorderBufferAddDistributedInvalidations(builder->reorder, + txn->xid, lsn, + ninvalidations, msgs); } } } diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 8e1e8762f62..c90f23ee5b0 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -603,14 +603,19 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) TimestampDifferenceExceeds(hentry->last_start_time, now, wal_retrieve_retry_interval)) { - logicalrep_worker_launch(WORKERTYPE_TABLESYNC, - MyLogicalRepWorker->dbid, - MySubscription->oid, - MySubscription->name, - MyLogicalRepWorker->userid, - rstate->relid, - DSM_HANDLE_INVALID); + /* + * Set the last_start_time even if we fail to start + * the worker, so that we won't retry until + * wal_retrieve_retry_interval has elapsed. + */ hentry->last_start_time = now; + (void) logicalrep_worker_launch(WORKERTYPE_TABLESYNC, + MyLogicalRepWorker->dbid, + MySubscription->oid, + MySubscription->name, + MyLogicalRepWorker->userid, + rstate->relid, + DSM_HANDLE_INVALID); } } } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 4151a4b2a96..fd11805a44c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -109,13 +109,6 @@ * If ever a user needs to be aware of the tri-state value, they can fetch it * from the pg_subscription catalog (see column subtwophasestate). * - * We don't allow to toggle two_phase option of a subscription because it can - * lead to an inconsistent replica. Consider, initially, it was on and we have - * received some prepare then we turn it off, now at commit time the server - * will send the entire transaction data along with the commit. With some more - * analysis, we can allow changing this option from off to on but not sure if - * that alone would be useful. - * * Finally, to avoid problems mentioned in previous paragraphs from any * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on' * to 'off' and then again back to 'on') there is a restriction for @@ -4626,8 +4619,16 @@ run_apply_worker() walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); StartTransactionCommand(); + + /* + * Updating pg_subscription might involve TOAST table access, so + * ensure we have a valid snapshot. + */ + PushActiveSnapshot(GetTransactionSnapshot()); + UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED); MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED; + PopActiveSnapshot(); CommitTransactionCommand(); } else @@ -4843,7 +4844,15 @@ DisableSubscriptionAndExit(void) /* Disable the subscription */ StartTransactionCommand(); + + /* + * Updating pg_subscription might involve TOAST table access, so ensure we + * have a valid snapshot. + */ + PushActiveSnapshot(GetTransactionSnapshot()); + DisableSubscription(MySubscription->oid); + PopActiveSnapshot(); CommitTransactionCommand(); /* Ensure we remove no-longer-useful entry for worker's start time */ @@ -4948,6 +4957,12 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn) } /* + * Updating pg_subscription might involve TOAST table access, so ensure we + * have a valid snapshot. + */ + PushActiveSnapshot(GetTransactionSnapshot()); + + /* * Protect subskiplsn of pg_subscription from being concurrently updated * while clearing it. */ @@ -5005,6 +5020,8 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn) heap_freetuple(tup); table_close(rel, NoLock); + PopActiveSnapshot(); + if (started_tx) CommitTransactionCommand(); } diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 693a766e6d7..082b4d9d327 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1789,7 +1789,7 @@ LoadPublications(List *pubnames) else ereport(WARNING, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("skipped loading publication: %s", pubname), + errmsg("skipped loading publication \"%s\"", pubname), errdetail("The publication does not exist at this point in the WAL."), errhint("Create the publication if it does not exist.")); } diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 600b87fa9cb..f9fec50ae88 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -424,6 +424,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->candidate_restart_valid = InvalidXLogRecPtr; slot->candidate_restart_lsn = InvalidXLogRecPtr; slot->last_saved_confirmed_flush = InvalidXLogRecPtr; + slot->last_saved_restart_lsn = InvalidXLogRecPtr; slot->inactive_since = 0; /* @@ -1165,20 +1166,41 @@ ReplicationSlotsComputeRequiredLSN(void) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; XLogRecPtr restart_lsn; + XLogRecPtr last_saved_restart_lsn; bool invalidated; + ReplicationSlotPersistency persistency; if (!s->in_use) continue; SpinLockAcquire(&s->mutex); + persistency = s->data.persistency; restart_lsn = s->data.restart_lsn; invalidated = s->data.invalidated != RS_INVAL_NONE; + last_saved_restart_lsn = s->last_saved_restart_lsn; SpinLockRelease(&s->mutex); /* invalidated slots need not apply */ if (invalidated) continue; + /* + * For persistent slot use last_saved_restart_lsn to compute the + * oldest LSN for removal of WAL segments. The segments between + * last_saved_restart_lsn and restart_lsn might be needed by a + * persistent slot in the case of database crash. Non-persistent + * slots can't survive the database crash, so we don't care about + * last_saved_restart_lsn for them. + */ + if (persistency == RS_PERSISTENT) + { + if (last_saved_restart_lsn != InvalidXLogRecPtr && + restart_lsn > last_saved_restart_lsn) + { + restart_lsn = last_saved_restart_lsn; + } + } + if (restart_lsn != InvalidXLogRecPtr && (min_required == InvalidXLogRecPtr || restart_lsn < min_required)) @@ -1216,7 +1238,9 @@ ReplicationSlotsComputeLogicalRestartLSN(void) { ReplicationSlot *s; XLogRecPtr restart_lsn; + XLogRecPtr last_saved_restart_lsn; bool invalidated; + ReplicationSlotPersistency persistency; s = &ReplicationSlotCtl->replication_slots[i]; @@ -1230,14 +1254,33 @@ ReplicationSlotsComputeLogicalRestartLSN(void) /* read once, it's ok if it increases while we're checking */ SpinLockAcquire(&s->mutex); + persistency = s->data.persistency; restart_lsn = s->data.restart_lsn; invalidated = s->data.invalidated != RS_INVAL_NONE; + last_saved_restart_lsn = s->last_saved_restart_lsn; SpinLockRelease(&s->mutex); /* invalidated slots need not apply */ if (invalidated) continue; + /* + * For persistent slot use last_saved_restart_lsn to compute the + * oldest LSN for removal of WAL segments. The segments between + * last_saved_restart_lsn and restart_lsn might be needed by a + * persistent slot in the case of database crash. Non-persistent + * slots can't survive the database crash, so we don't care about + * last_saved_restart_lsn for them. + */ + if (persistency == RS_PERSISTENT) + { + if (last_saved_restart_lsn != InvalidXLogRecPtr && + restart_lsn > last_saved_restart_lsn) + { + restart_lsn = last_saved_restart_lsn; + } + } + if (restart_lsn == InvalidXLogRecPtr) continue; @@ -1455,6 +1498,7 @@ ReplicationSlotReserveWal(void) Assert(slot != NULL); Assert(slot->data.restart_lsn == InvalidXLogRecPtr); + Assert(slot->last_saved_restart_lsn == InvalidXLogRecPtr); /* * The replication slot mechanism is used to prevent removal of required @@ -1835,7 +1879,10 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, * just rely on .invalidated. */ if (invalidation_cause == RS_INVAL_WAL_REMOVED) + { s->data.restart_lsn = InvalidXLogRecPtr; + s->last_saved_restart_lsn = InvalidXLogRecPtr; + } /* Let caller know */ *invalidated = true; @@ -2032,6 +2079,7 @@ void CheckPointReplicationSlots(bool is_shutdown) { int i; + bool last_saved_restart_lsn_updated = false; elog(DEBUG1, "performing replication slot checkpoint"); @@ -2076,9 +2124,23 @@ CheckPointReplicationSlots(bool is_shutdown) SpinLockRelease(&s->mutex); } + /* + * Track if we're going to update slot's last_saved_restart_lsn. We + * need this to know if we need to recompute the required LSN. + */ + if (s->last_saved_restart_lsn != s->data.restart_lsn) + last_saved_restart_lsn_updated = true; + SaveSlotToPath(s, path, LOG); } LWLockRelease(ReplicationSlotAllocationLock); + + /* + * Recompute the required LSN if SaveSlotToPath() updated + * last_saved_restart_lsn for any slot. + */ + if (last_saved_restart_lsn_updated) + ReplicationSlotsComputeRequiredLSN(); } /* @@ -2354,6 +2416,7 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel) if (!slot->just_dirtied) slot->dirty = false; slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush; + slot->last_saved_restart_lsn = cp.slotdata.restart_lsn; SpinLockRelease(&slot->mutex); LWLockRelease(&slot->io_in_progress_lock); @@ -2569,6 +2632,7 @@ RestoreSlotFromDisk(const char *name) slot->effective_xmin = cp.slotdata.xmin; slot->effective_catalog_xmin = cp.slotdata.catalog_xmin; slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush; + slot->last_saved_restart_lsn = cp.slotdata.restart_lsn; slot->candidate_catalog_xmin = InvalidTransactionId; slot->candidate_xmin_lsn = InvalidXLogRecPtr; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 9fa8beb6103..f2c33250e8b 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -3449,8 +3449,16 @@ XLogSendLogical(void) if (flushPtr == InvalidXLogRecPtr || logical_decoding_ctx->reader->EndRecPtr >= flushPtr) { + /* + * For cascading logical WAL senders, we use the replay LSN instead of + * the flush LSN, since logical decoding on a standby only processes + * WAL that has been replayed. This distinction becomes particularly + * important during shutdown, as new WAL is no longer replayed and the + * last replayed LSN marks the furthest point up to which decoding can + * proceed. + */ if (am_cascading_walsender) - flushPtr = GetStandbyFlushRecPtr(NULL); + flushPtr = GetXLogReplayRecPtr(NULL); else flushPtr = GetFlushRecPtr(NULL); } |