Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r--  src/backend/access/heap/heapam.c  |  147 ++++++++++++++++++---------
1 file changed, 100 insertions(+), 47 deletions(-)
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 197067d4a1d..afe5e66b4bc 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -4199,6 +4199,9 @@ heap_xlog_cleanup_info(XLogRecPtr lsn, XLogRecord *record)
* conflict processing to occur before we begin index vacuum actions. see
* vacuumlazy.c and also comments in btvacuumpage()
*/
+
+ /* Backup blocks are not used in cleanup_info records */
+ Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
}
/*
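The Assert added above codifies a convention this patch introduces: redo routines whose record types never log full-page images now state that explicitly, instead of trusting a centralized RestoreBkpBlocks() call to have dealt with them. A minimal sketch of the pattern, assuming PostgreSQL's xlog replay APIs and a hypothetical record type:

static void
example_xlog_redo(XLogRecPtr lsn, XLogRecord *record)
{
	/* This record type never carries backup blocks; assert that. */
	Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));

	/* ... record-specific replay work goes here ... */
}
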
@@ -4231,10 +4234,15 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
xlrec->node);
- RestoreBkpBlocks(lsn, record, true);
-
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /*
+ * If we have a full-page image, restore it (using a cleanup lock) and
+ * we're done.
+ */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, true, false);
return;
+ }
buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block, RBM_NORMAL);
if (!BufferIsValid(buffer))
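The hunk above shows the idiom that replaces the old XLR_BKP_BLOCK_1 test: each redo routine asks for its backup block by index and restores it itself. A sketch of RestoreBackupBlock's calling convention as used throughout this patch (the surrounding variable is a hypothetical stand-in); with keep_buffer = true the caller takes ownership and must release the buffer, as heap_xlog_update does further down:

	Buffer	buf;

	/* restore block 0; ordinary lock; keep the buffer pinned and locked */
	buf = RestoreBackupBlock(lsn, record, 0, false, true);

	/* ... examine or further adjust the restored page ... */

	UnlockReleaseBuffer(buf);
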
@@ -4300,15 +4308,16 @@ heap_xlog_freeze(XLogRecPtr lsn, XLogRecord *record)
if (InHotStandby)
ResolveRecoveryConflictWithSnapshot(cutoff_xid, xlrec->node);
- RestoreBkpBlocks(lsn, record, false);
-
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /* If we have a full-page image, restore it and we're done */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
+ }
- buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block, RBM_NORMAL);
+ buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
if (!BufferIsValid(buffer))
return;
- LockBufferForCleanup(buffer);
page = (Page) BufferGetPage(buffer);
if (XLByteLE(lsn, PageGetLSN(page)))
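Note the lock-strength distinction that RestoreBackupBlock's fourth argument encodes: heap_xlog_clean restores its block under a cleanup lock because replay may physically remove tuples, while heap_xlog_freeze only rewrites tuple headers in place, so an ordinary exclusive lock suffices (hence the switch above from XLogReadBufferExtended plus LockBufferForCleanup to plain XLogReadBuffer). Side by side, both taken from this patch:

	/* VACUUM-style replay: tuples may be removed, take a cleanup lock */
	(void) RestoreBackupBlock(lsn, record, 0, true, false);

	/* in-place change: ordinary exclusive lock is enough */
	(void) RestoreBackupBlock(lsn, record, 0, false, false);
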
@@ -4349,6 +4358,9 @@ heap_xlog_newpage(XLogRecPtr lsn, XLogRecord *record)
Buffer buffer;
Page page;
+ /* Backup blocks are not used in newpage records */
+ Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
/*
* Note: the NEWPAGE log record is used for both heaps and indexes, so do
* not do anything that assumes we are touching a heap.
@@ -4401,8 +4413,12 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
FreeFakeRelcacheEntry(reln);
}
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /* If we have a full-page image, restore it and we're done */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
+ }
buffer = XLogReadBuffer(xlrec->target.node, blkno, false);
if (!BufferIsValid(buffer))
@@ -4479,8 +4495,12 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
FreeFakeRelcacheEntry(reln);
}
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /* If we have a full-page image, restore it and we're done */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
+ }
if (record->xl_info & XLOG_HEAP_INIT_PAGE)
{
@@ -4562,9 +4582,10 @@ static void
heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
{
xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
- Buffer buffer;
bool samepage = (ItemPointerGetBlockNumber(&(xlrec->newtid)) ==
ItemPointerGetBlockNumber(&(xlrec->target.tid)));
+ Buffer obuffer,
+ nbuffer;
Page page;
OffsetNumber offnum;
ItemId lp = NULL;
@@ -4592,27 +4613,44 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
FreeFakeRelcacheEntry(reln);
}
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /*
+ * In normal operation, it is important to lock the two pages in
+ * page-number order, to avoid possible deadlocks against other update
+ * operations going the other way. However, during WAL replay there can
+ * be no other update happening, so we don't need to worry about that. But
+ * we *do* need to worry that we don't expose an inconsistent state to Hot
+ * Standby queries --- so the original page can't be unlocked before we've
+ * added the new tuple to the new page.
+ */
+
+ if (record->xl_info & XLR_BKP_BLOCK(0))
{
+ obuffer = RestoreBackupBlock(lsn, record, 0, false, true);
if (samepage)
- return; /* backup block covered both changes */
+ {
+ /* backup block covered both changes, so we're done */
+ UnlockReleaseBuffer(obuffer);
+ return;
+ }
goto newt;
}
/* Deal with old tuple version */
- buffer = XLogReadBuffer(xlrec->target.node,
- ItemPointerGetBlockNumber(&(xlrec->target.tid)),
- false);
- if (!BufferIsValid(buffer))
+ obuffer = XLogReadBuffer(xlrec->target.node,
+ ItemPointerGetBlockNumber(&(xlrec->target.tid)),
+ false);
+ if (!BufferIsValid(obuffer))
goto newt;
- page = (Page) BufferGetPage(buffer);
+ page = (Page) BufferGetPage(obuffer);
if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
{
- UnlockReleaseBuffer(buffer);
if (samepage)
+ {
+ UnlockReleaseBuffer(obuffer);
return;
+ }
goto newt;
}
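For contrast with the comment in the hunk above, here is a minimal sketch of the deadlock-avoidance rule that normal (non-replay) operation follows; the block-number variables are hypothetical stand-ins for what heap_update computes:

	/*
	 * Lock pages in block-number order so that two concurrent
	 * cross-page updates cannot lock the same pair in opposite orders.
	 */
	if (oldblk < newblk)
	{
		LockBuffer(obuffer, BUFFER_LOCK_EXCLUSIVE);
		LockBuffer(nbuffer, BUFFER_LOCK_EXCLUSIVE);
	}
	else
	{
		LockBuffer(nbuffer, BUFFER_LOCK_EXCLUSIVE);
		LockBuffer(obuffer, BUFFER_LOCK_EXCLUSIVE);
	}

During replay no such concurrent updater exists, so the redo code is free to restore and lock the backup block first regardless of page order; the constraint that remains, as the comment says, is not exposing a tuple-less intermediate state to Hot Standby queries.
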
@@ -4650,11 +4688,14 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
* is already applied
*/
if (samepage)
+ {
+ nbuffer = obuffer;
goto newsame;
+ }
+
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
- MarkBufferDirty(buffer);
- UnlockReleaseBuffer(buffer);
+ MarkBufferDirty(obuffer);
/* Deal with new tuple */
@@ -4672,31 +4713,38 @@ newt:;
FreeFakeRelcacheEntry(reln);
}
- if (record->xl_info & XLR_BKP_BLOCK_2)
+ if (record->xl_info & XLR_BKP_BLOCK(1))
+ {
+ (void) RestoreBackupBlock(lsn, record, 1, false, false);
+ if (BufferIsValid(obuffer))
+ UnlockReleaseBuffer(obuffer);
return;
+ }
if (record->xl_info & XLOG_HEAP_INIT_PAGE)
{
- buffer = XLogReadBuffer(xlrec->target.node,
- ItemPointerGetBlockNumber(&(xlrec->newtid)),
- true);
- Assert(BufferIsValid(buffer));
- page = (Page) BufferGetPage(buffer);
+ nbuffer = XLogReadBuffer(xlrec->target.node,
+ ItemPointerGetBlockNumber(&(xlrec->newtid)),
+ true);
+ Assert(BufferIsValid(nbuffer));
+ page = (Page) BufferGetPage(nbuffer);
- PageInit(page, BufferGetPageSize(buffer), 0);
+ PageInit(page, BufferGetPageSize(nbuffer), 0);
}
else
{
- buffer = XLogReadBuffer(xlrec->target.node,
- ItemPointerGetBlockNumber(&(xlrec->newtid)),
- false);
- if (!BufferIsValid(buffer))
+ nbuffer = XLogReadBuffer(xlrec->target.node,
+ ItemPointerGetBlockNumber(&(xlrec->newtid)),
+ false);
+ if (!BufferIsValid(nbuffer))
return;
- page = (Page) BufferGetPage(buffer);
+ page = (Page) BufferGetPage(nbuffer);
if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
{
- UnlockReleaseBuffer(buffer);
+ UnlockReleaseBuffer(nbuffer);
+ if (BufferIsValid(obuffer))
+ UnlockReleaseBuffer(obuffer);
return;
}
}
@@ -4741,11 +4789,14 @@ newsame:;
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
- MarkBufferDirty(buffer);
- UnlockReleaseBuffer(buffer);
+ MarkBufferDirty(nbuffer);
+ UnlockReleaseBuffer(nbuffer);
+
+ if (BufferIsValid(obuffer) && obuffer != nbuffer)
+ UnlockReleaseBuffer(obuffer);
/*
- * If the page is running low on free space, update the FSM as well.
+ * If the new page is running low on free space, update the FSM as well.
* Arbitrarily, our definition of "low" is less than 20%. We can't do much
* better than that without knowing the fill-factor for the table.
*
@@ -4761,7 +4812,8 @@ newsame:;
*/
if (!hot_update && freespace < BLCKSZ / 5)
XLogRecordPageWithFreeSpace(xlrec->target.node,
- ItemPointerGetBlockNumber(&(xlrec->newtid)), freespace);
+ ItemPointerGetBlockNumber(&(xlrec->newtid)),
+ freespace);
}
static void
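To make the 20% threshold in the preceding comment concrete: with the default BLCKSZ of 8192 bytes, BLCKSZ / 5 is 1638, so the FSM is updated only when fewer than 1638 bytes remain free on the new page. A hedged sketch of the computation (PageGetHeapFreeSpace is the real API; freespace must be captured before the buffer is released, and the variables here are stand-ins):

	Size	freespace = PageGetHeapFreeSpace(page);	/* before release */

	if (!hot_update && freespace < BLCKSZ / 5)
		XLogRecordPageWithFreeSpace(xlrec->target.node,
									ItemPointerGetBlockNumber(&(xlrec->newtid)),
									freespace);
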
@@ -4774,8 +4826,12 @@ heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record)
ItemId lp = NULL;
HeapTupleHeader htup;
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /* If we have a full-page image, restore it and we're done */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
+ }
buffer = XLogReadBuffer(xlrec->target.node,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
@@ -4833,8 +4889,12 @@ heap_xlog_inplace(XLogRecPtr lsn, XLogRecord *record)
uint32 oldlen;
uint32 newlen;
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /* If we have a full-page image, restore it and we're done */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
+ }
buffer = XLogReadBuffer(xlrec->target.node,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
@@ -4883,8 +4943,6 @@ heap_redo(XLogRecPtr lsn, XLogRecord *record)
* required. The ones in heap2 rmgr do.
*/
- RestoreBkpBlocks(lsn, record, false);
-
switch (info & XLOG_HEAP_OPMASK)
{
case XLOG_HEAP_INSERT:
@@ -4918,11 +4976,6 @@ heap2_redo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
- /*
- * Note that RestoreBkpBlocks() is called after conflict processing within
- * each record type handling function.
- */
-
switch (info & XLOG_HEAP_OPMASK)
{
case XLOG_HEAP2_FREEZE:
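The deletions in these last two hunks mark the overall shape of the change: neither heap_redo nor heap2_redo restores backup blocks centrally any more; each opcode handler owns that responsibility. A sketch of the resulting dispatch (handler names as in this file; the case list is abbreviated and the PANIC default is the conventional pattern, assumed here rather than quoted from the truncated hunk):

	switch (info & XLOG_HEAP_OPMASK)
	{
		case XLOG_HEAP2_FREEZE:
			heap_xlog_freeze(lsn, record);
			break;
		case XLOG_HEAP2_CLEAN:
			heap_xlog_clean(lsn, record);
			break;
		default:
			elog(PANIC, "heap2_redo: unknown op code %u", info);
	}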