aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/logicalfuncs.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/logicalfuncs.c')
-rw-r--r--src/backend/replication/logical/logicalfuncs.c158
1 files changed, 3 insertions, 155 deletions
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 56e47e4b9c4..f789fc127d0 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -22,6 +22,7 @@
#include "miscadmin.h"
#include "access/xlog_internal.h"
+#include "access/xlogutils.h"
#include "catalog/pg_type.h"
@@ -100,108 +101,6 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
p->returned_rows++;
}
-/*
- * TODO: This is duplicate code with pg_xlogdump, similar to walsender.c, but
- * we currently don't have the infrastructure (elog!) to share it.
- */
-static void
-XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
-{
- char *p;
- XLogRecPtr recptr;
- Size nbytes;
-
- static int sendFile = -1;
- static XLogSegNo sendSegNo = 0;
- static uint32 sendOff = 0;
-
- p = buf;
- recptr = startptr;
- nbytes = count;
-
- while (nbytes > 0)
- {
- uint32 startoff;
- int segbytes;
- int readbytes;
-
- startoff = recptr % XLogSegSize;
-
- if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
- {
- char path[MAXPGPATH];
-
- /* Switch to another logfile segment */
- if (sendFile >= 0)
- close(sendFile);
-
- XLByteToSeg(recptr, sendSegNo);
-
- XLogFilePath(path, tli, sendSegNo);
-
- sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
-
- if (sendFile < 0)
- {
- if (errno == ENOENT)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("requested WAL segment %s has already been removed",
- path)));
- else
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not open file \"%s\": %m",
- path)));
- }
- sendOff = 0;
- }
-
- /* Need to seek in the file? */
- if (sendOff != startoff)
- {
- if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
- {
- char path[MAXPGPATH];
-
- XLogFilePath(path, tli, sendSegNo);
-
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not seek in log segment %s to offset %u: %m",
- path, startoff)));
- }
- sendOff = startoff;
- }
-
- /* How many bytes are within this segment? */
- if (nbytes > (XLogSegSize - startoff))
- segbytes = XLogSegSize - startoff;
- else
- segbytes = nbytes;
-
- readbytes = read(sendFile, p, segbytes);
- if (readbytes <= 0)
- {
- char path[MAXPGPATH];
-
- XLogFilePath(path, tli, sendSegNo);
-
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not read from log segment %s, offset %u, length %lu: %m",
- path, sendOff, (unsigned long) segbytes)));
- }
-
- /* Update state for read */
- recptr += readbytes;
-
- sendOff += readbytes;
- nbytes -= readbytes;
- p += readbytes;
- }
-}
-
static void
check_permissions(void)
{
@@ -211,63 +110,12 @@ check_permissions(void)
(errmsg("must be superuser or replication role to use replication slots"))));
}
-/*
- * read_page callback for logical decoding contexts.
- *
- * Public because it would likely be very helpful for someone writing another
- * output method outside walsender, e.g. in a bgworker.
- *
- * TODO: The walsender has it's own version of this, but it relies on the
- * walsender's latch being set whenever WAL is flushed. No such infrastructure
- * exists for normal backends, so we have to do a check/sleep/repeat style of
- * loop for now.
- */
int
logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
{
- XLogRecPtr flushptr,
- loc;
- int count;
-
- loc = targetPagePtr + reqLen;
- while (1)
- {
- /*
- * TODO: we're going to have to do something more intelligent about
- * timelines on standbys. Use readTimeLineHistory() and
- * tliOfPointInHistory() to get the proper LSN? For now we'll catch
- * that case earlier, but the code and TODO is left in here for when
- * that changes.
- */
- if (!RecoveryInProgress())
- {
- *pageTLI = ThisTimeLineID;
- flushptr = GetFlushRecPtr();
- }
- else
- flushptr = GetXLogReplayRecPtr(pageTLI);
-
- if (loc <= flushptr)
- break;
-
- CHECK_FOR_INTERRUPTS();
- pg_usleep(1000L);
- }
-
- /* more than one block available */
- if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
- count = XLOG_BLCKSZ;
- /* not enough data there */
- else if (targetPagePtr + reqLen > flushptr)
- return -1;
- /* part of the page available */
- else
- count = flushptr - targetPagePtr;
-
- XLogRead(cur_page, *pageTLI, targetPagePtr, XLOG_BLCKSZ);
-
- return count;
+ return read_local_xlog_page(state, targetPagePtr, reqLen,
+ targetRecPtr, cur_page, pageTLI);
}
/*