diff options
Diffstat (limited to 'src/include')
-rw-r--r-- | src/include/access/commit_ts.h | 14 | ||||
-rw-r--r-- | src/include/access/rmgrlist.h | 1 | ||||
-rw-r--r-- | src/include/access/xact.h | 11 | ||||
-rw-r--r-- | src/include/access/xlog.h | 1 | ||||
-rw-r--r-- | src/include/access/xlog_internal.h | 2 | ||||
-rw-r--r-- | src/include/access/xlogdefs.h | 6 | ||||
-rw-r--r-- | src/include/access/xloginsert.h | 1 | ||||
-rw-r--r-- | src/include/access/xlogreader.h | 3 | ||||
-rw-r--r-- | src/include/access/xlogrecord.h | 1 | ||||
-rw-r--r-- | src/include/catalog/catversion.h | 2 | ||||
-rw-r--r-- | src/include/catalog/indexing.h | 6 | ||||
-rw-r--r-- | src/include/catalog/pg_proc.h | 36 | ||||
-rw-r--r-- | src/include/catalog/pg_replication_origin.h | 70 | ||||
-rw-r--r-- | src/include/replication/logical.h | 2 | ||||
-rw-r--r-- | src/include/replication/origin.h | 86 | ||||
-rw-r--r-- | src/include/replication/output_plugin.h | 8 | ||||
-rw-r--r-- | src/include/replication/reorderbuffer.h | 8 | ||||
-rw-r--r-- | src/include/storage/lwlock.h | 3 | ||||
-rw-r--r-- | src/include/utils/syscache.h | 2 |
19 files changed, 250 insertions, 13 deletions
diff --git a/src/include/access/commit_ts.h b/src/include/access/commit_ts.h index 93d1217f766..ad44db357aa 100644 --- a/src/include/access/commit_ts.h +++ b/src/include/access/commit_ts.h @@ -13,6 +13,7 @@ #include "access/xlog.h" #include "datatype/timestamp.h" +#include "replication/origin.h" #include "utils/guc.h" @@ -21,18 +22,13 @@ extern PGDLLIMPORT bool track_commit_timestamp; extern bool check_track_commit_timestamp(bool *newval, void **extra, GucSource source); -typedef uint32 CommitTsNodeId; -#define InvalidCommitTsNodeId 0 - -extern void CommitTsSetDefaultNodeId(CommitTsNodeId nodeid); -extern CommitTsNodeId CommitTsGetDefaultNodeId(void); extern void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, - CommitTsNodeId nodeid, bool do_xlog); + RepOriginId nodeid, bool do_xlog); extern bool TransactionIdGetCommitTsData(TransactionId xid, - TimestampTz *ts, CommitTsNodeId *nodeid); + TimestampTz *ts, RepOriginId *nodeid); extern TransactionId GetLatestCommitTsData(TimestampTz *ts, - CommitTsNodeId *nodeid); + RepOriginId *nodeid); extern Size CommitTsShmemBuffers(void); extern Size CommitTsShmemSize(void); @@ -58,7 +54,7 @@ extern void AdvanceOldestCommitTs(TransactionId oldestXact); typedef struct xl_commit_ts_set { TimestampTz timestamp; - CommitTsNodeId nodeid; + RepOriginId nodeid; TransactionId mainxid; /* subxact Xids follow */ } xl_commit_ts_set; diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index 48f04c61716..47033da017b 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -44,3 +44,4 @@ PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL) PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup) PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL) PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL) +PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL) diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 8da6aa952f0..cad1bb1d318 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -131,6 +131,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, #define XACT_XINFO_HAS_RELFILENODES (1U << 2) #define XACT_XINFO_HAS_INVALS (1U << 3) #define XACT_XINFO_HAS_TWOPHASE (1U << 4) +#define XACT_XINFO_HAS_ORIGIN (1U << 5) /* * Also stored in xinfo, these indicating a variety of additional actions that @@ -217,6 +218,12 @@ typedef struct xl_xact_twophase } xl_xact_twophase; #define MinSizeOfXactInvals offsetof(xl_xact_invals, msgs) +typedef struct xl_xact_origin +{ + XLogRecPtr origin_lsn; + TimestampTz origin_timestamp; +} xl_xact_origin; + typedef struct xl_xact_commit { TimestampTz xact_time; /* time of commit */ @@ -227,6 +234,7 @@ typedef struct xl_xact_commit /* xl_xact_relfilenodes follows if XINFO_HAS_RELFILENODES */ /* xl_xact_invals follows if XINFO_HAS_INVALS */ /* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */ + /* xl_xact_origin follows if XINFO_HAS_ORIGIN */ } xl_xact_commit; #define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz)) @@ -267,6 +275,9 @@ typedef struct xl_xact_parsed_commit SharedInvalidationMessage *msgs; TransactionId twophase_xid; /* only for 2PC */ + + XLogRecPtr origin_lsn; + TimestampTz origin_timestamp; } xl_xact_parsed_commit; typedef struct xl_xact_parsed_abort diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 2b1f42389cb..f08b6767ed7 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -85,6 +85,7 @@ typedef enum } RecoveryTargetType; extern XLogRecPtr XactLastRecEnd; +extern PGDLLIMPORT XLogRecPtr XactLastCommitEnd; extern bool reachedConsistency; diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index deca1de67b0..75cf435e90f 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -31,7 +31,7 @@ /* * Each page of XLOG file has a header like this: */ -#define XLOG_PAGE_MAGIC 0xD083 /* can be used as WAL version indicator */ +#define XLOG_PAGE_MAGIC 0xD085 /* can be used as WAL version indicator */ typedef struct XLogPageHeaderData { diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h index 6638c1d4228..18a3e7ca905 100644 --- a/src/include/access/xlogdefs.h +++ b/src/include/access/xlogdefs.h @@ -45,6 +45,12 @@ typedef uint64 XLogSegNo; typedef uint32 TimeLineID; /* + * Replication origin id - this is located in this file to avoid having to + * include origin.h in a bunch of xlog related places. + */ +typedef uint16 RepOriginId; + +/* * Because O_DIRECT bypasses the kernel buffers, and because we never * read those buffers except during crash recovery or if wal_level != minimal, * it is a win to use it in all cases where we sync on each write(). We could diff --git a/src/include/access/xloginsert.h b/src/include/access/xloginsert.h index 6864c95b2c7..ac609298cc2 100644 --- a/src/include/access/xloginsert.h +++ b/src/include/access/xloginsert.h @@ -39,6 +39,7 @@ /* prototypes for public functions in xloginsert.c: */ extern void XLogBeginInsert(void); +extern void XLogIncludeOrigin(void); extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info); extern void XLogEnsureRecordSpace(int nbuffers, int ndatas); extern void XLogRegisterData(char *data, int len); diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 609bfe3e40f..5164abec758 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -127,6 +127,8 @@ struct XLogReaderState uint32 main_data_len; /* main data portion's length */ uint32 main_data_bufsz; /* allocated size of the buffer */ + RepOriginId record_origin; + /* information about blocks referenced by the record. */ DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1]; @@ -186,6 +188,7 @@ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, #define XLogRecGetInfo(decoder) ((decoder)->decoded_record->xl_info) #define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid) #define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid) +#define XLogRecGetOrigin(decoder) ((decoder)->record_origin) #define XLogRecGetData(decoder) ((decoder)->main_data) #define XLogRecGetDataLen(decoder) ((decoder)->main_data_len) #define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0) diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h index b487ae0cc8e..7a049f0e979 100644 --- a/src/include/access/xlogrecord.h +++ b/src/include/access/xlogrecord.h @@ -212,5 +212,6 @@ typedef struct XLogRecordDataHeaderLong #define XLR_BLOCK_ID_DATA_SHORT 255 #define XLR_BLOCK_ID_DATA_LONG 254 +#define XLR_BLOCK_ID_ORIGIN 253 #endif /* XLOGRECORD_H */ diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index b36e4edd843..e8334025e14 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 201504261 +#define CATALOG_VERSION_NO 201504291 #endif diff --git a/src/include/catalog/indexing.h b/src/include/catalog/indexing.h index a234bde293c..71e0010a6f8 100644 --- a/src/include/catalog/indexing.h +++ b/src/include/catalog/indexing.h @@ -310,6 +310,12 @@ DECLARE_UNIQUE_INDEX(pg_policy_oid_index, 3257, on pg_policy using btree(oid oid DECLARE_UNIQUE_INDEX(pg_policy_polrelid_polname_index, 3258, on pg_policy using btree(polrelid oid_ops, polname name_ops)); #define PolicyPolrelidPolnameIndexId 3258 +DECLARE_UNIQUE_INDEX(pg_replication_origin_roiident_index, 6001, on pg_replication_origin using btree(roident oid_ops)); +#define ReplicationOriginIdentIndex 6001 + +DECLARE_UNIQUE_INDEX(pg_replication_origin_roname_index, 6002, on pg_replication_origin using btree(roname varchar_pattern_ops)); +#define ReplicationOriginNameIndex 6002 + /* last step of initialization script: build the indexes declared above */ BUILD_INDICES diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index e97e6b19440..55c246e73dd 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -5203,6 +5203,42 @@ DESCR("for use by pg_upgrade"); DATA(insert OID = 3591 ( binary_upgrade_create_empty_extension PGNSP PGUID 12 1 0 0 0 f f f f f f v 7 0 2278 "25 25 16 25 1028 1009 1009" _null_ _null_ _null_ _null_ _null_ binary_upgrade_create_empty_extension _null_ _null_ _null_ )); DESCR("for use by pg_upgrade"); +/* replication/origin.h */ +DATA(insert OID = 6003 ( pg_replication_origin_create PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 26 "25" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_create _null_ _null_ _null_ )); +DESCR("create a replication origin"); + +DATA(insert OID = 6004 ( pg_replication_origin_drop PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 2278 "25" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_drop _null_ _null_ _null_ )); +DESCR("drop replication origin identified by its name"); + +DATA(insert OID = 6005 ( pg_replication_origin_oid PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 26 "25" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_oid _null_ _null_ _null_ )); +DESCR("translate the replication origin's name to its id"); + +DATA(insert OID = 6006 ( pg_replication_origin_session_setup PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 2278 "25" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_session_setup _null_ _null_ _null_ )); +DESCR("configure session to maintain replication progress tracking for the passed in origin"); + +DATA(insert OID = 6007 ( pg_replication_origin_session_reset PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 2278 "" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_session_reset _null_ _null_ _null_ )); +DESCR("teardown configured replication progress tracking"); + +DATA(insert OID = 6008 ( pg_replication_origin_session_is_setup PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 16 "" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_session_is_setup _null_ _null_ _null_ )); +DESCR("is a replication origin configured in this session"); + +DATA(insert OID = 6009 ( pg_replication_origin_session_progress PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 3220 "16" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_session_progress _null_ _null_ _null_ )); +DESCR("get the replication progress of the current session"); + +DATA(insert OID = 6010 ( pg_replication_origin_xact_setup PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 2278 "3220 1184" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_xact_setup _null_ _null_ _null_ )); +DESCR("setup the transaction's origin lsn and timestamp"); + +DATA(insert OID = 6011 ( pg_replication_origin_xact_reset PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 2278 "3220 1184" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_xact_reset _null_ _null_ _null_ )); +DESCR("reset the transaction's origin lsn and timestamp"); + +DATA(insert OID = 6012 ( pg_replication_origin_advance PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 2278 "25 3220" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_advance _null_ _null_ _null_ )); +DESCR("advance replication itentifier to specific location"); + +DATA(insert OID = 6013 ( pg_replication_origin_progress PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 3220 "25 16" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_progress _null_ _null_ _null_ )); +DESCR("get an individual replication origin's replication progress"); + +DATA(insert OID = 6014 ( pg_show_replication_origin_status PGNSP PGUID 12 1 100 0 0 f f f f f t v 0 0 2249 "" "{26,25,3220,3220}" "{o,o,o,o}" "{local_id, external_id, remote_lsn, local_lsn}" _null_ _null_ pg_show_replication_origin_status _null_ _null_ _null_ )); +DESCR("get progress for all replication origins"); /* * Symbolic values for provolatile column: these indicate whether the result diff --git a/src/include/catalog/pg_replication_origin.h b/src/include/catalog/pg_replication_origin.h new file mode 100644 index 00000000000..3483809034b --- /dev/null +++ b/src/include/catalog/pg_replication_origin.h @@ -0,0 +1,70 @@ +/*------------------------------------------------------------------------- + * + * pg_replication_origin.h + * Persistent replication origin registry + * + * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/catalog/pg_replication_origin.h + * + * NOTES + * the genbki.pl script reads this file and generates .bki + * information from the DATA() statements. + * + *------------------------------------------------------------------------- + */ +#ifndef PG_REPLICATION_ORIGIN_H +#define PG_REPLICATION_ORIGIN_H + +#include "catalog/genbki.h" +#include "access/xlogdefs.h" + +/* ---------------- + * pg_replication_origin. cpp turns this into + * typedef struct FormData_pg_replication_origin + * ---------------- + */ +#define ReplicationOriginRelationId 6000 + +CATALOG(pg_replication_origin,6000) BKI_SHARED_RELATION BKI_WITHOUT_OIDS +{ + /* + * Locally known id that get included into WAL. + * + * This should never leave the system. + * + * Needs to fit into a uint16, so we don't waste too much space in WAL + * records. For this reason we don't use a normal Oid column here, since + * we need to handle allocation of new values manually. + */ + Oid roident; + + /* + * Variable-length fields start here, but we allow direct access to + * roname. + */ + + /* external, free-format, name */ + text roname BKI_FORCE_NOT_NULL; + +#ifdef CATALOG_VARLEN /* further variable-length fields */ +#endif +} FormData_pg_replication_origin; + +typedef FormData_pg_replication_origin *Form_pg_replication_origin; + +/* ---------------- + * compiler constants for pg_replication_origin + * ---------------- + */ +#define Natts_pg_replication_origin 2 +#define Anum_pg_replication_origin_roident 1 +#define Anum_pg_replication_origin_roname 2 + +/* ---------------- + * pg_replication_origin has no initial contents + * ---------------- + */ + +#endif /* PG_REPLICATION_ORIGIN_H */ diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index cce4394d4e3..dfdbe6535f1 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -97,4 +97,6 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn); extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn); +extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id); + #endif diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h new file mode 100644 index 00000000000..ca26bc3e64a --- /dev/null +++ b/src/include/replication/origin.h @@ -0,0 +1,86 @@ +/*------------------------------------------------------------------------- + * origin.h + * Exports from replication/logical/origin.c + * + * Copyright (c) 2013-2015, PostgreSQL Global Development Group + * + * src/include/replication/origin.h + *------------------------------------------------------------------------- + */ +#ifndef PG_ORIGIN_H +#define PG_ORIGIN_H + +#include "access/xlogdefs.h" +#include "catalog/pg_replication_origin.h" +#include "replication/logical.h" + +typedef struct xl_replorigin_set +{ + XLogRecPtr remote_lsn; + RepOriginId node_id; + bool force; +} xl_replorigin_set; + +typedef struct xl_replorigin_drop +{ + RepOriginId node_id; +} xl_replorigin_drop; + +#define XLOG_REPLORIGIN_SET 0x00 +#define XLOG_REPLORIGIN_DROP 0x10 + +#define InvalidRepOriginId 0 +#define DoNotReplicateId UINT16_MAX + +extern PGDLLIMPORT RepOriginId replorigin_sesssion_origin; +extern PGDLLIMPORT XLogRecPtr replorigin_sesssion_origin_lsn; +extern PGDLLIMPORT TimestampTz replorigin_sesssion_origin_timestamp; + +/* API for querying & manipulating replication origins */ +extern RepOriginId replorigin_by_name(char *name, bool missing_ok); +extern RepOriginId replorigin_create(char *name); +extern void replorigin_drop(RepOriginId roident); +extern bool replorigin_by_oid(RepOriginId roident, bool missing_ok, + char **roname); + +/* API for querying & manipulating replication progress tracking */ +extern void replorigin_advance(RepOriginId node, + XLogRecPtr remote_commit, + XLogRecPtr local_commit, + bool go_backward, bool wal_log); +extern XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush); + +extern void replorigin_session_advance(XLogRecPtr remote_commit, + XLogRecPtr local_commit); +extern void replorigin_session_setup(RepOriginId node); +extern void replorigin_session_reset(void); +extern XLogRecPtr replorigin_session_get_progress(bool flush); + +/* Checkpoint/Startup integration */ +extern void CheckPointReplicationOrigin(void); +extern void StartupReplicationOrigin(void); + +/* WAL logging */ +void replorigin_redo(XLogReaderState *record); +void replorigin_desc(StringInfo buf, XLogReaderState *record); +const char * replorigin_identify(uint8 info); + +/* shared memory allocation */ +extern Size ReplicationOriginShmemSize(void); +extern void ReplicationOriginShmemInit(void); + +/* SQL callable functions */ +extern Datum pg_replication_origin_create(PG_FUNCTION_ARGS); +extern Datum pg_replication_origin_drop(PG_FUNCTION_ARGS); +extern Datum pg_replication_origin_oid(PG_FUNCTION_ARGS); +extern Datum pg_replication_origin_session_setup(PG_FUNCTION_ARGS); +extern Datum pg_replication_origin_session_reset(PG_FUNCTION_ARGS); +extern Datum pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS); +extern Datum pg_replication_origin_session_progress(PG_FUNCTION_ARGS); +extern Datum pg_replication_origin_xact_setup(PG_FUNCTION_ARGS); +extern Datum pg_replication_origin_xact_reset(PG_FUNCTION_ARGS); +extern Datum pg_replication_origin_advance(PG_FUNCTION_ARGS); +extern Datum pg_replication_origin_progress(PG_FUNCTION_ARGS); +extern Datum pg_show_replication_origin_status(PG_FUNCTION_ARGS); + +#endif /* PG_ORIGIN_H */ diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 0935c1bac3c..bec1a56017c 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -74,6 +74,13 @@ typedef void (*LogicalDecodeCommitCB) ( XLogRecPtr commit_lsn); /* + * Filter changes by origin. + */ +typedef bool (*LogicalDecodeFilterByOriginCB) ( + struct LogicalDecodingContext *, + RepOriginId origin_id); + +/* * Called to shutdown an output plugin. */ typedef void (*LogicalDecodeShutdownCB) ( @@ -89,6 +96,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeBeginCB begin_cb; LogicalDecodeChangeCB change_cb; LogicalDecodeCommitCB commit_cb; + LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; } OutputPluginCallbacks; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index f1e0f57e7c2..6a5528a7344 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -68,6 +68,8 @@ typedef struct ReorderBufferChange /* The type of change. */ enum ReorderBufferChangeType action; + RepOriginId origin_id; + /* * Context data for the change, which part of the union is valid depends * on action/action_internal. @@ -166,6 +168,10 @@ typedef struct ReorderBufferTXN */ XLogRecPtr restart_decoding_lsn; + /* origin of the change that caused this transaction */ + RepOriginId origin_id; + XLogRecPtr origin_lsn; + /* * Commit time, only known when we read the actual commit record. */ @@ -339,7 +345,7 @@ void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *); void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *); void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, - TimestampTz commit_time); + TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn); void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn); diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index e3c2efc1f3d..cff3b999221 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -134,8 +134,9 @@ extern PGDLLIMPORT LWLockPadded *MainLWLockArray; #define ReplicationSlotControlLock (&MainLWLockArray[37].lock) #define CommitTsControlLock (&MainLWLockArray[38].lock) #define CommitTsLock (&MainLWLockArray[39].lock) +#define ReplicationOriginLock (&MainLWLockArray[40].lock) -#define NUM_INDIVIDUAL_LWLOCKS 40 +#define NUM_INDIVIDUAL_LWLOCKS 41 /* * It's a bit odd to declare NUM_BUFFER_PARTITIONS and NUM_LOCK_PARTITIONS diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h index ff9a4f2af3b..6634099cbe2 100644 --- a/src/include/utils/syscache.h +++ b/src/include/utils/syscache.h @@ -77,6 +77,8 @@ enum SysCacheIdentifier RANGETYPE, RELNAMENSP, RELOID, + REPLORIGIDENT, + REPLORIGNAME, RULERELNAME, STATRELATTINH, TABLESPACEOID, |