aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/access/rmgrdesc/xlogdesc.c12
-rw-r--r--src/backend/access/transam/xlog.c154
-rw-r--r--src/backend/access/transam/xlogreader.c40
-rw-r--r--src/include/access/xlog_internal.h11
-rw-r--r--src/include/access/xlogreader.h10
-rw-r--r--src/include/catalog/pg_control.h2
-rw-r--r--src/test/recovery/t/026_overwrite_contrecord.pl207
-rwxr-xr-xsrc/test/recovery/t/idiosyncratic_copy20
-rw-r--r--src/tools/pgindent/typedefs.list1
9 files changed, 450 insertions, 7 deletions
diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index e6090a9dadb..5bf2346dd91 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -139,6 +139,15 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
xlrec.ThisTimeLineID, xlrec.PrevTimeLineID,
timestamptz_to_str(xlrec.end_time));
}
+ else if (info == XLOG_OVERWRITE_CONTRECORD)
+ {
+ xl_overwrite_contrecord xlrec;
+
+ memcpy(&xlrec, rec, sizeof(xl_overwrite_contrecord));
+ appendStringInfo(buf, "lsn %X/%X; time %s",
+ LSN_FORMAT_ARGS(xlrec.overwritten_lsn),
+ timestamptz_to_str(xlrec.overwrite_time));
+ }
}
const char *
@@ -178,6 +187,9 @@ xlog_identify(uint8 info)
case XLOG_END_OF_RECOVERY:
id = "END_OF_RECOVERY";
break;
+ case XLOG_OVERWRITE_CONTRECORD:
+ id = "OVERWRITE_CONTRECORD";
+ break;
case XLOG_FPI:
id = "FPI";
break;
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index e84f6ed2ff1..ce0faeb5d37 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -216,6 +216,15 @@ static XLogRecPtr flushedUpto = 0;
static TimeLineID receiveTLI = 0;
/*
+ * abortedRecPtr is the start pointer of a broken record at end of WAL when
+ * recovery completes; missingContrecPtr is the location of the first
+ * contrecord that went missing. See CreateOverwriteContrecordRecord for
+ * details.
+ */
+static XLogRecPtr abortedRecPtr;
+static XLogRecPtr missingContrecPtr;
+
+/*
* During recovery, lastFullPageWrites keeps track of full_page_writes that
* the replayed WAL records indicate. It's initialized with full_page_writes
* that the recovery starting checkpoint record indicates, and then updated
@@ -903,8 +912,11 @@ static void CheckRequiredParameterValues(void);
static void XLogReportParameters(void);
static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
TimeLineID prevTLI);
+static void VerifyOverwriteContrecord(xl_overwrite_contrecord *xlrec,
+ XLogReaderState *state);
static void LocalSetXLogInsertAllowed(void);
static void CreateEndOfRecoveryRecord(void);
+static XLogRecPtr CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn);
static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
@@ -2258,6 +2270,18 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
NewPage->xlp_info |= XLP_BKP_REMOVABLE;
/*
+ * If a record was found to be broken at the end of recovery, and
+ * we're going to write on the page where its first contrecord was
+ * lost, set the XLP_FIRST_IS_OVERWRITE_CONTRECORD flag on the page
+ * header. See CreateOverwriteContrecordRecord().
+ */
+ if (missingContrecPtr == NewPageBeginPtr)
+ {
+ NewPage->xlp_info |= XLP_FIRST_IS_OVERWRITE_CONTRECORD;
+ missingContrecPtr = InvalidXLogRecPtr;
+ }
+
+ /*
* If first page of an XLOG segment file, make it a long header.
*/
if ((XLogSegmentOffset(NewPage->xlp_pageaddr, wal_segment_size)) == 0)
@@ -4390,6 +4414,19 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
EndRecPtr = xlogreader->EndRecPtr;
if (record == NULL)
{
+ /*
+ * When not in standby mode we find that WAL ends in an incomplete
+ * record, keep track of that record. After recovery is done,
+ * we'll write a record to indicate downstream WAL readers that
+ * that portion is to be ignored.
+ */
+ if (!StandbyMode &&
+ !XLogRecPtrIsInvalid(xlogreader->abortedRecPtr))
+ {
+ abortedRecPtr = xlogreader->abortedRecPtr;
+ missingContrecPtr = xlogreader->missingContrecPtr;
+ }
+
if (readFile >= 0)
{
close(readFile);
@@ -7026,6 +7063,12 @@ StartupXLOG(void)
InRecovery = true;
}
+ /*
+ * Start recovery assuming that the final record isn't lost.
+ */
+ abortedRecPtr = InvalidXLogRecPtr;
+ missingContrecPtr = InvalidXLogRecPtr;
+
/* REDO */
if (InRecovery)
{
@@ -7622,8 +7665,9 @@ StartupXLOG(void)
/*
* Kill WAL receiver, if it's still running, before we continue to write
- * the startup checkpoint record. It will trump over the checkpoint and
- * subsequent records if it's still alive when we start writing WAL.
+ * the startup checkpoint and aborted-contrecord records. It will trump
+ * over these records and subsequent ones if it's still alive when we
+ * start writing WAL.
*/
ShutdownWalRcv();
@@ -7656,8 +7700,12 @@ StartupXLOG(void)
StandbyMode = false;
/*
- * Re-fetch the last valid or last applied record, so we can identify the
- * exact endpoint of what we consider the valid portion of WAL.
+ * Determine where to start writing WAL next.
+ *
+ * When recovery ended in an incomplete record, write a WAL record about
+ * that and continue after it. In all other cases, re-fetch the last
+ * valid or last applied record, so we can identify the exact endpoint of
+ * what we consider the valid portion of WAL.
*/
XLogBeginRead(xlogreader, LastRec);
record = ReadRecord(xlogreader, PANIC, false);
@@ -7807,6 +7855,18 @@ StartupXLOG(void)
XLogCtl->PrevTimeLineID = PrevTimeLineID;
/*
+ * Actually, if WAL ended in an incomplete record, skip the parts that
+ * made it through and start writing after the portion that persisted.
+ * (It's critical to first write an OVERWRITE_CONTRECORD message, which
+ * we'll do as soon as we're open for writing new WAL.)
+ */
+ if (!XLogRecPtrIsInvalid(missingContrecPtr))
+ {
+ Assert(!XLogRecPtrIsInvalid(abortedRecPtr));
+ EndOfLog = missingContrecPtr;
+ }
+
+ /*
* Prepare to write WAL starting at EndOfLog location, and init xlog
* buffer cache using the block containing the last record from the
* previous incarnation.
@@ -7858,13 +7918,23 @@ StartupXLOG(void)
XLogCtl->LogwrtRqst.Write = EndOfLog;
XLogCtl->LogwrtRqst.Flush = EndOfLog;
+ LocalSetXLogInsertAllowed();
+
+ /* If necessary, write overwrite-contrecord before doing anything else */
+ if (!XLogRecPtrIsInvalid(abortedRecPtr))
+ {
+ Assert(!XLogRecPtrIsInvalid(missingContrecPtr));
+ CreateOverwriteContrecordRecord(abortedRecPtr);
+ abortedRecPtr = InvalidXLogRecPtr;
+ missingContrecPtr = InvalidXLogRecPtr;
+ }
+
/*
* Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
* record before resource manager writes cleanup WAL records or checkpoint
* record is written.
*/
Insert->fullPageWrites = lastFullPageWrites;
- LocalSetXLogInsertAllowed();
UpdateFullPageWrites();
LocalXLogInsertAllowed = -1;
@@ -9365,6 +9435,53 @@ CreateEndOfRecoveryRecord(void)
}
/*
+ * Write an OVERWRITE_CONTRECORD message.
+ *
+ * When on WAL replay we expect a continuation record at the start of a page
+ * that is not there, recovery ends and WAL writing resumes at that point.
+ * But it's wrong to resume writing new WAL back at the start of the record
+ * that was broken, because downstream consumers of that WAL (physical
+ * replicas) are not prepared to "rewind". So the first action after
+ * finishing replay of all valid WAL must be to write a record of this type
+ * at the point where the contrecord was missing; to support xlogreader
+ * detecting the special case, XLP_FIRST_IS_OVERWRITE_CONTRECORD is also added
+ * to the page header where the record occurs. xlogreader has an ad-hoc
+ * mechanism to report metadata about the broken record, which is what we
+ * use here.
+ *
+ * At replay time, XLP_FIRST_IS_OVERWRITE_CONTRECORD instructs xlogreader to
+ * skip the record it was reading, and pass back the LSN of the skipped
+ * record, so that its caller can verify (on "replay" of that record) that the
+ * XLOG_OVERWRITE_CONTRECORD matches what was effectively overwritten.
+ */
+static XLogRecPtr
+CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn)
+{
+ xl_overwrite_contrecord xlrec;
+ XLogRecPtr recptr;
+
+ /* sanity check */
+ if (!RecoveryInProgress())
+ elog(ERROR, "can only be used at end of recovery");
+
+ xlrec.overwritten_lsn = aborted_lsn;
+ xlrec.overwrite_time = GetCurrentTimestamp();
+
+ START_CRIT_SECTION();
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, sizeof(xl_overwrite_contrecord));
+
+ recptr = XLogInsert(RM_XLOG_ID, XLOG_OVERWRITE_CONTRECORD);
+
+ XLogFlush(recptr);
+
+ END_CRIT_SECTION();
+
+ return recptr;
+}
+
+/*
* Flush all data in shared memory to disk, and fsync
*
* This is the common code shared between regular checkpoints and
@@ -10291,6 +10408,13 @@ xlog_redo(XLogReaderState *record)
RecoveryRestartPoint(&checkPoint);
}
+ else if (info == XLOG_OVERWRITE_CONTRECORD)
+ {
+ xl_overwrite_contrecord xlrec;
+
+ memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_overwrite_contrecord));
+ VerifyOverwriteContrecord(&xlrec, record);
+ }
else if (info == XLOG_END_OF_RECOVERY)
{
xl_end_of_recovery xlrec;
@@ -10451,6 +10575,26 @@ xlog_redo(XLogReaderState *record)
}
}
+/*
+ * Verify the payload of a XLOG_OVERWRITE_CONTRECORD record.
+ */
+static void
+VerifyOverwriteContrecord(xl_overwrite_contrecord *xlrec, XLogReaderState *state)
+{
+ if (xlrec->overwritten_lsn != state->overwrittenRecPtr)
+ elog(FATAL, "mismatching overwritten LSN %X/%X -> %X/%X",
+ LSN_FORMAT_ARGS(xlrec->overwritten_lsn),
+ LSN_FORMAT_ARGS(state->overwrittenRecPtr));
+
+ ereport(LOG,
+ (errmsg("sucessfully skipped missing contrecord at %X/%X, overwritten at %s",
+ LSN_FORMAT_ARGS(xlrec->overwritten_lsn),
+ timestamptz_to_str(xlrec->overwrite_time))));
+
+ /* Verifying the record should only happen once */
+ state->overwrittenRecPtr = InvalidXLogRecPtr;
+}
+
#ifdef WAL_DEBUG
static void
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 42738eb940c..f01aea6ddad 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -275,6 +275,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
total_len;
uint32 targetRecOff;
uint32 pageHeaderSize;
+ bool assembled;
bool gotheader;
int readOff;
@@ -290,6 +291,8 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
state->errormsg_buf[0] = '\0';
ResetDecoder(state);
+ state->abortedRecPtr = InvalidXLogRecPtr;
+ state->missingContrecPtr = InvalidXLogRecPtr;
RecPtr = state->EndRecPtr;
@@ -316,7 +319,9 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
randAccess = true;
}
+restart:
state->currRecPtr = RecPtr;
+ assembled = false;
targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
targetRecOff = RecPtr % XLOG_BLCKSZ;
@@ -412,6 +417,8 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
char *buffer;
uint32 gotlen;
+ assembled = true;
+
/*
* Enlarge readRecordBuf as needed.
*/
@@ -445,8 +452,25 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
Assert(SizeOfXLogShortPHD <= readOff);
- /* Check that the continuation on next page looks valid */
pageHeader = (XLogPageHeader) state->readBuf;
+
+ /*
+ * If we were expecting a continuation record and got an
+ * "overwrite contrecord" flag, that means the continuation record
+ * was overwritten with a different record. Restart the read by
+ * assuming the address to read is the location where we found
+ * this flag; but keep track of the LSN of the record we were
+ * reading, for later verification.
+ */
+ if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD)
+ {
+ state->overwrittenRecPtr = state->currRecPtr;
+ ResetDecoder(state);
+ RecPtr = targetPagePtr;
+ goto restart;
+ }
+
+ /* Check that the continuation on next page looks valid */
if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
{
report_invalid_record(state,
@@ -548,6 +572,20 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
return NULL;
err:
+ if (assembled)
+ {
+ /*
+ * We get here when a record that spans multiple pages needs to be
+ * assembled, but something went wrong -- perhaps a contrecord piece
+ * was lost. If caller is WAL replay, it will know where the aborted
+ * record was and where to direct followup WAL to be written, marking
+ * the next piece with XLP_FIRST_IS_OVERWRITE_CONTRECORD, which will
+ * in turn signal downstream WAL consumers that the broken WAL record
+ * is to be ignored.
+ */
+ state->abortedRecPtr = RecPtr;
+ state->missingContrecPtr = targetPagePtr;
+ }
/*
* Invalidate the read state. We might read from a different source after
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 26a743b6b6c..dcf41e9257c 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -76,8 +76,10 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader;
#define XLP_LONG_HEADER 0x0002
/* This flag indicates backup blocks starting in this page are optional */
#define XLP_BKP_REMOVABLE 0x0004
+/* Replaces a missing contrecord; see CreateOverwriteContrecordRecord */
+#define XLP_FIRST_IS_OVERWRITE_CONTRECORD 0x0008
/* All defined flag bits in xlp_info (used for validity checking of header) */
-#define XLP_ALL_FLAGS 0x0007
+#define XLP_ALL_FLAGS 0x000F
#define XLogPageHeaderSize(hdr) \
(((hdr)->xlp_info & XLP_LONG_HEADER) ? SizeOfXLogLongPHD : SizeOfXLogShortPHD)
@@ -249,6 +251,13 @@ typedef struct xl_restore_point
char rp_name[MAXFNAMELEN];
} xl_restore_point;
+/* Overwrite of prior contrecord */
+typedef struct xl_overwrite_contrecord
+{
+ XLogRecPtr overwritten_lsn;
+ TimestampTz overwrite_time;
+} xl_overwrite_contrecord;
+
/* End of recovery mark, when we don't do an END_OF_RECOVERY checkpoint */
typedef struct xl_end_of_recovery
{
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 21d200d3df6..10458c23eda 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -252,6 +252,16 @@ struct XLogReaderState
/* Buffer to hold error message */
char *errormsg_buf;
+
+ /*
+ * Set at the end of recovery: the start point of a partial record at the
+ * end of WAL (InvalidXLogRecPtr if there wasn't one), and the start
+ * location of its first contrecord that went missing.
+ */
+ XLogRecPtr abortedRecPtr;
+ XLogRecPtr missingContrecPtr;
+ /* Set when XLP_FIRST_IS_OVERWRITE_CONTRECORD is found */
+ XLogRecPtr overwrittenRecPtr;
};
/* Get a new XLogReader */
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index e3f48158ce7..749bce0cc6f 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -76,6 +76,8 @@ typedef struct CheckPoint
#define XLOG_END_OF_RECOVERY 0x90
#define XLOG_FPI_FOR_HINT 0xA0
#define XLOG_FPI 0xB0
+/* 0xC0 is used in Postgres 9.5-11 */
+#define XLOG_OVERWRITE_CONTRECORD 0xD0
/*
diff --git a/src/test/recovery/t/026_overwrite_contrecord.pl b/src/test/recovery/t/026_overwrite_contrecord.pl
new file mode 100644
index 00000000000..a034215dac1
--- /dev/null
+++ b/src/test/recovery/t/026_overwrite_contrecord.pl
@@ -0,0 +1,207 @@
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Tests for already-propagated WAL segments ending in incomplete WAL records.
+
+use strict;
+use warnings;
+
+use FindBin;
+use PostgresNode;
+use TestLib;
+use Test::More;
+
+plan tests => 5;
+
+# Test: Create a physical replica that's missing the last WAL file,
+# then restart the primary to create a divergent WAL file and observe
+# that the replica replays the "overwrite contrecord" from that new
+# file.
+
+my $node = PostgresNode->get_new_node('primary');
+$node->init(allows_streaming => 1);
+$node->append_conf('postgresql.conf', 'wal_keep_size=1GB');
+$node->start;
+
+$node->safe_psql('postgres', 'create table filler (a int)');
+# First, measure how many bytes does the insertion of 1000 rows produce
+my $start_lsn =
+ $node->safe_psql('postgres', q{select pg_current_wal_insert_lsn() - '0/0'});
+$node->safe_psql('postgres',
+ 'insert into filler select * from generate_series(1, 1000)');
+my $end_lsn =
+ $node->safe_psql('postgres', q{select pg_current_wal_insert_lsn() - '0/0'});
+my $rows_walsize = $end_lsn - $start_lsn;
+
+# Now consume all remaining room in the current WAL segment, leaving
+# space enough only for the start of a largish record.
+$node->safe_psql(
+ 'postgres', qq{
+WITH setting AS (
+ SELECT setting::int AS wal_segsize
+ FROM pg_settings WHERE name = 'wal_segment_size'
+)
+INSERT INTO filler
+SELECT g FROM setting,
+ generate_series(1, 1000 * (wal_segsize - ((pg_current_wal_insert_lsn() - '0/0') % wal_segsize)) / $rows_walsize) g
+});
+
+my $initfile = $node->safe_psql('postgres',
+ 'SELECT pg_walfile_name(pg_current_wal_insert_lsn())');
+$node->safe_psql('postgres',
+ qq{SELECT pg_logical_emit_message(true, 'test 026', repeat('xyzxz', 123456))}
+);
+#$node->safe_psql('postgres', qq{create table foo ()});
+my $endfile = $node->safe_psql('postgres',
+ 'SELECT pg_walfile_name(pg_current_wal_insert_lsn())');
+ok($initfile != $endfile, "$initfile differs from $endfile");
+
+# Now stop abruptly, to avoid a stop checkpoint. We can remove the tail file
+# afterwards, and on startup the large message should be overwritten with new
+# contents
+$node->stop('immediate');
+
+unlink $node->basedir . "/pgdata/pg_wal/$endfile"
+ or die "could not unlink " . $node->basedir . "/pgdata/pg_wal/$endfile: $!";
+
+# OK, create a standby at this spot.
+$node->backup_fs_cold('backup');
+my $node_standby = PostgresNode->get_new_node('standby');
+$node_standby->init_from_backup($node, 'backup', has_streaming => 1);
+
+$node_standby->start;
+$node->start;
+
+$node->safe_psql('postgres',
+ qq{create table foo (a text); insert into foo values ('hello')});
+$node->safe_psql('postgres',
+ qq{SELECT pg_logical_emit_message(true, 'test 026', 'AABBCC')});
+
+my $until_lsn = $node->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my $caughtup_query =
+ "SELECT '$until_lsn'::pg_lsn <= pg_last_wal_replay_lsn()";
+$node_standby->poll_query_until('postgres', $caughtup_query)
+ or die "Timed out while waiting for standby to catch up";
+
+ok($node_standby->safe_psql('postgres', 'select * from foo') eq 'hello',
+ 'standby replays past overwritten contrecord');
+
+# Verify message appears in standby's log
+my $log = slurp_file($node_standby->logfile);
+like(
+ $log,
+ qr[sucessfully skipped missing contrecord at],
+ "found log line in standby");
+
+$node->stop;
+$node_standby->stop;
+
+
+# Second test: a standby that receives WAL via archive/restore commands.
+$node = PostgresNode->get_new_node('primary2');
+$node->init(
+ has_archiving => 1,
+ extra => ['--wal-segsize=1']);
+
+# Note: consistent use of forward slashes here avoids any escaping problems
+# that arise from use of backslashes. That means we need to double-quote all
+# the paths in the archive_command
+my $perlbin = TestLib::perl2host($^X);
+$perlbin =~ s!\\!/!g if $TestLib::windows_os;
+my $archivedir = $node->archive_dir;
+$archivedir =~ s!\\!/!g if $TestLib::windows_os;
+$node->append_conf(
+ 'postgresql.conf',
+ qq{
+archive_command = '"$perlbin" "$FindBin::RealBin/idiosyncratic_copy" "%p" "$archivedir/%f"'
+wal_level = replica
+max_wal_senders = 2
+wal_keep_size = 1GB
+});
+# Make sure that Msys perl doesn't complain about difficulty in setting locale
+# when called from the archive_command.
+local $ENV{PERL_BADLANG} = 0;
+$node->start;
+$node->backup('backup');
+
+$node_standby = PostgresNode->get_new_node('standby2');
+$node_standby->init_from_backup($node, 'backup', has_restoring => 1);
+
+$node_standby->start;
+
+$node->safe_psql('postgres', 'create table filler (a int)');
+# First, measure how many bytes does the insertion of 1000 rows produce
+$start_lsn =
+ $node->safe_psql('postgres', q{select pg_current_wal_insert_lsn() - '0/0'});
+$node->safe_psql('postgres',
+ 'insert into filler select * from generate_series(1, 1000)');
+$end_lsn =
+ $node->safe_psql('postgres', q{select pg_current_wal_insert_lsn() - '0/0'});
+$rows_walsize = $end_lsn - $start_lsn;
+
+# Now consume all remaining room in the current WAL segment, leaving
+# space enough only for the start of a largish record.
+$node->safe_psql(
+ 'postgres', qq{
+WITH setting AS (
+ SELECT setting::int AS wal_segsize
+ FROM pg_settings WHERE name = 'wal_segment_size'
+)
+INSERT INTO filler
+SELECT g FROM setting,
+ generate_series(1, 1000 * (wal_segsize - ((pg_current_wal_insert_lsn() - '0/0') % wal_segsize)) / $rows_walsize) g
+});
+
+# Now block idiosyncratic_copy from creating the next WAL in the replica
+my $archivedgood = $node->safe_psql('postgres',
+ q{SELECT pg_walfile_name(pg_current_wal_insert_lsn())});
+my $archivedfail = $node->safe_psql(
+ 'postgres',
+ q{SELECT pg_walfile_name(pg_current_wal_insert_lsn() + setting::integer)
+ from pg_settings where name = 'wal_segment_size'});
+open my $filefail, ">", "$archivedir/$archivedfail.fail"
+ or die "can't open $archivedir/$archivedfail.fail: $!";
+
+my $currlsn =
+ $node->safe_psql('postgres', 'select pg_current_wal_insert_lsn() - 1000');
+
+# Now produce a large WAL record in a transaction that we leave open
+my ($in, $out);
+my $timer = IPC::Run::timeout(180);
+my $h =
+ $node->background_psql('postgres', \$in, \$out, $timer, on_error_stop => 0);
+
+$in .= qq{BEGIN;
+SELECT pg_logical_emit_message(true, 'test 026', repeat('somenoise', 8192));
+};
+$h->pump_nb;
+$node->poll_query_until(
+ 'postgres',
+ "SELECT last_archived_wal >= '$archivedgood' FROM pg_stat_archiver"),
+ or die "Timed out while waiting for standby to catch up";
+
+# Now crash the node with the transaction open
+$node->stop('immediate');
+#$h->finish();
+$node->start;
+$node->safe_psql('postgres', 'create table witness (a int);');
+$node->safe_psql('postgres', 'insert into witness values (42)');
+unlink "$archivedir/$archivedfail.fail"
+ or die "can't unlink $archivedir/$archivedfail.fail: $!";
+$node->safe_psql('postgres', 'select pg_switch_wal()');
+
+$until_lsn = $node->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$caughtup_query = "SELECT '$until_lsn'::pg_lsn <= pg_last_wal_replay_lsn()";
+$node_standby->poll_query_until('postgres', $caughtup_query)
+ or die "Timed out while waiting for standby to catch up";
+
+my $answer = $node_standby->safe_psql('postgres', 'select * from witness');
+is($answer, '42', 'witness tuple appears');
+
+# Verify message appears in standby's log
+$log = slurp_file($node_standby->logfile);
+like(
+ $log,
+ qr[sucessfully skipped missing contrecord at],
+ "found log line in standby");
+$node->stop;
+$node_standby->stop;
diff --git a/src/test/recovery/t/idiosyncratic_copy b/src/test/recovery/t/idiosyncratic_copy
new file mode 100755
index 00000000000..83e25e0e898
--- /dev/null
+++ b/src/test/recovery/t/idiosyncratic_copy
@@ -0,0 +1,20 @@
+#!/usr/bin/perl
+
+use strict;
+use warnings;
+
+use File::Copy;
+
+die "wrong number of arguments" if @ARGV != 2;
+my ($source, $target) = @ARGV;
+if ($^O eq 'msys')
+{
+ # make a windows path look like an msys path if necessary
+ $source =~ s!^([A-Za-z]):!'/' . lc($1)!e;
+ $source =~ s!\\!/!g;
+}
+
+die "$0: failed copy of $target" if -f "$target.fail";
+
+copy($source, $target) or die "couldn't copy $source to $target: $!";
+print STDERR "$0: archived $source to $target successfully\n";
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 2dcbdc56ea4..6c86e9199cb 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3702,6 +3702,7 @@ xl_logical_message
xl_multi_insert_tuple
xl_multixact_create
xl_multixact_truncate
+xl_overwrite_contrecord
xl_parameter_change
xl_relmap_update
xl_replorigin_drop