diff options
Diffstat (limited to 'src/backend/replication/logical')
-rw-r--r-- | src/backend/replication/logical/logical.c | 23 |
1 files changed, 21 insertions, 2 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index a8d2e024d34..1d56d0c4ef3 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1828,7 +1828,19 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) SpinLockAcquire(&MyReplicationSlot->mutex); - MyReplicationSlot->data.confirmed_flush = lsn; + /* + * Prevent moving the confirmed_flush backwards, as this could lead to + * data duplication issues caused by replicating already replicated + * changes. + * + * This can happen when a client acknowledges an LSN it doesn't have + * to do anything for, and thus didn't store persistently. After a + * restart, the client can send the prior LSN that it stored + * persistently as an acknowledgement, but we need to ignore such an + * LSN. See similar case handling in CreateDecodingContext. + */ + if (lsn > MyReplicationSlot->data.confirmed_flush) + MyReplicationSlot->data.confirmed_flush = lsn; /* if we're past the location required for bumping xmin, do so */ if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr && @@ -1893,7 +1905,14 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) else { SpinLockAcquire(&MyReplicationSlot->mutex); - MyReplicationSlot->data.confirmed_flush = lsn; + + /* + * Prevent moving the confirmed_flush backwards. See comments above + * for the details. + */ + if (lsn > MyReplicationSlot->data.confirmed_flush) + MyReplicationSlot->data.confirmed_flush = lsn; + SpinLockRelease(&MyReplicationSlot->mutex); } } |