aboutsummaryrefslogtreecommitdiff
path: root/src/include/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/include/replication')
-rw-r--r--src/include/replication/logicallauncher.h3
-rw-r--r--src/include/replication/slot.h30
-rw-r--r--src/include/replication/worker_internal.h13
3 files changed, 44 insertions, 2 deletions
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index 82b202f3305..b29453e8e4f 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -25,8 +25,11 @@ extern void ApplyLauncherShmemInit(void);
extern void ApplyLauncherForgetWorkerStartTime(Oid subid);
extern void ApplyLauncherWakeupAtCommit(void);
+extern void ApplyLauncherWakeup(void);
extern void AtEOXact_ApplyLauncher(bool isCommit);
+extern void CreateConflictDetectionSlot(void);
+
extern bool IsLogicalLauncher(void);
extern pid_t GetLeaderApplyWorkerPid(pid_t pid);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 76aeeb92242..e8fc342d1a9 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -21,6 +21,13 @@
#define PG_REPLSLOT_DIR "pg_replslot"
/*
+ * The reserved name for a replication slot used to retain dead tuples for
+ * conflict detection in logical replication. See
+ * maybe_advance_nonremovable_xid() for detail.
+ */
+#define CONFLICT_DETECTION_SLOT "pg_conflict_detection"
+
+/*
* Behaviour of replication slots, upon release or crash.
*
* Slots marked as PERSISTENT are crash-safe and will not be dropped when
@@ -220,6 +227,25 @@ typedef struct ReplicationSlot
* Latest restart_lsn that has been flushed to disk. For persistent slots
* the flushed LSN should be taken into account when calculating the
* oldest LSN for WAL segments removal.
+ *
+ * Do not assume that restart_lsn will always move forward, i.e., that the
+ * previously flushed restart_lsn is always behind data.restart_lsn. In
+ * streaming replication using a physical slot, the restart_lsn is updated
+ * based on the flushed WAL position reported by the walreceiver.
+ *
+ * This replication mode allows duplicate WAL records to be received and
+ * overwritten. If the walreceiver receives older WAL records and then
+ * reports them as flushed to the walsender, the restart_lsn may appear to
+ * move backward.
+ *
+ * This typically occurs at the beginning of replication. One reason is
+ * that streaming replication starts at the beginning of a segment, so, if
+ * restart_lsn is in the middle of a segment, it will be updated to an
+ * earlier LSN, see RequestXLogStreaming. Another reason is that the
+ * walreceiver chooses its startpoint based on the replayed LSN, so, if
+ * some records have been received but not yet applied, they will be
+ * received again and leads to updating the restart_lsn to an earlier
+ * position.
*/
XLogRecPtr last_saved_restart_lsn;
@@ -292,7 +318,9 @@ extern void ReplicationSlotMarkDirty(void);
/* misc stuff */
extern void ReplicationSlotInitialize(void);
-extern bool ReplicationSlotValidateName(const char *name, int elevel);
+extern bool ReplicationSlotValidateName(const char *name,
+ bool allow_reserved_name,
+ int elevel);
extern void ReplicationSlotReserveWal(void);
extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
extern void ReplicationSlotsComputeRequiredLSN(void);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 30b2775952c..0c7b8440a61 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -86,6 +86,16 @@ typedef struct LogicalRepWorker
/* Indicates whether apply can be performed in parallel. */
bool parallel_apply;
+ /*
+ * The changes made by this and later transactions must be retained to
+ * ensure reliable conflict detection during the apply phase.
+ *
+ * The logical replication launcher manages an internal replication slot
+ * named "pg_conflict_detection". It asynchronously collects this ID to
+ * decide when to advance the xmin value of the slot.
+ */
+ TransactionId oldest_nonremovable_xid;
+
/* Stats. */
XLogRecPtr last_lsn;
TimestampTz last_send_time;
@@ -245,7 +255,8 @@ extern List *logicalrep_workers_find(Oid subid, bool only_running,
extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
Oid dbid, Oid subid, const char *subname,
Oid userid, Oid relid,
- dsm_handle subworker_dsm);
+ dsm_handle subworker_dsm,
+ bool retain_dead_tuples);
extern void logicalrep_worker_stop(Oid subid, Oid relid);
extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo);
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);