aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlvaro Herrera <alvherre@alvh.no-ip.org>2018-07-19 14:15:44 -0400
committerAlvaro Herrera <alvherre@alvh.no-ip.org>2018-07-19 14:15:44 -0400
commit1573995f55994ee04dd0d69481de17d662ad8e88 (patch)
tree4e6d3b8e298e3e0549a34abe53406540678cd6b0 /src
parent309765fa1e317597bfd341fa99dfa97ea5722890 (diff)
downloadpostgresql-1573995f55994ee04dd0d69481de17d662ad8e88.tar.gz
postgresql-1573995f55994ee04dd0d69481de17d662ad8e88.zip
Rewrite comments in replication slot advance implementation
The code added by 9c7d06d60680 was a bit obscure; clarify that by rewriting the comments. Lack of clarity has already caused bugs, so it's a worthy goal. Co-authored-by: Arseny Sher <a.sher@postgrespro.ru> Co-authored-by: Michaël Paquier <michael@paquier.xyz> Co-authored-by: Álvaro Herrera <alvherre@alvh.no-ip.org> Reviewed-by: Petr Jelínek <petr.jelinek@2ndquadrant.com> Discussion: https://postgr.es/m/87y3fgoyrn.fsf@ars-thinkpad
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/logical.c5
-rw-r--r--src/backend/replication/slotfuncs.c72
2 files changed, 50 insertions, 27 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index c2d0e0c723f..3cd4eefb9bf 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -338,7 +338,10 @@ CreateInitDecodingContext(char *plugin,
* that, see below).
*
* output_plugin_options
- * contains options passed to the output plugin.
+ * options passed to the output plugin.
+ *
+ * fast_forward
+ * bypass the generation of logical changes.
*
* read_page, prepare_write, do_write, update_progress
* callbacks that have to be filled to perform the use-case dependent,
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 6d7474a976c..8782bad4a21 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -317,10 +317,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
}
/*
- * Helper function for advancing physical replication slot forward.
- * The LSN position to move to is compared simply to the slot's
- * restart_lsn, knowing that any position older than that would be
- * removed by successive checkpoints.
+ * Helper function for advancing our physical replication slot forward.
+ *
+ * The LSN position to move to is compared simply to the slot's restart_lsn,
+ * knowing that any position older than that would be removed by successive
+ * checkpoints.
*/
static XLogRecPtr
pg_physical_replication_slot_advance(XLogRecPtr moveto)
@@ -340,59 +341,78 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
}
/*
- * Helper function for advancing logical replication slot forward.
+ * Helper function for advancing our logical replication slot forward.
+ *
* The slot's restart_lsn is used as start point for reading records,
* while confirmed_lsn is used as base point for the decoding context.
- * The LSN position to move to is checked by doing a per-record scan and
- * logical decoding which makes sure that confirmed_lsn is updated to a
- * LSN which allows the future slot consumer to get consistent logical
- * changes.
+ *
+ * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
+ * because we need to digest WAL to advance restart_lsn allowing to recycle
+ * WAL and removal of old catalog tuples. As decoding is done in fast_forward
+ * mode, no changes are generated anyway.
*/
static XLogRecPtr
pg_logical_replication_slot_advance(XLogRecPtr moveto)
{
LogicalDecodingContext *ctx;
ResourceOwner old_resowner = CurrentResourceOwner;
- XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn;
- XLogRecPtr retlsn = MyReplicationSlot->data.confirmed_flush;
+ XLogRecPtr startlsn;
+ XLogRecPtr retlsn;
PG_TRY();
{
- /* restart at slot's confirmed_flush */
+ /*
+ * Create our decoding context in fast_forward mode, passing start_lsn
+ * as InvalidXLogRecPtr, so that we start processing from my slot's
+ * confirmed_flush.
+ */
ctx = CreateDecodingContext(InvalidXLogRecPtr,
NIL,
- true,
+ true, /* fast_forward */
logical_read_local_xlog_page,
NULL, NULL, NULL);
+ /*
+ * Start reading at the slot's restart_lsn, which we know to point to
+ * a valid record.
+ */
+ startlsn = MyReplicationSlot->data.restart_lsn;
+
+ /* Initialize our return value in case we don't do anything */
+ retlsn = MyReplicationSlot->data.confirmed_flush;
+
/* invalidate non-timetravel entries */
InvalidateSystemCaches();
- /* Decode until we run out of records */
- while ((startlsn != InvalidXLogRecPtr && startlsn < moveto) ||
- (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < moveto))
+ /* Decode at least one record, until we run out of records */
+ while ((!XLogRecPtrIsInvalid(startlsn) &&
+ startlsn < moveto) ||
+ (!XLogRecPtrIsInvalid(ctx->reader->EndRecPtr) &&
+ ctx->reader->EndRecPtr < moveto))
{
- XLogRecord *record;
char *errm = NULL;
+ XLogRecord *record;
+ /*
+ * Read records. No changes are generated in fast_forward mode,
+ * but snapbuilder/slot statuses are updated properly.
+ */
record = XLogReadRecord(ctx->reader, startlsn, &errm);
if (errm)
elog(ERROR, "%s", errm);
- /*
- * Now that we've set up the xlog reader state, subsequent calls
- * pass InvalidXLogRecPtr to say "continue from last record"
- */
+ /* Read sequentially from now on */
startlsn = InvalidXLogRecPtr;
/*
- * The {begin_txn,change,commit_txn}_wrapper callbacks above will
- * store the description into our tuplestore.
+ * Process the record. Storage-level changes are ignored in
+ * fast_forward mode, but other modules (such as snapbuilder)
+ * might still have critical updates to do.
*/
- if (record != NULL)
+ if (record)
LogicalDecodingProcessRecord(ctx, ctx->reader);
- /* Stop once the moving point wanted by caller has been reached */
+ /* Stop once the requested target has been reached */
if (moveto <= ctx->reader->EndRecPtr)
break;
@@ -411,7 +431,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
LogicalConfirmReceivedLocation(moveto);
/*
- * If only the confirmed_flush_lsn has changed the slot won't get
+ * If only the confirmed_flush LSN has changed the slot won't get
* marked as dirty by the above. Callers on the walsender
* interface are expected to keep track of their own progress and
* don't need it written out. But SQL-interface users cannot