diff options
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 623 |
1 files changed, 622 insertions, 1 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index c5fb627aa56..b59221c4d06 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -132,6 +132,96 @@ * failover = true when creating the subscription. Enabling failover allows us * to smoothly transition to the promoted standby, ensuring that we can * subscribe to the new primary without losing any data. + * + * RETAIN DEAD TUPLES + * ---------------------- + * Each apply worker that enabled retain_dead_tuples option maintains a + * non-removable transaction ID (oldest_nonremovable_xid) in shared memory to + * prevent dead rows from being removed prematurely when the apply worker still + * needs them to detect conflicts reliably. This helps to retain the required + * commit_ts module information, which further helps to detect + * update_origin_differs and delete_origin_differs conflicts reliably, as + * otherwise, vacuum freeze could remove the required information. + * + * The logical replication launcher manages an internal replication slot named + * "pg_conflict_detection". It asynchronously aggregates the non-removable + * transaction ID from all apply workers to determine the appropriate xmin for + * the slot, thereby retaining necessary tuples. + * + * The non-removable transaction ID in the apply worker is advanced to the + * oldest running transaction ID once all concurrent transactions on the + * publisher have been applied and flushed locally. The process involves: + * + * - RDT_GET_CANDIDATE_XID: + * Call GetOldestActiveTransactionId() to take oldestRunningXid as the + * candidate xid. + * + * - RDT_REQUEST_PUBLISHER_STATUS: + * Send a message to the walsender requesting the publisher status, which + * includes the latest WAL write position and information about transactions + * that are in the commit phase. + * + * - RDT_WAIT_FOR_PUBLISHER_STATUS: + * Wait for the status from the walsender. After receiving the first status, + * do not proceed if there are concurrent remote transactions that are still + * in the commit phase. These transactions might have been assigned an + * earlier commit timestamp but have not yet written the commit WAL record. + * Continue to request the publisher status (RDT_REQUEST_PUBLISHER_STATUS) + * until all these transactions have completed. + * + * - RDT_WAIT_FOR_LOCAL_FLUSH: + * Advance the non-removable transaction ID if the current flush location has + * reached or surpassed the last received WAL position. + * + * The overall state progression is: GET_CANDIDATE_XID -> + * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to + * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) -> + * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID. + * + * Retaining the dead tuples for this period is sufficient for ensuring + * eventual consistency using last-update-wins strategy, as dead tuples are + * useful for detecting conflicts only during the application of concurrent + * transactions from remote nodes. After applying and flushing all remote + * transactions that occurred concurrently with the tuple DELETE, any + * subsequent UPDATE from a remote node should have a later timestamp. In such + * cases, it is acceptable to detect an update_missing scenario and convert the + * UPDATE to an INSERT when applying it. But, detecting concurrent remote + * transactions with earlier timestamps than the DELETE is necessary, as the + * UPDATEs in remote transactions should be ignored if their timestamp is + * earlier than that of the dead tuples. + * + * Note that advancing the non-removable transaction ID is not supported if the + * publisher is also a physical standby. This is because the logical walsender + * on the standby can only get the WAL replay position but there may be more + * WALs that are being replicated from the primary and those WALs could have + * earlier commit timestamp. + * + * Similarly, when the publisher has subscribed to another publisher, + * information necessary for conflict detection cannot be retained for + * changes from origins other than the publisher. This is because publisher + * lacks the information on concurrent transactions of other publishers to + * which it subscribes. As the information on concurrent transactions is + * unavailable beyond subscriber's immediate publishers, the non-removable + * transaction ID might be advanced prematurely before changes from other + * origins have been fully applied. + * + * XXX Retaining information for changes from other origins might be possible + * by requesting the subscription on that origin to enable retain_dead_tuples + * and fetching the conflict detection slot.xmin along with the publisher's + * status. In the RDT_WAIT_FOR_PUBLISHER_STATUS phase, the apply worker could + * wait for the remote slot's xmin to reach the oldest active transaction ID, + * ensuring that all transactions from other origins have been applied on the + * publisher, thereby getting the latest WAL position that includes all + * concurrent changes. However, this approach may impact performance, so it + * might not worth the effort. + * + * XXX It seems feasible to get the latest commit's WAL location from the + * publisher and wait till that is applied. However, we can't do that + * because commit timestamps can regress as a commit with a later LSN is not + * guaranteed to have a later timestamp than those with earlier LSNs. Having + * said that, even if that is possible, it won't improve performance much as + * the apply always lag and moves slowly as compared with the transactions + * on the publisher. *------------------------------------------------------------------------- */ @@ -140,6 +230,7 @@ #include <sys/stat.h> #include <unistd.h> +#include "access/commit_ts.h" #include "access/table.h" #include "access/tableam.h" #include "access/twophase.h" @@ -148,6 +239,7 @@ #include "catalog/pg_inherits.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" +#include "commands/subscriptioncmds.h" #include "commands/tablecmds.h" #include "commands/trigger.h" #include "executor/executor.h" @@ -166,12 +258,14 @@ #include "replication/logicalrelation.h" #include "replication/logicalworker.h" #include "replication/origin.h" +#include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/worker_internal.h" #include "rewrite/rewriteHandler.h" #include "storage/buffile.h" #include "storage/ipc.h" #include "storage/lmgr.h" +#include "storage/procarray.h" #include "tcop/tcopprot.h" #include "utils/acl.h" #include "utils/dynahash.h" @@ -268,6 +362,78 @@ typedef enum TRANS_PARALLEL_APPLY, } TransApplyAction; +/* + * The phases involved in advancing the non-removable transaction ID. + * + * See comments atop worker.c for details of the transition between these + * phases. + */ +typedef enum +{ + RDT_GET_CANDIDATE_XID, + RDT_REQUEST_PUBLISHER_STATUS, + RDT_WAIT_FOR_PUBLISHER_STATUS, + RDT_WAIT_FOR_LOCAL_FLUSH +} RetainDeadTuplesPhase; + +/* + * Critical information for managing phase transitions within the + * RetainDeadTuplesPhase. + */ +typedef struct RetainDeadTuplesData +{ + RetainDeadTuplesPhase phase; /* current phase */ + XLogRecPtr remote_lsn; /* WAL write position on the publisher */ + + /* + * Oldest transaction ID that was in the commit phase on the publisher. + * Use FullTransactionId to prevent issues with transaction ID wraparound, + * where a new remote_oldestxid could falsely appear to originate from the + * past and block advancement. + */ + FullTransactionId remote_oldestxid; + + /* + * Next transaction ID to be assigned on the publisher. Use + * FullTransactionId for consistency and to allow straightforward + * comparisons with remote_oldestxid. + */ + FullTransactionId remote_nextxid; + + TimestampTz reply_time; /* when the publisher responds with status */ + + /* + * Publisher transaction ID that must be awaited to complete before + * entering the final phase (RDT_WAIT_FOR_LOCAL_FLUSH). Use + * FullTransactionId for the same reason as remote_nextxid. + */ + FullTransactionId remote_wait_for; + + TransactionId candidate_xid; /* candidate for the non-removable + * transaction ID */ + TimestampTz flushpos_update_time; /* when the remote flush position was + * updated in final phase + * (RDT_WAIT_FOR_LOCAL_FLUSH) */ + + /* + * The following fields are used to determine the timing for the next + * round of transaction ID advancement. + */ + TimestampTz last_recv_time; /* when the last message was received */ + TimestampTz candidate_xid_time; /* when the candidate_xid is decided */ + int xid_advance_interval; /* how much time (ms) to wait before + * attempting to advance the + * non-removable transaction ID */ +} RetainDeadTuplesData; + +/* + * The minimum (100ms) and maximum (3 minutes) intervals for advancing + * non-removable transaction IDs. The maximum interval is a bit arbitrary but + * is sufficient to not cause any undue network traffic. + */ +#define MIN_XID_ADVANCE_INTERVAL 100 +#define MAX_XID_ADVANCE_INTERVAL 180000 + /* errcontext tracker */ static ApplyErrorCallbackArg apply_error_callback_arg = { @@ -332,6 +498,13 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr; /* BufFile handle of the current streaming file */ static BufFile *stream_fd = NULL; +/* + * The remote WAL position that has been applied and flushed locally. We record + * and use this information both while sending feedback to the server and + * advancing oldest_nonremovable_xid. + */ +static XLogRecPtr last_flushpos = InvalidXLogRecPtr; + typedef struct SubXactInfo { TransactionId xid; /* XID of the subxact */ @@ -372,6 +545,19 @@ static void stream_close_file(void); static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); +static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data, + bool status_received); +static bool can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data); +static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data, + bool status_received); +static void get_candidate_xid(RetainDeadTuplesData *rdt_data); +static void request_publisher_status(RetainDeadTuplesData *rdt_data); +static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data, + bool status_received); +static void wait_for_local_flush(RetainDeadTuplesData *rdt_data); +static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, + bool new_xid_found); + static void apply_handle_commit_internal(LogicalRepCommitData *commit_data); static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, @@ -3577,6 +3763,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) bool ping_sent = false; TimeLineID tli; ErrorContextCallback errcallback; + RetainDeadTuplesData rdt_data = {0}; /* * Init the ApplyMessageContext which we clean up after each replication @@ -3655,6 +3842,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) last_recv_timestamp = GetCurrentTimestamp(); ping_sent = false; + rdt_data.last_recv_time = last_recv_timestamp; + /* Ensure we are reading the data into our memory context. */ MemoryContextSwitchTo(ApplyMessageContext); @@ -3681,6 +3870,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) UpdateWorkerStats(last_received, send_time, false); apply_dispatch(&s); + + maybe_advance_nonremovable_xid(&rdt_data, false); } else if (c == 'k') { @@ -3696,8 +3887,31 @@ LogicalRepApplyLoop(XLogRecPtr last_received) last_received = end_lsn; send_feedback(last_received, reply_requested, false); + + maybe_advance_nonremovable_xid(&rdt_data, false); + UpdateWorkerStats(last_received, timestamp, true); } + else if (c == 's') /* Primary status update */ + { + rdt_data.remote_lsn = pq_getmsgint64(&s); + rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s)); + rdt_data.remote_nextxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s)); + rdt_data.reply_time = pq_getmsgint64(&s); + + /* + * This should never happen, see + * ProcessStandbyPSRequestMessage. But if it happens + * due to a bug, we don't want to proceed as it can + * incorrectly advance oldest_nonremovable_xid. + */ + if (XLogRecPtrIsInvalid(rdt_data.remote_lsn)) + elog(ERROR, "cannot get the latest WAL position from the publisher"); + + maybe_advance_nonremovable_xid(&rdt_data, true); + + UpdateWorkerStats(last_received, rdt_data.reply_time, false); + } /* other message types are purposefully ignored */ MemoryContextReset(ApplyMessageContext); @@ -3710,6 +3924,11 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* confirm all writes so far */ send_feedback(last_received, false, false); + /* Reset the timestamp if no message was received */ + rdt_data.last_recv_time = 0; + + maybe_advance_nonremovable_xid(&rdt_data, false); + if (!in_remote_transaction && !in_streamed_transaction) { /* @@ -3744,6 +3963,14 @@ LogicalRepApplyLoop(XLogRecPtr last_received) else wait_time = NAPTIME_PER_CYCLE; + /* + * Ensure to wake up when it's possible to advance the non-removable + * transaction ID. + */ + if (rdt_data.phase == RDT_GET_CANDIDATE_XID && + rdt_data.xid_advance_interval) + wait_time = Min(wait_time, rdt_data.xid_advance_interval); + rc = WaitLatchOrSocket(MyLatch, WL_SOCKET_READABLE | WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, @@ -3807,6 +4034,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) send_feedback(last_received, requestReply, requestReply); + maybe_advance_nonremovable_xid(&rdt_data, false); + /* * Force reporting to ensure long idle periods don't lead to * arbitrarily delayed stats. Stats can only be reported outside @@ -3842,7 +4071,6 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) static XLogRecPtr last_recvpos = InvalidXLogRecPtr; static XLogRecPtr last_writepos = InvalidXLogRecPtr; - static XLogRecPtr last_flushpos = InvalidXLogRecPtr; XLogRecPtr writepos; XLogRecPtr flushpos; @@ -3921,6 +4149,367 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) } /* + * Attempt to advance the non-removable transaction ID. + * + * See comments atop worker.c for details. + */ +static void +maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data, + bool status_received) +{ + if (!can_advance_nonremovable_xid(rdt_data)) + return; + + process_rdt_phase_transition(rdt_data, status_received); +} + +/* + * Preliminary check to determine if advancing the non-removable transaction ID + * is allowed. + */ +static bool +can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data) +{ + /* + * It is sufficient to manage non-removable transaction ID for a + * subscription by the main apply worker to detect conflicts reliably even + * for table sync or parallel apply workers. + */ + if (!am_leader_apply_worker()) + return false; + + /* No need to advance if retaining dead tuples is not required */ + if (!MySubscription->retaindeadtuples) + return false; + + return true; +} + +/* + * Process phase transitions during the non-removable transaction ID + * advancement. See comments atop worker.c for details of the transition. + */ +static void +process_rdt_phase_transition(RetainDeadTuplesData *rdt_data, + bool status_received) +{ + switch (rdt_data->phase) + { + case RDT_GET_CANDIDATE_XID: + get_candidate_xid(rdt_data); + break; + case RDT_REQUEST_PUBLISHER_STATUS: + request_publisher_status(rdt_data); + break; + case RDT_WAIT_FOR_PUBLISHER_STATUS: + wait_for_publisher_status(rdt_data, status_received); + break; + case RDT_WAIT_FOR_LOCAL_FLUSH: + wait_for_local_flush(rdt_data); + break; + } +} + +/* + * Workhorse for the RDT_GET_CANDIDATE_XID phase. + */ +static void +get_candidate_xid(RetainDeadTuplesData *rdt_data) +{ + TransactionId oldest_running_xid; + TimestampTz now; + + /* + * Use last_recv_time when applying changes in the loop to avoid + * unnecessary system time retrieval. If last_recv_time is not available, + * obtain the current timestamp. + */ + now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp(); + + /* + * Compute the candidate_xid and request the publisher status at most once + * per xid_advance_interval. Refer to adjust_xid_advance_interval() for + * details on how this value is dynamically adjusted. This is to avoid + * using CPU and network resources without making much progress. + */ + if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now, + rdt_data->xid_advance_interval)) + return; + + /* + * Immediately update the timer, even if the function returns later + * without setting candidate_xid due to inactivity on the subscriber. This + * avoids frequent calls to GetOldestActiveTransactionId. + */ + rdt_data->candidate_xid_time = now; + + /* + * Consider transactions in the current database, as only dead tuples from + * this database are required for conflict detection. + */ + oldest_running_xid = GetOldestActiveTransactionId(false, false); + + /* + * Oldest active transaction ID (oldest_running_xid) can't be behind any + * of its previously computed value. + */ + Assert(TransactionIdPrecedesOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid, + oldest_running_xid)); + + /* Return if the oldest_nonremovable_xid cannot be advanced */ + if (TransactionIdEquals(MyLogicalRepWorker->oldest_nonremovable_xid, + oldest_running_xid)) + { + adjust_xid_advance_interval(rdt_data, false); + return; + } + + adjust_xid_advance_interval(rdt_data, true); + + rdt_data->candidate_xid = oldest_running_xid; + rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS; + + /* process the next phase */ + process_rdt_phase_transition(rdt_data, false); +} + +/* + * Workhorse for the RDT_REQUEST_PUBLISHER_STATUS phase. + */ +static void +request_publisher_status(RetainDeadTuplesData *rdt_data) +{ + static StringInfo request_message = NULL; + + if (!request_message) + { + MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext); + + request_message = makeStringInfo(); + MemoryContextSwitchTo(oldctx); + } + else + resetStringInfo(request_message); + + /* + * Send the current time to update the remote walsender's latest reply + * message received time. + */ + pq_sendbyte(request_message, 'p'); + pq_sendint64(request_message, GetCurrentTimestamp()); + + elog(DEBUG2, "sending publisher status request message"); + + /* Send a request for the publisher status */ + walrcv_send(LogRepWorkerWalRcvConn, + request_message->data, request_message->len); + + rdt_data->phase = RDT_WAIT_FOR_PUBLISHER_STATUS; + + /* + * Skip calling maybe_advance_nonremovable_xid() since further transition + * is possible only once we receive the publisher status message. + */ +} + +/* + * Workhorse for the RDT_WAIT_FOR_PUBLISHER_STATUS phase. + */ +static void +wait_for_publisher_status(RetainDeadTuplesData *rdt_data, + bool status_received) +{ + /* + * Return if we have requested but not yet received the publisher status. + */ + if (!status_received) + return; + + if (!FullTransactionIdIsValid(rdt_data->remote_wait_for)) + rdt_data->remote_wait_for = rdt_data->remote_nextxid; + + /* + * Check if all remote concurrent transactions that were active at the + * first status request have now completed. If completed, proceed to the + * next phase; otherwise, continue checking the publisher status until + * these transactions finish. + * + * It's possible that transactions in the commit phase during the last + * cycle have now finished committing, but remote_oldestxid remains older + * than remote_wait_for. This can happen if some old transaction came in + * the commit phase when we requested status in this cycle. We do not + * handle this case explicitly as it's rare and the benefit doesn't + * justify the required complexity. Tracking would require either caching + * all xids at the publisher or sending them to subscribers. The condition + * will resolve naturally once the remaining transactions are finished. + * + * Directly advancing the non-removable transaction ID is possible if + * there are no activities on the publisher since the last advancement + * cycle. However, it requires maintaining two fields, last_remote_nextxid + * and last_remote_lsn, within the structure for comparison with the + * current cycle's values. Considering the minimal cost of continuing in + * RDT_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to + * advance the transaction ID here. + */ + if (FullTransactionIdPrecedesOrEquals(rdt_data->remote_wait_for, + rdt_data->remote_oldestxid)) + rdt_data->phase = RDT_WAIT_FOR_LOCAL_FLUSH; + else + rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS; + + /* process the next phase */ + process_rdt_phase_transition(rdt_data, false); +} + +/* + * Workhorse for the RDT_WAIT_FOR_LOCAL_FLUSH phase. + */ +static void +wait_for_local_flush(RetainDeadTuplesData *rdt_data) +{ + Assert(!XLogRecPtrIsInvalid(rdt_data->remote_lsn) && + TransactionIdIsValid(rdt_data->candidate_xid)); + + /* + * We expect the publisher and subscriber clocks to be in sync using time + * sync service like NTP. Otherwise, we will advance this worker's + * oldest_nonremovable_xid prematurely, leading to the removal of rows + * required to detect conflicts reliably. This check primarily addresses + * scenarios where the publisher's clock falls behind; if the publisher's + * clock is ahead, subsequent transactions will naturally bear later + * commit timestamps, conforming to the design outlined atop worker.c. + * + * XXX Consider waiting for the publisher's clock to catch up with the + * subscriber's before proceeding to the next phase. + */ + if (TimestampDifferenceExceeds(rdt_data->reply_time, + rdt_data->candidate_xid_time, 0)) + ereport(ERROR, + errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"), + errdetail_internal("The clock on the publisher is behind that of the subscriber.")); + + /* + * Do not attempt to advance the non-removable transaction ID when table + * sync is in progress. During this time, changes from a single + * transaction may be applied by multiple table sync workers corresponding + * to the target tables. So, it's necessary for all table sync workers to + * apply and flush the corresponding changes before advancing the + * transaction ID, otherwise, dead tuples that are still needed for + * conflict detection in table sync workers could be removed prematurely. + * However, confirming the apply and flush progress across all table sync + * workers is complex and not worth the effort, so we simply return if not + * all tables are in the READY state. + * + * It is safe to add new tables with initial states to the subscription + * after this check because any changes applied to these tables should + * have a WAL position greater than the rdt_data->remote_lsn. + */ + if (!AllTablesyncsReady()) + return; + + /* + * Update and check the remote flush position if we are applying changes + * in a loop. This is done at most once per WalWriterDelay to avoid + * performing costly operations in get_flush_position() too frequently + * during change application. + */ + if (last_flushpos < rdt_data->remote_lsn && rdt_data->last_recv_time && + TimestampDifferenceExceeds(rdt_data->flushpos_update_time, + rdt_data->last_recv_time, WalWriterDelay)) + { + XLogRecPtr writepos; + XLogRecPtr flushpos; + bool have_pending_txes; + + /* Fetch the latest remote flush position */ + get_flush_position(&writepos, &flushpos, &have_pending_txes); + + if (flushpos > last_flushpos) + last_flushpos = flushpos; + + rdt_data->flushpos_update_time = rdt_data->last_recv_time; + } + + /* Return to wait for the changes to be applied */ + if (last_flushpos < rdt_data->remote_lsn) + return; + + /* + * Reaching here means the remote WAL position has been received, and all + * transactions up to that position on the publisher have been applied and + * flushed locally. So, we can advance the non-removable transaction ID. + */ + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + elog(DEBUG2, "confirmed flush up to remote lsn %X/%X: new oldest_nonremovable_xid %u", + LSN_FORMAT_ARGS(rdt_data->remote_lsn), + rdt_data->candidate_xid); + + /* Notify launcher to update the xmin of the conflict slot */ + ApplyLauncherWakeup(); + + /* + * Reset all data fields except those used to determine the timing for the + * next round of transaction ID advancement. We can even use + * flushpos_update_time in the next round to decide whether to get the + * latest flush position. + */ + rdt_data->phase = RDT_GET_CANDIDATE_XID; + rdt_data->remote_lsn = InvalidXLogRecPtr; + rdt_data->remote_oldestxid = InvalidFullTransactionId; + rdt_data->remote_nextxid = InvalidFullTransactionId; + rdt_data->reply_time = 0; + rdt_data->remote_wait_for = InvalidFullTransactionId; + rdt_data->candidate_xid = InvalidTransactionId; + + /* process the next phase */ + process_rdt_phase_transition(rdt_data, false); +} + +/* + * Adjust the interval for advancing non-removable transaction IDs. + * + * We double the interval to try advancing the non-removable transaction IDs + * if there is no activity on the node. The maximum value of the interval is + * capped by wal_receiver_status_interval if it is not zero, otherwise to a + * 3 minutes which should be sufficient to avoid using CPU or network + * resources without much benefit. + * + * The interval is reset to a minimum value of 100ms once there is some + * activity on the node. + * + * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can + * consider the other interval or a separate GUC if the need arises. + */ +static void +adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found) +{ + if (!new_xid_found && rdt_data->xid_advance_interval) + { + int max_interval = wal_receiver_status_interval + ? wal_receiver_status_interval * 1000 + : MAX_XID_ADVANCE_INTERVAL; + + /* + * No new transaction ID has been assigned since the last check, so + * double the interval, but not beyond the maximum allowable value. + */ + rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2, + max_interval); + } + else + { + /* + * A new transaction ID was found or the interval is not yet + * initialized, so set the interval to the minimum value. + */ + rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL; + } +} + +/* * Exit routine for apply workers due to subscription parameter changes. */ static void @@ -4708,6 +5297,30 @@ InitializeLogRepWorker(void) apply_worker_exit(); } + /* + * Restart the worker if retain_dead_tuples was enabled during startup. + * + * At this point, the replication slot used for conflict detection might + * not exist yet, or could be dropped soon if the launcher perceives + * retain_dead_tuples as disabled. To avoid unnecessary tracking of + * oldest_nonremovable_xid when the slot is absent or at risk of being + * dropped, a restart is initiated. + * + * The oldest_nonremovable_xid should be initialized only when the + * retain_dead_tuples is enabled before launching the worker. See + * logicalrep_worker_launch. + */ + if (am_leader_apply_worker() && + MySubscription->retaindeadtuples && + !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid)) + { + ereport(LOG, + errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup", + MySubscription->name, "retain_dead_tuples")); + + apply_worker_exit(); + } + /* Setup synchronous commit according to the user's wishes */ SetConfigOption("synchronous_commit", MySubscription->synccommit, PGC_BACKEND, PGC_S_OVERRIDE); @@ -4864,6 +5477,14 @@ DisableSubscriptionAndExit(void) errmsg("subscription \"%s\" has been disabled because of an error", MySubscription->name)); + /* + * Skip the track_commit_timestamp check when disabling the worker due to + * an error, as verifying commit timestamps is unnecessary in this + * context. + */ + if (MySubscription->retaindeadtuples) + CheckSubDeadTupleRetention(false, true, WARNING); + proc_exit(0); } |