author     Andres Freund <andres@anarazel.de>   2019-03-23 19:55:57 -0700
committer  Andres Freund <andres@anarazel.de>   2019-03-23 19:55:57 -0700
commit     5db6df0c0117ff2a4e0cd87594d2db408cd5022f (patch)
tree       7b06b96b6f8c1b7e4cdfb602af357f81e21f23b1 /src/backend/access/heap/heapam_handler.c
parent     f778e537a0d02d5e05016da3e6f4068914101dee (diff)
tableam: Add tuple_{insert, delete, update, lock} and use.
This adds new, required, table AM callbacks for insert/delete/update
and lock_tuple. To be able to reasonably use those, the EvalPlanQual
mechanism had to be adapted, moving more logic into the AM.
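
For orientation, the new required callbacks hang off TableAmRoutine roughly
as sketched below. This is distilled from the heapam_* implementation
signatures in the diff further down; the authoritative declarations live in
src/include/access/tableam.h.

    /* Sketch of the new, required TableAmRoutine members */
    void        (*tuple_insert) (Relation relation, TupleTableSlot *slot,
                                 CommandId cid, int options,
                                 BulkInsertState bistate);
    void        (*tuple_insert_speculative) (Relation relation,
                                             TupleTableSlot *slot,
                                             CommandId cid, int options,
                                             BulkInsertState bistate,
                                             uint32 specToken);
    void        (*tuple_complete_speculative) (Relation relation,
                                               TupleTableSlot *slot,
                                               uint32 specToken,
                                               bool succeeded);
    TM_Result   (*tuple_delete) (Relation relation, ItemPointer tid,
                                 CommandId cid, Snapshot snapshot,
                                 Snapshot crosscheck, bool wait,
                                 TM_FailureData *tmfd, bool changingPart);
    TM_Result   (*tuple_update) (Relation relation, ItemPointer otid,
                                 TupleTableSlot *slot, CommandId cid,
                                 Snapshot snapshot, Snapshot crosscheck,
                                 bool wait, TM_FailureData *tmfd,
                                 LockTupleMode *lockmode,
                                 bool *update_indexes);
    TM_Result   (*tuple_lock) (Relation relation, ItemPointer tid,
                               Snapshot snapshot, TupleTableSlot *slot,
                               CommandId cid, LockTupleMode mode,
                               LockWaitPolicy wait_policy, uint8 flags,
                               TM_FailureData *tmfd);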
Previously both delete/update/lock call-sites and the EPQ mechanism had
to have awareness of the specific tuple format to be able to fetch the
latest version of a tuple. Obviously that needs to be abstracted
away. To do so, move the logic that finds the latest row version into
the AM. lock_tuple has a new flag argument,
TUPLE_LOCK_FLAG_FIND_LAST_VERSION, that forces it to lock the last
version, rather than the current one. It'd have been possible to do
so via a separate callback as well, but finding the last version
usually also necessitates locking the newest version, making it
sensible to combine the two. This replaces the previous use of
EvalPlanQualFetch(). Additionally HeapTupleUpdated, which previously
signaled either a concurrent update or delete, is now split into two,
to avoid callers needing AM-specific knowledge to differentiate.
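
To make the new contract concrete, a delete/update call-site that needs the
latest row version might now look roughly like the sketch below. The wrapper
name table_lock_tuple and the executor fields (estate->...) are assumptions
for illustration; the flag, the traversed field, and the TM_Updated /
TM_Deleted split are from this commit.

    TM_FailureData tmfd;
    TM_Result   result;

    result = table_lock_tuple(relation, &tid, estate->es_snapshot, slot,
                              estate->es_output_cid, LockTupleExclusive,
                              LockWaitBlock,
                              TUPLE_LOCK_FLAG_FIND_LAST_VERSION, &tmfd);
    switch (result)
    {
        case TM_Ok:
            /* locked; tmfd.traversed says a newer version was chased */
            break;
        case TM_Deleted:
            /* row is gone entirely -- no newer version to recheck */
            break;
        case TM_Updated:
            /* concurrent update; tmfd.ctid points at the successor */
            break;
        default:
            break;
    }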
The move of finding the latest row version into tuple_lock means that
encountering a row concurrently moved into another partition will now
raise an error about "tuple to be locked" rather than "tuple to be
updated/deleted" - which is accurate, as that always happens when
locking rows. While possibly slightly less helpful for users, it seems
like an acceptable trade-off.
As part of this commit HTSU_Result has been renamed to TM_Result, and
its members have been expanded to differentiate between updating and
deleting. HeapUpdateFailureData has been renamed to TM_FailureData.
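
The renamed enum now reads roughly as follows (abridged sketch; TM_Ok,
TM_Invisible, TM_Updated, TM_Deleted and TM_WouldBlock appear in the diff
below, the remaining members are assumed from tableam.h):

    typedef enum TM_Result
    {
        TM_Ok,              /* operation completed */
        TM_Invisible,       /* tuple not visible to our command */
        TM_SelfModified,    /* modified by the current command's xact */
        TM_Updated,         /* concurrently updated; successor in tmfd->ctid */
        TM_Deleted,         /* concurrently deleted; formerly folded into
                             * HeapTupleUpdated together with TM_Updated */
        TM_BeingModified,   /* update by another transaction in progress */
        TM_WouldBlock       /* lock not acquired under LockWaitSkip */
    } TM_Result;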
The interface to speculative insertion is changed so nodeModifyTable.c
does not have to set the speculative token itself anymore. Instead
there's a version of tuple_insert, tuple_insert_speculative, that
performs the speculative insertion (without requiring a flag to signal
that fact), and the speculative insertion is either made permanent
with table_complete_speculative(succeeded = true) or aborted with
table_complete_speculative(succeeded = false).
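
Seen from the caller's side, the ON CONFLICT dance becomes roughly the
following. table_complete_speculative is named above; table_insert_speculative
as the wrapper name and the index-probe helper are assumptions for
illustration only.

    uint32      specToken;
    bool        succeeded;

    /* take the speculative-insertion lock so waiters can latch onto us */
    specToken = SpeculativeInsertionLockAcquire(GetCurrentTransactionId());

    /* the AM marks the tuple speculative itself; no flag from the caller */
    table_insert_speculative(resultRel, slot, estate->es_output_cid,
                             0 /* options */, NULL /* bistate */, specToken);

    /* insert index entries and probe for a concurrent conflict (elided) */
    succeeded = check_for_conflicts(resultRel, slot, estate);  /* hypothetical */

    /* confirm the insertion, or kill the speculatively inserted tuple */
    table_complete_speculative(resultRel, slot, specToken, succeeded);

    SpeculativeInsertionLockRelease(GetCurrentTransactionId());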
Note that multi_insert is not yet routed through tableam, nor is
COPY. Changing multi_insert requires changes to copy.c that are large
enough that they are better done separately.
Similarly, CREATE TABLE AS and CREATE MATERIALIZED VIEW, though simpler
cases, will also be adjusted in a later commit.
Author: Andres Freund and Haribabu Kommi
Discussion:
https://postgr.es/m/20180703070645.wchpu5muyto5n647@alap3.anarazel.de
https://postgr.es/m/20190313003903.nwvrxi7rw3ywhdel@alap3.anarazel.de
https://postgr.es/m/20160812231527.GA690404@alvherre.pgsql
Diffstat (limited to 'src/backend/access/heap/heapam_handler.c')
-rw-r--r--  src/backend/access/heap/heapam_handler.c  324
1 file changed, 324 insertions(+), 0 deletions(-)
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 6a26fcef94c..fcd4acb5aa3 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -21,7 +21,9 @@
 #include "access/heapam.h"
 #include "access/tableam.h"
+#include "access/xact.h"
 #include "storage/bufmgr.h"
+#include "storage/lmgr.h"
 #include "utils/builtins.h"
@@ -169,6 +171,321 @@ heapam_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot,
 }
 
+/* ----------------------------------------------------------------------------
+ * Functions for manipulations of physical tuples for heap AM.
+ * ----------------------------------------------------------------------------
+ */
+
+static void
+heapam_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
+					int options, BulkInsertState bistate)
+{
+	bool		shouldFree = true;
+	HeapTuple	tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
+
+	/* Update the tuple with table oid */
+	slot->tts_tableOid = RelationGetRelid(relation);
+	tuple->t_tableOid = slot->tts_tableOid;
+
+	/* Perform the insertion, and copy the resulting ItemPointer */
+	heap_insert(relation, tuple, cid, options, bistate);
+	ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
+
+	if (shouldFree)
+		pfree(tuple);
+}
+
+static void
+heapam_tuple_insert_speculative(Relation relation, TupleTableSlot *slot, CommandId cid,
+								int options, BulkInsertState bistate, uint32 specToken)
+{
+	bool		shouldFree = true;
+	HeapTuple	tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
+
+	/* Update the tuple with table oid */
+	slot->tts_tableOid = RelationGetRelid(relation);
+	tuple->t_tableOid = slot->tts_tableOid;
+
+	HeapTupleHeaderSetSpeculativeToken(tuple->t_data, specToken);
+	options |= HEAP_INSERT_SPECULATIVE;
+
+	/* Perform the insertion, and copy the resulting ItemPointer */
+	heap_insert(relation, tuple, cid, options, bistate);
+	ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
+
+	if (shouldFree)
+		pfree(tuple);
+}
+
+static void
+heapam_tuple_complete_speculative(Relation relation, TupleTableSlot *slot, uint32 specToken,
+								  bool succeeded)
+{
+	bool		shouldFree = true;
+	HeapTuple	tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
+
+	/* adjust the tuple's state accordingly */
+	if (succeeded)
+		heap_finish_speculative(relation, &slot->tts_tid);
+	else
+		heap_abort_speculative(relation, &slot->tts_tid);
+
+	if (shouldFree)
+		pfree(tuple);
+}
+
+static TM_Result
+heapam_tuple_delete(Relation relation, ItemPointer tid, CommandId cid,
+					Snapshot snapshot, Snapshot crosscheck, bool wait,
+					TM_FailureData *tmfd, bool changingPart)
+{
+	/*
+	 * Currently, deletion of index tuples is handled at VACUUM time.  If
+	 * the storage cleaned up dead tuples by itself, this would be the
+	 * place to delete the corresponding index tuples as well.
+	 */
+	return heap_delete(relation, tid, cid, crosscheck, wait, tmfd, changingPart);
+}
+
+
+static TM_Result
+heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
+					CommandId cid, Snapshot snapshot, Snapshot crosscheck,
+					bool wait, TM_FailureData *tmfd,
+					LockTupleMode *lockmode, bool *update_indexes)
+{
+	bool		shouldFree = true;
+	HeapTuple	tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
+	TM_Result	result;
+
+	/* Update the tuple with table oid */
+	slot->tts_tableOid = RelationGetRelid(relation);
+	tuple->t_tableOid = slot->tts_tableOid;
+
+	result = heap_update(relation, otid, tuple, cid, crosscheck, wait,
+						 tmfd, lockmode);
+	ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
+
+	/*
+	 * Decide whether new index entries are needed for the tuple
+	 *
+	 * Note: heap_update returns the tid (location) of the new tuple in the
+	 * t_self field.
+	 *
+	 * If it's a HOT update, we mustn't insert new index entries.
+	 */
+	*update_indexes = result == TM_Ok && !HeapTupleIsHeapOnly(tuple);
+
+	if (shouldFree)
+		pfree(tuple);
+
+	return result;
+}
+
+static TM_Result
+heapam_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot,
+				  TupleTableSlot *slot, CommandId cid, LockTupleMode mode,
+				  LockWaitPolicy wait_policy, uint8 flags,
+				  TM_FailureData *tmfd)
+{
+	BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
+	TM_Result	result;
+	Buffer		buffer;
+	HeapTuple	tuple = &bslot->base.tupdata;
+	bool		follow_updates;
+
+	follow_updates = (flags & TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS) != 0;
+	tmfd->traversed = false;
+
+	Assert(TTS_IS_BUFFERTUPLE(slot));
+
+tuple_lock_retry:
+	tuple->t_self = *tid;
+	result = heap_lock_tuple(relation, tuple, cid, mode, wait_policy,
+							 follow_updates, &buffer, tmfd);
+
+	if (result == TM_Updated &&
+		(flags & TUPLE_LOCK_FLAG_FIND_LAST_VERSION))
+	{
+		ReleaseBuffer(buffer);
+		/* Should not encounter speculative tuple on recheck */
+		Assert(!HeapTupleHeaderIsSpeculative(tuple->t_data));
+
+		if (!ItemPointerEquals(&tmfd->ctid, &tuple->t_self))
+		{
+			SnapshotData SnapshotDirty;
+			TransactionId priorXmax;
+
+			/* it was updated, so look at the updated version */
+			*tid = tmfd->ctid;
+			/* updated row should have xmin matching this xmax */
+			priorXmax = tmfd->xmax;
+
+			/* signal that a tuple later in the chain is getting locked */
+			tmfd->traversed = true;
+
+			/*
+			 * fetch target tuple
+			 *
+			 * Loop here to deal with updated or busy tuples
+			 */
+			InitDirtySnapshot(SnapshotDirty);
+			for (;;)
+			{
+				if (ItemPointerIndicatesMovedPartitions(tid))
+					ereport(ERROR,
+							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+							 errmsg("tuple to be locked was already moved to another partition due to concurrent update")));
+
+				tuple->t_self = *tid;
+				if (heap_fetch(relation, &SnapshotDirty, tuple, &buffer, NULL))
+				{
+					/*
+					 * If xmin isn't what we're expecting, the slot must have
+					 * been recycled and reused for an unrelated tuple. This
+					 * implies that the latest version of the row was deleted,
+					 * so we need do nothing. (Should be safe to examine xmin
+					 * without getting buffer's content lock. We assume
+					 * reading a TransactionId to be atomic, and Xmin never
+					 * changes in an existing tuple, except to invalid or
+					 * frozen, and neither of those can match priorXmax.)
+					 */
+					if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple->t_data),
+											 priorXmax))
+					{
+						ReleaseBuffer(buffer);
+						return TM_Deleted;
+					}
+
+					/* otherwise xmin should not be dirty...
+					 */
+					if (TransactionIdIsValid(SnapshotDirty.xmin))
+						elog(ERROR, "t_xmin is uncommitted in tuple to be updated");
+
+					/*
+					 * If tuple is being updated by other transaction then we
+					 * have to wait for its commit/abort, or die trying.
+					 */
+					if (TransactionIdIsValid(SnapshotDirty.xmax))
+					{
+						ReleaseBuffer(buffer);
+						switch (wait_policy)
+						{
+							case LockWaitBlock:
+								XactLockTableWait(SnapshotDirty.xmax,
+												  relation, &tuple->t_self,
+												  XLTW_FetchUpdated);
+								break;
+							case LockWaitSkip:
+								if (!ConditionalXactLockTableWait(SnapshotDirty.xmax))
+									/* skip instead of waiting */
+									return TM_WouldBlock;
+								break;
+							case LockWaitError:
+								if (!ConditionalXactLockTableWait(SnapshotDirty.xmax))
+									ereport(ERROR,
+											(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+											 errmsg("could not obtain lock on row in relation \"%s\"",
+													RelationGetRelationName(relation))));
+								break;
+						}
+						continue;	/* loop back to repeat heap_fetch */
+					}
+
+					/*
+					 * If tuple was inserted by our own transaction, we have
+					 * to check cmin against cid: cmin >= current CID means
+					 * our command cannot see the tuple, so we should ignore
+					 * it. Otherwise heap_lock_tuple() will throw an error,
+					 * and so would any later attempt to update or delete the
+					 * tuple. (We need not check cmax because
+					 * HeapTupleSatisfiesDirty will consider a tuple deleted
+					 * by our transaction dead, regardless of cmax.) We just
+					 * checked that priorXmax == xmin, so we can test that
+					 * variable instead of doing HeapTupleHeaderGetXmin again.
+					 */
+					if (TransactionIdIsCurrentTransactionId(priorXmax) &&
+						HeapTupleHeaderGetCmin(tuple->t_data) >= cid)
+					{
+						ReleaseBuffer(buffer);
+						return TM_Invisible;
+					}
+
+					/*
+					 * This is a live tuple, so try to lock it again.
+					 */
+					ReleaseBuffer(buffer);
+					goto tuple_lock_retry;
+				}
+
+				/*
+				 * If the referenced slot was actually empty, the latest
+				 * version of the row must have been deleted, so we need do
+				 * nothing.
+				 */
+				if (tuple->t_data == NULL)
+				{
+					return TM_Deleted;
+				}
+
+				/*
+				 * As above, if xmin isn't what we're expecting, do nothing.
+				 */
+				if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple->t_data),
+										 priorXmax))
+				{
+					if (BufferIsValid(buffer))
+						ReleaseBuffer(buffer);
+					return TM_Deleted;
+				}
+
+				/*
+				 * If we get here, the tuple was found but failed
+				 * SnapshotDirty. Assuming the xmin is either a committed xact
+				 * or our own xact (as it certainly should be if we're trying
+				 * to modify the tuple), this must mean that the row was
+				 * updated or deleted by either a committed xact or our own
+				 * xact. If it was deleted, we can ignore it; if it was
+				 * updated then chain up to the next version and repeat the
+				 * whole process.
+				 *
+				 * As above, it should be safe to examine xmax and t_ctid
+				 * without the buffer content lock, because they can't be
+				 * changing.
+				 */
+				if (ItemPointerEquals(&tuple->t_self, &tuple->t_data->t_ctid))
+				{
+					/* deleted, so forget about it */
+					if (BufferIsValid(buffer))
+						ReleaseBuffer(buffer);
+					return TM_Deleted;
+				}
+
+				/* updated, so look at the updated row */
+				*tid = tuple->t_data->t_ctid;
+				/* updated row should have xmin matching this xmax */
+				priorXmax = HeapTupleHeaderGetUpdateXid(tuple->t_data);
+				if (BufferIsValid(buffer))
+					ReleaseBuffer(buffer);
+				/* loop back to fetch next in chain */
+			}
+		}
+		else
+		{
+			/* tuple was deleted, so give up */
+			return TM_Deleted;
+		}
+	}
+
+	slot->tts_tableOid = RelationGetRelid(relation);
+	tuple->t_tableOid = slot->tts_tableOid;
+
+	/* store in slot, transferring existing pin */
+	ExecStorePinnedBufferHeapTuple(tuple, slot, buffer);
+
+	return result;
+}
+
+
 /* ------------------------------------------------------------------------
  * Definition of the heap table access method.
  * ------------------------------------------------------------------------
@@ -193,6 +510,13 @@ static const TableAmRoutine heapam_methods = {
 	.index_fetch_end = heapam_index_fetch_end,
 	.index_fetch_tuple = heapam_index_fetch_tuple,
 
+	.tuple_insert = heapam_tuple_insert,
+	.tuple_insert_speculative = heapam_tuple_insert_speculative,
+	.tuple_complete_speculative = heapam_tuple_complete_speculative,
+	.tuple_delete = heapam_tuple_delete,
+	.tuple_update = heapam_tuple_update,
+	.tuple_lock = heapam_tuple_lock,
+
 	.tuple_satisfies_snapshot = heapam_tuple_satisfies_snapshot,
 };