diff options
author | Andres Freund <andres@anarazel.de> | 2015-05-08 05:31:36 +0200 |
---|---|---|
committer | Andres Freund <andres@anarazel.de> | 2015-05-08 05:43:10 +0200 |
commit | 168d5805e4c08bed7b95d351bf097cff7c07dd65 (patch) | |
tree | cd55bff71bf05324f388d3404c1b3697f3a96e7e /src/backend/access/heap/heapam.c | |
parent | 2c8f4836db058d0715bc30a30655d646287ba509 (diff) | |
download | postgresql-168d5805e4c08bed7b95d351bf097cff7c07dd65.tar.gz postgresql-168d5805e4c08bed7b95d351bf097cff7c07dd65.zip |
Add support for INSERT ... ON CONFLICT DO NOTHING/UPDATE.
The newly added ON CONFLICT clause allows to specify an alternative to
raising a unique or exclusion constraint violation error when inserting.
ON CONFLICT refers to constraints that can either be specified using a
inference clause (by specifying the columns of a unique constraint) or
by naming a unique or exclusion constraint. DO NOTHING avoids the
constraint violation, without touching the pre-existing row. DO UPDATE
SET ... [WHERE ...] updates the pre-existing tuple, and has access to
both the tuple proposed for insertion and the existing tuple; the
optional WHERE clause can be used to prevent an update from being
executed. The UPDATE SET and WHERE clauses have access to the tuple
proposed for insertion using the "magic" EXCLUDED alias, and to the
pre-existing tuple using the table name or its alias.
This feature is often referred to as upsert.
This is implemented using a new infrastructure called "speculative
insertion". It is an optimistic variant of regular insertion that first
does a pre-check for existing tuples and then attempts an insert. If a
violating tuple was inserted concurrently, the speculatively inserted
tuple is deleted and a new attempt is made. If the pre-check finds a
matching tuple the alternative DO NOTHING or DO UPDATE action is taken.
If the insertion succeeds without detecting a conflict, the tuple is
deemed inserted.
To handle the possible ambiguity between the excluded alias and a table
named excluded, and for convenience with long relation names, INSERT
INTO now can alias its target table.
Bumps catversion as stored rules change.
Author: Peter Geoghegan, with significant contributions from Heikki
Linnakangas and Andres Freund. Testing infrastructure by Jeff Janes.
Reviewed-By: Heikki Linnakangas, Andres Freund, Robert Haas, Simon Riggs,
Dean Rasheed, Stephen Frost and many others.
Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r-- | src/backend/access/heap/heapam.c | 377 |
1 files changed, 338 insertions, 39 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index e84c1743f4f..7ea9a77e7ea 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -2061,8 +2061,17 @@ FreeBulkInsertState(BulkInsertState bistate) * This causes rows to be frozen, which is an MVCC violation and * requires explicit options chosen by user. * + * HEAP_INSERT_IS_SPECULATIVE is used on so-called "speculative insertions", + * which can be backed out afterwards without aborting the whole transaction. + * Other sessions can wait for the speculative insertion to be confirmed, + * turning it into a regular tuple, or aborted, as if it never existed. + * Speculatively inserted tuples behave as "value locks" of short duration, + * used to implement INSERT .. ON CONFLICT. + * * Note that these options will be applied when inserting into the heap's * TOAST table, too, if the tuple requires any out-of-line data. + * FIXME: Do we mark TOAST tuples as speculative too? What about confirming + * or aborting them? * * The BulkInsertState object (if any; bistate can be NULL for default * behavior) is also just passed through to RelationGetBufferForTuple. @@ -2115,7 +2124,8 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, /* NO EREPORT(ERROR) from here till changes are logged */ START_CRIT_SECTION(); - RelationPutHeapTuple(relation, buffer, heaptup); + RelationPutHeapTuple(relation, buffer, heaptup, + (options & HEAP_INSERT_SPECULATIVE) != 0); if (PageIsAllVisible(BufferGetPage(buffer))) { @@ -2169,7 +2179,11 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, } xlrec.offnum = ItemPointerGetOffsetNumber(&heaptup->t_self); - xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0; + xlrec.flags = 0; + if (all_visible_cleared) + xlrec.flags |= XLH_INSERT_ALL_VISIBLE_CLEARED; + if (options & HEAP_INSERT_SPECULATIVE) + xlrec.flags |= XLH_INSERT_IS_SPECULATIVE; Assert(ItemPointerGetBlockNumber(&heaptup->t_self) == BufferGetBlockNumber(buffer)); /* @@ -2179,7 +2193,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, */ if (RelationIsLogicallyLogged(relation)) { - xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE; + xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE; bufflags |= REGBUF_KEEP_DATA; } @@ -2224,6 +2238,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, */ CacheInvalidateHeapTuple(relation, heaptup, NULL); + /* Note: speculative insertions are counted too, even if aborted later */ pgstat_count_heap_insert(relation, 1); /* @@ -2395,7 +2410,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, * RelationGetBufferForTuple has ensured that the first tuple fits. * Put that on the page, and then as many other tuples as fit. */ - RelationPutHeapTuple(relation, buffer, heaptuples[ndone]); + RelationPutHeapTuple(relation, buffer, heaptuples[ndone], false); for (nthispage = 1; ndone + nthispage < ntuples; nthispage++) { HeapTuple heaptup = heaptuples[ndone + nthispage]; @@ -2403,7 +2418,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, if (PageGetHeapFreeSpace(page) < MAXALIGN(heaptup->t_len) + saveFreeSpace) break; - RelationPutHeapTuple(relation, buffer, heaptup); + RelationPutHeapTuple(relation, buffer, heaptup, false); /* * We don't use heap_multi_insert for catalog tuples yet, but @@ -2463,7 +2478,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, /* the rest of the scratch space is used for tuple data */ tupledata = scratchptr; - xlrec->flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0; + xlrec->flags = all_visible_cleared ? XLH_INSERT_ALL_VISIBLE_CLEARED : 0; xlrec->ntuples = nthispage; /* @@ -2498,7 +2513,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, Assert((scratchptr - scratch) < BLCKSZ); if (need_tuple_data) - xlrec->flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE; + xlrec->flags |= XLH_INSERT_CONTAINS_NEW_TUPLE; /* * Signal that this is the last xl_heap_multi_insert record @@ -2506,7 +2521,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, * decoding so it knows when to cleanup temporary data. */ if (ndone + nthispage == ntuples) - xlrec->flags |= XLOG_HEAP_LAST_MULTI_INSERT; + xlrec->flags |= XLH_INSERT_LAST_IN_MULTI; if (init) { @@ -2914,7 +2929,12 @@ l1: MarkBufferDirty(buffer); - /* XLOG stuff */ + /* + * XLOG stuff + * + * NB: heap_abort_speculative() uses the same xlog record and replay + * routines. + */ if (RelationNeedsWAL(relation)) { xl_heap_delete xlrec; @@ -2924,7 +2944,7 @@ l1: if (RelationIsAccessibleInLogicalDecoding(relation)) log_heap_new_cid(relation, &tp); - xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0; + xlrec.flags = all_visible_cleared ? XLH_DELETE_ALL_VISIBLE_CLEARED : 0; xlrec.infobits_set = compute_infobits(tp.t_data->t_infomask, tp.t_data->t_infomask2); xlrec.offnum = ItemPointerGetOffsetNumber(&tp.t_self); @@ -2933,9 +2953,9 @@ l1: if (old_key_tuple != NULL) { if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL) - xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE; + xlrec.flags |= XLH_DELETE_CONTAINS_OLD_TUPLE; else - xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY; + xlrec.flags |= XLH_DELETE_CONTAINS_OLD_KEY; } XLogBeginInsert(); @@ -3742,7 +3762,7 @@ l2: HeapTupleClearHeapOnly(newtup); } - RelationPutHeapTuple(relation, newbuf, heaptup); /* insert new tuple */ + RelationPutHeapTuple(relation, newbuf, heaptup, false); /* insert new tuple */ if (!already_marked) { @@ -4133,14 +4153,16 @@ get_mxact_status_for_lock(LockTupleMode mode, bool is_update) * * Function result may be: * HeapTupleMayBeUpdated: lock was successfully acquired + * HeapTupleInvisible: lock failed because tuple was never visible to us * HeapTupleSelfUpdated: lock failed because tuple updated by self * HeapTupleUpdated: lock failed because tuple updated by other xact * HeapTupleWouldBlock: lock couldn't be acquired and wait_policy is skip * - * In the failure cases, the routine fills *hufd with the tuple's t_ctid, - * t_xmax (resolving a possible MultiXact, if necessary), and t_cmax - * (the last only for HeapTupleSelfUpdated, since we - * cannot obtain cmax from a combocid generated by another transaction). + * In the failure cases other than HeapTupleInvisible, the routine fills + * *hufd with the tuple's t_ctid, t_xmax (resolving a possible MultiXact, + * if necessary), and t_cmax (the last only for HeapTupleSelfUpdated, + * since we cannot obtain cmax from a combocid generated by another + * transaction). * See comments for struct HeapUpdateFailureData for additional info. * * See README.tuplock for a thorough explanation of this mechanism. @@ -4179,8 +4201,15 @@ l3: if (result == HeapTupleInvisible) { - UnlockReleaseBuffer(*buffer); - elog(ERROR, "attempted to lock invisible tuple"); + LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); + + /* + * This is possible, but only when locking a tuple for ON CONFLICT + * UPDATE. We return this value here rather than throwing an error in + * order to give that case the opportunity to throw a more specific + * error. + */ + return HeapTupleInvisible; } else if (result == HeapTupleBeingUpdated) { @@ -5417,6 +5446,234 @@ heap_lock_updated_tuple(Relation rel, HeapTuple tuple, ItemPointer ctid, return HeapTupleMayBeUpdated; } +/* + * heap_finish_speculative - mark speculative insertion as successful + * + * To successfully finish a speculative insertion we have to clear speculative + * token from tuple. To do so the t_ctid field, which will contain a + * speculative token value, is modified in place to point to the tuple itself, + * which is characteristic of a newly inserted ordinary tuple. + * + * NB: It is not ok to commit without either finishing or aborting a + * speculative insertion. We could treat speculative tuples of committed + * transactions implicitly as completed, but then we would have to be prepared + * to deal with speculative tokens on committed tuples. That wouldn't be + * difficult - no-one looks at the ctid field of a tuple with invalid xmax - + * but clearing the token at completion isn't very expensive either. + * An explicit confirmation WAL record also makes logical decoding simpler. + */ +void +heap_finish_speculative(Relation relation, HeapTuple tuple) +{ + Buffer buffer; + Page page; + OffsetNumber offnum; + ItemId lp = NULL; + HeapTupleHeader htup; + + buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(&(tuple->t_self))); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + page = (Page) BufferGetPage(buffer); + + offnum = ItemPointerGetOffsetNumber(&(tuple->t_self)); + if (PageGetMaxOffsetNumber(page) >= offnum) + lp = PageGetItemId(page, offnum); + + if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) + elog(ERROR, "heap_confirm_insert: invalid lp"); + + htup = (HeapTupleHeader) PageGetItem(page, lp); + + /* SpecTokenOffsetNumber should be distinguishable from any real offset */ + StaticAssertStmt(MaxOffsetNumber < SpecTokenOffsetNumber, + "invalid speculative token constant"); + + /* NO EREPORT(ERROR) from here till changes are logged */ + START_CRIT_SECTION(); + + Assert(HeapTupleHeaderIsSpeculative(tuple->t_data)); + + MarkBufferDirty(buffer); + + /* + * Replace the speculative insertion token with a real t_ctid, + * pointing to itself like it does on regular tuples. + */ + htup->t_ctid = tuple->t_self; + + /* XLOG stuff */ + if (RelationNeedsWAL(relation)) + { + xl_heap_confirm xlrec; + XLogRecPtr recptr; + + xlrec.offnum = ItemPointerGetOffsetNumber(&tuple->t_self); + + XLogBeginInsert(); + + /* We want the same filtering on this as on a plain insert */ + XLogIncludeOrigin(); + + XLogRegisterData((char *) &xlrec, SizeOfHeapConfirm); + XLogRegisterBuffer(0, buffer, REGBUF_STANDARD); + + recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_CONFIRM); + + PageSetLSN(page, recptr); + } + + END_CRIT_SECTION(); + + UnlockReleaseBuffer(buffer); +} + +/* + * heap_abort_speculative - kill a speculatively inserted tuple + * + * Marks a tuple that was speculatively inserted in the same command as dead, + * by setting its xmin as invalid. That makes it immediately appear as dead + * to all transactions, including our own. In particular, it makes + * HeapTupleSatisfiesDirty() regard the tuple as dead, so that another backend + * inserting a duplicate key value won't unnecessarily wait for our whole + * transaction to finish (it'll just wait for our speculative insertion to + * finish). + * + * Killing the tuple prevents "unprincipled deadlocks", which are deadlocks + * that arise due to a mutual dependency that is not user visible. By + * definition, unprincipled deadlocks cannot be prevented by the user + * reordering lock acquisition in client code, because the implementation level + * lock acquisitions are not under the user's direct control. If speculative + * inserters did not take this precaution, then under high concurrency they + * could deadlock with each other, which would not be acceptable. + * + * This is somewhat redundant with heap_delete, but we prefer to have a + * dedicated routine with stripped down requirements. + * + * This routine does not affect logical decoding as it only looks at + * confirmation records. + */ +void +heap_abort_speculative(Relation relation, HeapTuple tuple) +{ + TransactionId xid = GetCurrentTransactionId(); + ItemPointer tid = &(tuple->t_self); + ItemId lp; + HeapTupleData tp; + Page page; + BlockNumber block; + Buffer buffer; + + Assert(ItemPointerIsValid(tid)); + + block = ItemPointerGetBlockNumber(tid); + buffer = ReadBuffer(relation, block); + page = BufferGetPage(buffer); + + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + + /* + * Page can't be all visible, we just inserted into it, and are still + * running. + */ + Assert(!PageIsAllVisible(page)); + + lp = PageGetItemId(page, ItemPointerGetOffsetNumber(tid)); + Assert(ItemIdIsNormal(lp)); + + tp.t_tableOid = RelationGetRelid(relation); + tp.t_data = (HeapTupleHeader) PageGetItem(page, lp); + tp.t_len = ItemIdGetLength(lp); + tp.t_self = *tid; + + /* + * Sanity check that the tuple really is a speculatively inserted tuple, + * inserted by us. + */ + if (tp.t_data->t_choice.t_heap.t_xmin != xid) + elog(ERROR, "attempted to kill a tuple inserted by another transaction"); + if (!HeapTupleHeaderIsSpeculative(tp.t_data)) + elog(ERROR, "attempted to kill a non-speculative tuple"); + Assert(!HeapTupleHeaderIsHeapOnly(tp.t_data)); + + /* + * No need to check for serializable conflicts here. There is never a + * need for a combocid, either. No need to extract replica identity, or + * do anything special with infomask bits. + */ + + START_CRIT_SECTION(); + + /* + * The tuple will become DEAD immediately. Flag that this page + * immediately is a candidate for pruning by setting xmin to + * RecentGlobalXmin. That's not pretty, but it doesn't seem worth + * inventing a nicer API for this. + */ + Assert(TransactionIdIsValid(RecentGlobalXmin)); + PageSetPrunable(page, RecentGlobalXmin); + + /* store transaction information of xact deleting the tuple */ + tp.t_data->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED); + tp.t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED; + + /* + * Set the tuple header xmin to InvalidTransactionId. This makes the + * tuple immediately invisible everyone. (In particular, to any + * transactions waiting on the speculative token, woken up later.) + */ + HeapTupleHeaderSetXmin(tp.t_data, InvalidTransactionId); + + /* Clear the speculative insertion token too */ + tp.t_data->t_ctid = tp.t_self; + + MarkBufferDirty(buffer); + + /* + * XLOG stuff + * + * The WAL records generated here match heap_delete(). The same recovery + * routines are used. + */ + if (RelationNeedsWAL(relation)) + { + xl_heap_delete xlrec; + XLogRecPtr recptr; + + xlrec.flags = XLH_DELETE_IS_SUPER; + xlrec.infobits_set = compute_infobits(tp.t_data->t_infomask, + tp.t_data->t_infomask2); + xlrec.offnum = ItemPointerGetOffsetNumber(&tp.t_self); + xlrec.xmax = xid; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfHeapDelete); + XLogRegisterBuffer(0, buffer, REGBUF_STANDARD); + + /* No replica identity & replication origin logged */ + + recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE); + + PageSetLSN(page, recptr); + } + + END_CRIT_SECTION(); + + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + + if (HeapTupleHasExternal(&tp)) + toast_delete(relation, &tp); + + /* + * Never need to mark tuple for invalidation, since catalogs don't support + * speculative insertion + */ + + /* Now we can release the buffer */ + ReleaseBuffer(buffer); + + /* count deletion, as we counted the insertion too */ + pgstat_count_heap_delete(relation); +} /* * heap_inplace_update - update a tuple "in place" (ie, overwrite it) @@ -6732,22 +6989,22 @@ log_heap_update(Relation reln, Buffer oldbuf, /* Prepare main WAL data chain */ xlrec.flags = 0; if (all_visible_cleared) - xlrec.flags |= XLOG_HEAP_ALL_VISIBLE_CLEARED; + xlrec.flags |= XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED; if (new_all_visible_cleared) - xlrec.flags |= XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED; + xlrec.flags |= XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED; if (prefixlen > 0) - xlrec.flags |= XLOG_HEAP_PREFIX_FROM_OLD; + xlrec.flags |= XLH_UPDATE_PREFIX_FROM_OLD; if (suffixlen > 0) - xlrec.flags |= XLOG_HEAP_SUFFIX_FROM_OLD; + xlrec.flags |= XLH_UPDATE_SUFFIX_FROM_OLD; if (need_tuple_data) { - xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE; + xlrec.flags |= XLH_UPDATE_CONTAINS_NEW_TUPLE; if (old_key_tuple) { if (reln->rd_rel->relreplident == REPLICA_IDENTITY_FULL) - xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE; + xlrec.flags |= XLH_UPDATE_CONTAINS_OLD_TUPLE; else - xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY; + xlrec.flags |= XLH_UPDATE_CONTAINS_OLD_KEY; } } @@ -7378,7 +7635,7 @@ heap_xlog_delete(XLogReaderState *record) * The visibility map may need to be fixed even if the heap page is * already up-to-date. */ - if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED) { Relation reln = CreateFakeRelcacheEntry(target_node); Buffer vmbuffer = InvalidBuffer; @@ -7406,13 +7663,16 @@ heap_xlog_delete(XLogReaderState *record) HeapTupleHeaderClearHotUpdated(htup); fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask, &htup->t_infomask2); - HeapTupleHeaderSetXmax(htup, xlrec->xmax); + if (!(xlrec->flags & XLH_DELETE_IS_SUPER)) + HeapTupleHeaderSetXmax(htup, xlrec->xmax); + else + HeapTupleHeaderSetXmin(htup, InvalidTransactionId); HeapTupleHeaderSetCmax(htup, FirstCommandId, false); /* Mark the page as a candidate for pruning */ PageSetPrunable(page, XLogRecGetXid(record)); - if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); /* Make sure there is no forward chain link in t_ctid */ @@ -7453,7 +7713,7 @@ heap_xlog_insert(XLogReaderState *record) * The visibility map may need to be fixed even if the heap page is * already up-to-date. */ - if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) { Relation reln = CreateFakeRelcacheEntry(target_node); Buffer vmbuffer = InvalidBuffer; @@ -7516,7 +7776,7 @@ heap_xlog_insert(XLogReaderState *record) PageSetLSN(page, lsn); - if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); MarkBufferDirty(buffer); @@ -7573,7 +7833,7 @@ heap_xlog_multi_insert(XLogReaderState *record) * The visibility map may need to be fixed even if the heap page is * already up-to-date. */ - if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) { Relation reln = CreateFakeRelcacheEntry(rnode); Buffer vmbuffer = InvalidBuffer; @@ -7655,7 +7915,7 @@ heap_xlog_multi_insert(XLogReaderState *record) PageSetLSN(page, lsn); - if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); MarkBufferDirty(buffer); @@ -7728,7 +7988,7 @@ heap_xlog_update(XLogReaderState *record, bool hot_update) * The visibility map may need to be fixed even if the heap page is * already up-to-date. */ - if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) { Relation reln = CreateFakeRelcacheEntry(rnode); Buffer vmbuffer = InvalidBuffer; @@ -7783,7 +8043,7 @@ heap_xlog_update(XLogReaderState *record, bool hot_update) /* Mark the page as a candidate for pruning */ PageSetPrunable(page, XLogRecGetXid(record)); - if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); PageSetLSN(page, lsn); @@ -7812,7 +8072,7 @@ heap_xlog_update(XLogReaderState *record, bool hot_update) * The visibility map may need to be fixed even if the heap page is * already up-to-date. */ - if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) { Relation reln = CreateFakeRelcacheEntry(rnode); Buffer vmbuffer = InvalidBuffer; @@ -7840,13 +8100,13 @@ heap_xlog_update(XLogReaderState *record, bool hot_update) if (PageGetMaxOffsetNumber(page) + 1 < offnum) elog(PANIC, "heap_update_redo: invalid max offset number"); - if (xlrec->flags & XLOG_HEAP_PREFIX_FROM_OLD) + if (xlrec->flags & XLH_UPDATE_PREFIX_FROM_OLD) { Assert(newblk == oldblk); memcpy(&prefixlen, recdata, sizeof(uint16)); recdata += sizeof(uint16); } - if (xlrec->flags & XLOG_HEAP_SUFFIX_FROM_OLD) + if (xlrec->flags & XLH_UPDATE_SUFFIX_FROM_OLD) { Assert(newblk == oldblk); memcpy(&suffixlen, recdata, sizeof(uint16)); @@ -7918,7 +8178,7 @@ heap_xlog_update(XLogReaderState *record, bool hot_update) if (offnum == InvalidOffsetNumber) elog(PANIC, "heap_update_redo: failed to add tuple"); - if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */ @@ -7952,6 +8212,42 @@ heap_xlog_update(XLogReaderState *record, bool hot_update) } static void +heap_xlog_confirm(XLogReaderState *record) +{ + XLogRecPtr lsn = record->EndRecPtr; + xl_heap_confirm *xlrec = (xl_heap_confirm *) XLogRecGetData(record); + Buffer buffer; + Page page; + OffsetNumber offnum; + ItemId lp = NULL; + HeapTupleHeader htup; + + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) + { + page = BufferGetPage(buffer); + + offnum = xlrec->offnum; + if (PageGetMaxOffsetNumber(page) >= offnum) + lp = PageGetItemId(page, offnum); + + if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) + elog(PANIC, "heap_confirm_redo: invalid lp"); + + htup = (HeapTupleHeader) PageGetItem(page, lp); + + /* + * Confirm tuple as actually inserted + */ + ItemPointerSet(&htup->t_ctid, BufferGetBlockNumber(buffer), offnum); + + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); +} + +static void heap_xlog_lock(XLogReaderState *record) { XLogRecPtr lsn = record->EndRecPtr; @@ -8101,6 +8397,9 @@ heap_redo(XLogReaderState *record) case XLOG_HEAP_HOT_UPDATE: heap_xlog_update(record, true); break; + case XLOG_HEAP_CONFIRM: + heap_xlog_confirm(record); + break; case XLOG_HEAP_LOCK: heap_xlog_lock(record); break; |