diff options
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r-- | src/backend/replication/walsender.c | 74 |
1 files changed, 74 insertions, 0 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 269d200707e..f2a3ee20732 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -37,6 +37,7 @@ #include <signal.h> #include <unistd.h> +#include "funcapi.h" #include "access/xlog_internal.h" #include "catalog/pg_type.h" #include "libpq/libpq.h" @@ -49,6 +50,7 @@ #include "storage/ipc.h" #include "storage/pmsignal.h" #include "tcop/tcopprot.h" +#include "utils/builtins.h" #include "utils/guc.h" #include "utils/memutils.h" #include "utils/ps_status.h" @@ -943,6 +945,78 @@ WalSndWakeup(void) } /* + * Returns activity of walsenders, including pids and xlog locations sent to + * standby servers. + */ +Datum +pg_stat_get_wal_senders(PG_FUNCTION_ARGS) +{ +#define PG_STAT_GET_WAL_SENDERS_COLS 2 + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + Tuplestorestate *tupstore; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + int i; + + /* check to see if caller supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not " \ + "allowed in this context"))); + + /* Build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + + MemoryContextSwitchTo(oldcontext); + + for (i = 0; i < max_wal_senders; i++) + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; + char sent_location[MAXFNAMELEN]; + XLogRecPtr sentPtr; + Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; + bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; + + if (walsnd->pid == 0) + continue; + + SpinLockAcquire(&walsnd->mutex); + sentPtr = walsnd->sentPtr; + SpinLockRelease(&walsnd->mutex); + + snprintf(sent_location, sizeof(sent_location), "%X/%X", + sentPtr.xlogid, sentPtr.xrecoff); + + memset(nulls, 0, sizeof(nulls)); + values[0] = Int32GetDatum(walsnd->pid); + values[1] = CStringGetTextDatum(sent_location); + + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + } + + /* clean up and return the tuplestore */ + tuplestore_donestoring(tupstore); + + return (Datum) 0; +} + +/* * This isn't currently used for anything. Monitoring tools might be * interested in the future, and we'll need something like this in the * future for synchronous replication. |