diff options
author | Vadim B. Mikheev <vadim4o@yahoo.com> | 2000-10-20 11:01:21 +0000 |
---|---|---|
committer | Vadim B. Mikheev <vadim4o@yahoo.com> | 2000-10-20 11:01:21 +0000 |
commit | b58c0411bad414a5dbde8b38f615867c68adf55c (patch) | |
tree | e50db15a8b3ea5dd2cf9d54ef433835bbdae1358 /src/backend/access/heap/heapam.c | |
parent | e18a862d46fa3169c238a6b4a98a783c9a7896f9 (diff) | |
download | postgresql-b58c0411bad414a5dbde8b38f615867c68adf55c.tar.gz postgresql-b58c0411bad414a5dbde8b38f615867c68adf55c.zip |
redo/undo support functions and cleanups.
Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r-- | src/backend/access/heap/heapam.c | 253 |
1 files changed, 157 insertions, 96 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 794ba977cc1..fa6afa80575 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.88 2000/10/13 12:05:20 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.89 2000/10/20 11:01:02 vadim Exp $ * * * INTERFACE ROUTINES @@ -87,7 +87,16 @@ #include "utils/relcache.h" #ifdef XLOG /* comments are in heap_update */ +#include "access/xlogutils.h" + +void heap_redo(XLogRecPtr lsn, XLogRecord *record); +void heap_undo(XLogRecPtr lsn, XLogRecord *record); + static xl_heaptid _locked_tuple_; +static void _heap_unlock_tuple(void *data); + +static void HeapPageCleanup(Buffer buffer); + #endif @@ -1380,6 +1389,8 @@ heap_insert(Relation relation, HeapTuple tup) /* XLOG stuff */ { xl_heap_insert xlrec; + XLogRecPtr recptr; + xlrec.target.node = relation->rd_node; xlrec.target.cid = GetCurrentCommandId(); xlrec.target.tid = tup->t_self; @@ -1388,8 +1399,8 @@ heap_insert(Relation relation, HeapTuple tup) xlrec.t_hoff = tup->t_data->t_hoff; xlrec.mask = tup->t_data->t_infomask; - XLogRecPtr recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INSERT, - (char*) xlrec, SizeOfHeapInsert, + recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INSERT, + (char*) &xlrec, SizeOfHeapInsert, (char*) tup->t_data + offsetof(HeapTupleHeaderData, t_bits), tup->t_len - offsetof(HeapTupleHeaderData, t_bits)); @@ -1493,11 +1504,13 @@ l1: /* XLOG stuff */ { xl_heap_delete xlrec; + XLogRecPtr recptr; + xlrec.target.node = relation->rd_node; xlrec.target.cid = GetCurrentCommandId(); xlrec.target.tid = tp.t_self; - XLogRecPtr recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, - (char*) xlrec, SizeOfHeapDelete, NULL, 0); + recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, + (char*) &xlrec, SizeOfHeapDelete, NULL, 0); PageSetLSN(dp, recptr); PageSetSUI(dp, ThisStartUpID); @@ -1692,16 +1705,19 @@ l2: /* XLOG stuff */ { xl_heap_update xlrec; + XLogRecPtr recptr; + xlrec.target.node = relation->rd_node; xlrec.target.cid = GetCurrentCommandId(); xlrec.target.tid = oldtup.t_self; - xlrec.newtid.tid = newtup->t_self; + xlrec.newtid = newtup->t_self; xlrec.t_natts = newtup->t_data->t_natts; + xlrec.t_oid = newtup->t_data->t_oid; xlrec.t_hoff = newtup->t_data->t_hoff; xlrec.mask = newtup->t_data->t_infomask; - XLogRecPtr recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_UPDATE, - (char*) xlrec, SizeOfHeapUpdate, + recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_UPDATE, + (char*) &xlrec, SizeOfHeapUpdate, (char*) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits), newtup->t_len - offsetof(HeapTupleHeaderData, t_bits)); @@ -2000,51 +2016,26 @@ heap_restrpos(HeapScanDesc scan) } #ifdef XLOG -void heap_redo(XLogRecPtr lsn, XLogRecord *record) -{ - uint8 info = record->xl_info & ~XLR_INFO_MASK; - if (info == XLOG_HEAP_INSERT) - heap_xlog_insert(true, lsn, record); - else if (info == XLOG_HEAP_DELETE) - heap_xlog_delete(true, lsn, record); - else if (info == XLOG_HEAP_UPDATE) - heap_xlog_update(true, lsn, record); - else if (info == XLOG_HEAP_MOVE) - heap_xlog_move(true, lsn, record); - else - elog(STOP, "heap_redo: unknown op code %u", info); -} - -void heap_undo(XLogRecPtr lsn, XLogRecord *record) -{ - uint8 info = record->xl_info & ~XLR_INFO_MASK; - - if (info == XLOG_HEAP_INSERT) - heap_xlog_insert(false, lsn, record); - else if (info == XLOG_HEAP_DELETE) - heap_xlog_delete(false, lsn, record); - else if (info == XLOG_HEAP_UPDATE) - heap_xlog_update(false, lsn, record); - else if (info == XLOG_HEAP_MOVE) - heap_xlog_move(false, lsn, record); - else - elog(STOP, "heap_undo: unknown op code %u", info); -} - -void heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record) +static void +heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record) { xl_heap_delete *xlrec = (xl_heap_delete*) XLogRecGetData(record); Relation reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node); + Buffer buffer; + Page page; + OffsetNumber offnum; + ItemId lp; + HeapTupleHeader htup; if (!RelationIsValid(reln)) return; - Buffer buffer = XLogReadBuffer(false, reln, + buffer = XLogReadBuffer(false, reln, ItemPointerGetBlockNumber(&(xlrec->target.tid))); if (!BufferIsValid(buffer)) return; - Page page = (Page) BufferGetPage(buffer); + page = (Page) BufferGetPage(buffer); if (PageIsNew((PageHeader) page)) { PageInit(page, BufferGetPageSize(buffer), 0); @@ -2065,8 +2056,8 @@ void heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record) else if (XLByteLT(PageGetLSN(page), lsn)) /* changes are not applied ?! */ elog(STOP, "heap_delete_undo: bad page LSN"); - OffsetNumber offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); - ItemId lp = PageGetItemId(page, offnum); + offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); + lp = PageGetItemId(page, offnum); if (!ItemIdIsUsed(lp) || ItemIdDeleted(lp)) { @@ -2084,7 +2075,7 @@ void heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record) UnlockAndReleaseBuffer(buffer); return; } - HeapTupleHeader htup = (HeapTupleHeader) PageGetItem(page, lp); + htup = (HeapTupleHeader) PageGetItem(page, lp); if (redo) { @@ -2115,19 +2106,25 @@ void heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record) } -void heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) +static void +heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) { xl_heap_insert *xlrec = (xl_heap_insert*) XLogRecGetData(record); Relation reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node); + Buffer buffer; + Page page; + OffsetNumber offnum; + ItemId lp; + HeapTupleHeader htup; if (!RelationIsValid(reln)) return; - Buffer buffer = XLogReadBuffer((redo) ? true : false, reln, + buffer = XLogReadBuffer((redo) ? true : false, reln, ItemPointerGetBlockNumber(&(xlrec->target.tid))); if (!BufferIsValid(buffer)) return; - Page page = (Page) BufferGetPage(buffer); + page = (Page) BufferGetPage(buffer); if (PageIsNew((PageHeader) page)) { PageInit(page, BufferGetPageSize(buffer), 0); @@ -2142,16 +2139,16 @@ void heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) if (redo) { + char tbuf[MaxTupleSize]; + HeapTupleHeader htup = (HeapTupleHeader) tbuf; + uint32 newlen = record->xl_len - SizeOfHeapInsert; + if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */ { UnlockAndReleaseBuffer(buffer); return; } - char tbuf[MaxTupleSize]; - HeapTupleHeader htup = (HeapTupleHeader) tbuf; - uint32 newlen = record->xl_len - SizeOfHeapInsert; - memcpy(tbuf + offsetof(HeapTupleHeaderData, t_bits), (char*)xlrec + SizeOfHeapInsert, newlen); newlen += offsetof(HeapTupleHeaderData, t_bits); @@ -2162,10 +2159,9 @@ void heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) htup->t_cmin = xlrec->target.cid; htup->t_infomask = HEAP_XMAX_INVALID | HEAP_XMIN_COMMITTED | xlrec->mask; - PageManagerModeSet(OverwritePageManagerMode); - OffsetNumber offnum = PageAddItem(page, htup, newlen, - ItemPointerGetOffsetNumber(&(xlrec->target.tid)), LP_USED); - PageManagerModeSet(ShufflePageManagerMode); + offnum = PageAddItem(page, (Item)htup, newlen, + ItemPointerGetOffsetNumber(&(xlrec->target.tid)), + LP_USED | OverwritePageMode); if (offnum == InvalidOffsetNumber) elog(STOP, "heap_insert_redo: failed to add tuple"); PageSetLSN(page, lsn); @@ -2178,8 +2174,8 @@ void heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) if (XLByteLT(PageGetLSN(page), lsn)) /* changes are not applied ?! */ elog(STOP, "heap_insert_undo: bad page LSN"); - OffsetNumber offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); - ItemId lp = PageGetItemId(page, offnum); + offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); + lp = PageGetItemId(page, offnum); if (!ItemIdIsUsed(lp) || ItemIdDeleted(lp)) { @@ -2195,11 +2191,11 @@ void heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) UnlockAndReleaseBuffer(buffer); return; } - HeapTupleHeader htup = (HeapTupleHeader) PageGetItem(page, lp); + htup = (HeapTupleHeader) PageGetItem(page, lp); /* is it our tuple ? */ - if (PageGetSUI(page) != ThisStartUpID || - htup->t_xmin != record->xl_xid || htup->t_cmin != xlrec->target.cid) + Assert(PageGetSUI(page) == ThisStartUpID); + if (htup->t_xmin != record->xl_xid || htup->t_cmin != xlrec->target.cid) { if (!InRecovery) elog(STOP, "heap_insert_undo: invalid target tuple in rollback"); @@ -2207,33 +2203,25 @@ void heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) return; } - if (InRecovery || BufferIsUpdatable(buffer)) - { - lp->lp_flags &= ~LP_USED; - PageRepairFragmentation(page); - UnlockAndWriteBuffer(buffer); - } - else /* we can't delete tuple right now */ - { - lp->lp_flags |= LP_DELETE; /* mark for deletion */ - MarkBufferForCleanup(buffer, HeapPageCleanup); - } + lp->lp_flags |= LP_DELETE; /* mark for deletion */ + MarkBufferForCleanup(buffer, HeapPageCleanup); } -void heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record) +static void +heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record) { xl_heap_update *xlrec = (xl_heap_update*) XLogRecGetData(record); Relation reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node); - - if (!RelationIsValid(reln)) - return; Buffer buffer; Page page; OffsetNumber offnum; ItemId lp; HeapTupleHeader htup; + if (!RelationIsValid(reln)) + return; + /* * Currently UPDATE is DELETE + INSERT and so code below are near * exact sum of code in heap_xlog_delete & heap_xlog_insert. We could @@ -2339,15 +2327,15 @@ newt:; if (redo) { + char tbuf[MaxTupleSize]; + uint32 newlen = record->xl_len - SizeOfHeapUpdate; + if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */ { UnlockAndReleaseBuffer(buffer); return; } - char tbuf[MaxTupleSize]; - uint32 newlen = record->xl_len - SizeOfHeapUpdate; - htup = (HeapTupleHeader) tbuf; memcpy(tbuf + offsetof(HeapTupleHeaderData, t_bits), (char*)xlrec + SizeOfHeapUpdate, newlen); @@ -2359,10 +2347,9 @@ newt:; htup->t_cmin = xlrec->target.cid; htup->t_infomask = HEAP_XMAX_INVALID | HEAP_XMIN_COMMITTED | xlrec->mask; - PageManagerModeSet(OverwritePageManagerMode); - OffsetNumber offnum = PageAddItem(page, htup, newlen, - ItemPointerGetOffsetNumber(&(xlrec->newtid)), LP_USED); - PageManagerModeSet(ShufflePageManagerMode); + offnum = PageAddItem(page, (Item)htup, newlen, + ItemPointerGetOffsetNumber(&(xlrec->newtid)), + LP_USED | OverwritePageMode); if (offnum == InvalidOffsetNumber) elog(STOP, "heap_update_redo: failed to add tuple"); PageSetLSN(page, lsn); @@ -2395,8 +2382,8 @@ newt:; htup = (HeapTupleHeader) PageGetItem(page, lp); /* is it our tuple ? */ - if (PageGetSUI(page) != ThisStartUpID || - htup->t_xmin != record->xl_xid || htup->t_cmin != xlrec->target.cid) + Assert(PageGetSUI(page) == ThisStartUpID); + if (htup->t_xmin != record->xl_xid || htup->t_cmin != xlrec->target.cid) { if (!InRecovery) elog(STOP, "heap_update_undo: invalid new tuple in rollback"); @@ -2404,19 +2391,93 @@ newt:; return; } - if (InRecovery || BufferIsUpdatable(buffer)) - { - lp->lp_flags &= ~LP_USED; - PageRepairFragmentation(page); - UnlockAndWriteBuffer(buffer); - } - else /* we can't delete tuple right now */ - { - lp->lp_flags |= LP_DELETE; /* mark for deletion */ - MarkBufferForCleanup(buffer, PageCleanup); - } + lp->lp_flags |= LP_DELETE; /* mark for deletion */ + MarkBufferForCleanup(buffer, HeapPageCleanup); +} + +static void +_heap_unlock_tuple(void *data) +{ + xl_heaptid *xltid = (xl_heaptid*) data; + Relation reln = XLogOpenRelation(false, RM_HEAP_ID, xltid->node); + Buffer buffer; + Page page; + OffsetNumber offnum; + ItemId lp; + HeapTupleHeader htup; + + if (!RelationIsValid(reln)) + elog(STOP, "_heap_unlock_tuple: can't open relation"); + + buffer = XLogReadBuffer(false, reln, + ItemPointerGetBlockNumber(&(xltid->tid))); + if (!BufferIsValid(buffer)) + elog(STOP, "_heap_unlock_tuple: can't read buffer"); + + page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + elog(STOP, "_heap_unlock_tuple: uninitialized page"); + + offnum = ItemPointerGetOffsetNumber(&(xltid->tid)); + if (offnum > PageGetMaxOffsetNumber(page)) + elog(STOP, "_heap_unlock_tuple: invalid itemid"); + lp = PageGetItemId(page, offnum); + + if (!ItemIdIsUsed(lp) || ItemIdDeleted(lp)) + elog(STOP, "_heap_unlock_tuple: unused/deleted tuple in rollback"); + + htup = (HeapTupleHeader) PageGetItem(page, lp); + if (htup->t_xmax != GetCurrentTransactionId() || + htup->t_cmax != GetCurrentCommandId()) + elog(STOP, "_heap_unlock_tuple: invalid xmax/cmax in rollback"); + htup->t_infomask &= ~HEAP_XMAX_UNLOGGED; + htup->t_infomask |= HEAP_XMAX_INVALID; + UnlockAndWriteBuffer(buffer); + return; } +void heap_redo(XLogRecPtr lsn, XLogRecord *record) +{ + uint8 info = record->xl_info & ~XLR_INFO_MASK; + + if (info == XLOG_HEAP_INSERT) + heap_xlog_insert(true, lsn, record); + else if (info == XLOG_HEAP_DELETE) + heap_xlog_delete(true, lsn, record); + else if (info == XLOG_HEAP_UPDATE) + heap_xlog_update(true, lsn, record); +#ifdef NOT_USED + else if (info == XLOG_HEAP_MOVE) + heap_xlog_move(true, lsn, record); +#endif + else + elog(STOP, "heap_redo: unknown op code %u", info); +} + +void heap_undo(XLogRecPtr lsn, XLogRecord *record) +{ + uint8 info = record->xl_info & ~XLR_INFO_MASK; + + if (info == XLOG_HEAP_INSERT) + heap_xlog_insert(false, lsn, record); + else if (info == XLOG_HEAP_DELETE) + heap_xlog_delete(false, lsn, record); + else if (info == XLOG_HEAP_UPDATE) + heap_xlog_update(false, lsn, record); +#ifdef NOT_USED + else if (info == XLOG_HEAP_MOVE) + heap_xlog_move(false, lsn, record); +#endif + else + elog(STOP, "heap_undo: unknown op code %u", info); +} + +static void +HeapPageCleanup(Buffer buffer) +{ + Page page = (Page) BufferGetPage(buffer); + PageRepairFragmentation(page); +} #endif /* XLOG */ |