diff options
Diffstat (limited to 'src/backend/replication/slotfuncs.c')
-rw-r--r-- | src/backend/replication/slotfuncs.c | 98 |
1 files changed, 89 insertions, 9 deletions
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 5acd2bae19c..c9416b03eee 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -15,13 +15,13 @@ #include "funcapi.h" #include "miscadmin.h" + #include "access/htup_details.h" +#include "replication/slot.h" +#include "replication/logical.h" +#include "replication/logicalfuncs.h" #include "utils/builtins.h" #include "utils/pg_lsn.h" -#include "replication/slot.h" - -Datum pg_create_physical_replication_slot(PG_FUNCTION_ARGS); -Datum pg_drop_replication_slot(PG_FUNCTION_ARGS); static void check_permissions(void) @@ -54,7 +54,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) elog(ERROR, "return type must be a row type"); /* acquire replication slot, this will check for conflicting names*/ - ReplicationSlotCreate(NameStr(*name), false); + ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT); values[0] = NameGetDatum(&MyReplicationSlot->data.name); @@ -69,6 +69,68 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) PG_RETURN_DATUM(result); } + +/* + * SQL function for creating a new logical replication slot. + */ +Datum +pg_create_logical_replication_slot(PG_FUNCTION_ARGS) +{ + Name name = PG_GETARG_NAME(0); + Name plugin = PG_GETARG_NAME(1); + + LogicalDecodingContext *ctx = NULL; + + TupleDesc tupdesc; + HeapTuple tuple; + Datum result; + Datum values[2]; + bool nulls[2]; + + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + check_permissions(); + + CheckLogicalDecodingRequirements(); + + Assert(!MyReplicationSlot); + + /* + * Acquire a logical decoding slot, this will check for conflicting + * names. + */ + ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL); + + /* + * Create logical decoding context, to build the initial snapshot. + */ + ctx = CreateInitDecodingContext( + NameStr(*plugin), NIL, + logical_read_local_xlog_page, NULL, NULL); + + /* build initial snapshot, might take a while */ + DecodingContextFindStartpoint(ctx); + + values[0] = CStringGetTextDatum(NameStr(MyReplicationSlot->data.name)); + values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush); + + /* don't need the decoding context anymore */ + FreeDecodingContext(ctx); + + memset(nulls, 0, sizeof(nulls)); + + tuple = heap_form_tuple(tupdesc, values, nulls); + result = HeapTupleGetDatum(tuple); + + /* ok, slot is now fully created, mark it as persistent */ + ReplicationSlotPersist(); + ReplicationSlotRelease(); + + PG_RETURN_DATUM(result); +} + + /* * SQL function for dropping a replication slot. */ @@ -92,7 +154,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_REPLICATION_SLOTS_COLS 6 +#define PG_GET_REPLICATION_SLOTS_COLS 8 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -134,15 +196,16 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) for (slotno = 0; slotno < max_replication_slots; slotno++) { ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno]; - Datum values[PG_STAT_GET_REPLICATION_SLOTS_COLS]; - bool nulls[PG_STAT_GET_REPLICATION_SLOTS_COLS]; + Datum values[PG_GET_REPLICATION_SLOTS_COLS]; + bool nulls[PG_GET_REPLICATION_SLOTS_COLS]; TransactionId xmin; + TransactionId catalog_xmin; XLogRecPtr restart_lsn; bool active; Oid database; NameData slot_name; - + NameData plugin; int i; SpinLockAcquire(&slot->mutex); @@ -154,9 +217,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) else { xmin = slot->data.xmin; + catalog_xmin = slot->data.catalog_xmin; database = slot->data.database; restart_lsn = slot->data.restart_lsn; namecpy(&slot_name, &slot->data.name); + namecpy(&plugin, &slot->data.plugin); active = slot->active; } @@ -166,19 +231,34 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) i = 0; values[i++] = NameGetDatum(&slot_name); + + if (database == InvalidOid) + nulls[i++] = true; + else + values[i++] = NameGetDatum(&plugin); + if (database == InvalidOid) values[i++] = CStringGetTextDatum("physical"); else values[i++] = CStringGetTextDatum("logical"); + if (database == InvalidOid) nulls[i++] = true; else values[i++] = database; + values[i++] = BoolGetDatum(active); + if (xmin != InvalidTransactionId) values[i++] = TransactionIdGetDatum(xmin); else nulls[i++] = true; + + if (catalog_xmin != InvalidTransactionId) + values[i++] = TransactionIdGetDatum(catalog_xmin); + else + nulls[i++] = true; + if (restart_lsn != InvalidTransactionId) values[i++] = LSNGetDatum(restart_lsn); else |