aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
authorAndres Freund <andres@anarazel.de>2015-04-29 19:30:53 +0200
committerAndres Freund <andres@anarazel.de>2015-04-29 19:30:53 +0200
commit5aa2350426c4fdb3d04568b65aadac397012bbcb (patch)
tree954c3123dc58905bbda6407565383c65850204e7 /src/backend
parentc6e96a2f986e4dad72c14b14d4cc17d02b2a6aad (diff)
downloadpostgresql-5aa2350426c4fdb3d04568b65aadac397012bbcb.tar.gz
postgresql-5aa2350426c4fdb3d04568b65aadac397012bbcb.zip
Introduce replication progress tracking infrastructure.
When implementing a replication solution ontop of logical decoding, two related problems exist: * How to safely keep track of replication progress * How to change replication behavior, based on the origin of a row; e.g. to avoid loops in bi-directional replication setups The solution to these problems, as implemented here, consist out of three parts: 1) 'replication origins', which identify nodes in a replication setup. 2) 'replication progress tracking', which remembers, for each replication origin, how far replay has progressed in a efficient and crash safe manner. 3) The ability to filter out changes performed on the behest of a replication origin during logical decoding; this allows complex replication topologies. E.g. by filtering all replayed changes out. Most of this could also be implemented in "userspace", e.g. by inserting additional rows contain origin information, but that ends up being much less efficient and more complicated. We don't want to require various replication solutions to reimplement logic for this independently. The infrastructure is intended to be generic enough to be reusable. This infrastructure also replaces the 'nodeid' infrastructure of commit timestamps. It is intended to provide all the former capabilities, except that there's only 2^16 different origins; but now they integrate with logical decoding. Additionally more functionality is accessible via SQL. Since the commit timestamp infrastructure has also been introduced in 9.5 (commit 73c986add) changing the API is not a problem. For now the number of origins for which the replication progress can be tracked simultaneously is determined by the max_replication_slots GUC. That GUC is not a perfect match to configure this, but there doesn't seem to be sufficient reason to introduce a separate new one. Bumps both catversion and wal page magic. Author: Andres Freund, with contributions from Petr Jelinek and Craig Ringer Reviewed-By: Heikki Linnakangas, Petr Jelinek, Robert Haas, Steve Singer Discussion: 20150216002155.GI15326@awork2.anarazel.de, 20140923182422.GA15776@alap3.anarazel.de, 20131114172632.GE7522@alap2.anarazel.de
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/access/heap/heapam.c19
-rw-r--r--src/backend/access/rmgrdesc/Makefile4
-rw-r--r--src/backend/access/rmgrdesc/replorigindesc.c61
-rw-r--r--src/backend/access/rmgrdesc/xactdesc.c24
-rw-r--r--src/backend/access/transam/commit_ts.c53
-rw-r--r--src/backend/access/transam/rmgr.c1
-rw-r--r--src/backend/access/transam/xact.c76
-rw-r--r--src/backend/access/transam/xlog.c8
-rw-r--r--src/backend/access/transam/xloginsert.c27
-rw-r--r--src/backend/access/transam/xlogreader.c6
-rw-r--r--src/backend/catalog/Makefile2
-rw-r--r--src/backend/catalog/catalog.c8
-rw-r--r--src/backend/catalog/system_views.sql7
-rw-r--r--src/backend/replication/logical/Makefile3
-rw-r--r--src/backend/replication/logical/decode.c49
-rw-r--r--src/backend/replication/logical/logical.c29
-rw-r--r--src/backend/replication/logical/origin.c1485
-rw-r--r--src/backend/replication/logical/reorderbuffer.c5
-rw-r--r--src/backend/storage/ipc/ipci.c3
-rw-r--r--src/backend/utils/cache/syscache.c23
20 files changed, 1824 insertions, 69 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 457cd708fd3..b504ccd05c3 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2189,6 +2189,9 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
(char *) heaptup->t_data + SizeofHeapTupleHeader,
heaptup->t_len - SizeofHeapTupleHeader);
+ /* filtering by origin on a row level is much more efficient */
+ XLogIncludeOrigin();
+
recptr = XLogInsert(RM_HEAP_ID, info);
PageSetLSN(page, recptr);
@@ -2499,6 +2502,10 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags);
XLogRegisterBufData(0, tupledata, totaldatalen);
+
+ /* filtering by origin on a row level is much more efficient */
+ XLogIncludeOrigin();
+
recptr = XLogInsert(RM_HEAP2_ID, info);
PageSetLSN(page, recptr);
@@ -2920,6 +2927,9 @@ l1:
- SizeofHeapTupleHeader);
}
+ /* filtering by origin on a row level is much more efficient */
+ XLogIncludeOrigin();
+
recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE);
PageSetLSN(page, recptr);
@@ -4650,6 +4660,8 @@ failed:
tuple->t_data->t_infomask2);
XLogRegisterData((char *) &xlrec, SizeOfHeapLock);
+ /* we don't decode row locks atm, so no need to log the origin */
+
recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK);
PageSetLSN(page, recptr);
@@ -5429,6 +5441,8 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
XLogRegisterBufData(0, (char *) htup + htup->t_hoff, newlen);
+ /* inplace updates aren't decoded atm, don't log the origin */
+
recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INPLACE);
PageSetLSN(page, recptr);
@@ -6787,6 +6801,9 @@ log_heap_update(Relation reln, Buffer oldbuf,
old_key_tuple->t_len - SizeofHeapTupleHeader);
}
+ /* filtering by origin on a row level is much more efficient */
+ XLogIncludeOrigin();
+
recptr = XLogInsert(RM_HEAP_ID, info);
return recptr;
@@ -6860,6 +6877,8 @@ log_heap_new_cid(Relation relation, HeapTuple tup)
XLogBeginInsert();
XLogRegisterData((char *) &xlrec, SizeOfHeapNewCid);
+ /* will be looked at irrespective of origin */
+
recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_NEW_CID);
return recptr;
diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index d18e8ec9980..c72a1f245d1 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -9,8 +9,8 @@ top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o gindesc.o gistdesc.o \
- hashdesc.o heapdesc.o \
- mxactdesc.o nbtdesc.o relmapdesc.o seqdesc.o smgrdesc.o spgdesc.o \
+ hashdesc.o heapdesc.o mxactdesc.o nbtdesc.o relmapdesc.o \
+ replorigindesc.o seqdesc.o smgrdesc.o spgdesc.o \
standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/rmgrdesc/replorigindesc.c b/src/backend/access/rmgrdesc/replorigindesc.c
new file mode 100644
index 00000000000..19bae9a0f84
--- /dev/null
+++ b/src/backend/access/rmgrdesc/replorigindesc.c
@@ -0,0 +1,61 @@
+/*-------------------------------------------------------------------------
+ *
+ * replorigindesc.c
+ * rmgr descriptor routines for replication/logical/replication_origin.c
+ *
+ * Portions Copyright (c) 2015, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/rmgrdesc/replorigindesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "replication/origin.h"
+
+void
+replorigin_desc(StringInfo buf, XLogReaderState *record)
+{
+ char *rec = XLogRecGetData(record);
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+ switch (info)
+ {
+ case XLOG_REPLORIGIN_SET:
+ {
+ xl_replorigin_set *xlrec;
+ xlrec = (xl_replorigin_set *) rec;
+
+ appendStringInfo(buf, "set %u; lsn %X/%X; force: %d",
+ xlrec->node_id,
+ (uint32) (xlrec->remote_lsn >> 32),
+ (uint32) xlrec->remote_lsn,
+ xlrec->force);
+ break;
+ }
+ case XLOG_REPLORIGIN_DROP:
+ {
+ xl_replorigin_drop *xlrec;
+ xlrec = (xl_replorigin_drop *) rec;
+
+ appendStringInfo(buf, "drop %u", xlrec->node_id);
+ break;
+ }
+ }
+}
+
+const char *
+replorigin_identify(uint8 info)
+{
+ switch (info)
+ {
+ case XLOG_REPLORIGIN_SET:
+ return "SET";
+ case XLOG_REPLORIGIN_DROP:
+ return "DROP";
+ default:
+ return NULL;
+ }
+}
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index b036b6d5242..3297e1d3790 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -101,6 +101,16 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
data += sizeof(xl_xact_twophase);
}
+
+ if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+ {
+ xl_xact_origin *xl_origin = (xl_xact_origin *) data;
+
+ parsed->origin_lsn = xl_origin->origin_lsn;
+ parsed->origin_timestamp = xl_origin->origin_timestamp;
+
+ data += sizeof(xl_xact_origin);
+ }
}
void
@@ -156,7 +166,7 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
}
static void
-xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec)
+xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId origin_id)
{
xl_xact_parsed_commit parsed;
int i;
@@ -218,6 +228,15 @@ xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec)
if (XactCompletionForceSyncCommit(parsed.xinfo))
appendStringInfo(buf, "; sync");
+
+ if (parsed.xinfo & XACT_XINFO_HAS_ORIGIN)
+ {
+ appendStringInfo(buf, "; origin: node %u, lsn %X/%X, at %s",
+ origin_id,
+ (uint32)(parsed.origin_lsn >> 32),
+ (uint32)parsed.origin_lsn,
+ timestamptz_to_str(parsed.origin_timestamp));
+ }
}
static void
@@ -274,7 +293,8 @@ xact_desc(StringInfo buf, XLogReaderState *record)
{
xl_xact_commit *xlrec = (xl_xact_commit *) rec;
- xact_desc_commit(buf, XLogRecGetInfo(record), xlrec);
+ xact_desc_commit(buf, XLogRecGetInfo(record), xlrec,
+ XLogRecGetOrigin(record));
}
else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED)
{
diff --git a/src/backend/access/transam/commit_ts.c b/src/backend/access/transam/commit_ts.c
index dc23ab27b65..40042a5fd53 100644
--- a/src/backend/access/transam/commit_ts.c
+++ b/src/backend/access/transam/commit_ts.c
@@ -49,18 +49,18 @@
*/
/*
- * We need 8+4 bytes per xact. Note that enlarging this struct might mean
+ * We need 8+2 bytes per xact. Note that enlarging this struct might mean
* the largest possible file name is more than 5 chars long; see
* SlruScanDirectory.
*/
typedef struct CommitTimestampEntry
{
TimestampTz time;
- CommitTsNodeId nodeid;
+ RepOriginId nodeid;
} CommitTimestampEntry;
#define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
- sizeof(CommitTsNodeId))
+ sizeof(RepOriginId))
#define COMMIT_TS_XACTS_PER_PAGE \
(BLCKSZ / SizeOfCommitTimestampEntry)
@@ -93,43 +93,18 @@ CommitTimestampShared *commitTsShared;
/* GUC variable */
bool track_commit_timestamp;
-static CommitTsNodeId default_node_id = InvalidCommitTsNodeId;
-
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
TransactionId *subxids, TimestampTz ts,
- CommitTsNodeId nodeid, int pageno);
+ RepOriginId nodeid, int pageno);
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
- CommitTsNodeId nodeid, int slotno);
+ RepOriginId nodeid, int slotno);
static int ZeroCommitTsPage(int pageno, bool writeXlog);
static bool CommitTsPagePrecedes(int page1, int page2);
static void WriteZeroPageXlogRec(int pageno);
static void WriteTruncateXlogRec(int pageno);
static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
TransactionId *subxids, TimestampTz timestamp,
- CommitTsNodeId nodeid);
-
-
-/*
- * CommitTsSetDefaultNodeId
- *
- * Set default nodeid for current backend.
- */
-void
-CommitTsSetDefaultNodeId(CommitTsNodeId nodeid)
-{
- default_node_id = nodeid;
-}
-
-/*
- * CommitTsGetDefaultNodeId
- *
- * Set default nodeid for current backend.
- */
-CommitTsNodeId
-CommitTsGetDefaultNodeId(void)
-{
- return default_node_id;
-}
+ RepOriginId nodeid);
/*
* TransactionTreeSetCommitTsData
@@ -156,7 +131,7 @@ CommitTsGetDefaultNodeId(void)
void
TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
TransactionId *subxids, TimestampTz timestamp,
- CommitTsNodeId nodeid, bool do_xlog)
+ RepOriginId nodeid, bool do_xlog)
{
int i;
TransactionId headxid;
@@ -234,7 +209,7 @@ TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
static void
SetXidCommitTsInPage(TransactionId xid, int nsubxids,
TransactionId *subxids, TimestampTz ts,
- CommitTsNodeId nodeid, int pageno)
+ RepOriginId nodeid, int pageno)
{
int slotno;
int i;
@@ -259,7 +234,7 @@ SetXidCommitTsInPage(TransactionId xid, int nsubxids,
*/
static void
TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
- CommitTsNodeId nodeid, int slotno)
+ RepOriginId nodeid, int slotno)
{
int entryno = TransactionIdToCTsEntry(xid);
CommitTimestampEntry entry;
@@ -282,7 +257,7 @@ TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
*/
bool
TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
- CommitTsNodeId *nodeid)
+ RepOriginId *nodeid)
{
int pageno = TransactionIdToCTsPage(xid);
int entryno = TransactionIdToCTsEntry(xid);
@@ -322,7 +297,7 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
if (ts)
*ts = 0;
if (nodeid)
- *nodeid = InvalidCommitTsNodeId;
+ *nodeid = InvalidRepOriginId;
return false;
}
@@ -373,7 +348,7 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
* as NULL if not wanted.
*/
TransactionId
-GetLatestCommitTsData(TimestampTz *ts, CommitTsNodeId *nodeid)
+GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
{
TransactionId xid;
@@ -503,7 +478,7 @@ CommitTsShmemInit(void)
commitTsShared->xidLastCommit = InvalidTransactionId;
TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
- commitTsShared->dataLastCommit.nodeid = InvalidCommitTsNodeId;
+ commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
}
else
Assert(found);
@@ -857,7 +832,7 @@ WriteTruncateXlogRec(int pageno)
static void
WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
TransactionId *subxids, TimestampTz timestamp,
- CommitTsNodeId nodeid)
+ RepOriginId nodeid)
{
xl_commit_ts_set record;
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index acd825fad4f..7c4d773ce0f 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -23,6 +23,7 @@
#include "commands/dbcommands_xlog.h"
#include "commands/sequence.h"
#include "commands/tablespace.h"
+#include "replication/origin.h"
#include "storage/standby.h"
#include "utils/relmapper.h"
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 1495bb499f5..511bcbbc519 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -40,8 +40,10 @@
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
+#include "replication/logical.h"
#include "replication/walsender.h"
#include "replication/syncrep.h"
+#include "replication/origin.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
@@ -1073,21 +1075,27 @@ RecordTransactionCommit(void)
nmsgs, invalMessages,
RelcacheInitFileInval, forceSyncCommit,
InvalidTransactionId /* plain commit */);
- }
- /*
- * We only need to log the commit timestamp separately if the node
- * identifier is a valid value; the commit record above already contains
- * the timestamp info otherwise, and will be used to load it.
- */
- if (markXidCommitted)
- {
- CommitTsNodeId node_id;
+ /*
+ * Record plain commit ts if not replaying remote actions, or if no
+ * timestamp is configured.
+ */
+ if (replorigin_sesssion_origin == InvalidRepOriginId ||
+ replorigin_sesssion_origin == DoNotReplicateId ||
+ replorigin_sesssion_origin_timestamp == 0)
+ replorigin_sesssion_origin_timestamp = xactStopTimestamp;
+ else
+ replorigin_session_advance(replorigin_sesssion_origin_lsn,
+ XactLastRecEnd);
- node_id = CommitTsGetDefaultNodeId();
+ /*
+ * We don't need to WAL log origin or timestamp here, the commit
+ * record contains all the necessary information and will redo the SET
+ * action during replay.
+ */
TransactionTreeSetCommitTsData(xid, nchildren, children,
- xactStopTimestamp,
- node_id, node_id != InvalidCommitTsNodeId);
+ replorigin_sesssion_origin_timestamp,
+ replorigin_sesssion_origin, false);
}
/*
@@ -1176,9 +1184,11 @@ RecordTransactionCommit(void)
if (wrote_xlog && markXidCommitted)
SyncRepWaitForLSN(XactLastRecEnd);
+ /* remember end of last commit record */
+ XactLastCommitEnd = XactLastRecEnd;
+
/* Reset XactLastRecEnd until the next transaction writes something */
XactLastRecEnd = 0;
-
cleanup:
/* Clean up local data */
if (rels)
@@ -4611,6 +4621,7 @@ XactLogCommitRecord(TimestampTz commit_time,
xl_xact_relfilenodes xl_relfilenodes;
xl_xact_invals xl_invals;
xl_xact_twophase xl_twophase;
+ xl_xact_origin xl_origin;
uint8 info;
@@ -4668,6 +4679,15 @@ XactLogCommitRecord(TimestampTz commit_time,
xl_twophase.xid = twophase_xid;
}
+ /* dump transaction origin information */
+ if (replorigin_sesssion_origin != InvalidRepOriginId)
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
+
+ xl_origin.origin_lsn = replorigin_sesssion_origin_lsn;
+ xl_origin.origin_timestamp = replorigin_sesssion_origin_timestamp;
+ }
+
if (xl_xinfo.xinfo != 0)
info |= XLOG_XACT_HAS_INFO;
@@ -4709,6 +4729,12 @@ XactLogCommitRecord(TimestampTz commit_time,
if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
+ XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
+
+ /* we allow filtering by xacts */
+ XLogIncludeOrigin();
+
return XLogInsert(RM_XACT_ID, info);
}
@@ -4806,10 +4832,12 @@ XactLogAbortRecord(TimestampTz abort_time,
static void
xact_redo_commit(xl_xact_parsed_commit *parsed,
TransactionId xid,
- XLogRecPtr lsn)
+ XLogRecPtr lsn,
+ RepOriginId origin_id)
{
TransactionId max_xid;
int i;
+ TimestampTz commit_time;
max_xid = TransactionIdLatest(xid, parsed->nsubxacts, parsed->subxacts);
@@ -4829,9 +4857,16 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
LWLockRelease(XidGenLock);
}
+ Assert(!!(parsed->xinfo & XACT_XINFO_HAS_ORIGIN) == (origin_id != InvalidRepOriginId));
+
+ if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+ commit_time = parsed->origin_timestamp;
+ else
+ commit_time = parsed->xact_time;
+
/* Set the transaction commit timestamp and metadata */
TransactionTreeSetCommitTsData(xid, parsed->nsubxacts, parsed->subxacts,
- parsed->xact_time, InvalidCommitTsNodeId,
+ commit_time, origin_id,
false);
if (standbyState == STANDBY_DISABLED)
@@ -4892,6 +4927,13 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
StandbyReleaseLockTree(xid, 0, NULL);
}
+ if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+ {
+ /* recover apply progress */
+ replorigin_advance(origin_id, parsed->origin_lsn, lsn,
+ false /* backward */, false /* WAL */);
+ }
+
/* Make sure files supposed to be dropped are dropped */
if (parsed->nrels > 0)
{
@@ -5047,13 +5089,13 @@ xact_redo(XLogReaderState *record)
{
Assert(!TransactionIdIsValid(parsed.twophase_xid));
xact_redo_commit(&parsed, XLogRecGetXid(record),
- record->EndRecPtr);
+ record->EndRecPtr, XLogRecGetOrigin(record));
}
else
{
Assert(TransactionIdIsValid(parsed.twophase_xid));
xact_redo_commit(&parsed, parsed.twophase_xid,
- record->EndRecPtr);
+ record->EndRecPtr, XLogRecGetOrigin(record));
RemoveTwoPhaseFile(parsed.twophase_xid, false);
}
}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 25809961028..da7b6c2fadd 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -44,6 +44,7 @@
#include "postmaster/startup.h"
#include "replication/logical.h"
#include "replication/slot.h"
+#include "replication/origin.h"
#include "replication/snapbuild.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
@@ -295,6 +296,7 @@ static TimeLineID curFileTLI;
static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr;
+XLogRecPtr XactLastCommitEnd = InvalidXLogRecPtr;
/*
* RedoRecPtr is this backend's local copy of the REDO record pointer
@@ -6212,6 +6214,11 @@ StartupXLOG(void)
StartupMultiXact();
/*
+ * Recover knowledge about replay progress of known replication partners.
+ */
+ StartupReplicationOrigin();
+
+ /*
* Initialize unlogged LSN. On a clean shutdown, it's restored from the
* control file. On recovery, all unlogged relations are blown away, so
* the unlogged LSN counter can be reset too.
@@ -8394,6 +8401,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
CheckPointSnapBuild();
CheckPointLogicalRewriteHeap();
CheckPointBuffers(flags); /* performs all required fsyncs */
+ CheckPointReplicationOrigin();
/* We deliberately delay 2PC checkpointing as long as possible */
CheckPointTwoPhase(checkPointRedo);
}
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index 618f8792f89..0cdb6af052d 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -26,6 +26,7 @@
#include "catalog/pg_control.h"
#include "common/pg_lzcompress.h"
#include "miscadmin.h"
+#include "replication/origin.h"
#include "storage/bufmgr.h"
#include "storage/proc.h"
#include "utils/memutils.h"
@@ -72,6 +73,9 @@ static XLogRecData *mainrdata_head;
static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
static uint32 mainrdata_len; /* total # of bytes in chain */
+/* Should te in-progress insertion log the origin */
+static bool include_origin = false;
+
/*
* These are used to hold the record header while constructing a record.
* 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
@@ -83,10 +87,12 @@ static uint32 mainrdata_len; /* total # of bytes in chain */
static XLogRecData hdr_rdt;
static char *hdr_scratch = NULL;
+#define SizeOfXlogOrigin (sizeof(RepOriginId) + sizeof(char))
+
#define HEADER_SCRATCH_SIZE \
(SizeOfXLogRecord + \
MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
- SizeOfXLogRecordDataHeaderLong)
+ SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin)
/*
* An array of XLogRecData structs, to hold registered data.
@@ -193,6 +199,7 @@ XLogResetInsertion(void)
max_registered_block_id = 0;
mainrdata_len = 0;
mainrdata_last = (XLogRecData *) &mainrdata_head;
+ include_origin = false;
begininsert_called = false;
}
@@ -375,6 +382,16 @@ XLogRegisterBufData(uint8 block_id, char *data, int len)
}
/*
+ * Should this record include the replication origin if one is set up?
+ */
+void
+XLogIncludeOrigin(void)
+{
+ Assert(begininsert_called);
+ include_origin = true;
+}
+
+/*
* Insert an XLOG record having the specified RMID and info bytes, with the
* body of the record being the data and buffer references registered earlier
* with XLogRegister* calls.
@@ -678,6 +695,14 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
scratch += sizeof(BlockNumber);
}
+ /* followed by the record's origin, if any */
+ if (include_origin && replorigin_sesssion_origin != InvalidRepOriginId)
+ {
+ *(scratch++) = XLR_BLOCK_ID_ORIGIN;
+ memcpy(scratch, &replorigin_sesssion_origin, sizeof(replorigin_sesssion_origin));
+ scratch += sizeof(replorigin_sesssion_origin);
+ }
+
/* followed by main data, if any */
if (mainrdata_len > 0)
{
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 77be1b8ef3c..3661e7229aa 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -21,6 +21,7 @@
#include "access/xlogreader.h"
#include "catalog/pg_control.h"
#include "common/pg_lzcompress.h"
+#include "replication/origin.h"
static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
@@ -975,6 +976,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
ResetDecoder(state);
state->decoded_record = record;
+ state->record_origin = InvalidRepOriginId;
ptr = (char *) record;
ptr += SizeOfXLogRecord;
@@ -1009,6 +1011,10 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
break; /* by convention, the main data fragment is
* always last */
}
+ else if (block_id == XLR_BLOCK_ID_ORIGIN)
+ {
+ COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
+ }
else if (block_id <= XLR_MAX_BLOCK_ID)
{
/* XLogRecordBlockHeader */
diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile
index c73f20d6a5e..37d05d1acc6 100644
--- a/src/backend/catalog/Makefile
+++ b/src/backend/catalog/Makefile
@@ -39,7 +39,7 @@ POSTGRES_BKI_SRCS = $(addprefix $(top_srcdir)/src/include/catalog/,\
pg_ts_config.h pg_ts_config_map.h pg_ts_dict.h \
pg_ts_parser.h pg_ts_template.h pg_extension.h \
pg_foreign_data_wrapper.h pg_foreign_server.h pg_user_mapping.h \
- pg_foreign_table.h pg_policy.h \
+ pg_foreign_table.h pg_policy.h pg_replication_origin.h \
pg_default_acl.h pg_seclabel.h pg_shseclabel.h pg_collation.h pg_range.h \
pg_transform.h \
toasting.h indexing.h \
diff --git a/src/backend/catalog/catalog.c b/src/backend/catalog/catalog.c
index e9d3cdcc9d4..fa2aa27eff3 100644
--- a/src/backend/catalog/catalog.c
+++ b/src/backend/catalog/catalog.c
@@ -32,6 +32,7 @@
#include "catalog/pg_namespace.h"
#include "catalog/pg_pltemplate.h"
#include "catalog/pg_db_role_setting.h"
+#include "catalog/pg_replication_origin.h"
#include "catalog/pg_shdepend.h"
#include "catalog/pg_shdescription.h"
#include "catalog/pg_shseclabel.h"
@@ -224,7 +225,8 @@ IsSharedRelation(Oid relationId)
relationId == SharedDependRelationId ||
relationId == SharedSecLabelRelationId ||
relationId == TableSpaceRelationId ||
- relationId == DbRoleSettingRelationId)
+ relationId == DbRoleSettingRelationId ||
+ relationId == ReplicationOriginRelationId)
return true;
/* These are their indexes (see indexing.h) */
if (relationId == AuthIdRolnameIndexId ||
@@ -240,7 +242,9 @@ IsSharedRelation(Oid relationId)
relationId == SharedSecLabelObjectIndexId ||
relationId == TablespaceOidIndexId ||
relationId == TablespaceNameIndexId ||
- relationId == DbRoleSettingDatidRolidIndexId)
+ relationId == DbRoleSettingDatidRolidIndexId ||
+ relationId == ReplicationOriginIdentIndex ||
+ relationId == ReplicationOriginNameIndex)
return true;
/* These are their toast tables and toast indexes (see toasting.h) */
if (relationId == PgShdescriptionToastTable ||
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 4c35ef43496..2ad01f4cb41 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -778,6 +778,13 @@ CREATE VIEW pg_user_mappings AS
REVOKE ALL on pg_user_mapping FROM public;
+
+CREATE VIEW pg_replication_origin_status AS
+ SELECT *
+ FROM pg_show_replication_origin_status();
+
+REVOKE ALL ON pg_replication_origin_status FROM public;
+
--
-- We have a few function definitions in here, too.
-- At some point there might be enough to justify breaking them out into
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 310a45c5c05..8adea13bf4e 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
-OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o snapbuild.o
+OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o origin.o \
+ snapbuild.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index eb7293f2f33..88424964ef3 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -40,6 +40,7 @@
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
+#include "replication/origin.h"
#include "replication/snapbuild.h"
#include "storage/standby.h"
@@ -131,6 +132,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
case RM_SPGIST_ID:
case RM_BRIN_ID:
case RM_COMMIT_TS_ID:
+ case RM_REPLORIGIN_ID:
break;
case RM_NEXT_ID:
elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
@@ -422,6 +424,15 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
}
}
+static inline bool
+FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
+{
+ if (ctx->callbacks.filter_by_origin_cb == NULL)
+ return false;
+
+ return filter_by_origin_cb_wrapper(ctx, origin_id);
+}
+
/*
* Consolidated commit record handling between the different form of commit
* records.
@@ -430,8 +441,17 @@ static void
DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
xl_xact_parsed_commit *parsed, TransactionId xid)
{
+ XLogRecPtr origin_lsn = InvalidXLogRecPtr;
+ XLogRecPtr commit_time = InvalidXLogRecPtr;
+ XLogRecPtr origin_id = InvalidRepOriginId;
int i;
+ if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+ {
+ origin_lsn = parsed->origin_lsn;
+ commit_time = parsed->origin_timestamp;
+ }
+
/*
* Process invalidation messages, even if we're not interested in the
* transaction's contents, since the various caches need to always be
@@ -452,12 +472,13 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* the reorderbuffer to forget the content of the (sub-)transactions
* if not.
*
- * There basically two reasons we might not be interested in this
+ * There can be several reasons we might not be interested in this
* transaction:
* 1) We might not be interested in decoding transactions up to this
* LSN. This can happen because we previously decoded it and now just
* are restarting or if we haven't assembled a consistent snapshot yet.
* 2) The transaction happened in another database.
+ * 3) The output plugin is not interested in the origin.
*
* We can't just use ReorderBufferAbort() here, because we need to execute
* the transaction's invalidations. This currently won't be needed if
@@ -472,7 +493,8 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* ---
*/
if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
- (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database))
+ (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
+ FilterByOrigin(ctx, origin_id))
{
for (i = 0; i < parsed->nsubxacts; i++)
{
@@ -492,7 +514,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
/* replay actions of all transaction + subtransactions in order */
ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
- parsed->xact_time);
+ commit_time, origin_id, origin_lsn);
}
/*
@@ -537,8 +559,13 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (target_node.dbNode != ctx->slot->data.database)
return;
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_INSERT;
+ change->origin_id = XLogRecGetOrigin(r);
memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
@@ -579,8 +606,13 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (target_node.dbNode != ctx->slot->data.database)
return;
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_UPDATE;
+ change->origin_id = XLogRecGetOrigin(r);
memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
@@ -628,8 +660,13 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (target_node.dbNode != ctx->slot->data.database)
return;
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_DELETE;
+ change->origin_id = XLogRecGetOrigin(r);
memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
@@ -673,6 +710,10 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (rnode.dbNode != ctx->slot->data.database)
return;
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
data = tupledata;
@@ -685,6 +726,8 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_INSERT;
+ change->origin_id = XLogRecGetOrigin(r);
+
memcpy(&change->data.tp.relnode, &rnode, sizeof(RelFileNode));
/*
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 774ebbc749c..45d143686ac 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -39,6 +39,7 @@
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
+#include "replication/origin.h"
#include "replication/snapbuild.h"
#include "storage/proc.h"
@@ -720,6 +721,34 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
error_context_stack = errcallback.previous;
}
+bool
+filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
+{
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+ bool ret;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "shutdown";
+ state.report_location = InvalidXLogRecPtr;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = false;
+
+ /* do the actual work: call callback */
+ ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+
+ return ret;
+}
+
/*
* Set the required catalog xmin horizon for historic snapshots in the current
* replication slot.
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
new file mode 100644
index 00000000000..ab9ae0b6c2f
--- /dev/null
+++ b/src/backend/replication/logical/origin.c
@@ -0,0 +1,1485 @@
+/*-------------------------------------------------------------------------
+ *
+ * origin.c
+ * Logical replication progress tracking support.
+ *
+ * Copyright (c) 2013-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/origin.c
+ *
+ * NOTES
+ *
+ * This file provides the following:
+ * * An infrastructure to name nodes in a replication setup
+ * * A facility to efficiently store and persist replication progress in a
+ * efficient and durable manner.
+ *
+ * Replication origin consist out of a descriptive, user defined, external
+ * name and a short, thus space efficient, internal 2 byte one. This split
+ * exists because replication origin have to be stored in WAL and shared
+ * memory and long descriptors would be inefficient. For now only use 2 bytes
+ * for the internal id of a replication origin as it seems unlikely that there
+ * soon will be more than 65k nodes in one replication setup; and using only
+ * two bytes allow us to be more space efficient.
+ *
+ * Replication progress is tracked in a shared memory table
+ * (ReplicationStates) that's dumped to disk every checkpoint. Entries
+ * ('slots') in this table are identified by the internal id. That's the case
+ * because it allows to increase replication progress during crash
+ * recovery. To allow doing so we store the original LSN (from the originating
+ * system) of a transaction in the commit record. That allows to recover the
+ * precise replayed state after crash recovery; without requiring synchronous
+ * commits. Allowing logical replication to use asynchronous commit is
+ * generally good for performance, but especially important as it allows a
+ * single threaded replay process to keep up with a source that has multiple
+ * backends generating changes concurrently. For efficiency and simplicity
+ * reasons a backend can setup one replication origin that's from then used as
+ * the source of changes produced by the backend, until reset again.
+ *
+ * This infrastructure is intended to be used in cooperation with logical
+ * decoding. When replaying from a remote system the configured origin is
+ * provided to output plugins, allowing prevention of replication loops and
+ * other filtering.
+ *
+ * There are several levels of locking at work:
+ *
+ * * To create and drop replication origins a exclusive lock on
+ * pg_replication_slot is required for the duration. That allows us to
+ * safely and conflict free assign new origins using a dirty snapshot.
+ *
+ * * When creating a in-memory replication progress slot the ReplicationOirgin
+ * LWLock has to be held exclusively; when iterating over the replication
+ * progress a shared lock has to be held, the same when advancing the
+ * replication progress of a individual backend that has not setup as the
+ * session's replication origin.
+ *
+ * * When manipulating or looking at the remote_lsn and local_lsn fields of a
+ * replication progress slot that slot's lwlock has to be held. That's
+ * primarily because we do not assume 8 byte writes (the LSN) is atomic on
+ * all our platforms, but it also simplifies memory ordering concerns
+ * between the remote and local lsn. We use a lwlock instead of a spinlock
+ * so it's less harmful to hold the lock over a WAL write
+ * (c.f. AdvanceReplicationProgress).
+ *
+ * ---------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <unistd.h>
+#include <sys/stat.h>
+
+#include "funcapi.h"
+#include "miscadmin.h"
+
+#include "access/genam.h"
+#include "access/heapam.h"
+#include "access/htup_details.h"
+#include "access/xact.h"
+
+#include "catalog/indexing.h"
+
+#include "nodes/execnodes.h"
+
+#include "replication/origin.h"
+#include "replication/logical.h"
+
+#include "storage/fd.h"
+#include "storage/ipc.h"
+#include "storage/lmgr.h"
+#include "storage/copydir.h"
+
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
+#include "utils/pg_lsn.h"
+#include "utils/rel.h"
+#include "utils/syscache.h"
+#include "utils/tqual.h"
+
+/*
+ * Replay progress of a single remote node.
+ */
+typedef struct ReplicationState
+{
+ /*
+ * Local identifier for the remote node.
+ */
+ RepOriginId roident;
+
+ /*
+ * Location of the latest commit from the remote side.
+ */
+ XLogRecPtr remote_lsn;
+
+ /*
+ * Remember the local lsn of the commit record so we can XLogFlush() to it
+ * during a checkpoint so we know the commit record actually is safe on
+ * disk.
+ */
+ XLogRecPtr local_lsn;
+
+ /*
+ * Slot is setup in backend?
+ */
+ pid_t acquired_by;
+
+ /*
+ * Lock protecting remote_lsn and local_lsn.
+ */
+ LWLock lock;
+} ReplicationState;
+
+/*
+ * On disk version of ReplicationState.
+ */
+typedef struct ReplicationStateOnDisk
+{
+ RepOriginId roident;
+ XLogRecPtr remote_lsn;
+} ReplicationStateOnDisk;
+
+
+typedef struct ReplicationStateCtl
+{
+ int tranche_id;
+ LWLockTranche tranche;
+ ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
+} ReplicationStateCtl;
+
+/* external variables */
+RepOriginId replorigin_sesssion_origin = InvalidRepOriginId; /* assumed identity */
+XLogRecPtr replorigin_sesssion_origin_lsn = InvalidXLogRecPtr;
+TimestampTz replorigin_sesssion_origin_timestamp = 0;
+
+/*
+ * Base address into a shared memory array of replication states of size
+ * max_replication_slots.
+ *
+ * XXX: Should we use a separate variable to size this rather than
+ * max_replication_slots?
+ */
+static ReplicationState *replication_states;
+static ReplicationStateCtl *replication_states_ctl;
+
+/*
+ * Backend-local, cached element from ReplicationStates for use in a backend
+ * replaying remote commits, so we don't have to search ReplicationStates for
+ * the backends current RepOriginId.
+ */
+static ReplicationState *session_replication_state = NULL;
+
+/* Magic for on disk files. */
+#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
+
+static void
+replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
+{
+ if (!superuser())
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("only superusers can query or manipulate replication origins")));
+
+ if (check_slots && max_replication_slots == 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
+
+ if (!recoveryOK && RecoveryInProgress())
+ ereport(ERROR,
+ (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
+ errmsg("cannot manipulate replication origins during recovery")));
+
+}
+
+
+/* ---------------------------------------------------------------------------
+ * Functions for working with replication origins themselves.
+ * ---------------------------------------------------------------------------
+ */
+
+/*
+ * Check for a persistent replication origin identified by name.
+ *
+ * Returns InvalidOid if the node isn't known yet and missing_ok is true.
+ */
+RepOriginId
+replorigin_by_name(char *roname, bool missing_ok)
+{
+ Form_pg_replication_origin ident;
+ Oid roident = InvalidOid;
+ HeapTuple tuple;
+ Datum roname_d;
+
+ roname_d = CStringGetTextDatum(roname);
+
+ tuple = SearchSysCache1(REPLORIGNAME, roname_d);
+ if (HeapTupleIsValid(tuple))
+ {
+ ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
+ roident = ident->roident;
+ ReleaseSysCache(tuple);
+ }
+ else if (!missing_ok)
+ elog(ERROR, "cache lookup failed for replication origin '%s'",
+ roname);
+
+ return roident;
+}
+
+/*
+ * Create a replication origin.
+ *
+ * Needs to be called in a transaction.
+ */
+RepOriginId
+replorigin_create(char *roname)
+{
+ Oid roident;
+ HeapTuple tuple = NULL;
+ Relation rel;
+ Datum roname_d;
+ SnapshotData SnapshotDirty;
+ SysScanDesc scan;
+ ScanKeyData key;
+
+ roname_d = CStringGetTextDatum(roname);
+
+ Assert(IsTransactionState());
+
+ /*
+ * We need the numeric replication origin to be 16bit wide, so we cannot
+ * rely on the normal oid allocation. Instead we simply scan
+ * pg_replication_origin for the first unused id. That's not particularly
+ * efficient, but this should be an fairly infrequent operation - we can
+ * easily spend a bit more code on this when it turns out it needs to be
+ * faster.
+ *
+ * We handle concurrency by taking an exclusive lock (allowing reads!)
+ * over the table for the duration of the search. Because we use a "dirty
+ * snapshot" we can read rows that other in-progress sessions have
+ * written, even though they would be invisible with normal snapshots. Due
+ * to the exclusive lock there's no danger that new rows can appear while
+ * we're checking.
+ */
+ InitDirtySnapshot(SnapshotDirty);
+
+ rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
+
+ for (roident = InvalidOid + 1; roident < UINT16_MAX; roident++)
+ {
+ bool nulls[Natts_pg_replication_origin];
+ Datum values[Natts_pg_replication_origin];
+ bool collides;
+ CHECK_FOR_INTERRUPTS();
+
+ ScanKeyInit(&key,
+ Anum_pg_replication_origin_roident,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(roident));
+
+ scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
+ true /* indexOK */,
+ &SnapshotDirty,
+ 1, &key);
+
+ collides = HeapTupleIsValid(systable_getnext(scan));
+
+ systable_endscan(scan);
+
+ if (!collides)
+ {
+ /*
+ * Ok, found an unused roident, insert the new row and do a CCI,
+ * so our callers can look it up if they want to.
+ */
+ memset(&nulls, 0, sizeof(nulls));
+
+ values[Anum_pg_replication_origin_roident -1] = ObjectIdGetDatum(roident);
+ values[Anum_pg_replication_origin_roname - 1] = roname_d;
+
+ tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
+ simple_heap_insert(rel, tuple);
+ CatalogUpdateIndexes(rel, tuple);
+ CommandCounterIncrement();
+ break;
+ }
+ }
+
+ /* now release lock again, */
+ heap_close(rel, ExclusiveLock);
+
+ if (tuple == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+ errmsg("no free replication oid could be found")));
+
+ heap_freetuple(tuple);
+ return roident;
+}
+
+
+/*
+ * Drop replication origin.
+ *
+ * Needs to be called in a transaction.
+ */
+void
+replorigin_drop(RepOriginId roident)
+{
+ HeapTuple tuple = NULL;
+ Relation rel;
+ int i;
+
+ Assert(IsTransactionState());
+
+ rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
+
+ /* cleanup the slot state info */
+ LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
+
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationState *state = &replication_states[i];
+
+ /* found our slot */
+ if (state->roident == roident)
+ {
+ if (state->acquired_by != 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("cannot drop replication origin with oid %d, in use by pid %d",
+ state->roident,
+ state->acquired_by)));
+ }
+
+ /* first WAL log */
+ {
+ xl_replorigin_drop xlrec;
+
+ xlrec.node_id = roident;
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
+ XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
+ }
+
+ /* then reset the in-memory entry */
+ state->roident = InvalidRepOriginId;
+ state->remote_lsn = InvalidXLogRecPtr;
+ state->local_lsn = InvalidXLogRecPtr;
+ break;
+ }
+ }
+ LWLockRelease(ReplicationOriginLock);
+
+ tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
+ simple_heap_delete(rel, &tuple->t_self);
+ ReleaseSysCache(tuple);
+
+ CommandCounterIncrement();
+
+ /* now release lock again, */
+ heap_close(rel, ExclusiveLock);
+}
+
+
+/*
+ * Lookup replication origin via it's oid and return the name.
+ *
+ * The external name is palloc'd in the calling context.
+ *
+ * Returns true if the origin is known, false otherwise.
+ */
+bool
+replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
+{
+ HeapTuple tuple;
+ Form_pg_replication_origin ric;
+
+ Assert(OidIsValid((Oid) roident));
+ Assert(roident != InvalidRepOriginId);
+ Assert(roident != DoNotReplicateId);
+
+ tuple = SearchSysCache1(REPLORIGIDENT,
+ ObjectIdGetDatum((Oid) roident));
+
+ if (HeapTupleIsValid(tuple))
+ {
+ ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
+ *roname = text_to_cstring(&ric->roname);
+ ReleaseSysCache(tuple);
+
+ return true;
+ }
+ else
+ {
+ *roname = NULL;
+
+ if (!missing_ok)
+ elog(ERROR, "cache lookup failed for replication origin with oid %u",
+ roident);
+
+ return false;
+ }
+}
+
+
+/* ---------------------------------------------------------------------------
+ * Functions for handling replication progress.
+ * ---------------------------------------------------------------------------
+ */
+
+Size
+ReplicationOriginShmemSize(void)
+{
+ Size size = 0;
+
+ /*
+ * XXX: max_replication_slots is arguablethe wrong thing to use here, here
+ * we keep the replay state of *remote* transactions. But for now it seems
+ * sufficient to reuse it, lest we introduce a separate guc.
+ */
+ if (max_replication_slots == 0)
+ return size;
+
+ size = add_size(size, offsetof(ReplicationStateCtl, states));
+
+ size = add_size(size,
+ mul_size(max_replication_slots, sizeof(ReplicationState)));
+ return size;
+}
+
+void
+ReplicationOriginShmemInit(void)
+{
+ bool found;
+
+ if (max_replication_slots == 0)
+ return;
+
+ replication_states_ctl = (ReplicationStateCtl *)
+ ShmemInitStruct("ReplicationOriginState",
+ ReplicationOriginShmemSize(),
+ &found);
+ replication_states = replication_states_ctl->states;
+
+ if (!found)
+ {
+ int i;
+
+ replication_states_ctl->tranche_id = LWLockNewTrancheId();
+ replication_states_ctl->tranche.name = "ReplicationOrigins";
+ replication_states_ctl->tranche.array_base =
+ &replication_states[0].lock;
+ replication_states_ctl->tranche.array_stride =
+ sizeof(ReplicationState);
+
+ MemSet(replication_states, 0, ReplicationOriginShmemSize());
+
+ for (i = 0; i < max_replication_slots; i++)
+ LWLockInitialize(&replication_states[i].lock,
+ replication_states_ctl->tranche_id);
+ }
+
+ LWLockRegisterTranche(replication_states_ctl->tranche_id,
+ &replication_states_ctl->tranche);
+}
+
+/* ---------------------------------------------------------------------------
+ * Perform a checkpoint of each replication origin's progress with respect to
+ * the replayed remote_lsn. Make sure that all transactions we refer to in the
+ * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
+ * if the transactions were originally committed asynchronously.
+ *
+ * We store checkpoints in the following format:
+ * +-------+------------------------+------------------+-----+--------+
+ * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
+ * +-------+------------------------+------------------+-----+--------+
+ *
+ * So its just the magic, followed by the statically sized
+ * ReplicationStateOnDisk structs. Note that the maximum number of
+ * ReplicationStates is determined by max_replication_slots.
+ * ---------------------------------------------------------------------------
+ */
+void
+CheckPointReplicationOrigin(void)
+{
+ const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
+ const char *path = "pg_logical/replorigin_checkpoint";
+ int tmpfd;
+ int i;
+ uint32 magic = REPLICATION_STATE_MAGIC;
+ pg_crc32c crc;
+
+ if (max_replication_slots == 0)
+ return;
+
+ INIT_CRC32C(crc);
+
+ /* make sure no old temp file is remaining */
+ if (unlink(tmppath) < 0 && errno != ENOENT)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not remove file \"%s\": %m",
+ path)));
+
+ /*
+ * no other backend can perform this at the same time, we're protected by
+ * CheckpointLock.
+ */
+ tmpfd = OpenTransientFile((char *) tmppath,
+ O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
+ S_IRUSR | S_IWUSR);
+ if (tmpfd < 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not create file \"%s\": %m",
+ tmppath)));
+
+ /* write magic */
+ if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
+ {
+ CloseTransientFile(tmpfd);
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not write to file \"%s\": %m",
+ tmppath)));
+ }
+ COMP_CRC32C(crc, &magic, sizeof(magic));
+
+ /* prevent concurrent creations/drops */
+ LWLockAcquire(ReplicationOriginLock, LW_SHARED);
+
+ /* write actual data */
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationStateOnDisk disk_state;
+ ReplicationState *curstate = &replication_states[i];
+ XLogRecPtr local_lsn;
+
+ if (curstate->roident == InvalidRepOriginId)
+ continue;
+
+ LWLockAcquire(&curstate->lock, LW_SHARED);
+
+ disk_state.roident = curstate->roident;
+
+ disk_state.remote_lsn = curstate->remote_lsn;
+ local_lsn = curstate->local_lsn;
+
+ LWLockRelease(&curstate->lock);
+
+ /* make sure we only write out a commit that's persistent */
+ XLogFlush(local_lsn);
+
+ if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
+ sizeof(disk_state))
+ {
+ CloseTransientFile(tmpfd);
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not write to file \"%s\": %m",
+ tmppath)));
+ }
+
+ COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
+ }
+
+ LWLockRelease(ReplicationOriginLock);
+
+ /* write out the CRC */
+ FIN_CRC32C(crc);
+ if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
+ {
+ CloseTransientFile(tmpfd);
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not write to file \"%s\": %m",
+ tmppath)));
+ }
+
+ /* fsync the temporary file */
+ if (pg_fsync(tmpfd) != 0)
+ {
+ CloseTransientFile(tmpfd);
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not fsync file \"%s\": %m",
+ tmppath)));
+ }
+
+ CloseTransientFile(tmpfd);
+
+ /* rename to permanent file, fsync file and directory */
+ if (rename(tmppath, path) != 0)
+ {
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not rename file \"%s\" to \"%s\": %m",
+ tmppath, path)));
+ }
+
+ fsync_fname((char *) path, false);
+ fsync_fname("pg_logical", true);
+}
+
+/*
+ * Recover replication replay status from checkpoint data saved earlier by
+ * CheckPointReplicationOrigin.
+ *
+ * This only needs to be called at startup and *not* during every checkpoint
+ * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
+ * state thereafter can be recovered by looking at commit records.
+ */
+void
+StartupReplicationOrigin(void)
+{
+ const char *path = "pg_logical/replorigin_checkpoint";
+ int fd;
+ int readBytes;
+ uint32 magic = REPLICATION_STATE_MAGIC;
+ int last_state = 0;
+ pg_crc32c file_crc;
+ pg_crc32c crc;
+
+ /* don't want to overwrite already existing state */
+#ifdef USE_ASSERT_CHECKING
+ static bool already_started = false;
+ Assert(!already_started);
+ already_started = true;
+#endif
+
+ if (max_replication_slots == 0)
+ return;
+
+ INIT_CRC32C(crc);
+
+ elog(DEBUG2, "starting up replication origin progress state");
+
+ fd = OpenTransientFile((char *) path, O_RDONLY | PG_BINARY, 0);
+
+ /*
+ * might have had max_replication_slots == 0 last run, or we just brought up a
+ * standby.
+ */
+ if (fd < 0 && errno == ENOENT)
+ return;
+ else if (fd < 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\": %m",
+ path)));
+
+ /* verify magic, thats written even if nothing was active */
+ readBytes = read(fd, &magic, sizeof(magic));
+ if (readBytes != sizeof(magic))
+ ereport(PANIC,
+ (errmsg("could not read file \"%s\": %m",
+ path)));
+ COMP_CRC32C(crc, &magic, sizeof(magic));
+
+ if (magic != REPLICATION_STATE_MAGIC)
+ ereport(PANIC,
+ (errmsg("replication checkpoint has wrong magic %u instead of %u",
+ magic, REPLICATION_STATE_MAGIC)));
+
+ /* we can skip locking here, no other access is possible */
+
+ /* recover individual states, until there are no more to be found */
+ while (true)
+ {
+ ReplicationStateOnDisk disk_state;
+
+ readBytes = read(fd, &disk_state, sizeof(disk_state));
+
+ /* no further data */
+ if (readBytes == sizeof(crc))
+ {
+ /* not pretty, but simple ... */
+ file_crc = *(pg_crc32c*) &disk_state;
+ break;
+ }
+
+ if (readBytes < 0)
+ {
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not read file \"%s\": %m",
+ path)));
+ }
+
+ if (readBytes != sizeof(disk_state))
+ {
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not read file \"%s\": read %d of %zu",
+ path, readBytes, sizeof(disk_state))));
+ }
+
+ COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
+
+ if (last_state == max_replication_slots)
+ ereport(PANIC,
+ (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+ errmsg("no free replication state could be found, increase max_replication_slots")));
+
+ /* copy data to shared memory */
+ replication_states[last_state].roident = disk_state.roident;
+ replication_states[last_state].remote_lsn = disk_state.remote_lsn;
+ last_state++;
+
+ elog(LOG, "recovered replication state of node %u to %X/%X",
+ disk_state.roident,
+ (uint32)(disk_state.remote_lsn >> 32),
+ (uint32)disk_state.remote_lsn);
+ }
+
+ /* now check checksum */
+ FIN_CRC32C(crc);
+ if (file_crc != crc)
+ ereport(PANIC,
+ (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+ errmsg("replication_slot_checkpoint has wrong checksum %u, expected %u",
+ crc, file_crc)));
+
+ CloseTransientFile(fd);
+}
+
+void
+replorigin_redo(XLogReaderState *record)
+{
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+ switch (info)
+ {
+ case XLOG_REPLORIGIN_SET:
+ {
+ xl_replorigin_set *xlrec =
+ (xl_replorigin_set *) XLogRecGetData(record);
+
+ replorigin_advance(xlrec->node_id,
+ xlrec->remote_lsn, record->EndRecPtr,
+ xlrec->force /* backward */,
+ false /* WAL log */);
+ break;
+ }
+ case XLOG_REPLORIGIN_DROP:
+ {
+ xl_replorigin_drop *xlrec;
+ int i;
+
+ xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
+
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationState *state = &replication_states[i];
+
+ /* found our slot */
+ if (state->roident == xlrec->node_id)
+ {
+ /* reset entry */
+ state->roident = InvalidRepOriginId;
+ state->remote_lsn = InvalidXLogRecPtr;
+ state->local_lsn = InvalidXLogRecPtr;
+ break;
+ }
+ }
+ break;
+ }
+ default:
+ elog(PANIC, "replorigin_redo: unknown op code %u", info);
+ }
+}
+
+
+/*
+ * Tell the replication origin progress machinery that a commit from 'node'
+ * that originated at the LSN remote_commit on the remote node was replayed
+ * successfully and that we don't need to do so again. In combination with
+ * setting up replorigin_sesssion_origin_lsn and replorigin_sesssion_origin that ensures we
+ * won't loose knowledge about that after a crash if the the transaction had a
+ * persistent effect (think of asynchronous commits).
+ *
+ * local_commit needs to be a local LSN of the commit so that we can make sure
+ * uppon a checkpoint that enough WAL has been persisted to disk.
+ *
+ * Needs to be called with a RowExclusiveLock on pg_replication_origin,
+ * unless running in recovery.
+ */
+void
+replorigin_advance(RepOriginId node,
+ XLogRecPtr remote_commit, XLogRecPtr local_commit,
+ bool go_backward, bool wal_log)
+{
+ int i;
+ ReplicationState *replication_state = NULL;
+ ReplicationState *free_state = NULL;
+
+ Assert(node != InvalidRepOriginId);
+
+ /* we don't track DoNotReplicateId */
+ if (node == DoNotReplicateId)
+ return;
+
+ /*
+ * XXX: For the case where this is called by WAL replay, it'd be more
+ * efficient to restore into a backend local hashtable and only dump into
+ * shmem after recovery is finished. Let's wait with implementing that
+ * till it's shown to be a measurable expense
+ */
+
+ /* Lock exclusively, as we may have to create a new table entry. */
+ LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
+
+ /*
+ * Search for either an existing slot for the origin, or a free one we can
+ * use.
+ */
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationState *curstate = &replication_states[i];
+
+ /* remember where to insert if necessary */
+ if (curstate->roident == InvalidRepOriginId &&
+ free_state == NULL)
+ {
+ free_state = curstate;
+ continue;
+ }
+
+ /* not our slot */
+ if (curstate->roident != node)
+ {
+ continue;
+ }
+
+ /* ok, found slot */
+ replication_state = curstate;
+
+ LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
+
+ /* Make sure it's not used by somebody else */
+ if (replication_state->acquired_by != 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("replication origin with oid %d is already active for pid %d",
+ replication_state->roident,
+ replication_state->acquired_by)));
+ }
+
+ break;
+ }
+
+ if (replication_state == NULL && free_state == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+ errmsg("no free replication state slot could be found for replication origin with oid %u",
+ node),
+ errhint("Increase max_replication_slots and try again.")));
+
+ if (replication_state == NULL)
+ {
+ /* initialize new slot */
+ LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
+ replication_state = free_state;
+ Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
+ Assert(replication_state->local_lsn == InvalidXLogRecPtr);
+ replication_state->roident = node;
+ }
+
+ Assert(replication_state->roident != InvalidRepOriginId);
+
+ /*
+ * If somebody "forcefully" sets this slot, WAL log it, so it's durable
+ * and the standby gets the message. Primarily this will be called during
+ * WAL replay (of commit records) where no WAL logging is necessary.
+ */
+ if (wal_log)
+ {
+ xl_replorigin_set xlrec;
+ xlrec.remote_lsn = remote_commit;
+ xlrec.node_id = node;
+ xlrec.force = go_backward;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
+
+ XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
+ }
+
+ /*
+ * Due to - harmless - race conditions during a checkpoint we could see
+ * values here that are older than the ones we already have in
+ * memory. Don't overwrite those.
+ */
+ if (go_backward || replication_state->remote_lsn < remote_commit)
+ replication_state->remote_lsn = remote_commit;
+ if (local_commit != InvalidXLogRecPtr &&
+ (go_backward || replication_state->local_lsn < local_commit))
+ replication_state->local_lsn = local_commit;
+ LWLockRelease(&replication_state->lock);
+
+ /*
+ * Release *after* changing the LSNs, slot isn't acquired and thus could
+ * otherwise be dropped anytime.
+ */
+ LWLockRelease(ReplicationOriginLock);
+}
+
+
+XLogRecPtr
+replorigin_get_progress(RepOriginId node, bool flush)
+{
+ int i;
+ XLogRecPtr local_lsn = InvalidXLogRecPtr;
+ XLogRecPtr remote_lsn = InvalidXLogRecPtr;
+
+ /* prevent slots from being concurrently dropped */
+ LWLockAcquire(ReplicationOriginLock, LW_SHARED);
+
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationState *state;
+
+ state = &replication_states[i];
+
+ if (state->roident == node)
+ {
+ LWLockAcquire(&state->lock, LW_SHARED);
+
+ remote_lsn = state->remote_lsn;
+ local_lsn = state->local_lsn;
+
+ LWLockRelease(&state->lock);
+
+ break;
+ }
+ }
+
+ LWLockRelease(ReplicationOriginLock);
+
+ if (flush && local_lsn != InvalidXLogRecPtr)
+ XLogFlush(local_lsn);
+
+ return remote_lsn;
+}
+
+/*
+ * Tear down a (possibly) configured session replication origin during process
+ * exit.
+ */
+static void
+ReplicationOriginExitCleanup(int code, Datum arg)
+{
+
+ LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
+
+ if (session_replication_state != NULL &&
+ session_replication_state->acquired_by == MyProcPid)
+ {
+ session_replication_state->acquired_by = 0;
+ session_replication_state = NULL;
+ }
+
+ LWLockRelease(ReplicationOriginLock);
+}
+
+/*
+ * Setup a replication origin in the shared memory struct if it doesn't
+ * already exists and cache access to the specific ReplicationSlot so the
+ * array doesn't have to be searched when calling
+ * replorigin_session_advance().
+ *
+ * Obviously only one such cached origin can exist per process and the current
+ * cached value can only be set again after the previous value is torn down
+ * with replorigin_session_reset().
+ */
+void
+replorigin_session_setup(RepOriginId node)
+{
+ static bool registered_cleanup;
+ int i;
+ int free_slot = -1;
+
+ if (!registered_cleanup)
+ {
+ on_shmem_exit(ReplicationOriginExitCleanup, 0);
+ registered_cleanup = true;
+ }
+
+ Assert(max_replication_slots > 0);
+
+ if (session_replication_state != NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot setup replication origin when one is already setup")));
+
+ /* Lock exclusively, as we may have to create a new table entry. */
+ LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
+
+ /*
+ * Search for either an existing slot for the origin, or a free one we can
+ * use.
+ */
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationState *curstate = &replication_states[i];
+
+ /* remember where to insert if necessary */
+ if (curstate->roident == InvalidRepOriginId &&
+ free_slot == -1)
+ {
+ free_slot = i;
+ continue;
+ }
+
+ /* not our slot */
+ if (curstate->roident != node)
+ continue;
+
+ else if (curstate->acquired_by != 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("replication identiefer %d is already active for pid %d",
+ curstate->roident, curstate->acquired_by)));
+ }
+
+ /* ok, found slot */
+ session_replication_state = curstate;
+ }
+
+
+ if (session_replication_state == NULL && free_slot == -1)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+ errmsg("no free replication state slot could be found for replication origin with oid %u",
+ node),
+ errhint("Increase max_replication_slots and try again.")));
+ else if (session_replication_state == NULL)
+ {
+ /* initialize new slot */
+ session_replication_state = &replication_states[free_slot];
+ Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
+ Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
+ session_replication_state->roident = node;
+ }
+
+
+ Assert(session_replication_state->roident != InvalidRepOriginId);
+
+ session_replication_state->acquired_by = MyProcPid;
+
+ LWLockRelease(ReplicationOriginLock);
+}
+
+/*
+ * Reset replay state previously setup in this session.
+ *
+ * This function may only be called if a origin was setup with
+ * replorigin_session_setup().
+ */
+void
+replorigin_session_reset(void)
+{
+ Assert(max_replication_slots != 0);
+
+ if (session_replication_state == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("no replication origin is configured")));
+
+ LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
+
+ session_replication_state->acquired_by = 0;
+ session_replication_state = NULL;
+
+ LWLockRelease(ReplicationOriginLock);
+}
+
+/*
+ * Do the same work replorigin_advance() does, just on the session's
+ * configured origin.
+ *
+ * This is noticeably cheaper than using replorigin_advance().
+ */
+void
+replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
+{
+ Assert(session_replication_state != NULL);
+ Assert(session_replication_state->roident != InvalidRepOriginId);
+
+ LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
+ if (session_replication_state->local_lsn < local_commit)
+ session_replication_state->local_lsn = local_commit;
+ if (session_replication_state->remote_lsn < remote_commit)
+ session_replication_state->remote_lsn = remote_commit;
+ LWLockRelease(&session_replication_state->lock);
+}
+
+/*
+ * Ask the machinery about the point up to which we successfully replayed
+ * changes from a already setup replication origin.
+ */
+XLogRecPtr
+replorigin_session_get_progress(bool flush)
+{
+ XLogRecPtr remote_lsn;
+ XLogRecPtr local_lsn;
+
+ Assert(session_replication_state != NULL);
+
+ LWLockAcquire(&session_replication_state->lock, LW_SHARED);
+ remote_lsn = session_replication_state->remote_lsn;
+ local_lsn = session_replication_state->local_lsn;
+ LWLockRelease(&session_replication_state->lock);
+
+ if (flush && local_lsn != InvalidXLogRecPtr)
+ XLogFlush(local_lsn);
+
+ return remote_lsn;
+}
+
+
+
+/* ---------------------------------------------------------------------------
+ * SQL functions for working with replication origin.
+ *
+ * These mostly should be fairly short wrappers around more generic functions.
+ * ---------------------------------------------------------------------------
+ */
+
+/*
+ * Create replication origin for the passed in name, and return the assigned
+ * oid.
+ */
+Datum
+pg_replication_origin_create(PG_FUNCTION_ARGS)
+{
+ char *name;
+ RepOriginId roident;
+
+ replorigin_check_prerequisites(false, false);
+
+ name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+ roident = replorigin_create(name);
+
+ pfree(name);
+
+ PG_RETURN_OID(roident);
+}
+
+/*
+ * Drop replication origin.
+ */
+Datum
+pg_replication_origin_drop(PG_FUNCTION_ARGS)
+{
+ char *name;
+ RepOriginId roident;
+
+ replorigin_check_prerequisites(false, false);
+
+ name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+
+ roident = replorigin_by_name(name, false);
+ Assert(OidIsValid(roident));
+
+ replorigin_drop(roident);
+
+ pfree(name);
+
+ PG_RETURN_VOID();
+}
+
+/*
+ * Return oid of a replication origin.
+ */
+Datum
+pg_replication_origin_oid(PG_FUNCTION_ARGS)
+{
+ char *name;
+ RepOriginId roident;
+
+ replorigin_check_prerequisites(false, false);
+
+ name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+ roident = replorigin_by_name(name, true);
+
+ pfree(name);
+
+ if (OidIsValid(roident))
+ PG_RETURN_OID(roident);
+ PG_RETURN_NULL();
+}
+
+/*
+ * Setup a replication origin for this session.
+ */
+Datum
+pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
+{
+ char *name;
+ RepOriginId origin;
+
+ replorigin_check_prerequisites(true, false);
+
+ name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+ origin = replorigin_by_name(name, false);
+ replorigin_session_setup(origin);
+
+ replorigin_sesssion_origin = origin;
+
+ pfree(name);
+
+ PG_RETURN_VOID();
+}
+
+/*
+ * Reset previously setup origin in this session
+ */
+Datum
+pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
+{
+ replorigin_check_prerequisites(true, false);
+
+ replorigin_session_reset();
+
+ /* FIXME */
+ replorigin_sesssion_origin = InvalidRepOriginId;
+ replorigin_sesssion_origin_lsn = InvalidXLogRecPtr;
+ replorigin_sesssion_origin_timestamp = 0;
+
+ PG_RETURN_VOID();
+}
+
+/*
+ * Has a replication origin been setup for this session.
+ */
+Datum
+pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
+{
+ replorigin_check_prerequisites(false, false);
+
+ PG_RETURN_BOOL(replorigin_sesssion_origin != InvalidRepOriginId);
+}
+
+
+/*
+ * Return the replication progress for origin setup in the current session.
+ *
+ * If 'flush' is set to true it is ensured that the returned value corresponds
+ * to a local transaction that has been flushed. this is useful if asychronous
+ * commits are used when replaying replicated transactions.
+ */
+Datum
+pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
+{
+ XLogRecPtr remote_lsn = InvalidXLogRecPtr;
+ bool flush = PG_GETARG_BOOL(0);
+
+ replorigin_check_prerequisites(true, false);
+
+ if (session_replication_state == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("no replication origin is configured")));
+
+ remote_lsn = replorigin_session_get_progress(flush);
+
+ if (remote_lsn == InvalidXLogRecPtr)
+ PG_RETURN_NULL();
+
+ PG_RETURN_LSN(remote_lsn);
+}
+
+Datum
+pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
+{
+ XLogRecPtr location = PG_GETARG_LSN(0);
+
+ replorigin_check_prerequisites(true, false);
+
+ if (session_replication_state == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("no replication origin is configured")));
+
+ replorigin_sesssion_origin_lsn = location;
+ replorigin_sesssion_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
+
+ PG_RETURN_VOID();
+}
+
+Datum
+pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
+{
+ replorigin_check_prerequisites(true, false);
+
+ replorigin_sesssion_origin_lsn = InvalidXLogRecPtr;
+ replorigin_sesssion_origin_timestamp = 0;
+
+ PG_RETURN_VOID();
+}
+
+
+Datum
+pg_replication_origin_advance(PG_FUNCTION_ARGS)
+{
+ text *name = PG_GETARG_TEXT_P(0);
+ XLogRecPtr remote_commit = PG_GETARG_LSN(1);
+ RepOriginId node;
+
+ replorigin_check_prerequisites(true, false);
+
+ /* lock to prevent the replication origin from vanishing */
+ LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+
+ node = replorigin_by_name(text_to_cstring(name), false);
+
+ /*
+ * Can't sensibly pass a local commit to be flushed at checkpoint - this
+ * xact hasn't committed yet. This is why this function should be used to
+ * set up the intial replication state, but not for replay.
+ */
+ replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
+ true /* go backward */, true /* wal log */);
+
+ UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+
+ PG_RETURN_VOID();
+}
+
+
+/*
+ * Return the replication progress for an individual replication origin.
+ *
+ * If 'flush' is set to true it is ensured that the returned value corresponds
+ * to a local transaction that has been flushed. this is useful if asychronous
+ * commits are used when replaying replicated transactions.
+ */
+Datum
+pg_replication_origin_progress(PG_FUNCTION_ARGS)
+{
+ char *name;
+ bool flush;
+ RepOriginId roident;
+ XLogRecPtr remote_lsn = InvalidXLogRecPtr;
+
+ replorigin_check_prerequisites(true, true);
+
+ name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+ flush = PG_GETARG_BOOL(1);
+
+ roident = replorigin_by_name(name, false);
+ Assert(OidIsValid(roident));
+
+ remote_lsn = replorigin_get_progress(roident, flush);
+
+ if (remote_lsn == InvalidXLogRecPtr)
+ PG_RETURN_NULL();
+
+ PG_RETURN_LSN(remote_lsn);
+}
+
+
+Datum
+pg_show_replication_origin_status(PG_FUNCTION_ARGS)
+{
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ TupleDesc tupdesc;
+ Tuplestorestate *tupstore;
+ MemoryContext per_query_ctx;
+ MemoryContext oldcontext;
+ int i;
+#define REPLICATION_ORIGIN_PROGRESS_COLS 4
+
+ /* we we want to return 0 rows if slot is set to zero */
+ replorigin_check_prerequisites(false, true);
+
+ if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("set-valued function called in context that cannot accept a set")));
+ if (!(rsinfo->allowedModes & SFRM_Materialize))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("materialize mode required, but it is not allowed in this context")));
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ if (tupdesc->natts != REPLICATION_ORIGIN_PROGRESS_COLS)
+ elog(ERROR, "wrong function definition");
+
+ per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+ oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+ tupstore = tuplestore_begin_heap(true, false, work_mem);
+ rsinfo->returnMode = SFRM_Materialize;
+ rsinfo->setResult = tupstore;
+ rsinfo->setDesc = tupdesc;
+
+ MemoryContextSwitchTo(oldcontext);
+
+
+ /* prevent slots from being concurrently dropped */
+ LWLockAcquire(ReplicationOriginLock, LW_SHARED);
+
+ /*
+ * Iterate through all possible replication_states, display if they are
+ * filled. Note that we do not take any locks, so slightly corrupted/out
+ * of date values are a possibility.
+ */
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationState *state;
+ Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
+ bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
+ char *roname;
+
+ state = &replication_states[i];
+
+ /* unused slot, nothing to display */
+ if (state->roident == InvalidRepOriginId)
+ continue;
+
+ memset(values, 0, sizeof(values));
+ memset(nulls, 1, sizeof(nulls));
+
+ values[0] = ObjectIdGetDatum(state->roident);
+ nulls[0] = false;
+
+ /*
+ * We're not preventing the origin to be dropped concurrently, so
+ * silently accept that it might be gone.
+ */
+ if (replorigin_by_oid(state->roident, true,
+ &roname))
+ {
+ values[1] = CStringGetTextDatum(roname);
+ nulls[1] = false;
+ }
+
+ LWLockAcquire(&state->lock, LW_SHARED);
+
+ values[ 2] = LSNGetDatum(state->remote_lsn);
+ nulls[2] = false;
+
+ values[3] = LSNGetDatum(state->local_lsn);
+ nulls[3] = false;
+
+ LWLockRelease(&state->lock);
+
+ tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+ }
+
+ tuplestore_donestoring(tupstore);
+
+ LWLockRelease(ReplicationOriginLock);
+
+#undef REPLICATION_ORIGIN_PROGRESS_COLS
+
+ return (Datum) 0;
+}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index dc855830c4e..c9c1d1036e0 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1255,7 +1255,8 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
void
ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
- TimestampTz commit_time)
+ TimestampTz commit_time,
+ RepOriginId origin_id, XLogRecPtr origin_lsn)
{
ReorderBufferTXN *txn;
volatile Snapshot snapshot_now;
@@ -1273,6 +1274,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
txn->final_lsn = commit_lsn;
txn->end_lsn = end_lsn;
txn->commit_time = commit_time;
+ txn->origin_id = origin_id;
+ txn->origin_lsn = origin_lsn;
/* serialize the last bunch of changes if we need start earlier anyway */
if (txn->nentries_mem != txn->nentries)
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 16b98086868..32ac58f7d1a 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -31,6 +31,7 @@
#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
+#include "replication/origin.h"
#include "storage/bufmgr.h"
#include "storage/dsm.h"
#include "storage/ipc.h"
@@ -132,6 +133,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
size = add_size(size, CheckpointerShmemSize());
size = add_size(size, AutoVacuumShmemSize());
size = add_size(size, ReplicationSlotsShmemSize());
+ size = add_size(size, ReplicationOriginShmemSize());
size = add_size(size, WalSndShmemSize());
size = add_size(size, WalRcvShmemSize());
size = add_size(size, BTreeShmemSize());
@@ -238,6 +240,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
CheckpointerShmemInit();
AutoVacuumShmemInit();
ReplicationSlotsShmemInit();
+ ReplicationOriginShmemInit();
WalSndShmemInit();
WalRcvShmemInit();
diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c
index 644bbcc167c..f58e1cebf2a 100644
--- a/src/backend/utils/cache/syscache.c
+++ b/src/backend/utils/cache/syscache.c
@@ -54,6 +54,7 @@
#include "catalog/pg_shdepend.h"
#include "catalog/pg_shdescription.h"
#include "catalog/pg_shseclabel.h"
+#include "catalog/pg_replication_origin.h"
#include "catalog/pg_statistic.h"
#include "catalog/pg_tablespace.h"
#include "catalog/pg_transform.h"
@@ -621,6 +622,28 @@ static const struct cachedesc cacheinfo[] = {
},
128
},
+ {ReplicationOriginRelationId, /* REPLORIGIDENT */
+ ReplicationOriginIdentIndex,
+ 1,
+ {
+ Anum_pg_replication_origin_roident,
+ 0,
+ 0,
+ 0
+ },
+ 16
+ },
+ {ReplicationOriginRelationId, /* REPLORIGNAME */
+ ReplicationOriginNameIndex,
+ 1,
+ {
+ Anum_pg_replication_origin_roname,
+ 0,
+ 0,
+ 0
+ },
+ 16
+ },
{RewriteRelationId, /* RULERELNAME */
RewriteRelRulenameIndexId,
2,