diff options
Diffstat (limited to 'src/backend/postmaster/syslogger.c')
-rw-r--r-- | src/backend/postmaster/syslogger.c | 268 |
1 file changed, 255 insertions, 13 deletions
diff --git a/src/backend/postmaster/syslogger.c b/src/backend/postmaster/syslogger.c index e92cd73031f..0862b81dd7b 100644 --- a/src/backend/postmaster/syslogger.c +++ b/src/backend/postmaster/syslogger.c @@ -18,7 +18,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/postmaster/syslogger.c,v 1.31 2007/06/04 22:21:42 adunstan Exp $ + * $PostgreSQL: pgsql/src/backend/postmaster/syslogger.c,v 1.32 2007/06/14 01:48:51 adunstan Exp $ * *------------------------------------------------------------------------- */ @@ -31,6 +31,7 @@ #include <sys/stat.h> #include <sys/time.h> +#include "lib/stringinfo.h" #include "libpq/pqsignal.h" #include "miscadmin.h" #include "pgtime.h" @@ -54,6 +55,13 @@ #define LBF_MODE _IOLBF #endif +/* + * We read() into a temp buffer twice as big as a chunk, so that any fragment + * left after processing can be moved down to the front and we'll still have + * room to read a full chunk. + */ +#define READ_BUF_SIZE (2 * PIPE_CHUNK_SIZE) + /* * GUC parameters. Redirect_stderr cannot be changed after postmaster @@ -75,15 +83,28 @@ bool am_syslogger = false; * Private state */ static pg_time_t next_rotation_time; - static bool redirection_done = false; - static bool pipe_eof_seen = false; - static FILE *syslogFile = NULL; - static char *last_file_name = NULL; +/* + * Buffers for saving partial messages from different backends. We don't expect + * that there will be very many outstanding at one time, so 20 seems plenty of + * leeway. If this array gets full we won't lose messages, but we will lose + * the protocol protection against them being partially written or interleaved. + * + * An inactive buffer has pid == 0 and undefined contents of data. + */ +typedef struct +{ + int32 pid; /* PID of source process */ + StringInfoData data; /* accumulated data, as a StringInfo */ +} save_buffer; + +#define CHUNK_SLOTS 20 +static save_buffer saved_chunks[CHUNK_SLOTS]; + /* These must be exported for EXEC_BACKEND case ... 
annoying */ #ifndef WIN32 int syslogPipe[2] = {-1, -1}; @@ -108,6 +129,8 @@ static volatile sig_atomic_t rotation_requested = false; static pid_t syslogger_forkexec(void); static void syslogger_parseArgs(int argc, char *argv[]); #endif +static void process_pipe_input(char *logbuffer, int *bytes_in_logbuffer); +static void flush_pipe_input(char *logbuffer, int *bytes_in_logbuffer); #ifdef WIN32 static unsigned int __stdcall pipeThread(void *arg); @@ -126,6 +149,10 @@ static void sigUsr1Handler(SIGNAL_ARGS); NON_EXEC_STATIC void SysLoggerMain(int argc, char *argv[]) { +#ifndef WIN32 + char logbuffer[READ_BUF_SIZE]; + int bytes_in_logbuffer = 0; +#endif char *currentLogDir; char *currentLogFilename; int currentLogRotationAge; @@ -244,7 +271,6 @@ SysLoggerMain(int argc, char *argv[]) bool time_based_rotation = false; #ifndef WIN32 - char logbuffer[1024]; int bytesRead; int rc; fd_set rfds; @@ -326,8 +352,8 @@ SysLoggerMain(int argc, char *argv[]) else if (rc > 0 && FD_ISSET(syslogPipe[0], &rfds)) { bytesRead = piperead(syslogPipe[0], - logbuffer, sizeof(logbuffer)); - + logbuffer + bytes_in_logbuffer, + sizeof(logbuffer) - bytes_in_logbuffer); if (bytesRead < 0) { if (errno != EINTR) @@ -337,7 +363,8 @@ SysLoggerMain(int argc, char *argv[]) } else if (bytesRead > 0) { - write_syslogger_file(logbuffer, bytesRead); + bytes_in_logbuffer += bytesRead; + process_pipe_input(logbuffer, &bytes_in_logbuffer); continue; } else @@ -349,6 +376,9 @@ SysLoggerMain(int argc, char *argv[]) * and all backends are shut down, and we are done. */ pipe_eof_seen = true; + + /* if there's any data left then force it out now */ + flush_pipe_input(logbuffer, &bytes_in_logbuffer); } } #else /* WIN32 */ @@ -612,6 +642,207 @@ syslogger_parseArgs(int argc, char *argv[]) /* -------------------------------- + * pipe protocol handling + * -------------------------------- + */ + +/* + * Process data received through the syslogger pipe. 
+ * + * This routine interprets the log pipe protocol which sends log messages as + * (hopefully atomic) chunks - such chunks are detected and reassembled here. + * + * The protocol has a header that starts with two nul bytes, then has a 16 bit + * length, the pid of the sending process, and a flag to indicate if it is + * the last chunk in a message. Incomplete chunks are saved until we read some + * more, and non-final chunks are accumulated until we get the final chunk. + * + * All of this is to avoid 2 problems: + * . partial messages being written to logfiles (messes rotation), and + * . messages from different backends being interleaved (messages garbled). + * + * Any non-protocol messages are written out directly. These should only come + * from non-PostgreSQL sources, however (e.g. third party libraries writing to + * stderr). + * + * logbuffer is the data input buffer, and *bytes_in_logbuffer is the number + * of bytes present. On exit, any not-yet-eaten data is left-justified in + * logbuffer, and *bytes_in_logbuffer is updated. + */ +static void +process_pipe_input(char *logbuffer, int *bytes_in_logbuffer) +{ + char *cursor = logbuffer; + int count = *bytes_in_logbuffer; + + /* While we have enough for a header, process data... */ + while (count >= (int) sizeof(PipeProtoHeader)) + { + PipeProtoHeader p; + int chunklen; + + /* Do we have a valid header? */ + memcpy(&p, cursor, sizeof(PipeProtoHeader)); + if (p.nuls[0] == '\0' && p.nuls[1] == '\0' && + p.len > 0 && p.len <= PIPE_MAX_PAYLOAD && + p.pid != 0 && + (p.is_last == 't' || p.is_last == 'f')) + { + chunklen = PIPE_HEADER_SIZE + p.len; + + /* Fall out of loop if we don't have the whole chunk yet */ + if (count < chunklen) + break; + + if (p.is_last == 'f') + { + /* + * Save a complete non-final chunk in the per-pid buffer + * if possible - if not just write it out. 
+ */ + int free_slot = -1, existing_slot = -1; + int i; + StringInfo str; + + for (i = 0; i < CHUNK_SLOTS; i++) + { + if (saved_chunks[i].pid == p.pid) + { + existing_slot = i; + break; + } + if (free_slot < 0 && saved_chunks[i].pid == 0) + free_slot = i; + } + if (existing_slot >= 0) + { + str = &(saved_chunks[existing_slot].data); + appendBinaryStringInfo(str, + cursor + PIPE_HEADER_SIZE, + p.len); + } + else if (free_slot >= 0) + { + saved_chunks[free_slot].pid = p.pid; + str = &(saved_chunks[free_slot].data); + initStringInfo(str); + appendBinaryStringInfo(str, + cursor + PIPE_HEADER_SIZE, + p.len); + } + else + { + /* + * If there is no free slot we'll just have to take our + * chances and write out a partial message and hope that + * it's not followed by something from another pid. + */ + write_syslogger_file(cursor + PIPE_HEADER_SIZE, p.len); + } + } + else + { + /* + * Final chunk --- add it to anything saved for that pid, and + * either way write the whole thing out. + */ + int existing_slot = -1; + int i; + StringInfo str; + + for (i = 0; i < CHUNK_SLOTS; i++) + { + if (saved_chunks[i].pid == p.pid) + { + existing_slot = i; + break; + } + } + if (existing_slot >= 0) + { + str = &(saved_chunks[existing_slot].data); + appendBinaryStringInfo(str, + cursor + PIPE_HEADER_SIZE, + p.len); + write_syslogger_file(str->data, str->len); + saved_chunks[existing_slot].pid = 0; + pfree(str->data); + } + else + { + /* The whole message was one chunk, evidently. */ + write_syslogger_file(cursor + PIPE_HEADER_SIZE, p.len); + } + } + + /* Finished processing this chunk */ + cursor += chunklen; + count -= chunklen; + } + else + { + /* Process non-protocol data */ + + /* + * Look for the start of a protocol header. If found, dump data + * up to there and repeat the loop. Otherwise, dump it all and + * fall out of the loop. (Note: we want to dump it all if + * at all possible, so as to avoid dividing non-protocol messages + * across logfiles. 
We expect that in many scenarios, a + * non-protocol message will arrive all in one read(), and we + * want to respect the read() boundary if possible.) + */ + for (chunklen = 1; chunklen < count; chunklen++) + { + if (cursor[chunklen] == '\0') + break; + } + write_syslogger_file(cursor, chunklen); + cursor += chunklen; + count -= chunklen; + } + } + + /* We don't have a full chunk, so left-align what remains in the buffer */ + if (count > 0 && cursor != logbuffer) + memmove(logbuffer, cursor, count); + *bytes_in_logbuffer = count; +} + +/* + * Force out any buffered data + * + * This is currently used only at syslogger shutdown, but could perhaps be + * useful at other times, so it is careful to leave things in a clean state. + */ +static void +flush_pipe_input(char *logbuffer, int *bytes_in_logbuffer) +{ + int i; + StringInfo str; + + /* Dump any incomplete protocol messages */ + for (i = 0; i < CHUNK_SLOTS; i++) + { + if (saved_chunks[i].pid != 0) + { + str = &(saved_chunks[i].data); + write_syslogger_file(str->data, str->len); + saved_chunks[i].pid = 0; + pfree(str->data); + } + } + /* + * Force out any remaining pipe data as-is; we don't bother trying to + * remove any protocol headers that may exist in it. 
+ */ + if (*bytes_in_logbuffer > 0) + write_syslogger_file(logbuffer, *bytes_in_logbuffer); + *bytes_in_logbuffer = 0; +} + + +/* -------------------------------- * logfile routines * -------------------------------- */ @@ -653,12 +884,16 @@ write_syslogger_file(const char *buffer, int count) static unsigned int __stdcall pipeThread(void *arg) { - DWORD bytesRead; - char logbuffer[1024]; + char logbuffer[READ_BUF_SIZE]; + int bytes_in_logbuffer = 0; for (;;) { - if (!ReadFile(syslogPipe[0], logbuffer, sizeof(logbuffer), + DWORD bytesRead; + + if (!ReadFile(syslogPipe[0], + logbuffer + bytes_in_logbuffer, + sizeof(logbuffer) - bytes_in_logbuffer, &bytesRead, 0)) { DWORD error = GetLastError(); @@ -672,11 +907,18 @@ pipeThread(void *arg) errmsg("could not read from logger pipe: %m"))); } else if (bytesRead > 0) - write_syslogger_file(logbuffer, bytesRead); + { + bytes_in_logbuffer += bytesRead; + process_pipe_input(logbuffer, &bytes_in_logbuffer); + } } /* We exit the above loop only upon detecting pipe EOF */ pipe_eof_seen = true; + + /* if there's any data left then force it out now */ + flush_pipe_input(logbuffer, &bytes_in_logbuffer); + _endthread(); return 0; } |