path: root/src/backend/storage
author     Heikki Linnakangas <heikki.linnakangas@iki.fi>  2011-02-07 23:46:51 +0200
committer  Heikki Linnakangas <heikki.linnakangas@iki.fi>  2011-02-08 00:09:08 +0200
commit     dafaa3efb75ce1aae2e6dbefaf6f3a889dea0d21 (patch)
tree       93271101a38832fce7a6864e96fc9de65b0acff4 /src/backend/storage
parent     c18f51da17d8cf01d62218e0404e18ba246bde54 (diff)
download   postgresql-dafaa3efb75ce1aae2e6dbefaf6f3a889dea0d21.tar.gz
           postgresql-dafaa3efb75ce1aae2e6dbefaf6f3a889dea0d21.zip
Implement genuine serializable isolation level.
Until now, our Serializable mode has in fact been what's called Snapshot Isolation, which allows some anomalies that could not occur in any serialized ordering of the transactions. This patch fixes that using a method called Serializable Snapshot Isolation, based on research papers by Michael J. Cahill (see README-SSI for full references). In Serializable Snapshot Isolation, transactions run like they do in Snapshot Isolation, but a predicate lock manager observes the reads and writes performed and aborts transactions if it detects that an anomaly might occur. This method produces some false positives, i.e. it sometimes aborts transactions even though there is no anomaly.

To track reads we implement predicate locking, see storage/lmgr/predicate.c. Whenever a tuple is read, a predicate lock is acquired on the tuple. Shared memory is finite, so when a transaction takes many tuple-level locks on a page, the locks are promoted to a single page-level lock, and further to a single relation-level lock if necessary. To lock key values with no matching tuple, a sequential scan always takes a relation-level lock, and an index scan acquires a page-level lock that covers the search key, whether or not there are any matching keys at the moment.

A predicate lock doesn't conflict with any regular locks or with other predicate locks in the normal sense. They're only used by the predicate lock manager to detect the danger of anomalies. Only serializable transactions participate in predicate locking, so there should be no extra overhead for other transactions.

Predicate locks can't be released at commit, but must be remembered until all the transactions that overlapped with the holding transaction have completed. That means that we need to remember an unbounded number of predicate locks, so we apply a lossy but conservative method of tracking locks for committed transactions. If we run short of shared memory, we overflow to a new "pg_serial" SLRU pool.

We don't currently allow Serializable transactions in Hot Standby mode. That would be hard, because even read-only transactions can cause anomalies that wouldn't otherwise occur.

Serializable isolation mode now means the new fully serializable level. Repeatable Read gives you the old Snapshot Isolation level that we have always had.

Kevin Grittner and Dan Ports, reviewed by Jeff Davis, Heikki Linnakangas and Anssi Kääriäinen
Diffstat (limited to 'src/backend/storage')
-rw-r--r--  src/backend/storage/ipc/ipci.c          7
-rw-r--r--  src/backend/storage/ipc/shmem.c         2
-rw-r--r--  src/backend/storage/ipc/shmqueue.c      8
-rw-r--r--  src/backend/storage/lmgr/Makefile       2
-rw-r--r--  src/backend/storage/lmgr/README         4
-rw-r--r--  src/backend/storage/lmgr/README-SSI   537
-rw-r--r--  src/backend/storage/lmgr/lwlock.c       4
-rw-r--r--  src/backend/storage/lmgr/predicate.c 4439
8 files changed, 4995 insertions, 8 deletions
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 2dbac56c25d..56c0bd8d498 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -32,6 +32,7 @@
#include "storage/ipc.h"
#include "storage/pg_shmem.h"
#include "storage/pmsignal.h"
+#include "storage/predicate.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "storage/sinvaladt.h"
@@ -105,6 +106,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
sizeof(ShmemIndexEnt)));
size = add_size(size, BufferShmemSize());
size = add_size(size, LockShmemSize());
+ size = add_size(size, PredicateLockShmemSize());
size = add_size(size, ProcGlobalShmemSize());
size = add_size(size, XLOGShmemSize());
size = add_size(size, CLOGShmemSize());
@@ -200,6 +202,11 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
InitLocks();
/*
+ * Set up predicate lock manager
+ */
+ InitPredicateLocks();
+
+ /*
* Set up process table
*/
if (!IsUnderPostmaster)
diff --git a/src/backend/storage/ipc/shmem.c b/src/backend/storage/ipc/shmem.c
index 62809577991..0811da7b6fe 100644
--- a/src/backend/storage/ipc/shmem.c
+++ b/src/backend/storage/ipc/shmem.c
@@ -198,7 +198,7 @@ ShmemAlloc(Size size)
* Returns TRUE if the pointer points within the shared memory segment.
*/
bool
-ShmemAddrIsValid(void *addr)
+ShmemAddrIsValid(const void *addr)
{
return (addr >= ShmemBase) && (addr < ShmemEnd);
}
diff --git a/src/backend/storage/ipc/shmqueue.c b/src/backend/storage/ipc/shmqueue.c
index 0277c3f9f40..1cf69a09c83 100644
--- a/src/backend/storage/ipc/shmqueue.c
+++ b/src/backend/storage/ipc/shmqueue.c
@@ -43,14 +43,12 @@ SHMQueueInit(SHM_QUEUE *queue)
* SHMQueueIsDetached -- TRUE if element is not currently
* in a queue.
*/
-#ifdef NOT_USED
bool
-SHMQueueIsDetached(SHM_QUEUE *queue)
+SHMQueueIsDetached(const SHM_QUEUE *queue)
{
Assert(ShmemAddrIsValid(queue));
return (queue->prev == NULL);
}
-#endif
/*
* SHMQueueElemInit -- clear an element's links
@@ -146,7 +144,7 @@ SHMQueueInsertAfter(SHM_QUEUE *queue, SHM_QUEUE *elem)
*--------------------
*/
Pointer
-SHMQueueNext(SHM_QUEUE *queue, SHM_QUEUE *curElem, Size linkOffset)
+SHMQueueNext(const SHM_QUEUE *queue, const SHM_QUEUE *curElem, Size linkOffset)
{
SHM_QUEUE *elemPtr = curElem->next;
@@ -162,7 +160,7 @@ SHMQueueNext(SHM_QUEUE *queue, SHM_QUEUE *curElem, Size linkOffset)
* SHMQueueEmpty -- TRUE if queue head is only element, FALSE otherwise
*/
bool
-SHMQueueEmpty(SHM_QUEUE *queue)
+SHMQueueEmpty(const SHM_QUEUE *queue)
{
Assert(ShmemAddrIsValid(queue));
diff --git a/src/backend/storage/lmgr/Makefile b/src/backend/storage/lmgr/Makefile
index 9aa9a5c0868..e12a8549f74 100644
--- a/src/backend/storage/lmgr/Makefile
+++ b/src/backend/storage/lmgr/Makefile
@@ -12,7 +12,7 @@ subdir = src/backend/storage/lmgr
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
-OBJS = lmgr.o lock.o proc.o deadlock.o lwlock.o spin.o s_lock.o
+OBJS = lmgr.o lock.o proc.o deadlock.o lwlock.o spin.o s_lock.o predicate.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/lmgr/README b/src/backend/storage/lmgr/README
index 87cae18cb6f..40779d2e359 100644
--- a/src/backend/storage/lmgr/README
+++ b/src/backend/storage/lmgr/README
@@ -3,7 +3,7 @@ src/backend/storage/lmgr/README
Locking Overview
================
-Postgres uses three types of interprocess locks:
+Postgres uses four types of interprocess locks:
* Spinlocks. These are intended for *very* short-term locks. If a lock
is to be held more than a few dozen instructions, or across any sort of
@@ -34,6 +34,8 @@ supports a variety of lock modes with table-driven semantics, and it has
full deadlock detection and automatic release at transaction end.
Regular locks should be used for all user-driven lock requests.
+* SIReadLock predicate locks. See separate README-SSI file for details.
+
Acquisition of either a spinlock or a lightweight lock causes query
cancel and die() interrupts to be held off until all such locks are
released. No such restriction exists for regular locks, however. Also
diff --git a/src/backend/storage/lmgr/README-SSI b/src/backend/storage/lmgr/README-SSI
new file mode 100644
index 00000000000..a2bb63e3f8b
--- /dev/null
+++ b/src/backend/storage/lmgr/README-SSI
@@ -0,0 +1,537 @@
+src/backend/storage/lmgr/README-SSI
+
+Serializable Snapshot Isolation (SSI) and Predicate Locking
+===========================================================
+
+This is currently sitting in the lmgr directory because about 90% of
+the code is an implementation of predicate locking, which is required
+for SSI, rather than being directly related to SSI itself. When
+another use for predicate locking justifies the effort to tease these
+two things apart, this README file should probably be split.
+
+
+Credits
+-------
+
+This feature was developed by Kevin Grittner and Dan R. K. Ports,
+with review and suggestions from Joe Conway, Heikki Linnakangas, and
+Jeff Davis. It is based on work published in these papers:
+
+ Michael J. Cahill, Uwe Röhm, and Alan D. Fekete. 2008.
+ Serializable isolation for snapshot databases.
+ In SIGMOD ’08: Proceedings of the 2008 ACM SIGMOD
+ international conference on Management of data,
+ pages 729–738, New York, NY, USA. ACM.
+ http://doi.acm.org/10.1145/1376616.1376690
+
+ Michael James Cahill. 2009.
+ Serializable Isolation for Snapshot Databases.
+ Sydney Digital Theses.
+ University of Sydney, School of Information Technologies.
+ http://hdl.handle.net/2123/5353
+
+
+Overview
+--------
+
+With true serializable transactions, if you can show that your
+transaction will do the right thing if there are no concurrent
+transactions, it will do the right thing in any mix of serializable
+transactions or be rolled back with a serialization failure. This
+feature has been implemented in PostgreSQL using SSI.
+
+
+Serializable and Snapshot Transaction Isolation Levels
+------------------------------------------------------
+
+Serializable transaction isolation is attractive for shops with
+active development by many programmers against a complex schema
+because it guarantees data integrity with very little staff time --
+if a transaction can be shown to always do the right thing when it is
+run alone (before or after any other transaction), it will always do
+the right thing in any mix of concurrent serializable transactions.
+Where conflicts with other transactions would result in an
+inconsistent state within the database, or an inconsistent view of
+the data, a serializable transaction will block or roll back to
+prevent the anomaly. The SQL standard provides a specific SQLSTATE
+for errors generated when a transaction rolls back for this reason,
+so that transactions can be retried automatically.
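+
+As an illustration only -- a minimal libpq sketch, not part of this
+patch; the "accounts" table, the statements, and the helper functions
+are hypothetical, and error reporting is trimmed -- an application
+might retry on that SQLSTATE like this:
+
+    #include <stdbool.h>
+    #include <string.h>
+    #include <libpq-fe.h>
+
+    /* Run one statement; report success and whether it failed with 40001. */
+    static bool
+    exec_ok(PGconn *conn, const char *sql, bool *serialization_failure)
+    {
+        PGresult   *res = PQexec(conn, sql);
+        bool        ok = (PQresultStatus(res) == PGRES_COMMAND_OK);
+        char       *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
+
+        *serialization_failure =
+            (!ok && sqlstate != NULL && strcmp(sqlstate, "40001") == 0);
+        PQclear(res);
+        return ok;
+    }
+
+    /* Retry the whole transaction whenever SQLSTATE 40001 is reported. */
+    static bool
+    transfer_with_retry(PGconn *conn)
+    {
+        for (;;)
+        {
+            bool        serfail = false;
+            bool        ignored;
+
+            if (exec_ok(conn, "BEGIN ISOLATION LEVEL SERIALIZABLE", &serfail) &&
+                exec_ok(conn, "UPDATE accounts"
+                        " SET balance = balance - 100 WHERE id = 1", &serfail) &&
+                exec_ok(conn, "UPDATE accounts"
+                        " SET balance = balance + 100 WHERE id = 2", &serfail) &&
+                exec_ok(conn, "COMMIT", &serfail))
+                return true;            /* committed */
+
+            exec_ok(conn, "ROLLBACK", &ignored);    /* clean up this attempt */
+            if (!serfail)
+                return false;           /* not a serialization failure; give up */
+        }
+    }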
+
+Before version 9.1 PostgreSQL did not support a full serializable
+isolation level. A request for serializable transaction isolation
+actually provided snapshot isolation. Snapshot isolation has well-known
+anomalies which can allow data corruption or inconsistent views of the
+data during concurrent transactions, although these anomalies occur only
+when certain patterns of read-write dependencies exist within a set
+of concurrent transactions. Where these patterns exist, the anomalies
+can be prevented by introducing conflicts through explicitly
+programmed locks or otherwise unnecessary writes to the database.
+Snapshot isolation is popular because performance is better than
+serializable isolation and the integrity guarantees which it does
+provide allow anomalies to be avoided or managed with reasonable
+effort in many environments.
+
+
+Serializable Isolation Implementation Strategies
+------------------------------------------------
+
+Techniques for implementing full serializable isolation have been
+published and in use in many database products for decades. The
+primary technique which has been used is Strict Two-Phase Locking
+(S2PL), which operates by blocking writes against data which has been
+read by concurrent transactions and blocking any access (read or
+write) against data which has been written by concurrent
+transactions. A cycle in a graph of blocking indicates a deadlock,
+requiring a rollback. Blocking and deadlocks under S2PL in high
+contention workloads can be debilitating, crippling throughput and
+response time.
+
+A new technique for implementing full serializable isolation in an
+MVCC database appears in the literature beginning in 2008. This
+technique, known as Serializable Snapshot Isolation (SSI), has many of
+the advantages of snapshot isolation. In particular, reads don't
+block anything and writes don't block reads. Essentially, it runs
+snapshot isolation but monitors the read-write conflicts between
+transactions to identify dangerous structures in the transaction
+graph which indicate that a set of concurrent transactions might
+produce an anomaly, and rolls back transactions to ensure that no
+anomalies occur. It will produce some false positives (where a
+transaction is rolled back even though there would not have been an
+anomaly), but will never let an anomaly occur. In the two known
+prototype implementations, performance for many workloads (even with
+the need to restart transactions which are rolled back) is very close
+to snapshot isolation and generally far better than an S2PL
+implementation.
+
+
+Apparent Serial Order of Execution
+----------------------------------
+
+One way to understand when snapshot anomalies can occur, and to
+visualize the difference between the serializable implementations
+described above, is to consider that among transactions executing at
+the serializable transaction isolation level, the results are
+required to be consistent with some serial (one-at-a-time) execution
+of the transactions[1]. How is that order determined in each?
+
+S2PL locks rows used by the transaction in a way which blocks
+conflicting access, so that at the moment of a successful commit it
+is certain that no conflicting access has occurred. Some transactions
+may have blocked, essentially being partially serialized with the
+committing transaction, to allow this. Some transactions may have
+been rolled back, due to cycles in the blocking. But with S2PL,
+transactions can always be viewed as having occurred serially, in the
+order of successful commit.
+
+With snapshot isolation, reads never block writes, nor vice versa, so
+there is much less actual serialization. The order in which
+transactions appear to have executed is determined by something more
+subtle than in S2PL: read/write dependencies. If a transaction
+attempts to read data which is not visible to it because the
+transaction which wrote it (or will later write it) is concurrent
+(one of them was running when the other acquired its snapshot), then
+the reading transaction appears to have executed first, regardless of
+the actual sequence of transaction starts or commits (since it sees a
+database state prior to that in which the other transaction leaves
+it). If one transaction has both rw-dependencies in (meaning that a
+concurrent transaction attempts to read data it writes) and out
+(meaning it attempts to read data a concurrent transaction writes),
+and a couple other conditions are met, there can appear to be a cycle
+in execution order of the transactions. This is when the anomalies
+occur.
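+
+A concrete, hypothetical illustration of such a cycle is simple write
+skew. Suppose rows A and B each hold 10, and the application enforces
+the rule A + B >= 0:
+
+    T1: reads A = 10, B = 10; writes A = -10; commits
+    T2: reads A = 10, B = 10; writes B = -10; commits
+
+Each transaction preserves the rule against its own snapshot, and under
+snapshot isolation both commit because neither updates a row the other
+updated. But T1 read B, which T2 wrote (a rw-dependency out from T1),
+and T2 read A, which T1 wrote (a rw-dependency out from T2), so the
+apparent execution order contains a cycle; the final state A = -10,
+B = -10 is not consistent with either serial order.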
+
+SSI works by watching for the conditions mentioned above, and rolling
+back a transaction when needed to prevent any anomaly. The apparent
+order of execution will always be consistent with any actual
+serialization (i.e., a transaction which runs by itself can always be
+considered to have run after any transactions committed before it
+started and before any transaction which starts after it commits); but
+among concurrent transactions it will appear that the transaction on
+the read side of a rw-dependency executed before the transaction on
+the write side.
+
+
+PostgreSQL Implementation
+-------------------------
+
+The implementation of serializable transactions for PostgreSQL is
+accomplished through Serializable Snapshot Isolation (SSI), based on
+the work of Cahill, et al. Fundamentally, this allows snapshot
+isolation to run as it has, while monitoring for conditions which
+could create a serialization anomaly.
+
+ * Since this technique is based on Snapshot Isolation (SI), those
+areas in PostgreSQL which don't use SI can't be brought under SSI.
+This includes system tables, temporary tables, sequences, hint bit
+rewrites, etc. SSI cannot eliminate existing anomalies in these
+areas.
+
+ * Any transaction which is run at a transaction isolation level
+other than SERIALIZABLE will not be affected by SSI. If you want to
+enforce business rules through SSI, all transactions should be run at
+the SERIALIZABLE transaction isolation level, and that should
+probably be set as the default.
+
+ * If all transactions are run at the SERIALIZABLE transaction
+isolation level, business rules can be enforced in triggers or
+application code without ever having a need to acquire an explicit
+lock or to use SELECT FOR SHARE or SELECT FOR UPDATE.
+
+ * Those who want to continue to use snapshot isolation without
+the additional protections of SSI (and the associated costs of
+enforcing those protections) can use the REPEATABLE READ transaction
+isolation level. This level will retain its legacy behavior, which
+is identical to the old SERIALIZABLE implementation and fully
+consistent with the standard's requirements for the REPEATABLE READ
+transaction isolation level.
+
+ * Performance under this SSI implementation will be significantly
+improved if transactions which don't modify permanent tables are
+declared to be READ ONLY before they begin reading data.
+
+ * Performance under SSI will tend to degrade more rapidly with a
+large number of active database transactions than under less strict
+isolation levels. Limiting the number of active transactions through
+use of a connection pool or similar techniques may be necessary to
+maintain good performance.
+
+ * Any transaction which must be rolled back to prevent
+serialization anomalies will fail with SQLSTATE 40001, which has a
+standard meaning of "serialization failure".
+
+ * This SSI implementation makes an effort to choose the
+transaction to be cancelled such that an immediate retry of the
+transaction will not fail due to conflicts with exactly the same
+transactions. Pursuant to this goal, no transaction is cancelled
+until one of the other transactions in the set of conflicts which
+could generate an anomaly has successfully committed. This is
+conceptually similar to how write conflicts are handled. To fully
+implement this guarantee there needs to be a way to roll back the
+active transaction for another process with a serialization failure
+SQLSTATE, even if it is "idle in transaction".
+
+
+Predicate Locking
+-----------------
+
+Both S2PL and SSI require some form of predicate locking to handle
+situations where reads conflict with later inserts or with later
+updates which move data into the selected range. PostgreSQL didn't
+already have predicate locking, so it needed to be added to support
+full serializable transactions under either strategy. Practical
+implementations of predicate locking generally involve acquiring
+locks against data as it is accessed, using multiple granularities
+(tuple, page, table, etc.) with escalation as needed to keep the lock
+count to a number which can be tracked within RAM structures, and
+this approach was used in PostgreSQL. Coarse granularities can cause some
+false positive indications of conflict. The number of false positives
+can be influenced by plan choice.
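+
+As a simplified, hypothetical sketch (the names and thresholds below are
+illustrative only, not the actual implementation, whose limits come from
+configuration), the escalation decision described above amounts to
+something like this:
+
+    typedef enum { GRAN_TUPLE, GRAN_PAGE, GRAN_RELATION } LockGranularity;
+
+    /* Assumed thresholds for promoting to coarser-grained locks. */
+    #define PAGE_PROMOTION_THRESHOLD        3
+    #define RELATION_PROMOTION_THRESHOLD    32
+
+    /*
+     * Given how many tuple locks this transaction already holds on the
+     * target page, and how many page locks it holds on the target
+     * relation, decide at what granularity a new read should be locked.
+     */
+    static LockGranularity
+    choose_lock_granularity(int tuple_locks_on_page, int page_locks_on_rel)
+    {
+        if (page_locks_on_rel + 1 >= RELATION_PROMOTION_THRESHOLD)
+            return GRAN_RELATION;   /* one lock covering the whole relation */
+        if (tuple_locks_on_page + 1 >= PAGE_PROMOTION_THRESHOLD)
+            return GRAN_PAGE;       /* collapse tuple locks into a page lock */
+        return GRAN_TUPLE;          /* keep fine-grained tracking */
+    }
+
+Acquiring the coarser lock then allows every finer-grained lock it
+covers to be released, which is what keeps the lock count bounded.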
+
+
+Implementation overview
+-----------------------
+
+New RAM structures, inspired by those used to track traditional locks
+in PostgreSQL, but tailored to the needs of SIREAD predicate locking,
+are used. These refer to physical objects actually accessed in the
+course of executing the query, to model the predicates through
+inference. Anyone interested in this subject should review the
+Hellerstein, Stonebraker and Hamilton paper[2], along with the
+locking papers referenced from that and the Cahill papers.
+
+Because the SIREAD locks don't block, traditional locking techniques
+had to be modified. Intent locking (locking higher level objects
+before locking lower level objects) doesn't work with non-blocking
+"locks" (which are, in some respects, more like flags than locks).
+
+A configurable amount of shared memory is reserved at postmaster
+start-up to track predicate locks. This size cannot be changed
+without a restart.
+
+ * To prevent resource exhaustion, multiple fine-grained locks may
+be promoted to a single coarser-grained lock as needed.
+
+ * An attempt to acquire an SIREAD lock on a tuple when the same
+transaction already holds an SIREAD lock on the page or the relation
+will be ignored. Likewise, an attempt to lock a page when the
+relation is locked will be ignored, and the acquisition of a coarser
+lock will result in the automatic release of all finer-grained locks
+it covers.
+
+
+Heap locking
+------------
+
+Predicate locks will be acquired for the heap based on the following:
+
+ * For a table scan, the entire relation will be locked.
+
+ * Each tuple read which is visible to the reading transaction
+will be locked, whether or not it meets selection criteria; except
+that there is no need to acquire an SIREAD lock on a tuple when the
+transaction already holds a write lock on any tuple representing the
+row, since a rw-dependency would also create a ww-dependency which
+has more aggressive enforcement and will thus prevent any anomaly.
+
+
+Index AM implementations
+------------------------
+
+Since predicate locks only exist to detect writes which conflict with
+earlier reads, and heap tuple locks are acquired to cover all heap
+tuples actually read, including those read through indexes, the index
+tuples which were actually scanned are not of interest in themselves;
+we only care about their "new neighbors" -- later inserts into the
+index which would have been included in the scan had they existed at
+the time. Conceptually, we want to lock the gaps between and
+surrounding index entries within the scanned range.
+
+Correctness requires that any insert into an index generates a
+rw-conflict with a concurrent serializable transaction if, after that
+insert, re-execution of any index scan of the other transaction would
+access the heap for a row not accessed during the previous execution.
+Note that a non-HOT update which expires an old index entry covered
+by the scan and adds a new entry for the modified row's new tuple
+need not generate a conflict, although an update which "moves" a row
+into the scan must generate a conflict. While correctness allows
+false positives, they should be minimized for performance reasons.
+
+Several optimizations are possible:
+
+ * An index scan which is just finding the right position for an
+index insertion or deletion need not acquire a predicate lock.
+
+ * An index scan which is comparing for equality on the entire key
+for a unique index need not acquire a predicate lock as long as a key
+is found corresponding to a visible tuple which has not been modified
+by another transaction -- there are no "between or around" gaps to
+cover.
+
+ * As long as built-in foreign key enforcement continues to use
+its current "special tricks" to deal with MVCC issues, predicate
+locks should not be needed for scans done by enforcement code.
+
+ * If a search determines that no rows can be found regardless of
+index contents because the search conditions are contradictory (e.g.,
+x = 1 AND x = 2), then no predicate lock is needed.
+
+Other index AM implementation considerations:
+
+ * If a btree search discovers that no root page has yet been
+created, a predicate lock on the index relation is required;
+otherwise btree searches must get to the leaf level to determine
+which tuples match, so predicate locks go there.
+
+ * GiST searches can determine that there are no matches at any
+level of the index, so there must be a predicate lock at each index
+level during a GiST search. An index insert at the leaf level can
+then be trusted to ripple up to all levels and locations where
+conflicting predicate locks may exist.
+
+ * The effects of page splits, overflows, consolidations, and
+removals must be carefully reviewed to ensure that predicate locks
+aren't "lost" during those operations, or kept with pages which could
+get re-used for different parts of the index.
+
+
+Innovations
+-----------
+
+The PostgreSQL implementation of Serializable Snapshot Isolation
+differs from what is described in the cited papers for several
+reasons:
+
+ 1. PostgreSQL didn't have any existing predicate locking. It had
+to be added from scratch.
+
+ 2. The existing in-memory lock structures were not suitable for
+tracking SIREAD locks.
+ * The database products used for the prototype
+implementations for the papers used update-in-place with a rollback
+log for their MVCC implementations, while PostgreSQL leaves the old
+version of a row in place and adds a new tuple to represent the row
+at a new location.
+ * In PostgreSQL, tuple level locks are not held in RAM for
+any length of time; lock information is written to the tuples
+involved in the transactions.
+ * In PostgreSQL, existing lock structures have pointers to
+memory which is related to a connection. SIREAD locks need to persist
+past the end of the originating transaction and even the connection
+which ran it.
+ * PostgreSQL needs to be able to tolerate a large number of
+transactions executing while one long-running transaction stays open
+-- the in-RAM techniques discussed in the papers wouldn't support
+that.
+
+ 3. Unlike the database products used for the prototypes described
+in the papers, PostgreSQL didn't already have a true serializable
+isolation level distinct from snapshot isolation.
+
+ 4. PostgreSQL supports subtransactions -- an issue not mentioned
+in the papers.
+
+ 5. PostgreSQL doesn't assign a transaction number to a database
+transaction until and unless necessary.
+
+ 6. PostgreSQL has pluggable data types with user-definable
+operators, as well as pluggable index types, not all of which are
+based around data types which support ordering.
+
+ 7. Some possible optimizations became apparent during development
+and testing.
+
+Differences from the implementation described in the papers are
+listed below.
+
+ * New structures needed to be created in shared memory to track
+the proper information for serializable transactions and their SIREAD
+locks.
+
+ * Because PostgreSQL does not have the same concept of an "oldest
+transaction ID" for all serializable transactions as assumed in the
+Cahill thesis, we track the oldest snapshot xmin among serializable
+transactions, and a count of how many active transactions use that
+xmin. When the count hits zero we find the new oldest xmin and run a
+clean-up based on that.
+
+ * Because reads in a subtransaction may cause that subtransaction
+to roll back, thereby affecting what is written by the top level
+transaction, predicate locks must survive a subtransaction rollback.
+As a consequence, all xid usage in SSI, including predicate locking,
+is based on the top level xid. When looking at an xid that comes
+from a tuple's xmin or xmax, for example, we always call
+SubTransGetTopmostTransaction() before doing much else with it.
+
+ * Predicate locking in PostgreSQL will start at the tuple level
+when possible, with automatic conversion of multiple fine-grained
+locks to coarser granularity as needed to avoid resource exhaustion.
+The amount of memory used for these structures will be configurable,
+to balance RAM usage against SIREAD lock granularity.
+
+ * A process-local copy of the locks held by the process, along
+with the coarser covering locks and their counts, is kept to support
+granularity promotion decisions with low CPU and locking overhead.
+
+ * Conflicts will be identified by looking for predicate locks
+when tuples are written and looking at the MVCC information when
+tuples are read. There is no matching between two RAM-based locks.
+
+ * Because write locks are stored in the heap tuples rather than a
+RAM-based lock table, the optimization described in the Cahill thesis
+which eliminates an SIREAD lock where there is a write lock is
+implemented by the following:
+ 1. When checking a heap write for conflicts against existing
+predicate locks, a tuple lock on the tuple being written is removed.
+ 2. When acquiring a predicate lock on a heap tuple, we
+return quickly without doing anything if it is a tuple written by the
+reading transaction.
+
+ * Rather than using conflictIn and conflictOut pointers which use
+NULL to indicate no conflict and a self-reference to indicate
+multiple conflicts or conflicts with committed transactions, we use a
+list of rw-conflicts. With the more complete information, false
+positives are reduced and we have sufficient data for more aggressive
+clean-up and other optimizations.
+ o We can avoid ever rolling back a transaction until and
+unless there is a pivot where a transaction on the conflict *out*
+side of the pivot committed before either of the other transactions.
+ o We can avoid ever rolling back a transaction when the
+transaction on the conflict *in* side of the pivot is explicitly or
+implicitly READ ONLY unless the transaction on the conflict *out*
+side of the pivot committed before the READ ONLY transaction acquired
+its snapshot. (An implicit READ ONLY transaction is one which
+committed without writing, even though it was not explicitly declared
+to be READ ONLY.)
+ o We can more aggressively clean up conflicts, predicate
+locks, and SSI transaction information.
+
+ * Allow a READ ONLY transaction to "opt out" of SSI if there are
+no READ WRITE transactions which could cause the READ ONLY
+transaction to ever become part of a "dangerous structure" of
+overlapping transaction dependencies.
+
+ * Allow the user to request that a READ ONLY transaction wait
+until the conditions are right for it to start in the "opt out" state
+described above. We add a DEFERRABLE state to transactions, which is
+specified and maintained in a way similar to READ ONLY. It is
+ignored for transactions which are not SERIALIZABLE and READ ONLY.
+
+ * When a transaction must be rolled back, we pick among the
+active transactions such that an immediate retry will not fail again
+on conflicts with the same transactions.
+
+ * We use the PostgreSQL SLRU system to hold summarized
+information about older committed transactions to put an upper bound
+on RAM used. Beyond that limit, information spills to disk.
+Performance can degrade in a pessimal situation, but it should be
+tolerable, and transactions won't need to be cancelled or blocked
+from starting.
+
+
+R&D Issues
+----------
+
+This is intended to be the place to record specific issues which need
+more detailed review or analysis.
+
+ * WAL file replay. While serializable implementations using S2PL
+can guarantee that the write-ahead log contains commits in a sequence
+consistent with some serial execution of serializable transactions,
+SSI cannot make that guarantee. While the WAL replay is no less
+consistent than under snapshot isolation, it is possible that under
+PITR recovery or hot standby a database could reach a readable state
+where some transactions appear before other transactions which would
+have had to precede them to maintain serializable consistency. In
+essence, if we do nothing, WAL replay will be at snapshot isolation
+even for serializable transactions. Is this OK? If not, how do we
+address it?
+
+ * External replication. Look at how this impacts external
+replication solutions, like Postgres-R, Slony, pgpool, HS/SR, etc.
+This is related to the "WAL file replay" issue.
+
+ * Weak-memory-ordering machines. Make sure that shared memory
+access which involves visibility across multiple transactions uses
+locks as needed to avoid problems. On the other hand, ensure that we
+really need volatile where we're using it.
+http://archives.postgresql.org/pgsql-committers/2008-06/msg00228.php
+
+ * UNIQUE btree search for equality on all columns. Since a search
+of a UNIQUE index using equality tests on all columns will lock the
+heap tuple if an entry is found, it appears that there is no need to
+get a predicate lock on the index in that case. A predicate lock is
+still needed for such a search if a matching index entry which points
+to a visible tuple is not found.
+
+ * Planner index probes. To avoid problems with data skew at the
+ends of an index which have historically caused bad plans, the
+planner now probes the end of an index to see what the maximum or
+minimum value is when a query appears to be requesting a range of
+data outside what statistics shows is present. These planner checks
+don't require predicate locking, but there's currently no easy way to
+avoid it. What can we do to avoid predicate locking for such planner
+activity?
+
+ * Minimize touching of shared memory. Should lists in shared
+memory push entries which have just been returned to the front of the
+available list, so they will be popped back off soon and some memory
+might never be touched, or should we keep adding returned items to
+the end of the available list?
+
+
+Footnotes
+---------
+
+[1] http://www.contrib.andrew.cmu.edu/~shadow/sql/sql1992.txt
+Search for serial execution to find the relevant section.
+
+[2] http://db.cs.berkeley.edu/papers/fntdb07-architecture.pdf
+Joseph M. Hellerstein, Michael Stonebraker and James Hamilton. 2007.
+Architecture of a Database System. Foundations and Trends(R) in
+Databases Vol. 1, No. 2 (2007) 141–259.
+ Of particular interest:
+ * 6.1 A Note on ACID
+ * 6.2 A Brief Review of Serializability
+ * 6.3 Locking and Latching
+ * 6.3.1 Transaction Isolation Levels
+ * 6.5.3 Next-Key Locking: Physical Surrogates for Logical
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 61621a4d0ae..0fe7ce45cd6 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -28,6 +28,7 @@
#include "miscadmin.h"
#include "pg_trace.h"
#include "storage/ipc.h"
+#include "storage/predicate.h"
#include "storage/proc.h"
#include "storage/spin.h"
@@ -178,6 +179,9 @@ NumLWLocks(void)
/* async.c needs one per Async buffer */
numLocks += NUM_ASYNC_BUFFERS;
+ /* predicate.c needs one per old serializable xid buffer */
+ numLocks += NUM_OLDSERXID_BUFFERS;
+
/*
* Add any requested by loadable modules; for backwards-compatibility
* reasons, allocate at least NUM_USER_DEFINED_LWLOCKS of them even if
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
new file mode 100644
index 00000000000..5e62ba9e4d9
--- /dev/null
+++ b/src/backend/storage/lmgr/predicate.c
@@ -0,0 +1,4439 @@
+/*-------------------------------------------------------------------------
+ *
+ * predicate.c
+ * POSTGRES predicate locking
+ * to support full serializable transaction isolation
+ *
+ *
+ * The approach taken is to implement Serializable Snapshot Isolation (SSI)
+ * as initially described in this paper:
+ *
+ * Michael J. Cahill, Uwe Röhm, and Alan D. Fekete. 2008.
+ * Serializable isolation for snapshot databases.
+ * In SIGMOD ’08: Proceedings of the 2008 ACM SIGMOD
+ * international conference on Management of data,
+ * pages 729–738, New York, NY, USA. ACM.
+ * http://doi.acm.org/10.1145/1376616.1376690
+ *
+ * and further elaborated in Cahill's doctoral thesis:
+ *
+ * Michael James Cahill. 2009.
+ * Serializable Isolation for Snapshot Databases.
+ * Sydney Digital Theses.
+ * University of Sydney, School of Information Technologies.
+ * http://hdl.handle.net/2123/5353
+ *
+ *
+ * Predicate locks for Serializable Snapshot Isolation (SSI) are SIREAD
+ * locks, which are so different from normal locks that a distinct set of
+ * structures is required to handle them. They are needed to detect
+ * rw-conflicts when the read happens before the write. (When the write
+ * occurs first, the reading transaction can check for a conflict by
+ * examining the MVCC data.)
+ *
+ * (1) Besides tuples actually read, they must cover ranges of tuples
+ * which would have been read based on the predicate. This will
+ * require modelling the predicates through locks against database
+ * objects such as pages, index ranges, or entire tables.
+ *
+ * (2) They must be kept in RAM for quick access. Because of this, it
+ * isn't possible to always maintain tuple-level granularity -- when
+ * the space allocated to store these approaches exhaustion, a
+ * request for a lock may need to scan for situations where a single
+ * transaction holds many fine-grained locks which can be coalesced
+ * into a single coarser-grained lock.
+ *
+ * (3) They never block anything; they are more like flags than locks
+ * in that regard, although they refer to database objects and are
+ * used to identify rw-conflicts with normal write locks.
+ *
+ * (4) While they are associated with a transaction, they must survive
+ * a successful COMMIT of that transaction, and remain until all
+ * overlapping transactions complete. This even means that they
+ * must survive termination of the transaction's process. If a
+ * top level transaction is rolled back, however, it is immediately
+ * flagged so that it can be ignored, and its SIREAD locks can be
+ * released any time after that.
+ *
+ * (5) The only transactions which create SIREAD locks or check for
+ * conflicts with them are serializable transactions.
+ *
+ * (6) When a write lock for a top level transaction is found to cover
+ * an existing SIREAD lock for the same transaction, the SIREAD lock
+ * can be deleted.
+ *
+ * (7) A write from a serializable transaction must ensure that an xact
+ * record exists for the transaction, with the same lifespan (until
+ * all concurrent transactions complete or the transaction is rolled
+ * back) so that rw-dependencies to that transaction can be
+ * detected.
+ *
+ * We use an optimization for read-only transactions. Under certain
+ * circumstances, a read-only transaction's snapshot can be shown to
+ * never have conflicts with other transactions. This is referred to
+ * as a "safe" snapshot (and one known not to be is "unsafe").
+ * However, it can't be determined whether a snapshot is safe until
+ * all concurrent read/write transactions complete.
+ *
+ * Once a read-only transaction is known to have a safe snapshot, it
+ * can release its predicate locks and exempt itself from further
+ * predicate lock tracking. READ ONLY DEFERRABLE transactions run only
+ * on safe snapshots, waiting as necessary for one to be available.
+ *
+ *
+ * Lightweight locks to manage access to the predicate locking shared
+ * memory objects must be taken in this order, and should be released in
+ * reverse order:
+ *
+ * SerializableFinishedListLock
+ * - Protects the list of transactions which have completed but which
+ * may yet matter because they overlap still-active transactions.
+ *
+ * SerializablePredicateLockListLock
+ * - Protects the linked list of locks held by a transaction. Note
+ * that the locks themselves are also covered by the partition
+ * locks of their respective lock targets; this lock only affects
+ * the linked list connecting the locks related to a transaction.
+ * - All transactions share this single lock (with no partitioning).
+ * - There is never a need for a process other than the one running
+ * an active transaction to walk the list of locks held by that
+ * transaction.
+ * - It is relatively infrequent that another process needs to
+ * modify the list for a transaction, but it does happen for such
+ * things as index page splits for pages with predicate locks and
+ * freeing of predicate locked pages by a vacuum process. When
+ * removing a lock in such cases, the lock itself contains the
+ * pointers needed to remove it from the list. When adding a
+ * lock in such cases, the lock can be added using the anchor in
+ * the transaction structure. Neither requires walking the list.
+ * - Cleaning up the list for a terminated transaction is sometimes
+ * not done on a retail basis, in which case no lock is required.
+ * - Due to the above, a process accessing its active transaction's
+ * list always uses a shared lock, regardless of whether it is
+ * walking or maintaining the list. This improves concurrency
+ * for the common access patterns.
+ * - A process which needs to alter the list of a transaction other
+ * than its own active transaction must acquire an exclusive
+ * lock.
+ *
+ * FirstPredicateLockMgrLock based partition locks
+ * - The same lock protects a target, all locks on that target, and
+ * the linked list of locks on the target.
+ * - When more than one is needed, acquire in ascending order.
+ *
+ * SerializableXactHashLock
+ * - Protects both PredXact and SerializableXidHash.
+ *
+ * PredicateLockNextRowLinkLock
+ * - Protects the priorVersionOfRow and nextVersionOfRow fields of
+ * PREDICATELOCKTARGET when linkage is being created or destroyed.
+ *
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/storage/lmgr/predicate.c
+ *
+ *-------------------------------------------------------------------------
+ */
+/*
+ * INTERFACE ROUTINES
+ *
+ * housekeeping for setting up shared memory predicate lock structures
+ * InitPredicateLocks(void)
+ * PredicateLockShmemSize(void)
+ *
+ * predicate lock reporting
+ * GetPredicateLockStatusData(void)
+ * PageIsPredicateLocked(Relation relation, BlockNumber blkno)
+ *
+ * predicate lock maintenance
+ * RegisterSerializableTransaction(Snapshot snapshot)
+ * RegisterPredicateLockingXid(void)
+ * PredicateLockRelation(Relation relation)
+ * PredicateLockPage(Relation relation, BlockNumber blkno)
+ * PredicateLockTuple(Relation relation, HeapTuple tuple)
+ * PredicateLockPageSplit(Relation relation, BlockNumber oldblkno,
+ * BlockNumber newblkno);
+ * PredicateLockPageCombine(Relation relation, BlockNumber oldblkno,
+ * BlockNumber newblkno);
+ * PredicateLockTupleRowVersionLink(const Relation relation,
+ * const HeapTuple oldTuple,
+ * const HeapTuple newTuple)
+ * ReleasePredicateLocks(bool isCommit)
+ *
+ * conflict detection (may also trigger rollback)
+ * CheckForSerializableConflictOut(bool visible, Relation relation,
+ * HeapTupleData *tup, Buffer buffer)
+ * CheckForSerializableConflictIn(Relation relation, HeapTupleData *tup,
+ * Buffer buffer)
+ *
+ * final rollback checking
+ * PreCommit_CheckForSerializationFailure(void)
+ *
+ * two-phase commit support
+ * AtPrepare_PredicateLocks(void);
+ * PostPrepare_PredicateLocks(TransactionId xid);
+ * PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit);
+ * predicatelock_twophase_recover(TransactionId xid, uint16 info,
+ * void *recdata, uint32 len);
+ */
+
+#include "postgres.h"
+
+#include "access/slru.h"
+#include "access/subtrans.h"
+#include "access/transam.h"
+#include "access/twophase.h"
+#include "access/twophase_rmgr.h"
+#include "access/xact.h"
+#include "miscadmin.h"
+#include "storage/bufmgr.h"
+#include "storage/predicate.h"
+#include "storage/predicate_internals.h"
+#include "storage/procarray.h"
+#include "utils/rel.h"
+#include "utils/snapmgr.h"
+#include "utils/tqual.h"
+
+/* Uncomment the next line to test the graceful degradation code. */
+/* #define TEST_OLDSERXID */
+
+/*
+ * Test the most selective fields first, for performance.
+ *
+ * a is covered by b if all of the following hold:
+ * 1) a.database = b.database
+ * 2) a.relation = b.relation
+ * 3) b.offset is invalid (b is page-granularity or higher)
+ * 4) either of the following:
+ * 4a) a.offset is valid (a is tuple-granularity) and a.page = b.page
+ * or 4b) a.offset is invalid and b.page is invalid (a is
+ * page-granularity and b is relation-granularity)
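+ *
+ * For example (hypothetical values), the tuple-level tag
+ * {db 1, rel 100, page 7, offset 3} is covered both by the page-level
+ * tag {db 1, rel 100, page 7, offset invalid} and by the relation-level
+ * tag {db 1, rel 100, page invalid, offset invalid}; a page-level tag,
+ * however, is never covered by another page-level tag.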
+ */
+#define TargetTagIsCoveredBy(covered_target, covering_target) \
+ ((GET_PREDICATELOCKTARGETTAG_RELATION(covered_target) == /* (2) */ \
+ GET_PREDICATELOCKTARGETTAG_RELATION(covering_target)) \
+ && (GET_PREDICATELOCKTARGETTAG_OFFSET(covering_target) == \
+ InvalidOffsetNumber) /* (3) */ \
+ && (((GET_PREDICATELOCKTARGETTAG_OFFSET(covered_target) != \
+ InvalidOffsetNumber) /* (4a) */ \
+ && (GET_PREDICATELOCKTARGETTAG_PAGE(covering_target) == \
+ GET_PREDICATELOCKTARGETTAG_PAGE(covered_target))) \
+ || ((GET_PREDICATELOCKTARGETTAG_PAGE(covering_target) == \
+ InvalidBlockNumber) /* (4b) */ \
+ && (GET_PREDICATELOCKTARGETTAG_PAGE(covered_target) \
+ != InvalidBlockNumber))) \
+ && (GET_PREDICATELOCKTARGETTAG_DB(covered_target) == /* (1) */ \
+ GET_PREDICATELOCKTARGETTAG_DB(covering_target)))
+
+/*
+ * The predicate locking target and lock shared hash tables are partitioned to
+ * reduce contention. To determine which partition a given target belongs to,
+ * compute the tag's hash code with PredicateLockTargetTagHashCode(), then
+ * apply one of these macros.
+ * NB: NUM_PREDICATELOCK_PARTITIONS must be a power of 2!
+ */
+#define PredicateLockHashPartition(hashcode) \
+ ((hashcode) % NUM_PREDICATELOCK_PARTITIONS)
+#define PredicateLockHashPartitionLock(hashcode) \
+ ((LWLockId) (FirstPredicateLockMgrLock + PredicateLockHashPartition(hashcode)))
+
+#define NPREDICATELOCKTARGETENTS() \
+ mul_size(max_predicate_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
+
+#define SxactIsOnFinishedList(sxact) (!SHMQueueIsDetached(&((sxact)->finishedLink)))
+
+#define SxactIsPrepared(sxact) (((sxact)->flags & SXACT_FLAG_PREPARED) != 0)
+#define SxactIsCommitted(sxact) (((sxact)->flags & SXACT_FLAG_COMMITTED) != 0)
+#define SxactIsRolledBack(sxact) (((sxact)->flags & SXACT_FLAG_ROLLED_BACK) != 0)
+#define SxactIsReadOnly(sxact) (((sxact)->flags & SXACT_FLAG_READ_ONLY) != 0)
+#define SxactHasSummaryConflictIn(sxact) (((sxact)->flags & SXACT_FLAG_SUMMARY_CONFLICT_IN) != 0)
+#define SxactHasSummaryConflictOut(sxact) (((sxact)->flags & SXACT_FLAG_SUMMARY_CONFLICT_OUT) != 0)
+#define SxactHasConflictOut(sxact) (((sxact)->flags & SXACT_FLAG_CONFLICT_OUT) != 0)
+#define SxactIsDeferrableWaiting(sxact) (((sxact)->flags & SXACT_FLAG_DEFERRABLE_WAITING) != 0)
+#define SxactIsROSafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_SAFE) != 0)
+#define SxactIsROUnsafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_UNSAFE) != 0)
+#define SxactIsMarkedForDeath(sxact) (((sxact)->flags & SXACT_FLAG_MARKED_FOR_DEATH) != 0)
+
+/*
+ * When a public interface method is called for a split on an index relation,
+ * this is the test to see if we should do a quick return.
+ */
+#define SkipSplitTracking(relation) \
+ (((relation)->rd_id < FirstBootstrapObjectId) \
+ || RelationUsesLocalBuffers(relation))
+
+/*
+ * When a public interface method is called for serializing a relation within
+ * the current transaction, this is the test to see if we should do a quick
+ * return.
+ */
+#define SkipSerialization(relation) \
+ ((!IsolationIsSerializable()) \
+ || ((MySerializableXact == InvalidSerializableXact)) \
+ || ReleasePredicateLocksIfROSafe() \
+ || SkipSplitTracking(relation))
+
+
+/*
+ * Compute the hash code associated with a PREDICATELOCKTARGETTAG.
+ *
+ * To avoid unnecessary recomputations of the hash code, we try to do this
+ * just once per function, and then pass it around as needed. Aside from
+ * passing the hashcode to hash_search_with_hash_value(), we can extract
+ * the lock partition number from the hashcode.
+ */
+#define PredicateLockTargetTagHashCode(predicatelocktargettag) \
+ (tag_hash((predicatelocktargettag), sizeof(PREDICATELOCKTARGETTAG)))
+
+/*
+ * Given a predicate lock tag, and the hash for its target,
+ * compute the lock hash.
+ *
+ * To make the hash code also depend on the transaction, we xor the sxid
+ * struct's address into the hash code, left-shifted so that the
+ * partition-number bits don't change. Since this is only a hash, we
+ * don't care if we lose high-order bits of the address; use an
+ * intermediate variable to suppress cast-pointer-to-int warnings.
+ */
+#define PredicateLockHashCodeFromTargetHashCode(predicatelocktag, targethash) \
+ ((targethash) ^ ((uint32) PointerGetDatum((predicatelocktag)->myXact)) \
+ << LOG2_NUM_PREDICATELOCK_PARTITIONS)
+
+
+/*
+ * The SLRU buffer area through which we access the old xids.
+ */
+static SlruCtlData OldSerXidSlruCtlData;
+
+#define OldSerXidSlruCtl (&OldSerXidSlruCtlData)
+
+#define OLDSERXID_PAGESIZE BLCKSZ
+#define OLDSERXID_ENTRYSIZE sizeof(SerCommitSeqNo)
+#define OLDSERXID_ENTRIESPERPAGE (OLDSERXID_PAGESIZE / OLDSERXID_ENTRYSIZE)
+#define OLDSERXID_MAX_PAGE (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)
+
+#define OldSerXidNextPage(page) (((page) >= OLDSERXID_MAX_PAGE) ? 0 : (page) + 1)
+
+#define OldSerXidValue(slotno, xid) (*((SerCommitSeqNo *) \
+ (OldSerXidSlruCtl->shared->page_buffer[slotno] + \
+ ((((uint32) (xid)) % OLDSERXID_ENTRIESPERPAGE) * OLDSERXID_ENTRYSIZE))))
+
+#define OldSerXidPage(xid) ((((uint32) (xid)) / OLDSERXID_ENTRIESPERPAGE) % (OLDSERXID_MAX_PAGE + 1))
+#define OldSerXidSegment(page) ((page) / SLRU_PAGES_PER_SEGMENT)
+
+typedef struct OldSerXidControlData
+{
+ int headPage;
+ int tailSegment;
+ TransactionId headXid;
+ TransactionId tailXid;
+ bool warningIssued;
+} OldSerXidControlData;
+
+typedef struct OldSerXidControlData *OldSerXidControl;
+
+static OldSerXidControl oldSerXidControl;
+
+/*
+ * When the oldest committed transaction on the "finished" list is moved to
+ * SLRU, its predicate locks will be moved to this "dummy" transaction,
+ * collapsing duplicate targets. When a duplicate is found, the later
+ * commitSeqNo is used.
+ */
+static SERIALIZABLEXACT *OldCommittedSxact;
+
+
+/* This configuration variable is used to set the predicate lock table size */
+int max_predicate_locks_per_xact; /* set by guc.c */
+
+/*
+ * This provides a list of objects in order to track transactions
+ * participating in predicate locking. Entries in the list are fixed size,
+ * and reside in shared memory. The memory address of an entry must remain
+ * fixed during its lifetime. The list will be protected from concurrent
+ * update externally; no provision is made in this code to manage that. The
+ * number of entries in the list, and the size allowed for each entry is
+ * fixed upon creation.
+ */
+static PredXactList PredXact;
+
+/*
+ * This provides a pool of RWConflict data elements to use in conflict lists
+ * between transactions.
+ */
+static RWConflictPoolHeader RWConflictPool;
+
+/*
+ * The predicate locking hash tables are in shared memory.
+ * Each backend keeps pointers to them.
+ */
+static HTAB *SerializableXidHash;
+static HTAB *PredicateLockTargetHash;
+static HTAB *PredicateLockHash;
+static SHM_QUEUE *FinishedSerializableTransactions;
+
+/*
+ * Tag for a reserved entry in PredicateLockTargetHash; used to ensure
+ * there's an element available for scratch space if we need it,
+ * e.g. in PredicateLockPageSplit. This is an otherwise-invalid tag.
+ */
+static const PREDICATELOCKTARGETTAG ReservedTargetTag = {0, 0, 0, 0, 0};
+
+/*
+ * The local hash table used to determine when to combine multiple fine-
+ * grained locks into a single coarser-grained lock.
+ */
+static HTAB *LocalPredicateLockHash = NULL;
+
+/*
+ * Keep a pointer to the currently-running serializable transaction (if any)
+ * for quick reference.
+ * TODO SSI: Remove volatile qualifier and the then-unnecessary casts?
+ */
+static volatile SERIALIZABLEXACT *MySerializableXact = InvalidSerializableXact;
+
+/* local functions */
+
+static SERIALIZABLEXACT *CreatePredXact(void);
+static void ReleasePredXact(SERIALIZABLEXACT *sxact);
+static SERIALIZABLEXACT *FirstPredXact(void);
+static SERIALIZABLEXACT *NextPredXact(SERIALIZABLEXACT *sxact);
+
+static bool RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer);
+static void SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer);
+static void SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact, SERIALIZABLEXACT *activeXact);
+static void ReleaseRWConflict(RWConflict conflict);
+static void FlagSxactUnsafe(SERIALIZABLEXACT *sxact);
+
+static bool OldSerXidPagePrecedesLogically(int p, int q);
+static void OldSerXidInit(void);
+static void OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo);
+static SerCommitSeqNo OldSerXidGetMinConflictCommitSeqNo(TransactionId xid);
+static void OldSerXidSetActiveSerXmin(TransactionId xid);
+
+static uint32 predicatelock_hash(const void *key, Size keysize);
+static void SummarizeOldestCommittedSxact(void);
+static Snapshot GetSafeSnapshot(Snapshot snapshot);
+static Snapshot RegisterSerializableTransactionInt(Snapshot snapshot);
+static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag);
+static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
+ PREDICATELOCKTARGETTAG *parent);
+static bool CoarserLockCovers(const PREDICATELOCKTARGETTAG *newtargettag);
+static void RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target,
+ uint32 targettaghash);
+static void DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag);
+static int PredicateLockPromotionThreshold(const PREDICATELOCKTARGETTAG *tag);
+static bool CheckAndPromotePredicateLockRequest(const PREDICATELOCKTARGETTAG *reqtag);
+static void DecrementParentLocks(const PREDICATELOCKTARGETTAG *targettag);
+static void CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
+ uint32 targettaghash,
+ SERIALIZABLEXACT *sxact);
+static void DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash);
+static bool TransferPredicateLocksToNewTarget(const PREDICATELOCKTARGETTAG oldtargettag,
+ const PREDICATELOCKTARGETTAG newtargettag,
+ bool removeOld);
+static void PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag);
+static void SetNewSxactGlobalXmin(void);
+static bool ReleasePredicateLocksIfROSafe(void);
+static void ClearOldPredicateLocks(void);
+static void ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
+ bool summarize);
+static bool XidIsConcurrent(TransactionId xid);
+static void CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag);
+static bool CheckSingleTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag,
+ PREDICATELOCKTARGETTAG *nexttargettag);
+static void FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer);
+static void OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
+ SERIALIZABLEXACT *writer);
+
+/*------------------------------------------------------------------------*/
+
+/*
+ * These functions are a simple implementation of a list for this specific
+ * type of struct. If there is ever a generalized shared memory list, we
+ * should probably switch to that.
+ */
+static SERIALIZABLEXACT *
+CreatePredXact(void)
+{
+ PredXactListElement ptle;
+
+ ptle = (PredXactListElement)
+ SHMQueueNext(&PredXact->availableList,
+ &PredXact->availableList,
+ offsetof(PredXactListElementData, link));
+ if (!ptle)
+ return NULL;
+
+ SHMQueueDelete(&ptle->link);
+ SHMQueueInsertBefore(&PredXact->activeList, &ptle->link);
+ return &ptle->sxact;
+}
+
+static void
+ReleasePredXact(SERIALIZABLEXACT *sxact)
+{
+ PredXactListElement ptle;
+
+ Assert(ShmemAddrIsValid(sxact));
+
+ ptle = (PredXactListElement)
+ (((char *) sxact)
+ - offsetof(PredXactListElementData, sxact)
+ +offsetof(PredXactListElementData, link));
+ SHMQueueDelete(&ptle->link);
+ SHMQueueInsertBefore(&PredXact->availableList, &ptle->link);
+}
+
+static SERIALIZABLEXACT *
+FirstPredXact(void)
+{
+ PredXactListElement ptle;
+
+ ptle = (PredXactListElement)
+ SHMQueueNext(&PredXact->activeList,
+ &PredXact->activeList,
+ offsetof(PredXactListElementData, link));
+ if (!ptle)
+ return NULL;
+
+ return &ptle->sxact;
+}
+
+static SERIALIZABLEXACT *
+NextPredXact(SERIALIZABLEXACT *sxact)
+{
+ PredXactListElement ptle;
+
+ Assert(ShmemAddrIsValid(sxact));
+
+ ptle = (PredXactListElement)
+ (((char *) sxact)
+ - offsetof(PredXactListElementData, sxact)
+ +offsetof(PredXactListElementData, link));
+ ptle = (PredXactListElement)
+ SHMQueueNext(&PredXact->activeList,
+ &ptle->link,
+ offsetof(PredXactListElementData, link));
+ if (!ptle)
+ return NULL;
+
+ return &ptle->sxact;
+}
+
+/*------------------------------------------------------------------------*/
+
+/*
+ * These functions manage primitive access to the RWConflict pool and lists.
+ */
+static bool
+RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer)
+{
+ RWConflict conflict;
+
+ Assert(reader != writer);
+
+ /* Check the ends of the purported conflict first. */
+ if (SxactIsRolledBack(reader)
+ || SxactIsRolledBack(writer)
+ || SHMQueueEmpty(&reader->outConflicts)
+ || SHMQueueEmpty(&writer->inConflicts))
+ return false;
+
+ /* A conflict is possible; walk the list to find out. */
+ conflict = (RWConflict)
+ SHMQueueNext(&reader->outConflicts,
+ &reader->outConflicts,
+ offsetof(RWConflictData, outLink));
+ while (conflict)
+ {
+ if (conflict->sxactIn == writer)
+ return true;
+ conflict = (RWConflict)
+ SHMQueueNext(&reader->outConflicts,
+ &conflict->outLink,
+ offsetof(RWConflictData, outLink));
+ }
+
+ /* No conflict found. */
+ return false;
+}
+
+static void
+SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
+{
+ RWConflict conflict;
+
+ Assert(reader != writer);
+ Assert(!RWConflictExists(reader, writer));
+
+ conflict = (RWConflict)
+ SHMQueueNext(&RWConflictPool->availableList,
+ &RWConflictPool->availableList,
+ offsetof(RWConflictData, outLink));
+ if (!conflict)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("not enough elements in RWConflictPool to record a rw-conflict"),
+ errhint("You might need to run fewer transactions at a time or increase max_connections.")));
+
+ SHMQueueDelete(&conflict->outLink);
+
+ conflict->sxactOut = reader;
+ conflict->sxactIn = writer;
+ SHMQueueInsertBefore(&reader->outConflicts, &conflict->outLink);
+ SHMQueueInsertBefore(&writer->inConflicts, &conflict->inLink);
+}
+
+static void
+SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact,
+ SERIALIZABLEXACT *activeXact)
+{
+ RWConflict conflict;
+
+ Assert(roXact != activeXact);
+ Assert(SxactIsReadOnly(roXact));
+ Assert(!SxactIsReadOnly(activeXact));
+
+ conflict = (RWConflict)
+ SHMQueueNext(&RWConflictPool->availableList,
+ &RWConflictPool->availableList,
+ offsetof(RWConflictData, outLink));
+ if (!conflict)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("not enough elements in RWConflictPool to record a potential rw-conflict"),
+ errhint("You might need to run fewer transactions at a time or increase max_connections.")));
+
+ SHMQueueDelete(&conflict->outLink);
+
+ conflict->sxactOut = activeXact;
+ conflict->sxactIn = roXact;
+ SHMQueueInsertBefore(&activeXact->possibleUnsafeConflicts,
+ &conflict->outLink);
+ SHMQueueInsertBefore(&roXact->possibleUnsafeConflicts,
+ &conflict->inLink);
+}
+
+static void
+ReleaseRWConflict(RWConflict conflict)
+{
+ SHMQueueDelete(&conflict->inLink);
+ SHMQueueDelete(&conflict->outLink);
+ SHMQueueInsertBefore(&RWConflictPool->availableList, &conflict->outLink);
+}
+
+static void
+FlagSxactUnsafe(SERIALIZABLEXACT *sxact)
+{
+ RWConflict conflict,
+ nextConflict;
+
+ Assert(SxactIsReadOnly(sxact));
+ Assert(!SxactIsROSafe(sxact));
+
+ sxact->flags |= SXACT_FLAG_RO_UNSAFE;
+
+ /*
+ * We know this isn't a safe snapshot, so we can stop looking for other
+ * potential conflicts.
+ */
+ conflict = (RWConflict)
+ SHMQueueNext(&sxact->possibleUnsafeConflicts,
+ &sxact->possibleUnsafeConflicts,
+ offsetof(RWConflictData, inLink));
+ while (conflict)
+ {
+ nextConflict = (RWConflict)
+ SHMQueueNext(&sxact->possibleUnsafeConflicts,
+ &conflict->inLink,
+ offsetof(RWConflictData, inLink));
+
+ Assert(!SxactIsReadOnly(conflict->sxactOut));
+ Assert(sxact == conflict->sxactIn);
+
+ ReleaseRWConflict(conflict);
+
+ conflict = nextConflict;
+ }
+}
+
+/*------------------------------------------------------------------------*/
+
+/*
+ * We will work on the page range of 0..OLDSERXID_MAX_PAGE.
+ * Compares using wraparound logic, as is required by slru.c.
+ */
+static bool
+OldSerXidPagePrecedesLogically(int p, int q)
+{
+ int diff;
+
+ /*
+ * We have to compare modulo (OLDSERXID_MAX_PAGE+1)/2. Both inputs should
+ * be in the range 0..OLDSERXID_MAX_PAGE.
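+	 *
+	 * As a hypothetical small example, if OLDSERXID_MAX_PAGE were 7, the
+	 * comparison would be modulo 4: page 6 would logically precede page 1
+	 * (diff = 5 wraps to -3), while page 1 would precede page 3 (diff = -2).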
+ */
+ Assert(p >= 0 && p <= OLDSERXID_MAX_PAGE);
+ Assert(q >= 0 && q <= OLDSERXID_MAX_PAGE);
+
+ diff = p - q;
+ if (diff >= ((OLDSERXID_MAX_PAGE + 1) / 2))
+ diff -= OLDSERXID_MAX_PAGE + 1;
+ else if (diff < -((OLDSERXID_MAX_PAGE + 1) / 2))
+ diff += OLDSERXID_MAX_PAGE + 1;
+ return diff < 0;
+}
+
+/*
+ * Initialize for the tracking of old serializable committed xids.
+ */
+static void
+OldSerXidInit(void)
+{
+ bool found;
+
+ /*
+ * Set up SLRU management of the pg_serial data.
+ */
+ OldSerXidSlruCtl->PagePrecedes = OldSerXidPagePrecedesLogically;
+ SimpleLruInit(OldSerXidSlruCtl, "OldSerXid SLRU Ctl", NUM_OLDSERXID_BUFFERS, 0,
+ OldSerXidLock, "pg_serial");
+ /* Override default assumption that writes should be fsync'd */
+ OldSerXidSlruCtl->do_fsync = false;
+
+ /*
+ * Create or attach to the OldSerXidControl structure.
+ */
+ oldSerXidControl = (OldSerXidControl)
+ ShmemInitStruct("OldSerXidControlData", sizeof(OldSerXidControlData), &found);
+
+ if (!found)
+ {
+ /*
+ * Set control information to reflect empty SLRU.
+ */
+ oldSerXidControl->headPage = -1;
+ oldSerXidControl->tailSegment = -1;
+ oldSerXidControl->headXid = InvalidTransactionId;
+ oldSerXidControl->tailXid = InvalidTransactionId;
+ oldSerXidControl->warningIssued = false;
+ }
+}
+
+/*
+ * Record a committed read write serializable xid and the minimum
+ * commitSeqNo of any transactions to which this xid had a rw-conflict out.
+ * A zero seqNo means that there were no conflicts out from xid.
+ *
+ * A warning is issued from within this function when the span of tracked
+ * xids grows large enough to threaten wraparound of the tracking space.
+ */
+static void
+OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo)
+{
+ TransactionId tailXid;
+ int targetPage;
+ int slotno;
+ int page;
+ int xidSpread;
+ bool isNewPage;
+
+ Assert(TransactionIdIsValid(xid));
+
+ targetPage = OldSerXidPage(xid);
+
+ LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);
+
+ /*
+ * If no serializable transactions are active, there shouldn't be anything
+ * to push out to this SLRU. Hitting this assert would mean there's
+ * something wrong with the earlier cleanup logic.
+ */
+ tailXid = oldSerXidControl->tailXid;
+ Assert(TransactionIdIsValid(tailXid));
+
+ if (oldSerXidControl->headPage < 0)
+ {
+ page = OldSerXidPage(tailXid);
+ oldSerXidControl->tailSegment = OldSerXidSegment(page);
+		page = oldSerXidControl->tailSegment * SLRU_PAGES_PER_SEGMENT;
+ isNewPage = true;
+ }
+ else
+ {
+ page = OldSerXidNextPage(oldSerXidControl->headPage);
+ isNewPage = OldSerXidPagePrecedesLogically(oldSerXidControl->headPage, targetPage);
+ }
+
+ if (!TransactionIdIsValid(oldSerXidControl->headXid)
+ || TransactionIdFollows(xid, oldSerXidControl->headXid))
+ oldSerXidControl->headXid = xid;
+ if (oldSerXidControl->headPage < 0
+ || OldSerXidPagePrecedesLogically(oldSerXidControl->headPage, targetPage))
+ oldSerXidControl->headPage = targetPage;
+
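+	/*
+	 * Warn once the span between the oldest and newest tracked xids reaches
+	 * one billion, and clear the warning state again only when it drops
+	 * below 800 million; the gap provides hysteresis so the warning isn't
+	 * repeated on every call.
+	 */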
+ xidSpread = (((uint32) xid) - ((uint32) tailXid));
+ if (oldSerXidControl->warningIssued)
+ {
+ if (xidSpread < 800000000)
+ oldSerXidControl->warningIssued = false;
+ }
+ else if (xidSpread >= 1000000000)
+ {
+ oldSerXidControl->warningIssued = true;
+ ereport(WARNING,
+ (errmsg("memory for serializable conflict tracking is nearly exhausted"),
+ errhint("There may be an idle transaction or a forgotten prepared transaction causing this.")));
+ }
+
+ if (isNewPage)
+ {
+ /* Initialize intervening pages. */
+ while (page != targetPage)
+ {
+ (void) SimpleLruZeroPage(OldSerXidSlruCtl, page);
+ page = OldSerXidNextPage(page);
+ }
+ slotno = SimpleLruZeroPage(OldSerXidSlruCtl, targetPage);
+ }
+ else
+ slotno = SimpleLruReadPage(OldSerXidSlruCtl, targetPage, true, xid);
+
+ OldSerXidValue(slotno, xid) = minConflictCommitSeqNo;
+
+ LWLockRelease(OldSerXidLock);
+}
+
+/*
+ * Get the minimum commitSeqNo for any conflict out for the given xid. For
+ * a transaction which exists but has no conflict out, InvalidSerCommitSeqNo
+ * will be returned.
+ */
+static SerCommitSeqNo
+OldSerXidGetMinConflictCommitSeqNo(TransactionId xid)
+{
+ TransactionId headXid;
+ TransactionId tailXid;
+ SerCommitSeqNo val;
+ int slotno;
+
+ Assert(TransactionIdIsValid(xid));
+
+ LWLockAcquire(OldSerXidLock, LW_SHARED);
+ headXid = oldSerXidControl->headXid;
+ tailXid = oldSerXidControl->tailXid;
+ LWLockRelease(OldSerXidLock);
+
+ if (!TransactionIdIsValid(headXid))
+ return 0;
+
+ Assert(TransactionIdIsValid(tailXid));
+
+ if (TransactionIdPrecedes(xid, tailXid)
+ || TransactionIdFollows(xid, headXid))
+ return 0;
+
+ /*
+ * The following function must be called without holding OldSerXidLock,
+ * but will return with that lock held, which must then be released.
+ */
+ slotno = SimpleLruReadPage_ReadOnly(OldSerXidSlruCtl,
+ OldSerXidPage(xid), xid);
+ val = OldSerXidValue(slotno, xid);
+ LWLockRelease(OldSerXidLock);
+ return val;
+}
+
+/*
+ * Call this whenever there is a new xmin for active serializable
+ * transactions. We don't need to keep information on transactions which
+ * precede that. InvalidTransactionId means none active, so everything in
+ * the SLRU should be discarded.
+ */
+static void
+OldSerXidSetActiveSerXmin(TransactionId xid)
+{
+ int newTailPage;
+ int newTailSegment;
+
+ LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);
+
+ /*
+	 * When no sxacts are active, nothing overlaps, so set the xid values to
+ * invalid to show that there are no valid entries. Don't clear the
+ * segment/page information, though. A new xmin might still land in an
+ * existing segment, and we don't want to repeatedly delete and re-create
+ * the same segment file.
+ */
+ if (!TransactionIdIsValid(xid))
+ {
+ if (TransactionIdIsValid(oldSerXidControl->tailXid))
+ {
+ oldSerXidControl->headXid = InvalidTransactionId;
+ oldSerXidControl->tailXid = InvalidTransactionId;
+ }
+ LWLockRelease(OldSerXidLock);
+ return;
+ }
+
+ /*
+	 * When we're recovering prepared transactions, the global xmin might
+	 * move backwards depending on the order in which they're recovered.
+	 * Normally that's not OK, but during recovery no serializable
+	 * transactions will commit, so the SLRU is empty and we can get away
+	 * with it.
+ */
+ if (RecoveryInProgress())
+ {
+ Assert(oldSerXidControl->headPage < 0);
+ if (!TransactionIdIsValid(oldSerXidControl->tailXid)
+ || TransactionIdPrecedes(xid, oldSerXidControl->tailXid))
+ oldSerXidControl->tailXid = xid;
+ LWLockRelease(OldSerXidLock);
+ return;
+ }
+
+ Assert(!TransactionIdIsValid(oldSerXidControl->tailXid)
+ || TransactionIdFollows(xid, oldSerXidControl->tailXid));
+
+ oldSerXidControl->tailXid = xid;
+
+ /* Exit quickly if there are no segments active. */
+ if (oldSerXidControl->headPage < 0)
+ {
+ LWLockRelease(OldSerXidLock);
+ return;
+ }
+
+ newTailPage = OldSerXidPage(xid);
+ newTailSegment = OldSerXidSegment(newTailPage);
+
+ /* Exit quickly if we're still on the same segment. */
+ if (newTailSegment == oldSerXidControl->tailSegment)
+ {
+ LWLockRelease(OldSerXidLock);
+ return;
+ }
+
+ oldSerXidControl->tailSegment = newTailSegment;
+
+ /* See if that has cleared the last segment. */
+ if (OldSerXidPagePrecedesLogically(oldSerXidControl->headPage,
+ newTailSegment * SLRU_PAGES_PER_SEGMENT))
+ {
+ oldSerXidControl->headXid = InvalidTransactionId;
+ oldSerXidControl->headPage = -1;
+ oldSerXidControl->tailSegment = -1;
+ }
+
+ LWLockRelease(OldSerXidLock);
+
+ SimpleLruTruncate(OldSerXidSlruCtl, newTailPage);
+}
+
+/*------------------------------------------------------------------------*/
+
+/*
+ * InitPredicateLocks -- Initialize the predicate locking data structures.
+ *
+ * This is called from CreateSharedMemoryAndSemaphores(), which see for
+ * more comments. In the normal postmaster case, the shared hash tables
+ * are created here. Backends inherit the pointers
+ * to the shared tables via fork(). In the EXEC_BACKEND case, each
+ * backend re-executes this code to obtain pointers to the already existing
+ * shared hash tables.
+ */
+void
+InitPredicateLocks(void)
+{
+ HASHCTL info;
+ int hash_flags;
+ long init_table_size,
+ max_table_size;
+ Size requestSize;
+ bool found;
+
+ /*
+ * Compute init/max size to request for predicate lock target hashtable.
+ * Note these calculations must agree with PredicateLockShmemSize!
+ */
+ max_table_size = NPREDICATELOCKTARGETENTS();
+ init_table_size = max_table_size / 2;
+
+ /*
+ * Allocate hash table for PREDICATELOCKTARGET structs. This stores
+ * per-predicate-lock-target information.
+ */
+ MemSet(&info, 0, sizeof(info));
+ info.keysize = sizeof(PREDICATELOCKTARGETTAG);
+ info.entrysize = sizeof(PREDICATELOCKTARGET);
+ info.hash = tag_hash;
+ info.num_partitions = NUM_PREDICATELOCK_PARTITIONS;
+ hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);
+
+ PredicateLockTargetHash = ShmemInitHash("PREDICATELOCKTARGET hash",
+ init_table_size,
+ max_table_size,
+ &info,
+ hash_flags);
+
+ /* Assume an average of 2 xacts per target */
+ max_table_size *= 2;
+ init_table_size *= 2;
+
+ /*
+ * Reserve an entry in the hash table; we use it to make sure there's
+ * always one entry available when we need to split or combine a page,
+ * because running out of space there could mean aborting a
+ * non-serializable transaction.
+ */
+ hash_search(PredicateLockTargetHash, &ReservedTargetTag,
+ HASH_ENTER, NULL);
+
+
+ /*
+ * Allocate hash table for PREDICATELOCK structs. This stores per
+ * xact-lock-of-a-target information.
+ */
+ MemSet(&info, 0, sizeof(info));
+ info.keysize = sizeof(PREDICATELOCKTAG);
+ info.entrysize = sizeof(PREDICATELOCK);
+ info.hash = predicatelock_hash;
+ info.num_partitions = NUM_PREDICATELOCK_PARTITIONS;
+ hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);
+
+ PredicateLockHash = ShmemInitHash("PREDICATELOCK hash",
+ init_table_size,
+ max_table_size,
+ &info,
+ hash_flags);
+
+ /*
+ * Compute init/max size to request for serializable transaction
+ * hashtable. Note these calculations must agree with
+ * PredicateLockShmemSize!
+ */
+ max_table_size = (MaxBackends + max_prepared_xacts);
+ init_table_size = max_table_size / 2;
+
+ /*
+ * Allocate a list to hold information on transactions participating in
+ * predicate locking.
+ *
+ * Assume an average of 10 predicate locking transactions per backend.
+ * This allows aggressive cleanup while detail is present before data must
+ * be summarized for storage in SLRU and the "dummy" transaction.
+ */
+ max_table_size *= 10;
+ init_table_size *= 10;
+
+ PredXact = ShmemInitStruct("PredXactList",
+ PredXactListDataSize,
+ &found);
+ if (!found)
+ {
+ int i;
+
+ SHMQueueInit(&PredXact->availableList);
+ SHMQueueInit(&PredXact->activeList);
+ PredXact->SxactGlobalXmin = InvalidTransactionId;
+ PredXact->SxactGlobalXminCount = 0;
+ PredXact->WritableSxactCount = 0;
+ PredXact->LastSxactCommitSeqNo = FirstNormalSerCommitSeqNo - 1;
+ PredXact->CanPartialClearThrough = 0;
+ PredXact->HavePartialClearedThrough = 0;
+ PredXact->NeedTargetLinkCleanup = false;
+ requestSize = mul_size((Size) max_table_size,
+ PredXactListElementDataSize);
+ PredXact->element = ShmemAlloc(requestSize);
+ if (PredXact->element == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("not enough shared memory for elements of data structure"
+ " \"%s\" (%lu bytes requested)",
+ "PredXactList", (unsigned long) requestSize)));
+ /* Add all elements to available list, clean. */
+ memset(PredXact->element, 0, requestSize);
+ for (i = 0; i < max_table_size; i++)
+ {
+ SHMQueueInsertBefore(&(PredXact->availableList),
+ &(PredXact->element[i].link));
+ }
+ PredXact->OldCommittedSxact = CreatePredXact();
+ SetInvalidVirtualTransactionId(PredXact->OldCommittedSxact->vxid);
+ PredXact->OldCommittedSxact->commitSeqNo = 0;
+ PredXact->OldCommittedSxact->SeqNo.lastCommitBeforeSnapshot = 0;
+ SHMQueueInit(&PredXact->OldCommittedSxact->outConflicts);
+ SHMQueueInit(&PredXact->OldCommittedSxact->inConflicts);
+ SHMQueueInit(&PredXact->OldCommittedSxact->predicateLocks);
+ SHMQueueInit(&PredXact->OldCommittedSxact->finishedLink);
+ SHMQueueInit(&PredXact->OldCommittedSxact->possibleUnsafeConflicts);
+ PredXact->OldCommittedSxact->topXid = InvalidTransactionId;
+ PredXact->OldCommittedSxact->finishedBefore = InvalidTransactionId;
+ PredXact->OldCommittedSxact->xmin = InvalidTransactionId;
+ PredXact->OldCommittedSxact->flags = SXACT_FLAG_COMMITTED;
+ PredXact->OldCommittedSxact->pid = 0;
+ }
+ /* This never changes, so let's keep a local copy. */
+ OldCommittedSxact = PredXact->OldCommittedSxact;
+
+ /*
+ * Allocate hash table for SERIALIZABLEXID structs. This stores per-xid
+ * information for serializable transactions which have accessed data.
+ */
+ MemSet(&info, 0, sizeof(info));
+ info.keysize = sizeof(SERIALIZABLEXIDTAG);
+ info.entrysize = sizeof(SERIALIZABLEXID);
+ info.hash = tag_hash;
+ hash_flags = (HASH_ELEM | HASH_FUNCTION);
+
+ SerializableXidHash = ShmemInitHash("SERIALIZABLEXID hash",
+ init_table_size,
+ max_table_size,
+ &info,
+ hash_flags);
+
+ /*
+ * Allocate space for tracking rw-conflicts in lists attached to the
+ * transactions.
+ *
+ * Assume an average of 5 conflicts per transaction. Calculations suggest
+ * that this will prevent resource exhaustion in even the most pessimal
+ * loads up to max_connections = 200 with all 200 connections pounding the
+ * database with serializable transactions. Beyond that, there may be
+	 * occasional transactions canceled when trying to flag conflicts. That's
+ * probably OK.
+ */
+ max_table_size *= 5;
+
+ RWConflictPool = ShmemInitStruct("RWConflictPool",
+ RWConflictPoolHeaderDataSize,
+ &found);
+ if (!found)
+ {
+ int i;
+
+ SHMQueueInit(&RWConflictPool->availableList);
+ requestSize = mul_size((Size) max_table_size,
+							   RWConflictDataSize);
+ RWConflictPool->element = ShmemAlloc(requestSize);
+ if (RWConflictPool->element == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("not enough shared memory for elements of data structure"
+ " \"%s\" (%lu bytes requested)",
+ "RWConflictPool", (unsigned long) requestSize)));
+ /* Add all elements to available list, clean. */
+ memset(RWConflictPool->element, 0, requestSize);
+ for (i = 0; i < max_table_size; i++)
+ {
+ SHMQueueInsertBefore(&(RWConflictPool->availableList),
+ &(RWConflictPool->element[i].outLink));
+ }
+ }
+
+ /*
+ * Create or attach to the header for the list of finished serializable
+ * transactions.
+ */
+ FinishedSerializableTransactions = (SHM_QUEUE *)
+ ShmemInitStruct("FinishedSerializableTransactions",
+ sizeof(SHM_QUEUE),
+ &found);
+ if (!found)
+ SHMQueueInit(FinishedSerializableTransactions);
+
+ /*
+ * Initialize the SLRU storage for old committed serializable
+ * transactions.
+ */
+ OldSerXidInit();
+}
+
+/*
+ * Estimate shared-memory space used for predicate lock table
+ */
+Size
+PredicateLockShmemSize(void)
+{
+ Size size = 0;
+ long max_table_size;
+
+ /* predicate lock target hash table */
+ max_table_size = NPREDICATELOCKTARGETENTS();
+ size = add_size(size, hash_estimate_size(max_table_size,
+ sizeof(PREDICATELOCKTARGET)));
+
+ /* predicate lock hash table */
+ max_table_size *= 2;
+ size = add_size(size, hash_estimate_size(max_table_size,
+ sizeof(PREDICATELOCK)));
+
+ /*
+ * Since NPREDICATELOCKTARGETENTS is only an estimate, add 10% safety
+ * margin.
+ */
+ size = add_size(size, size / 10);
+
+ /* transaction list */
+ max_table_size = MaxBackends + max_prepared_xacts;
+ max_table_size *= 10;
+ size = add_size(size, PredXactListDataSize);
+ size = add_size(size, mul_size((Size) max_table_size,
+ PredXactListElementDataSize));
+
+ /* transaction xid table */
+ size = add_size(size, hash_estimate_size(max_table_size,
+ sizeof(SERIALIZABLEXID)));
+
+ /* Head for list of finished serializable transactions. */
+ size = add_size(size, sizeof(SHM_QUEUE));
+
+ /* Shared memory structures for SLRU tracking of old committed xids. */
+	size = add_size(size, sizeof(OldSerXidControlData));
+ size = add_size(size, SimpleLruShmemSize(NUM_OLDSERXID_BUFFERS, 0));
+
+ return size;
+}
+
+
+/*
+ * Compute the hash code associated with a PREDICATELOCKTAG.
+ *
+ * Because we want to use just one set of partition locks for both the
+ * PREDICATELOCKTARGET and PREDICATELOCK hash tables, we have to make sure
+ * that PREDICATELOCKs fall into the same partition number as their
+ * associated PREDICATELOCKTARGETs. dynahash.c expects the partition number
+ * to be the low-order bits of the hash code, and therefore a
+ * PREDICATELOCKTAG's hash code must have the same low-order bits as the
+ * associated PREDICATELOCKTARGETTAG's hash code. We achieve this with this
+ * specialized hash function.
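+ *
+ * For example, with 16 lock partitions the two hash codes must agree in
+ * their low-order four bits, so that a PREDICATELOCK and its target always
+ * fall under the same partition lock.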
+ */
+static uint32
+predicatelock_hash(const void *key, Size keysize)
+{
+ const PREDICATELOCKTAG *predicatelocktag = (const PREDICATELOCKTAG *) key;
+ uint32 targethash;
+
+ Assert(keysize == sizeof(PREDICATELOCKTAG));
+
+ /* Look into the associated target object, and compute its hash code */
+ targethash = PredicateLockTargetTagHashCode(&predicatelocktag->myTarget->tag);
+
+ return PredicateLockHashCodeFromTargetHashCode(predicatelocktag, targethash);
+}
+
+
+/*
+ * GetPredicateLockStatusData
+ * Return a table containing the internal state of the predicate
+ * lock manager for use in pg_lock_status.
+ *
+ * Like GetLockStatusData, this function tries to hold the partition LWLocks
+ * for as short a time as possible by returning two arrays that simply
+ * contain the PREDICATELOCKTARGETTAG and SERIALIZABLEXACT for each lock
+ * table entry. Multiple copies of the same PREDICATELOCKTARGETTAG and
+ * SERIALIZABLEXACT will likely appear.
+ */
+PredicateLockData *
+GetPredicateLockStatusData(void)
+{
+ PredicateLockData *data;
+ int i;
+ int els,
+ el;
+ HASH_SEQ_STATUS seqstat;
+ PREDICATELOCK *predlock;
+
+ data = (PredicateLockData *) palloc(sizeof(PredicateLockData));
+
+ /*
+ * To ensure consistency, take simultaneous locks on all partition locks
+ * in ascending order, then SerializableXactHashLock.
+ */
+ for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
+ LWLockAcquire(FirstPredicateLockMgrLock + i, LW_SHARED);
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+
+ /* Get number of locks and allocate appropriately-sized arrays. */
+ els = hash_get_num_entries(PredicateLockHash);
+ data->nelements = els;
+ data->locktags = (PREDICATELOCKTARGETTAG *)
+ palloc(sizeof(PREDICATELOCKTARGETTAG) * els);
+ data->xacts = (SERIALIZABLEXACT *)
+ palloc(sizeof(SERIALIZABLEXACT) * els);
+
+
+ /* Scan through PredicateLockHash and copy contents */
+ hash_seq_init(&seqstat, PredicateLockHash);
+
+ el = 0;
+
+ while ((predlock = (PREDICATELOCK *) hash_seq_search(&seqstat)))
+ {
+ data->locktags[el] = predlock->tag.myTarget->tag;
+ data->xacts[el] = *predlock->tag.myXact;
+ el++;
+ }
+
+ Assert(el == els);
+
+ /* Release locks in reverse order */
+ LWLockRelease(SerializableXactHashLock);
+ for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
+ LWLockRelease(FirstPredicateLockMgrLock + i);
+
+ return data;
+}
+
+/*
+ * Free up shared memory structures by pushing the oldest sxact (the one at
+ * the front of the FinishedSerializableTransactions list) into summary form.
+ * Each call will free exactly one SERIALIZABLEXACT structure and may also
+ * free one or more of these structures: SERIALIZABLEXID, PREDICATELOCK,
+ * PREDICATELOCKTARGET, RWConflictData.
+ */
+static void
+SummarizeOldestCommittedSxact(void)
+{
+ SERIALIZABLEXACT *sxact;
+
+ LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
+
+#ifdef TEST_OLDSERXID
+ if (SHMQueueEmpty(FinishedSerializableTransactions))
+ {
+ LWLockRelease(SerializableFinishedListLock);
+ return;
+ }
+#else
+ Assert(!SHMQueueEmpty(FinishedSerializableTransactions));
+#endif
+
+ /*
+ * Grab the first sxact off the finished list -- this will be the earliest
+ * commit. Remove it from the list.
+ */
+ sxact = (SERIALIZABLEXACT *)
+ SHMQueueNext(FinishedSerializableTransactions,
+ FinishedSerializableTransactions,
+ offsetof(SERIALIZABLEXACT, finishedLink));
+ SHMQueueDelete(&(sxact->finishedLink));
+
+ /* Add to SLRU summary information. */
+ if (TransactionIdIsValid(sxact->topXid) && !SxactIsReadOnly(sxact))
+ OldSerXidAdd(sxact->topXid, SxactHasConflictOut(sxact)
+ ? sxact->SeqNo.earliestOutConflictCommit : InvalidSerCommitSeqNo);
+
+ /* Summarize and release the detail. */
+ ReleaseOneSerializableXact(sxact, false, true);
+
+ LWLockRelease(SerializableFinishedListLock);
+}
+
+/*
+ * GetSafeSnapshot
+ * Obtain and register a snapshot for a READ ONLY DEFERRABLE
+ * transaction. Ensures that the snapshot is "safe", i.e. a
+ * read-only transaction running on it can execute serializably
+ * without further checks. This requires waiting for concurrent
+ * transactions to complete, and retrying with a new snapshot if
+ * one of them could possibly create a conflict.
+ */
+static Snapshot
+GetSafeSnapshot(Snapshot origSnapshot)
+{
+ Snapshot snapshot;
+
+ Assert(XactReadOnly && XactDeferrable);
+
+ while (true)
+ {
+ /*
+ * RegisterSerializableTransactionInt is going to call
+ * GetSnapshotData, so we need to provide it the static snapshot our
+ * caller passed to us. It returns a copy of that snapshot and
+ * registers it on TopTransactionResourceOwner.
+ */
+ snapshot = RegisterSerializableTransactionInt(origSnapshot);
+
+ if (MySerializableXact == InvalidSerializableXact)
+ return snapshot; /* no concurrent r/w xacts; it's safe */
+
+ MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
+
+ /*
+ * Wait for concurrent transactions to finish. Stop early if one of
+ * them marked us as conflicted.
+ */
+ while (!(SHMQueueEmpty((SHM_QUEUE *)
+ &MySerializableXact->possibleUnsafeConflicts) ||
+ SxactIsROUnsafe(MySerializableXact)))
+ ProcWaitForSignal();
+
+ MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING;
+ if (!SxactIsROUnsafe(MySerializableXact))
+ break; /* success */
+
+ /* else, need to retry... */
+ ereport(DEBUG2,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("deferrable snapshot was unsafe; trying a new one")));
+ ReleasePredicateLocks(false);
+ UnregisterSnapshotFromOwner(snapshot,
+ TopTransactionResourceOwner);
+ }
+
+ /*
+ * Now we have a safe snapshot, so we don't need to do any further checks.
+ */
+ Assert(SxactIsROSafe(MySerializableXact));
+ ReleasePredicateLocks(false);
+
+ return snapshot;
+}
+
+/*
+ * Acquire and register a snapshot which can be used for this transaction.
+ * Make sure we have a SERIALIZABLEXACT reference in MySerializableXact.
+ * It should be current for this process and be contained in PredXact.
+ */
+Snapshot
+RegisterSerializableTransaction(Snapshot snapshot)
+{
+ Assert(IsolationIsSerializable());
+
+ /*
+ * A special optimization is available for SERIALIZABLE READ ONLY
+ * DEFERRABLE transactions -- we can wait for a suitable snapshot and
+	 * thereby avoid all SSI overhead once it's running.
+ */
+ if (XactReadOnly && XactDeferrable)
+ return GetSafeSnapshot(snapshot);
+
+ return RegisterSerializableTransactionInt(snapshot);
+}
+
+static Snapshot
+RegisterSerializableTransactionInt(Snapshot snapshot)
+{
+ PGPROC *proc;
+ VirtualTransactionId vxid;
+ SERIALIZABLEXACT *sxact,
+ *othersxact;
+ HASHCTL hash_ctl;
+
+ /* We only do this for serializable transactions. Once. */
+ Assert(MySerializableXact == InvalidSerializableXact);
+
+ Assert(!RecoveryInProgress());
+
+ proc = MyProc;
+ Assert(proc != NULL);
+ GET_VXID_FROM_PGPROC(vxid, *proc);
+
+ /*
+ * First we get the sxact structure, which may involve looping and access
+ * to the "finished" list to free a structure for use.
+ */
+#ifdef TEST_OLDSERXID
+ SummarizeOldestCommittedSxact();
+#endif
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+ do
+ {
+ sxact = CreatePredXact();
+ /* If null, push out committed sxact to SLRU summary & retry. */
+ if (!sxact)
+ {
+ LWLockRelease(SerializableXactHashLock);
+ SummarizeOldestCommittedSxact();
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+ }
+ } while (!sxact);
+
+ /* Get and register a snapshot */
+ snapshot = GetSnapshotData(snapshot);
+ snapshot = RegisterSnapshotOnOwner(snapshot, TopTransactionResourceOwner);
+
+ /*
+ * If there are no serializable transactions which are not read-only, we
+ * can "opt out" of predicate locking and conflict checking for a
+ * read-only transaction.
+ *
+ * The reason this is safe is that a read-only transaction can only become
+ * part of a dangerous structure if it overlaps a writable transaction
+ * which in turn overlaps a writable transaction which committed before
+ * the read-only transaction started. A new writable transaction can
+ * overlap this one, but it can't meet the other condition of overlapping
+ * a transaction which committed before this one started.
+ */
+ if (XactReadOnly && PredXact->WritableSxactCount == 0)
+ {
+ ReleasePredXact(sxact);
+ LWLockRelease(SerializableXactHashLock);
+ return snapshot;
+ }
+
+ /* Maintain serializable global xmin info. */
+ if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
+ {
+ Assert(PredXact->SxactGlobalXminCount == 0);
+ PredXact->SxactGlobalXmin = snapshot->xmin;
+ PredXact->SxactGlobalXminCount = 1;
+ OldSerXidSetActiveSerXmin(snapshot->xmin);
+ }
+ else if (TransactionIdEquals(snapshot->xmin, PredXact->SxactGlobalXmin))
+ {
+ Assert(PredXact->SxactGlobalXminCount > 0);
+ PredXact->SxactGlobalXminCount++;
+ }
+ else
+ {
+ Assert(TransactionIdFollows(snapshot->xmin, PredXact->SxactGlobalXmin));
+ }
+
+ /* Initialize the structure. */
+ sxact->vxid = vxid;
+ sxact->SeqNo.lastCommitBeforeSnapshot = PredXact->LastSxactCommitSeqNo;
+ sxact->commitSeqNo = InvalidSerCommitSeqNo;
+ SHMQueueInit(&(sxact->outConflicts));
+ SHMQueueInit(&(sxact->inConflicts));
+ SHMQueueInit(&(sxact->possibleUnsafeConflicts));
+ sxact->topXid = GetTopTransactionIdIfAny();
+ sxact->finishedBefore = InvalidTransactionId;
+ sxact->xmin = snapshot->xmin;
+ sxact->pid = MyProcPid;
+ SHMQueueInit(&(sxact->predicateLocks));
+ SHMQueueElemInit(&(sxact->finishedLink));
+ sxact->flags = 0;
+ if (XactReadOnly)
+ {
+ sxact->flags |= SXACT_FLAG_READ_ONLY;
+
+ /*
+ * Register all concurrent r/w transactions as possible conflicts; if
+ * all of them commit without any outgoing conflicts to earlier
+ * transactions then this snapshot can be deemed safe (and we can run
+ * without tracking predicate locks).
+ */
+ for (othersxact = FirstPredXact();
+ othersxact != NULL;
+ othersxact = NextPredXact(othersxact))
+ {
+ if (!SxactIsOnFinishedList(othersxact) &&
+ !SxactIsReadOnly(othersxact))
+ {
+ SetPossibleUnsafeConflict(sxact, othersxact);
+ }
+ }
+ }
+ else
+ {
+ ++(PredXact->WritableSxactCount);
+ Assert(PredXact->WritableSxactCount <=
+ (MaxBackends + max_prepared_xacts));
+ }
+
+ MySerializableXact = sxact;
+
+ LWLockRelease(SerializableXactHashLock);
+
+ /* Initialize the backend-local hash table of parent locks */
+ Assert(LocalPredicateLockHash == NULL);
+ MemSet(&hash_ctl, 0, sizeof(hash_ctl));
+ hash_ctl.keysize = sizeof(PREDICATELOCKTARGETTAG);
+ hash_ctl.entrysize = sizeof(LOCALPREDICATELOCK);
+ hash_ctl.hash = tag_hash;
+ LocalPredicateLockHash = hash_create("Local predicate lock",
+ max_predicate_locks_per_xact,
+ &hash_ctl,
+ HASH_ELEM | HASH_FUNCTION);
+
+ return snapshot;
+}
+
+/*
+ * Register the top level XID in SerializableXidHash.
+ * Also store it for easy reference in MySerializableXact.
+ */
+void
+RegisterPredicateLockingXid(const TransactionId xid)
+{
+ SERIALIZABLEXIDTAG sxidtag;
+ SERIALIZABLEXID *sxid;
+ bool found;
+
+ /*
+ * If we're not tracking predicate lock data for this transaction, we
+ * should ignore the request and return quickly.
+ */
+ if (MySerializableXact == InvalidSerializableXact)
+ return;
+
+ /* This should only be done once per transaction. */
+ Assert(MySerializableXact->topXid == InvalidTransactionId);
+
+ /* We should have a valid XID and be at the top level. */
+ Assert(TransactionIdIsValid(xid));
+
+ MySerializableXact->topXid = xid;
+
+ sxidtag.xid = xid;
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+ sxid = (SERIALIZABLEXID *) hash_search(SerializableXidHash,
+ &sxidtag,
+ HASH_ENTER, &found);
+ if (!sxid)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory"),
+ errhint("You might need to increase max_predicate_locks_per_transaction.")));
+
+ Assert(!found);
+
+ /* Initialize the structure. */
+ sxid->myXact = (SERIALIZABLEXACT *) MySerializableXact;
+ LWLockRelease(SerializableXactHashLock);
+}
+
+
+/*
+ * Check whether there are any predicate locks held by any transaction
+ * for the page at the given block number.
+ *
+ * Note that the transaction may be completed but not yet subject to
+ * cleanup due to overlapping serializable transactions. This must
+ * return valid information regardless of transaction isolation level.
+ *
+ * Also note that this doesn't check for a conflicting relation lock,
+ * just a lock specifically on the given page.
+ *
+ * One use is to support proper behavior during GiST index vacuum.
+ */
+bool
+PageIsPredicateLocked(const Relation relation, const BlockNumber blkno)
+{
+ PREDICATELOCKTARGETTAG targettag;
+ uint32 targettaghash;
+ LWLockId partitionLock;
+ PREDICATELOCKTARGET *target;
+
+ SET_PREDICATELOCKTARGETTAG_PAGE(targettag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ blkno);
+
+ targettaghash = PredicateLockTargetTagHashCode(&targettag);
+ partitionLock = PredicateLockHashPartitionLock(targettaghash);
+ LWLockAcquire(partitionLock, LW_SHARED);
+ target = (PREDICATELOCKTARGET *)
+ hash_search_with_hash_value(PredicateLockTargetHash,
+ &targettag, targettaghash,
+ HASH_FIND, NULL);
+ LWLockRelease(partitionLock);
+
+ return (target != NULL);
+}
+
+
+/*
+ * Check whether a particular lock is held by this transaction.
+ *
+ * Important note: this function may return false even if the lock is
+ * being held, because it uses the local lock table which is not
+ * updated if another transaction modifies our lock list (e.g. to
+ * split an index page). However, it will never return true if the
+ * lock is not held. We only use this function in circumstances where
+ * such false negatives are acceptable.
+ */
+static bool
+PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag)
+{
+ LOCALPREDICATELOCK *lock;
+
+ /* check local hash table */
+ lock = (LOCALPREDICATELOCK *) hash_search(LocalPredicateLockHash,
+ targettag,
+ HASH_FIND, NULL);
+
+ if (!lock)
+ return false;
+
+ /*
+ * Found entry in the table, but still need to check whether it's actually
+ * held -- it could just be a parent of some held lock.
+ */
+ return lock->held;
+}
+
+/*
+ * Return the parent lock tag in the lock hierarchy: the next coarser
+ * lock that covers the provided tag.
+ *
+ * Returns true and sets *parent to the parent tag if one exists,
+ * returns false if none exists.
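+ *
+ * For example, the parent of a tuple lock target is the page lock target
+ * for the page containing that tuple, and the parent of a page lock target
+ * is the relation lock target; relation lock targets have no parent.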
+ */
+static bool
+GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
+ PREDICATELOCKTARGETTAG *parent)
+{
+ switch (GET_PREDICATELOCKTARGETTAG_TYPE(*tag))
+ {
+ case PREDLOCKTAG_RELATION:
+ /* relation locks have no parent lock */
+ return false;
+
+ case PREDLOCKTAG_PAGE:
+ /* parent lock is relation lock */
+ SET_PREDICATELOCKTARGETTAG_RELATION(*parent,
+ GET_PREDICATELOCKTARGETTAG_DB(*tag),
+ GET_PREDICATELOCKTARGETTAG_RELATION(*tag));
+
+ return true;
+
+ case PREDLOCKTAG_TUPLE:
+ /* parent lock is page lock */
+ SET_PREDICATELOCKTARGETTAG_PAGE(*parent,
+ GET_PREDICATELOCKTARGETTAG_DB(*tag),
+ GET_PREDICATELOCKTARGETTAG_RELATION(*tag),
+ GET_PREDICATELOCKTARGETTAG_PAGE(*tag));
+ return true;
+ }
+
+ /* not reachable */
+ Assert(false);
+ return false;
+}
+
+/*
+ * Check whether the lock we are considering is already covered by a
+ * coarser lock for our transaction.
+ */
+static bool
+CoarserLockCovers(const PREDICATELOCKTARGETTAG *newtargettag)
+{
+ PREDICATELOCKTARGETTAG targettag,
+ parenttag;
+
+ targettag = *newtargettag;
+
+ /* check parents iteratively until no more */
+ while (GetParentPredicateLockTag(&targettag, &parenttag))
+ {
+ targettag = parenttag;
+ if (PredicateLockExists(&targettag))
+ return true;
+ }
+
+ /* no more parents to check; lock is not covered */
+ return false;
+}
+
+/*
+ * Check whether both the list of related predicate locks and the pointer to
+ * a prior version of the row (if this is a tuple lock target) are empty for
+ * a predicate lock target, and remove the target if they are.
+ */
+static void
+RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash)
+{
+ PREDICATELOCKTARGET *rmtarget;
+ PREDICATELOCKTARGET *next;
+
+ Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
+
+ /* Can't remove it until no locks at this target. */
+ if (!SHMQueueEmpty(&target->predicateLocks))
+ return;
+
+ /* Can't remove it if there are locks for a prior row version. */
+ LWLockAcquire(PredicateLockNextRowLinkLock, LW_EXCLUSIVE);
+ if (target->priorVersionOfRow != NULL)
+ {
+ LWLockRelease(PredicateLockNextRowLinkLock);
+ return;
+ }
+
+ /*
+	 * We are going to release this target. This requires that we let the
+	 * next version of the row (if any) know that its previous version is
+	 * done.
+ *
+ * It might be that the link was all that was keeping the other target
+ * from cleanup, but we can't clean that up here -- LW locking is all
+ * wrong for that. We'll pass the HTAB in the general cleanup function to
+ * get rid of such "dead" targets.
+ */
+ next = target->nextVersionOfRow;
+ if (next != NULL)
+ {
+ next->priorVersionOfRow = NULL;
+ if (SHMQueueEmpty(&next->predicateLocks))
+ PredXact->NeedTargetLinkCleanup = true;
+ }
+ LWLockRelease(PredicateLockNextRowLinkLock);
+
+ /* Actually remove the target. */
+ rmtarget = hash_search_with_hash_value(PredicateLockTargetHash,
+ &target->tag,
+ targettaghash,
+ HASH_REMOVE, NULL);
+ Assert(rmtarget == target);
+}
+
+/*
+ * Delete child target locks owned by this process.
+ * This implementation assumes that the usage of each target tag field
+ * is uniform. No need to make this hard if we don't have to.
+ *
+ * We aren't acquiring lightweight locks for the predicate lock or lock
+ * target structures associated with this transaction unless we're going
+ * to modify them, because no other process is permitted to modify our
+ * locks.
+ */
+static void
+DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
+{
+ SERIALIZABLEXACT *sxact;
+ PREDICATELOCK *predlock;
+
+ LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+ sxact = (SERIALIZABLEXACT *) MySerializableXact;
+ predlock = (PREDICATELOCK *)
+ SHMQueueNext(&(sxact->predicateLocks),
+ &(sxact->predicateLocks),
+ offsetof(PREDICATELOCK, xactLink));
+ while (predlock)
+ {
+ SHM_QUEUE *predlocksxactlink;
+ PREDICATELOCK *nextpredlock;
+ PREDICATELOCKTAG oldlocktag;
+ PREDICATELOCKTARGET *oldtarget;
+ PREDICATELOCKTARGETTAG oldtargettag;
+
+ predlocksxactlink = &(predlock->xactLink);
+ nextpredlock = (PREDICATELOCK *)
+ SHMQueueNext(&(sxact->predicateLocks),
+ predlocksxactlink,
+ offsetof(PREDICATELOCK, xactLink));
+
+ oldlocktag = predlock->tag;
+ Assert(oldlocktag.myXact == sxact);
+ oldtarget = oldlocktag.myTarget;
+ oldtargettag = oldtarget->tag;
+
+ if (TargetTagIsCoveredBy(oldtargettag, *newtargettag))
+ {
+ uint32 oldtargettaghash;
+ LWLockId partitionLock;
+ PREDICATELOCK *rmpredlock;
+
+ oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
+ partitionLock = PredicateLockHashPartitionLock(oldtargettaghash);
+
+ LWLockAcquire(partitionLock, LW_EXCLUSIVE);
+
+ SHMQueueDelete(predlocksxactlink);
+ SHMQueueDelete(&(predlock->targetLink));
+ rmpredlock = hash_search_with_hash_value
+ (PredicateLockHash,
+ &oldlocktag,
+ PredicateLockHashCodeFromTargetHashCode(&oldlocktag,
+ oldtargettaghash),
+ HASH_REMOVE, NULL);
+ Assert(rmpredlock == predlock);
+
+ RemoveTargetIfNoLongerUsed(oldtarget, oldtargettaghash);
+
+ LWLockRelease(partitionLock);
+
+ DecrementParentLocks(&oldtargettag);
+ }
+
+ predlock = nextpredlock;
+ }
+ LWLockRelease(SerializablePredicateLockListLock);
+}
+
+/*
+ * Returns the promotion threshold for a given predicate lock
+ * target. This is the number of descendant locks required to promote
+ * to the specified tag. Note that the threshold includes non-direct
+ * descendants, e.g. both tuples and pages for a relation lock.
+ *
+ * TODO SSI: We should do something more intelligent about what the
+ * thresholds are, either making it proportional to the number of
+ * tuples in a page & pages in a relation, or at least making it a
+ * GUC. Currently the threshold is 3 for a page lock, and
+ * max_predicate_locks_per_transaction/2 for a relation lock, chosen
+ * entirely arbitrarily (and without benchmarking).
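+ *
+ * For example, with max_predicate_locks_per_transaction = 64, a
+ * transaction holding three or more tuple locks on a single page is
+ * promoted to a page lock on it, and one holding 32 or more page or tuple
+ * locks within a relation is promoted to a relation lock.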
+ */
+static int
+PredicateLockPromotionThreshold(const PREDICATELOCKTARGETTAG *tag)
+{
+ switch (GET_PREDICATELOCKTARGETTAG_TYPE(*tag))
+ {
+ case PREDLOCKTAG_RELATION:
+ return max_predicate_locks_per_xact / 2;
+
+ case PREDLOCKTAG_PAGE:
+ return 3;
+
+ case PREDLOCKTAG_TUPLE:
+
+ /*
+			 * not reachable: nothing is finer-grained than a tuple, so we
+ * should never try to promote to it.
+ */
+ Assert(false);
+ return 0;
+ }
+
+ /* not reachable */
+ Assert(false);
+ return 0;
+}
+
+/*
+ * For all ancestors of a newly-acquired predicate lock, increment
+ * their child count in the parent hash table. If any of them have
+ * more descendants than their promotion threshold, acquire the
+ * coarsest such lock.
+ *
+ * Returns true if a parent lock was acquired and false otherwise.
+ */
+static bool
+CheckAndPromotePredicateLockRequest(const PREDICATELOCKTARGETTAG *reqtag)
+{
+ PREDICATELOCKTARGETTAG targettag,
+ nexttag,
+ promotiontag;
+ LOCALPREDICATELOCK *parentlock;
+ bool found,
+ promote;
+
+ promote = false;
+
+ targettag = *reqtag;
+
+ /* check parents iteratively */
+ while (GetParentPredicateLockTag(&targettag, &nexttag))
+ {
+ targettag = nexttag;
+ parentlock = (LOCALPREDICATELOCK *) hash_search(LocalPredicateLockHash,
+ &targettag,
+ HASH_ENTER,
+ &found);
+ if (!found)
+ {
+ parentlock->held = false;
+ parentlock->childLocks = 1;
+ }
+ else
+ parentlock->childLocks++;
+
+ if (parentlock->childLocks >=
+ PredicateLockPromotionThreshold(&targettag))
+ {
+ /*
+ * We should promote to this parent lock. Continue to check its
+ * ancestors, however, both to get their child counts right and to
+ * check whether we should just go ahead and promote to one of
+ * them.
+ */
+ promotiontag = targettag;
+ promote = true;
+ }
+ }
+
+ if (promote)
+ {
+ /* acquire coarsest ancestor eligible for promotion */
+ PredicateLockAcquire(&promotiontag);
+ return true;
+ }
+ else
+ return false;
+}
+
+/*
+ * When releasing a lock, decrement the child count on all ancestor
+ * locks.
+ *
+ * This is called only when releasing a lock via
+ * DeleteChildTargetLocks (i.e. when a lock becomes redundant because
+ * we've acquired its parent, possibly due to promotion) or when a new
+ * MVCC write lock makes the predicate lock unnecessary. There's no
+ * point in calling it when locks are released at transaction end, as
+ * this information is no longer needed.
+ */
+static void
+DecrementParentLocks(const PREDICATELOCKTARGETTAG *targettag)
+{
+ PREDICATELOCKTARGETTAG parenttag,
+ nexttag;
+
+ parenttag = *targettag;
+
+ while (GetParentPredicateLockTag(&parenttag, &nexttag))
+ {
+ uint32 targettaghash;
+ LOCALPREDICATELOCK *parentlock,
+ *rmlock;
+
+ parenttag = nexttag;
+ targettaghash = PredicateLockTargetTagHashCode(&parenttag);
+ parentlock = (LOCALPREDICATELOCK *)
+ hash_search_with_hash_value(LocalPredicateLockHash,
+ &parenttag, targettaghash,
+ HASH_FIND, NULL);
+
+ /*
+ * There's a small chance the parent lock doesn't exist in the lock
+ * table. This can happen if we prematurely removed it because an
+ * index split caused the child refcount to be off.
+ */
+ if (parentlock == NULL)
+ continue;
+
+ parentlock->childLocks--;
+
+ /*
+ * Under similar circumstances the parent lock's refcount might be
+ * zero. This only happens if we're holding that lock (otherwise we
+ * would have removed the entry).
+ */
+ if (parentlock->childLocks < 0)
+ {
+ Assert(parentlock->held);
+ parentlock->childLocks = 0;
+ }
+
+ if ((parentlock->childLocks == 0) && (!parentlock->held))
+ {
+ rmlock = (LOCALPREDICATELOCK *)
+ hash_search_with_hash_value(LocalPredicateLockHash,
+ &parenttag, targettaghash,
+ HASH_REMOVE, NULL);
+ Assert(rmlock == parentlock);
+ }
+ }
+}
+
+/*
+ * Indicate that a predicate lock on the given target is held by the
+ * specified transaction. Has no effect if the lock is already held.
+ *
+ * This updates the lock table and the sxact's lock list, and creates
+ * the lock target if necessary, but does *not* do anything related to
+ * granularity promotion or the local lock table. See
+ * PredicateLockAcquire for that.
+ */
+static void
+CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
+ uint32 targettaghash,
+ SERIALIZABLEXACT *sxact)
+{
+ PREDICATELOCKTARGET *target;
+ PREDICATELOCKTAG locktag;
+ PREDICATELOCK *lock;
+ LWLockId partitionLock;
+ bool found;
+
+ partitionLock = PredicateLockHashPartitionLock(targettaghash);
+
+ LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+ LWLockAcquire(partitionLock, LW_EXCLUSIVE);
+
+ /* Make sure that the target is represented. */
+ target = (PREDICATELOCKTARGET *)
+ hash_search_with_hash_value(PredicateLockTargetHash,
+ targettag, targettaghash,
+ HASH_ENTER, &found);
+ if (!target)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory"),
+ errhint("You might need to increase max_predicate_locks_per_transaction.")));
+ if (!found)
+ {
+ SHMQueueInit(&(target->predicateLocks));
+ target->priorVersionOfRow = NULL;
+ target->nextVersionOfRow = NULL;
+ }
+
+ /* We've got the sxact and target, make sure they're joined. */
+ locktag.myTarget = target;
+ locktag.myXact = sxact;
+ lock = (PREDICATELOCK *)
+ hash_search_with_hash_value(PredicateLockHash, &locktag,
+ PredicateLockHashCodeFromTargetHashCode(&locktag, targettaghash),
+ HASH_ENTER, &found);
+ if (!lock)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory"),
+ errhint("You might need to increase max_predicate_locks_per_transaction.")));
+
+ if (!found)
+ {
+ SHMQueueInsertBefore(&(target->predicateLocks), &(lock->targetLink));
+ SHMQueueInsertBefore(&(sxact->predicateLocks),
+ &(lock->xactLink));
+ lock->commitSeqNo = 0;
+ }
+
+ LWLockRelease(partitionLock);
+ LWLockRelease(SerializablePredicateLockListLock);
+}
+
+/*
+ * Acquire a predicate lock on the specified target for the current
+ * connection if not already held. This updates the local lock table
+ * and uses it to implement granularity promotion. It will consolidate
+ * multiple locks into a coarser lock if warranted, and will release
+ * any finer-grained locks covered by the new one.
+ */
+static void
+PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag)
+{
+ uint32 targettaghash;
+ bool found;
+ LOCALPREDICATELOCK *locallock;
+
+ /* Do we have the lock already, or a covering lock? */
+ if (PredicateLockExists(targettag))
+ return;
+
+ if (CoarserLockCovers(targettag))
+ return;
+
+	/* The same hash and LW lock apply to the lock target and the local lock. */
+ targettaghash = PredicateLockTargetTagHashCode(targettag);
+
+ /* Acquire lock in local table */
+ locallock = (LOCALPREDICATELOCK *)
+ hash_search_with_hash_value(LocalPredicateLockHash,
+ targettag, targettaghash,
+ HASH_ENTER, &found);
+ /* We should not hold the lock (but its entry might still exist) */
+ Assert(!found || !locallock->held);
+ locallock->held = true;
+ if (!found)
+ locallock->childLocks = 0;
+
+ /* Actually create the lock */
+ CreatePredicateLock(targettag, targettaghash,
+ (SERIALIZABLEXACT *) MySerializableXact);
+
+ /*
+ * Lock has been acquired. Check whether it should be promoted to a
+ * coarser granularity, or whether there are finer-granularity locks to
+ * clean up.
+ */
+ if (CheckAndPromotePredicateLockRequest(targettag))
+ {
+ /*
+ * Lock request was promoted to a coarser-granularity lock, and that
+ * lock was acquired. It will delete this lock and any of its
+ * children, so we're done.
+ */
+ }
+ else
+ {
+ /* Clean up any finer-granularity locks */
+ if (GET_PREDICATELOCKTARGETTAG_TYPE(*targettag) != PREDLOCKTAG_TUPLE)
+ DeleteChildTargetLocks(targettag);
+ }
+}
+
+
+/*
+ * PredicateLockRelation
+ *
+ * Gets a predicate lock at the relation level.
+ * Skip if not in full serializable transaction isolation level.
+ * Skip if this is a temporary table.
+ * Clear any finer-grained predicate locks this session has on the relation.
+ */
+void
+PredicateLockRelation(const Relation relation)
+{
+ PREDICATELOCKTARGETTAG tag;
+
+ if (SkipSerialization(relation))
+ return;
+
+ SET_PREDICATELOCKTARGETTAG_RELATION(tag,
+ relation->rd_node.dbNode,
+ relation->rd_id);
+ PredicateLockAcquire(&tag);
+}
+
+/*
+ * PredicateLockPage
+ *
+ * Gets a predicate lock at the page level.
+ * Skip if not in full serializable transaction isolation level.
+ * Skip if this is a temporary table.
+ * Skip if a coarser predicate lock already covers this page.
+ * Clear any finer-grained predicate locks this session has on the relation.
+ */
+void
+PredicateLockPage(const Relation relation, const BlockNumber blkno)
+{
+ PREDICATELOCKTARGETTAG tag;
+
+ if (SkipSerialization(relation))
+ return;
+
+ SET_PREDICATELOCKTARGETTAG_PAGE(tag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ blkno);
+ PredicateLockAcquire(&tag);
+}
+
+/*
+ * PredicateLockTuple
+ *
+ * Gets a predicate lock at the tuple level.
+ * Skip if not in full serializable transaction isolation level.
+ * Skip if this is a temporary table.
+ */
+void
+PredicateLockTuple(const Relation relation, const HeapTuple tuple)
+{
+ PREDICATELOCKTARGETTAG tag;
+ ItemPointer tid;
+
+ if (SkipSerialization(relation))
+ return;
+
+ /*
+ * If it's a heap tuple, return if this xact wrote it.
+ */
+ if (relation->rd_index == NULL)
+ {
+ TransactionId myxid = GetTopTransactionIdIfAny();
+
+ if (TransactionIdIsValid(myxid))
+ {
+ TransactionId xid = HeapTupleHeaderGetXmin(tuple->t_data);
+
+ if (TransactionIdFollowsOrEquals(xid, TransactionXmin))
+ {
+ xid = SubTransGetTopmostTransaction(xid);
+ if (TransactionIdEquals(xid, myxid))
+ {
+ /* We wrote it; we already have a write lock. */
+ return;
+ }
+ }
+ }
+ }
+
+ /*
+ * Do quick-but-not-definitive test for a relation lock first. This will
+ * never cause a return when the relation is *not* locked, but will
+ * occasionally let the check continue when there really *is* a relation
+ * level lock.
+ */
+ SET_PREDICATELOCKTARGETTAG_RELATION(tag,
+ relation->rd_node.dbNode,
+ relation->rd_id);
+ if (PredicateLockExists(&tag))
+ return;
+
+ tid = &(tuple->t_self);
+ SET_PREDICATELOCKTARGETTAG_TUPLE(tag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ ItemPointerGetBlockNumber(tid),
+ ItemPointerGetOffsetNumber(tid));
+ PredicateLockAcquire(&tag);
+}
+
+/*
+ * If the old tuple has any predicate locks, create a lock target for the
+ * new tuple and point them at each other. Conflict detection needs to
+ * look for locks against prior versions of the row.
+ */
+void
+PredicateLockTupleRowVersionLink(const Relation relation,
+ const HeapTuple oldTuple,
+ const HeapTuple newTuple)
+{
+ PREDICATELOCKTARGETTAG oldtargettag;
+ PREDICATELOCKTARGETTAG newtargettag;
+ PREDICATELOCKTARGET *oldtarget;
+ PREDICATELOCKTARGET *newtarget;
+ PREDICATELOCKTARGET *next;
+ uint32 oldtargettaghash;
+ LWLockId oldpartitionLock;
+ uint32 newtargettaghash;
+ LWLockId newpartitionLock;
+ bool found;
+
+ SET_PREDICATELOCKTARGETTAG_TUPLE(oldtargettag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ ItemPointerGetBlockNumber(&(oldTuple->t_self)),
+ ItemPointerGetOffsetNumber(&(oldTuple->t_self)));
+ oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
+ oldpartitionLock = PredicateLockHashPartitionLock(oldtargettaghash);
+
+ SET_PREDICATELOCKTARGETTAG_TUPLE(newtargettag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ ItemPointerGetBlockNumber(&(newTuple->t_self)),
+ ItemPointerGetOffsetNumber(&(newTuple->t_self)));
+ newtargettaghash = PredicateLockTargetTagHashCode(&newtargettag);
+ newpartitionLock = PredicateLockHashPartitionLock(newtargettaghash);
+
+ /* Lock lower numbered partition first. */
+ if (oldpartitionLock < newpartitionLock)
+ {
+ LWLockAcquire(oldpartitionLock, LW_SHARED);
+ LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
+ }
+ else if (newpartitionLock < oldpartitionLock)
+ {
+ LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
+ LWLockAcquire(oldpartitionLock, LW_SHARED);
+ }
+ else
+ LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
+
+ oldtarget = (PREDICATELOCKTARGET *)
+ hash_search_with_hash_value(PredicateLockTargetHash,
+ &oldtargettag, oldtargettaghash,
+ HASH_FIND, NULL);
+
+ /* Only need to link if there is an old target already. */
+ if (oldtarget)
+ {
+ LWLockAcquire(PredicateLockNextRowLinkLock, LW_EXCLUSIVE);
+
+ /* Guard against stale pointers from rollback. */
+ next = oldtarget->nextVersionOfRow;
+ if (next != NULL)
+ {
+ next->priorVersionOfRow = NULL;
+ oldtarget->nextVersionOfRow = NULL;
+ }
+
+ /* Find or create the new target, and link old and new. */
+ newtarget = (PREDICATELOCKTARGET *)
+ hash_search_with_hash_value(PredicateLockTargetHash,
+ &newtargettag, newtargettaghash,
+ HASH_ENTER, &found);
+ if (!newtarget)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory"),
+ errhint("You might need to increase max_predicate_locks_per_transaction.")));
+ if (!found)
+ {
+ SHMQueueInit(&(newtarget->predicateLocks));
+ newtarget->nextVersionOfRow = NULL;
+ }
+ else
+ Assert(newtarget->priorVersionOfRow == NULL);
+
+ newtarget->priorVersionOfRow = oldtarget;
+ oldtarget->nextVersionOfRow = newtarget;
+
+ LWLockRelease(PredicateLockNextRowLinkLock);
+ }
+
+	/* Release lower-numbered partition last. */
+ if (oldpartitionLock < newpartitionLock)
+ {
+ LWLockRelease(newpartitionLock);
+ LWLockRelease(oldpartitionLock);
+ }
+ else if (newpartitionLock < oldpartitionLock)
+ {
+ LWLockRelease(oldpartitionLock);
+ LWLockRelease(newpartitionLock);
+ }
+ else
+ LWLockRelease(newpartitionLock);
+}
+
+
+/*
+ * DeleteLockTarget
+ *
+ * Remove a predicate lock target along with any locks held for it.
+ *
+ * Caller must hold SerializablePredicateLockListLock and the
+ * appropriate hash partition lock for the target.
+ */
+static void
+DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash)
+{
+ PREDICATELOCK *predlock;
+ SHM_QUEUE *predlocktargetlink;
+ PREDICATELOCK *nextpredlock;
+ bool found;
+
+ Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
+ Assert(LWLockHeldByMe(PredicateLockHashPartitionLock(targettaghash)));
+
+ predlock = (PREDICATELOCK *)
+ SHMQueueNext(&(target->predicateLocks),
+ &(target->predicateLocks),
+ offsetof(PREDICATELOCK, targetLink));
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+ while (predlock)
+ {
+ predlocktargetlink = &(predlock->targetLink);
+ nextpredlock = (PREDICATELOCK *)
+ SHMQueueNext(&(target->predicateLocks),
+ predlocktargetlink,
+ offsetof(PREDICATELOCK, targetLink));
+
+ SHMQueueDelete(&(predlock->xactLink));
+ SHMQueueDelete(&(predlock->targetLink));
+
+ hash_search_with_hash_value
+ (PredicateLockHash,
+ &predlock->tag,
+ PredicateLockHashCodeFromTargetHashCode(&predlock->tag,
+ targettaghash),
+ HASH_REMOVE, &found);
+ Assert(found);
+
+ predlock = nextpredlock;
+ }
+ LWLockRelease(SerializableXactHashLock);
+
+ /* Remove the target itself, if possible. */
+ RemoveTargetIfNoLongerUsed(target, targettaghash);
+}
+
+
+/*
+ * TransferPredicateLocksToNewTarget
+ *
+ * Move or copy all the predicate locks for a lock target, for use by
+ * index page splits/combines and other things that create or replace
+ * lock targets. If 'removeOld' is true, the old locks and the target
+ * will be removed.
+ *
+ * Returns true on success, or false if we ran out of shared memory to
+ * allocate the new target or locks. Guaranteed to always succeed if
+ * removeOld is set (by using the reserved entry in
+ * PredicateLockTargetHash for scratch space).
+ *
+ * Caller must hold SerializablePredicateLockListLock.
+ */
+static bool
+TransferPredicateLocksToNewTarget(const PREDICATELOCKTARGETTAG oldtargettag,
+ const PREDICATELOCKTARGETTAG newtargettag,
+ bool removeOld)
+{
+ uint32 oldtargettaghash;
+ LWLockId oldpartitionLock;
+ PREDICATELOCKTARGET *oldtarget;
+ uint32 newtargettaghash;
+ LWLockId newpartitionLock;
+ bool found;
+ bool outOfShmem = false;
+ uint32 reservedtargettaghash;
+ LWLockId reservedpartitionLock;
+
+
+ Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
+
+ oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
+ newtargettaghash = PredicateLockTargetTagHashCode(&newtargettag);
+ oldpartitionLock = PredicateLockHashPartitionLock(oldtargettaghash);
+ newpartitionLock = PredicateLockHashPartitionLock(newtargettaghash);
+
+ reservedtargettaghash = 0; /* Quiet compiler warnings. */
+ reservedpartitionLock = 0; /* Quiet compiler warnings. */
+
+ if (removeOld)
+ {
+ /*
+ * Remove the reserved entry to give us scratch space, so we know
+ * we'll be able to create the new lock target.
+ */
+ reservedtargettaghash = PredicateLockTargetTagHashCode(&ReservedTargetTag);
+ reservedpartitionLock = PredicateLockHashPartitionLock(reservedtargettaghash);
+ LWLockAcquire(reservedpartitionLock, LW_EXCLUSIVE);
+ hash_search_with_hash_value(PredicateLockTargetHash,
+ &ReservedTargetTag,
+ reservedtargettaghash,
+ HASH_REMOVE, &found);
+ Assert(found);
+ LWLockRelease(reservedpartitionLock);
+ }
+
+ /*
+ * We must get the partition locks in ascending sequence to avoid
+ * deadlocks. If old and new partitions are the same, we must request the
+ * lock only once.
+ */
+ if (oldpartitionLock < newpartitionLock)
+ {
+ LWLockAcquire(oldpartitionLock,
+ (removeOld ? LW_EXCLUSIVE : LW_SHARED));
+ LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
+ }
+ else if (oldpartitionLock > newpartitionLock)
+ {
+ LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
+ LWLockAcquire(oldpartitionLock,
+ (removeOld ? LW_EXCLUSIVE : LW_SHARED));
+ }
+ else
+ LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
+
+ /*
+ * Look for the old target. If not found, that's OK; no predicate locks
+ * are affected, so we can just clean up and return. If it does exist,
+ * walk its list of predicate locks and move or copy them to the new
+ * target.
+ */
+ oldtarget = hash_search_with_hash_value(PredicateLockTargetHash,
+ &oldtargettag,
+ oldtargettaghash,
+ HASH_FIND, NULL);
+
+ if (oldtarget)
+ {
+ PREDICATELOCKTARGET *newtarget;
+ PREDICATELOCK *oldpredlock;
+ PREDICATELOCKTAG newpredlocktag;
+
+ newtarget = hash_search_with_hash_value(PredicateLockTargetHash,
+ &newtargettag,
+ newtargettaghash,
+ HASH_ENTER_NULL, &found);
+
+ if (!newtarget)
+ {
+ /* Failed to allocate due to insufficient shmem */
+ outOfShmem = true;
+ goto exit;
+ }
+
+ /* If we created a new entry, initialize it */
+ if (!found)
+ {
+ SHMQueueInit(&(newtarget->predicateLocks));
+ newpredlocktag.myTarget = newtarget;
+ }
+
+ oldpredlock = (PREDICATELOCK *)
+ SHMQueueNext(&(oldtarget->predicateLocks),
+ &(oldtarget->predicateLocks),
+ offsetof(PREDICATELOCK, targetLink));
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+ while (oldpredlock)
+ {
+ SHM_QUEUE *predlocktargetlink;
+ PREDICATELOCK *nextpredlock;
+ PREDICATELOCK *newpredlock;
+
+ predlocktargetlink = &(oldpredlock->targetLink);
+ nextpredlock = (PREDICATELOCK *)
+ SHMQueueNext(&(oldtarget->predicateLocks),
+ predlocktargetlink,
+ offsetof(PREDICATELOCK, targetLink));
+ newpredlocktag.myXact = oldpredlock->tag.myXact;
+
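+			/*
+			 * If we are moving (not merely copying) the locks, remove the old
+			 * lock entry before creating its replacement under the new target.
+			 */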
+ if (removeOld)
+ {
+ SHMQueueDelete(&(oldpredlock->xactLink));
+ SHMQueueDelete(&(oldpredlock->targetLink));
+
+ hash_search_with_hash_value
+ (PredicateLockHash,
+ &oldpredlock->tag,
+ PredicateLockHashCodeFromTargetHashCode(&oldpredlock->tag,
+ oldtargettaghash),
+ HASH_REMOVE, &found);
+ Assert(found);
+ }
+
+
+ newpredlock = (PREDICATELOCK *)
+ hash_search_with_hash_value
+ (PredicateLockHash,
+ &newpredlocktag,
+ PredicateLockHashCodeFromTargetHashCode(&newpredlocktag,
+ newtargettaghash),
+ HASH_ENTER_NULL, &found);
+ if (!newpredlock)
+ {
+ /* Out of shared memory. Undo what we've done so far. */
+ LWLockRelease(SerializableXactHashLock);
+ DeleteLockTarget(newtarget, newtargettaghash);
+ outOfShmem = true;
+ goto exit;
+ }
+ SHMQueueInsertBefore(&(newtarget->predicateLocks),
+ &(newpredlock->targetLink));
+ SHMQueueInsertBefore(&(newpredlocktag.myXact->predicateLocks),
+ &(newpredlock->xactLink));
+
+ oldpredlock = nextpredlock;
+ }
+ LWLockRelease(SerializableXactHashLock);
+
+ if (removeOld)
+ {
+ Assert(SHMQueueEmpty(&oldtarget->predicateLocks));
+ RemoveTargetIfNoLongerUsed(oldtarget, oldtargettaghash);
+ }
+ }
+
+
+exit:
+ /* Release partition locks in reverse order of acquisition. */
+ if (oldpartitionLock < newpartitionLock)
+ {
+ LWLockRelease(newpartitionLock);
+ LWLockRelease(oldpartitionLock);
+ }
+ else if (oldpartitionLock > newpartitionLock)
+ {
+ LWLockRelease(oldpartitionLock);
+ LWLockRelease(newpartitionLock);
+ }
+ else
+ LWLockRelease(newpartitionLock);
+
+ if (removeOld)
+ {
+ /* We shouldn't run out of memory if we're moving locks */
+ Assert(!outOfShmem);
+
+ /* Put the reserved entry back */
+ LWLockAcquire(reservedpartitionLock, LW_EXCLUSIVE);
+ hash_search_with_hash_value(PredicateLockTargetHash,
+ &ReservedTargetTag,
+ reservedtargettaghash,
+ HASH_ENTER, &found);
+ Assert(!found);
+ LWLockRelease(reservedpartitionLock);
+ }
+
+ return !outOfShmem;
+}
+
+
+/*
+ * PredicateLockPageSplit
+ *
+ * Copies any predicate locks for the old page to the new page.
+ * Skip if this is a temporary table or toast table.
+ *
+ * NOTE: A page split (or overflow) affects all serializable transactions,
+ * even if it occurs in the context of another transaction isolation level.
+ *
+ * NOTE: This currently leaves the local copy of the locks without
+ * information on the new lock which is in shared memory. This could cause
+ * problems if enough page splits occur on locked pages without the processes
+ * which hold the locks getting in and noticing.
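+ *
+ * As an illustrative sketch only (the buffer variable names here are
+ * hypothetical), an index AM performing a split would call this along the
+ * lines of:
+ *
+ *     PredicateLockPageSplit(rel,
+ *                            BufferGetBlockNumber(origbuf),
+ *                            BufferGetBlockNumber(newbuf));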
+ */
+void
+PredicateLockPageSplit(const Relation relation, const BlockNumber oldblkno,
+ const BlockNumber newblkno)
+{
+ PREDICATELOCKTARGETTAG oldtargettag;
+ PREDICATELOCKTARGETTAG newtargettag;
+ bool success;
+
+ if (SkipSplitTracking(relation))
+ return;
+
+ Assert(oldblkno != newblkno);
+ Assert(BlockNumberIsValid(oldblkno));
+ Assert(BlockNumberIsValid(newblkno));
+
+ SET_PREDICATELOCKTARGETTAG_PAGE(oldtargettag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ oldblkno);
+ SET_PREDICATELOCKTARGETTAG_PAGE(newtargettag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ newblkno);
+
+ LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
+
+ /*
+ * Try copying the locks over to the new page's tag, creating it if
+ * necessary.
+ */
+ success = TransferPredicateLocksToNewTarget(oldtargettag,
+ newtargettag,
+ false);
+
+ if (!success)
+ {
+ /*
+ * No more predicate lock entries are available. Failure isn't an
+ * option here, so promote the page lock to a relation lock.
+ */
+
+ /* Get the parent relation lock's lock tag */
+ success = GetParentPredicateLockTag(&oldtargettag,
+ &newtargettag);
+ Assert(success);
+
+ /* Move the locks to the parent. This shouldn't fail. */
+ success = TransferPredicateLocksToNewTarget(oldtargettag,
+ newtargettag,
+ true);
+ Assert(success);
+ }
+
+ LWLockRelease(SerializablePredicateLockListLock);
+}
+
+/*
+ * PredicateLockPageCombine
+ *
+ * Combines predicate locks for two existing pages.
+ * Skip if this is a temporary table or toast table.
+ *
+ * NOTE: A page combine affects all serializable transactions, even if it
+ * occurs in the context of another transaction isolation level.
+ */
+void
+PredicateLockPageCombine(const Relation relation, const BlockNumber oldblkno,
+ const BlockNumber newblkno)
+{
+ PREDICATELOCKTARGETTAG oldtargettag;
+ PREDICATELOCKTARGETTAG newtargettag;
+ bool success;
+
+
+ if (SkipSplitTracking(relation))
+ return;
+
+ Assert(oldblkno != newblkno);
+ Assert(BlockNumberIsValid(oldblkno));
+ Assert(BlockNumberIsValid(newblkno));
+
+ SET_PREDICATELOCKTARGETTAG_PAGE(oldtargettag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ oldblkno);
+ SET_PREDICATELOCKTARGETTAG_PAGE(newtargettag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ newblkno);
+
+ LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
+
+ /* Move the locks. This shouldn't fail. */
+ success = TransferPredicateLocksToNewTarget(oldtargettag,
+ newtargettag,
+ true);
+ Assert(success);
+
+ LWLockRelease(SerializablePredicateLockListLock);
+}
+
+/*
+ * Walk the list of active serializable transactions and find the new xmin.
+ */
+static void
+SetNewSxactGlobalXmin(void)
+{
+ SERIALIZABLEXACT *sxact;
+
+ Assert(LWLockHeldByMe(SerializableXactHashLock));
+
+ PredXact->SxactGlobalXmin = InvalidTransactionId;
+ PredXact->SxactGlobalXminCount = 0;
+
+ for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact))
+ {
+ if (!SxactIsRolledBack(sxact)
+ && !SxactIsCommitted(sxact)
+ && sxact != OldCommittedSxact)
+ {
+ Assert(sxact->xmin != InvalidTransactionId);
+ if (!TransactionIdIsValid(PredXact->SxactGlobalXmin)
+ || TransactionIdPrecedes(sxact->xmin,
+ PredXact->SxactGlobalXmin))
+ {
+ PredXact->SxactGlobalXmin = sxact->xmin;
+ PredXact->SxactGlobalXminCount = 1;
+ }
+ else if (TransactionIdEquals(sxact->xmin,
+ PredXact->SxactGlobalXmin))
+ PredXact->SxactGlobalXminCount++;
+ }
+ }
+
+ OldSerXidSetActiveSerXmin(PredXact->SxactGlobalXmin);
+}
+
+/*
+ * ReleasePredicateLocks
+ *
+ * Releases predicate locks based on completion of the current transaction,
+ * whether committed or rolled back. It can also be called for a read only
+ * transaction when it becomes impossible for the transaction to become
+ * part of a dangerous structure.
+ *
+ * We do nothing unless this is a serializable transaction.
+ *
+ * This method must ensure that shared memory hash tables are cleaned
+ * up in some relatively timely fashion.
+ *
+ * If this transaction is committing and is holding any predicate locks,
+ * it must be added to a list of completed serializable transactions still
+ * holding locks.
+ */
+void
+ReleasePredicateLocks(const bool isCommit)
+{
+ bool needToClear;
+ RWConflict conflict,
+ nextConflict,
+ possibleUnsafeConflict;
+ SERIALIZABLEXACT *roXact;
+
+ /*
+ * We can't trust XactReadOnly here, because a transaction which started
+ * as READ WRITE can show as READ ONLY later, e.g., within
+	 * subtransactions.  We want to flag a transaction as READ ONLY if it
+	 * commits without writing, so that de facto READ ONLY transactions get
+	 * the benefit of some RO optimizations; this local variable therefore
+	 * records whether the transaction was declared READ ONLY at the top
+	 * level, which is what the cleanup logic below depends on.
+ */
+ bool topLevelIsDeclaredReadOnly;
+
+ if (MySerializableXact == InvalidSerializableXact)
+ {
+ Assert(LocalPredicateLockHash == NULL);
+ return;
+ }
+
+ Assert(!isCommit || SxactIsPrepared(MySerializableXact));
+ Assert(!SxactIsRolledBack(MySerializableXact));
+ Assert(!SxactIsCommitted(MySerializableXact));
+
+ /* may not be serializable during COMMIT/ROLLBACK PREPARED */
+ if (MySerializableXact->pid != 0)
+ Assert(IsolationIsSerializable());
+
+ /* We'd better not already be on the cleanup list. */
+ Assert(!SxactIsOnFinishedList((SERIALIZABLEXACT *) MySerializableXact));
+
+ topLevelIsDeclaredReadOnly = SxactIsReadOnly(MySerializableXact);
+
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+
+ /*
+	 * We don't hold XidGenLock while reading nextXid here, assuming that
+	 * TransactionId reads are atomic!
+	 *
+	 * If this value is changing, we don't care that much whether we get the
+	 * old or new value -- it is just used to determine how far
+	 * SxactGlobalXmin must advance before this transaction can be fully
+	 * cleaned up.  The worst that could happen is we wait for one more
+	 * transaction to complete before freeing some RAM; correctness of
+	 * visible behavior is not affected.
+ */
+ MySerializableXact->finishedBefore = ShmemVariableCache->nextXid;
+
+ /*
+ * If it's not a commit it's a rollback, and we can clear our locks
+ * immediately.
+ */
+ if (isCommit)
+ {
+ MySerializableXact->flags |= SXACT_FLAG_COMMITTED;
+ MySerializableXact->commitSeqNo = ++(PredXact->LastSxactCommitSeqNo);
+ /* Recognize implicit read-only transaction (commit without write). */
+ if (!(MySerializableXact->flags & SXACT_FLAG_DID_WRITE))
+ MySerializableXact->flags |= SXACT_FLAG_READ_ONLY;
+ }
+ else
+ {
+ MySerializableXact->flags |= SXACT_FLAG_ROLLED_BACK;
+ }
+
+ if (!topLevelIsDeclaredReadOnly)
+ {
+ Assert(PredXact->WritableSxactCount > 0);
+ if (--(PredXact->WritableSxactCount) == 0)
+ {
+ /*
+ * Release predicate locks and rw-conflicts in for all committed
+ * transactions. There are no longer any transactions which might
+ * conflict with the locks and no chance for new transactions to
+ * overlap. Similarly, existing conflicts in can't cause pivots,
+ * and any conflicts in which could have completed a dangerous
+ * structure would already have caused a rollback, so any
+ * remaining ones must be benign.
+ */
+ PredXact->CanPartialClearThrough = PredXact->LastSxactCommitSeqNo;
+ }
+ }
+ else
+ {
+ /*
+ * Read-only transactions: clear the list of transactions that might
+ * make us unsafe. Note that we use 'inLink' for the iteration as
+ * opposed to 'outLink' for the r/w xacts.
+ */
+ possibleUnsafeConflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
+ (SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
+ offsetof(RWConflictData, inLink));
+ while (possibleUnsafeConflict)
+ {
+ nextConflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
+ &possibleUnsafeConflict->inLink,
+ offsetof(RWConflictData, inLink));
+
+ Assert(!SxactIsReadOnly(possibleUnsafeConflict->sxactOut));
+ Assert(MySerializableXact == possibleUnsafeConflict->sxactIn);
+
+ ReleaseRWConflict(possibleUnsafeConflict);
+
+ possibleUnsafeConflict = nextConflict;
+ }
+ }
+
+ /* Check for conflict out to old committed transactions. */
+ if (isCommit
+ && !SxactIsReadOnly(MySerializableXact)
+ && SxactHasSummaryConflictOut(MySerializableXact))
+ {
+ MySerializableXact->SeqNo.earliestOutConflictCommit =
+ FirstNormalSerCommitSeqNo;
+ MySerializableXact->flags |= SXACT_FLAG_CONFLICT_OUT;
+ }
+
+ /*
+ * Release all outConflicts to committed transactions. If we're rolling
+ * back clear them all. Set SXACT_FLAG_CONFLICT_OUT if any point to
+ * previously committed transactions.
+ */
+ conflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->outConflicts,
+ (SHM_QUEUE *) &MySerializableXact->outConflicts,
+ offsetof(RWConflictData, outLink));
+ while (conflict)
+ {
+ nextConflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->outConflicts,
+ &conflict->outLink,
+ offsetof(RWConflictData, outLink));
+
+ if (isCommit
+ && !SxactIsReadOnly(MySerializableXact)
+ && SxactIsCommitted(conflict->sxactIn))
+ {
+ if ((MySerializableXact->flags & SXACT_FLAG_CONFLICT_OUT) == 0
+ || conflict->sxactIn->commitSeqNo < MySerializableXact->SeqNo.earliestOutConflictCommit)
+ MySerializableXact->SeqNo.earliestOutConflictCommit = conflict->sxactIn->commitSeqNo;
+ MySerializableXact->flags |= SXACT_FLAG_CONFLICT_OUT;
+ }
+
+ if (!isCommit
+ || SxactIsCommitted(conflict->sxactIn)
+ || (conflict->sxactIn->SeqNo.lastCommitBeforeSnapshot >= PredXact->LastSxactCommitSeqNo))
+ ReleaseRWConflict(conflict);
+
+ conflict = nextConflict;
+ }
+
+ /*
+ * Release all inConflicts from committed and read-only transactions. If
+ * we're rolling back, clear them all.
+ */
+ conflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->inConflicts,
+ (SHM_QUEUE *) &MySerializableXact->inConflicts,
+ offsetof(RWConflictData, inLink));
+ while (conflict)
+ {
+ nextConflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->inConflicts,
+ &conflict->inLink,
+ offsetof(RWConflictData, inLink));
+
+ if (!isCommit
+ || SxactIsCommitted(conflict->sxactOut)
+ || SxactIsReadOnly(conflict->sxactOut))
+ ReleaseRWConflict(conflict);
+
+ conflict = nextConflict;
+ }
+
+ if (!topLevelIsDeclaredReadOnly)
+ {
+ /*
+ * Remove ourselves from the list of possible conflicts for concurrent
+ * READ ONLY transactions, flagging them as unsafe if we have a
+		 * conflict out.  If any of them are waiting DEFERRABLE transactions,
+		 * wake them up if they are now known to be either safe or unsafe.
+ */
+ possibleUnsafeConflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
+ (SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
+ offsetof(RWConflictData, outLink));
+ while (possibleUnsafeConflict)
+ {
+ nextConflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->possibleUnsafeConflicts,
+ &possibleUnsafeConflict->outLink,
+ offsetof(RWConflictData, outLink));
+
+ roXact = possibleUnsafeConflict->sxactIn;
+ Assert(MySerializableXact == possibleUnsafeConflict->sxactOut);
+ Assert(SxactIsReadOnly(roXact));
+
+ /* Mark conflicted if necessary. */
+ if (isCommit
+ && (MySerializableXact->flags & SXACT_FLAG_DID_WRITE)
+ && SxactHasConflictOut(MySerializableXact)
+ && (MySerializableXact->SeqNo.earliestOutConflictCommit
+ <= roXact->SeqNo.lastCommitBeforeSnapshot))
+ {
+ /*
+ * This releases possibleUnsafeConflict (as well as all other
+ * possible conflicts for roXact)
+ */
+ FlagSxactUnsafe(roXact);
+ }
+ else
+ {
+ ReleaseRWConflict(possibleUnsafeConflict);
+
+ /*
+ * If we were the last possible conflict, flag it safe. The
+ * transaction can now safely release its predicate locks (but
+ * that transaction's backend has to do that itself).
+ */
+ if (SHMQueueEmpty(&roXact->possibleUnsafeConflicts))
+ roXact->flags |= SXACT_FLAG_RO_SAFE;
+ }
+
+ /*
+ * Wake up the process for a waiting DEFERRABLE transaction if we
+ * now know it's either safe or conflicted.
+ */
+ if (SxactIsDeferrableWaiting(roXact) &&
+ (SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact)))
+ ProcSendSignal(roXact->pid);
+
+ possibleUnsafeConflict = nextConflict;
+ }
+ }
+
+ /*
+ * Check whether it's time to clean up old transactions. This can only be
+ * done when the last serializable transaction with the oldest xmin among
+ * serializable transactions completes. We then find the "new oldest"
+ * xmin and purge any transactions which finished before this transaction
+ * was launched.
+ */
+ needToClear = false;
+ if (TransactionIdEquals(MySerializableXact->xmin, PredXact->SxactGlobalXmin))
+ {
+ Assert(PredXact->SxactGlobalXminCount > 0);
+ if (--(PredXact->SxactGlobalXminCount) == 0)
+ {
+ SetNewSxactGlobalXmin();
+ needToClear = true;
+ }
+ }
+
+ LWLockRelease(SerializableXactHashLock);
+
+ LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
+
+ /* Add this to the list of transactions to check for later cleanup. */
+ if (isCommit)
+ SHMQueueInsertBefore(FinishedSerializableTransactions,
+ (SHM_QUEUE *) &(MySerializableXact->finishedLink));
+
+ if (!isCommit)
+ ReleaseOneSerializableXact((SERIALIZABLEXACT *) MySerializableXact,
+ false, false);
+
+ LWLockRelease(SerializableFinishedListLock);
+
+ if (needToClear)
+ ClearOldPredicateLocks();
+
+ MySerializableXact = InvalidSerializableXact;
+
+ /* Delete per-transaction lock table */
+ if (LocalPredicateLockHash != NULL)
+ {
+ hash_destroy(LocalPredicateLockHash);
+ LocalPredicateLockHash = NULL;
+ }
+}
+
+/*
+ * ReleasePredicateLocksIfROSafe
+ * Check if the current transaction is read only and operating on
+ * a safe snapshot. If so, release predicate locks and return
+ * true.
+ *
+ * A transaction is flagged as RO_SAFE if all concurrent R/W
+ * transactions commit without having conflicts out to an earlier
+ * snapshot, thus ensuring that no conflicts are possible for this
+ * transaction.  We therefore call this function as part of the
+ * SkipSerialization check in all public interface methods.
+ */
+static bool
+ReleasePredicateLocksIfROSafe(void)
+{
+ if (SxactIsROSafe(MySerializableXact))
+ {
+ ReleasePredicateLocks(false);
+ return true;
+ }
+ else
+ return false;
+}
+
+/*
+ * Clear predicate locks and other SSI state belonging to committed
+ * transactions once they can no longer affect any still-active serializable
+ * transaction.
+ */
+static void
+ClearOldPredicateLocks(void)
+{
+ SERIALIZABLEXACT *finishedSxact;
+ PREDICATELOCK *predlock;
+ int i;
+ HASH_SEQ_STATUS seqstat;
+ PREDICATELOCKTARGET *locktarget;
+
+ LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
+ finishedSxact = (SERIALIZABLEXACT *)
+ SHMQueueNext(FinishedSerializableTransactions,
+ FinishedSerializableTransactions,
+ offsetof(SERIALIZABLEXACT, finishedLink));
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ while (finishedSxact)
+ {
+ SERIALIZABLEXACT *nextSxact;
+
+ nextSxact = (SERIALIZABLEXACT *)
+ SHMQueueNext(FinishedSerializableTransactions,
+ &(finishedSxact->finishedLink),
+ offsetof(SERIALIZABLEXACT, finishedLink));
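+
+		/*
+		 * If there are no active serializable transactions, or this one
+		 * finished before any of them took its snapshot, nothing can still
+		 * overlap it and it can be released completely.
+		 */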
+ if (!TransactionIdIsValid(PredXact->SxactGlobalXmin)
+ || TransactionIdPrecedesOrEquals(finishedSxact->finishedBefore,
+ PredXact->SxactGlobalXmin))
+ {
+ LWLockRelease(SerializableXactHashLock);
+ SHMQueueDelete(&(finishedSxact->finishedLink));
+ ReleaseOneSerializableXact(finishedSxact, false, false);
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ }
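+		/*
+		 * This transaction committed within the window in which a partial
+		 * cleanup is allowed: its predicate locks and in-conflicts can no
+		 * longer matter, so release them even though overlapping transactions
+		 * may still be active.
+		 */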
+ else if (finishedSxact->commitSeqNo > PredXact->HavePartialClearedThrough
+ && finishedSxact->commitSeqNo <= PredXact->CanPartialClearThrough)
+ {
+ LWLockRelease(SerializableXactHashLock);
+ ReleaseOneSerializableXact(finishedSxact,
+ !SxactIsReadOnly(finishedSxact),
+ false);
+ PredXact->HavePartialClearedThrough = finishedSxact->commitSeqNo;
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ }
+ else
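+			/*
+			 * Later entries in the list finished even more recently, so there
+			 * is nothing further we can clean up yet.
+			 */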
+ break;
+ finishedSxact = nextSxact;
+ }
+ LWLockRelease(SerializableXactHashLock);
+
+ /*
+ * Loop through predicate locks on dummy transaction for summarized data.
+ */
+ predlock = (PREDICATELOCK *)
+ SHMQueueNext(&OldCommittedSxact->predicateLocks,
+ &OldCommittedSxact->predicateLocks,
+ offsetof(PREDICATELOCK, xactLink));
+ LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+ while (predlock)
+ {
+ PREDICATELOCK *nextpredlock;
+ bool canDoPartialCleanup;
+
+ nextpredlock = (PREDICATELOCK *)
+ SHMQueueNext(&OldCommittedSxact->predicateLocks,
+ &predlock->xactLink,
+ offsetof(PREDICATELOCK, xactLink));
+
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ canDoPartialCleanup = (predlock->commitSeqNo <= PredXact->CanPartialClearThrough);
+ LWLockRelease(SerializableXactHashLock);
+
+ if (canDoPartialCleanup)
+ {
+ PREDICATELOCKTAG tag;
+ SHM_QUEUE *targetLink;
+ PREDICATELOCKTARGET *target;
+ PREDICATELOCKTARGETTAG targettag;
+ uint32 targettaghash;
+ LWLockId partitionLock;
+
+ tag = predlock->tag;
+ targetLink = &(predlock->targetLink);
+ target = tag.myTarget;
+ targettag = target->tag;
+ targettaghash = PredicateLockTargetTagHashCode(&targettag);
+ partitionLock = PredicateLockHashPartitionLock(targettaghash);
+
+ LWLockAcquire(partitionLock, LW_EXCLUSIVE);
+
+ SHMQueueDelete(targetLink);
+ SHMQueueDelete(&(predlock->xactLink));
+
+ hash_search_with_hash_value(PredicateLockHash, &tag,
+ PredicateLockHashCodeFromTargetHashCode(&tag,
+ targettaghash),
+ HASH_REMOVE, NULL);
+ RemoveTargetIfNoLongerUsed(target, targettaghash);
+
+ LWLockRelease(partitionLock);
+ }
+
+ predlock = nextpredlock;
+ }
+
+ LWLockRelease(SerializablePredicateLockListLock);
+ LWLockRelease(SerializableFinishedListLock);
+
+ if (!PredXact->NeedTargetLinkCleanup)
+ return;
+
+ /*
+ * Clean up any targets which were disconnected from a prior version with
+ * no predicate locks attached.
+ */
+ for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
+ LWLockAcquire(FirstPredicateLockMgrLock + i, LW_EXCLUSIVE);
+ LWLockAcquire(PredicateLockNextRowLinkLock, LW_SHARED);
+
+ hash_seq_init(&seqstat, PredicateLockTargetHash);
+ while ((locktarget = (PREDICATELOCKTARGET *) hash_seq_search(&seqstat)))
+ {
+ if (SHMQueueEmpty(&locktarget->predicateLocks)
+ && locktarget->priorVersionOfRow == NULL
+ && locktarget->nextVersionOfRow == NULL)
+ {
+ hash_search(PredicateLockTargetHash, &locktarget->tag,
+ HASH_REMOVE, NULL);
+ }
+ }
+
+ PredXact->NeedTargetLinkCleanup = false;
+
+ LWLockRelease(PredicateLockNextRowLinkLock);
+ for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
+ LWLockRelease(FirstPredicateLockMgrLock + i);
+}
+
+/*
+ * This is the normal way to delete anything from any of the predicate
+ * locking hash tables. Given a transaction which we know can be deleted:
+ * delete all predicate locks held by that transaction and any predicate
+ * lock targets which are now unreferenced by a lock; delete all conflicts
+ * for the transaction; delete all xid values for the transaction; then
+ * delete the transaction.
+ *
+ * When the partial flag is set, we can release all predicate locks and
+ * in-conflict information -- we've established that there are no longer
+ * any overlapping read write transactions for which this transaction could
+ * matter -- but we keep the transaction entry itself and its out-conflicts.
+ *
+ * When the summarize flag is set, we've run short of room for sxact data
+ * and must summarize to the SLRU. Predicate locks are transferred to a
+ * dummy "old" transaction, with duplicate locks on a single target
+ * collapsing to a single lock with the "latest" commitSeqNo from among
+ * the conflicting locks.
+ */
+static void
+ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
+ bool summarize)
+{
+ PREDICATELOCK *predlock;
+ SERIALIZABLEXIDTAG sxidtag;
+ RWConflict conflict,
+ nextConflict;
+
+ Assert(sxact != NULL);
+ Assert(SxactIsRolledBack(sxact) || SxactIsCommitted(sxact));
+ Assert(LWLockHeldByMe(SerializableFinishedListLock));
+
+ LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+ predlock = (PREDICATELOCK *)
+ SHMQueueNext(&(sxact->predicateLocks),
+ &(sxact->predicateLocks),
+ offsetof(PREDICATELOCK, xactLink));
+ while (predlock)
+ {
+ PREDICATELOCK *nextpredlock;
+ PREDICATELOCKTAG tag;
+ SHM_QUEUE *targetLink;
+ PREDICATELOCKTARGET *target;
+ PREDICATELOCKTARGETTAG targettag;
+ uint32 targettaghash;
+ LWLockId partitionLock;
+
+ nextpredlock = (PREDICATELOCK *)
+ SHMQueueNext(&(sxact->predicateLocks),
+ &(predlock->xactLink),
+ offsetof(PREDICATELOCK, xactLink));
+
+ tag = predlock->tag;
+ targetLink = &(predlock->targetLink);
+ target = tag.myTarget;
+ targettag = target->tag;
+ targettaghash = PredicateLockTargetTagHashCode(&targettag);
+ partitionLock = PredicateLockHashPartitionLock(targettaghash);
+
+ LWLockAcquire(partitionLock, LW_EXCLUSIVE);
+
+ SHMQueueDelete(targetLink);
+
+ hash_search_with_hash_value(PredicateLockHash, &tag,
+ PredicateLockHashCodeFromTargetHashCode(&tag,
+ targettaghash),
+ HASH_REMOVE, NULL);
+ if (summarize)
+ {
+ bool found;
+
+ /* Fold into dummy transaction list. */
+ tag.myXact = OldCommittedSxact;
+ predlock = hash_search_with_hash_value(PredicateLockHash, &tag,
+ PredicateLockHashCodeFromTargetHashCode(&tag,
+ targettaghash),
+ HASH_ENTER, &found);
+ if (!predlock)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory"),
+ errhint("You might need to increase max_predicate_locks_per_transaction.")));
+ if (found)
+ {
+ if (predlock->commitSeqNo < sxact->commitSeqNo)
+ predlock->commitSeqNo = sxact->commitSeqNo;
+ }
+ else
+ {
+ SHMQueueInsertBefore(&(target->predicateLocks),
+ &(predlock->targetLink));
+ SHMQueueInsertBefore(&(OldCommittedSxact->predicateLocks),
+ &(predlock->xactLink));
+ predlock->commitSeqNo = sxact->commitSeqNo;
+ }
+ }
+ else
+ RemoveTargetIfNoLongerUsed(target, targettaghash);
+
+ LWLockRelease(partitionLock);
+
+ predlock = nextpredlock;
+ }
+
+ /*
+ * Rather than retail removal, just re-init the head after we've run
+ * through the list.
+ */
+ SHMQueueInit(&sxact->predicateLocks);
+
+ LWLockRelease(SerializablePredicateLockListLock);
+
+ sxidtag.xid = sxact->topXid;
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+
+ if (!partial)
+ {
+ /* Release all outConflicts. */
+ conflict = (RWConflict)
+ SHMQueueNext(&sxact->outConflicts,
+ &sxact->outConflicts,
+ offsetof(RWConflictData, outLink));
+ while (conflict)
+ {
+ nextConflict = (RWConflict)
+ SHMQueueNext(&sxact->outConflicts,
+ &conflict->outLink,
+ offsetof(RWConflictData, outLink));
+ if (summarize)
+ conflict->sxactIn->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
+ ReleaseRWConflict(conflict);
+ conflict = nextConflict;
+ }
+ }
+
+ /* Release all inConflicts. */
+ conflict = (RWConflict)
+ SHMQueueNext(&sxact->inConflicts,
+ &sxact->inConflicts,
+ offsetof(RWConflictData, inLink));
+ while (conflict)
+ {
+ nextConflict = (RWConflict)
+ SHMQueueNext(&sxact->inConflicts,
+ &conflict->inLink,
+ offsetof(RWConflictData, inLink));
+ if (summarize)
+ conflict->sxactOut->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
+ ReleaseRWConflict(conflict);
+ conflict = nextConflict;
+ }
+
+ if (!partial)
+ {
+ /* Get rid of the xid and the record of the transaction itself. */
+ if (sxidtag.xid != InvalidTransactionId)
+ hash_search(SerializableXidHash, &sxidtag, HASH_REMOVE, NULL);
+ ReleasePredXact(sxact);
+ }
+
+ LWLockRelease(SerializableXactHashLock);
+}
+
+/*
+ * Tests whether the given top level transaction is concurrent with
+ * (overlaps) our current transaction.
+ *
+ * We need to identify the top level transaction for SSI, anyway, so pass
+ * that to this function to save the overhead of checking the snapshot's
+ * subxip array.
+ */
+static bool
+XidIsConcurrent(TransactionId xid)
+{
+ Snapshot snap;
+ uint32 i;
+
+ Assert(TransactionIdIsValid(xid));
+ Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
+
+ snap = GetTransactionSnapshot();
+
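+	/*
+	 * Anything that precedes our snapshot's xmin had already completed when
+	 * we took the snapshot, so it is not concurrent with us.  Anything at or
+	 * beyond xmax, or listed in the snapshot's in-progress array, had not yet
+	 * completed and therefore overlaps our transaction.
+	 */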
+ if (TransactionIdPrecedes(xid, snap->xmin))
+ return false;
+
+ if (TransactionIdFollowsOrEquals(xid, snap->xmax))
+ return true;
+
+ for (i = 0; i < snap->xcnt; i++)
+ {
+ if (xid == snap->xip[i])
+ return true;
+ }
+
+ return false;
+}
+
+/*
+ * CheckForSerializableConflictOut
+ * We are reading a tuple which has been modified. If it is visible to
+ * us but has been deleted, that indicates a rw-conflict out. If it's
+ * not visible and was created by a concurrent (overlapping)
+ *		serializable transaction, that is also a rw-conflict out.
+ *
+ * We will determine the top level xid of the writing transaction with which
+ * we may be in conflict, and check for overlap with our own transaction.
+ * If the transactions overlap (i.e., they cannot see each other's writes),
+ * then we have a conflict out.
+ *
+ * This function should be called just about anywhere in heapam.c that a
+ * tuple has been read. There is currently no known reason to call this
+ * function from an index AM.
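+ *
+ * A typical call site (shown only as an illustrative sketch, not a claim
+ * about any particular caller) determines visibility first and then
+ * reports it here:
+ *
+ *     valid = HeapTupleSatisfiesVisibility(tuple, snapshot, buffer);
+ *     CheckForSerializableConflictOut(valid, relation, tuple, buffer);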
+ */
+void
+CheckForSerializableConflictOut(const bool visible, const Relation relation,
+ const HeapTuple tuple, const Buffer buffer)
+{
+ TransactionId xid;
+ SERIALIZABLEXIDTAG sxidtag;
+ SERIALIZABLEXID *sxid;
+ SERIALIZABLEXACT *sxact;
+ HTSV_Result htsvResult;
+
+ if (SkipSerialization(relation))
+ return;
+
+ if (SxactIsMarkedForDeath(MySerializableXact))
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("could not serialize access due to read/write dependencies among transactions"),
+ errdetail("Cancelled on identification as a pivot, during conflict out checking."),
+ errhint("The transaction might succeed if retried.")));
+ }
+
+ /*
+ * Check to see whether the tuple has been written to by a concurrent
+ * transaction, either to create it not visible to us, or to delete it
+ * while it is visible to us. The "visible" bool indicates whether the
+ * tuple is visible to us, while HeapTupleSatisfiesVacuum checks what else
+ * is going on with it.
+ */
+ htsvResult = HeapTupleSatisfiesVacuum(tuple->t_data, TransactionXmin, buffer);
+ switch (htsvResult)
+ {
+ case HEAPTUPLE_LIVE:
+ if (visible)
+ return;
+ xid = HeapTupleHeaderGetXmin(tuple->t_data);
+ break;
+ case HEAPTUPLE_RECENTLY_DEAD:
+ if (!visible)
+ return;
+ xid = HeapTupleHeaderGetXmax(tuple->t_data);
+ break;
+ case HEAPTUPLE_DELETE_IN_PROGRESS:
+ xid = HeapTupleHeaderGetXmax(tuple->t_data);
+ break;
+ case HEAPTUPLE_INSERT_IN_PROGRESS:
+ xid = HeapTupleHeaderGetXmin(tuple->t_data);
+ break;
+ case HEAPTUPLE_DEAD:
+ return;
+ default:
+
+ /*
+ * The only way to get to this default clause is if a new value is
+ * added to the enum type without adding it to this switch
+ * statement. That's a bug, so elog.
+ */
+ elog(ERROR, "unrecognized return value from HeapTupleSatisfiesVacuum: %u", htsvResult);
+
+ /*
+ * In spite of having all enum values covered and calling elog on
+ * this default, some compilers think this is a code path which
+ * allows xid to be used below without initialization. Silence
+ * that warning.
+ */
+ xid = InvalidTransactionId;
+ }
+ Assert(TransactionIdIsValid(xid));
+ Assert(TransactionIdFollowsOrEquals(xid, TransactionXmin));
+
+ /*
+ * Find top level xid. Bail out if xid is too early to be a conflict, or
+ * if it's our own xid.
+ */
+ if (TransactionIdEquals(xid, GetTopTransactionIdIfAny()))
+ return;
+ xid = SubTransGetTopmostTransaction(xid);
+ if (TransactionIdPrecedes(xid, TransactionXmin))
+ return;
+ if (TransactionIdEquals(xid, GetTopTransactionIdIfAny()))
+ return;
+
+ /*
+ * Find sxact or summarized info for the top level xid.
+ */
+ sxidtag.xid = xid;
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+ sxid = (SERIALIZABLEXID *)
+ hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
+ if (!sxid)
+ {
+ /*
+ * Transaction not found in "normal" SSI structures. Check whether it
+ * got pushed out to SLRU storage for "old committed" transactions.
+ */
+ SerCommitSeqNo conflictCommitSeqNo;
+
+ conflictCommitSeqNo = OldSerXidGetMinConflictCommitSeqNo(xid);
+ if (conflictCommitSeqNo != 0)
+ {
+ if (conflictCommitSeqNo != InvalidSerCommitSeqNo
+ && (!SxactIsReadOnly(MySerializableXact)
+ || conflictCommitSeqNo
+ <= MySerializableXact->SeqNo.lastCommitBeforeSnapshot))
+ ereport(ERROR,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("could not serialize access due to read/write dependencies among transactions"),
+ errdetail("Cancelled on conflict out to old pivot %u.", xid),
+ errhint("The transaction might succeed if retried.")));
+
+ if (SxactHasSummaryConflictIn(MySerializableXact)
+ || !SHMQueueEmpty((SHM_QUEUE *) &MySerializableXact->inConflicts))
+ ereport(ERROR,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("could not serialize access due to read/write dependencies among transactions"),
+ errdetail("Cancelled on identification as a pivot, with conflict out to old committed transaction %u.", xid),
+ errhint("The transaction might succeed if retried.")));
+
+ MySerializableXact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
+ }
+
+ /* It's not serializable or otherwise not important. */
+ LWLockRelease(SerializableXactHashLock);
+ return;
+ }
+ sxact = sxid->myXact;
+ Assert(TransactionIdEquals(sxact->topXid, xid));
+ if (sxact == MySerializableXact
+ || SxactIsRolledBack(sxact)
+ || SxactIsMarkedForDeath(sxact))
+ {
+ /* We can't conflict with our own transaction or one rolled back. */
+ LWLockRelease(SerializableXactHashLock);
+ return;
+ }
+
+ /*
+ * We have a conflict out to a transaction which has a conflict out to a
+ * summarized transaction. That summarized transaction must have
+ * committed first, and we can't tell when it committed in relation to our
+ * snapshot acquisition, so something needs to be cancelled.
+ */
+ if (SxactHasSummaryConflictOut(sxact))
+ {
+ if (!SxactIsPrepared(sxact))
+ {
+ sxact->flags |= SXACT_FLAG_MARKED_FOR_DEATH;
+ LWLockRelease(SerializableXactHashLock);
+ return;
+ }
+ else
+ {
+ LWLockRelease(SerializableXactHashLock);
+ ereport(ERROR,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("could not serialize access due to read/write dependencies among transactions"),
+ errdetail("Cancelled on conflict out to old pivot."),
+ errhint("The transaction might succeed if retried.")));
+ }
+ }
+
+ /*
+ * If this is a read-only transaction and the writing transaction has
+ * committed, and it doesn't have a rw-conflict to a transaction which
+ * committed before it, no conflict.
+ */
+ if (SxactIsReadOnly(MySerializableXact)
+ && SxactIsCommitted(sxact)
+ && !SxactHasSummaryConflictOut(sxact)
+ && (!SxactHasConflictOut(sxact)
+ || MySerializableXact->SeqNo.lastCommitBeforeSnapshot < sxact->SeqNo.earliestOutConflictCommit))
+ {
+ /* Read-only transaction will appear to run first. No conflict. */
+ LWLockRelease(SerializableXactHashLock);
+ return;
+ }
+
+ if (!XidIsConcurrent(xid))
+ {
+ /* This write was already in our snapshot; no conflict. */
+ LWLockRelease(SerializableXactHashLock);
+ return;
+ }
+
+ if (RWConflictExists((SERIALIZABLEXACT *) MySerializableXact, sxact))
+ {
+ /* We don't want duplicate conflict records in the list. */
+ LWLockRelease(SerializableXactHashLock);
+ return;
+ }
+
+ /*
+ * Flag the conflict. But first, if this conflict creates a dangerous
+ * structure, ereport an error.
+ */
+ FlagRWConflict((SERIALIZABLEXACT *) MySerializableXact, sxact);
+ LWLockRelease(SerializableXactHashLock);
+}
+
+/*
+ * Check a particular target for rw-dependency conflict in. This will
+ * also check prior versions of a tuple, if any.
+ */
+static void
+CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
+{
+ PREDICATELOCKTARGETTAG nexttargettag;
+ PREDICATELOCKTARGETTAG thistargettag;
+
+ for (;;)
+ {
+ if (!CheckSingleTargetForConflictsIn(targettag, &nexttargettag))
+ break;
+ thistargettag = nexttargettag;
+ targettag = &thistargettag;
+ }
+}
+
+/*
+ * Check a particular target for rw-dependency conflict in. If the tuple
+ * has prior versions, returns true and *nexttargettag is set to the tag
+ * of the prior tuple version.
+ */
+static bool
+CheckSingleTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag,
+ PREDICATELOCKTARGETTAG *nexttargettag)
+{
+ uint32 targettaghash;
+ LWLockId partitionLock;
+ PREDICATELOCKTARGET *target;
+ PREDICATELOCK *predlock;
+ bool hasnexttarget = false;
+
+ Assert(MySerializableXact != InvalidSerializableXact);
+
+ /*
+ * The same hash and LW lock apply to the lock target and the lock itself.
+ */
+ targettaghash = PredicateLockTargetTagHashCode(targettag);
+ partitionLock = PredicateLockHashPartitionLock(targettaghash);
+ LWLockAcquire(partitionLock, LW_SHARED);
+ LWLockAcquire(PredicateLockNextRowLinkLock, LW_SHARED);
+ target = (PREDICATELOCKTARGET *)
+ hash_search_with_hash_value(PredicateLockTargetHash,
+ targettag, targettaghash,
+ HASH_FIND, NULL);
+ if (!target)
+ {
+ /* Nothing has this target locked; we're done here. */
+ LWLockRelease(PredicateLockNextRowLinkLock);
+ LWLockRelease(partitionLock);
+ return false;
+ }
+
+ /*
+ * If the target is linked to a prior version of the row, save the tag so
+ * that it can be used for iterative calls to this function.
+ */
+ if (target->priorVersionOfRow != NULL)
+ {
+ *nexttargettag = target->priorVersionOfRow->tag;
+ hasnexttarget = true;
+ }
+ LWLockRelease(PredicateLockNextRowLinkLock);
+
+ /*
+ * Each lock for an overlapping transaction represents a conflict: a
+ * rw-dependency in to this transaction.
+ */
+ predlock = (PREDICATELOCK *)
+ SHMQueueNext(&(target->predicateLocks),
+ &(target->predicateLocks),
+ offsetof(PREDICATELOCK, targetLink));
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ while (predlock)
+ {
+ SHM_QUEUE *predlocktargetlink;
+ PREDICATELOCK *nextpredlock;
+ SERIALIZABLEXACT *sxact;
+
+ predlocktargetlink = &(predlock->targetLink);
+ nextpredlock = (PREDICATELOCK *)
+ SHMQueueNext(&(target->predicateLocks),
+ predlocktargetlink,
+ offsetof(PREDICATELOCK, targetLink));
+
+ sxact = predlock->tag.myXact;
+ if (sxact == MySerializableXact)
+ {
+ /*
+ * If we're getting a write lock on the tuple, we don't need a
+ * predicate (SIREAD) lock. At this point our transaction already
+			 * has a RowExclusiveLock on the relation, so we are OK to drop
+ * the predicate lock on the tuple, if found, without fearing that
+ * another write against the tuple will occur before the MVCC
+ * information makes it to the buffer.
+ */
+ if (GET_PREDICATELOCKTARGETTAG_OFFSET(*targettag))
+ {
+ uint32 predlockhashcode;
+ PREDICATELOCKTARGET *rmtarget = NULL;
+ PREDICATELOCK *rmpredlock;
+ LOCALPREDICATELOCK *locallock,
+ *rmlocallock;
+
+ /*
+ * This is a tuple on which we have a tuple predicate lock. We
+ * only have shared LW locks now; release those, and get
+ * exclusive locks only while we modify things.
+ */
+ LWLockRelease(SerializableXactHashLock);
+ LWLockRelease(partitionLock);
+ LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+ LWLockAcquire(partitionLock, LW_EXCLUSIVE);
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+
+ /*
+ * Remove the predicate lock from shared memory, if it wasn't
+ * removed while the locks were released. One way that could
+ * happen is from autovacuum cleaning up an index.
+ */
+ predlockhashcode = PredicateLockHashCodeFromTargetHashCode
+ (&(predlock->tag), targettaghash);
+ rmpredlock = (PREDICATELOCK *)
+ hash_search_with_hash_value(PredicateLockHash,
+ &(predlock->tag),
+ predlockhashcode,
+ HASH_FIND, NULL);
+ if (rmpredlock)
+ {
+ Assert(rmpredlock == predlock);
+
+ SHMQueueDelete(predlocktargetlink);
+ SHMQueueDelete(&(predlock->xactLink));
+
+ rmpredlock = (PREDICATELOCK *)
+ hash_search_with_hash_value(PredicateLockHash,
+ &(predlock->tag),
+ predlockhashcode,
+ HASH_REMOVE, NULL);
+ Assert(rmpredlock == predlock);
+
+ RemoveTargetIfNoLongerUsed(target, targettaghash);
+
+ LWLockRelease(SerializableXactHashLock);
+ LWLockRelease(partitionLock);
+ LWLockRelease(SerializablePredicateLockListLock);
+
+ locallock = (LOCALPREDICATELOCK *)
+ hash_search_with_hash_value(LocalPredicateLockHash,
+ targettag, targettaghash,
+ HASH_FIND, NULL);
+ Assert(locallock != NULL);
+ Assert(locallock->held);
+ locallock->held = false;
+
+ if (locallock->childLocks == 0)
+ {
+ rmlocallock = (LOCALPREDICATELOCK *)
+ hash_search_with_hash_value(LocalPredicateLockHash,
+ targettag, targettaghash,
+ HASH_REMOVE, NULL);
+ Assert(rmlocallock == locallock);
+ }
+
+ DecrementParentLocks(targettag);
+
+ /*
+ * If we've cleaned up the last of the predicate locks for
+ * the target, bail out before re-acquiring the locks.
+ */
+ if (rmtarget)
+ return hasnexttarget;
+
+ /*
+ * The list has been altered. Start over at the front.
+ */
+ LWLockAcquire(partitionLock, LW_SHARED);
+ nextpredlock = (PREDICATELOCK *)
+ SHMQueueNext(&(target->predicateLocks),
+ &(target->predicateLocks),
+ offsetof(PREDICATELOCK, targetLink));
+
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ }
+ else
+ {
+ /*
+ * The predicate lock was cleared while we were attempting
+ * to upgrade our lightweight locks. Revert to the shared
+ * locks.
+ */
+ LWLockRelease(SerializableXactHashLock);
+ LWLockRelease(partitionLock);
+ LWLockRelease(SerializablePredicateLockListLock);
+ LWLockAcquire(partitionLock, LW_SHARED);
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ }
+ }
+ }
+ else if (!SxactIsRolledBack(sxact)
+ && (!SxactIsCommitted(sxact)
+ || TransactionIdPrecedes(GetTransactionSnapshot()->xmin,
+ sxact->finishedBefore))
+ && !RWConflictExists(sxact, (SERIALIZABLEXACT *) MySerializableXact))
+ {
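+			/*
+			 * The SIREAD lock is held by another serializable transaction
+			 * that is either still active or finished recently enough that
+			 * it might have overlapped ours; its read followed by our write
+			 * is a rw-conflict in to our transaction, so flag it.
+			 */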
+ LWLockRelease(SerializableXactHashLock);
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+
+ FlagRWConflict(sxact, (SERIALIZABLEXACT *) MySerializableXact);
+
+ LWLockRelease(SerializableXactHashLock);
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ }
+
+ predlock = nextpredlock;
+ }
+ LWLockRelease(SerializableXactHashLock);
+ LWLockRelease(partitionLock);
+
+ return hasnexttarget;
+}
+
+/*
+ * CheckForSerializableConflictIn
+ * We are writing the given tuple. If that indicates a rw-conflict
+ * in from another serializable transaction, take appropriate action.
+ *
+ * Skip checking for any granularity for which a parameter is missing.
+ *
+ * A tuple update or delete is in conflict if we have a predicate lock
+ * against the relation or page in which the tuple exists, or against the
+ * tuple itself.
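+ *
+ * For example (an illustrative sketch; the variable names are
+ * hypothetical), a heap update would typically check the old tuple before
+ * modifying it:
+ *
+ *     CheckForSerializableConflictIn(relation, &oldtup, buffer);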
+ */
+void
+CheckForSerializableConflictIn(const Relation relation, const HeapTuple tuple,
+ const Buffer buffer)
+{
+ PREDICATELOCKTARGETTAG targettag;
+
+ if (SkipSerialization(relation))
+ return;
+
+ if (SxactIsMarkedForDeath(MySerializableXact))
+ ereport(ERROR,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("could not serialize access due to read/write dependencies among transactions"),
+ errdetail("Cancelled on identification as a pivot, during conflict in checking."),
+ errhint("The transaction might succeed if retried.")));
+
+ MySerializableXact->flags |= SXACT_FLAG_DID_WRITE;
+
+ /*
+ * It is important that we check for locks from the finest granularity to
+ * the coarsest granularity, so that granularity promotion doesn't cause
+ * us to miss a lock. The new (coarser) lock will be acquired before the
+ * old (finer) locks are released.
+ *
+ * It is not possible to take and hold a lock across the checks for all
+ * granularities because each target could be in a separate partition.
+ */
+ if (tuple != NULL)
+ {
+ SET_PREDICATELOCKTARGETTAG_TUPLE(targettag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ ItemPointerGetBlockNumber(&(tuple->t_data->t_ctid)),
+ ItemPointerGetOffsetNumber(&(tuple->t_data->t_ctid)));
+ CheckTargetForConflictsIn(&targettag);
+ }
+
+ if (BufferIsValid(buffer))
+ {
+ SET_PREDICATELOCKTARGETTAG_PAGE(targettag,
+ relation->rd_node.dbNode,
+ relation->rd_id,
+ BufferGetBlockNumber(buffer));
+ CheckTargetForConflictsIn(&targettag);
+ }
+
+ SET_PREDICATELOCKTARGETTAG_RELATION(targettag,
+ relation->rd_node.dbNode,
+ relation->rd_id);
+ CheckTargetForConflictsIn(&targettag);
+}
+
+/*
+ * Flag a rw-dependency between two serializable transactions.
+ *
+ * The caller is responsible for ensuring that we have a LW lock on
+ * the transaction hash table.
+ */
+static void
+FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
+{
+ Assert(reader != writer);
+
+ /* First, see if this conflict causes failure. */
+ OnConflict_CheckForSerializationFailure(reader, writer);
+
+ /* Actually do the conflict flagging. */
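+	/*
+	 * A conflict involving the dummy OldCommittedSxact can only be recorded
+	 * as a summary flag on the other transaction, since detailed conflict
+	 * lists are no longer kept for old committed transactions.
+	 */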
+ if (reader == OldCommittedSxact)
+ writer->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
+ else if (writer == OldCommittedSxact)
+ reader->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
+ else
+ SetRWConflict(reader, writer);
+}
+
+/*
+ * Check whether we should roll back one of these transactions
+ * instead of flagging a new rw-conflict.
+ */
+static void
+OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
+ SERIALIZABLEXACT *writer)
+{
+ bool failure;
+ RWConflict conflict;
+
+ Assert(LWLockHeldByMe(SerializableXactHashLock));
+
+ failure = false;
+
+ /*
+ * Check for already-committed writer with rw-conflict out flagged. This
+ * means that the reader must immediately fail.
+ */
+ if (SxactIsCommitted(writer)
+ && (SxactHasConflictOut(writer) || SxactHasSummaryConflictOut(writer)))
+ failure = true;
+
+ /*
+ * Check whether the reader has become a pivot with a committed writer. If
+ * so, we must roll back unless every in-conflict either committed before
+ * the writer committed or is READ ONLY and overlaps the writer.
+ */
+ if (!failure && SxactIsCommitted(writer) && !SxactIsReadOnly(reader))
+ {
+ if (SxactHasSummaryConflictIn(reader))
+ {
+ failure = true;
+ conflict = NULL;
+ }
+ else
+ conflict = (RWConflict)
+ SHMQueueNext(&reader->inConflicts,
+ &reader->inConflicts,
+ offsetof(RWConflictData, inLink));
+ while (conflict)
+ {
+ if (!SxactIsRolledBack(conflict->sxactOut)
+ && (!SxactIsCommitted(conflict->sxactOut)
+ || conflict->sxactOut->commitSeqNo >= writer->commitSeqNo)
+ && (!SxactIsReadOnly(conflict->sxactOut)
+ || conflict->sxactOut->SeqNo.lastCommitBeforeSnapshot >= writer->commitSeqNo))
+ {
+ failure = true;
+ break;
+ }
+ conflict = (RWConflict)
+ SHMQueueNext(&reader->inConflicts,
+ &conflict->inLink,
+ offsetof(RWConflictData, inLink));
+ }
+ }
+
+ /*
+ * Check whether the writer has become a pivot with an out-conflict
+ * committed transaction, while neither reader nor writer is committed. If
+ * the reader is a READ ONLY transaction, there is only a serialization
+ * failure if an out-conflict transaction causing the pivot committed
+ * before the reader acquired its snapshot. (That is, the reader must not
+ * have been concurrent with the out-conflict transaction.)
+ */
+ if (!failure && !SxactIsCommitted(writer))
+ {
+ if (SxactHasSummaryConflictOut(reader))
+ {
+ failure = true;
+ conflict = NULL;
+ }
+ else
+ conflict = (RWConflict)
+ SHMQueueNext(&writer->outConflicts,
+ &writer->outConflicts,
+ offsetof(RWConflictData, outLink));
+ while (conflict)
+ {
+ if ((reader == conflict->sxactIn && SxactIsCommitted(reader))
+ || (SxactIsCommitted(conflict->sxactIn)
+ && !SxactIsCommitted(reader)
+ && (!SxactIsReadOnly(reader)
+ || conflict->sxactIn->commitSeqNo <= reader->SeqNo.lastCommitBeforeSnapshot)))
+ {
+ failure = true;
+ break;
+ }
+ conflict = (RWConflict)
+ SHMQueueNext(&writer->outConflicts,
+ &conflict->outLink,
+ offsetof(RWConflictData, outLink));
+ }
+ }
+
+ if (failure)
+ {
+ if (MySerializableXact == writer)
+ {
+ LWLockRelease(SerializableXactHashLock);
+ ereport(ERROR,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("could not serialize access due to read/write dependencies among transactions"),
+ errdetail("Cancelled on identification as pivot, during write."),
+ errhint("The transaction might succeed if retried.")));
+ }
+ else if (SxactIsPrepared(writer))
+ {
+ LWLockRelease(SerializableXactHashLock);
+ ereport(ERROR,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("could not serialize access due to read/write dependencies among transactions"),
+ errdetail("Cancelled on conflict out to pivot %u, during read.", writer->topXid),
+ errhint("The transaction might succeed if retried.")));
+ }
+ writer->flags |= SXACT_FLAG_MARKED_FOR_DEATH;
+ }
+}
+
+/*
+ * PreCommit_CheckForSerializationFailure
+ * Check for dangerous structures in a serializable transaction
+ * at commit.
+ *
+ * We're checking for a dangerous structure as each conflict is recorded.
+ * The only way we could have a problem at commit is if this is the "out"
+ * side of a pivot, and neither the "in" side nor the pivot has yet
+ * committed.
+ *
+ * If a dangerous structure is found, the pivot (the near conflict) is
+ * marked for death, because rolling back another transaction might mean
+ * that we flail without ever making progress. This transaction is
+ * committing writes, so letting it commit ensures progress. If we
+ * cancelled the far conflict, it might immediately fail again on retry.
+ */
+void
+PreCommit_CheckForSerializationFailure(void)
+{
+ RWConflict nearConflict;
+
+ if (MySerializableXact == InvalidSerializableXact)
+ return;
+
+ Assert(IsolationIsSerializable());
+
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+
+ if (SxactIsMarkedForDeath(MySerializableXact))
+ {
+ LWLockRelease(SerializableXactHashLock);
+ ereport(ERROR,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("could not serialize access due to read/write dependencies among transactions"),
+ errdetail("Cancelled on identification as a pivot, during commit attempt."),
+ errhint("The transaction might succeed if retried.")));
+ }
+
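+	/*
+	 * Walk our in-conflicts: each is a transaction that read something we
+	 * wrote.  If such a transaction is still active and in turn has an
+	 * in-conflict from us or from another active read-write transaction, it
+	 * is a pivot in a dangerous structure whose "out" side (this transaction)
+	 * commits first, so mark that pivot for death.
+	 */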
+ nearConflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->inConflicts,
+ (SHM_QUEUE *) &MySerializableXact->inConflicts,
+ offsetof(RWConflictData, inLink));
+ while (nearConflict)
+ {
+ if (!SxactIsCommitted(nearConflict->sxactOut)
+ && !SxactIsRolledBack(nearConflict->sxactOut)
+ && !SxactIsMarkedForDeath(nearConflict->sxactOut))
+ {
+ RWConflict farConflict;
+
+ farConflict = (RWConflict)
+ SHMQueueNext(&nearConflict->sxactOut->inConflicts,
+ &nearConflict->sxactOut->inConflicts,
+ offsetof(RWConflictData, inLink));
+ while (farConflict)
+ {
+ if (farConflict->sxactOut == MySerializableXact
+ || (!SxactIsCommitted(farConflict->sxactOut)
+ && !SxactIsReadOnly(farConflict->sxactOut)
+ && !SxactIsRolledBack(farConflict->sxactOut)
+ && !SxactIsMarkedForDeath(farConflict->sxactOut)))
+ {
+ nearConflict->sxactOut->flags |= SXACT_FLAG_MARKED_FOR_DEATH;
+ break;
+ }
+ farConflict = (RWConflict)
+ SHMQueueNext(&nearConflict->sxactOut->inConflicts,
+ &farConflict->inLink,
+ offsetof(RWConflictData, inLink));
+ }
+ }
+
+ nearConflict = (RWConflict)
+ SHMQueueNext((SHM_QUEUE *) &MySerializableXact->inConflicts,
+ &nearConflict->inLink,
+ offsetof(RWConflictData, inLink));
+ }
+
+ MySerializableXact->flags |= SXACT_FLAG_PREPARED;
+
+ LWLockRelease(SerializableXactHashLock);
+}
+
+/*------------------------------------------------------------------------*/
+
+/*
+ * Two-phase commit support
+ */
+
+/*
+ *		AtPrepare_PredicateLocks
+ * Do the preparatory work for a PREPARE: make 2PC state file
+ * records for all predicate locks currently held.
+ */
+void
+AtPrepare_PredicateLocks(void)
+{
+ PREDICATELOCK *predlock;
+ SERIALIZABLEXACT *sxact;
+ TwoPhasePredicateRecord record;
+ TwoPhasePredicateXactRecord *xactRecord;
+ TwoPhasePredicateLockRecord *lockRecord;
+
+ sxact = (SERIALIZABLEXACT *) MySerializableXact;
+ xactRecord = &(record.data.xactRecord);
+ lockRecord = &(record.data.lockRecord);
+
+ if (MySerializableXact == InvalidSerializableXact)
+ return;
+
+ /* Generate a xact record for our SERIALIZABLEXACT */
+ record.type = TWOPHASEPREDICATERECORD_XACT;
+ xactRecord->xmin = MySerializableXact->xmin;
+ xactRecord->flags = MySerializableXact->flags;
+
+ /*
+ * Tweak the flags. Since we're not going to output the inConflicts and
+ * outConflicts lists, if they're non-empty we'll represent that by
+ * setting the appropriate summary conflict flags.
+ */
+ if (!SHMQueueEmpty((SHM_QUEUE *) &MySerializableXact->inConflicts))
+ xactRecord->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
+ if (!SHMQueueEmpty((SHM_QUEUE *) &MySerializableXact->outConflicts))
+ xactRecord->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
+
+ RegisterTwoPhaseRecord(TWOPHASE_RM_PREDICATELOCK_ID, 0,
+ &record, sizeof(record));
+
+ /*
+ * Generate a lock record for each lock.
+ *
+ * To do this, we need to walk the predicate lock list in our sxact rather
+ * than using the local predicate lock table because the latter is not
+ * guaranteed to be accurate.
+ */
+ LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+
+ predlock = (PREDICATELOCK *)
+ SHMQueueNext(&(sxact->predicateLocks),
+ &(sxact->predicateLocks),
+ offsetof(PREDICATELOCK, xactLink));
+
+ while (predlock != NULL)
+ {
+ record.type = TWOPHASEPREDICATERECORD_LOCK;
+ lockRecord->target = predlock->tag.myTarget->tag;
+
+ RegisterTwoPhaseRecord(TWOPHASE_RM_PREDICATELOCK_ID, 0,
+ &record, sizeof(record));
+
+ predlock = (PREDICATELOCK *)
+ SHMQueueNext(&(sxact->predicateLocks),
+ &(predlock->xactLink),
+ offsetof(PREDICATELOCK, xactLink));
+ }
+
+ LWLockRelease(SerializablePredicateLockListLock);
+}
+
+/*
+ *		PostPrepare_PredicateLocks
+ * Clean up after successful PREPARE. Unlike the non-predicate
+ * lock manager, we do not need to transfer locks to a dummy
+ * PGPROC because our SERIALIZABLEXACT will stay around
+ * anyway. We only need to clean up our local state.
+ */
+void
+PostPrepare_PredicateLocks(TransactionId xid)
+{
+ if (MySerializableXact == InvalidSerializableXact)
+ return;
+
+ Assert(SxactIsPrepared(MySerializableXact));
+
+ MySerializableXact->pid = 0;
+
+ hash_destroy(LocalPredicateLockHash);
+ LocalPredicateLockHash = NULL;
+
+ MySerializableXact = InvalidSerializableXact;
+}
+
+/*
+ * PredicateLockTwoPhaseFinish
+ * Release a prepared transaction's predicate locks once it
+ * commits or aborts.
+ */
+void
+PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit)
+{
+ SERIALIZABLEXID *sxid;
+ SERIALIZABLEXIDTAG sxidtag;
+
+ sxidtag.xid = xid;
+
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ sxid = (SERIALIZABLEXID *)
+ hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
+ LWLockRelease(SerializableXactHashLock);
+
+ /* xid will not be found if it wasn't a serializable transaction */
+ if (sxid == NULL)
+ return;
+
+ /* Release its locks */
+ MySerializableXact = sxid->myXact;
+ ReleasePredicateLocks(isCommit);
+}
+
+/*
+ * Re-acquire a predicate lock belonging to a transaction that was prepared.
+ */
+void
+predicatelock_twophase_recover(TransactionId xid, uint16 info,
+ void *recdata, uint32 len)
+{
+ TwoPhasePredicateRecord *record;
+
+ Assert(len == sizeof(TwoPhasePredicateRecord));
+
+ record = (TwoPhasePredicateRecord *) recdata;
+
+ Assert((record->type == TWOPHASEPREDICATERECORD_XACT) ||
+ (record->type == TWOPHASEPREDICATERECORD_LOCK));
+
+ if (record->type == TWOPHASEPREDICATERECORD_XACT)
+ {
+ /* Per-transaction record. Set up a SERIALIZABLEXACT. */
+ TwoPhasePredicateXactRecord *xactRecord;
+ SERIALIZABLEXACT *sxact;
+ SERIALIZABLEXID *sxid;
+ SERIALIZABLEXIDTAG sxidtag;
+ bool found;
+
+ xactRecord = (TwoPhasePredicateXactRecord *) &record->data.xactRecord;
+
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+ sxact = CreatePredXact();
+ if (!sxact)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory")));
+
+ /* vxid for a prepared xact is InvalidBackendId/xid; no pid */
+ sxact->vxid.backendId = InvalidBackendId;
+ sxact->vxid.localTransactionId = (LocalTransactionId) xid;
+ sxact->pid = 0;
+
+ /* a prepared xact hasn't committed yet */
+ sxact->commitSeqNo = InvalidSerCommitSeqNo;
+ sxact->finishedBefore = InvalidTransactionId;
+
+ sxact->SeqNo.lastCommitBeforeSnapshot = RecoverySerCommitSeqNo;
+
+
+ /*
+ * We don't need the details of a prepared transaction's conflicts,
+ * just whether it had conflicts in or out (which we get from the
+ * flags)
+ */
+ SHMQueueInit(&(sxact->outConflicts));
+ SHMQueueInit(&(sxact->inConflicts));
+
+ /*
+ * Don't need to track this; no transactions running at the time the
+ * recovered xact started are still active, except possibly other
+ * prepared xacts and we don't care whether those are RO_SAFE or not.
+ */
+ SHMQueueInit(&(sxact->possibleUnsafeConflicts));
+
+ SHMQueueInit(&(sxact->predicateLocks));
+ SHMQueueElemInit(&(sxact->finishedLink));
+
+ sxact->topXid = xid;
+ sxact->xmin = xactRecord->xmin;
+ sxact->flags = xactRecord->flags;
+ Assert(SxactIsPrepared(sxact));
+ if (!SxactIsReadOnly(sxact))
+ {
+ ++(PredXact->WritableSxactCount);
+ Assert(PredXact->WritableSxactCount <=
+ (MaxBackends + max_prepared_xacts));
+ }
+
+ /* Register the transaction's xid */
+ sxidtag.xid = xid;
+ sxid = (SERIALIZABLEXID *) hash_search(SerializableXidHash,
+ &sxidtag,
+ HASH_ENTER, &found);
+ if (!sxid)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory")));
+ Assert(!found);
+ sxid->myXact = (SERIALIZABLEXACT *) sxact;
+
+ /*
+ * Update global xmin. Note that this is a special case compared to
+ * registering a normal transaction, because the global xmin might go
+ * backwards. That's OK, because until recovery is over we're not
+ * going to complete any transactions or create any non-prepared
+		 * transactions, so there's no danger of throwing away information
+		 * that is still needed.
+ */
+ if ((!TransactionIdIsValid(PredXact->SxactGlobalXmin)) ||
+ (TransactionIdFollows(PredXact->SxactGlobalXmin, sxact->xmin)))
+ {
+ PredXact->SxactGlobalXmin = sxact->xmin;
+ PredXact->SxactGlobalXminCount = 1;
+ OldSerXidSetActiveSerXmin(sxact->xmin);
+ }
+ else if (TransactionIdEquals(sxact->xmin, PredXact->SxactGlobalXmin))
+ {
+ Assert(PredXact->SxactGlobalXminCount > 0);
+ PredXact->SxactGlobalXminCount++;
+ }
+
+ LWLockRelease(SerializableXactHashLock);
+ }
+ else if (record->type == TWOPHASEPREDICATERECORD_LOCK)
+ {
+ /* Lock record. Recreate the PREDICATELOCK */
+ TwoPhasePredicateLockRecord *lockRecord;
+ SERIALIZABLEXID *sxid;
+ SERIALIZABLEXACT *sxact;
+ SERIALIZABLEXIDTAG sxidtag;
+ uint32 targettaghash;
+
+ lockRecord = (TwoPhasePredicateLockRecord *) &record->data.lockRecord;
+ targettaghash = PredicateLockTargetTagHashCode(&lockRecord->target);
+
+ LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ sxidtag.xid = xid;
+ sxid = (SERIALIZABLEXID *)
+ hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
+ LWLockRelease(SerializableXactHashLock);
+
+ Assert(sxid != NULL);
+ sxact = sxid->myXact;
+ Assert(sxact != InvalidSerializableXact);
+
+ CreatePredicateLock(&lockRecord->target, targettaghash, sxact);
+ }
+}