diff options
Diffstat (limited to 'src/backend/postmaster/pgstat.c')
-rw-r--r-- | src/backend/postmaster/pgstat.c | 418 |
1 files changed, 271 insertions, 147 deletions
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 6c571aaebcf..bac549a9262 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -16,7 +16,7 @@ * * Copyright (c) 2001, PostgreSQL Global Development Group * - * $Header: /cvsroot/pgsql/src/backend/postmaster/pgstat.c,v 1.5 2001/08/04 00:14:43 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/postmaster/pgstat.c,v 1.6 2001/08/05 02:06:50 tgl Exp $ * ---------- */ #include "postgres.h" @@ -66,7 +66,7 @@ bool pgstat_collect_blocklevel = false; static int pgStatSock = -1; static int pgStatPipe[2]; static struct sockaddr_in pgStatAddr; -static int pgStatPmPipe[2]; +static int pgStatPmPipe[2] = { -1, -1 }; static int pgStatRunning = 0; static int pgStatPid; @@ -97,6 +97,7 @@ static char pgStat_fname[MAXPGPATH]; */ static void pgstat_main(int real_argc, char *real_argv[]); static void pgstat_recvbuffer(int real_argc, char *real_argv[]); +static void pgstat_die(SIGNAL_ARGS); static int pgstat_add_backend(PgStat_MsgHdr *msg); static void pgstat_sub_backend(int procpid); @@ -166,7 +167,7 @@ pgstat_init(void) return 0; /* - * Create the UDP socket for receiving statistic messages + * Create the UDP socket for sending and receiving statistic messages */ if ((pgStatSock = socket(PF_INET, SOCK_DGRAM, 0)) < 0) { @@ -198,7 +199,24 @@ pgstat_init(void) } /* - * Set the socket to non-blocking IO + * Connect the socket to its own address. This saves a few cycles + * by not having to respecify the target address on every send. + * This also provides a kernel-level check that only packets from + * this same address will be received. + */ + if (connect(pgStatSock, (struct sockaddr *)&pgStatAddr, alen) < 0) + { + perror("PGSTAT: connect(2)"); + close(pgStatSock); + pgStatSock = -1; + return -1; + } + + /* + * Set the socket to non-blocking IO. This ensures that if the + * collector falls behind (despite the buffering process), statistics + * messages will be discarded; backends won't block waiting to send + * messages to the collector. */ if (fcntl(pgStatSock, F_SETFL, O_NONBLOCK) < 0) { @@ -253,9 +271,9 @@ pgstat_start(int real_argc, char *real_argv[]) } /* - * Then fork off the collector. + * Then fork off the collector. Remember its PID for pgstat_ispgstat. */ - switch(pgStatPid = (int)fork()) + switch ((pgStatPid = (int)fork())) { case -1: perror("PGSTAT: fork(2)"); @@ -270,6 +288,9 @@ pgstat_start(int real_argc, char *real_argv[]) return 0; } + /* in postmaster child ... */ + ClosePostmasterPorts(false); + pgstat_main(real_argc, real_argv); exit(0); @@ -297,6 +318,25 @@ pgstat_ispgstat(int pid) /* ---------- + * pgstat_close_sockets() - + * + * Called when postmaster forks a non-pgstat child process, to close off + * file descriptors that should not be held open in child processes. + * ---------- + */ +void +pgstat_close_sockets(void) +{ + if (pgStatPmPipe[0] >= 0) + close(pgStatPmPipe[0]); + pgStatPmPipe[0] = -1; + if (pgStatPmPipe[1] >= 0) + close(pgStatPmPipe[1]); + pgStatPmPipe[1] = -1; +} + + +/* ---------- * pgstat_beterm() - * * Called from postmaster to tell collector a backend terminated. @@ -307,7 +347,7 @@ pgstat_beterm(int pid) { PgStat_MsgBeterm msg; - if (!pgstat_collect_startcollector) + if (!pgstat_collect_startcollector || pgStatSock < 0) return; msg.m_hdr.m_type = PGSTAT_MTYPE_BETERM; @@ -1030,8 +1070,8 @@ pgstat_send(void *msg, int len) ((PgStat_MsgHdr *)msg)->m_size = len; - sendto(pgStatSock, msg, len, 0, - (struct sockaddr *)&pgStatAddr, sizeof(pgStatAddr)); + send(pgStatSock, msg, len, 0); + /* We deliberately ignore any error from send() */ } @@ -1044,7 +1084,8 @@ pgstat_send(void *msg, int len) /* ---------- * pgstat_main() - * - * The statistics collector itself. + * Start up the statistics collector itself. This is the body of the + * postmaster child process. * ---------- */ static void @@ -1052,10 +1093,11 @@ pgstat_main(int real_argc, char *real_argv[]) { PgStat_Msg msg; fd_set rfds; + int readPipe; + int pmPipe = pgStatPmPipe[0]; int maxfd; int nready; - int len; - int dlen; + int len = 0; struct timeval timeout; struct timeval next_statwrite; bool need_statwrite; @@ -1067,9 +1109,11 @@ pgstat_main(int real_argc, char *real_argv[]) * as well. */ close(pgStatPmPipe[1]); + pgStatPmPipe[1] = -1; /* - * Ignore all signals usually bound to some action in the postmaster + * Ignore all signals usually bound to some action in the postmaster, + * except for SIGCHLD --- see pgstat_recvbuffer. */ pqsignal(SIGHUP, SIG_IGN); pqsignal(SIGINT, SIG_IGN); @@ -1079,15 +1123,22 @@ pgstat_main(int real_argc, char *real_argv[]) pqsignal(SIGPIPE, SIG_IGN); pqsignal(SIGUSR1, SIG_IGN); pqsignal(SIGUSR2, SIG_IGN); - pqsignal(SIGCHLD, SIG_DFL); + pqsignal(SIGCHLD, pgstat_die); pqsignal(SIGTTIN, SIG_DFL); pqsignal(SIGTTOU, SIG_DFL); pqsignal(SIGCONT, SIG_DFL); pqsignal(SIGWINCH, SIG_DFL); /* - * Start a buffering subprocess to read from the socket, so + * Start a buffering process to read from the socket, so * we have a little more time to process incoming messages. + * + * NOTE: the process structure is: postmaster is parent of buffer process + * is parent of collector process. This way, the buffer can detect + * collector failure via SIGCHLD, whereas otherwise it wouldn't notice + * collector failure until it tried to write on the pipe. That would mean + * that after the postmaster started a new collector, we'd have two buffer + * processes competing to read from the UDP socket --- not good. */ if (pipe(pgStatPipe) < 0) { @@ -1095,26 +1146,32 @@ pgstat_main(int real_argc, char *real_argv[]) exit(1); } - switch(fork()) + switch (fork()) { case -1: perror("PGSTAT: fork(2)"); exit(1); case 0: - close(pgStatPipe[0]); - /* child process should die if can't pipe to parent collector */ - pqsignal(SIGPIPE, SIG_DFL); - pgstat_recvbuffer(real_argc, real_argv); - exit(2); - - default: + /* child becomes collector process */ close(pgStatPipe[1]); close(pgStatSock); break; + + default: + /* parent becomes buffer process */ + close(pgStatPipe[0]); + pgstat_recvbuffer(real_argc, real_argv); + exit(0); } /* + * In the child we can have default SIGCHLD handling (in case we + * want to call system() here...) + */ + pqsignal(SIGCHLD, SIG_DFL); + + /* * Identify myself via ps * * WARNING: On some platforms the environment will be moved around to @@ -1164,9 +1221,11 @@ pgstat_main(int real_argc, char *real_argv[]) } memset(pgStatBeTable, 0, sizeof(PgStat_StatBeEntry) * MaxBackends); + readPipe = pgStatPipe[0]; + /* * Process incoming messages and handle all the reporting stuff - * until the postmaster waves us good bye. + * until there are no more messages. */ for (;;) { @@ -1196,13 +1255,13 @@ pgstat_main(int real_argc, char *real_argv[]) * Setup the descriptor set for select(2) */ FD_ZERO(&rfds); - FD_SET(pgStatPipe[0], &rfds); - FD_SET(pgStatPmPipe[0], &rfds); + FD_SET(readPipe, &rfds); + FD_SET(pmPipe, &rfds); - if (pgStatPipe[0] > pgStatPmPipe[0]) - maxfd = pgStatPipe[0]; + if (readPipe > pmPipe) + maxfd = readPipe; else - maxfd = pgStatPmPipe[0]; + maxfd = pmPipe; /* * Now wait for something to do. @@ -1211,6 +1270,8 @@ pgstat_main(int real_argc, char *real_argv[]) (need_statwrite) ? &timeout : NULL); if (nready < 0) { + if (errno == EINTR) + continue; perror("PGSTAT: select(2)"); exit(1); } @@ -1230,103 +1291,92 @@ pgstat_main(int real_argc, char *real_argv[]) /* * Check if there is a new statistics message to collect. */ - if (FD_ISSET(pgStatPipe[0], &rfds)) + if (FD_ISSET(readPipe, &rfds)) { /* - * If this is the first message after we wrote the stats - * file the last time, setup the timeout that it'd be - * written. + * We may need to issue multiple read calls in case the + * buffer process didn't write the message in a single write, + * which is possible since it dumps its buffer bytewise. + * In any case, we'd need two reads since we don't know the + * message length initially. */ - if (!need_statwrite) - { - gettimeofday(&next_statwrite, NULL); - next_statwrite.tv_usec += ((PGSTAT_STAT_INTERVAL) * 1000); - next_statwrite.tv_sec += (next_statwrite.tv_usec / 1000000); - next_statwrite.tv_usec %= 1000000; - need_statwrite = TRUE; - } + int nread = 0; + int targetlen = sizeof(PgStat_MsgHdr); /* initial */ - /* - * Read the header. - */ - len = read(pgStatPipe[0], &msg, sizeof(PgStat_MsgHdr)); - if (len < 0) + while (nread < targetlen) { - perror("PGSTAT: read(2)"); - exit(1); - } - if (len == 0) - { - return; - } - if (len != sizeof(PgStat_MsgHdr)) - { - fprintf(stderr, "PGSTAT: short read(2)"); - exit(1); - } - - /* - * And the body. We need to do it in two steps because - * we don't know the length. - */ - dlen = msg.msg_hdr.m_size - sizeof(PgStat_MsgHdr); - if (dlen > 0) - { - len = read(pgStatPipe[0], - ((char *)&msg) + sizeof(PgStat_MsgHdr), dlen); + len = read(readPipe, + ((char *) &msg) + nread, + targetlen - nread); if (len < 0) { + if (errno == EINTR) + continue; perror("PGSTAT: read(2)"); exit(1); } - if (len == 0) - { - return; - } - if (len != dlen) + if (len == 0) /* EOF on the pipe! */ + break; + nread += len; + if (nread == sizeof(PgStat_MsgHdr)) { - fprintf(stderr, "PGSTAT: short read(2)"); - exit(1); + /* we have the header, compute actual msg length */ + targetlen = msg.msg_hdr.m_size; + if (targetlen < (int) sizeof(PgStat_MsgHdr) || + targetlen > (int) sizeof(msg)) + { + /* + * Bogus message length implies that we got out + * of sync with the buffer process somehow. + * Abort so that we can restart both processes. + */ + fprintf(stderr, "PGSTAT: bogus message length\n"); + exit(1); + } } } + /* + * EOF on the pipe implies that the buffer process exited. + * Fall out of outer loop. + */ + if (len == 0) + break; /* - * Distribute the message to the specific function - * handling it. + * Distribute the message to the specific function handling it. */ - len += sizeof(PgStat_MsgHdr); switch (msg.msg_hdr.m_type) { case PGSTAT_MTYPE_DUMMY: break; case PGSTAT_MTYPE_BESTART: - pgstat_recv_bestart((PgStat_MsgBestart *)&msg, len); + pgstat_recv_bestart((PgStat_MsgBestart *)&msg, nread); break; case PGSTAT_MTYPE_BETERM: - pgstat_recv_beterm((PgStat_MsgBeterm *)&msg, len); + pgstat_recv_beterm((PgStat_MsgBeterm *)&msg, nread); break; case PGSTAT_MTYPE_TABSTAT: - pgstat_recv_tabstat((PgStat_MsgTabstat *)&msg, len); + pgstat_recv_tabstat((PgStat_MsgTabstat *)&msg, nread); break; case PGSTAT_MTYPE_TABPURGE: - pgstat_recv_tabpurge((PgStat_MsgTabpurge *)&msg, len); + pgstat_recv_tabpurge((PgStat_MsgTabpurge *)&msg, nread); break; case PGSTAT_MTYPE_ACTIVITY: - pgstat_recv_activity((PgStat_MsgActivity *)&msg, len); + pgstat_recv_activity((PgStat_MsgActivity *)&msg, nread); break; case PGSTAT_MTYPE_DROPDB: - pgstat_recv_dropdb((PgStat_MsgDropdb *)&msg, len); + pgstat_recv_dropdb((PgStat_MsgDropdb *)&msg, nread); break; case PGSTAT_MTYPE_RESETCOUNTER: pgstat_recv_resetcounter((PgStat_MsgResetcounter *)&msg, - len); + nread); break; default: @@ -1334,36 +1384,55 @@ pgstat_main(int real_argc, char *real_argv[]) } /* - * Globally count messages and start over. + * Globally count messages. */ pgStatNumMessages++; - continue; + + /* + * If this is the first message after we wrote the stats + * file the last time, setup the timeout that it'd be + * written. + */ + if (!need_statwrite) + { + gettimeofday(&next_statwrite, NULL); + next_statwrite.tv_usec += ((PGSTAT_STAT_INTERVAL) * 1000); + next_statwrite.tv_sec += (next_statwrite.tv_usec / 1000000); + next_statwrite.tv_usec %= 1000000; + need_statwrite = TRUE; + } } /* - * If the postmaster pipe is ready for reading this means that - * the kernel must have closed it because of the termination - * of the postmaster (he never really writes to it). Give up - * then, but save the final stats in case we want to reuse - * them at startup in the future. + * Note that we do NOT check for postmaster exit inside the loop; + * only EOF on the buffer pipe causes us to fall out. This ensures + * we don't exit prematurely if there are still a few messages in + * the buffer or pipe at postmaster shutdown. */ - if (FD_ISSET(pgStatPmPipe[0], &rfds)) - { - pgstat_write_statsfile(); - return; - } } + + /* + * Okay, we saw EOF on the buffer pipe, so there are no more messages to + * process. If the buffer process quit because of postmaster shutdown, + * we want to save the final stats to reuse at next startup. But if the + * buffer process failed, it seems best not to (there may even now be a + * new collector firing up, and we don't want it to read a partially- + * rewritten stats file). We can tell whether the postmaster is still + * alive by checking to see if the postmaster pipe is still open. If it + * is read-ready (ie, EOF), the postmaster must have quit. + */ + if (FD_ISSET(pmPipe, &rfds)) + pgstat_write_statsfile(); } /* ---------- * pgstat_recvbuffer() - * - * This is a special receive buffer started by the statistics - * collector itself and running in a separate process. It's only + * This is the body of the separate buffering process. Its only * purpose is to receive messages from the UDP socket as fast as - * possible and forward them over a pipe into the collector - * itself. + * possible and forward them over a pipe into the collector itself. + * If the collector is slow to absorb messages, they are buffered here. * ---------- */ static void @@ -1371,16 +1440,21 @@ pgstat_recvbuffer(int real_argc, char *real_argv[]) { fd_set rfds; fd_set wfds; + int writePipe = pgStatPipe[1]; + int pmPipe = pgStatPmPipe[0]; int maxfd; int nready; int len; - PgStat_Msg *msgbuffer = NULL; - int msg_recv = 0; - int msg_send = 0; - int msg_have = 0; + int xfr; + int frm; + PgStat_Msg input_buffer; + char *msgbuffer; + int msg_send = 0; /* next send index in buffer */ + int msg_recv = 0; /* next receive index */ + int msg_have = 0; /* number of bytes stored */ struct sockaddr_in fromaddr; int fromlen; - int overflow = 0; + bool overflow = false; /* * Identify myself via ps @@ -1392,10 +1466,29 @@ pgstat_recvbuffer(int real_argc, char *real_argv[]) set_ps_display(""); /* + * We want to die if our child collector process does. There are two ways + * we might notice that it has died: receive SIGCHLD, or get a write + * failure on the pipe leading to the child. We can set SIGPIPE to kill + * us here. Our SIGCHLD handler was already set up before we forked (must + * do it that way, else it's a race condition). + */ + pqsignal(SIGPIPE, SIG_DFL); + PG_SETMASK(&UnBlockSig); + + /* + * Set the write pipe to nonblock mode, so that we cannot block when + * the collector falls behind. + */ + if (fcntl(writePipe, F_SETFL, O_NONBLOCK) < 0) + { + perror("PGSTATBUFF: fcntl(2)"); + exit(1); + } + + /* * Allocate the message buffer */ - msgbuffer = (PgStat_Msg *)malloc(sizeof(PgStat_Msg) * - PGSTAT_RECVBUFFERSZ); + msgbuffer = (char *) malloc(PGSTAT_RECVBUFFERSZ); if (msgbuffer == NULL) { perror("PGSTATBUFF: malloc()"); @@ -1415,22 +1508,21 @@ pgstat_recvbuffer(int real_argc, char *real_argv[]) * As long as we have buffer space we add the socket * to the read descriptor set. */ - if (msg_have < PGSTAT_RECVBUFFERSZ) + if (msg_have <= (int) (PGSTAT_RECVBUFFERSZ - sizeof(PgStat_Msg))) { FD_SET(pgStatSock, &rfds); maxfd = pgStatSock; - overflow = 0; + overflow = false; } else { - if (overflow == 0) + if (!overflow) { - fprintf(stderr, "PGSTAT: Warning - receive buffer full\n"); - overflow = 1; + fprintf(stderr, "PGSTATBUFF: Warning - receive buffer full\n"); + overflow = true; } } - /* * If we have messages to write out, we add the pipe * to the write descriptor set. Otherwise, we check if @@ -1438,24 +1530,25 @@ pgstat_recvbuffer(int real_argc, char *real_argv[]) */ if (msg_have > 0) { - FD_SET(pgStatPipe[1], &wfds); - if (pgStatPipe[1] > maxfd) - maxfd = pgStatPipe[1]; + FD_SET(writePipe, &wfds); + if (writePipe > maxfd) + maxfd = writePipe; } else { - FD_SET(pgStatPmPipe[0], &rfds); - if (pgStatPmPipe[0] > maxfd) - maxfd = pgStatPmPipe[0]; + FD_SET(pmPipe, &rfds); + if (pmPipe > maxfd) + maxfd = pmPipe; } - /* * Wait for some work to do. */ nready = select(maxfd + 1, &rfds, &wfds, NULL, NULL); if (nready < 0) { + if (errno == EINTR) + continue; perror("PGSTATBUFF: select(2)"); exit(1); } @@ -1468,8 +1561,8 @@ pgstat_recvbuffer(int real_argc, char *real_argv[]) { fromlen = sizeof(fromaddr); len = recvfrom(pgStatSock, - &msgbuffer[msg_recv], sizeof(PgStat_Msg), 0, - (struct sockaddr *)&fromaddr, &fromlen); + &input_buffer, sizeof(PgStat_Msg), 0, + (struct sockaddr *) &fromaddr, &fromlen); if (len < 0) { perror("PGSTATBUFF: recvfrom(2)"); @@ -1485,13 +1578,15 @@ pgstat_recvbuffer(int real_argc, char *real_argv[]) /* * The received length must match the length in the header */ - if (msgbuffer[msg_recv].msg_hdr.m_size != len) + if (input_buffer.msg_hdr.m_size != len) continue; /* * The source address of the packet must be our own socket. * This ensures that only real hackers or our own backends - * tell us something. + * tell us something. (This should be redundant with a + * kernel-level check due to having used connect(), but + * let's do it anyway.) */ if (fromaddr.sin_addr.s_addr != pgStatAddr.sin_addr.s_addr) continue; @@ -1499,44 +1594,67 @@ pgstat_recvbuffer(int real_argc, char *real_argv[]) continue; /* - * O.K. - we accept this message. + * O.K. - we accept this message. Copy it to the circular + * msgbuffer. */ - msg_have++; - msg_recv++; - if (msg_recv == PGSTAT_RECVBUFFERSZ) - msg_recv = 0; + frm = 0; + while (len > 0) + { + xfr = PGSTAT_RECVBUFFERSZ - msg_recv; + if (xfr > len) + xfr = len; + Assert(xfr > 0); + memcpy(msgbuffer + msg_recv, + ((char *) &input_buffer) + frm, + xfr); + msg_recv += xfr; + if (msg_recv == PGSTAT_RECVBUFFERSZ) + msg_recv = 0; + msg_have += xfr; + frm += xfr; + len -= xfr; + } } /* - * If the collector is ready to receive, write a buffered - * message into his pipe. + * If the collector is ready to receive, write some data into his + * pipe. We may or may not be able to write all that we have. + * + * NOTE: if what we have is less than PIPE_BUF bytes but more than + * the space available in the pipe buffer, most kernels will refuse + * to write any of it, and will return EAGAIN. This means we will + * busy-loop until the situation changes (either because the collector + * caught up, or because more data arrives so that we have more than + * PIPE_BUF bytes buffered). This is not good, but is there any way + * around it? We have no way to tell when the collector has + * caught up... */ - if (FD_ISSET(pgStatPipe[1], &wfds)) + if (FD_ISSET(writePipe, &wfds)) { - len = write(pgStatPipe[1], &msgbuffer[msg_send], - msgbuffer[msg_send].msg_hdr.m_size); + xfr = PGSTAT_RECVBUFFERSZ - msg_send; + if (xfr > msg_have) + xfr = msg_have; + Assert(xfr > 0); + len = write(writePipe, msgbuffer + msg_send, xfr); if (len < 0) { + if (errno == EINTR || errno == EAGAIN) + continue; /* not enough space in pipe */ perror("PGSTATBUFF: write(2)"); exit(1); } - if (len != msgbuffer[msg_send].msg_hdr.m_size) - { - fprintf(stderr, "PGSTATBUFF: short write(2)"); - exit(1); - } - - msg_have--; - msg_send++; + /* NB: len < xfr is okay */ + msg_send += len; if (msg_send == PGSTAT_RECVBUFFERSZ) msg_send = 0; + msg_have -= len; } /* * Make sure we forwarded all messages before we check for * Postmaster termination. */ - if (FD_ISSET(pgStatSock, &rfds) || FD_ISSET(pgStatPipe[1], &wfds)) + if (msg_have != 0 || FD_ISSET(pgStatSock, &rfds)) continue; /* @@ -1544,16 +1662,22 @@ pgstat_recvbuffer(int real_argc, char *real_argv[]) * the kernel must have closed it on exit() (the postmaster * never really writes to it). So we've done our job. */ - if (FD_ISSET(pgStatPmPipe[0], &rfds)) + if (FD_ISSET(pmPipe, &rfds)) exit(0); } } +static void +pgstat_die(SIGNAL_ARGS) +{ + exit(1); +} + /* ---------- * pgstat_add_backend() - * - * Support function to keep our backen list up to date. + * Support function to keep our backend list up to date. * ---------- */ static int @@ -2414,7 +2538,7 @@ pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len) /* * If the database is marked for destroy, this is a delayed - * UDP packet and not worth beeing counted. + * UDP packet and not worth being counted. */ if (dbentry->destroy > 0) return; |