aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/logical.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r--src/backend/replication/logical/logical.c41
1 files changed, 39 insertions, 2 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index a8d2e024d34..f1eb798f3e9 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -29,6 +29,7 @@
#include "postgres.h"
#include "access/xact.h"
+#include "access/xlog_internal.h"
#include "access/xlogutils.h"
#include "fmgr.h"
#include "miscadmin.h"
@@ -41,6 +42,7 @@
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
+#include "utils/injection_point.h"
#include "utils/inval.h"
#include "utils/memutils.h"
@@ -1825,10 +1827,26 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
{
bool updated_xmin = false;
bool updated_restart = false;
+ XLogRecPtr restart_lsn pg_attribute_unused();
SpinLockAcquire(&MyReplicationSlot->mutex);
- MyReplicationSlot->data.confirmed_flush = lsn;
+ /* remember the old restart lsn */
+ restart_lsn = MyReplicationSlot->data.restart_lsn;
+
+ /*
+ * Prevent moving the confirmed_flush backwards, as this could lead to
+ * data duplication issues caused by replicating already replicated
+ * changes.
+ *
+ * This can happen when a client acknowledges an LSN it doesn't have
+ * to do anything for, and thus didn't store persistently. After a
+ * restart, the client can send the prior LSN that it stored
+ * persistently as an acknowledgement, but we need to ignore such an
+ * LSN. See similar case handling in CreateDecodingContext.
+ */
+ if (lsn > MyReplicationSlot->data.confirmed_flush)
+ MyReplicationSlot->data.confirmed_flush = lsn;
/* if we're past the location required for bumping xmin, do so */
if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr &&
@@ -1869,6 +1887,18 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
/* first write new xmin to disk, so we know what's up after a crash */
if (updated_xmin || updated_restart)
{
+#ifdef USE_INJECTION_POINTS
+ XLogSegNo seg1,
+ seg2;
+
+ XLByteToSeg(restart_lsn, seg1, wal_segment_size);
+ XLByteToSeg(MyReplicationSlot->data.restart_lsn, seg2, wal_segment_size);
+
+ /* trigger injection point, but only if segment changes */
+ if (seg1 != seg2)
+ INJECTION_POINT("logical-replication-slot-advance-segment", NULL);
+#endif
+
ReplicationSlotMarkDirty();
ReplicationSlotSave();
elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
@@ -1893,7 +1923,14 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
else
{
SpinLockAcquire(&MyReplicationSlot->mutex);
- MyReplicationSlot->data.confirmed_flush = lsn;
+
+ /*
+ * Prevent moving the confirmed_flush backwards. See comments above
+ * for the details.
+ */
+ if (lsn > MyReplicationSlot->data.confirmed_flush)
+ MyReplicationSlot->data.confirmed_flush = lsn;
+
SpinLockRelease(&MyReplicationSlot->mutex);
}
}