diff options
author | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2011-02-07 23:46:51 +0200 |
---|---|---|
committer | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2011-02-08 00:09:08 +0200 |
commit | dafaa3efb75ce1aae2e6dbefaf6f3a889dea0d21 (patch) | |
tree | 93271101a38832fce7a6864e96fc9de65b0acff4 /src/backend/access | |
parent | c18f51da17d8cf01d62218e0404e18ba246bde54 (diff) | |
download | postgresql-dafaa3efb75ce1aae2e6dbefaf6f3a889dea0d21.tar.gz postgresql-dafaa3efb75ce1aae2e6dbefaf6f3a889dea0d21.zip |
Implement genuine serializable isolation level.
Until now, our Serializable mode has in fact been what's called Snapshot
Isolation, which allows some anomalies that could not occur in any
serialized ordering of the transactions. This patch fixes that using a
method called Serializable Snapshot Isolation, based on research papers by
Michael J. Cahill (see README-SSI for full references). In Serializable
Snapshot Isolation, transactions run like they do in Snapshot Isolation,
but a predicate lock manager observes the reads and writes performed and
aborts transactions if it detects that an anomaly might occur. This method
produces some false positives, ie. it sometimes aborts transactions even
though there is no anomaly.
To track reads we implement predicate locking, see storage/lmgr/predicate.c.
Whenever a tuple is read, a predicate lock is acquired on the tuple. Shared
memory is finite, so when a transaction takes many tuple-level locks on a
page, the locks are promoted to a single page-level lock, and further to a
single relation level lock if necessary. To lock key values with no matching
tuple, a sequential scan always takes a relation-level lock, and an index
scan acquires a page-level lock that covers the search key, whether or not
there are any matching keys at the moment.
A predicate lock doesn't conflict with any regular locks or with another
predicate locks in the normal sense. They're only used by the predicate lock
manager to detect the danger of anomalies. Only serializable transactions
participate in predicate locking, so there should be no extra overhead for
for other transactions.
Predicate locks can't be released at commit, but must be remembered until
all the transactions that overlapped with it have completed. That means that
we need to remember an unbounded amount of predicate locks, so we apply a
lossy but conservative method of tracking locks for committed transactions.
If we run short of shared memory, we overflow to a new "pg_serial" SLRU
pool.
We don't currently allow Serializable transactions in Hot Standby mode.
That would be hard, because even read-only transactions can cause anomalies
that wouldn't otherwise occur.
Serializable isolation mode now means the new fully serializable level.
Repeatable Read gives you the old Snapshot Isolation level that we have
always had.
Kevin Grittner and Dan Ports, reviewed by Jeff Davis, Heikki Linnakangas and
Anssi Kääriäinen
Diffstat (limited to 'src/backend/access')
-rw-r--r-- | src/backend/access/heap/heapam.c | 88 | ||||
-rw-r--r-- | src/backend/access/index/indexam.c | 25 | ||||
-rw-r--r-- | src/backend/access/nbtree/nbtinsert.c | 12 | ||||
-rw-r--r-- | src/backend/access/nbtree/nbtpage.c | 7 | ||||
-rw-r--r-- | src/backend/access/nbtree/nbtree.c | 2 | ||||
-rw-r--r-- | src/backend/access/nbtree/nbtsearch.c | 13 | ||||
-rw-r--r-- | src/backend/access/transam/twophase.c | 3 | ||||
-rw-r--r-- | src/backend/access/transam/twophase_rmgr.c | 5 | ||||
-rw-r--r-- | src/backend/access/transam/varsup.c | 5 | ||||
-rw-r--r-- | src/backend/access/transam/xact.c | 21 |
10 files changed, 165 insertions, 16 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 25d9fdea3a0..7dcc6015de9 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -57,6 +57,7 @@ #include "storage/bufmgr.h" #include "storage/freespace.h" #include "storage/lmgr.h" +#include "storage/predicate.h" #include "storage/procarray.h" #include "storage/smgr.h" #include "storage/standby.h" @@ -261,20 +262,20 @@ heapgetpage(HeapScanDesc scan, BlockNumber page) { if (ItemIdIsNormal(lpp)) { + HeapTupleData loctup; bool valid; + loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); + loctup.t_len = ItemIdGetLength(lpp); + ItemPointerSet(&(loctup.t_self), page, lineoff); + if (all_visible) valid = true; else - { - HeapTupleData loctup; + valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer); - loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); - loctup.t_len = ItemIdGetLength(lpp); - ItemPointerSet(&(loctup.t_self), page, lineoff); + CheckForSerializableConflictOut(valid, scan->rs_rd, &loctup, buffer); - valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer); - } if (valid) scan->rs_vistuples[ntup++] = lineoff; } @@ -468,12 +469,16 @@ heapgettup(HeapScanDesc scan, snapshot, scan->rs_cbuf); + CheckForSerializableConflictOut(valid, scan->rs_rd, tuple, scan->rs_cbuf); + if (valid && key != NULL) HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd), nkeys, key, valid); if (valid) { + if (!scan->rs_relpredicatelocked) + PredicateLockTuple(scan->rs_rd, tuple); LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK); return; } @@ -741,12 +746,16 @@ heapgettup_pagemode(HeapScanDesc scan, nkeys, key, valid); if (valid) { + if (!scan->rs_relpredicatelocked) + PredicateLockTuple(scan->rs_rd, tuple); scan->rs_cindex = lineindex; return; } } else { + if (!scan->rs_relpredicatelocked) + PredicateLockTuple(scan->rs_rd, tuple); scan->rs_cindex = lineindex; return; } @@ -1213,6 +1222,7 @@ heap_beginscan_internal(Relation relation, Snapshot snapshot, scan->rs_strategy = NULL; /* set in initscan */ scan->rs_allow_strat = allow_strat; scan->rs_allow_sync = allow_sync; + scan->rs_relpredicatelocked = false; /* * we can use page-at-a-time mode if it's an MVCC-safe snapshot @@ -1459,8 +1469,13 @@ heap_fetch(Relation relation, */ valid = HeapTupleSatisfiesVisibility(tuple, snapshot, buffer); + if (valid) + PredicateLockTuple(relation, tuple); + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + CheckForSerializableConflictOut(valid, relation, tuple, buffer); + if (valid) { /* @@ -1506,13 +1521,15 @@ heap_fetch(Relation relation, * heap_fetch, we do not report any pgstats count; caller may do so if wanted. */ bool -heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot, - bool *all_dead) +heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer, + Snapshot snapshot, bool *all_dead) { Page dp = (Page) BufferGetPage(buffer); TransactionId prev_xmax = InvalidTransactionId; OffsetNumber offnum; bool at_chain_start; + bool valid; + bool match_found; if (all_dead) *all_dead = true; @@ -1522,6 +1539,7 @@ heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot, Assert(ItemPointerGetBlockNumber(tid) == BufferGetBlockNumber(buffer)); offnum = ItemPointerGetOffsetNumber(tid); at_chain_start = true; + match_found = false; /* Scan through possible multiple members of HOT-chain */ for (;;) @@ -1552,6 +1570,8 @@ heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot, heapTuple.t_data = (HeapTupleHeader) PageGetItem(dp, lp); heapTuple.t_len = ItemIdGetLength(lp); + heapTuple.t_tableOid = relation->rd_id; + heapTuple.t_self = *tid; /* * Shouldn't see a HEAP_ONLY tuple at chain start. @@ -1569,12 +1589,18 @@ heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot, break; /* If it's visible per the snapshot, we must return it */ - if (HeapTupleSatisfiesVisibility(&heapTuple, snapshot, buffer)) + valid = HeapTupleSatisfiesVisibility(&heapTuple, snapshot, buffer); + CheckForSerializableConflictOut(valid, relation, &heapTuple, buffer); + if (valid) { ItemPointerSetOffsetNumber(tid, offnum); + PredicateLockTuple(relation, &heapTuple); if (all_dead) *all_dead = false; - return true; + if (IsolationIsSerializable()) + match_found = true; + else + return true; } /* @@ -1603,7 +1629,7 @@ heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot, break; /* end of chain */ } - return false; + return match_found; } /* @@ -1622,7 +1648,7 @@ heap_hot_search(ItemPointer tid, Relation relation, Snapshot snapshot, buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid)); LockBuffer(buffer, BUFFER_LOCK_SHARE); - result = heap_hot_search_buffer(tid, buffer, snapshot, all_dead); + result = heap_hot_search_buffer(tid, relation, buffer, snapshot, all_dead); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); ReleaseBuffer(buffer); return result; @@ -1729,6 +1755,7 @@ heap_get_latest_tid(Relation relation, * result candidate. */ valid = HeapTupleSatisfiesVisibility(&tp, snapshot, buffer); + CheckForSerializableConflictOut(valid, relation, &tp, buffer); if (valid) *tid = ctid; @@ -1893,6 +1920,13 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, buffer = RelationGetBufferForTuple(relation, heaptup->t_len, InvalidBuffer, options, bistate); + /* + * We're about to do the actual insert -- check for conflict at the + * relation or buffer level first, to avoid possibly having to roll + * back work we've just done. + */ + CheckForSerializableConflictIn(relation, NULL, buffer); + /* NO EREPORT(ERROR) from here till changes are logged */ START_CRIT_SECTION(); @@ -2193,6 +2227,12 @@ l1: return result; } + /* + * We're about to do the actual delete -- check for conflict first, + * to avoid possibly having to roll back work we've just done. + */ + CheckForSerializableConflictIn(relation, &tp, buffer); + /* replace cid with a combo cid if necessary */ HeapTupleHeaderAdjustCmax(tp.t_data, &cid, &iscombo); @@ -2546,6 +2586,12 @@ l2: return result; } + /* + * We're about to do the actual update -- check for conflict first, + * to avoid possibly having to roll back work we've just done. + */ + CheckForSerializableConflictIn(relation, &oldtup, buffer); + /* Fill in OID and transaction status data for newtup */ if (relation->rd_rel->relhasoids) { @@ -2691,6 +2737,16 @@ l2: } /* + * We're about to create the new tuple -- check for conflict first, + * to avoid possibly having to roll back work we've just done. + * + * NOTE: For a tuple insert, we only need to check for table locks, since + * predicate locking at the index level will cover ranges for anything + * except a table scan. Therefore, only provide the relation. + */ + CheckForSerializableConflictIn(relation, NULL, InvalidBuffer); + + /* * At this point newbuf and buffer are both pinned and locked, and newbuf * has enough space for the new tuple. If they are the same buffer, only * one pin is held. @@ -2799,6 +2855,12 @@ l2: END_CRIT_SECTION(); + /* + * Any existing SIREAD locks on the old tuple must be linked to the new + * tuple for conflict detection purposes. + */ + PredicateLockTupleRowVersionLink(relation, &oldtup, newtup); + if (newbuf != buffer) LockBuffer(newbuf, BUFFER_LOCK_UNLOCK); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c index 32af32a2067..6e0db795176 100644 --- a/src/backend/access/index/indexam.c +++ b/src/backend/access/index/indexam.c @@ -64,9 +64,11 @@ #include "access/relscan.h" #include "access/transam.h" +#include "access/xact.h" #include "pgstat.h" #include "storage/bufmgr.h" #include "storage/lmgr.h" +#include "storage/predicate.h" #include "utils/relcache.h" #include "utils/snapmgr.h" #include "utils/tqual.h" @@ -192,6 +194,11 @@ index_insert(Relation indexRelation, RELATION_CHECKS; GET_REL_PROCEDURE(aminsert); + if (!(indexRelation->rd_am->ampredlocks)) + CheckForSerializableConflictIn(indexRelation, + (HeapTuple) NULL, + InvalidBuffer); + /* * have the am's insert proc do all the work. */ @@ -266,6 +273,9 @@ index_beginscan_internal(Relation indexRelation, RELATION_CHECKS; GET_REL_PROCEDURE(ambeginscan); + if (!(indexRelation->rd_am->ampredlocks)) + PredicateLockRelation(indexRelation); + /* * We hold a reference count to the relcache entry throughout the scan. */ @@ -523,6 +533,7 @@ index_getnext(IndexScanDesc scan, ScanDirection direction) { ItemId lp; ItemPointer ctid; + bool valid; /* check for bogus TID */ if (offnum < FirstOffsetNumber || @@ -577,8 +588,13 @@ index_getnext(IndexScanDesc scan, ScanDirection direction) break; /* If it's visible per the snapshot, we must return it */ - if (HeapTupleSatisfiesVisibility(heapTuple, scan->xs_snapshot, - scan->xs_cbuf)) + valid = HeapTupleSatisfiesVisibility(heapTuple, scan->xs_snapshot, + scan->xs_cbuf); + + CheckForSerializableConflictOut(valid, scan->heapRelation, + heapTuple, scan->xs_cbuf); + + if (valid) { /* * If the snapshot is MVCC, we know that it could accept at @@ -586,7 +602,8 @@ index_getnext(IndexScanDesc scan, ScanDirection direction) * any more members. Otherwise, check for continuation of the * HOT-chain, and set state for next time. */ - if (IsMVCCSnapshot(scan->xs_snapshot)) + if (IsMVCCSnapshot(scan->xs_snapshot) + && !IsolationIsSerializable()) scan->xs_next_hot = InvalidOffsetNumber; else if (HeapTupleIsHotUpdated(heapTuple)) { @@ -598,6 +615,8 @@ index_getnext(IndexScanDesc scan, ScanDirection direction) else scan->xs_next_hot = InvalidOffsetNumber; + PredicateLockTuple(scan->heapRelation, heapTuple); + LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK); pgstat_count_heap_fetch(scan->indexRelation); diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c index 91b72b8f91d..0dd745f19a4 100644 --- a/src/backend/access/nbtree/nbtinsert.c +++ b/src/backend/access/nbtree/nbtinsert.c @@ -21,6 +21,7 @@ #include "miscadmin.h" #include "storage/bufmgr.h" #include "storage/lmgr.h" +#include "storage/predicate.h" #include "utils/inval.h" #include "utils/tqual.h" @@ -174,6 +175,14 @@ top: if (checkUnique != UNIQUE_CHECK_EXISTING) { + /* + * The only conflict predicate locking cares about for indexes is when + * an index tuple insert conflicts with an existing lock. Since the + * actual location of the insert is hard to predict because of the + * random search used to prevent O(N^2) performance when there are many + * duplicate entries, we can just use the "first valid" page. + */ + CheckForSerializableConflictIn(rel, NULL, buf); /* do the insertion */ _bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup, heapRel); _bt_insertonpg(rel, buf, stack, itup, offset, false); @@ -696,6 +705,9 @@ _bt_insertonpg(Relation rel, /* split the buffer into left and right halves */ rbuf = _bt_split(rel, buf, firstright, newitemoff, itemsz, itup, newitemonleft); + PredicateLockPageSplit(rel, + BufferGetBlockNumber(buf), + BufferGetBlockNumber(rbuf)); /*---------- * By here, diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c index db86ec9a1ad..27964455f7c 100644 --- a/src/backend/access/nbtree/nbtpage.c +++ b/src/backend/access/nbtree/nbtpage.c @@ -29,6 +29,7 @@ #include "storage/freespace.h" #include "storage/indexfsm.h" #include "storage/lmgr.h" +#include "storage/predicate.h" #include "utils/inval.h" #include "utils/snapmgr.h" @@ -1184,6 +1185,12 @@ _bt_pagedel(Relation rel, Buffer buf, BTStack stack) RelationGetRelationName(rel)); /* + * Any insert which would have gone on the target block will now go to the + * right sibling block. + */ + PredicateLockPageCombine(rel, target, rightsib); + + /* * Next find and write-lock the current parent of the target page. This is * essentially the same as the corresponding step of splitting. */ diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c index d66bdd47e49..558ace1562b 100644 --- a/src/backend/access/nbtree/nbtree.c +++ b/src/backend/access/nbtree/nbtree.c @@ -29,6 +29,7 @@ #include "storage/indexfsm.h" #include "storage/ipc.h" #include "storage/lmgr.h" +#include "storage/predicate.h" #include "storage/smgr.h" #include "utils/memutils.h" @@ -822,6 +823,7 @@ restart: if (_bt_page_recyclable(page)) { /* Okay to recycle this page */ + Assert(!PageIsPredicateLocked(rel, blkno)); RecordFreeIndexPage(rel, blkno); vstate->totFreePages++; stats->pages_deleted++; diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c index 42d956c6eac..cf74f7776ce 100644 --- a/src/backend/access/nbtree/nbtsearch.c +++ b/src/backend/access/nbtree/nbtsearch.c @@ -21,6 +21,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "storage/bufmgr.h" +#include "storage/predicate.h" #include "utils/lsyscache.h" #include "utils/rel.h" @@ -63,7 +64,10 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey, /* If index is empty and access = BT_READ, no root page is created. */ if (!BufferIsValid(*bufP)) + { + PredicateLockRelation(rel); /* Nothing finer to lock exists. */ return (BTStack) NULL; + } /* Loop iterates once per level descended in the tree */ for (;;) @@ -88,7 +92,11 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey, page = BufferGetPage(*bufP); opaque = (BTPageOpaque) PageGetSpecialPointer(page); if (P_ISLEAF(opaque)) + { + if (access == BT_READ) + PredicateLockPage(rel, BufferGetBlockNumber(*bufP)); break; + } /* * Find the appropriate item on the internal page, and get the child @@ -1142,6 +1150,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir) opaque = (BTPageOpaque) PageGetSpecialPointer(page); if (!P_IGNORE(opaque)) { + PredicateLockPage(rel, blkno); /* see if there are any matches on this page */ /* note that this will clear moreRight if we can stop */ if (_bt_readpage(scan, dir, P_FIRSTDATAKEY(opaque))) @@ -1189,6 +1198,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir) opaque = (BTPageOpaque) PageGetSpecialPointer(page); if (!P_IGNORE(opaque)) { + PredicateLockPage(rel, BufferGetBlockNumber(so->currPos.buf)); /* see if there are any matches on this page */ /* note that this will clear moreLeft if we can stop */ if (_bt_readpage(scan, dir, PageGetMaxOffsetNumber(page))) @@ -1352,6 +1362,7 @@ _bt_get_endpoint(Relation rel, uint32 level, bool rightmost) if (!BufferIsValid(buf)) { /* empty index... */ + PredicateLockRelation(rel); /* Nothing finer to lock exists. */ return InvalidBuffer; } @@ -1431,10 +1442,12 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir) if (!BufferIsValid(buf)) { /* empty index... */ + PredicateLockRelation(rel); /* Nothing finer to lock exists. */ so->currPos.buf = InvalidBuffer; return false; } + PredicateLockPage(rel, BufferGetBlockNumber(buf)); page = BufferGetPage(buf); opaque = (BTPageOpaque) PageGetSpecialPointer(page); Assert(P_ISLEAF(opaque)); diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 4fee9c3244b..287ad266980 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -57,6 +57,7 @@ #include "pgstat.h" #include "replication/walsender.h" #include "storage/fd.h" +#include "storage/predicate.h" #include "storage/procarray.h" #include "storage/sinvaladt.h" #include "storage/smgr.h" @@ -1357,6 +1358,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit) else ProcessRecords(bufptr, xid, twophase_postabort_callbacks); + PredicateLockTwoPhaseFinish(xid, isCommit); + /* Count the prepared xact as committed or aborted */ AtEOXact_PgStat(isCommit); diff --git a/src/backend/access/transam/twophase_rmgr.c b/src/backend/access/transam/twophase_rmgr.c index 02de1e85269..47c15af241d 100644 --- a/src/backend/access/transam/twophase_rmgr.c +++ b/src/backend/access/transam/twophase_rmgr.c @@ -18,12 +18,14 @@ #include "access/twophase_rmgr.h" #include "pgstat.h" #include "storage/lock.h" +#include "storage/predicate.h" const TwoPhaseCallback twophase_recover_callbacks[TWOPHASE_RM_MAX_ID + 1] = { NULL, /* END ID */ lock_twophase_recover, /* Lock */ + predicatelock_twophase_recover, /* PredicateLock */ NULL, /* pgstat */ multixact_twophase_recover /* MultiXact */ }; @@ -32,6 +34,7 @@ const TwoPhaseCallback twophase_postcommit_callbacks[TWOPHASE_RM_MAX_ID + 1] = { NULL, /* END ID */ lock_twophase_postcommit, /* Lock */ + NULL, /* PredicateLock */ pgstat_twophase_postcommit, /* pgstat */ multixact_twophase_postcommit /* MultiXact */ }; @@ -40,6 +43,7 @@ const TwoPhaseCallback twophase_postabort_callbacks[TWOPHASE_RM_MAX_ID + 1] = { NULL, /* END ID */ lock_twophase_postabort, /* Lock */ + NULL, /* PredicateLock */ pgstat_twophase_postabort, /* pgstat */ multixact_twophase_postabort /* MultiXact */ }; @@ -48,6 +52,7 @@ const TwoPhaseCallback twophase_standby_recover_callbacks[TWOPHASE_RM_MAX_ID + 1 { NULL, /* END ID */ lock_twophase_standby_recover, /* Lock */ + NULL, /* PredicateLock */ NULL, /* pgstat */ NULL /* MultiXact */ }; diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c index a03ec85f8fc..a828b3de48f 100644 --- a/src/backend/access/transam/varsup.c +++ b/src/backend/access/transam/varsup.c @@ -21,6 +21,7 @@ #include "miscadmin.h" #include "postmaster/autovacuum.h" #include "storage/pmsignal.h" +#include "storage/predicate.h" #include "storage/proc.h" #include "utils/builtins.h" #include "utils/syscache.h" @@ -161,6 +162,10 @@ GetNewTransactionId(bool isSubXact) ExtendCLOG(xid); ExtendSUBTRANS(xid); + /* If it's top level, the predicate locking system also needs to know. */ + if (!isSubXact) + RegisterPredicateLockingXid(xid); + /* * Now advance the nextXid counter. This must not happen until after we * have successfully completed ExtendCLOG() --- if that routine fails, we diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 1e31e07ec97..a0170b42e20 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -40,6 +40,7 @@ #include "storage/bufmgr.h" #include "storage/fd.h" #include "storage/lmgr.h" +#include "storage/predicate.h" #include "storage/procarray.h" #include "storage/sinvaladt.h" #include "storage/smgr.h" @@ -63,6 +64,9 @@ int XactIsoLevel; bool DefaultXactReadOnly = false; bool XactReadOnly; +bool DefaultXactDeferrable = false; +bool XactDeferrable; + bool XactSyncCommit = true; int CommitDelay = 0; /* precommit delay in microseconds */ @@ -1640,6 +1644,7 @@ StartTransaction(void) s->startedInRecovery = false; XactReadOnly = DefaultXactReadOnly; } + XactDeferrable = DefaultXactDeferrable; XactIsoLevel = DefaultXactIsoLevel; forceSyncCommit = false; MyXactAccessedTempRel = false; @@ -1787,6 +1792,13 @@ CommitTransaction(void) AtEOXact_LargeObject(true); /* + * Mark serializable transaction as complete for predicate locking + * purposes. This should be done as late as we can put it and still + * allow errors to be raised for failure patterns found at commit. + */ + PreCommit_CheckForSerializationFailure(); + + /* * Insert notifications sent by NOTIFY commands into the queue. This * should be late in the pre-commit sequence to minimize time spent * holding the notify-insertion lock. @@ -1980,6 +1992,13 @@ PrepareTransaction(void) /* close large objects before lower-level cleanup */ AtEOXact_LargeObject(true); + /* + * Mark serializable transaction as complete for predicate locking + * purposes. This should be done as late as we can put it and still + * allow errors to be raised for failure patterns found at commit. + */ + PreCommit_CheckForSerializationFailure(); + /* NOTIFY will be handled below */ /* @@ -2044,6 +2063,7 @@ PrepareTransaction(void) AtPrepare_Notify(); AtPrepare_Locks(); + AtPrepare_PredicateLocks(); AtPrepare_PgStat(); AtPrepare_MultiXact(); AtPrepare_RelationMap(); @@ -2103,6 +2123,7 @@ PrepareTransaction(void) PostPrepare_MultiXact(xid); PostPrepare_Locks(xid); + PostPrepare_PredicateLocks(xid); ResourceOwnerRelease(TopTransactionResourceOwner, RESOURCE_RELEASE_LOCKS, |