diff options
author | Andres Freund <andres@anarazel.de> | 2015-04-29 19:30:53 +0200 |
---|---|---|
committer | Andres Freund <andres@anarazel.de> | 2015-04-29 19:30:53 +0200 |
commit | 5aa2350426c4fdb3d04568b65aadac397012bbcb (patch) | |
tree | 954c3123dc58905bbda6407565383c65850204e7 /src/backend/access/transam | |
parent | c6e96a2f986e4dad72c14b14d4cc17d02b2a6aad (diff) | |
download | postgresql-5aa2350426c4fdb3d04568b65aadac397012bbcb.tar.gz postgresql-5aa2350426c4fdb3d04568b65aadac397012bbcb.zip |
Introduce replication progress tracking infrastructure.
When implementing a replication solution on top of logical decoding, two
related problems exist:
* How to safely keep track of replication progress
* How to change replication behavior, based on the origin of a row;
e.g. to avoid loops in bi-directional replication setups
The solution to these problems, as implemented here, consists of
three parts:
1) 'replication origins', which identify nodes in a replication setup.
2) 'replication progress tracking', which remembers, for each
replication origin, how far replay has progressed in an efficient and
crash safe manner.
3) The ability to filter out changes performed at the behest of a
replication origin during logical decoding; this allows complex
replication topologies. E.g. by filtering all replayed changes out.
Most of this could also be implemented in "userspace", e.g. by inserting
additional rows containing origin information, but that ends up being much
less efficient and more complicated. We don't want to require various
replication solutions to reimplement logic for this independently. The
infrastructure is intended to be generic enough to be reusable.
This infrastructure also replaces the 'nodeid' infrastructure of commit
timestamps. It is intended to provide all the former capabilities,
except that there's only 2^16 different origins; but now they integrate
with logical decoding. Additionally more functionality is accessible via
SQL. Since the commit timestamp infrastructure has also been introduced
in 9.5 (commit 73c986add) changing the API is not a problem.
For now the number of origins for which the replication progress can be
tracked simultaneously is determined by the max_replication_slots
GUC. That GUC is not a perfect match to configure this, but there
doesn't seem to be sufficient reason to introduce a separate new one.
Bumps both catversion and wal page magic.
Author: Andres Freund, with contributions from Petr Jelinek and Craig Ringer
Reviewed-By: Heikki Linnakangas, Petr Jelinek, Robert Haas, Steve Singer
Discussion: 20150216002155.GI15326@awork2.anarazel.de,
20140923182422.GA15776@alap3.anarazel.de,
20131114172632.GE7522@alap2.anarazel.de
Diffstat (limited to 'src/backend/access/transam')
-rw-r--r-- | src/backend/access/transam/commit_ts.c | 53 | ||||
-rw-r--r-- | src/backend/access/transam/rmgr.c | 1 | ||||
-rw-r--r-- | src/backend/access/transam/xact.c | 76 | ||||
-rw-r--r-- | src/backend/access/transam/xlog.c | 8 | ||||
-rw-r--r-- | src/backend/access/transam/xloginsert.c | 27 | ||||
-rw-r--r-- | src/backend/access/transam/xlogreader.c | 6 |
6 files changed, 114 insertions, 57 deletions
diff --git a/src/backend/access/transam/commit_ts.c b/src/backend/access/transam/commit_ts.c index dc23ab27b65..40042a5fd53 100644 --- a/src/backend/access/transam/commit_ts.c +++ b/src/backend/access/transam/commit_ts.c @@ -49,18 +49,18 @@ */ /* - * We need 8+4 bytes per xact. Note that enlarging this struct might mean + * We need 8+2 bytes per xact. Note that enlarging this struct might mean * the largest possible file name is more than 5 chars long; see * SlruScanDirectory. */ typedef struct CommitTimestampEntry { TimestampTz time; - CommitTsNodeId nodeid; + RepOriginId nodeid; } CommitTimestampEntry; #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \ - sizeof(CommitTsNodeId)) + sizeof(RepOriginId)) #define COMMIT_TS_XACTS_PER_PAGE \ (BLCKSZ / SizeOfCommitTimestampEntry) @@ -93,43 +93,18 @@ CommitTimestampShared *commitTsShared; /* GUC variable */ bool track_commit_timestamp; -static CommitTsNodeId default_node_id = InvalidCommitTsNodeId; - static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, - CommitTsNodeId nodeid, int pageno); + RepOriginId nodeid, int pageno); static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, - CommitTsNodeId nodeid, int slotno); + RepOriginId nodeid, int slotno); static int ZeroCommitTsPage(int pageno, bool writeXlog); static bool CommitTsPagePrecedes(int page1, int page2); static void WriteZeroPageXlogRec(int pageno); static void WriteTruncateXlogRec(int pageno); static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, - CommitTsNodeId nodeid); - - -/* - * CommitTsSetDefaultNodeId - * - * Set default nodeid for current backend. - */ -void -CommitTsSetDefaultNodeId(CommitTsNodeId nodeid) -{ - default_node_id = nodeid; -} - -/* - * CommitTsGetDefaultNodeId - * - * Set default nodeid for current backend. 
- */ -CommitTsNodeId -CommitTsGetDefaultNodeId(void) -{ - return default_node_id; -} + RepOriginId nodeid); /* * TransactionTreeSetCommitTsData @@ -156,7 +131,7 @@ CommitTsGetDefaultNodeId(void) void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, - CommitTsNodeId nodeid, bool do_xlog) + RepOriginId nodeid, bool do_xlog) { int i; TransactionId headxid; @@ -234,7 +209,7 @@ TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, - CommitTsNodeId nodeid, int pageno) + RepOriginId nodeid, int pageno) { int slotno; int i; @@ -259,7 +234,7 @@ SetXidCommitTsInPage(TransactionId xid, int nsubxids, */ static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, - CommitTsNodeId nodeid, int slotno) + RepOriginId nodeid, int slotno) { int entryno = TransactionIdToCTsEntry(xid); CommitTimestampEntry entry; @@ -282,7 +257,7 @@ TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, */ bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, - CommitTsNodeId *nodeid) + RepOriginId *nodeid) { int pageno = TransactionIdToCTsPage(xid); int entryno = TransactionIdToCTsEntry(xid); @@ -322,7 +297,7 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, if (ts) *ts = 0; if (nodeid) - *nodeid = InvalidCommitTsNodeId; + *nodeid = InvalidRepOriginId; return false; } @@ -373,7 +348,7 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, * as NULL if not wanted. 
*/ TransactionId -GetLatestCommitTsData(TimestampTz *ts, CommitTsNodeId *nodeid) +GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid) { TransactionId xid; @@ -503,7 +478,7 @@ CommitTsShmemInit(void) commitTsShared->xidLastCommit = InvalidTransactionId; TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time); - commitTsShared->dataLastCommit.nodeid = InvalidCommitTsNodeId; + commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId; } else Assert(found); @@ -857,7 +832,7 @@ WriteTruncateXlogRec(int pageno) static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, - CommitTsNodeId nodeid) + RepOriginId nodeid) { xl_commit_ts_set record; diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index acd825fad4f..7c4d773ce0f 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -23,6 +23,7 @@ #include "commands/dbcommands_xlog.h" #include "commands/sequence.h" #include "commands/tablespace.h" +#include "replication/origin.h" #include "storage/standby.h" #include "utils/relmapper.h" diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 1495bb499f5..511bcbbc519 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -40,8 +40,10 @@ #include "libpq/pqsignal.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/logical.h" #include "replication/walsender.h" #include "replication/syncrep.h" +#include "replication/origin.h" #include "storage/fd.h" #include "storage/lmgr.h" #include "storage/predicate.h" @@ -1073,21 +1075,27 @@ RecordTransactionCommit(void) nmsgs, invalMessages, RelcacheInitFileInval, forceSyncCommit, InvalidTransactionId /* plain commit */); - } - /* - * We only need to log the commit timestamp separately if the node - * identifier is a valid value; the commit record above already contains - * the timestamp info otherwise, and will be used to 
load it. - */ - if (markXidCommitted) - { - CommitTsNodeId node_id; + /* + * Record plain commit ts if not replaying remote actions, or if no + * timestamp is configured. + */ + if (replorigin_sesssion_origin == InvalidRepOriginId || + replorigin_sesssion_origin == DoNotReplicateId || + replorigin_sesssion_origin_timestamp == 0) + replorigin_sesssion_origin_timestamp = xactStopTimestamp; + else + replorigin_session_advance(replorigin_sesssion_origin_lsn, + XactLastRecEnd); - node_id = CommitTsGetDefaultNodeId(); + /* + * We don't need to WAL log origin or timestamp here, the commit + * record contains all the necessary information and will redo the SET + * action during replay. + */ TransactionTreeSetCommitTsData(xid, nchildren, children, - xactStopTimestamp, - node_id, node_id != InvalidCommitTsNodeId); + replorigin_sesssion_origin_timestamp, + replorigin_sesssion_origin, false); } /* @@ -1176,9 +1184,11 @@ RecordTransactionCommit(void) if (wrote_xlog && markXidCommitted) SyncRepWaitForLSN(XactLastRecEnd); + /* remember end of last commit record */ + XactLastCommitEnd = XactLastRecEnd; + /* Reset XactLastRecEnd until the next transaction writes something */ XactLastRecEnd = 0; - cleanup: /* Clean up local data */ if (rels) @@ -4611,6 +4621,7 @@ XactLogCommitRecord(TimestampTz commit_time, xl_xact_relfilenodes xl_relfilenodes; xl_xact_invals xl_invals; xl_xact_twophase xl_twophase; + xl_xact_origin xl_origin; uint8 info; @@ -4668,6 +4679,15 @@ XactLogCommitRecord(TimestampTz commit_time, xl_twophase.xid = twophase_xid; } + /* dump transaction origin information */ + if (replorigin_sesssion_origin != InvalidRepOriginId) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN; + + xl_origin.origin_lsn = replorigin_sesssion_origin_lsn; + xl_origin.origin_timestamp = replorigin_sesssion_origin_timestamp; + } + if (xl_xinfo.xinfo != 0) info |= XLOG_XACT_HAS_INFO; @@ -4709,6 +4729,12 @@ XactLogCommitRecord(TimestampTz commit_time, if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE) 
XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase)); + if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN) + XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin)); + + /* we allow filtering by xacts */ + XLogIncludeOrigin(); + return XLogInsert(RM_XACT_ID, info); } @@ -4806,10 +4832,12 @@ XactLogAbortRecord(TimestampTz abort_time, static void xact_redo_commit(xl_xact_parsed_commit *parsed, TransactionId xid, - XLogRecPtr lsn) + XLogRecPtr lsn, + RepOriginId origin_id) { TransactionId max_xid; int i; + TimestampTz commit_time; max_xid = TransactionIdLatest(xid, parsed->nsubxacts, parsed->subxacts); @@ -4829,9 +4857,16 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, LWLockRelease(XidGenLock); } + Assert(!!(parsed->xinfo & XACT_XINFO_HAS_ORIGIN) == (origin_id != InvalidRepOriginId)); + + if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) + commit_time = parsed->origin_timestamp; + else + commit_time = parsed->xact_time; + /* Set the transaction commit timestamp and metadata */ TransactionTreeSetCommitTsData(xid, parsed->nsubxacts, parsed->subxacts, - parsed->xact_time, InvalidCommitTsNodeId, + commit_time, origin_id, false); if (standbyState == STANDBY_DISABLED) @@ -4892,6 +4927,13 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, StandbyReleaseLockTree(xid, 0, NULL); } + if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) + { + /* recover apply progress */ + replorigin_advance(origin_id, parsed->origin_lsn, lsn, + false /* backward */, false /* WAL */); + } + /* Make sure files supposed to be dropped are dropped */ if (parsed->nrels > 0) { @@ -5047,13 +5089,13 @@ xact_redo(XLogReaderState *record) { Assert(!TransactionIdIsValid(parsed.twophase_xid)); xact_redo_commit(&parsed, XLogRecGetXid(record), - record->EndRecPtr); + record->EndRecPtr, XLogRecGetOrigin(record)); } else { Assert(TransactionIdIsValid(parsed.twophase_xid)); xact_redo_commit(&parsed, parsed.twophase_xid, - record->EndRecPtr); + record->EndRecPtr, XLogRecGetOrigin(record)); 
RemoveTwoPhaseFile(parsed.twophase_xid, false); } } diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 25809961028..da7b6c2fadd 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -44,6 +44,7 @@ #include "postmaster/startup.h" #include "replication/logical.h" #include "replication/slot.h" +#include "replication/origin.h" #include "replication/snapbuild.h" #include "replication/walreceiver.h" #include "replication/walsender.h" @@ -295,6 +296,7 @@ static TimeLineID curFileTLI; static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr; XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr; +XLogRecPtr XactLastCommitEnd = InvalidXLogRecPtr; /* * RedoRecPtr is this backend's local copy of the REDO record pointer @@ -6212,6 +6214,11 @@ StartupXLOG(void) StartupMultiXact(); /* + * Recover knowledge about replay progress of known replication partners. + */ + StartupReplicationOrigin(); + + /* * Initialize unlogged LSN. On a clean shutdown, it's restored from the * control file. On recovery, all unlogged relations are blown away, so * the unlogged LSN counter can be reset too. 
@@ -8394,6 +8401,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags) CheckPointSnapBuild(); CheckPointLogicalRewriteHeap(); CheckPointBuffers(flags); /* performs all required fsyncs */ + CheckPointReplicationOrigin(); /* We deliberately delay 2PC checkpointing as long as possible */ CheckPointTwoPhase(checkPointRedo); } diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index 618f8792f89..0cdb6af052d 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -26,6 +26,7 @@ #include "catalog/pg_control.h" #include "common/pg_lzcompress.h" #include "miscadmin.h" +#include "replication/origin.h" #include "storage/bufmgr.h" #include "storage/proc.h" #include "utils/memutils.h" @@ -72,6 +73,9 @@ static XLogRecData *mainrdata_head; static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head; static uint32 mainrdata_len; /* total # of bytes in chain */ +/* Should te in-progress insertion log the origin */ +static bool include_origin = false; + /* * These are used to hold the record header while constructing a record. * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization, @@ -83,10 +87,12 @@ static uint32 mainrdata_len; /* total # of bytes in chain */ static XLogRecData hdr_rdt; static char *hdr_scratch = NULL; +#define SizeOfXlogOrigin (sizeof(RepOriginId) + sizeof(char)) + #define HEADER_SCRATCH_SIZE \ (SizeOfXLogRecord + \ MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \ - SizeOfXLogRecordDataHeaderLong) + SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin) /* * An array of XLogRecData structs, to hold registered data. 
@@ -193,6 +199,7 @@ XLogResetInsertion(void) max_registered_block_id = 0; mainrdata_len = 0; mainrdata_last = (XLogRecData *) &mainrdata_head; + include_origin = false; begininsert_called = false; } @@ -375,6 +382,16 @@ XLogRegisterBufData(uint8 block_id, char *data, int len) } /* + * Should this record include the replication origin if one is set up? + */ +void +XLogIncludeOrigin(void) +{ + Assert(begininsert_called); + include_origin = true; +} + +/* * Insert an XLOG record having the specified RMID and info bytes, with the * body of the record being the data and buffer references registered earlier * with XLogRegister* calls. @@ -678,6 +695,14 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, scratch += sizeof(BlockNumber); } + /* followed by the record's origin, if any */ + if (include_origin && replorigin_sesssion_origin != InvalidRepOriginId) + { + *(scratch++) = XLR_BLOCK_ID_ORIGIN; + memcpy(scratch, &replorigin_sesssion_origin, sizeof(replorigin_sesssion_origin)); + scratch += sizeof(replorigin_sesssion_origin); + } + /* followed by main data, if any */ if (mainrdata_len > 0) { diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 77be1b8ef3c..3661e7229aa 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -21,6 +21,7 @@ #include "access/xlogreader.h" #include "catalog/pg_control.h" #include "common/pg_lzcompress.h" +#include "replication/origin.h" static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength); @@ -975,6 +976,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) ResetDecoder(state); state->decoded_record = record; + state->record_origin = InvalidRepOriginId; ptr = (char *) record; ptr += SizeOfXLogRecord; @@ -1009,6 +1011,10 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) break; /* by convention, the main data fragment is * always last */ } + else if (block_id == 
XLR_BLOCK_ID_ORIGIN) + { + COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId)); + } else if (block_id <= XLR_MAX_BLOCK_ID) { /* XLogRecordBlockHeader */ |