about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- src/backend/replication/logical/logical.c 13
-rw-r--r-- src/backend/replication/slot.c 4
-rw-r--r-- src/include/replication/slot.h 13
3 files changed, 26 insertions, 4 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 13935915382..61588d626f6 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -297,10 +297,12 @@ CreateInitDecodingContext(char *plugin,
xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
+ SpinLockAcquire(&slot->mutex);
slot->effective_catalog_xmin = xmin_horizon;
slot->data.catalog_xmin = xmin_horizon;
if (need_full_snapshot)
slot->effective_xmin = xmin_horizon;
+ SpinLockRelease(&slot->mutex);
ReplicationSlotsComputeRequiredXmin(true);
@@ -445,13 +447,14 @@ void
DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
{
XLogRecPtr startptr;
+ ReplicationSlot *slot = ctx->slot;
/* Initialize from where to start reading WAL. */
- startptr = ctx->slot->data.restart_lsn;
+ startptr = slot->data.restart_lsn;
elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
- (uint32) (ctx->slot->data.restart_lsn >> 32),
- (uint32) ctx->slot->data.restart_lsn);
+ (uint32) (slot->data.restart_lsn >> 32),
+ (uint32) slot->data.restart_lsn);
/* Wait for a consistent starting point */
for (;;)
@@ -477,7 +480,9 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
CHECK_FOR_INTERRUPTS();
}
- ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+ SpinLockAcquire(&slot->mutex);
+ slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+ SpinLockRelease(&slot->mutex);
}
/*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 056628fe8e3..79d7a57d677 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1016,7 +1016,9 @@ ReplicationSlotReserveWal(void)
XLogRecPtr flushptr;
/* start at current insert position */
+ SpinLockAcquire(&slot->mutex);
slot->data.restart_lsn = GetXLogInsertRecPtr();
+ SpinLockRelease(&slot->mutex);
/* make sure we have enough information to start */
flushptr = LogStandbySnapshot();
@@ -1026,7 +1028,9 @@ ReplicationSlotReserveWal(void)
}
else
{
+ SpinLockAcquire(&slot->mutex);
slot->data.restart_lsn = GetRedoRecPtr();
+ SpinLockRelease(&slot->mutex);
}
/* prevent WAL removal as fast as possible */
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 76a88c6de78..7964ae254f4 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -86,6 +86,19 @@ typedef struct ReplicationSlotPersistentData
/*
* Shared memory state of a single replication slot.
+ *
+ * The in-memory data of replication slots follows a locking model based
+ * on two linked concepts:
+ * - A replication slot's in_use flag is switched when added or discarded using
+ * the LWLock ReplicationSlotControlLock, which needs to be held in exclusive
+ * mode by the backend owning the slot when it updates the flag as part of the
+ * operation, while readers (concurrent backends not owning the slot) need
+ * to hold it in shared mode when looking at replication slot data.
+ * - Individual fields are protected by the slot's mutex, where only the
+ * backend owning the slot is authorized to update the fields of its own
+ * slot. The
+ * backend owning the slot does not need to take this lock when reading its
+ * own fields, while concurrent backends not owning this slot should take the
+ * lock when reading this slot's data.
*/
typedef struct ReplicationSlot
{