aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/slotfuncs.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/slotfuncs.c')
-rw-r--r--src/backend/replication/slotfuncs.c193
1 files changed, 193 insertions, 0 deletions
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
new file mode 100644
index 00000000000..98a860e5288
--- /dev/null
+++ b/src/backend/replication/slotfuncs.c
@@ -0,0 +1,193 @@
+/*-------------------------------------------------------------------------
+ *
+ * slotfuncs.c
+ * Support functions for replication slots
+ *
+ * Copyright (c) 2012-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/slotfuncs.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "access/htup_details.h"
+#include "utils/builtins.h"
+#include "replication/slot.h"
+
+Datum pg_create_physical_replication_slot(PG_FUNCTION_ARGS);
+Datum pg_drop_replication_slot(PG_FUNCTION_ARGS);
+
+static void
+check_permissions(void)
+{
+ if (!superuser() && !has_rolreplication(GetUserId()))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ (errmsg("must be superuser or replication role to use replication slots"))));
+}
+
+/*
+ * SQL function for creating a new physical (streaming replication)
+ * replication slot.
+ */
+Datum
+pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
+{
+ Name name = PG_GETARG_NAME(0);
+ Datum values[2];
+ bool nulls[2];
+ TupleDesc tupdesc;
+ HeapTuple tuple;
+ Datum result;
+
+ check_permissions();
+
+ CheckSlotRequirements();
+
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ /* acquire replication slot, this will check for conflicting names*/
+ ReplicationSlotCreate(NameStr(*name), false);
+
+ values[0] = CStringGetTextDatum(NameStr(MyReplicationSlot->data.name));
+
+ nulls[0] = false;
+ nulls[1] = true;
+
+ tuple = heap_form_tuple(tupdesc, values, nulls);
+ result = HeapTupleGetDatum(tuple);
+
+ ReplicationSlotRelease();
+
+ PG_RETURN_DATUM(result);
+}
+
+/*
+ * SQL function for dropping a replication slot.
+ */
+Datum
+pg_drop_replication_slot(PG_FUNCTION_ARGS)
+{
+ Name name = PG_GETARG_NAME(0);
+
+ check_permissions();
+
+ CheckSlotRequirements();
+
+ ReplicationSlotDrop(NameStr(*name));
+
+ PG_RETURN_VOID();
+}
+
+/*
+ * pg_get_replication_slots - SQL SRF showing active replication slots.
+ */
+Datum
+pg_get_replication_slots(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_GET_REPLICATION_SLOTS_COLS 6
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ TupleDesc tupdesc;
+ Tuplestorestate *tupstore;
+ MemoryContext per_query_ctx;
+ MemoryContext oldcontext;
+ int slotno;
+
+ /* check to see if caller supports us returning a tuplestore */
+ if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("set-valued function called in context that cannot accept a set")));
+ if (!(rsinfo->allowedModes & SFRM_Materialize))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("materialize mode required, but it is not " \
+ "allowed in this context")));
+
+ /* Build a tuple descriptor for our result type */
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ /*
+ * We don't require any special permission to see this function's data
+ * because nothing should be sensitive. The most critical being the slot
+ * name, which shouldn't contain anything particularly sensitive.
+ */
+
+ per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+ oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+ tupstore = tuplestore_begin_heap(true, false, work_mem);
+ rsinfo->returnMode = SFRM_Materialize;
+ rsinfo->setResult = tupstore;
+ rsinfo->setDesc = tupdesc;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ for (slotno = 0; slotno < max_replication_slots; slotno++)
+ {
+ ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
+ Datum values[PG_STAT_GET_REPLICATION_SLOTS_COLS];
+ bool nulls[PG_STAT_GET_REPLICATION_SLOTS_COLS];
+
+ TransactionId xmin;
+ XLogRecPtr restart_lsn;
+ bool active;
+ Oid database;
+ const char *slot_name;
+
+ char restart_lsn_s[MAXFNAMELEN];
+ int i;
+
+ SpinLockAcquire(&slot->mutex);
+ if (!slot->in_use)
+ {
+ SpinLockRelease(&slot->mutex);
+ continue;
+ }
+ else
+ {
+ xmin = slot->data.xmin;
+ database = slot->data.database;
+ restart_lsn = slot->data.restart_lsn;
+ slot_name = pstrdup(NameStr(slot->data.name));
+
+ active = slot->active;
+ }
+ SpinLockRelease(&slot->mutex);
+
+ memset(nulls, 0, sizeof(nulls));
+
+ snprintf(restart_lsn_s, sizeof(restart_lsn_s), "%X/%X",
+ (uint32) (restart_lsn >> 32), (uint32) restart_lsn);
+
+ i = 0;
+ values[i++] = CStringGetTextDatum(slot_name);
+ if (database == InvalidOid)
+ values[i++] = CStringGetTextDatum("physical");
+ else
+ values[i++] = CStringGetTextDatum("logical");
+ values[i++] = database;
+ values[i++] = BoolGetDatum(active);
+ if (xmin != InvalidTransactionId)
+ values[i++] = TransactionIdGetDatum(xmin);
+ else
+ nulls[i++] = true;
+ if (restart_lsn != InvalidTransactionId)
+ values[i++] = CStringGetTextDatum(restart_lsn_s);
+ else
+ nulls[i++] = true;
+
+ tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+ }
+
+ tuplestore_donestoring(tupstore);
+
+ return (Datum) 0;
+}