aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/access/transam/xlogreader.c4
-rw-r--r--src/backend/replication/walsender.c38
-rw-r--r--src/include/access/xlogreader.h4
-rw-r--r--src/test/recovery/t/006_logical_decoding.pl11
4 files changed, 31 insertions, 26 deletions
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 5995798b585..cb76be4f469 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -44,6 +44,8 @@ static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
XLogRecPtr recptr);
static void ResetDecoder(XLogReaderState *state);
+static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
+ int segsize, const char *waldir);
/* size of the buffer allocated for error message. */
#define MAX_ERRORMSG_LEN 1000
@@ -210,7 +212,7 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength)
/*
* Initialize the passed segment structs.
*/
-void
+static void
WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
int segsize, const char *waldir)
{
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 2364cbfc61b..e2477c47e0a 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -130,13 +130,11 @@ bool log_replication_commands = false;
bool wake_wal_senders = false;
/*
- * Physical walsender does not use xlogreader to read WAL, but it does use a
- * fake one to keep state. Logical walsender uses a proper xlogreader. Both
- * keep the 'xlogreader' pointer to the right one, for the sake of common
- * routines.
+ * xlogreader used for replication. Note that a WAL sender doing physical
+ * replication does not need xlogreader to read WAL, but it needs one to
+ * keep a state of its work.
*/
-static XLogReaderState fake_xlogreader;
-static XLogReaderState *xlogreader;
+static XLogReaderState *xlogreader = NULL;
/*
* These variables keep track of the state of the timeline we're currently
@@ -285,20 +283,6 @@ InitWalSender(void)
/* Initialize empty timestamp buffer for lag tracking. */
lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
-
- /*
- * Prepare physical walsender's fake xlogreader struct. Logical walsender
- * does this later.
- */
- if (!am_db_walsender)
- {
- xlogreader = &fake_xlogreader;
- xlogreader->routine =
- *XL_ROUTINE(.segment_open = WalSndSegmentOpen,
- .segment_close = wal_segment_close);
- WALOpenSegmentInit(&xlogreader->seg, &xlogreader->segcxt,
- wal_segment_size, NULL);
- }
}
/*
@@ -594,6 +578,18 @@ StartReplication(StartReplicationCmd *cmd)
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
+ /* create xlogreader for physical replication */
+ xlogreader =
+ XLogReaderAllocate(wal_segment_size, NULL,
+ XL_ROUTINE(.segment_open = WalSndSegmentOpen,
+ .segment_close = wal_segment_close),
+ NULL);
+
+ if (!xlogreader)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+
/*
* We assume here that we're logging enough information in the WAL for
* log-shipping, since this is checked in PostmasterMain().
@@ -1643,6 +1639,8 @@ exec_replication_command(const char *cmd_string)
StartReplication(cmd);
else
StartLogicalReplication(cmd);
+
+ Assert(xlogreader != NULL);
break;
}
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index d930fe957df..b0f2a6ed43a 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -262,10 +262,6 @@ extern XLogReaderRoutine *LocalXLogReaderRoutine(void);
/* Free an XLogReader */
extern void XLogReaderFree(XLogReaderState *state);
-/* Initialize supporting structures */
-extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
- int segsize, const char *waldir);
-
/* Position the XLogReader to given record */
extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr);
#ifdef FRONTEND
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index ee05535b1c2..78229a7b92b 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -7,7 +7,7 @@ use strict;
use warnings;
use PostgresNode;
use TestLib;
-use Test::More tests => 13;
+use Test::More tests => 14;
use Config;
# Initialize master node
@@ -36,6 +36,15 @@ ok( $stderr =~
m/replication slot "test_slot" was not created in this database/,
"Logical decoding correctly fails to start");
+# Check case of walsender not using a database connection. Logical
+# decoding should not be allowed.
+($result, $stdout, $stderr) = $node_master->psql(
+ 'template1',
+ qq[START_REPLICATION SLOT s1 LOGICAL 0/1],
+ replication => 'true');
+ok($stderr =~ /ERROR: logical decoding requires a database connection/,
+ "Logical decoding fails on non-database connection");
+
$node_master->safe_psql('postgres',
qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]
);