aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/slotfuncs.c50
1 files changed, 36 insertions, 14 deletions
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d9e10263bb7..2806e1076ca 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -318,32 +318,43 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
/*
* Helper function for advancing physical replication slot forward.
+ * The LSN position to move to is compared simply to the slot's
+ * restart_lsn, knowing that any position older than that would be
+ * removed by successive checkpoints.
*/
static XLogRecPtr
-pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
+pg_physical_replication_slot_advance(XLogRecPtr moveto)
{
- XLogRecPtr retlsn = InvalidXLogRecPtr;
+ XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn;
+ XLogRecPtr retlsn = startlsn;
- SpinLockAcquire(&MyReplicationSlot->mutex);
- if (MyReplicationSlot->data.restart_lsn < moveto)
+ if (startlsn < moveto)
{
+ SpinLockAcquire(&MyReplicationSlot->mutex);
MyReplicationSlot->data.restart_lsn = moveto;
+ SpinLockRelease(&MyReplicationSlot->mutex);
retlsn = moveto;
}
- SpinLockRelease(&MyReplicationSlot->mutex);
return retlsn;
}
/*
* Helper function for advancing logical replication slot forward.
+ * The slot's restart_lsn is used as start point for reading records,
+ * while confirmed_lsn is used as base point for the decoding context.
+ * The LSN position to move to is checked by doing a per-record scan and
+ * logical decoding which makes sure that confirmed_lsn is updated to a
+ * LSN which allows the future slot consumer to get consistent logical
+ * changes.
*/
static XLogRecPtr
-pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
+pg_logical_replication_slot_advance(XLogRecPtr moveto)
{
LogicalDecodingContext *ctx;
ResourceOwner old_resowner = CurrentResourceOwner;
- XLogRecPtr retlsn = InvalidXLogRecPtr;
+ XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn;
+ XLogRecPtr retlsn = MyReplicationSlot->data.confirmed_flush;
PG_TRY();
{
@@ -384,7 +395,7 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
if (record != NULL)
LogicalDecodingProcessRecord(ctx, ctx->reader);
- /* check limits */
+ /* Stop once the moving point wanted by caller has been reached */
if (moveto <= ctx->reader->EndRecPtr)
break;
@@ -441,7 +452,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
Name slotname = PG_GETARG_NAME(0);
XLogRecPtr moveto = PG_GETARG_LSN(1);
XLogRecPtr endlsn;
- XLogRecPtr startlsn;
+ XLogRecPtr minlsn;
TupleDesc tupdesc;
Datum values[2];
bool nulls[2];
@@ -472,21 +483,32 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
/* Acquire the slot so we "own" it */
ReplicationSlotAcquire(NameStr(*slotname), true);
- startlsn = MyReplicationSlot->data.confirmed_flush;
- if (moveto < startlsn)
+ /*
+ * Check if the slot is not moving backwards. Physical slots rely simply
+ * on restart_lsn as a minimum point, while logical slots have confirmed
+ * consumption up to confirmed_lsn, meaning that in both cases data older
+ * than that is not available anymore.
+ */
+ if (OidIsValid(MyReplicationSlot->data.database))
+ minlsn = MyReplicationSlot->data.confirmed_flush;
+ else
+ minlsn = MyReplicationSlot->data.restart_lsn;
+
+ if (moveto < minlsn)
{
ReplicationSlotRelease();
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot move slot to %X/%X, minimum is %X/%X",
(uint32) (moveto >> 32), (uint32) moveto,
- (uint32) (startlsn >> 32), (uint32) startlsn)));
+ (uint32) (minlsn >> 32), (uint32) minlsn)));
}
+ /* Do the actual slot update, depending on the slot type */
if (OidIsValid(MyReplicationSlot->data.database))
- endlsn = pg_logical_replication_slot_advance(startlsn, moveto);
+ endlsn = pg_logical_replication_slot_advance(moveto);
else
- endlsn = pg_physical_replication_slot_advance(startlsn, moveto);
+ endlsn = pg_physical_replication_slot_advance(moveto);
values[0] = NameGetDatum(&MyReplicationSlot->data.name);
nulls[0] = false;