diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/replication/slotfuncs.c | 98 |
1 files changed, 45 insertions, 53 deletions
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 3a7cea54d59..947644b5dd6 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -277,87 +277,73 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) for (slotno = 0; slotno < max_replication_slots; slotno++) { ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno]; + ReplicationSlot slot_contents; Datum values[PG_GET_REPLICATION_SLOTS_COLS]; bool nulls[PG_GET_REPLICATION_SLOTS_COLS]; - - ReplicationSlotPersistency persistency; - TransactionId xmin; - TransactionId catalog_xmin; - XLogRecPtr restart_lsn; - XLogRecPtr confirmed_flush_lsn; - pid_t active_pid; - Oid database; - NameData slot_name; - NameData plugin; int i; if (!slot->in_use) continue; + /* Copy slot contents while holding spinlock, then examine at leisure */ SpinLockAcquire(&slot->mutex); - - xmin = slot->data.xmin; - catalog_xmin = slot->data.catalog_xmin; - database = slot->data.database; - restart_lsn = slot->data.restart_lsn; - confirmed_flush_lsn = slot->data.confirmed_flush; - namecpy(&slot_name, &slot->data.name); - namecpy(&plugin, &slot->data.plugin); - active_pid = slot->active_pid; - persistency = slot->data.persistency; - + slot_contents = *slot; SpinLockRelease(&slot->mutex); + memset(values, 0, sizeof(values)); memset(nulls, 0, sizeof(nulls)); i = 0; - values[i++] = NameGetDatum(&slot_name); + values[i++] = NameGetDatum(&slot_contents.data.name); - if (database == InvalidOid) + if (slot_contents.data.database == InvalidOid) nulls[i++] = true; else - values[i++] = NameGetDatum(&plugin); + values[i++] = NameGetDatum(&slot_contents.data.plugin); - if (database == InvalidOid) + if (slot_contents.data.database == InvalidOid) values[i++] = CStringGetTextDatum("physical"); else values[i++] = CStringGetTextDatum("logical"); - if (database == InvalidOid) + if (slot_contents.data.database == InvalidOid) nulls[i++] = true; else - values[i++] = database; + values[i++] = ObjectIdGetDatum(slot_contents.data.database); - values[i++] = BoolGetDatum(persistency == RS_TEMPORARY); - values[i++] = BoolGetDatum(active_pid != 0); + values[i++] = BoolGetDatum(slot_contents.data.persistency == RS_TEMPORARY); + values[i++] = BoolGetDatum(slot_contents.active_pid != 0); - if (active_pid != 0) - values[i++] = Int32GetDatum(active_pid); + if (slot_contents.active_pid != 0) + values[i++] = Int32GetDatum(slot_contents.active_pid); else nulls[i++] = true; - if (xmin != InvalidTransactionId) - values[i++] = TransactionIdGetDatum(xmin); + if (slot_contents.data.xmin != InvalidTransactionId) + values[i++] = TransactionIdGetDatum(slot_contents.data.xmin); else nulls[i++] = true; - if (catalog_xmin != InvalidTransactionId) - values[i++] = TransactionIdGetDatum(catalog_xmin); + if (slot_contents.data.catalog_xmin != InvalidTransactionId) + values[i++] = TransactionIdGetDatum(slot_contents.data.catalog_xmin); else nulls[i++] = true; - if (restart_lsn != InvalidXLogRecPtr) - values[i++] = LSNGetDatum(restart_lsn); + if (slot_contents.data.restart_lsn != InvalidXLogRecPtr) + values[i++] = LSNGetDatum(slot_contents.data.restart_lsn); else nulls[i++] = true; - if (confirmed_flush_lsn != InvalidXLogRecPtr) - values[i++] = LSNGetDatum(confirmed_flush_lsn); + if (slot_contents.data.confirmed_flush != InvalidXLogRecPtr) + values[i++] = LSNGetDatum(slot_contents.data.confirmed_flush); else nulls[i++] = true; + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); + tuplestore_putvalues(tupstore, tupdesc, values, nulls); } + LWLockRelease(ReplicationSlotControlLock); tuplestore_donestoring(tupstore); @@ -616,6 +602,8 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) Name src_name = PG_GETARG_NAME(0); Name dst_name = PG_GETARG_NAME(1); ReplicationSlot *src = NULL; + ReplicationSlot first_slot_contents; + ReplicationSlot second_slot_contents; XLogRecPtr src_restart_lsn; bool src_islogical; bool temporary; @@ -655,13 +643,10 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0) { + /* Copy the slot contents while holding spinlock */ SpinLockAcquire(&s->mutex); - src_islogical = SlotIsLogical(s); - src_restart_lsn = s->data.restart_lsn; - temporary = s->data.persistency == RS_TEMPORARY; - plugin = logical_slot ? pstrdup(NameStr(s->data.plugin)) : NULL; + first_slot_contents = *s; SpinLockRelease(&s->mutex); - src = s; break; } @@ -674,6 +659,11 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("replication slot \"%s\" does not exist", NameStr(*src_name)))); + src_islogical = SlotIsLogical(&first_slot_contents); + src_restart_lsn = first_slot_contents.data.restart_lsn; + temporary = (first_slot_contents.data.persistency == RS_TEMPORARY); + plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL; + /* Check type of replication slot */ if (src_islogical != logical_slot) ereport(ERROR, @@ -738,18 +728,20 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) /* Copy data of source slot again */ SpinLockAcquire(&src->mutex); - copy_effective_xmin = src->effective_xmin; - copy_effective_catalog_xmin = src->effective_catalog_xmin; + second_slot_contents = *src; + SpinLockRelease(&src->mutex); - copy_xmin = src->data.xmin; - copy_catalog_xmin = src->data.catalog_xmin; - copy_restart_lsn = src->data.restart_lsn; - copy_confirmed_flush = src->data.confirmed_flush; + copy_effective_xmin = second_slot_contents.effective_xmin; + copy_effective_catalog_xmin = second_slot_contents.effective_catalog_xmin; + + copy_xmin = second_slot_contents.data.xmin; + copy_catalog_xmin = second_slot_contents.data.catalog_xmin; + copy_restart_lsn = second_slot_contents.data.restart_lsn; + copy_confirmed_flush = second_slot_contents.data.confirmed_flush; /* for existence check */ - copy_name = pstrdup(NameStr(src->data.name)); - copy_islogical = SlotIsLogical(src); - SpinLockRelease(&src->mutex); + copy_name = NameStr(second_slot_contents.data.name); + copy_islogical = SlotIsLogical(&second_slot_contents); /* * Check if the source slot still exists and is valid. We regard it as |