Diffstat (limited to 'src/backend/access')
-rw-r--r-- | src/backend/access/heap/heapam.c        | 377
-rw-r--r-- | src/backend/access/heap/hio.c           |  27
-rw-r--r-- | src/backend/access/heap/tuptoaster.c    |   8
-rw-r--r-- | src/backend/access/nbtree/nbtinsert.c   |  28
-rw-r--r-- | src/backend/access/rmgrdesc/heapdesc.c  |   9
5 files changed, 397 insertions, 52 deletions
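The diff below adds speculative ("value-locking") heap insertions in support of INSERT .. ON CONFLICT. As orientation before the hunks, here is a rough caller-side sketch of the lifecycle the new heapam functions implement. It is illustrative only: the executor context (relation, tuple, command id, how a conflict is detected) is assumed, and error handling and speculative-token lock management are omitted.

/*
 * Illustrative outline, not part of this commit: it strings together the
 * entry points the diff below adds or extends.  The surrounding executor
 * state is assumed.
 */
#include "postgres.h"

#include "access/heapam.h"
#include "access/htup_details.h"
#include "utils/rel.h"

static void
speculative_insert_example(Relation rel, HeapTuple tup, CommandId cid,
                           bool conflict_found)
{
    /*
     * HEAP_INSERT_SPECULATIVE makes RelationPutHeapTuple() leave the
     * speculative token in the tuple's t_ctid instead of a self-pointing
     * item pointer, and flags the WAL record with XLH_INSERT_IS_SPECULATIVE.
     * Passing NULL for bistate gives the default (non-bulk) behavior.
     */
    heap_insert(rel, tup, cid, HEAP_INSERT_SPECULATIVE, NULL);

    /* ... insert index entries; a unique check may report a conflict ... */

    if (!conflict_found)
        heap_finish_speculative(rel, tup);  /* token -> ordinary self ctid */
    else
        heap_abort_speculative(rel, tup);   /* "super-delete": xmin invalidated */
}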
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index e84c1743f4f..7ea9a77e7ea 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -2061,8 +2061,17 @@ FreeBulkInsertState(BulkInsertState bistate) * This causes rows to be frozen, which is an MVCC violation and * requires explicit options chosen by user. * + * HEAP_INSERT_IS_SPECULATIVE is used on so-called "speculative insertions", + * which can be backed out afterwards without aborting the whole transaction. + * Other sessions can wait for the speculative insertion to be confirmed, + * turning it into a regular tuple, or aborted, as if it never existed. + * Speculatively inserted tuples behave as "value locks" of short duration, + * used to implement INSERT .. ON CONFLICT. + * * Note that these options will be applied when inserting into the heap's * TOAST table, too, if the tuple requires any out-of-line data. + * FIXME: Do we mark TOAST tuples as speculative too? What about confirming + * or aborting them? * * The BulkInsertState object (if any; bistate can be NULL for default * behavior) is also just passed through to RelationGetBufferForTuple. @@ -2115,7 +2124,8 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, /* NO EREPORT(ERROR) from here till changes are logged */ START_CRIT_SECTION(); - RelationPutHeapTuple(relation, buffer, heaptup); + RelationPutHeapTuple(relation, buffer, heaptup, + (options & HEAP_INSERT_SPECULATIVE) != 0); if (PageIsAllVisible(BufferGetPage(buffer))) { @@ -2169,7 +2179,11 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, } xlrec.offnum = ItemPointerGetOffsetNumber(&heaptup->t_self); - xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0; + xlrec.flags = 0; + if (all_visible_cleared) + xlrec.flags |= XLH_INSERT_ALL_VISIBLE_CLEARED; + if (options & HEAP_INSERT_SPECULATIVE) + xlrec.flags |= XLH_INSERT_IS_SPECULATIVE; Assert(ItemPointerGetBlockNumber(&heaptup->t_self) == BufferGetBlockNumber(buffer)); /* @@ -2179,7 +2193,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, */ if (RelationIsLogicallyLogged(relation)) { - xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE; + xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE; bufflags |= REGBUF_KEEP_DATA; } @@ -2224,6 +2238,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, */ CacheInvalidateHeapTuple(relation, heaptup, NULL); + /* Note: speculative insertions are counted too, even if aborted later */ pgstat_count_heap_insert(relation, 1); /* @@ -2395,7 +2410,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, * RelationGetBufferForTuple has ensured that the first tuple fits. * Put that on the page, and then as many other tuples as fit. 
*/ - RelationPutHeapTuple(relation, buffer, heaptuples[ndone]); + RelationPutHeapTuple(relation, buffer, heaptuples[ndone], false); for (nthispage = 1; ndone + nthispage < ntuples; nthispage++) { HeapTuple heaptup = heaptuples[ndone + nthispage]; @@ -2403,7 +2418,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, if (PageGetHeapFreeSpace(page) < MAXALIGN(heaptup->t_len) + saveFreeSpace) break; - RelationPutHeapTuple(relation, buffer, heaptup); + RelationPutHeapTuple(relation, buffer, heaptup, false); /* * We don't use heap_multi_insert for catalog tuples yet, but @@ -2463,7 +2478,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, /* the rest of the scratch space is used for tuple data */ tupledata = scratchptr; - xlrec->flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0; + xlrec->flags = all_visible_cleared ? XLH_INSERT_ALL_VISIBLE_CLEARED : 0; xlrec->ntuples = nthispage; /* @@ -2498,7 +2513,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, Assert((scratchptr - scratch) < BLCKSZ); if (need_tuple_data) - xlrec->flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE; + xlrec->flags |= XLH_INSERT_CONTAINS_NEW_TUPLE; /* * Signal that this is the last xl_heap_multi_insert record @@ -2506,7 +2521,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, * decoding so it knows when to cleanup temporary data. */ if (ndone + nthispage == ntuples) - xlrec->flags |= XLOG_HEAP_LAST_MULTI_INSERT; + xlrec->flags |= XLH_INSERT_LAST_IN_MULTI; if (init) { @@ -2914,7 +2929,12 @@ l1: MarkBufferDirty(buffer); - /* XLOG stuff */ + /* + * XLOG stuff + * + * NB: heap_abort_speculative() uses the same xlog record and replay + * routines. + */ if (RelationNeedsWAL(relation)) { xl_heap_delete xlrec; @@ -2924,7 +2944,7 @@ l1: if (RelationIsAccessibleInLogicalDecoding(relation)) log_heap_new_cid(relation, &tp); - xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0; + xlrec.flags = all_visible_cleared ? XLH_DELETE_ALL_VISIBLE_CLEARED : 0; xlrec.infobits_set = compute_infobits(tp.t_data->t_infomask, tp.t_data->t_infomask2); xlrec.offnum = ItemPointerGetOffsetNumber(&tp.t_self); @@ -2933,9 +2953,9 @@ l1: if (old_key_tuple != NULL) { if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL) - xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE; + xlrec.flags |= XLH_DELETE_CONTAINS_OLD_TUPLE; else - xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY; + xlrec.flags |= XLH_DELETE_CONTAINS_OLD_KEY; } XLogBeginInsert(); @@ -3742,7 +3762,7 @@ l2: HeapTupleClearHeapOnly(newtup); } - RelationPutHeapTuple(relation, newbuf, heaptup); /* insert new tuple */ + RelationPutHeapTuple(relation, newbuf, heaptup, false); /* insert new tuple */ if (!already_marked) { @@ -4133,14 +4153,16 @@ get_mxact_status_for_lock(LockTupleMode mode, bool is_update) * * Function result may be: * HeapTupleMayBeUpdated: lock was successfully acquired + * HeapTupleInvisible: lock failed because tuple was never visible to us * HeapTupleSelfUpdated: lock failed because tuple updated by self * HeapTupleUpdated: lock failed because tuple updated by other xact * HeapTupleWouldBlock: lock couldn't be acquired and wait_policy is skip * - * In the failure cases, the routine fills *hufd with the tuple's t_ctid, - * t_xmax (resolving a possible MultiXact, if necessary), and t_cmax - * (the last only for HeapTupleSelfUpdated, since we - * cannot obtain cmax from a combocid generated by another transaction). 
+ * In the failure cases other than HeapTupleInvisible, the routine fills + * *hufd with the tuple's t_ctid, t_xmax (resolving a possible MultiXact, + * if necessary), and t_cmax (the last only for HeapTupleSelfUpdated, + * since we cannot obtain cmax from a combocid generated by another + * transaction). * See comments for struct HeapUpdateFailureData for additional info. * * See README.tuplock for a thorough explanation of this mechanism. @@ -4179,8 +4201,15 @@ l3: if (result == HeapTupleInvisible) { - UnlockReleaseBuffer(*buffer); - elog(ERROR, "attempted to lock invisible tuple"); + LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); + + /* + * This is possible, but only when locking a tuple for ON CONFLICT + * UPDATE. We return this value here rather than throwing an error in + * order to give that case the opportunity to throw a more specific + * error. + */ + return HeapTupleInvisible; } else if (result == HeapTupleBeingUpdated) { @@ -5417,6 +5446,234 @@ heap_lock_updated_tuple(Relation rel, HeapTuple tuple, ItemPointer ctid, return HeapTupleMayBeUpdated; } +/* + * heap_finish_speculative - mark speculative insertion as successful + * + * To successfully finish a speculative insertion we have to clear speculative + * token from tuple. To do so the t_ctid field, which will contain a + * speculative token value, is modified in place to point to the tuple itself, + * which is characteristic of a newly inserted ordinary tuple. + * + * NB: It is not ok to commit without either finishing or aborting a + * speculative insertion. We could treat speculative tuples of committed + * transactions implicitly as completed, but then we would have to be prepared + * to deal with speculative tokens on committed tuples. That wouldn't be + * difficult - no-one looks at the ctid field of a tuple with invalid xmax - + * but clearing the token at completion isn't very expensive either. + * An explicit confirmation WAL record also makes logical decoding simpler. + */ +void +heap_finish_speculative(Relation relation, HeapTuple tuple) +{ + Buffer buffer; + Page page; + OffsetNumber offnum; + ItemId lp = NULL; + HeapTupleHeader htup; + + buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(&(tuple->t_self))); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + page = (Page) BufferGetPage(buffer); + + offnum = ItemPointerGetOffsetNumber(&(tuple->t_self)); + if (PageGetMaxOffsetNumber(page) >= offnum) + lp = PageGetItemId(page, offnum); + + if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) + elog(ERROR, "heap_confirm_insert: invalid lp"); + + htup = (HeapTupleHeader) PageGetItem(page, lp); + + /* SpecTokenOffsetNumber should be distinguishable from any real offset */ + StaticAssertStmt(MaxOffsetNumber < SpecTokenOffsetNumber, + "invalid speculative token constant"); + + /* NO EREPORT(ERROR) from here till changes are logged */ + START_CRIT_SECTION(); + + Assert(HeapTupleHeaderIsSpeculative(tuple->t_data)); + + MarkBufferDirty(buffer); + + /* + * Replace the speculative insertion token with a real t_ctid, + * pointing to itself like it does on regular tuples. 
+ */ + htup->t_ctid = tuple->t_self; + + /* XLOG stuff */ + if (RelationNeedsWAL(relation)) + { + xl_heap_confirm xlrec; + XLogRecPtr recptr; + + xlrec.offnum = ItemPointerGetOffsetNumber(&tuple->t_self); + + XLogBeginInsert(); + + /* We want the same filtering on this as on a plain insert */ + XLogIncludeOrigin(); + + XLogRegisterData((char *) &xlrec, SizeOfHeapConfirm); + XLogRegisterBuffer(0, buffer, REGBUF_STANDARD); + + recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_CONFIRM); + + PageSetLSN(page, recptr); + } + + END_CRIT_SECTION(); + + UnlockReleaseBuffer(buffer); +} + +/* + * heap_abort_speculative - kill a speculatively inserted tuple + * + * Marks a tuple that was speculatively inserted in the same command as dead, + * by setting its xmin as invalid. That makes it immediately appear as dead + * to all transactions, including our own. In particular, it makes + * HeapTupleSatisfiesDirty() regard the tuple as dead, so that another backend + * inserting a duplicate key value won't unnecessarily wait for our whole + * transaction to finish (it'll just wait for our speculative insertion to + * finish). + * + * Killing the tuple prevents "unprincipled deadlocks", which are deadlocks + * that arise due to a mutual dependency that is not user visible. By + * definition, unprincipled deadlocks cannot be prevented by the user + * reordering lock acquisition in client code, because the implementation level + * lock acquisitions are not under the user's direct control. If speculative + * inserters did not take this precaution, then under high concurrency they + * could deadlock with each other, which would not be acceptable. + * + * This is somewhat redundant with heap_delete, but we prefer to have a + * dedicated routine with stripped down requirements. + * + * This routine does not affect logical decoding as it only looks at + * confirmation records. + */ +void +heap_abort_speculative(Relation relation, HeapTuple tuple) +{ + TransactionId xid = GetCurrentTransactionId(); + ItemPointer tid = &(tuple->t_self); + ItemId lp; + HeapTupleData tp; + Page page; + BlockNumber block; + Buffer buffer; + + Assert(ItemPointerIsValid(tid)); + + block = ItemPointerGetBlockNumber(tid); + buffer = ReadBuffer(relation, block); + page = BufferGetPage(buffer); + + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + + /* + * Page can't be all visible, we just inserted into it, and are still + * running. + */ + Assert(!PageIsAllVisible(page)); + + lp = PageGetItemId(page, ItemPointerGetOffsetNumber(tid)); + Assert(ItemIdIsNormal(lp)); + + tp.t_tableOid = RelationGetRelid(relation); + tp.t_data = (HeapTupleHeader) PageGetItem(page, lp); + tp.t_len = ItemIdGetLength(lp); + tp.t_self = *tid; + + /* + * Sanity check that the tuple really is a speculatively inserted tuple, + * inserted by us. + */ + if (tp.t_data->t_choice.t_heap.t_xmin != xid) + elog(ERROR, "attempted to kill a tuple inserted by another transaction"); + if (!HeapTupleHeaderIsSpeculative(tp.t_data)) + elog(ERROR, "attempted to kill a non-speculative tuple"); + Assert(!HeapTupleHeaderIsHeapOnly(tp.t_data)); + + /* + * No need to check for serializable conflicts here. There is never a + * need for a combocid, either. No need to extract replica identity, or + * do anything special with infomask bits. + */ + + START_CRIT_SECTION(); + + /* + * The tuple will become DEAD immediately. Flag that this page + * immediately is a candidate for pruning by setting xmin to + * RecentGlobalXmin. That's not pretty, but it doesn't seem worth + * inventing a nicer API for this. 
+ */ + Assert(TransactionIdIsValid(RecentGlobalXmin)); + PageSetPrunable(page, RecentGlobalXmin); + + /* store transaction information of xact deleting the tuple */ + tp.t_data->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED); + tp.t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED; + + /* + * Set the tuple header xmin to InvalidTransactionId. This makes the + * tuple immediately invisible everyone. (In particular, to any + * transactions waiting on the speculative token, woken up later.) + */ + HeapTupleHeaderSetXmin(tp.t_data, InvalidTransactionId); + + /* Clear the speculative insertion token too */ + tp.t_data->t_ctid = tp.t_self; + + MarkBufferDirty(buffer); + + /* + * XLOG stuff + * + * The WAL records generated here match heap_delete(). The same recovery + * routines are used. + */ + if (RelationNeedsWAL(relation)) + { + xl_heap_delete xlrec; + XLogRecPtr recptr; + + xlrec.flags = XLH_DELETE_IS_SUPER; + xlrec.infobits_set = compute_infobits(tp.t_data->t_infomask, + tp.t_data->t_infomask2); + xlrec.offnum = ItemPointerGetOffsetNumber(&tp.t_self); + xlrec.xmax = xid; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfHeapDelete); + XLogRegisterBuffer(0, buffer, REGBUF_STANDARD); + + /* No replica identity & replication origin logged */ + + recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE); + + PageSetLSN(page, recptr); + } + + END_CRIT_SECTION(); + + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + + if (HeapTupleHasExternal(&tp)) + toast_delete(relation, &tp); + + /* + * Never need to mark tuple for invalidation, since catalogs don't support + * speculative insertion + */ + + /* Now we can release the buffer */ + ReleaseBuffer(buffer); + + /* count deletion, as we counted the insertion too */ + pgstat_count_heap_delete(relation); +} /* * heap_inplace_update - update a tuple "in place" (ie, overwrite it) @@ -6732,22 +6989,22 @@ log_heap_update(Relation reln, Buffer oldbuf, /* Prepare main WAL data chain */ xlrec.flags = 0; if (all_visible_cleared) - xlrec.flags |= XLOG_HEAP_ALL_VISIBLE_CLEARED; + xlrec.flags |= XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED; if (new_all_visible_cleared) - xlrec.flags |= XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED; + xlrec.flags |= XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED; if (prefixlen > 0) - xlrec.flags |= XLOG_HEAP_PREFIX_FROM_OLD; + xlrec.flags |= XLH_UPDATE_PREFIX_FROM_OLD; if (suffixlen > 0) - xlrec.flags |= XLOG_HEAP_SUFFIX_FROM_OLD; + xlrec.flags |= XLH_UPDATE_SUFFIX_FROM_OLD; if (need_tuple_data) { - xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE; + xlrec.flags |= XLH_UPDATE_CONTAINS_NEW_TUPLE; if (old_key_tuple) { if (reln->rd_rel->relreplident == REPLICA_IDENTITY_FULL) - xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE; + xlrec.flags |= XLH_UPDATE_CONTAINS_OLD_TUPLE; else - xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY; + xlrec.flags |= XLH_UPDATE_CONTAINS_OLD_KEY; } } @@ -7378,7 +7635,7 @@ heap_xlog_delete(XLogReaderState *record) * The visibility map may need to be fixed even if the heap page is * already up-to-date. 
*/ - if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED) { Relation reln = CreateFakeRelcacheEntry(target_node); Buffer vmbuffer = InvalidBuffer; @@ -7406,13 +7663,16 @@ heap_xlog_delete(XLogReaderState *record) HeapTupleHeaderClearHotUpdated(htup); fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask, &htup->t_infomask2); - HeapTupleHeaderSetXmax(htup, xlrec->xmax); + if (!(xlrec->flags & XLH_DELETE_IS_SUPER)) + HeapTupleHeaderSetXmax(htup, xlrec->xmax); + else + HeapTupleHeaderSetXmin(htup, InvalidTransactionId); HeapTupleHeaderSetCmax(htup, FirstCommandId, false); /* Mark the page as a candidate for pruning */ PageSetPrunable(page, XLogRecGetXid(record)); - if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); /* Make sure there is no forward chain link in t_ctid */ @@ -7453,7 +7713,7 @@ heap_xlog_insert(XLogReaderState *record) * The visibility map may need to be fixed even if the heap page is * already up-to-date. */ - if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) { Relation reln = CreateFakeRelcacheEntry(target_node); Buffer vmbuffer = InvalidBuffer; @@ -7516,7 +7776,7 @@ heap_xlog_insert(XLogReaderState *record) PageSetLSN(page, lsn); - if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); MarkBufferDirty(buffer); @@ -7573,7 +7833,7 @@ heap_xlog_multi_insert(XLogReaderState *record) * The visibility map may need to be fixed even if the heap page is * already up-to-date. */ - if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) { Relation reln = CreateFakeRelcacheEntry(rnode); Buffer vmbuffer = InvalidBuffer; @@ -7655,7 +7915,7 @@ heap_xlog_multi_insert(XLogReaderState *record) PageSetLSN(page, lsn); - if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); MarkBufferDirty(buffer); @@ -7728,7 +7988,7 @@ heap_xlog_update(XLogReaderState *record, bool hot_update) * The visibility map may need to be fixed even if the heap page is * already up-to-date. */ - if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) { Relation reln = CreateFakeRelcacheEntry(rnode); Buffer vmbuffer = InvalidBuffer; @@ -7783,7 +8043,7 @@ heap_xlog_update(XLogReaderState *record, bool hot_update) /* Mark the page as a candidate for pruning */ PageSetPrunable(page, XLogRecGetXid(record)); - if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); PageSetLSN(page, lsn); @@ -7812,7 +8072,7 @@ heap_xlog_update(XLogReaderState *record, bool hot_update) * The visibility map may need to be fixed even if the heap page is * already up-to-date. 
*/ - if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) { Relation reln = CreateFakeRelcacheEntry(rnode); Buffer vmbuffer = InvalidBuffer; @@ -7840,13 +8100,13 @@ heap_xlog_update(XLogReaderState *record, bool hot_update) if (PageGetMaxOffsetNumber(page) + 1 < offnum) elog(PANIC, "heap_update_redo: invalid max offset number"); - if (xlrec->flags & XLOG_HEAP_PREFIX_FROM_OLD) + if (xlrec->flags & XLH_UPDATE_PREFIX_FROM_OLD) { Assert(newblk == oldblk); memcpy(&prefixlen, recdata, sizeof(uint16)); recdata += sizeof(uint16); } - if (xlrec->flags & XLOG_HEAP_SUFFIX_FROM_OLD) + if (xlrec->flags & XLH_UPDATE_SUFFIX_FROM_OLD) { Assert(newblk == oldblk); memcpy(&suffixlen, recdata, sizeof(uint16)); @@ -7918,7 +8178,7 @@ heap_xlog_update(XLogReaderState *record, bool hot_update) if (offnum == InvalidOffsetNumber) elog(PANIC, "heap_update_redo: failed to add tuple"); - if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */ @@ -7952,6 +8212,42 @@ heap_xlog_update(XLogReaderState *record, bool hot_update) } static void +heap_xlog_confirm(XLogReaderState *record) +{ + XLogRecPtr lsn = record->EndRecPtr; + xl_heap_confirm *xlrec = (xl_heap_confirm *) XLogRecGetData(record); + Buffer buffer; + Page page; + OffsetNumber offnum; + ItemId lp = NULL; + HeapTupleHeader htup; + + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) + { + page = BufferGetPage(buffer); + + offnum = xlrec->offnum; + if (PageGetMaxOffsetNumber(page) >= offnum) + lp = PageGetItemId(page, offnum); + + if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) + elog(PANIC, "heap_confirm_redo: invalid lp"); + + htup = (HeapTupleHeader) PageGetItem(page, lp); + + /* + * Confirm tuple as actually inserted + */ + ItemPointerSet(&htup->t_ctid, BufferGetBlockNumber(buffer), offnum); + + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); +} + +static void heap_xlog_lock(XLogReaderState *record) { XLogRecPtr lsn = record->EndRecPtr; @@ -8101,6 +8397,9 @@ heap_redo(XLogReaderState *record) case XLOG_HEAP_HOT_UPDATE: heap_xlog_update(record, true); break; + case XLOG_HEAP_CONFIRM: + heap_xlog_confirm(record); + break; case XLOG_HEAP_LOCK: heap_xlog_lock(record); break; diff --git a/src/backend/access/heap/hio.c b/src/backend/access/heap/hio.c index 6d091f63af0..a9f0ca35e49 100644 --- a/src/backend/access/heap/hio.c +++ b/src/backend/access/heap/hio.c @@ -35,12 +35,17 @@ void RelationPutHeapTuple(Relation relation, Buffer buffer, - HeapTuple tuple) + HeapTuple tuple, + bool token) { Page pageHeader; OffsetNumber offnum; - ItemId itemId; - Item item; + + /* + * A tuple that's being inserted speculatively should already have its + * token set. 
+ */ + Assert(!token || HeapTupleHeaderIsSpeculative(tuple->t_data)); /* Add the tuple to the page */ pageHeader = BufferGetPage(buffer); @@ -54,10 +59,18 @@ RelationPutHeapTuple(Relation relation, /* Update tuple->t_self to the actual position where it was stored */ ItemPointerSet(&(tuple->t_self), BufferGetBlockNumber(buffer), offnum); - /* Insert the correct position into CTID of the stored tuple, too */ - itemId = PageGetItemId(pageHeader, offnum); - item = PageGetItem(pageHeader, itemId); - ((HeapTupleHeader) item)->t_ctid = tuple->t_self; + /* + * Insert the correct position into CTID of the stored tuple, too + * (unless this is a speculative insertion, in which case the token is + * held in CTID field instead) + */ + if (!token) + { + ItemId itemId = PageGetItemId(pageHeader, offnum); + Item item = PageGetItem(pageHeader, itemId); + + ((HeapTupleHeader) item)->t_ctid = tuple->t_self; + } } /* diff --git a/src/backend/access/heap/tuptoaster.c b/src/backend/access/heap/tuptoaster.c index 8464e8794f6..274155ad0c7 100644 --- a/src/backend/access/heap/tuptoaster.c +++ b/src/backend/access/heap/tuptoaster.c @@ -523,6 +523,14 @@ toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, bool toast_delold[MaxHeapAttributeNumber]; /* + * Ignore the INSERT_SPECULATIVE option. Speculative insertions/super + * deletions just normally insert/delete the toast values. It seems + * easiest to deal with that here, instead on, potentially, multiple + * callers. + */ + options &= ~HEAP_INSERT_SPECULATIVE; + + /* * We should only ever be called for tuples of plain relations or * materialized views --- recursing on a toast rel is bad news. */ diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c index ef68a7145fc..4a60c5fa2c8 100644 --- a/src/backend/access/nbtree/nbtinsert.c +++ b/src/backend/access/nbtree/nbtinsert.c @@ -51,7 +51,8 @@ static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf); static TransactionId _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel, Buffer buf, OffsetNumber offset, ScanKey itup_scankey, - IndexUniqueCheck checkUnique, bool *is_unique); + IndexUniqueCheck checkUnique, bool *is_unique, + uint32 *speculativeToken); static void _bt_findinsertloc(Relation rel, Buffer *bufptr, OffsetNumber *offsetptr, @@ -159,17 +160,27 @@ top: */ if (checkUnique != UNIQUE_CHECK_NO) { - TransactionId xwait; + TransactionId xwait; + uint32 speculativeToken; offset = _bt_binsrch(rel, buf, natts, itup_scankey, false); xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey, - checkUnique, &is_unique); + checkUnique, &is_unique, &speculativeToken); if (TransactionIdIsValid(xwait)) { /* Have to wait for the other guy ... */ _bt_relbuf(rel, buf); - XactLockTableWait(xwait, rel, &itup->t_tid, XLTW_InsertIndex); + /* + * If it's a speculative insertion, wait for it to finish (ie. + * to go ahead with the insertion, or kill the tuple). Otherwise + * wait for the transaction to finish as usual. + */ + if (speculativeToken) + SpeculativeInsertionWait(xwait, speculativeToken); + else + XactLockTableWait(xwait, rel, &itup->t_tid, XLTW_InsertIndex); + /* start over... */ _bt_freestack(stack); goto top; @@ -213,7 +224,10 @@ top: * * Returns InvalidTransactionId if there is no conflict, else an xact ID * we must wait for to see if it commits a conflicting tuple. If an actual - * conflict is detected, no return --- just ereport(). + * conflict is detected, no return --- just ereport(). 
If an xact ID is + * returned, and the conflicting tuple still has a speculative insertion in + * progress, *speculativeToken is set to non-zero, and the caller can wait for + * the verdict on the insertion using SpeculativeInsertionWait(). * * However, if checkUnique == UNIQUE_CHECK_PARTIAL, we always return * InvalidTransactionId because we don't want to wait. In this case we @@ -223,7 +237,8 @@ top: static TransactionId _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel, Buffer buf, OffsetNumber offset, ScanKey itup_scankey, - IndexUniqueCheck checkUnique, bool *is_unique) + IndexUniqueCheck checkUnique, bool *is_unique, + uint32 *speculativeToken) { TupleDesc itupdesc = RelationGetDescr(rel); int natts = rel->rd_rel->relnatts; @@ -340,6 +355,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel, if (nbuf != InvalidBuffer) _bt_relbuf(rel, nbuf); /* Tell _bt_doinsert to wait... */ + *speculativeToken = SnapshotDirty.speculativeToken; return xwait; } diff --git a/src/backend/access/rmgrdesc/heapdesc.c b/src/backend/access/rmgrdesc/heapdesc.c index 4f06a2637ae..f4a1b002cf1 100644 --- a/src/backend/access/rmgrdesc/heapdesc.c +++ b/src/backend/access/rmgrdesc/heapdesc.c @@ -75,6 +75,12 @@ heap_desc(StringInfo buf, XLogReaderState *record) xlrec->new_offnum, xlrec->new_xmax); } + else if (info == XLOG_HEAP_CONFIRM) + { + xl_heap_confirm *xlrec = (xl_heap_confirm *) rec; + + appendStringInfo(buf, "off %u", xlrec->offnum); + } else if (info == XLOG_HEAP_LOCK) { xl_heap_lock *xlrec = (xl_heap_lock *) rec; @@ -177,6 +183,9 @@ heap_identify(uint8 info) case XLOG_HEAP_HOT_UPDATE | XLOG_HEAP_INIT_PAGE: id = "HOT_UPDATE+INIT"; break; + case XLOG_HEAP_CONFIRM: + id = "HEAP_CONFIRM"; + break; case XLOG_HEAP_LOCK: id = "LOCK"; break; |
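For reference, the on-page representation the changes above rely on: while an insertion is still speculative, the tuple's t_ctid does not point back at the tuple itself; its offset-number field holds the reserved SpecTokenOffsetNumber (which the StaticAssert in heap_finish_speculative() keeps above any real offset number), and the rest of t_ctid carries the insertion token. The sketch below shows the kind of test this enables; the real check is the HeapTupleHeaderIsSpeculative() macro used throughout the diff, whose definition lives in headers outside this 'src/backend/access'-limited view and is assumed here rather than quoted.

#include "postgres.h"

#include "access/htup_details.h"
#include "storage/itemptr.h"

/*
 * Hedged sketch of recognizing a speculative tuple on the page.  An ordinary
 * (or confirmed) tuple has t_ctid pointing at itself, i.e. a real offset
 * number; a speculative one carries SpecTokenOffsetNumber there instead.
 * SpecTokenOffsetNumber and the real HeapTupleHeaderIsSpeculative() macro
 * come from headers not shown in this section.
 */
static inline bool
tuple_is_speculative(HeapTupleHeader htup)
{
    return ItemPointerGetOffsetNumber(&htup->t_ctid) == SpecTokenOffsetNumber;
}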
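On the waiting side, the _bt_doinsert()/_bt_check_unique() changes above reduce to the pattern below for a caller that finds a conflicting in-progress tuple through a dirty snapshot. xwait and speculativeToken are the values _bt_check_unique() now hands back; the wrapper function and its header list are only a sketch.

#include "postgres.h"

#include "access/itup.h"
#include "storage/lmgr.h"
#include "utils/rel.h"

/*
 * Sketch: wait for a conflicting inserter.  If the conflicting tuple is a
 * still-unconfirmed speculative insertion, wait only for that insertion to
 * be confirmed or super-deleted; otherwise wait for the whole transaction,
 * as before this commit.
 */
static void
wait_for_conflicting_inserter(Relation rel, IndexTuple itup,
                              TransactionId xwait, uint32 speculativeToken)
{
    if (speculativeToken != 0)
        SpeculativeInsertionWait(xwait, speculativeToken);
    else
        XactLockTableWait(xwait, rel, &itup->t_tid, XLTW_InsertIndex);

    /* the caller then retries its insertion from the top */
}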