aboutsummaryrefslogtreecommitdiff
path: root/src/include/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/include/replication')
-rw-r--r--src/include/replication/conflict.h3
-rw-r--r--src/include/replication/logicallauncher.h3
-rw-r--r--src/include/replication/reorderbuffer.h16
-rw-r--r--src/include/replication/slot.h40
-rw-r--r--src/include/replication/worker_internal.h14
5 files changed, 73 insertions, 3 deletions
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index 6c59125f256..ff3cb8416ec 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -32,6 +32,9 @@ typedef enum
/* The updated row value violates unique constraint */
CT_UPDATE_EXISTS,
+ /* The row to be updated was concurrently deleted by a different origin */
+ CT_UPDATE_DELETED,
+
/* The row to be updated is missing */
CT_UPDATE_MISSING,
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index 82b202f3305..b29453e8e4f 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -25,8 +25,11 @@ extern void ApplyLauncherShmemInit(void);
extern void ApplyLauncherForgetWorkerStartTime(Oid subid);
extern void ApplyLauncherWakeupAtCommit(void);
+extern void ApplyLauncherWakeup(void);
extern void AtEOXact_ApplyLauncher(bool isCommit);
+extern void CreateConflictDetectionSlot(void);
+
extern bool IsLogicalLauncher(void);
extern pid_t GetLeaderApplyWorkerPid(pid_t pid);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 24e88c409ba..fa0745552f8 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -176,6 +176,7 @@ typedef struct ReorderBufferChange
#define RBTXN_SENT_PREPARE 0x0200
#define RBTXN_IS_COMMITTED 0x0400
#define RBTXN_IS_ABORTED 0x0800
+#define RBTXN_DISTR_INVAL_OVERFLOWED 0x1000
#define RBTXN_PREPARE_STATUS_MASK (RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE | RBTXN_SENT_PREPARE)
@@ -265,6 +266,12 @@ typedef struct ReorderBufferChange
((txn)->txn_flags & RBTXN_SKIPPED_PREPARE) != 0 \
)
+/* Is the array of distributed inval messages overflowed? */
+#define rbtxn_distr_inval_overflowed(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_DISTR_INVAL_OVERFLOWED) != 0 \
+)
+
/* Is this a top-level transaction? */
#define rbtxn_is_toptxn(txn) \
( \
@@ -422,6 +429,12 @@ typedef struct ReorderBufferTXN
uint32 ninvalidations;
SharedInvalidationMessage *invalidations;
+ /*
+ * Stores cache invalidation messages distributed by other transactions.
+ */
+ uint32 ninvalidations_distributed;
+ SharedInvalidationMessage *invalidations_distributed;
+
/* ---
* Position in one of two lists:
* * list of subtransactions if we are *known* to be subxact
@@ -738,6 +751,9 @@ extern void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
CommandId cmin, CommandId cmax, CommandId combocid);
extern void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
Size nmsgs, SharedInvalidationMessage *msgs);
+extern void ReorderBufferAddDistributedInvalidations(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr lsn, Size nmsgs,
+ SharedInvalidationMessage *msgs);
extern void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
SharedInvalidationMessage *invalidations);
extern void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index eb0b93b1114..e8fc342d1a9 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -21,6 +21,13 @@
#define PG_REPLSLOT_DIR "pg_replslot"
/*
+ * The reserved name for a replication slot used to retain dead tuples for
+ * conflict detection in logical replication. See
+ * maybe_advance_nonremovable_xid() for detail.
+ */
+#define CONFLICT_DETECTION_SLOT "pg_conflict_detection"
+
+/*
* Behaviour of replication slots, upon release or crash.
*
* Slots marked as PERSISTENT are crash-safe and will not be dropped when
@@ -215,6 +222,33 @@ typedef struct ReplicationSlot
* recently stopped.
*/
TimestampTz inactive_since;
+
+ /*
+ * Latest restart_lsn that has been flushed to disk. For persistent slots
+ * the flushed LSN should be taken into account when calculating the
+ * oldest LSN for WAL segments removal.
+ *
+ * Do not assume that restart_lsn will always move forward, i.e., that the
+ * previously flushed restart_lsn is always behind data.restart_lsn. In
+ * streaming replication using a physical slot, the restart_lsn is updated
+ * based on the flushed WAL position reported by the walreceiver.
+ *
+ * This replication mode allows duplicate WAL records to be received and
+ * overwritten. If the walreceiver receives older WAL records and then
+ * reports them as flushed to the walsender, the restart_lsn may appear to
+ * move backward.
+ *
+ * This typically occurs at the beginning of replication. One reason is
+ * that streaming replication starts at the beginning of a segment, so, if
+ * restart_lsn is in the middle of a segment, it will be updated to an
+ * earlier LSN, see RequestXLogStreaming. Another reason is that the
+ * walreceiver chooses its startpoint based on the replayed LSN, so, if
+ * some records have been received but not yet applied, they will be
+ * received again and leads to updating the restart_lsn to an earlier
+ * position.
+ */
+ XLogRecPtr last_saved_restart_lsn;
+
} ReplicationSlot;
#define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
@@ -258,7 +292,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
/* GUCs */
extern PGDLLIMPORT int max_replication_slots;
extern PGDLLIMPORT char *synchronized_standby_slots;
-extern PGDLLIMPORT int idle_replication_slot_timeout_mins;
+extern PGDLLIMPORT int idle_replication_slot_timeout_secs;
/* shmem initialization functions */
extern Size ReplicationSlotsShmemSize(void);
@@ -284,7 +318,9 @@ extern void ReplicationSlotMarkDirty(void);
/* misc stuff */
extern void ReplicationSlotInitialize(void);
-extern bool ReplicationSlotValidateName(const char *name, int elevel);
+extern bool ReplicationSlotValidateName(const char *name,
+ bool allow_reserved_name,
+ int elevel);
extern void ReplicationSlotReserveWal(void);
extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
extern void ReplicationSlotsComputeRequiredLSN(void);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 30b2775952c..7c0204dd6f4 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -86,6 +86,17 @@ typedef struct LogicalRepWorker
/* Indicates whether apply can be performed in parallel. */
bool parallel_apply;
+ /*
+ * Changes made by this transaction and subsequent ones must be preserved.
+ * This ensures that update_deleted conflicts can be accurately detected
+ * during the apply phase of logical replication by this worker.
+ *
+ * The logical replication launcher manages an internal replication slot
+ * named "pg_conflict_detection". It asynchronously collects this ID to
+ * decide when to advance the xmin value of the slot.
+ */
+ TransactionId oldest_nonremovable_xid;
+
/* Stats. */
XLogRecPtr last_lsn;
TimestampTz last_send_time;
@@ -245,7 +256,8 @@ extern List *logicalrep_workers_find(Oid subid, bool only_running,
extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
Oid dbid, Oid subid, const char *subname,
Oid userid, Oid relid,
- dsm_handle subworker_dsm);
+ dsm_handle subworker_dsm,
+ bool retain_dead_tuples);
extern void logicalrep_worker_stop(Oid subid, Oid relid);
extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo);
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);