aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/walsender.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r--src/backend/replication/walsender.c74
1 files changed, 74 insertions, 0 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 269d200707e..f2a3ee20732 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -37,6 +37,7 @@
#include <signal.h>
#include <unistd.h>
+#include "funcapi.h"
#include "access/xlog_internal.h"
#include "catalog/pg_type.h"
#include "libpq/libpq.h"
@@ -49,6 +50,7 @@
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "tcop/tcopprot.h"
+#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
@@ -943,6 +945,78 @@ WalSndWakeup(void)
}
/*
+ * Returns activity of walsenders, including pids and xlog locations sent to
+ * standby servers.
+ */
+Datum
+pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_GET_WAL_SENDERS_COLS 2
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ TupleDesc tupdesc;
+ Tuplestorestate *tupstore;
+ MemoryContext per_query_ctx;
+ MemoryContext oldcontext;
+ int i;
+
+ /* check to see if caller supports us returning a tuplestore */
+ if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("set-valued function called in context that cannot accept a set")));
+ if (!(rsinfo->allowedModes & SFRM_Materialize))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("materialize mode required, but it is not " \
+ "allowed in this context")));
+
+ /* Build a tuple descriptor for our result type */
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+ oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+ tupstore = tuplestore_begin_heap(true, false, work_mem);
+ rsinfo->returnMode = SFRM_Materialize;
+ rsinfo->setResult = tupstore;
+ rsinfo->setDesc = tupdesc;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ for (i = 0; i < max_wal_senders; i++)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+ char sent_location[MAXFNAMELEN];
+ XLogRecPtr sentPtr;
+ Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
+ bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
+
+ if (walsnd->pid == 0)
+ continue;
+
+ SpinLockAcquire(&walsnd->mutex);
+ sentPtr = walsnd->sentPtr;
+ SpinLockRelease(&walsnd->mutex);
+
+ snprintf(sent_location, sizeof(sent_location), "%X/%X",
+ sentPtr.xlogid, sentPtr.xrecoff);
+
+ memset(nulls, 0, sizeof(nulls));
+ values[0] = Int32GetDatum(walsnd->pid);
+ values[1] = CStringGetTextDatum(sent_location);
+
+ tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+ }
+
+ /* clean up and return the tuplestore */
+ tuplestore_donestoring(tupstore);
+
+ return (Datum) 0;
+}
+
+/*
* This isn't currently used for anything. Monitoring tools might be
* interested in the future, and we'll need something like this in the
* future for synchronous replication.