diff options
author | Andrew Dunstan <andrew@dunslane.net> | 2007-06-14 01:48:51 +0000 |
---|---|---|
committer | Andrew Dunstan <andrew@dunslane.net> | 2007-06-14 01:48:51 +0000 |
commit | bd2cb9aaa55d2c6760e2722557a8a87b0bd40945 (patch) | |
tree | 2301545dd731540c607b94277e4e52377f82f821 /src/backend/postmaster/syslogger.c | |
parent | 320f8205850a2e42b0c6a1e8a7649a7df72b547d (diff) | |
download | postgresql-bd2cb9aaa55d2c6760e2722557a8a87b0bd40945.tar.gz postgresql-bd2cb9aaa55d2c6760e2722557a8a87b0bd40945.zip |
Implement a chunking protocol for writes to the syslogger pipe, with messages
reassembled in the syslogger before writing to the log file. This prevents
partial messages from being written, which mucks up log rotation, and prevents
messages from different backends from being interleaved, which causes garbled
logs. Backport as far as 8.0, where the syslogger was introduced.
Tom Lane and Andrew Dunstan
Diffstat (limited to 'src/backend/postmaster/syslogger.c')
-rw-r--r-- | src/backend/postmaster/syslogger.c | 268 |
1 files changed, 255 insertions, 13 deletions
diff --git a/src/backend/postmaster/syslogger.c b/src/backend/postmaster/syslogger.c index e92cd73031f..0862b81dd7b 100644 --- a/src/backend/postmaster/syslogger.c +++ b/src/backend/postmaster/syslogger.c @@ -18,7 +18,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/postmaster/syslogger.c,v 1.31 2007/06/04 22:21:42 adunstan Exp $ + * $PostgreSQL: pgsql/src/backend/postmaster/syslogger.c,v 1.32 2007/06/14 01:48:51 adunstan Exp $ * *------------------------------------------------------------------------- */ @@ -31,6 +31,7 @@ #include <sys/stat.h> #include <sys/time.h> +#include "lib/stringinfo.h" #include "libpq/pqsignal.h" #include "miscadmin.h" #include "pgtime.h" @@ -54,6 +55,13 @@ #define LBF_MODE _IOLBF #endif +/* + * We read() into a temp buffer twice as big as a chunk, so that any fragment + * left after processing can be moved down to the front and we'll still have + * room to read a full chunk. + */ +#define READ_BUF_SIZE (2 * PIPE_CHUNK_SIZE) + /* * GUC parameters. Redirect_stderr cannot be changed after postmaster @@ -75,15 +83,28 @@ bool am_syslogger = false; * Private state */ static pg_time_t next_rotation_time; - static bool redirection_done = false; - static bool pipe_eof_seen = false; - static FILE *syslogFile = NULL; - static char *last_file_name = NULL; +/* + * Buffers for saving partial messages from different backends. We don't expect + * that there will be very many outstanding at one time, so 20 seems plenty of + * leeway. If this array gets full we won't lose messages, but we will lose + * the protocol protection against them being partially written or interleaved. + * + * An inactive buffer has pid == 0 and undefined contents of data. + */ +typedef struct +{ + int32 pid; /* PID of source process */ + StringInfoData data; /* accumulated data, as a StringInfo */ +} save_buffer; + +#define CHUNK_SLOTS 20 +static save_buffer saved_chunks[CHUNK_SLOTS]; + /* These must be exported for EXEC_BACKEND case ... 
annoying */ #ifndef WIN32 int syslogPipe[2] = {-1, -1}; @@ -108,6 +129,8 @@ static volatile sig_atomic_t rotation_requested = false; static pid_t syslogger_forkexec(void); static void syslogger_parseArgs(int argc, char *argv[]); #endif +static void process_pipe_input(char *logbuffer, int *bytes_in_logbuffer); +static void flush_pipe_input(char *logbuffer, int *bytes_in_logbuffer); #ifdef WIN32 static unsigned int __stdcall pipeThread(void *arg); @@ -126,6 +149,10 @@ static void sigUsr1Handler(SIGNAL_ARGS); NON_EXEC_STATIC void SysLoggerMain(int argc, char *argv[]) { +#ifndef WIN32 + char logbuffer[READ_BUF_SIZE]; + int bytes_in_logbuffer = 0; +#endif char *currentLogDir; char *currentLogFilename; int currentLogRotationAge; @@ -244,7 +271,6 @@ SysLoggerMain(int argc, char *argv[]) bool time_based_rotation = false; #ifndef WIN32 - char logbuffer[1024]; int bytesRead; int rc; fd_set rfds; @@ -326,8 +352,8 @@ SysLoggerMain(int argc, char *argv[]) else if (rc > 0 && FD_ISSET(syslogPipe[0], &rfds)) { bytesRead = piperead(syslogPipe[0], - logbuffer, sizeof(logbuffer)); - + logbuffer + bytes_in_logbuffer, + sizeof(logbuffer) - bytes_in_logbuffer); if (bytesRead < 0) { if (errno != EINTR) @@ -337,7 +363,8 @@ SysLoggerMain(int argc, char *argv[]) } else if (bytesRead > 0) { - write_syslogger_file(logbuffer, bytesRead); + bytes_in_logbuffer += bytesRead; + process_pipe_input(logbuffer, &bytes_in_logbuffer); continue; } else @@ -349,6 +376,9 @@ SysLoggerMain(int argc, char *argv[]) * and all backends are shut down, and we are done. */ pipe_eof_seen = true; + + /* if there's any data left then force it out now */ + flush_pipe_input(logbuffer, &bytes_in_logbuffer); } } #else /* WIN32 */ @@ -612,6 +642,207 @@ syslogger_parseArgs(int argc, char *argv[]) /* -------------------------------- + * pipe protocol handling + * -------------------------------- + */ + +/* + * Process data received through the syslogger pipe. 
+ * + * This routine interprets the log pipe protocol which sends log messages as + * (hopefully atomic) chunks - such chunks are detected and reassembled here. + * + * The protocol has a header that starts with two nul bytes, then has a 16 bit + * length, the pid of the sending process, and a flag to indicate if it is + * the last chunk in a message. Incomplete chunks are saved until we read some + * more, and non-final chunks are accumulated until we get the final chunk. + * + * All of this is to avoid 2 problems: + * . partial messages being written to logfiles (messes rotation), and + * . messages from different backends being interleaved (messages garbled). + * + * Any non-protocol messages are written out directly. These should only come + * from non-PostgreSQL sources, however (e.g. third party libraries writing to + * stderr). + * + * logbuffer is the data input buffer, and *bytes_in_logbuffer is the number + * of bytes present. On exit, any not-yet-eaten data is left-justified in + * logbuffer, and *bytes_in_logbuffer is updated. + */ +static void +process_pipe_input(char *logbuffer, int *bytes_in_logbuffer) +{ + char *cursor = logbuffer; + int count = *bytes_in_logbuffer; + + /* While we have enough for a header, process data... */ + while (count >= (int) sizeof(PipeProtoHeader)) + { + PipeProtoHeader p; + int chunklen; + + /* Do we have a valid header? */ + memcpy(&p, cursor, sizeof(PipeProtoHeader)); + if (p.nuls[0] == '\0' && p.nuls[1] == '\0' && + p.len > 0 && p.len <= PIPE_MAX_PAYLOAD && + p.pid != 0 && + (p.is_last == 't' || p.is_last == 'f')) + { + chunklen = PIPE_HEADER_SIZE + p.len; + + /* Fall out of loop if we don't have the whole chunk yet */ + if (count < chunklen) + break; + + if (p.is_last == 'f') + { + /* + * Save a complete non-final chunk in the per-pid buffer + * if possible - if not just write it out. 
+ */ + int free_slot = -1, existing_slot = -1; + int i; + StringInfo str; + + for (i = 0; i < CHUNK_SLOTS; i++) + { + if (saved_chunks[i].pid == p.pid) + { + existing_slot = i; + break; + } + if (free_slot < 0 && saved_chunks[i].pid == 0) + free_slot = i; + } + if (existing_slot >= 0) + { + str = &(saved_chunks[existing_slot].data); + appendBinaryStringInfo(str, + cursor + PIPE_HEADER_SIZE, + p.len); + } + else if (free_slot >= 0) + { + saved_chunks[free_slot].pid = p.pid; + str = &(saved_chunks[free_slot].data); + initStringInfo(str); + appendBinaryStringInfo(str, + cursor + PIPE_HEADER_SIZE, + p.len); + } + else + { + /* + * If there is no free slot we'll just have to take our + * chances and write out a partial message and hope that + * it's not followed by something from another pid. + */ + write_syslogger_file(cursor + PIPE_HEADER_SIZE, p.len); + } + } + else + { + /* + * Final chunk --- add it to anything saved for that pid, and + * either way write the whole thing out. + */ + int existing_slot = -1; + int i; + StringInfo str; + + for (i = 0; i < CHUNK_SLOTS; i++) + { + if (saved_chunks[i].pid == p.pid) + { + existing_slot = i; + break; + } + } + if (existing_slot >= 0) + { + str = &(saved_chunks[existing_slot].data); + appendBinaryStringInfo(str, + cursor + PIPE_HEADER_SIZE, + p.len); + write_syslogger_file(str->data, str->len); + saved_chunks[existing_slot].pid = 0; + pfree(str->data); + } + else + { + /* The whole message was one chunk, evidently. */ + write_syslogger_file(cursor + PIPE_HEADER_SIZE, p.len); + } + } + + /* Finished processing this chunk */ + cursor += chunklen; + count -= chunklen; + } + else + { + /* Process non-protocol data */ + + /* + * Look for the start of a protocol header. If found, dump data + * up to there and repeat the loop. Otherwise, dump it all and + * fall out of the loop. (Note: we want to dump it all if + * at all possible, so as to avoid dividing non-protocol messages + * across logfiles. 
We expect that in many scenarios, a + * non-protocol message will arrive all in one read(), and we + * want to respect the read() boundary if possible.) + */ + for (chunklen = 1; chunklen < count; chunklen++) + { + if (cursor[chunklen] == '\0') + break; + } + write_syslogger_file(cursor, chunklen); + cursor += chunklen; + count -= chunklen; + } + } + + /* We don't have a full chunk, so left-align what remains in the buffer */ + if (count > 0 && cursor != logbuffer) + memmove(logbuffer, cursor, count); + *bytes_in_logbuffer = count; +} + +/* + * Force out any buffered data + * + * This is currently used only at syslogger shutdown, but could perhaps be + * useful at other times, so it is careful to leave things in a clean state. + */ +static void +flush_pipe_input(char *logbuffer, int *bytes_in_logbuffer) +{ + int i; + StringInfo str; + + /* Dump any incomplete protocol messages */ + for (i = 0; i < CHUNK_SLOTS; i++) + { + if (saved_chunks[i].pid != 0) + { + str = &(saved_chunks[i].data); + write_syslogger_file(str->data, str->len); + saved_chunks[i].pid = 0; + pfree(str->data); + } + } + /* + * Force out any remaining pipe data as-is; we don't bother trying to + * remove any protocol headers that may exist in it. 
+ */ + if (*bytes_in_logbuffer > 0) + write_syslogger_file(logbuffer, *bytes_in_logbuffer); + *bytes_in_logbuffer = 0; +} + + +/* -------------------------------- * logfile routines * -------------------------------- */ @@ -653,12 +884,16 @@ write_syslogger_file(const char *buffer, int count) static unsigned int __stdcall pipeThread(void *arg) { - DWORD bytesRead; - char logbuffer[1024]; + char logbuffer[READ_BUF_SIZE]; + int bytes_in_logbuffer = 0; for (;;) { - if (!ReadFile(syslogPipe[0], logbuffer, sizeof(logbuffer), + DWORD bytesRead; + + if (!ReadFile(syslogPipe[0], + logbuffer + bytes_in_logbuffer, + sizeof(logbuffer) - bytes_in_logbuffer, &bytesRead, 0)) { DWORD error = GetLastError(); @@ -672,11 +907,18 @@ pipeThread(void *arg) errmsg("could not read from logger pipe: %m"))); } else if (bytesRead > 0) - write_syslogger_file(logbuffer, bytesRead); + { + bytes_in_logbuffer += bytesRead; + process_pipe_input(logbuffer, &bytes_in_logbuffer); + } } /* We exit the above loop only upon detecting pipe EOF */ pipe_eof_seen = true; + + /* if there's any data left then force it out now */ + flush_pipe_input(logbuffer, &bytes_in_logbuffer); + _endthread(); return 0; } |