diff options
Diffstat (limited to 'src/backend/replication')
-rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 34 | ||||
-rw-r--r-- | src/backend/replication/logical/applyparallelworker.c | 3 | ||||
-rw-r--r-- | src/backend/replication/logical/launcher.c | 230 | ||||
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 4 | ||||
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 37 | ||||
-rw-r--r-- | src/backend/replication/logical/worker.c | 623 | ||||
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 2 | ||||
-rw-r--r-- | src/backend/replication/slot.c | 48 | ||||
-rw-r--r-- | src/backend/replication/syncrep_scanner.l | 11 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 91 |
10 files changed, 1016 insertions, 67 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index f7b5d093681..239641bfbb6 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -232,6 +232,9 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical, errhint("Target server's authentication method must be changed, or set password_required=false in the subscription parameters."))); } + PQsetNoticeReceiver(conn->streamConn, libpqsrv_notice_receiver, + "received message via replication"); + /* * Set always-secure search path for the cases where the connection is * used to run SQL queries, so malicious users can't get control. @@ -418,31 +421,22 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli) "IDENTIFY_SYSTEM", WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - PQclear(res); ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("could not receive database system identifier and timeline ID from " "the primary server: %s", pchomp(PQerrorMessage(conn->streamConn))))); - } /* * IDENTIFY_SYSTEM returns 3 columns in 9.3 and earlier, and 4 columns in * 9.4 and onwards. 
*/ if (PQnfields(res) < 3 || PQntuples(res) != 1) - { - int ntuples = PQntuples(res); - int nfields = PQnfields(res); - - PQclear(res); ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("invalid response from primary server"), errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.", - ntuples, nfields, 1, 3))); - } + PQntuples(res), PQnfields(res), 1, 3))); primary_sysid = pstrdup(PQgetvalue(res, 0, 0)); *primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1)); PQclear(res); @@ -604,13 +598,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn, return false; } else if (PQresultStatus(res) != PGRES_COPY_BOTH) - { - PQclear(res); ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("could not start WAL streaming: %s", pchomp(PQerrorMessage(conn->streamConn))))); - } PQclear(res); return true; } @@ -718,26 +709,17 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, cmd, WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - PQclear(res); ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("could not receive timeline history file from " "the primary server: %s", pchomp(PQerrorMessage(conn->streamConn))))); - } if (PQnfields(res) != 2 || PQntuples(res) != 1) - { - int ntuples = PQntuples(res); - int nfields = PQnfields(res); - - PQclear(res); ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("invalid response from primary server"), errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.", - ntuples, nfields))); - } + PQntuples(res), PQnfields(res)))); *filename = pstrdup(PQgetvalue(res, 0, 0)); *len = PQgetlength(res, 0, 1); @@ -841,13 +823,10 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer, return -1; } else - { - PQclear(res); ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("could not receive data from WAL stream: %s", pchomp(PQerrorMessage(conn->streamConn))))); - } } if (rawlen < -1) ereport(ERROR, @@ -971,13 
+950,10 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, pfree(cmd.data); if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - PQclear(res); ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("could not create replication slot \"%s\": %s", slotname, pchomp(PQerrorMessage(conn->streamConn))))); - } if (lsn) *lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid, diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index d25085d3515..1fa931a7422 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -441,7 +441,8 @@ pa_launch_parallel_worker(void) MySubscription->name, MyLogicalRepWorker->userid, InvalidOid, - dsm_segment_handle(winfo->dsm_seg)); + dsm_segment_handle(winfo->dsm_seg), + false); if (launched) { diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 4aed0dfcebb..37377f7eb63 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -32,6 +32,7 @@ #include "postmaster/interrupt.h" #include "replication/logicallauncher.h" #include "replication/origin.h" +#include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/worker_internal.h" #include "storage/ipc.h" @@ -91,7 +92,6 @@ static dshash_table *last_start_times = NULL; static bool on_commit_launcher_wakeup = false; -static void ApplyLauncherWakeup(void); static void logicalrep_launcher_onexit(int code, Datum arg); static void logicalrep_worker_onexit(int code, Datum arg); static void logicalrep_worker_detach(void); @@ -100,6 +100,9 @@ static int logicalrep_pa_worker_count(Oid subid); static void logicalrep_launcher_attach_dshmem(void); static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); +static void 
compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin); +static bool acquire_conflict_slot_if_exists(void); +static void advance_conflict_slot_xmin(TransactionId new_xmin); /* @@ -148,6 +151,7 @@ get_subscription_list(void) sub->owner = subform->subowner; sub->enabled = subform->subenabled; sub->name = pstrdup(NameStr(subform->subname)); + sub->retaindeadtuples = subform->subretaindeadtuples; /* We don't fill fields we are not interested in. */ res = lappend(res, sub); @@ -309,7 +313,8 @@ logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock) bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, - Oid relid, dsm_handle subworker_dsm) + Oid relid, dsm_handle subworker_dsm, + bool retain_dead_tuples) { BackgroundWorker bgw; BackgroundWorkerHandle *bgw_handle; @@ -328,10 +333,13 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, * - must be valid worker type * - tablesync workers are only ones to have relid * - parallel apply worker is the only kind of subworker + * - The replication slot used in conflict detection is created when + * retain_dead_tuples is enabled */ Assert(wtype != WORKERTYPE_UNKNOWN); Assert(is_tablesync_worker == OidIsValid(relid)); Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID)); + Assert(!retain_dead_tuples || MyReplicationSlot); ereport(DEBUG1, (errmsg_internal("starting logical replication worker for subscription \"%s\"", @@ -454,6 +462,9 @@ retry: worker->stream_fileset = NULL; worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid; worker->parallel_apply = is_parallel_apply_worker; + worker->oldest_nonremovable_xid = retain_dead_tuples + ? 
MyReplicationSlot->data.xmin + : InvalidTransactionId; worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); TIMESTAMP_NOBEGIN(worker->last_recv_time); @@ -779,6 +790,8 @@ logicalrep_worker_detach(void) } LWLockRelease(LogicalRepWorkerLock); + + list_free(workers); } /* Block concurrent access. */ @@ -1118,7 +1131,10 @@ ApplyLauncherWakeupAtCommit(void) on_commit_launcher_wakeup = true; } -static void +/* + * Wakeup the launcher immediately. + */ +void ApplyLauncherWakeup(void) { if (LogicalRepCtx->launcher_pid != 0) @@ -1150,6 +1166,12 @@ ApplyLauncherMain(Datum main_arg) */ BackgroundWorkerInitializeConnection(NULL, NULL, 0); + /* + * Acquire the conflict detection slot at startup to ensure it can be + * dropped if no longer needed after a restart. + */ + acquire_conflict_slot_if_exists(); + /* Enter main loop */ for (;;) { @@ -1159,6 +1181,9 @@ ApplyLauncherMain(Datum main_arg) MemoryContext subctx; MemoryContext oldctx; long wait_time = DEFAULT_NAPTIME_PER_CYCLE; + bool can_advance_xmin = true; + bool retain_dead_tuples = false; + TransactionId xmin = InvalidTransactionId; CHECK_FOR_INTERRUPTS(); @@ -1168,7 +1193,14 @@ ApplyLauncherMain(Datum main_arg) ALLOCSET_DEFAULT_SIZES); oldctx = MemoryContextSwitchTo(subctx); - /* Start any missing workers for enabled subscriptions. */ + /* + * Start any missing workers for enabled subscriptions. + * + * Also, during the iteration through all subscriptions, we compute + * the minimum XID required to protect deleted tuples for conflict + * detection if one of the subscription enables retain_dead_tuples + * option. + */ sublist = get_subscription_list(); foreach(lc, sublist) { @@ -1178,6 +1210,38 @@ ApplyLauncherMain(Datum main_arg) TimestampTz now; long elapsed; + if (sub->retaindeadtuples) + { + retain_dead_tuples = true; + + /* + * Can't advance xmin of the slot unless all the subscriptions + * with retain_dead_tuples are enabled. 
This is required to + * ensure that we don't advance the xmin of + * CONFLICT_DETECTION_SLOT if one of the subscriptions is not + * enabled. Otherwise, we won't be able to detect conflicts + * reliably for such a subscription even though it has set the + * retain_dead_tuples option. + */ + can_advance_xmin &= sub->enabled; + + /* + * Create a replication slot to retain information necessary + * for conflict detection such as dead tuples, commit + * timestamps, and origins. + * + * The slot is created before starting the apply worker to + * prevent it from unnecessarily maintaining its + * oldest_nonremovable_xid. + * + * The slot is created even for a disabled subscription to + * ensure that conflict-related information is available when + * applying remote changes that occurred before the + * subscription was enabled. + */ + CreateConflictDetectionSlot(); + } + if (!sub->enabled) continue; @@ -1186,7 +1250,27 @@ ApplyLauncherMain(Datum main_arg) LWLockRelease(LogicalRepWorkerLock); if (w != NULL) - continue; /* worker is running already */ + { + /* + * Compute the minimum xmin required to protect dead tuples + * required for conflict detection among all running apply + * workers that enables retain_dead_tuples. + */ + if (sub->retaindeadtuples && can_advance_xmin) + compute_min_nonremovable_xid(w, &xmin); + + /* worker is running already */ + continue; + } + + /* + * Can't advance xmin of the slot unless all the workers + * corresponding to subscriptions with retain_dead_tuples are + * running, disabling the further computation of the minimum + * nonremovable xid. + */ + if (sub->retaindeadtuples) + can_advance_xmin = false; /* * If the worker is eligible to start now, launch it. 
Otherwise, @@ -1210,7 +1294,8 @@ ApplyLauncherMain(Datum main_arg) if (!logicalrep_worker_launch(WORKERTYPE_APPLY, sub->dbid, sub->oid, sub->name, sub->owner, InvalidOid, - DSM_HANDLE_INVALID)) + DSM_HANDLE_INVALID, + sub->retaindeadtuples)) { /* * We get here either if we failed to launch a worker @@ -1230,6 +1315,20 @@ ApplyLauncherMain(Datum main_arg) } } + /* + * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription + * that requires us to retain dead tuples. Otherwise, if required, + * advance the slot's xmin to protect dead tuples required for the + * conflict detection. + */ + if (MyReplicationSlot) + { + if (!retain_dead_tuples) + ReplicationSlotDropAcquired(); + else if (can_advance_xmin) + advance_conflict_slot_xmin(xmin); + } + /* Switch back to original memory context. */ MemoryContextSwitchTo(oldctx); /* Clean the temporary memory. */ @@ -1258,6 +1357,125 @@ ApplyLauncherMain(Datum main_arg) } /* + * Determine the minimum non-removable transaction ID across all apply workers + * for subscriptions that have retain_dead_tuples enabled. Store the result + * in *xmin. + */ +static void +compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin) +{ + TransactionId nonremovable_xid; + + Assert(worker != NULL); + + /* + * The replication slot for conflict detection must be created before the + * worker starts. + */ + Assert(MyReplicationSlot); + + SpinLockAcquire(&worker->relmutex); + nonremovable_xid = worker->oldest_nonremovable_xid; + SpinLockRelease(&worker->relmutex); + + Assert(TransactionIdIsValid(nonremovable_xid)); + + if (!TransactionIdIsValid(*xmin) || + TransactionIdPrecedes(nonremovable_xid, *xmin)) + *xmin = nonremovable_xid; +} + +/* + * Acquire the replication slot used to retain information for conflict + * detection, if it exists. + * + * Return true if successfully acquired, otherwise return false. 
+ */ +static bool +acquire_conflict_slot_if_exists(void) +{ + if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true)) + return false; + + ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false); + return true; +} + +/* + * Advance the xmin the replication slot used to retain information required + * for conflict detection. + */ +static void +advance_conflict_slot_xmin(TransactionId new_xmin) +{ + Assert(MyReplicationSlot); + Assert(TransactionIdIsValid(new_xmin)); + Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin)); + + /* Return if the xmin value of the slot cannot be advanced */ + if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin)) + return; + + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->effective_xmin = new_xmin; + MyReplicationSlot->data.xmin = new_xmin; + SpinLockRelease(&MyReplicationSlot->mutex); + + elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin); + + ReplicationSlotMarkDirty(); + ReplicationSlotsComputeRequiredXmin(false); + + /* + * Like PhysicalConfirmReceivedLocation(), do not save slot information + * each time. This is acceptable because all concurrent transactions on + * the publisher that require the data preceding the slot's xmin should + * have already been applied and flushed on the subscriber before the xmin + * is advanced. So, even if the slot's xmin regresses after a restart, it + * will be advanced again in the next cycle. Therefore, no data required + * for conflict detection will be prematurely removed. + */ + return; +} + +/* + * Create and acquire the replication slot used to retain information for + * conflict detection, if not yet. 
+ */ +void +CreateConflictDetectionSlot(void) +{ + TransactionId xmin_horizon; + + /* Exit early, if the replication slot is already created and acquired */ + if (MyReplicationSlot) + return; + + ereport(LOG, + errmsg("creating replication conflict detection slot")); + + ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false, + false, false); + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + + xmin_horizon = GetOldestSafeDecodingTransactionId(false); + + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->effective_xmin = xmin_horizon; + MyReplicationSlot->data.xmin = xmin_horizon; + SpinLockRelease(&MyReplicationSlot->mutex); + + ReplicationSlotsComputeRequiredXmin(true); + + LWLockRelease(ProcArrayLock); + + /* Write this slot to disk */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); +} + +/* * Is current process the logical replication launcher? */ bool diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 7b4e8629553..34cf05668ae 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2599,7 +2599,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, if (++changes_count >= CHANGES_THRESHOLD) { - rb->update_progress_txn(rb, txn, change->lsn); + rb->update_progress_txn(rb, txn, prev_lsn); changes_count = 0; } } @@ -4917,7 +4917,7 @@ StartupReorderBuffer(void) continue; /* if it cannot be a slot, skip the directory */ - if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2)) + if (!ReplicationSlotValidateName(logical_de->d_name, true, DEBUG2)) continue; /* diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index e4fd6347fd1..d3356bc84ee 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -316,7 +316,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) 
UpdateSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, - MyLogicalRepWorker->relstate_lsn); + MyLogicalRepWorker->relstate_lsn, + false); /* * End streaming so that LogRepWorkerWalRcvConn can be used to drop @@ -425,6 +426,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) ListCell *lc; bool started_tx = false; bool should_exit = false; + Relation rel = NULL; Assert(!IsTransactionState()); @@ -492,7 +494,17 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * worker to remove the origin tracking as if there is any * error while dropping we won't restart it to drop the * origin. So passing missing_ok = true. + * + * Lock the subscription and origin in the same order as we + * are doing during DDL commands to avoid deadlocks. See + * AlterSubscription_refresh. */ + LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, + 0, AccessShareLock); + + if (!rel) + rel = table_open(SubscriptionRelRelationId, RowExclusiveLock); + ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid, rstate->relid, originname, @@ -504,7 +516,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) */ UpdateSubscriptionRelState(MyLogicalRepWorker->subid, rstate->relid, rstate->state, - rstate->lsn); + rstate->lsn, true); } } else @@ -555,7 +567,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * This is required to avoid any undetected deadlocks * due to any existing lock as deadlock detector won't * be able to detect the waits on the latch. + * + * Also close any tables prior to the commit. 
*/ + if (rel) + { + table_close(rel, NoLock); + rel = NULL; + } CommitTransactionCommand(); pgstat_report_stat(false); } @@ -615,13 +634,19 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) MySubscription->name, MyLogicalRepWorker->userid, rstate->relid, - DSM_HANDLE_INVALID); + DSM_HANDLE_INVALID, + false); } } } } } + /* Close table if opened */ + if (rel) + table_close(rel, NoLock); + + if (started_tx) { /* @@ -1413,7 +1438,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) UpdateSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, - MyLogicalRepWorker->relstate_lsn); + MyLogicalRepWorker->relstate_lsn, + false); CommitTransactionCommand(); pgstat_report_stat(true); @@ -1546,7 +1572,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) UpdateSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, SUBREL_STATE_FINISHEDCOPY, - MyLogicalRepWorker->relstate_lsn); + MyLogicalRepWorker->relstate_lsn, + false); CommitTransactionCommand(); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index c5fb627aa56..b59221c4d06 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -132,6 +132,96 @@ * failover = true when creating the subscription. Enabling failover allows us * to smoothly transition to the promoted standby, ensuring that we can * subscribe to the new primary without losing any data. + * + * RETAIN DEAD TUPLES + * ---------------------- + * Each apply worker that enabled retain_dead_tuples option maintains a + * non-removable transaction ID (oldest_nonremovable_xid) in shared memory to + * prevent dead rows from being removed prematurely when the apply worker still + * needs them to detect conflicts reliably. 
This helps to retain the required + * commit_ts module information, which further helps to detect + * update_origin_differs and delete_origin_differs conflicts reliably, as + * otherwise, vacuum freeze could remove the required information. + * + * The logical replication launcher manages an internal replication slot named + * "pg_conflict_detection". It asynchronously aggregates the non-removable + * transaction ID from all apply workers to determine the appropriate xmin for + * the slot, thereby retaining necessary tuples. + * + * The non-removable transaction ID in the apply worker is advanced to the + * oldest running transaction ID once all concurrent transactions on the + * publisher have been applied and flushed locally. The process involves: + * + * - RDT_GET_CANDIDATE_XID: + * Call GetOldestActiveTransactionId() to take oldestRunningXid as the + * candidate xid. + * + * - RDT_REQUEST_PUBLISHER_STATUS: + * Send a message to the walsender requesting the publisher status, which + * includes the latest WAL write position and information about transactions + * that are in the commit phase. + * + * - RDT_WAIT_FOR_PUBLISHER_STATUS: + * Wait for the status from the walsender. After receiving the first status, + * do not proceed if there are concurrent remote transactions that are still + * in the commit phase. These transactions might have been assigned an + * earlier commit timestamp but have not yet written the commit WAL record. + * Continue to request the publisher status (RDT_REQUEST_PUBLISHER_STATUS) + * until all these transactions have completed. + * + * - RDT_WAIT_FOR_LOCAL_FLUSH: + * Advance the non-removable transaction ID if the current flush location has + * reached or surpassed the last received WAL position. 
+ * + * The overall state progression is: GET_CANDIDATE_XID -> + * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to + * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) -> + * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID. + * + * Retaining the dead tuples for this period is sufficient for ensuring + * eventual consistency using the last-update-wins strategy, as dead tuples are + * useful for detecting conflicts only during the application of concurrent + * transactions from remote nodes. After applying and flushing all remote + * transactions that occurred concurrently with the tuple DELETE, any + * subsequent UPDATE from a remote node should have a later timestamp. In such + * cases, it is acceptable to detect an update_missing scenario and convert the + * UPDATE to an INSERT when applying it. But, detecting concurrent remote + * transactions with earlier timestamps than the DELETE is necessary, as the + * UPDATEs in remote transactions should be ignored if their timestamp is + * earlier than that of the dead tuples. + * + * Note that advancing the non-removable transaction ID is not supported if the + * publisher is also a physical standby. This is because the logical walsender + * on the standby can only get the WAL replay position but there may be more + * WALs that are being replicated from the primary and those WALs could have + * earlier commit timestamps. + * + * Similarly, when the publisher has subscribed to another publisher, + * information necessary for conflict detection cannot be retained for + * changes from origins other than the publisher. This is because the + * publisher lacks the information on concurrent transactions of other + * publishers to which it subscribes. As the information on concurrent + * transactions is unavailable beyond the subscriber's immediate publishers, + * the non-removable transaction ID might be advanced prematurely before + * changes from other origins have been fully applied. 
+ * + * XXX Retaining information for changes from other origins might be possible + * by requesting the subscription on that origin to enable retain_dead_tuples + * and fetching the conflict detection slot.xmin along with the publisher's + * status. In the RDT_WAIT_FOR_PUBLISHER_STATUS phase, the apply worker could + * wait for the remote slot's xmin to reach the oldest active transaction ID, + * ensuring that all transactions from other origins have been applied on the + * publisher, thereby getting the latest WAL position that includes all + * concurrent changes. However, this approach may impact performance, so it + * might not be worth the effort. + * + * XXX It seems feasible to get the latest commit's WAL location from the + * publisher and wait till that is applied. However, we can't do that + * because commit timestamps can regress as a commit with a later LSN is not + * guaranteed to have a later timestamp than those with earlier LSNs. Having + * said that, even if that is possible, it won't improve performance much as + * the apply always lags and moves slowly as compared with the transactions + * on the publisher. 
*------------------------------------------------------------------------- */ @@ -140,6 +230,7 @@ #include <sys/stat.h> #include <unistd.h> +#include "access/commit_ts.h" #include "access/table.h" #include "access/tableam.h" #include "access/twophase.h" @@ -148,6 +239,7 @@ #include "catalog/pg_inherits.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" +#include "commands/subscriptioncmds.h" #include "commands/tablecmds.h" #include "commands/trigger.h" #include "executor/executor.h" @@ -166,12 +258,14 @@ #include "replication/logicalrelation.h" #include "replication/logicalworker.h" #include "replication/origin.h" +#include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/worker_internal.h" #include "rewrite/rewriteHandler.h" #include "storage/buffile.h" #include "storage/ipc.h" #include "storage/lmgr.h" +#include "storage/procarray.h" #include "tcop/tcopprot.h" #include "utils/acl.h" #include "utils/dynahash.h" @@ -268,6 +362,78 @@ typedef enum TRANS_PARALLEL_APPLY, } TransApplyAction; +/* + * The phases involved in advancing the non-removable transaction ID. + * + * See comments atop worker.c for details of the transition between these + * phases. + */ +typedef enum +{ + RDT_GET_CANDIDATE_XID, + RDT_REQUEST_PUBLISHER_STATUS, + RDT_WAIT_FOR_PUBLISHER_STATUS, + RDT_WAIT_FOR_LOCAL_FLUSH +} RetainDeadTuplesPhase; + +/* + * Critical information for managing phase transitions within the + * RetainDeadTuplesPhase. + */ +typedef struct RetainDeadTuplesData +{ + RetainDeadTuplesPhase phase; /* current phase */ + XLogRecPtr remote_lsn; /* WAL write position on the publisher */ + + /* + * Oldest transaction ID that was in the commit phase on the publisher. + * Use FullTransactionId to prevent issues with transaction ID wraparound, + * where a new remote_oldestxid could falsely appear to originate from the + * past and block advancement. 
+ */ + FullTransactionId remote_oldestxid; + + /* + * Next transaction ID to be assigned on the publisher. Use + * FullTransactionId for consistency and to allow straightforward + * comparisons with remote_oldestxid. + */ + FullTransactionId remote_nextxid; + + TimestampTz reply_time; /* when the publisher responds with status */ + + /* + * Publisher transaction ID that must be awaited to complete before + * entering the final phase (RDT_WAIT_FOR_LOCAL_FLUSH). Use + * FullTransactionId for the same reason as remote_nextxid. + */ + FullTransactionId remote_wait_for; + + TransactionId candidate_xid; /* candidate for the non-removable + * transaction ID */ + TimestampTz flushpos_update_time; /* when the remote flush position was + * updated in final phase + * (RDT_WAIT_FOR_LOCAL_FLUSH) */ + + /* + * The following fields are used to determine the timing for the next + * round of transaction ID advancement. + */ + TimestampTz last_recv_time; /* when the last message was received */ + TimestampTz candidate_xid_time; /* when the candidate_xid is decided */ + int xid_advance_interval; /* how much time (ms) to wait before + * attempting to advance the + * non-removable transaction ID */ +} RetainDeadTuplesData; + +/* + * The minimum (100ms) and maximum (3 minutes) intervals for advancing + * non-removable transaction IDs. The maximum interval is a bit arbitrary but + * is sufficient to not cause any undue network traffic. + */ +#define MIN_XID_ADVANCE_INTERVAL 100 +#define MAX_XID_ADVANCE_INTERVAL 180000 + /* errcontext tracker */ static ApplyErrorCallbackArg apply_error_callback_arg = { @@ -332,6 +498,13 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr; /* BufFile handle of the current streaming file */ static BufFile *stream_fd = NULL; +/* + * The remote WAL position that has been applied and flushed locally. We record + * and use this information both while sending feedback to the server and + * advancing oldest_nonremovable_xid. 
+ */ +static XLogRecPtr last_flushpos = InvalidXLogRecPtr; + typedef struct SubXactInfo { TransactionId xid; /* XID of the subxact */ @@ -372,6 +545,19 @@ static void stream_close_file(void); static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); +static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data, + bool status_received); +static bool can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data); +static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data, + bool status_received); +static void get_candidate_xid(RetainDeadTuplesData *rdt_data); +static void request_publisher_status(RetainDeadTuplesData *rdt_data); +static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data, + bool status_received); +static void wait_for_local_flush(RetainDeadTuplesData *rdt_data); +static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, + bool new_xid_found); + static void apply_handle_commit_internal(LogicalRepCommitData *commit_data); static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, @@ -3577,6 +3763,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) bool ping_sent = false; TimeLineID tli; ErrorContextCallback errcallback; + RetainDeadTuplesData rdt_data = {0}; /* * Init the ApplyMessageContext which we clean up after each replication @@ -3655,6 +3842,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) last_recv_timestamp = GetCurrentTimestamp(); ping_sent = false; + rdt_data.last_recv_time = last_recv_timestamp; + /* Ensure we are reading the data into our memory context. 
*/ MemoryContextSwitchTo(ApplyMessageContext); @@ -3681,6 +3870,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) UpdateWorkerStats(last_received, send_time, false); apply_dispatch(&s); + + maybe_advance_nonremovable_xid(&rdt_data, false); } else if (c == 'k') { @@ -3696,8 +3887,31 @@ LogicalRepApplyLoop(XLogRecPtr last_received) last_received = end_lsn; send_feedback(last_received, reply_requested, false); + + maybe_advance_nonremovable_xid(&rdt_data, false); + UpdateWorkerStats(last_received, timestamp, true); } + else if (c == 's') /* Primary status update */ + { + rdt_data.remote_lsn = pq_getmsgint64(&s); + rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s)); + rdt_data.remote_nextxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s)); + rdt_data.reply_time = pq_getmsgint64(&s); + + /* + * This should never happen, see + * ProcessStandbyPSRequestMessage. But if it happens + * due to a bug, we don't want to proceed as it can + * incorrectly advance oldest_nonremovable_xid. + */ + if (XLogRecPtrIsInvalid(rdt_data.remote_lsn)) + elog(ERROR, "cannot get the latest WAL position from the publisher"); + + maybe_advance_nonremovable_xid(&rdt_data, true); + + UpdateWorkerStats(last_received, rdt_data.reply_time, false); + } /* other message types are purposefully ignored */ MemoryContextReset(ApplyMessageContext); @@ -3710,6 +3924,11 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* confirm all writes so far */ send_feedback(last_received, false, false); + /* Reset the timestamp if no message was received */ + rdt_data.last_recv_time = 0; + + maybe_advance_nonremovable_xid(&rdt_data, false); + if (!in_remote_transaction && !in_streamed_transaction) { /* @@ -3744,6 +3963,14 @@ LogicalRepApplyLoop(XLogRecPtr last_received) else wait_time = NAPTIME_PER_CYCLE; + /* + * Ensure to wake up when it's possible to advance the non-removable + * transaction ID. 
+ */ + if (rdt_data.phase == RDT_GET_CANDIDATE_XID && + rdt_data.xid_advance_interval) + wait_time = Min(wait_time, rdt_data.xid_advance_interval); + rc = WaitLatchOrSocket(MyLatch, WL_SOCKET_READABLE | WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, @@ -3807,6 +4034,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) send_feedback(last_received, requestReply, requestReply); + maybe_advance_nonremovable_xid(&rdt_data, false); + /* * Force reporting to ensure long idle periods don't lead to * arbitrarily delayed stats. Stats can only be reported outside @@ -3842,7 +4071,6 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) static XLogRecPtr last_recvpos = InvalidXLogRecPtr; static XLogRecPtr last_writepos = InvalidXLogRecPtr; - static XLogRecPtr last_flushpos = InvalidXLogRecPtr; XLogRecPtr writepos; XLogRecPtr flushpos; @@ -3921,6 +4149,367 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) } /* + * Attempt to advance the non-removable transaction ID. + * + * See comments atop worker.c for details. + */ +static void +maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data, + bool status_received) +{ + if (!can_advance_nonremovable_xid(rdt_data)) + return; + + process_rdt_phase_transition(rdt_data, status_received); +} + +/* + * Preliminary check to determine if advancing the non-removable transaction ID + * is allowed. + */ +static bool +can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data) +{ + /* + * It is sufficient to manage non-removable transaction ID for a + * subscription by the main apply worker to detect conflicts reliably even + * for table sync or parallel apply workers. + */ + if (!am_leader_apply_worker()) + return false; + + /* No need to advance if retaining dead tuples is not required */ + if (!MySubscription->retaindeadtuples) + return false; + + return true; +} + +/* + * Process phase transitions during the non-removable transaction ID + * advancement. 
See comments atop worker.c for details of the transition. + */ +static void +process_rdt_phase_transition(RetainDeadTuplesData *rdt_data, + bool status_received) +{ + switch (rdt_data->phase) + { + case RDT_GET_CANDIDATE_XID: + get_candidate_xid(rdt_data); + break; + case RDT_REQUEST_PUBLISHER_STATUS: + request_publisher_status(rdt_data); + break; + case RDT_WAIT_FOR_PUBLISHER_STATUS: + wait_for_publisher_status(rdt_data, status_received); + break; + case RDT_WAIT_FOR_LOCAL_FLUSH: + wait_for_local_flush(rdt_data); + break; + } +} + +/* + * Workhorse for the RDT_GET_CANDIDATE_XID phase. + */ +static void +get_candidate_xid(RetainDeadTuplesData *rdt_data) +{ + TransactionId oldest_running_xid; + TimestampTz now; + + /* + * Use last_recv_time when applying changes in the loop to avoid + * unnecessary system time retrieval. If last_recv_time is not available, + * obtain the current timestamp. + */ + now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp(); + + /* + * Compute the candidate_xid and request the publisher status at most once + * per xid_advance_interval. Refer to adjust_xid_advance_interval() for + * details on how this value is dynamically adjusted. This is to avoid + * using CPU and network resources without making much progress. + */ + if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now, + rdt_data->xid_advance_interval)) + return; + + /* + * Immediately update the timer, even if the function returns later + * without setting candidate_xid due to inactivity on the subscriber. This + * avoids frequent calls to GetOldestActiveTransactionId. + */ + rdt_data->candidate_xid_time = now; + + /* + * Consider transactions in the current database, as only dead tuples from + * this database are required for conflict detection. + */ + oldest_running_xid = GetOldestActiveTransactionId(false, false); + + /* + * Oldest active transaction ID (oldest_running_xid) can't be behind any + * of its previously computed values.
+ */ + Assert(TransactionIdPrecedesOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid, + oldest_running_xid)); + + /* Return if the oldest_nonremovable_xid cannot be advanced */ + if (TransactionIdEquals(MyLogicalRepWorker->oldest_nonremovable_xid, + oldest_running_xid)) + { + adjust_xid_advance_interval(rdt_data, false); + return; + } + + adjust_xid_advance_interval(rdt_data, true); + + rdt_data->candidate_xid = oldest_running_xid; + rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS; + + /* process the next phase */ + process_rdt_phase_transition(rdt_data, false); +} + +/* + * Workhorse for the RDT_REQUEST_PUBLISHER_STATUS phase. + */ +static void +request_publisher_status(RetainDeadTuplesData *rdt_data) +{ + static StringInfo request_message = NULL; + + if (!request_message) + { + MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext); + + request_message = makeStringInfo(); + MemoryContextSwitchTo(oldctx); + } + else + resetStringInfo(request_message); + + /* + * Send the current time to update the remote walsender's latest reply + * message received time. + */ + pq_sendbyte(request_message, 'p'); + pq_sendint64(request_message, GetCurrentTimestamp()); + + elog(DEBUG2, "sending publisher status request message"); + + /* Send a request for the publisher status */ + walrcv_send(LogRepWorkerWalRcvConn, + request_message->data, request_message->len); + + rdt_data->phase = RDT_WAIT_FOR_PUBLISHER_STATUS; + + /* + * Skip calling maybe_advance_nonremovable_xid() since further transition + * is possible only once we receive the publisher status message. + */ +} + +/* + * Workhorse for the RDT_WAIT_FOR_PUBLISHER_STATUS phase. + */ +static void +wait_for_publisher_status(RetainDeadTuplesData *rdt_data, + bool status_received) +{ + /* + * Return if we have requested but not yet received the publisher status. 
+ */ + if (!status_received) + return; + + if (!FullTransactionIdIsValid(rdt_data->remote_wait_for)) + rdt_data->remote_wait_for = rdt_data->remote_nextxid; + + /* + * Check if all remote concurrent transactions that were active at the + * first status request have now completed. If completed, proceed to the + * next phase; otherwise, continue checking the publisher status until + * these transactions finish. + * + * It's possible that transactions in the commit phase during the last + * cycle have now finished committing, but remote_oldestxid remains older + * than remote_wait_for. This can happen if some old transaction came in + * the commit phase when we requested status in this cycle. We do not + * handle this case explicitly as it's rare and the benefit doesn't + * justify the required complexity. Tracking would require either caching + * all xids at the publisher or sending them to subscribers. The condition + * will resolve naturally once the remaining transactions are finished. + * + * Directly advancing the non-removable transaction ID is possible if + * there are no activities on the publisher since the last advancement + * cycle. However, it requires maintaining two fields, last_remote_nextxid + * and last_remote_lsn, within the structure for comparison with the + * current cycle's values. Considering the minimal cost of continuing in + * RDT_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to + * advance the transaction ID here. + */ + if (FullTransactionIdPrecedesOrEquals(rdt_data->remote_wait_for, + rdt_data->remote_oldestxid)) + rdt_data->phase = RDT_WAIT_FOR_LOCAL_FLUSH; + else + rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS; + + /* process the next phase */ + process_rdt_phase_transition(rdt_data, false); +} + +/* + * Workhorse for the RDT_WAIT_FOR_LOCAL_FLUSH phase. 
+ */ +static void +wait_for_local_flush(RetainDeadTuplesData *rdt_data) +{ + Assert(!XLogRecPtrIsInvalid(rdt_data->remote_lsn) && + TransactionIdIsValid(rdt_data->candidate_xid)); + + /* + * We expect the publisher and subscriber clocks to be in sync using time + * sync service like NTP. Otherwise, we will advance this worker's + * oldest_nonremovable_xid prematurely, leading to the removal of rows + * required to detect conflicts reliably. This check primarily addresses + * scenarios where the publisher's clock falls behind; if the publisher's + * clock is ahead, subsequent transactions will naturally bear later + * commit timestamps, conforming to the design outlined atop worker.c. + * + * XXX Consider waiting for the publisher's clock to catch up with the + * subscriber's before proceeding to the next phase. + */ + if (TimestampDifferenceExceeds(rdt_data->reply_time, + rdt_data->candidate_xid_time, 0)) + ereport(ERROR, + errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"), + errdetail_internal("The clock on the publisher is behind that of the subscriber.")); + + /* + * Do not attempt to advance the non-removable transaction ID when table + * sync is in progress. During this time, changes from a single + * transaction may be applied by multiple table sync workers corresponding + * to the target tables. So, it's necessary for all table sync workers to + * apply and flush the corresponding changes before advancing the + * transaction ID, otherwise, dead tuples that are still needed for + * conflict detection in table sync workers could be removed prematurely. + * However, confirming the apply and flush progress across all table sync + * workers is complex and not worth the effort, so we simply return if not + * all tables are in the READY state. 
+ * + * It is safe to add new tables with initial states to the subscription + * after this check because any changes applied to these tables should + * have a WAL position greater than the rdt_data->remote_lsn. + */ + if (!AllTablesyncsReady()) + return; + + /* + * Update and check the remote flush position if we are applying changes + * in a loop. This is done at most once per WalWriterDelay to avoid + * performing costly operations in get_flush_position() too frequently + * during change application. + */ + if (last_flushpos < rdt_data->remote_lsn && rdt_data->last_recv_time && + TimestampDifferenceExceeds(rdt_data->flushpos_update_time, + rdt_data->last_recv_time, WalWriterDelay)) + { + XLogRecPtr writepos; + XLogRecPtr flushpos; + bool have_pending_txes; + + /* Fetch the latest remote flush position */ + get_flush_position(&writepos, &flushpos, &have_pending_txes); + + if (flushpos > last_flushpos) + last_flushpos = flushpos; + + rdt_data->flushpos_update_time = rdt_data->last_recv_time; + } + + /* Return to wait for the changes to be applied */ + if (last_flushpos < rdt_data->remote_lsn) + return; + + /* + * Reaching here means the remote WAL position has been received, and all + * transactions up to that position on the publisher have been applied and + * flushed locally. So, we can advance the non-removable transaction ID. + */ + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + elog(DEBUG2, "confirmed flush up to remote lsn %X/%X: new oldest_nonremovable_xid %u", + LSN_FORMAT_ARGS(rdt_data->remote_lsn), + rdt_data->candidate_xid); + + /* Notify launcher to update the xmin of the conflict slot */ + ApplyLauncherWakeup(); + + /* + * Reset all data fields except those used to determine the timing for the + * next round of transaction ID advancement. 
We can even use + * flushpos_update_time in the next round to decide whether to get the + * latest flush position. + */ + rdt_data->phase = RDT_GET_CANDIDATE_XID; + rdt_data->remote_lsn = InvalidXLogRecPtr; + rdt_data->remote_oldestxid = InvalidFullTransactionId; + rdt_data->remote_nextxid = InvalidFullTransactionId; + rdt_data->reply_time = 0; + rdt_data->remote_wait_for = InvalidFullTransactionId; + rdt_data->candidate_xid = InvalidTransactionId; + + /* process the next phase */ + process_rdt_phase_transition(rdt_data, false); +} + +/* + * Adjust the interval for advancing non-removable transaction IDs. + * + * We double the interval to try advancing the non-removable transaction IDs + * if there is no activity on the node. The maximum value of the interval is + * capped by wal_receiver_status_interval if it is not zero, otherwise by + * 3 minutes, which should be sufficient to avoid using CPU or network + * resources without much benefit. + * + * The interval is reset to a minimum value of 100ms once there is some + * activity on the node. + * + * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can + * consider the other interval or a separate GUC if the need arises. + */ +static void +adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found) +{ + if (!new_xid_found && rdt_data->xid_advance_interval) + { + int max_interval = wal_receiver_status_interval + ? wal_receiver_status_interval * 1000 + : MAX_XID_ADVANCE_INTERVAL; + + /* + * No new transaction ID has been assigned since the last check, so + * double the interval, but not beyond the maximum allowable value. + */ + rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2, + max_interval); + } + else + { + /* + * A new transaction ID was found or the interval is not yet + * initialized, so set the interval to the minimum value.
+ */ + rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL; + } +} + +/* * Exit routine for apply workers due to subscription parameter changes. */ static void @@ -4708,6 +5297,30 @@ InitializeLogRepWorker(void) apply_worker_exit(); } + /* + * Restart the worker if retain_dead_tuples was enabled during startup. + * + * At this point, the replication slot used for conflict detection might + * not exist yet, or could be dropped soon if the launcher perceives + * retain_dead_tuples as disabled. To avoid unnecessary tracking of + * oldest_nonremovable_xid when the slot is absent or at risk of being + * dropped, a restart is initiated. + * + * The oldest_nonremovable_xid should be initialized only when the + * retain_dead_tuples is enabled before launching the worker. See + * logicalrep_worker_launch. + */ + if (am_leader_apply_worker() && + MySubscription->retaindeadtuples && + !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid)) + { + ereport(LOG, + errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup", + MySubscription->name, "retain_dead_tuples")); + + apply_worker_exit(); + } + /* Setup synchronous commit according to the user's wishes */ SetConfigOption("synchronous_commit", MySubscription->synccommit, PGC_BACKEND, PGC_S_OVERRIDE); @@ -4864,6 +5477,14 @@ DisableSubscriptionAndExit(void) errmsg("subscription \"%s\" has been disabled because of an error", MySubscription->name)); + /* + * Skip the track_commit_timestamp check when disabling the worker due to + * an error, as verifying commit timestamps is unnecessary in this + * context. 
+ */ + if (MySubscription->retaindeadtuples) + CheckSubDeadTupleRetention(false, true, WARNING); + proc_exit(0); } diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 082b4d9d327..f4c977262c5 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -297,10 +297,12 @@ parse_output_parameters(List *options, PGOutputData *data) bool two_phase_option_given = false; bool origin_option_given = false; + /* Initialize optional parameters to defaults */ data->binary = false; data->streaming = LOGICALREP_STREAM_OFF; data->messages = false; data->two_phase = false; + data->publish_no_origin = false; foreach(lc, options) { diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index e44ad576bc7..8605776ad86 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -47,6 +47,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "postmaster/interrupt.h" +#include "replication/logicallauncher.h" #include "replication/slotsync.h" #include "replication/slot.h" #include "replication/walsender_private.h" @@ -172,6 +173,7 @@ static SyncStandbySlotsConfigData *synchronized_standby_slots_config; static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr; static void ReplicationSlotShmemExit(int code, Datum arg); +static bool IsSlotForConflictCheck(const char *name); static void ReplicationSlotDropPtr(ReplicationSlot *slot); /* internal persistency functions */ @@ -258,13 +260,17 @@ ReplicationSlotShmemExit(int code, Datum arg) /* * Check whether the passed slot name is valid and report errors at elevel. * + * An error will be reported for a reserved replication slot name if + * allow_reserved_name is set to false. + * * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow * the name to be used as a directory name on every supported OS. 
* * Returns whether the directory name is valid or not if elevel < ERROR. */ bool -ReplicationSlotValidateName(const char *name, int elevel) +ReplicationSlotValidateName(const char *name, bool allow_reserved_name, + int elevel) { const char *cp; @@ -300,10 +306,32 @@ ReplicationSlotValidateName(const char *name, int elevel) return false; } } + + if (!allow_reserved_name && IsSlotForConflictCheck(name)) + { + ereport(elevel, + errcode(ERRCODE_RESERVED_NAME), + errmsg("replication slot name \"%s\" is reserved", + name), + errdetail("The name \"%s\" is reserved for the conflict detection slot.", + CONFLICT_DETECTION_SLOT)); + + return false; + } + return true; } /* + * Return true if the replication slot name is "pg_conflict_detection". + */ +static bool +IsSlotForConflictCheck(const char *name) +{ + return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0); +} + +/* * Create a new replication slot and mark it as used by this backend. * * name: Name of the slot @@ -330,7 +358,12 @@ ReplicationSlotCreate(const char *name, bool db_specific, Assert(MyReplicationSlot == NULL); - ReplicationSlotValidateName(name, ERROR); + /* + * The logical launcher or pg_upgrade may create or migrate an internal + * slot, so using a reserved name is allowed in these cases. + */ + ReplicationSlotValidateName(name, IsBinaryUpgrade || IsLogicalLauncher(), + ERROR); if (failover) { @@ -582,6 +615,17 @@ retry: } /* + * Do not allow users to acquire the reserved slot. This scenario may + * occur if the launcher that owns the slot has terminated unexpectedly + * due to an error, and a backend process attempts to reuse the slot. + */ + if (!IsLogicalLauncher() && IsSlotForConflictCheck(name)) + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("cannot acquire replication slot \"%s\"", name), + errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher.")); + + /* * This is the slot we want; check if it's active under some other * process. 
In single user mode, we don't need this check. */ diff --git a/src/backend/replication/syncrep_scanner.l b/src/backend/replication/syncrep_scanner.l index 7dec1f869c7..02004d621e7 100644 --- a/src/backend/replication/syncrep_scanner.l +++ b/src/backend/replication/syncrep_scanner.l @@ -157,17 +157,16 @@ syncrep_yyerror(SyncRepConfigData **syncrep_parse_result_p, char **syncrep_parse { struct yyguts_t *yyg = (struct yyguts_t *) yyscanner; /* needed for yytext * macro */ - char *syncrep_parse_error_msg = *syncrep_parse_error_msg_p; /* report only the first error in a parse operation */ - if (syncrep_parse_error_msg) + if (*syncrep_parse_error_msg_p) return; if (yytext[0]) - syncrep_parse_error_msg = psprintf("%s at or near \"%s\"", - message, yytext); + *syncrep_parse_error_msg_p = psprintf("%s at or near \"%s\"", + message, yytext); else - syncrep_parse_error_msg = psprintf("%s at end of input", - message); + *syncrep_parse_error_msg_p = psprintf("%s at end of input", + message); } void diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 28b8591efa5..ee911394a23 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -65,6 +65,7 @@ #include "funcapi.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" +#include "libpq/protocol.h" #include "miscadmin.h" #include "nodes/replnodes.h" #include "pgstat.h" @@ -84,6 +85,7 @@ #include "storage/ipc.h" #include "storage/pmsignal.h" #include "storage/proc.h" +#include "storage/procarray.h" #include "tcop/dest.h" #include "tcop/tcopprot.h" #include "utils/acl.h" @@ -258,6 +260,7 @@ static void StartLogicalReplication(StartReplicationCmd *cmd); static void ProcessStandbyMessage(void); static void ProcessStandbyReplyMessage(void); static void ProcessStandbyHSFeedbackMessage(void); +static void ProcessStandbyPSRequestMessage(void); static void ProcessRepliesIfAny(void); static void ProcessPendingWrites(void); static void WalSndKeepalive(bool 
requestReply, XLogRecPtr writePtr); @@ -733,13 +736,13 @@ HandleUploadManifestPacket(StringInfo buf, off_t *offset, switch (mtype) { - case 'd': /* CopyData */ + case PqMsg_CopyData: maxmsglen = PQ_LARGE_MESSAGE_LIMIT; break; - case 'c': /* CopyDone */ - case 'f': /* CopyFail */ - case 'H': /* Flush */ - case 'S': /* Sync */ + case PqMsg_CopyDone: + case PqMsg_CopyFail: + case PqMsg_Flush: + case PqMsg_Sync: maxmsglen = PQ_SMALL_MESSAGE_LIMIT; break; default: @@ -761,19 +764,19 @@ HandleUploadManifestPacket(StringInfo buf, off_t *offset, /* Process the message */ switch (mtype) { - case 'd': /* CopyData */ + case PqMsg_CopyData: AppendIncrementalManifestData(ib, buf->data, buf->len); return true; - case 'c': /* CopyDone */ + case PqMsg_CopyDone: return false; - case 'H': /* Sync */ - case 'S': /* Flush */ + case PqMsg_Sync: + case PqMsg_Flush: /* Ignore these while in CopyOut mode as we do elsewhere. */ return true; - case 'f': + case PqMsg_CopyFail: ereport(ERROR, (errcode(ERRCODE_QUERY_CANCELED), errmsg("COPY from stdin failed: %s", @@ -1567,7 +1570,7 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, tmpbuf.data, sizeof(int64)); /* output previously gathered data in a CopyData packet */ - pq_putmessage_noblock('d', ctx->out->data, ctx->out->len); + pq_putmessage_noblock(PqMsg_CopyData, ctx->out->data, ctx->out->len); CHECK_FOR_INTERRUPTS(); @@ -2303,7 +2306,7 @@ ProcessRepliesIfAny(void) case PqMsg_CopyDone: if (!streamingDoneSending) { - pq_putmessage_noblock('c', NULL, 0); + pq_putmessage_noblock(PqMsg_CopyDone, NULL, 0); streamingDoneSending = true; } @@ -2355,6 +2358,10 @@ ProcessStandbyMessage(void) ProcessStandbyHSFeedbackMessage(); break; + case 'p': + ProcessStandbyPSRequestMessage(); + break; + default: ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -2702,6 +2709,60 @@ ProcessStandbyHSFeedbackMessage(void) } /* + * Process the request for a primary status update message. 
+ */ +static void +ProcessStandbyPSRequestMessage(void) +{ + XLogRecPtr lsn = InvalidXLogRecPtr; + TransactionId oldestXidInCommit; + FullTransactionId nextFullXid; + FullTransactionId fullOldestXidInCommit; + WalSnd *walsnd = MyWalSnd; + TimestampTz replyTime; + + /* + * This shouldn't happen because we don't support getting primary status + * message from standby. + */ + if (RecoveryInProgress()) + elog(ERROR, "the primary status is unavailable during recovery"); + + replyTime = pq_getmsgint64(&reply_message); + + /* + * Update shared state for this WalSender process based on reply data from + * standby. + */ + SpinLockAcquire(&walsnd->mutex); + walsnd->replyTime = replyTime; + SpinLockRelease(&walsnd->mutex); + + /* + * Consider transactions in the current database, as only these are the + * ones replicated. + */ + oldestXidInCommit = GetOldestActiveTransactionId(true, false); + nextFullXid = ReadNextFullTransactionId(); + fullOldestXidInCommit = FullTransactionIdFromAllowableAt(nextFullXid, + oldestXidInCommit); + lsn = GetXLogWriteRecPtr(); + + elog(DEBUG2, "sending primary status"); + + /* construct the message... */ + resetStringInfo(&output_message); + pq_sendbyte(&output_message, 's'); + pq_sendint64(&output_message, lsn); + pq_sendint64(&output_message, (int64) U64FromFullTransactionId(fullOldestXidInCommit)); + pq_sendint64(&output_message, (int64) U64FromFullTransactionId(nextFullXid)); + pq_sendint64(&output_message, GetCurrentTimestamp()); + + /* ... and send it wrapped in CopyData */ + pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len); +} + +/* * Compute how long send/receive loops should sleep. 
* * If wal_sender_timeout is enabled we want to wake up in time to send @@ -3246,7 +3307,7 @@ XLogSendPhysical(void) wal_segment_close(xlogreader); /* Send CopyDone */ - pq_putmessage_noblock('c', NULL, 0); + pq_putmessage_noblock(PqMsg_CopyDone, NULL, 0); streamingDoneSending = true; WalSndCaughtUp = true; @@ -3374,7 +3435,7 @@ retry: memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)], tmpbuf.data, sizeof(int64)); - pq_putmessage_noblock('d', output_message.data, output_message.len); + pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len); sentPtr = endptr; @@ -4080,7 +4141,7 @@ WalSndKeepalive(bool requestReply, XLogRecPtr writePtr) pq_sendbyte(&output_message, requestReply ? 1 : 0); /* ... and send it wrapped in CopyData */ - pq_putmessage_noblock('d', output_message.data, output_message.len); + pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len); /* Set local flag */ if (requestReply) |