aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/slotfuncs.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/slotfuncs.c')
-rw-r--r--src/backend/replication/slotfuncs.c98
1 files changed, 89 insertions, 9 deletions
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 5acd2bae19c..c9416b03eee 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -15,13 +15,13 @@
#include "funcapi.h"
#include "miscadmin.h"
+
#include "access/htup_details.h"
+#include "replication/slot.h"
+#include "replication/logical.h"
+#include "replication/logicalfuncs.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
-#include "replication/slot.h"
-
-Datum pg_create_physical_replication_slot(PG_FUNCTION_ARGS);
-Datum pg_drop_replication_slot(PG_FUNCTION_ARGS);
static void
check_permissions(void)
@@ -54,7 +54,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
elog(ERROR, "return type must be a row type");
/* acquire replication slot, this will check for conflicting names*/
- ReplicationSlotCreate(NameStr(*name), false);
+ ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT);
values[0] = NameGetDatum(&MyReplicationSlot->data.name);
@@ -69,6 +69,68 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
PG_RETURN_DATUM(result);
}
+
+/*
+ * SQL function for creating a new logical replication slot.
+ */
+Datum
+pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
+{
+ Name name = PG_GETARG_NAME(0);
+ Name plugin = PG_GETARG_NAME(1);
+
+ LogicalDecodingContext *ctx = NULL;
+
+ TupleDesc tupdesc;
+ HeapTuple tuple;
+ Datum result;
+ Datum values[2];
+ bool nulls[2];
+
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ check_permissions();
+
+ CheckLogicalDecodingRequirements();
+
+ Assert(!MyReplicationSlot);
+
+ /*
+ * Acquire a logical decoding slot, this will check for conflicting
+ * names.
+ */
+ ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL);
+
+ /*
+ * Create logical decoding context, to build the initial snapshot.
+ */
+ ctx = CreateInitDecodingContext(
+ NameStr(*plugin), NIL,
+ logical_read_local_xlog_page, NULL, NULL);
+
+ /* build initial snapshot, might take a while */
+ DecodingContextFindStartpoint(ctx);
+
+ values[0] = CStringGetTextDatum(NameStr(MyReplicationSlot->data.name));
+ values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
+
+ /* don't need the decoding context anymore */
+ FreeDecodingContext(ctx);
+
+ memset(nulls, 0, sizeof(nulls));
+
+ tuple = heap_form_tuple(tupdesc, values, nulls);
+ result = HeapTupleGetDatum(tuple);
+
+ /* ok, slot is now fully created, mark it as persistent */
+ ReplicationSlotPersist();
+ ReplicationSlotRelease();
+
+ PG_RETURN_DATUM(result);
+}
+
+
/*
* SQL function for dropping a replication slot.
*/
@@ -92,7 +154,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_get_replication_slots(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_REPLICATION_SLOTS_COLS 6
+#define PG_GET_REPLICATION_SLOTS_COLS 8
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
@@ -134,15 +196,16 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
for (slotno = 0; slotno < max_replication_slots; slotno++)
{
ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
- Datum values[PG_STAT_GET_REPLICATION_SLOTS_COLS];
- bool nulls[PG_STAT_GET_REPLICATION_SLOTS_COLS];
+ Datum values[PG_GET_REPLICATION_SLOTS_COLS];
+ bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
TransactionId xmin;
+ TransactionId catalog_xmin;
XLogRecPtr restart_lsn;
bool active;
Oid database;
NameData slot_name;
-
+ NameData plugin;
int i;
SpinLockAcquire(&slot->mutex);
@@ -154,9 +217,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
else
{
xmin = slot->data.xmin;
+ catalog_xmin = slot->data.catalog_xmin;
database = slot->data.database;
restart_lsn = slot->data.restart_lsn;
namecpy(&slot_name, &slot->data.name);
+ namecpy(&plugin, &slot->data.plugin);
active = slot->active;
}
@@ -166,19 +231,34 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
i = 0;
values[i++] = NameGetDatum(&slot_name);
+
+ if (database == InvalidOid)
+ nulls[i++] = true;
+ else
+ values[i++] = NameGetDatum(&plugin);
+
if (database == InvalidOid)
values[i++] = CStringGetTextDatum("physical");
else
values[i++] = CStringGetTextDatum("logical");
+
if (database == InvalidOid)
nulls[i++] = true;
else
values[i++] = database;
+
values[i++] = BoolGetDatum(active);
+
if (xmin != InvalidTransactionId)
values[i++] = TransactionIdGetDatum(xmin);
else
nulls[i++] = true;
+
+ if (catalog_xmin != InvalidTransactionId)
+ values[i++] = TransactionIdGetDatum(catalog_xmin);
+ else
+ nulls[i++] = true;
+
if (restart_lsn != InvalidTransactionId)
values[i++] = LSNGetDatum(restart_lsn);
else