aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/bin/pg_basebackup/pg_recvlogical.c55
-rw-r--r--src/tools/pgindent/typedefs.list1
2 files changed, 44 insertions, 12 deletions
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index f3c7937a1df..9fe4ac81260 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -32,6 +32,14 @@
/* Time to sleep between reconnection attempts */
#define RECONNECT_SLEEP_TIME 5
+typedef enum
+{
+ STREAM_STOP_NONE,
+ STREAM_STOP_END_OF_WAL,
+ STREAM_STOP_KEEPALIVE,
+ STREAM_STOP_SIGNAL
+} StreamStopReason;
+
/* Global Options */
static char *outfile = NULL;
static int verbose = 0;
@@ -55,6 +63,7 @@ static const char *plugin = "test_decoding";
/* Global State */
static int outfd = -1;
static volatile sig_atomic_t time_to_abort = false;
+static volatile sig_atomic_t stop_reason = STREAM_STOP_NONE;
static volatile sig_atomic_t output_reopen = false;
static bool output_isfile;
static TimestampTz output_last_fsync = -1;
@@ -66,7 +75,8 @@ static void usage(void);
static void StreamLogicalLog(void);
static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
- bool keepalive, XLogRecPtr lsn);
+ StreamStopReason reason,
+ XLogRecPtr lsn);
static void
usage(void)
@@ -207,9 +217,11 @@ StreamLogicalLog(void)
TimestampTz last_status = -1;
int i;
PQExpBuffer query;
+ XLogRecPtr cur_record_lsn;
output_written_lsn = InvalidXLogRecPtr;
output_fsync_lsn = InvalidXLogRecPtr;
+ cur_record_lsn = InvalidXLogRecPtr;
/*
* Connect in replication mode to the server
@@ -275,7 +287,8 @@ StreamLogicalLog(void)
int bytes_written;
TimestampTz now;
int hdr_len;
- XLogRecPtr cur_record_lsn = InvalidXLogRecPtr;
+
+ cur_record_lsn = InvalidXLogRecPtr;
if (copybuf != NULL)
{
@@ -487,7 +500,7 @@ StreamLogicalLog(void)
if (endposReached)
{
- prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
+ stop_reason = STREAM_STOP_KEEPALIVE;
time_to_abort = true;
break;
}
@@ -527,7 +540,7 @@ StreamLogicalLog(void)
*/
if (!flushAndSendFeedback(conn, &now))
goto error;
- prepareToTerminate(conn, endpos, false, cur_record_lsn);
+ stop_reason = STREAM_STOP_END_OF_WAL;
time_to_abort = true;
break;
}
@@ -572,12 +585,16 @@ StreamLogicalLog(void)
/* endpos was exactly the record we just processed, we're done */
if (!flushAndSendFeedback(conn, &now))
goto error;
- prepareToTerminate(conn, endpos, false, cur_record_lsn);
+ stop_reason = STREAM_STOP_END_OF_WAL;
time_to_abort = true;
break;
}
}
+ /* Clean up connection state if stream has been aborted */
+ if (time_to_abort)
+ prepareToTerminate(conn, endpos, stop_reason, cur_record_lsn);
+
res = PQgetResult(conn);
if (PQresultStatus(res) == PGRES_COPY_OUT)
{
@@ -656,6 +673,7 @@ error:
static void
sigexit_handler(SIGNAL_ARGS)
{
+ stop_reason = STREAM_STOP_SIGNAL;
time_to_abort = true;
}
@@ -1021,18 +1039,31 @@ flushAndSendFeedback(PGconn *conn, TimestampTz *now)
* retry on failure.
*/
static void
-prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
+prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason,
+ XLogRecPtr lsn)
{
(void) PQputCopyEnd(conn, NULL);
(void) PQflush(conn);
if (verbose)
{
- if (keepalive)
- pg_log_info("end position %X/%X reached by keepalive",
- LSN_FORMAT_ARGS(endpos));
- else
- pg_log_info("end position %X/%X reached by WAL record at %X/%X",
- LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
+ switch (reason)
+ {
+ case STREAM_STOP_SIGNAL:
+ pg_log_info("received interrupt signal, exiting");
+ break;
+ case STREAM_STOP_KEEPALIVE:
+ pg_log_info("end position %X/%X reached by keepalive",
+ LSN_FORMAT_ARGS(endpos));
+ break;
+ case STREAM_STOP_END_OF_WAL:
+ Assert(!XLogRecPtrIsInvalid(lsn));
+ pg_log_info("end position %X/%X reached by WAL record at %X/%X",
+ LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
+ break;
+ case STREAM_STOP_NONE:
+ Assert(false);
+ break;
+ }
}
}
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e941fb6c82f..a1cf01e38e4 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2639,6 +2639,7 @@ Step
StopList
StrategyNumber
StreamCtl
+StreamStopReason
String
StringInfo
StringInfoData