diff options
author | Andres Freund <andres@anarazel.de> | 2015-04-29 19:30:53 +0200 |
---|---|---|
committer | Andres Freund <andres@anarazel.de> | 2015-04-29 19:30:53 +0200 |
commit | 5aa2350426c4fdb3d04568b65aadac397012bbcb (patch) | |
tree | 954c3123dc58905bbda6407565383c65850204e7 /src/backend/replication/logical/decode.c | |
parent | c6e96a2f986e4dad72c14b14d4cc17d02b2a6aad (diff) | |
download | postgresql-5aa2350426c4fdb3d04568b65aadac397012bbcb.tar.gz postgresql-5aa2350426c4fdb3d04568b65aadac397012bbcb.zip |
Introduce replication progress tracking infrastructure.
When implementing a replication solution ontop of logical decoding, two
related problems exist:
* How to safely keep track of replication progress
* How to change replication behavior, based on the origin of a row;
e.g. to avoid loops in bi-directional replication setups
The solution to these problems, as implemented here, consist out of
three parts:
1) 'replication origins', which identify nodes in a replication setup.
2) 'replication progress tracking', which remembers, for each
replication origin, how far replay has progressed in a efficient and
crash safe manner.
3) The ability to filter out changes performed on the behest of a
replication origin during logical decoding; this allows complex
replication topologies. E.g. by filtering all replayed changes out.
Most of this could also be implemented in "userspace", e.g. by inserting
additional rows contain origin information, but that ends up being much
less efficient and more complicated. We don't want to require various
replication solutions to reimplement logic for this independently. The
infrastructure is intended to be generic enough to be reusable.
This infrastructure also replaces the 'nodeid' infrastructure of commit
timestamps. It is intended to provide all the former capabilities,
except that there's only 2^16 different origins; but now they integrate
with logical decoding. Additionally more functionality is accessible via
SQL. Since the commit timestamp infrastructure has also been introduced
in 9.5 (commit 73c986add) changing the API is not a problem.
For now the number of origins for which the replication progress can be
tracked simultaneously is determined by the max_replication_slots
GUC. That GUC is not a perfect match to configure this, but there
doesn't seem to be sufficient reason to introduce a separate new one.
Bumps both catversion and wal page magic.
Author: Andres Freund, with contributions from Petr Jelinek and Craig Ringer
Reviewed-By: Heikki Linnakangas, Petr Jelinek, Robert Haas, Steve Singer
Discussion: 20150216002155.GI15326@awork2.anarazel.de,
20140923182422.GA15776@alap3.anarazel.de,
20131114172632.GE7522@alap2.anarazel.de
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r-- | src/backend/replication/logical/decode.c | 49 |
1 files changed, 46 insertions, 3 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index eb7293f2f33..88424964ef3 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -40,6 +40,7 @@ #include "replication/decode.h" #include "replication/logical.h" #include "replication/reorderbuffer.h" +#include "replication/origin.h" #include "replication/snapbuild.h" #include "storage/standby.h" @@ -131,6 +132,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor case RM_SPGIST_ID: case RM_BRIN_ID: case RM_COMMIT_TS_ID: + case RM_REPLORIGIN_ID: break; case RM_NEXT_ID: elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record)); @@ -422,6 +424,15 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } } +static inline bool +FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id) +{ + if (ctx->callbacks.filter_by_origin_cb == NULL) + return false; + + return filter_by_origin_cb_wrapper(ctx, origin_id); +} + /* * Consolidated commit record handling between the different form of commit * records. @@ -430,8 +441,17 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_commit *parsed, TransactionId xid) { + XLogRecPtr origin_lsn = InvalidXLogRecPtr; + XLogRecPtr commit_time = InvalidXLogRecPtr; + XLogRecPtr origin_id = InvalidRepOriginId; int i; + if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) + { + origin_lsn = parsed->origin_lsn; + commit_time = parsed->origin_timestamp; + } + /* * Process invalidation messages, even if we're not interested in the * transaction's contents, since the various caches need to always be @@ -452,12 +472,13 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * the reorderbuffer to forget the content of the (sub-)transactions * if not. * - * There basically two reasons we might not be interested in this + * There can be several reasons we might not be interested in this * transaction: * 1) We might not be interested in decoding transactions up to this * LSN. This can happen because we previously decoded it and now just * are restarting or if we haven't assembled a consistent snapshot yet. * 2) The transaction happened in another database. + * 3) The output plugin is not interested in the origin. * * We can't just use ReorderBufferAbort() here, because we need to execute * the transaction's invalidations. This currently won't be needed if @@ -472,7 +493,8 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * --- */ if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || - (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database)) + (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) || + FilterByOrigin(ctx, origin_id)) { for (i = 0; i < parsed->nsubxacts; i++) { @@ -492,7 +514,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, /* replay actions of all transaction + subtransactions in order */ ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, - parsed->xact_time); + commit_time, origin_id, origin_lsn); } /* @@ -537,8 +559,13 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (target_node.dbNode != ctx->slot->data.database) return; + /* output plugin doesn't look for this origin, no need to queue */ + if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + return; + change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_INSERT; + change->origin_id = XLogRecGetOrigin(r); memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE) @@ -579,8 +606,13 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (target_node.dbNode != ctx->slot->data.database) return; + /* output plugin doesn't look for this origin, no need to queue */ + if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + return; + change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_UPDATE; + change->origin_id = XLogRecGetOrigin(r); memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE) @@ -628,8 +660,13 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (target_node.dbNode != ctx->slot->data.database) return; + /* output plugin doesn't look for this origin, no need to queue */ + if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + return; + change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_DELETE; + change->origin_id = XLogRecGetOrigin(r); memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); @@ -673,6 +710,10 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (rnode.dbNode != ctx->slot->data.database) return; + /* output plugin doesn't look for this origin, no need to queue */ + if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + return; + tupledata = XLogRecGetBlockData(r, 0, &tuplelen); data = tupledata; @@ -685,6 +726,8 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_INSERT; + change->origin_id = XLogRecGetOrigin(r); + memcpy(&change->data.tp.relnode, &rnode, sizeof(RelFileNode)); /* |