aboutsummaryrefslogtreecommitdiff
path: root/src/include
diff options
context:
space:
mode:
Diffstat (limited to 'src/include')
-rw-r--r--src/include/access/commit_ts.h14
-rw-r--r--src/include/access/rmgrlist.h1
-rw-r--r--src/include/access/xact.h11
-rw-r--r--src/include/access/xlog.h1
-rw-r--r--src/include/access/xlog_internal.h2
-rw-r--r--src/include/access/xlogdefs.h6
-rw-r--r--src/include/access/xloginsert.h1
-rw-r--r--src/include/access/xlogreader.h3
-rw-r--r--src/include/access/xlogrecord.h1
-rw-r--r--src/include/catalog/catversion.h2
-rw-r--r--src/include/catalog/indexing.h6
-rw-r--r--src/include/catalog/pg_proc.h36
-rw-r--r--src/include/catalog/pg_replication_origin.h70
-rw-r--r--src/include/replication/logical.h2
-rw-r--r--src/include/replication/origin.h86
-rw-r--r--src/include/replication/output_plugin.h8
-rw-r--r--src/include/replication/reorderbuffer.h8
-rw-r--r--src/include/storage/lwlock.h3
-rw-r--r--src/include/utils/syscache.h2
19 files changed, 250 insertions, 13 deletions
diff --git a/src/include/access/commit_ts.h b/src/include/access/commit_ts.h
index 93d1217f766..ad44db357aa 100644
--- a/src/include/access/commit_ts.h
+++ b/src/include/access/commit_ts.h
@@ -13,6 +13,7 @@
#include "access/xlog.h"
#include "datatype/timestamp.h"
+#include "replication/origin.h"
#include "utils/guc.h"
@@ -21,18 +22,13 @@ extern PGDLLIMPORT bool track_commit_timestamp;
extern bool check_track_commit_timestamp(bool *newval, void **extra,
GucSource source);
-typedef uint32 CommitTsNodeId;
-#define InvalidCommitTsNodeId 0
-
-extern void CommitTsSetDefaultNodeId(CommitTsNodeId nodeid);
-extern CommitTsNodeId CommitTsGetDefaultNodeId(void);
extern void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
TransactionId *subxids, TimestampTz timestamp,
- CommitTsNodeId nodeid, bool do_xlog);
+ RepOriginId nodeid, bool do_xlog);
extern bool TransactionIdGetCommitTsData(TransactionId xid,
- TimestampTz *ts, CommitTsNodeId *nodeid);
+ TimestampTz *ts, RepOriginId *nodeid);
extern TransactionId GetLatestCommitTsData(TimestampTz *ts,
- CommitTsNodeId *nodeid);
+ RepOriginId *nodeid);
extern Size CommitTsShmemBuffers(void);
extern Size CommitTsShmemSize(void);
@@ -58,7 +54,7 @@ extern void AdvanceOldestCommitTs(TransactionId oldestXact);
typedef struct xl_commit_ts_set
{
TimestampTz timestamp;
- CommitTsNodeId nodeid;
+ RepOriginId nodeid;
TransactionId mainxid;
/* subxact Xids follow */
} xl_commit_ts_set;
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index 48f04c61716..47033da017b 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -44,3 +44,4 @@ PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL)
PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup)
PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
+PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL)
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 8da6aa952f0..cad1bb1d318 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -131,6 +131,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
#define XACT_XINFO_HAS_RELFILENODES (1U << 2)
#define XACT_XINFO_HAS_INVALS (1U << 3)
#define XACT_XINFO_HAS_TWOPHASE (1U << 4)
+#define XACT_XINFO_HAS_ORIGIN (1U << 5)
/*
* Also stored in xinfo, these indicating a variety of additional actions that
@@ -217,6 +218,12 @@ typedef struct xl_xact_twophase
} xl_xact_twophase;
#define MinSizeOfXactInvals offsetof(xl_xact_invals, msgs)
+typedef struct xl_xact_origin
+{
+ XLogRecPtr origin_lsn;
+ TimestampTz origin_timestamp;
+} xl_xact_origin;
+
typedef struct xl_xact_commit
{
TimestampTz xact_time; /* time of commit */
@@ -227,6 +234,7 @@ typedef struct xl_xact_commit
/* xl_xact_relfilenodes follows if XINFO_HAS_RELFILENODES */
/* xl_xact_invals follows if XINFO_HAS_INVALS */
/* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
+ /* xl_xact_origin follows if XINFO_HAS_ORIGIN */
} xl_xact_commit;
#define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz))
@@ -267,6 +275,9 @@ typedef struct xl_xact_parsed_commit
SharedInvalidationMessage *msgs;
TransactionId twophase_xid; /* only for 2PC */
+
+ XLogRecPtr origin_lsn;
+ TimestampTz origin_timestamp;
} xl_xact_parsed_commit;
typedef struct xl_xact_parsed_abort
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 2b1f42389cb..f08b6767ed7 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -85,6 +85,7 @@ typedef enum
} RecoveryTargetType;
extern XLogRecPtr XactLastRecEnd;
+extern PGDLLIMPORT XLogRecPtr XactLastCommitEnd;
extern bool reachedConsistency;
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index deca1de67b0..75cf435e90f 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -31,7 +31,7 @@
/*
* Each page of XLOG file has a header like this:
*/
-#define XLOG_PAGE_MAGIC 0xD083 /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD085 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData
{
diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h
index 6638c1d4228..18a3e7ca905 100644
--- a/src/include/access/xlogdefs.h
+++ b/src/include/access/xlogdefs.h
@@ -45,6 +45,12 @@ typedef uint64 XLogSegNo;
typedef uint32 TimeLineID;
/*
+ * Replication origin id - this is located in this file to avoid having to
+ * include origin.h in a bunch of xlog related places.
+ */
+typedef uint16 RepOriginId;
+
+/*
* Because O_DIRECT bypasses the kernel buffers, and because we never
* read those buffers except during crash recovery or if wal_level != minimal,
* it is a win to use it in all cases where we sync on each write(). We could
diff --git a/src/include/access/xloginsert.h b/src/include/access/xloginsert.h
index 6864c95b2c7..ac609298cc2 100644
--- a/src/include/access/xloginsert.h
+++ b/src/include/access/xloginsert.h
@@ -39,6 +39,7 @@
/* prototypes for public functions in xloginsert.c: */
extern void XLogBeginInsert(void);
+extern void XLogIncludeOrigin(void);
extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info);
extern void XLogEnsureRecordSpace(int nbuffers, int ndatas);
extern void XLogRegisterData(char *data, int len);
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 609bfe3e40f..5164abec758 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -127,6 +127,8 @@ struct XLogReaderState
uint32 main_data_len; /* main data portion's length */
uint32 main_data_bufsz; /* allocated size of the buffer */
+ RepOriginId record_origin;
+
/* information about blocks referenced by the record. */
DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1];
@@ -186,6 +188,7 @@ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
#define XLogRecGetInfo(decoder) ((decoder)->decoded_record->xl_info)
#define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid)
#define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid)
+#define XLogRecGetOrigin(decoder) ((decoder)->record_origin)
#define XLogRecGetData(decoder) ((decoder)->main_data)
#define XLogRecGetDataLen(decoder) ((decoder)->main_data_len)
#define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0)
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index b487ae0cc8e..7a049f0e979 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -212,5 +212,6 @@ typedef struct XLogRecordDataHeaderLong
#define XLR_BLOCK_ID_DATA_SHORT 255
#define XLR_BLOCK_ID_DATA_LONG 254
+#define XLR_BLOCK_ID_ORIGIN 253
#endif /* XLOGRECORD_H */
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index b36e4edd843..e8334025e14 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 201504261
+#define CATALOG_VERSION_NO 201504291
#endif
diff --git a/src/include/catalog/indexing.h b/src/include/catalog/indexing.h
index a234bde293c..71e0010a6f8 100644
--- a/src/include/catalog/indexing.h
+++ b/src/include/catalog/indexing.h
@@ -310,6 +310,12 @@ DECLARE_UNIQUE_INDEX(pg_policy_oid_index, 3257, on pg_policy using btree(oid oid
DECLARE_UNIQUE_INDEX(pg_policy_polrelid_polname_index, 3258, on pg_policy using btree(polrelid oid_ops, polname name_ops));
#define PolicyPolrelidPolnameIndexId 3258
+DECLARE_UNIQUE_INDEX(pg_replication_origin_roiident_index, 6001, on pg_replication_origin using btree(roident oid_ops));
+#define ReplicationOriginIdentIndex 6001
+
+DECLARE_UNIQUE_INDEX(pg_replication_origin_roname_index, 6002, on pg_replication_origin using btree(roname varchar_pattern_ops));
+#define ReplicationOriginNameIndex 6002
+
/* last step of initialization script: build the indexes declared above */
BUILD_INDICES
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index e97e6b19440..55c246e73dd 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5203,6 +5203,42 @@ DESCR("for use by pg_upgrade");
DATA(insert OID = 3591 ( binary_upgrade_create_empty_extension PGNSP PGUID 12 1 0 0 0 f f f f f f v 7 0 2278 "25 25 16 25 1028 1009 1009" _null_ _null_ _null_ _null_ _null_ binary_upgrade_create_empty_extension _null_ _null_ _null_ ));
DESCR("for use by pg_upgrade");
+/* replication/origin.h */
+DATA(insert OID = 6003 ( pg_replication_origin_create PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 26 "25" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_create _null_ _null_ _null_ ));
+DESCR("create a replication origin");
+
+DATA(insert OID = 6004 ( pg_replication_origin_drop PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 2278 "25" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_drop _null_ _null_ _null_ ));
+DESCR("drop replication origin identified by its name");
+
+DATA(insert OID = 6005 ( pg_replication_origin_oid PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 26 "25" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_oid _null_ _null_ _null_ ));
+DESCR("translate the replication origin's name to its id");
+
+DATA(insert OID = 6006 ( pg_replication_origin_session_setup PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 2278 "25" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_session_setup _null_ _null_ _null_ ));
+DESCR("configure session to maintain replication progress tracking for the passed in origin");
+
+DATA(insert OID = 6007 ( pg_replication_origin_session_reset PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 2278 "" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_session_reset _null_ _null_ _null_ ));
+DESCR("teardown configured replication progress tracking");
+
+DATA(insert OID = 6008 ( pg_replication_origin_session_is_setup PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 16 "" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_session_is_setup _null_ _null_ _null_ ));
+DESCR("is a replication origin configured in this session");
+
+DATA(insert OID = 6009 ( pg_replication_origin_session_progress PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 3220 "16" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_session_progress _null_ _null_ _null_ ));
+DESCR("get the replication progress of the current session");
+
+DATA(insert OID = 6010 ( pg_replication_origin_xact_setup PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 2278 "3220 1184" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_xact_setup _null_ _null_ _null_ ));
+DESCR("setup the transaction's origin lsn and timestamp");
+
+DATA(insert OID = 6011 ( pg_replication_origin_xact_reset PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 2278 "3220 1184" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_xact_reset _null_ _null_ _null_ ));
+DESCR("reset the transaction's origin lsn and timestamp");
+
+DATA(insert OID = 6012 ( pg_replication_origin_advance PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 2278 "25 3220" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_advance _null_ _null_ _null_ ));
+DESCR("advance replication itentifier to specific location");
+
+DATA(insert OID = 6013 ( pg_replication_origin_progress PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 3220 "25 16" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_progress _null_ _null_ _null_ ));
+DESCR("get an individual replication origin's replication progress");
+
+DATA(insert OID = 6014 ( pg_show_replication_origin_status PGNSP PGUID 12 1 100 0 0 f f f f f t v 0 0 2249 "" "{26,25,3220,3220}" "{o,o,o,o}" "{local_id, external_id, remote_lsn, local_lsn}" _null_ _null_ pg_show_replication_origin_status _null_ _null_ _null_ ));
+DESCR("get progress for all replication origins");
/*
* Symbolic values for provolatile column: these indicate whether the result
diff --git a/src/include/catalog/pg_replication_origin.h b/src/include/catalog/pg_replication_origin.h
new file mode 100644
index 00000000000..3483809034b
--- /dev/null
+++ b/src/include/catalog/pg_replication_origin.h
@@ -0,0 +1,70 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_replication_origin.h
+ * Persistent replication origin registry
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/catalog/pg_replication_origin.h
+ *
+ * NOTES
+ * the genbki.pl script reads this file and generates .bki
+ * information from the DATA() statements.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_REPLICATION_ORIGIN_H
+#define PG_REPLICATION_ORIGIN_H
+
+#include "catalog/genbki.h"
+#include "access/xlogdefs.h"
+
+/* ----------------
+ * pg_replication_origin. cpp turns this into
+ * typedef struct FormData_pg_replication_origin
+ * ----------------
+ */
+#define ReplicationOriginRelationId 6000
+
+CATALOG(pg_replication_origin,6000) BKI_SHARED_RELATION BKI_WITHOUT_OIDS
+{
+ /*
+ * Locally known id that get included into WAL.
+ *
+ * This should never leave the system.
+ *
+ * Needs to fit into a uint16, so we don't waste too much space in WAL
+ * records. For this reason we don't use a normal Oid column here, since
+ * we need to handle allocation of new values manually.
+ */
+ Oid roident;
+
+ /*
+ * Variable-length fields start here, but we allow direct access to
+ * roname.
+ */
+
+ /* external, free-format, name */
+ text roname BKI_FORCE_NOT_NULL;
+
+#ifdef CATALOG_VARLEN /* further variable-length fields */
+#endif
+} FormData_pg_replication_origin;
+
+typedef FormData_pg_replication_origin *Form_pg_replication_origin;
+
+/* ----------------
+ * compiler constants for pg_replication_origin
+ * ----------------
+ */
+#define Natts_pg_replication_origin 2
+#define Anum_pg_replication_origin_roident 1
+#define Anum_pg_replication_origin_roname 2
+
+/* ----------------
+ * pg_replication_origin has no initial contents
+ * ----------------
+ */
+
+#endif /* PG_REPLICATION_ORIGIN_H */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index cce4394d4e3..dfdbe6535f1 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -97,4 +97,6 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
XLogRecPtr restart_lsn);
extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
+extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
+
#endif
diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h
new file mode 100644
index 00000000000..ca26bc3e64a
--- /dev/null
+++ b/src/include/replication/origin.h
@@ -0,0 +1,86 @@
+/*-------------------------------------------------------------------------
+ * origin.h
+ * Exports from replication/logical/origin.c
+ *
+ * Copyright (c) 2013-2015, PostgreSQL Global Development Group
+ *
+ * src/include/replication/origin.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_ORIGIN_H
+#define PG_ORIGIN_H
+
+#include "access/xlogdefs.h"
+#include "catalog/pg_replication_origin.h"
+#include "replication/logical.h"
+
+typedef struct xl_replorigin_set
+{
+ XLogRecPtr remote_lsn;
+ RepOriginId node_id;
+ bool force;
+} xl_replorigin_set;
+
+typedef struct xl_replorigin_drop
+{
+ RepOriginId node_id;
+} xl_replorigin_drop;
+
+#define XLOG_REPLORIGIN_SET 0x00
+#define XLOG_REPLORIGIN_DROP 0x10
+
+#define InvalidRepOriginId 0
+#define DoNotReplicateId UINT16_MAX
+
+extern PGDLLIMPORT RepOriginId replorigin_sesssion_origin;
+extern PGDLLIMPORT XLogRecPtr replorigin_sesssion_origin_lsn;
+extern PGDLLIMPORT TimestampTz replorigin_sesssion_origin_timestamp;
+
+/* API for querying & manipulating replication origins */
+extern RepOriginId replorigin_by_name(char *name, bool missing_ok);
+extern RepOriginId replorigin_create(char *name);
+extern void replorigin_drop(RepOriginId roident);
+extern bool replorigin_by_oid(RepOriginId roident, bool missing_ok,
+ char **roname);
+
+/* API for querying & manipulating replication progress tracking */
+extern void replorigin_advance(RepOriginId node,
+ XLogRecPtr remote_commit,
+ XLogRecPtr local_commit,
+ bool go_backward, bool wal_log);
+extern XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush);
+
+extern void replorigin_session_advance(XLogRecPtr remote_commit,
+ XLogRecPtr local_commit);
+extern void replorigin_session_setup(RepOriginId node);
+extern void replorigin_session_reset(void);
+extern XLogRecPtr replorigin_session_get_progress(bool flush);
+
+/* Checkpoint/Startup integration */
+extern void CheckPointReplicationOrigin(void);
+extern void StartupReplicationOrigin(void);
+
+/* WAL logging */
+void replorigin_redo(XLogReaderState *record);
+void replorigin_desc(StringInfo buf, XLogReaderState *record);
+const char * replorigin_identify(uint8 info);
+
+/* shared memory allocation */
+extern Size ReplicationOriginShmemSize(void);
+extern void ReplicationOriginShmemInit(void);
+
+/* SQL callable functions */
+extern Datum pg_replication_origin_create(PG_FUNCTION_ARGS);
+extern Datum pg_replication_origin_drop(PG_FUNCTION_ARGS);
+extern Datum pg_replication_origin_oid(PG_FUNCTION_ARGS);
+extern Datum pg_replication_origin_session_setup(PG_FUNCTION_ARGS);
+extern Datum pg_replication_origin_session_reset(PG_FUNCTION_ARGS);
+extern Datum pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS);
+extern Datum pg_replication_origin_session_progress(PG_FUNCTION_ARGS);
+extern Datum pg_replication_origin_xact_setup(PG_FUNCTION_ARGS);
+extern Datum pg_replication_origin_xact_reset(PG_FUNCTION_ARGS);
+extern Datum pg_replication_origin_advance(PG_FUNCTION_ARGS);
+extern Datum pg_replication_origin_progress(PG_FUNCTION_ARGS);
+extern Datum pg_show_replication_origin_status(PG_FUNCTION_ARGS);
+
+#endif /* PG_ORIGIN_H */
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 0935c1bac3c..bec1a56017c 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -74,6 +74,13 @@ typedef void (*LogicalDecodeCommitCB) (
XLogRecPtr commit_lsn);
/*
+ * Filter changes by origin.
+ */
+typedef bool (*LogicalDecodeFilterByOriginCB) (
+ struct LogicalDecodingContext *,
+ RepOriginId origin_id);
+
+/*
* Called to shutdown an output plugin.
*/
typedef void (*LogicalDecodeShutdownCB) (
@@ -89,6 +96,7 @@ typedef struct OutputPluginCallbacks
LogicalDecodeBeginCB begin_cb;
LogicalDecodeChangeCB change_cb;
LogicalDecodeCommitCB commit_cb;
+ LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
} OutputPluginCallbacks;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index f1e0f57e7c2..6a5528a7344 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -68,6 +68,8 @@ typedef struct ReorderBufferChange
/* The type of change. */
enum ReorderBufferChangeType action;
+ RepOriginId origin_id;
+
/*
* Context data for the change, which part of the union is valid depends
* on action/action_internal.
@@ -166,6 +168,10 @@ typedef struct ReorderBufferTXN
*/
XLogRecPtr restart_decoding_lsn;
+ /* origin of the change that caused this transaction */
+ RepOriginId origin_id;
+ XLogRecPtr origin_lsn;
+
/*
* Commit time, only known when we read the actual commit record.
*/
@@ -339,7 +345,7 @@ void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *);
void ReorderBufferCommit(ReorderBuffer *, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
- TimestampTz commit_time);
+ TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index e3c2efc1f3d..cff3b999221 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -134,8 +134,9 @@ extern PGDLLIMPORT LWLockPadded *MainLWLockArray;
#define ReplicationSlotControlLock (&MainLWLockArray[37].lock)
#define CommitTsControlLock (&MainLWLockArray[38].lock)
#define CommitTsLock (&MainLWLockArray[39].lock)
+#define ReplicationOriginLock (&MainLWLockArray[40].lock)
-#define NUM_INDIVIDUAL_LWLOCKS 40
+#define NUM_INDIVIDUAL_LWLOCKS 41
/*
* It's a bit odd to declare NUM_BUFFER_PARTITIONS and NUM_LOCK_PARTITIONS
diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h
index ff9a4f2af3b..6634099cbe2 100644
--- a/src/include/utils/syscache.h
+++ b/src/include/utils/syscache.h
@@ -77,6 +77,8 @@ enum SysCacheIdentifier
RANGETYPE,
RELNAMENSP,
RELOID,
+ REPLORIGIDENT,
+ REPLORIGNAME,
RULERELNAME,
STATRELATTINH,
TABLESPACEOID,