aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/commands/copy.c6
-rw-r--r--src/backend/libpq/pqcomm.c178
-rw-r--r--src/backend/libpq/pqcomprim.c77
-rw-r--r--src/backend/utils/error/elog.c11
-rw-r--r--src/include/libpq/libpq.h23
5 files changed, 195 insertions, 100 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 5a8b3fc9650..3126e82dfc3 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -6,7 +6,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/commands/copy.c,v 1.67 1999/01/17 06:18:15 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/commands/copy.c,v 1.68 1999/01/23 22:27:26 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -303,9 +303,7 @@ DoCopy(char *relname, bool binary, bool oids, bool from, bool pipe,
}
else if (!from && !binary)
{
- CopySendData("\\.\n",3,fp);
- if (IsUnderPostmaster)
- pq_flush();
+ CopySendData("\\.\n",3,fp);
}
}
}
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 8f9c14fee9f..386643fe95c 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -5,39 +5,40 @@
*
* Copyright (c) 1994, Regents of the University of California
*
- * $Id: pqcomm.c,v 1.63 1999/01/17 06:18:26 momjian Exp $
+ * $Id: pqcomm.c,v 1.64 1999/01/23 22:27:28 tgl Exp $
*
*-------------------------------------------------------------------------
*/
/*
* INTERFACE ROUTINES
- * pq_init - initialize libpq
+ * pq_init - initialize libpq
* pq_getport - return the PGPORT setting
* pq_close - close input / output connections
* pq_flush - flush pending output
+ * pq_recvbuf - load some bytes into the input buffer
* pq_getstr - get a null terminated string from connection
- * pq_getchar - get 1 character from connection
- * pq_peekchar - peek at first character in connection
+ * pq_getchar - get 1 character from connection
+ * pq_peekchar - peek at next character from connection
* pq_getnchar - get n characters from connection, and null-terminate
* pq_getint - get an integer from connection
- * pq_putchar - send 1 character to connection
+ * pq_putchar - send 1 character to connection
* pq_putstr - send a null terminated string to connection
* pq_putnchar - send n characters to connection
* pq_putint - send an integer to connection
- * pq_putncharlen - send n characters to connection
+ * pq_putncharlen - send n characters to connection
* (also send an int header indicating
* the length)
* pq_getinaddr - initialize address from host and port number
* pq_getinserv - initialize address from host and service name
*
- * StreamDoUnlink - Shutdown UNIX socket connectioin
- * StreamServerPort - Open sock stream
- * StreamConnection - Create new connection with client
- * StreamClose - Close a client/backend connection
+ * StreamDoUnlink - Shutdown UNIX socket connection
+ * StreamServerPort - Open socket stream
+ * StreamConnection - Create new connection with client
+ * StreamClose - Close a client/backend connection
*
* NOTES
- * Frontend is now completey in interfaces/libpq, and no
- * functions from this file is used.
+ * Frontend is now completely in interfaces/libpq, and no
+ * functions from this file are used there.
*
*/
#include "postgres.h"
@@ -79,6 +80,14 @@
extern FILE * debug_port; /* in util.c */
+/*
+ * Buffers
+ */
+char PqSendBuffer[PQ_BUFFER_SIZE];
+char PqRecvBuffer[PQ_BUFFER_SIZE];
+int PqSendPointer,PqRecvPointer,PqRecvLength;
+
+
/* --------------------------------
* pq_init - open portal file descriptors
* --------------------------------
@@ -86,6 +95,7 @@ extern FILE * debug_port; /* in util.c */
void
pq_init(int fd)
{
+ PqSendPointer = PqRecvPointer = PqRecvLength = 0;
PQnotifies_init();
if (getenv("LIBPQ_DEBUG"))
debug_port = stderr;
@@ -94,40 +104,40 @@ pq_init(int fd)
/* -------------------------
* pq_getchar()
*
- * get a character from the input file,
- *
+ * get a character from the input file, or EOF if trouble
+ * --------------------------------
*/
int
pq_getchar(void)
{
- char c;
-
- while (recv(MyProcPort->sock, &c, 1, 0) != 1) {
- if (errno != EINTR)
- return EOF; /* Not interrupted, so something went wrong */
+ while (PqRecvPointer >= PqRecvLength)
+ {
+ if (pq_recvbuf()) /* If nothing in buffer, then recv some */
+ return EOF; /* Failed to recv data */
}
-
- return c;
+ return PqRecvBuffer[PqRecvPointer++];
}
-/*
+/* -------------------------
+ * pq_peekchar()
+ *
+ * get a character from the connection, but leave it in the buffer
+ * to be read again
* --------------------------------
- * pq_peekchar - get 1 character from connection, but leave it in the stream
*/
-int
-pq_peekchar(void) {
- char c;
- while (recv(MyProcPort->sock, &c, 1, MSG_PEEK) != 1) {
- if (errno != EINTR)
- return EOF; /* Not interrupted, so something went wrong */
+int
+pq_peekchar(void)
+{
+ while (PqRecvPointer >= PqRecvLength)
+ {
+ if (pq_recvbuf()) /* If nothing in buffer, then recv some */
+ return EOF; /* Failed to recv data */
}
-
- return c;
+ /* Note we don't bump the pointer... */
+ return PqRecvBuffer[PqRecvPointer];
}
-
-
/* --------------------------------
* pq_getport - return the PGPORT setting
@@ -150,18 +160,91 @@ pq_getport()
void
pq_close()
{
- close(MyProcPort->sock);
+ close(MyProcPort->sock);
PQnotifies_init();
}
/* --------------------------------
* pq_flush - flush pending output
+ *
+ * returns 0 if OK, EOF if trouble
* --------------------------------
*/
-void
+int
pq_flush()
{
- /* Not supported/required? */
+ char *bufptr = PqSendBuffer;
+ char *bufend = PqSendBuffer + PqSendPointer;
+
+ while (bufptr < bufend)
+ {
+ int r = send(MyProcPort->sock, bufptr, bufend - bufptr, 0);
+ if (r <= 0)
+ {
+ if (errno == EINTR)
+ continue; /* Ok if we were interrupted */
+ /* We would like to use elog() here, but cannot because elog
+ * tries to write to the client, which would cause a recursive
+ * flush attempt! So just write it out to the postmaster log.
+ */
+ fprintf(stderr, "pq_flush: send() failed, errno %d\n", errno);
+ /* We drop the buffered data anyway so that processing
+ * can continue, even though we'll probably quit soon.
+ */
+ PqSendPointer = 0;
+ return EOF;
+ }
+ bufptr += r;
+ }
+ PqSendPointer = 0;
+ return 0;
+}
+
+/* --------------------------------
+ * pq_recvbuf - load some bytes into the input buffer
+ *
+ * returns 0 if OK, EOF if trouble
+ * --------------------------------
+ */
+
+int
+pq_recvbuf()
+{
+ if (PqRecvPointer > 0)
+ {
+ if (PqRecvLength > PqRecvPointer)
+ {
+ /* still some unread data, left-justify it in the buffer */
+ memmove(PqRecvBuffer, PqRecvBuffer+PqRecvPointer,
+ PqRecvLength-PqRecvPointer);
+ PqRecvLength -= PqRecvPointer;
+ PqRecvPointer = 0;
+ }
+ else
+ PqRecvLength = PqRecvPointer = 0;
+ }
+
+ /* Can fill buffer from PqRecvLength and upwards */
+ for (;;)
+ {
+ int r = recv(MyProcPort->sock, PqRecvBuffer + PqRecvLength,
+ PQ_BUFFER_SIZE - PqRecvLength, 0);
+ if (r <= 0)
+ {
+ if (errno == EINTR)
+ continue; /* Ok if interrupted */
+ /* We would like to use elog() here, but dare not because elog
+ * tries to write to the client, which will cause problems
+ * if we have a hard communications failure ...
+ * So just write the message to the postmaster log.
+ */
+ fprintf(stderr, "pq_recvbuf: recv() failed, errno %d\n", errno);
+ return EOF;
+ }
+ /* r contains number of bytes read, so just incr length */
+ PqRecvLength += r;
+ return 0;
+ }
}
/* --------------------------------
@@ -194,7 +277,7 @@ pq_getstr(char *s, int maxlen)
int
pq_getnchar(char *s, int off, int maxlen)
{
- int r = pqGetNBytes(s + off, maxlen);
+ int r = pqGetNBytes(s + off, maxlen);
s[off+maxlen] = '\0';
return r;
}
@@ -602,7 +685,7 @@ StreamConnection(int server_fd, Port *port)
if (setsockopt(port->sock, pe->p_proto, TCP_NODELAY,
&on, sizeof(on)) < 0)
{
- elog(ERROR, "postmaster: setsockopt failed");
+ elog(ERROR, "postmaster: setsockopt failed: %m");
return STATUS_ERROR;
}
}
@@ -644,18 +727,9 @@ pq_putncharlen(char *s, int n)
*/
int pq_putchar(char c)
{
- char isDone = 0;
-
- do {
- if (send(MyProcPort->sock, &c, 1, 0) != 1) {
- if (errno != EINTR)
- return EOF; /* Anything other than interrupt is error! */
- }
- else
- isDone = 1; /* Done if we sent one char */
- } while (!isDone);
- return c;
+ if (PqSendPointer >= PQ_BUFFER_SIZE)
+ if (pq_flush()) /* If buffer is full, then flush it out */
+ return EOF;
+ PqSendBuffer[PqSendPointer++] = c; /* Put in buffer */
+ return c;
}
-
-
-
diff --git a/src/backend/libpq/pqcomprim.c b/src/backend/libpq/pqcomprim.c
index 23ecfd4e19f..e48a1c16888 100644
--- a/src/backend/libpq/pqcomprim.c
+++ b/src/backend/libpq/pqcomprim.c
@@ -98,7 +98,7 @@ pqPutLong(int integer)
n = ((PG_PROTOCOL_MAJOR(FrontendProtocol) == 0) ? hton_l(integer) : htonl((uint32) integer));
#endif
- return pqPutNBytes((char *)&n,4);
+ return pqPutNBytes((char *)&n, 4);
}
/* --------------------------------------------------------------------- */
@@ -107,7 +107,7 @@ pqGetShort(int *result)
{
uint16 n;
- if (pqGetNBytes((char *)&n,2) != 0)
+ if (pqGetNBytes((char *)&n, 2) != 0)
return EOF;
#ifdef FRONTEND
@@ -138,28 +138,29 @@ pqGetLong(int *result)
}
/* --------------------------------------------------------------------- */
-/* pqGetNBytes: Read a chunk of exactly len bytes in buffer s (which must be 1
- byte longer) and terminate it with a '\0'.
- Return 0 if ok.
-*/
+/* pqGetNBytes: Read a chunk of exactly len bytes into buffer s.
+ * Return 0 if ok, EOF if trouble.
+ */
int
pqGetNBytes(char *s, size_t len)
{
- int bytesDone = 0;
-
- do {
- int r = recv(MyProcPort->sock, s+bytesDone, len-bytesDone, 0);
- if (r == 0 || r == -1) {
- if (errno != EINTR)
- return EOF; /* All other than signal-interrupted is error */
- continue; /* Otherwise, try again */
- }
-
- /* r contains number of bytes received */
- bytesDone += r;
-
- } while (bytesDone < len);
- /* Zero-termination now in pq_getnchar() instead */
+ size_t amount;
+
+ while (len > 0)
+ {
+ while (PqRecvPointer >= PqRecvLength)
+ {
+ if (pq_recvbuf()) /* If nothing in buffer, then recv some */
+ return EOF; /* Failed to recv data */
+ }
+ amount = PqRecvLength - PqRecvPointer;
+ if (amount > len)
+ amount = len;
+ memcpy(s, PqRecvBuffer + PqRecvPointer, amount);
+ PqRecvPointer += amount;
+ s += amount;
+ len -= amount;
+ }
return 0;
}
@@ -167,20 +168,21 @@ pqGetNBytes(char *s, size_t len)
int
pqPutNBytes(const char *s, size_t len)
{
- int bytesDone = 0;
-
- do {
- int r = send(MyProcPort->sock, s+bytesDone, len-bytesDone, 0);
- if (r == 0 || r == -1) {
- if (errno != EINTR)
- return EOF; /* Only signal interruption allowed */
- continue; /* If interruped and read nothing, just try again */
- }
-
- /* r contains number of bytes sent so far */
- bytesDone += r;
- } while (bytesDone < len);
-
+ size_t amount;
+
+ while (len > 0)
+ {
+ if (PqSendPointer >= PQ_BUFFER_SIZE)
+ if (pq_flush()) /* If buffer is full, then flush it out */
+ return EOF;
+ amount = PQ_BUFFER_SIZE - PqSendPointer;
+ if (amount > len)
+ amount = len;
+ memcpy(PqSendBuffer + PqSendPointer, s, amount);
+ PqSendPointer += amount;
+ s += amount;
+ len -= amount;
+ }
return 0;
}
@@ -191,8 +193,8 @@ pqGetString(char *s, size_t len)
int c;
/*
- * Keep on reading until we get the terminating '\0' and discard those
- * bytes we don't have room for.
+ * Keep on reading until we get the terminating '\0',
+ * discarding any bytes we don't have room for.
*/
while ((c = pq_getchar()) != EOF && c != '\0')
@@ -216,4 +218,3 @@ pqPutString(const char *s)
{
return pqPutNBytes(s,strlen(s)+1);
}
-
diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c
index 4e68c1e24a8..473fc06c3e1 100644
--- a/src/backend/utils/error/elog.c
+++ b/src/backend/utils/error/elog.c
@@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/utils/error/elog.c,v 1.37 1999/01/11 03:56:07 scrappy Exp $
+ * $Header: /cvsroot/pgsql/src/backend/utils/error/elog.c,v 1.38 1999/01/23 22:27:29 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -192,8 +192,15 @@ elog(int lev, const char *fmt,...)
pq_putnchar("N", 1);
else
pq_putnchar("E", 1);
- /* pq_putint(-101, 4); *//* should be query id */
pq_putstr(line + TIMESTAMP_SIZE); /* don't show timestamps */
+ /*
+ * This flush is normally not necessary, since postgres.c will
+ * flush out waiting data when control returns to the main loop.
+ * But it seems best to leave it here, so that the client has some
+ * clue what happened if the backend dies before getting back to the
+ * main loop ... error/notice messages should not be a performance-
+ * critical path anyway, so an extra flush won't hurt much ...
+ */
pq_flush();
}
if (!IsUnderPostmaster)
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index a315521eb35..c1cdd8ac5d9 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -6,7 +6,7 @@
*
* Copyright (c) 1994, Regents of the University of California
*
- * $Id: libpq.h,v 1.23 1999/01/12 12:49:52 scrappy Exp $
+ * $Id: libpq.h,v 1.24 1999/01/23 22:27:25 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -254,12 +254,13 @@ extern void pq_init(int fd);
extern void pq_gettty(char *tp);
extern int pq_getport(void);
extern void pq_close(void);
-extern void pq_flush(void);
+extern int pq_flush(void);
+extern int pq_recvbuf(void);
extern int pq_getstr(char *s, int maxlen);
extern int PQgetline(char *s, int maxlen);
extern int PQputline(char *s);
-extern int pq_getchar(void);
-extern int pq_peekchar(void);
+extern int pq_getchar(void);
+extern int pq_peekchar(void);
extern int pq_getnchar(char *s, int off, int maxlen);
extern int pq_getint(int b);
extern int pq_putchar(char c);
@@ -282,4 +283,18 @@ extern int StreamServerPort(char *hostName, short portName, int *fdP);
extern int StreamConnection(int server_fd, Port *port);
extern void StreamClose(int sock);
+/*
+ * Internal send/receive buffers in libpq.
+ * These probably shouldn't be global at all, but unless we merge
+ * pqcomm.c and pqcomprim.c they have to be...
+ */
+
+#define PQ_BUFFER_SIZE 8192
+
+extern char PqSendBuffer[PQ_BUFFER_SIZE];
+extern int PqSendPointer; /* Next index to store a byte in PqSendBuffer */
+extern char PqRecvBuffer[PQ_BUFFER_SIZE];
+extern int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */
+extern int PqRecvLength; /* End of data available in PqRecvBuffer */
+
#endif /* LIBPQ_H */