aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/storage/lmgr/predicate.c701
-rw-r--r--src/include/storage/predicate_internals.h48
2 files changed, 249 insertions, 500 deletions
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index 327adef5d3c..11decb74b2a 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -260,7 +260,7 @@
#define NPREDICATELOCKTARGETENTS() \
mul_size(max_predicate_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
-#define SxactIsOnFinishedList(sxact) (!SHMQueueIsDetached(&((sxact)->finishedLink)))
+#define SxactIsOnFinishedList(sxact) (!dlist_node_is_detached(&(sxact)->finishedLink))
/*
* Note that a sxact is marked "prepared" once it has passed
@@ -392,7 +392,7 @@ static RWConflictPoolHeader RWConflictPool;
static HTAB *SerializableXidHash;
static HTAB *PredicateLockTargetHash;
static HTAB *PredicateLockHash;
-static SHM_QUEUE *FinishedSerializableTransactions;
+static dlist_head *FinishedSerializableTransactions;
/*
* Tag for a dummy entry in PredicateLockTargetHash. By temporarily removing
@@ -430,8 +430,6 @@ static SERIALIZABLEXACT *SavedSerializableXact = InvalidSerializableXact;
static SERIALIZABLEXACT *CreatePredXact(void);
static void ReleasePredXact(SERIALIZABLEXACT *sxact);
-static SERIALIZABLEXACT *FirstPredXact(void);
-static SERIALIZABLEXACT *NextPredXact(SERIALIZABLEXACT *sxact);
static bool RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer);
static void SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer);
@@ -579,69 +577,24 @@ SerializationNeededForWrite(Relation relation)
static SERIALIZABLEXACT *
CreatePredXact(void)
{
- PredXactListElement ptle;
+ SERIALIZABLEXACT *sxact;
- ptle = (PredXactListElement)
- SHMQueueNext(&PredXact->availableList,
- &PredXact->availableList,
- offsetof(PredXactListElementData, link));
- if (!ptle)
+ if (dlist_is_empty(&PredXact->availableList))
return NULL;
- SHMQueueDelete(&ptle->link);
- SHMQueueInsertBefore(&PredXact->activeList, &ptle->link);
- return &ptle->sxact;
+ sxact = dlist_container(SERIALIZABLEXACT, xactLink,
+ dlist_pop_head_node(&PredXact->availableList));
+ dlist_push_tail(&PredXact->activeList, &sxact->xactLink);
+ return sxact;
}
static void
ReleasePredXact(SERIALIZABLEXACT *sxact)
{
- PredXactListElement ptle;
-
Assert(ShmemAddrIsValid(sxact));
- ptle = (PredXactListElement)
- (((char *) sxact)
- - offsetof(PredXactListElementData, sxact)
- + offsetof(PredXactListElementData, link));
- SHMQueueDelete(&ptle->link);
- SHMQueueInsertBefore(&PredXact->availableList, &ptle->link);
-}
-
-static SERIALIZABLEXACT *
-FirstPredXact(void)
-{
- PredXactListElement ptle;
-
- ptle = (PredXactListElement)
- SHMQueueNext(&PredXact->activeList,
- &PredXact->activeList,
- offsetof(PredXactListElementData, link));
- if (!ptle)
- return NULL;
-
- return &ptle->sxact;
-}
-
-static SERIALIZABLEXACT *
-NextPredXact(SERIALIZABLEXACT *sxact)
-{
- PredXactListElement ptle;
-
- Assert(ShmemAddrIsValid(sxact));
-
- ptle = (PredXactListElement)
- (((char *) sxact)
- - offsetof(PredXactListElementData, sxact)
- + offsetof(PredXactListElementData, link));
- ptle = (PredXactListElement)
- SHMQueueNext(&PredXact->activeList,
- &ptle->link,
- offsetof(PredXactListElementData, link));
- if (!ptle)
- return NULL;
-
- return &ptle->sxact;
+ dlist_delete(&sxact->xactLink);
+ dlist_push_tail(&PredXact->availableList, &sxact->xactLink);
}
/*------------------------------------------------------------------------*/
@@ -652,30 +605,30 @@ NextPredXact(SERIALIZABLEXACT *sxact)
static bool
RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer)
{
- RWConflict conflict;
+ dlist_iter iter;
Assert(reader != writer);
/* Check the ends of the purported conflict first. */
if (SxactIsDoomed(reader)
|| SxactIsDoomed(writer)
- || SHMQueueEmpty(&reader->outConflicts)
- || SHMQueueEmpty(&writer->inConflicts))
+ || dlist_is_empty(&reader->outConflicts)
+ || dlist_is_empty(&writer->inConflicts))
return false;
- /* A conflict is possible; walk the list to find out. */
- conflict = (RWConflict)
- SHMQueueNext(&reader->outConflicts,
- &reader->outConflicts,
- offsetof(RWConflictData, outLink));
- while (conflict)
+ /*
+ * A conflict is possible; walk the list to find out.
+ *
+ * The unconstify is needed as we have no const version of
+ * dlist_foreach().
+ */
+ dlist_foreach(iter, &unconstify(SERIALIZABLEXACT *, reader)->outConflicts)
{
+ RWConflict conflict =
+ dlist_container(RWConflictData, outLink, iter.cur);
+
if (conflict->sxactIn == writer)
return true;
- conflict = (RWConflict)
- SHMQueueNext(&reader->outConflicts,
- &conflict->outLink,
- offsetof(RWConflictData, outLink));
}
/* No conflict found. */
@@ -690,22 +643,19 @@ SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
Assert(reader != writer);
Assert(!RWConflictExists(reader, writer));
- conflict = (RWConflict)
- SHMQueueNext(&RWConflictPool->availableList,
- &RWConflictPool->availableList,
- offsetof(RWConflictData, outLink));
- if (!conflict)
+ if (dlist_is_empty(&RWConflictPool->availableList))
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("not enough elements in RWConflictPool to record a read/write conflict"),
errhint("You might need to run fewer transactions at a time or increase max_connections.")));
- SHMQueueDelete(&conflict->outLink);
+ conflict = dlist_head_element(RWConflictData, outLink, &RWConflictPool->availableList);
+ dlist_delete(&conflict->outLink);
conflict->sxactOut = reader;
conflict->sxactIn = writer;
- SHMQueueInsertBefore(&reader->outConflicts, &conflict->outLink);
- SHMQueueInsertBefore(&writer->inConflicts, &conflict->inLink);
+ dlist_push_tail(&reader->outConflicts, &conflict->outLink);
+ dlist_push_tail(&writer->inConflicts, &conflict->inLink);
}
static void
@@ -718,39 +668,33 @@ SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact,
Assert(SxactIsReadOnly(roXact));
Assert(!SxactIsReadOnly(activeXact));
- conflict = (RWConflict)
- SHMQueueNext(&RWConflictPool->availableList,
- &RWConflictPool->availableList,
- offsetof(RWConflictData, outLink));
- if (!conflict)
+ if (dlist_is_empty(&RWConflictPool->availableList))
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("not enough elements in RWConflictPool to record a potential read/write conflict"),
errhint("You might need to run fewer transactions at a time or increase max_connections.")));
- SHMQueueDelete(&conflict->outLink);
+ conflict = dlist_head_element(RWConflictData, outLink, &RWConflictPool->availableList);
+ dlist_delete(&conflict->outLink);
conflict->sxactOut = activeXact;
conflict->sxactIn = roXact;
- SHMQueueInsertBefore(&activeXact->possibleUnsafeConflicts,
- &conflict->outLink);
- SHMQueueInsertBefore(&roXact->possibleUnsafeConflicts,
- &conflict->inLink);
+ dlist_push_tail(&activeXact->possibleUnsafeConflicts, &conflict->outLink);
+ dlist_push_tail(&roXact->possibleUnsafeConflicts, &conflict->inLink);
}
static void
ReleaseRWConflict(RWConflict conflict)
{
- SHMQueueDelete(&conflict->inLink);
- SHMQueueDelete(&conflict->outLink);
- SHMQueueInsertBefore(&RWConflictPool->availableList, &conflict->outLink);
+ dlist_delete(&conflict->inLink);
+ dlist_delete(&conflict->outLink);
+ dlist_push_tail(&RWConflictPool->availableList, &conflict->outLink);
}
static void
FlagSxactUnsafe(SERIALIZABLEXACT *sxact)
{
- RWConflict conflict,
- nextConflict;
+ dlist_mutable_iter iter;
Assert(SxactIsReadOnly(sxact));
Assert(!SxactIsROSafe(sxact));
@@ -761,23 +705,15 @@ FlagSxactUnsafe(SERIALIZABLEXACT *sxact)
* We know this isn't a safe snapshot, so we can stop looking for other
* potential conflicts.
*/
- conflict = (RWConflict)
- SHMQueueNext(&sxact->possibleUnsafeConflicts,
- &sxact->possibleUnsafeConflicts,
- offsetof(RWConflictData, inLink));
- while (conflict)
+ dlist_foreach_modify(iter, &sxact->possibleUnsafeConflicts)
{
- nextConflict = (RWConflict)
- SHMQueueNext(&sxact->possibleUnsafeConflicts,
- &conflict->inLink,
- offsetof(RWConflictData, inLink));
+ RWConflict conflict =
+ dlist_container(RWConflictData, inLink, iter.cur);
Assert(!SxactIsReadOnly(conflict->sxactOut));
Assert(sxact == conflict->sxactIn);
ReleaseRWConflict(conflict);
-
- conflict = nextConflict;
}
}
@@ -1242,8 +1178,8 @@ InitPredicateLocks(void)
{
int i;
- SHMQueueInit(&PredXact->availableList);
- SHMQueueInit(&PredXact->activeList);
+ dlist_init(&PredXact->availableList);
+ dlist_init(&PredXact->activeList);
PredXact->SxactGlobalXmin = InvalidTransactionId;
PredXact->SxactGlobalXminCount = 0;
PredXact->WritableSxactCount = 0;
@@ -1251,27 +1187,26 @@ InitPredicateLocks(void)
PredXact->CanPartialClearThrough = 0;
PredXact->HavePartialClearedThrough = 0;
requestSize = mul_size((Size) max_table_size,
- PredXactListElementDataSize);
+ sizeof(SERIALIZABLEXACT));
PredXact->element = ShmemAlloc(requestSize);
/* Add all elements to available list, clean. */
memset(PredXact->element, 0, requestSize);
for (i = 0; i < max_table_size; i++)
{
- LWLockInitialize(&PredXact->element[i].sxact.perXactPredicateListLock,
+ LWLockInitialize(&PredXact->element[i].perXactPredicateListLock,
LWTRANCHE_PER_XACT_PREDICATE_LIST);
- SHMQueueInsertBefore(&(PredXact->availableList),
- &(PredXact->element[i].link));
+ dlist_push_tail(&PredXact->availableList, &PredXact->element[i].xactLink);
}
PredXact->OldCommittedSxact = CreatePredXact();
SetInvalidVirtualTransactionId(PredXact->OldCommittedSxact->vxid);
PredXact->OldCommittedSxact->prepareSeqNo = 0;
PredXact->OldCommittedSxact->commitSeqNo = 0;
PredXact->OldCommittedSxact->SeqNo.lastCommitBeforeSnapshot = 0;
- SHMQueueInit(&PredXact->OldCommittedSxact->outConflicts);
- SHMQueueInit(&PredXact->OldCommittedSxact->inConflicts);
- SHMQueueInit(&PredXact->OldCommittedSxact->predicateLocks);
- SHMQueueInit(&PredXact->OldCommittedSxact->finishedLink);
- SHMQueueInit(&PredXact->OldCommittedSxact->possibleUnsafeConflicts);
+ dlist_init(&PredXact->OldCommittedSxact->outConflicts);
+ dlist_init(&PredXact->OldCommittedSxact->inConflicts);
+ dlist_init(&PredXact->OldCommittedSxact->predicateLocks);
+ dlist_node_init(&PredXact->OldCommittedSxact->finishedLink);
+ dlist_init(&PredXact->OldCommittedSxact->possibleUnsafeConflicts);
PredXact->OldCommittedSxact->topXid = InvalidTransactionId;
PredXact->OldCommittedSxact->finishedBefore = InvalidTransactionId;
PredXact->OldCommittedSxact->xmin = InvalidTransactionId;
@@ -1317,7 +1252,7 @@ InitPredicateLocks(void)
{
int i;
- SHMQueueInit(&RWConflictPool->availableList);
+ dlist_init(&RWConflictPool->availableList);
requestSize = mul_size((Size) max_table_size,
RWConflictDataSize);
RWConflictPool->element = ShmemAlloc(requestSize);
@@ -1325,8 +1260,8 @@ InitPredicateLocks(void)
memset(RWConflictPool->element, 0, requestSize);
for (i = 0; i < max_table_size; i++)
{
- SHMQueueInsertBefore(&(RWConflictPool->availableList),
- &(RWConflictPool->element[i].outLink));
+ dlist_push_tail(&RWConflictPool->availableList,
+ &RWConflictPool->element[i].outLink);
}
}
@@ -1334,13 +1269,13 @@ InitPredicateLocks(void)
* Create or attach to the header for the list of finished serializable
* transactions.
*/
- FinishedSerializableTransactions = (SHM_QUEUE *)
+ FinishedSerializableTransactions = (dlist_head *)
ShmemInitStruct("FinishedSerializableTransactions",
- sizeof(SHM_QUEUE),
+ sizeof(dlist_head),
&found);
Assert(found == IsUnderPostmaster);
if (!found)
- SHMQueueInit(FinishedSerializableTransactions);
+ dlist_init(FinishedSerializableTransactions);
/*
* Initialize the SLRU storage for old committed serializable
@@ -1379,7 +1314,7 @@ PredicateLockShmemSize(void)
max_table_size *= 10;
size = add_size(size, PredXactListDataSize);
size = add_size(size, mul_size((Size) max_table_size,
- PredXactListElementDataSize));
+ sizeof(SERIALIZABLEXACT)));
/* transaction xid table */
size = add_size(size, hash_estimate_size(max_table_size,
@@ -1392,7 +1327,7 @@ PredicateLockShmemSize(void)
RWConflictDataSize));
/* Head for list of finished serializable transactions. */
- size = add_size(size, sizeof(SHM_QUEUE));
+ size = add_size(size, sizeof(dlist_head));
/* Shared memory structures for SLRU tracking of old committed xids. */
size = add_size(size, sizeof(SerialControlData));
@@ -1515,7 +1450,7 @@ SummarizeOldestCommittedSxact(void)
* that case, we have nothing to do here. The caller will find one of the
* slots released by the other backend when it retries.
*/
- if (SHMQueueEmpty(FinishedSerializableTransactions))
+ if (dlist_is_empty(FinishedSerializableTransactions))
{
LWLockRelease(SerializableFinishedListLock);
return;
@@ -1525,11 +1460,9 @@ SummarizeOldestCommittedSxact(void)
* Grab the first sxact off the finished list -- this will be the earliest
* commit. Remove it from the list.
*/
- sxact = (SERIALIZABLEXACT *)
- SHMQueueNext(FinishedSerializableTransactions,
- FinishedSerializableTransactions,
- offsetof(SERIALIZABLEXACT, finishedLink));
- SHMQueueDelete(&(sxact->finishedLink));
+ sxact = dlist_head_element(SERIALIZABLEXACT, finishedLink,
+ FinishedSerializableTransactions);
+ dlist_delete_thoroughly(&sxact->finishedLink);
/* Add to SLRU summary information. */
if (TransactionIdIsValid(sxact->topXid) && !SxactIsReadOnly(sxact))
@@ -1583,7 +1516,7 @@ GetSafeSnapshot(Snapshot origSnapshot)
* them marked us as conflicted.
*/
MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
- while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) ||
+ while (!(dlist_is_empty(&MySerializableXact->possibleUnsafeConflicts) ||
SxactIsROUnsafe(MySerializableXact)))
{
LWLockRelease(SerializableXactHashLock);
@@ -1629,13 +1562,16 @@ int
GetSafeSnapshotBlockingPids(int blocked_pid, int *output, int output_size)
{
int num_written = 0;
- SERIALIZABLEXACT *sxact;
+ dlist_iter iter;
+ SERIALIZABLEXACT *sxact = NULL;
LWLockAcquire(SerializableXactHashLock, LW_SHARED);
/* Find blocked_pid's SERIALIZABLEXACT by linear search. */
- for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact))
+ dlist_foreach(iter, &PredXact->activeList)
{
+ sxact = dlist_container(SERIALIZABLEXACT, xactLink, iter.cur);
+
if (sxact->pid == blocked_pid)
break;
}
@@ -1643,21 +1579,13 @@ GetSafeSnapshotBlockingPids(int blocked_pid, int *output, int output_size)
/* Did we find it, and is it currently waiting in GetSafeSnapshot? */
if (sxact != NULL && SxactIsDeferrableWaiting(sxact))
{
- RWConflict possibleUnsafeConflict;
-
/* Traverse the list of possible unsafe conflicts collecting PIDs. */
- possibleUnsafeConflict = (RWConflict)
- SHMQueueNext(&sxact->possibleUnsafeConflicts,
- &sxact->possibleUnsafeConflicts,
- offsetof(RWConflictData, inLink));
-
- while (possibleUnsafeConflict != NULL && num_written < output_size)
+ dlist_foreach(iter, &sxact->possibleUnsafeConflicts)
{
+ RWConflict possibleUnsafeConflict =
+ dlist_container(RWConflictData, inLink, iter.cur);
+
output[num_written++] = possibleUnsafeConflict->sxactOut->pid;
- possibleUnsafeConflict = (RWConflict)
- SHMQueueNext(&sxact->possibleUnsafeConflicts,
- &possibleUnsafeConflict->inLink,
- offsetof(RWConflictData, inLink));
}
}
@@ -1870,19 +1798,21 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
sxact->SeqNo.lastCommitBeforeSnapshot = PredXact->LastSxactCommitSeqNo;
sxact->prepareSeqNo = InvalidSerCommitSeqNo;
sxact->commitSeqNo = InvalidSerCommitSeqNo;
- SHMQueueInit(&(sxact->outConflicts));
- SHMQueueInit(&(sxact->inConflicts));
- SHMQueueInit(&(sxact->possibleUnsafeConflicts));
+ dlist_init(&(sxact->outConflicts));
+ dlist_init(&(sxact->inConflicts));
+ dlist_init(&(sxact->possibleUnsafeConflicts));
sxact->topXid = GetTopTransactionIdIfAny();
sxact->finishedBefore = InvalidTransactionId;
sxact->xmin = snapshot->xmin;
sxact->pid = MyProcPid;
sxact->pgprocno = MyProc->pgprocno;
- SHMQueueInit(&(sxact->predicateLocks));
- SHMQueueElemInit(&(sxact->finishedLink));
+ dlist_init(&sxact->predicateLocks);
+ dlist_node_init(&sxact->finishedLink);
sxact->flags = 0;
if (XactReadOnly)
{
+ dlist_iter iter;
+
sxact->flags |= SXACT_FLAG_READ_ONLY;
/*
@@ -1891,10 +1821,10 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
* transactions then this snapshot can be deemed safe (and we can run
* without tracking predicate locks).
*/
- for (othersxact = FirstPredXact();
- othersxact != NULL;
- othersxact = NextPredXact(othersxact))
+ dlist_foreach(iter, &PredXact->activeList)
{
+ othersxact = dlist_container(SERIALIZABLEXACT, xactLink, iter.cur);
+
if (!SxactIsCommitted(othersxact)
&& !SxactIsDoomed(othersxact)
&& !SxactIsReadOnly(othersxact))
@@ -2171,7 +2101,7 @@ RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash)
Assert(LWLockHeldByMe(SerializablePredicateListLock));
/* Can't remove it until no locks at this target. */
- if (!SHMQueueEmpty(&target->predicateLocks))
+ if (!dlist_is_empty(&target->predicateLocks))
return;
/* Actually remove the target. */
@@ -2199,28 +2129,20 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
{
SERIALIZABLEXACT *sxact;
PREDICATELOCK *predlock;
+ dlist_mutable_iter iter;
LWLockAcquire(SerializablePredicateListLock, LW_SHARED);
sxact = MySerializableXact;
if (IsInParallelMode())
LWLockAcquire(&sxact->perXactPredicateListLock, LW_EXCLUSIVE);
- predlock = (PREDICATELOCK *)
- SHMQueueNext(&(sxact->predicateLocks),
- &(sxact->predicateLocks),
- offsetof(PREDICATELOCK, xactLink));
- while (predlock)
+
+ dlist_foreach_modify(iter, &sxact->predicateLocks)
{
- SHM_QUEUE *predlocksxactlink;
- PREDICATELOCK *nextpredlock;
PREDICATELOCKTAG oldlocktag;
PREDICATELOCKTARGET *oldtarget;
PREDICATELOCKTARGETTAG oldtargettag;
- predlocksxactlink = &(predlock->xactLink);
- nextpredlock = (PREDICATELOCK *)
- SHMQueueNext(&(sxact->predicateLocks),
- predlocksxactlink,
- offsetof(PREDICATELOCK, xactLink));
+ predlock = dlist_container(PREDICATELOCK, xactLink, iter.cur);
oldlocktag = predlock->tag;
Assert(oldlocktag.myXact == sxact);
@@ -2238,8 +2160,8 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
LWLockAcquire(partitionLock, LW_EXCLUSIVE);
- SHMQueueDelete(predlocksxactlink);
- SHMQueueDelete(&(predlock->targetLink));
+ dlist_delete(&predlock->xactLink);
+ dlist_delete(&predlock->targetLink);
rmpredlock = hash_search_with_hash_value
(PredicateLockHash,
&oldlocktag,
@@ -2254,8 +2176,6 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
DecrementParentLocks(&oldtargettag);
}
-
- predlock = nextpredlock;
}
if (IsInParallelMode())
LWLockRelease(&sxact->perXactPredicateListLock);
@@ -2472,7 +2392,7 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
errmsg("out of shared memory"),
errhint("You might need to increase max_pred_locks_per_transaction.")));
if (!found)
- SHMQueueInit(&(target->predicateLocks));
+ dlist_init(&target->predicateLocks);
/* We've got the sxact and target, make sure they're joined. */
locktag.myTarget = target;
@@ -2489,9 +2409,8 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
if (!found)
{
- SHMQueueInsertBefore(&(target->predicateLocks), &(lock->targetLink));
- SHMQueueInsertBefore(&(sxact->predicateLocks),
- &(lock->xactLink));
+ dlist_push_tail(&target->predicateLocks, &lock->targetLink);
+ dlist_push_tail(&sxact->predicateLocks, &lock->xactLink);
lock->commitSeqNo = InvalidSerCommitSeqNo;
}
@@ -2663,30 +2582,22 @@ PredicateLockTID(Relation relation, ItemPointer tid, Snapshot snapshot,
static void
DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash)
{
- PREDICATELOCK *predlock;
- SHM_QUEUE *predlocktargetlink;
- PREDICATELOCK *nextpredlock;
- bool found;
+ dlist_mutable_iter iter;
Assert(LWLockHeldByMeInMode(SerializablePredicateListLock,
LW_EXCLUSIVE));
Assert(LWLockHeldByMe(PredicateLockHashPartitionLock(targettaghash)));
- predlock = (PREDICATELOCK *)
- SHMQueueNext(&(target->predicateLocks),
- &(target->predicateLocks),
- offsetof(PREDICATELOCK, targetLink));
LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
- while (predlock)
+
+ dlist_foreach_modify(iter, &target->predicateLocks)
{
- predlocktargetlink = &(predlock->targetLink);
- nextpredlock = (PREDICATELOCK *)
- SHMQueueNext(&(target->predicateLocks),
- predlocktargetlink,
- offsetof(PREDICATELOCK, targetLink));
+ PREDICATELOCK *predlock =
+ dlist_container(PREDICATELOCK, targetLink, iter.cur);
+ bool found;
- SHMQueueDelete(&(predlock->xactLink));
- SHMQueueDelete(&(predlock->targetLink));
+ dlist_delete(&(predlock->xactLink));
+ dlist_delete(&(predlock->targetLink));
hash_search_with_hash_value
(PredicateLockHash,
@@ -2695,8 +2606,6 @@ DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash)
targettaghash),
HASH_REMOVE, &found);
Assert(found);
-
- predlock = nextpredlock;
}
LWLockRelease(SerializableXactHashLock);
@@ -2795,8 +2704,8 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
if (oldtarget)
{
PREDICATELOCKTARGET *newtarget;
- PREDICATELOCK *oldpredlock;
PREDICATELOCKTAG newpredlocktag;
+ dlist_mutable_iter iter;
newtarget = hash_search_with_hash_value(PredicateLockTargetHash,
&newtargettag,
@@ -2812,7 +2721,7 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
/* If we created a new entry, initialize it */
if (!found)
- SHMQueueInit(&(newtarget->predicateLocks));
+ dlist_init(&newtarget->predicateLocks);
newpredlocktag.myTarget = newtarget;
@@ -2820,29 +2729,21 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
* Loop through all the locks on the old target, replacing them with
* locks on the new target.
*/
- oldpredlock = (PREDICATELOCK *)
- SHMQueueNext(&(oldtarget->predicateLocks),
- &(oldtarget->predicateLocks),
- offsetof(PREDICATELOCK, targetLink));
LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
- while (oldpredlock)
+
+ dlist_foreach_modify(iter, &oldtarget->predicateLocks)
{
- SHM_QUEUE *predlocktargetlink;
- PREDICATELOCK *nextpredlock;
+ PREDICATELOCK *oldpredlock =
+ dlist_container(PREDICATELOCK, targetLink, iter.cur);
PREDICATELOCK *newpredlock;
SerCommitSeqNo oldCommitSeqNo = oldpredlock->commitSeqNo;
- predlocktargetlink = &(oldpredlock->targetLink);
- nextpredlock = (PREDICATELOCK *)
- SHMQueueNext(&(oldtarget->predicateLocks),
- predlocktargetlink,
- offsetof(PREDICATELOCK, targetLink));
newpredlocktag.myXact = oldpredlock->tag.myXact;
if (removeOld)
{
- SHMQueueDelete(&(oldpredlock->xactLink));
- SHMQueueDelete(&(oldpredlock->targetLink));
+ dlist_delete(&(oldpredlock->xactLink));
+ dlist_delete(&(oldpredlock->targetLink));
hash_search_with_hash_value
(PredicateLockHash,
@@ -2870,10 +2771,10 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
}
if (!found)
{
- SHMQueueInsertBefore(&(newtarget->predicateLocks),
- &(newpredlock->targetLink));
- SHMQueueInsertBefore(&(newpredlocktag.myXact->predicateLocks),
- &(newpredlock->xactLink));
+ dlist_push_tail(&(newtarget->predicateLocks),
+ &(newpredlock->targetLink));
+ dlist_push_tail(&(newpredlocktag.myXact->predicateLocks),
+ &(newpredlock->xactLink));
newpredlock->commitSeqNo = oldCommitSeqNo;
}
else
@@ -2885,14 +2786,12 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
Assert(newpredlock->commitSeqNo != 0);
Assert((newpredlock->commitSeqNo == InvalidSerCommitSeqNo)
|| (newpredlock->tag.myXact == OldCommittedSxact));
-
- oldpredlock = nextpredlock;
}
LWLockRelease(SerializableXactHashLock);
if (removeOld)
{
- Assert(SHMQueueEmpty(&oldtarget->predicateLocks));
+ Assert(dlist_is_empty(&oldtarget->predicateLocks));
RemoveTargetIfNoLongerUsed(oldtarget, oldtargettaghash);
}
}
@@ -3012,7 +2911,7 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer)
while ((oldtarget = (PREDICATELOCKTARGET *) hash_seq_search(&seqstat)))
{
- PREDICATELOCK *oldpredlock;
+ dlist_mutable_iter iter;
/*
* Check whether this is a target which needs attention.
@@ -3047,29 +2946,21 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer)
heaptargettaghash,
HASH_ENTER, &found);
if (!found)
- SHMQueueInit(&heaptarget->predicateLocks);
+ dlist_init(&heaptarget->predicateLocks);
}
/*
* Loop through all the locks on the old target, replacing them with
* locks on the new target.
*/
- oldpredlock = (PREDICATELOCK *)
- SHMQueueNext(&(oldtarget->predicateLocks),
- &(oldtarget->predicateLocks),
- offsetof(PREDICATELOCK, targetLink));
- while (oldpredlock)
+ dlist_foreach_modify(iter, &oldtarget->predicateLocks)
{
- PREDICATELOCK *nextpredlock;
+ PREDICATELOCK *oldpredlock =
+ dlist_container(PREDICATELOCK, targetLink, iter.cur);
PREDICATELOCK *newpredlock;
SerCommitSeqNo oldCommitSeqNo;
SERIALIZABLEXACT *oldXact;
- nextpredlock = (PREDICATELOCK *)
- SHMQueueNext(&(oldtarget->predicateLocks),
- &(oldpredlock->targetLink),
- offsetof(PREDICATELOCK, targetLink));
-
/*
* Remove the old lock first. This avoids the chance of running
* out of lock structure entries for the hash table.
@@ -3077,7 +2968,7 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer)
oldCommitSeqNo = oldpredlock->commitSeqNo;
oldXact = oldpredlock->tag.myXact;
- SHMQueueDelete(&(oldpredlock->xactLink));
+ dlist_delete(&(oldpredlock->xactLink));
/*
* No need for retail delete from oldtarget list, we're removing
@@ -3103,10 +2994,10 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer)
&found);
if (!found)
{
- SHMQueueInsertBefore(&(heaptarget->predicateLocks),
- &(newpredlock->targetLink));
- SHMQueueInsertBefore(&(newpredlocktag.myXact->predicateLocks),
- &(newpredlock->xactLink));
+ dlist_push_tail(&(heaptarget->predicateLocks),
+ &(newpredlock->targetLink));
+ dlist_push_tail(&(newpredlocktag.myXact->predicateLocks),
+ &(newpredlock->xactLink));
newpredlock->commitSeqNo = oldCommitSeqNo;
}
else
@@ -3119,8 +3010,6 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer)
Assert((newpredlock->commitSeqNo == InvalidSerCommitSeqNo)
|| (newpredlock->tag.myXact == OldCommittedSxact));
}
-
- oldpredlock = nextpredlock;
}
hash_search(PredicateLockTargetHash, &oldtarget->tag, HASH_REMOVE,
@@ -3275,15 +3164,18 @@ PredicateLockPageCombine(Relation relation, BlockNumber oldblkno,
static void
SetNewSxactGlobalXmin(void)
{
- SERIALIZABLEXACT *sxact;
+ dlist_iter iter;
Assert(LWLockHeldByMe(SerializableXactHashLock));
PredXact->SxactGlobalXmin = InvalidTransactionId;
PredXact->SxactGlobalXminCount = 0;
- for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact))
+ dlist_foreach(iter, &PredXact->activeList)
{
+ SERIALIZABLEXACT *sxact =
+ dlist_container(SERIALIZABLEXACT, xactLink, iter.cur);
+
if (!SxactIsRolledBack(sxact)
&& !SxactIsCommitted(sxact)
&& sxact != OldCommittedSxact)
@@ -3334,10 +3226,8 @@ void
ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe)
{
bool needToClear;
- RWConflict conflict,
- nextConflict,
- possibleUnsafeConflict;
SERIALIZABLEXACT *roXact;
+ dlist_mutable_iter iter;
/*
* We can't trust XactReadOnly here, because a transaction which started
@@ -3525,23 +3415,15 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe)
* make us unsafe. Note that we use 'inLink' for the iteration as
* opposed to 'outLink' for the r/w xacts.
*/
- possibleUnsafeConflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
- &MySerializableXact->possibleUnsafeConflicts,
- offsetof(RWConflictData, inLink));
- while (possibleUnsafeConflict)
+ dlist_foreach_modify(iter, &MySerializableXact->possibleUnsafeConflicts)
{
- nextConflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
- &possibleUnsafeConflict->inLink,
- offsetof(RWConflictData, inLink));
+ RWConflict possibleUnsafeConflict =
+ dlist_container(RWConflictData, inLink, iter.cur);
Assert(!SxactIsReadOnly(possibleUnsafeConflict->sxactOut));
Assert(MySerializableXact == possibleUnsafeConflict->sxactIn);
ReleaseRWConflict(possibleUnsafeConflict);
-
- possibleUnsafeConflict = nextConflict;
}
}
@@ -3564,16 +3446,10 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe)
* back clear them all. Set SXACT_FLAG_CONFLICT_OUT if any point to
* previously committed transactions.
*/
- conflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->outConflicts,
- &MySerializableXact->outConflicts,
- offsetof(RWConflictData, outLink));
- while (conflict)
+ dlist_foreach_modify(iter, &MySerializableXact->outConflicts)
{
- nextConflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->outConflicts,
- &conflict->outLink,
- offsetof(RWConflictData, outLink));
+ RWConflict conflict =
+ dlist_container(RWConflictData, outLink, iter.cur);
if (isCommit
&& !SxactIsReadOnly(MySerializableXact)
@@ -3589,31 +3465,21 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe)
|| SxactIsCommitted(conflict->sxactIn)
|| (conflict->sxactIn->SeqNo.lastCommitBeforeSnapshot >= PredXact->LastSxactCommitSeqNo))
ReleaseRWConflict(conflict);
-
- conflict = nextConflict;
}
/*
* Release all inConflicts from committed and read-only transactions. If
* we're rolling back, clear them all.
*/
- conflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->inConflicts,
- &MySerializableXact->inConflicts,
- offsetof(RWConflictData, inLink));
- while (conflict)
+ dlist_foreach_modify(iter, &MySerializableXact->inConflicts)
{
- nextConflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->inConflicts,
- &conflict->inLink,
- offsetof(RWConflictData, inLink));
+ RWConflict conflict =
+ dlist_container(RWConflictData, inLink, iter.cur);
if (!isCommit
|| SxactIsCommitted(conflict->sxactOut)
|| SxactIsReadOnly(conflict->sxactOut))
ReleaseRWConflict(conflict);
-
- conflict = nextConflict;
}
if (!topLevelIsDeclaredReadOnly)
@@ -3624,16 +3490,10 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe)
* conflict out. If any are waiting DEFERRABLE transactions, wake them
* up if they are known safe or known unsafe.
*/
- possibleUnsafeConflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
- &MySerializableXact->possibleUnsafeConflicts,
- offsetof(RWConflictData, outLink));
- while (possibleUnsafeConflict)
+ dlist_foreach_modify(iter, &MySerializableXact->possibleUnsafeConflicts)
{
- nextConflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
- &possibleUnsafeConflict->outLink,
- offsetof(RWConflictData, outLink));
+ RWConflict possibleUnsafeConflict =
+ dlist_container(RWConflictData, outLink, iter.cur);
roXact = possibleUnsafeConflict->sxactIn;
Assert(MySerializableXact == possibleUnsafeConflict->sxactOut);
@@ -3661,7 +3521,7 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe)
* transaction can now safely release its predicate locks (but
* that transaction's backend has to do that itself).
*/
- if (SHMQueueEmpty(&roXact->possibleUnsafeConflicts))
+ if (dlist_is_empty(&roXact->possibleUnsafeConflicts))
roXact->flags |= SXACT_FLAG_RO_SAFE;
}
@@ -3672,8 +3532,6 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe)
if (SxactIsDeferrableWaiting(roXact) &&
(SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact)))
ProcSendSignal(roXact->pgprocno);
-
- possibleUnsafeConflict = nextConflict;
}
}
@@ -3701,8 +3559,8 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe)
/* Add this to the list of transactions to check for later cleanup. */
if (isCommit)
- SHMQueueInsertBefore(FinishedSerializableTransactions,
- &MySerializableXact->finishedLink);
+ dlist_push_tail(FinishedSerializableTransactions,
+ &MySerializableXact->finishedLink);
/*
* If we're releasing a RO_SAFE transaction in parallel mode, we'll only
@@ -3744,27 +3602,19 @@ ReleasePredicateLocksLocal(void)
static void
ClearOldPredicateLocks(void)
{
- SERIALIZABLEXACT *finishedSxact;
- PREDICATELOCK *predlock;
+ dlist_mutable_iter iter;
/*
* Loop through finished transactions. They are in commit order, so we can
* stop as soon as we find one that's still interesting.
*/
LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
- finishedSxact = (SERIALIZABLEXACT *)
- SHMQueueNext(FinishedSerializableTransactions,
- FinishedSerializableTransactions,
- offsetof(SERIALIZABLEXACT, finishedLink));
LWLockAcquire(SerializableXactHashLock, LW_SHARED);
- while (finishedSxact)
+ dlist_foreach_modify(iter, FinishedSerializableTransactions)
{
- SERIALIZABLEXACT *nextSxact;
+ SERIALIZABLEXACT *finishedSxact =
+ dlist_container(SERIALIZABLEXACT, finishedLink, iter.cur);
- nextSxact = (SERIALIZABLEXACT *)
- SHMQueueNext(FinishedSerializableTransactions,
- &(finishedSxact->finishedLink),
- offsetof(SERIALIZABLEXACT, finishedLink));
if (!TransactionIdIsValid(PredXact->SxactGlobalXmin)
|| TransactionIdPrecedesOrEquals(finishedSxact->finishedBefore,
PredXact->SxactGlobalXmin))
@@ -3774,7 +3624,7 @@ ClearOldPredicateLocks(void)
* took its snapshot. It's no longer interesting.
*/
LWLockRelease(SerializableXactHashLock);
- SHMQueueDelete(&(finishedSxact->finishedLink));
+ dlist_delete_thoroughly(&finishedSxact->finishedLink);
ReleaseOneSerializableXact(finishedSxact, false, false);
LWLockAcquire(SerializableXactHashLock, LW_SHARED);
}
@@ -3791,7 +3641,7 @@ ClearOldPredicateLocks(void)
if (SxactIsReadOnly(finishedSxact))
{
/* A read-only transaction can be removed entirely */
- SHMQueueDelete(&(finishedSxact->finishedLink));
+ dlist_delete_thoroughly(&(finishedSxact->finishedLink));
ReleaseOneSerializableXact(finishedSxact, false, false);
}
else
@@ -3812,7 +3662,6 @@ ClearOldPredicateLocks(void)
/* Still interesting. */
break;
}
- finishedSxact = nextSxact;
}
LWLockRelease(SerializableXactHashLock);
@@ -3820,20 +3669,12 @@ ClearOldPredicateLocks(void)
* Loop through predicate locks on dummy transaction for summarized data.
*/
LWLockAcquire(SerializablePredicateListLock, LW_SHARED);
- predlock = (PREDICATELOCK *)
- SHMQueueNext(&OldCommittedSxact->predicateLocks,
- &OldCommittedSxact->predicateLocks,
- offsetof(PREDICATELOCK, xactLink));
- while (predlock)
+ dlist_foreach_modify(iter, &OldCommittedSxact->predicateLocks)
{
- PREDICATELOCK *nextpredlock;
+ PREDICATELOCK *predlock =
+ dlist_container(PREDICATELOCK, xactLink, iter.cur);
bool canDoPartialCleanup;
- nextpredlock = (PREDICATELOCK *)
- SHMQueueNext(&OldCommittedSxact->predicateLocks,
- &predlock->xactLink,
- offsetof(PREDICATELOCK, xactLink));
-
LWLockAcquire(SerializableXactHashLock, LW_SHARED);
Assert(predlock->commitSeqNo != 0);
Assert(predlock->commitSeqNo != InvalidSerCommitSeqNo);
@@ -3860,8 +3701,8 @@ ClearOldPredicateLocks(void)
LWLockAcquire(partitionLock, LW_EXCLUSIVE);
- SHMQueueDelete(&(predlock->targetLink));
- SHMQueueDelete(&(predlock->xactLink));
+ dlist_delete(&(predlock->targetLink));
+ dlist_delete(&(predlock->xactLink));
hash_search_with_hash_value(PredicateLockHash, &tag,
PredicateLockHashCodeFromTargetHashCode(&tag,
@@ -3871,8 +3712,6 @@ ClearOldPredicateLocks(void)
LWLockRelease(partitionLock);
}
-
- predlock = nextpredlock;
}
LWLockRelease(SerializablePredicateListLock);
@@ -3902,10 +3741,8 @@ static void
ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
bool summarize)
{
- PREDICATELOCK *predlock;
SERIALIZABLEXIDTAG sxidtag;
- RWConflict conflict,
- nextConflict;
+ dlist_mutable_iter iter;
Assert(sxact != NULL);
Assert(SxactIsRolledBack(sxact) || SxactIsCommitted(sxact));
@@ -3919,27 +3756,17 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
LWLockAcquire(SerializablePredicateListLock, LW_SHARED);
if (IsInParallelMode())
LWLockAcquire(&sxact->perXactPredicateListLock, LW_EXCLUSIVE);
- predlock = (PREDICATELOCK *)
- SHMQueueNext(&(sxact->predicateLocks),
- &(sxact->predicateLocks),
- offsetof(PREDICATELOCK, xactLink));
- while (predlock)
+ dlist_foreach_modify(iter, &sxact->predicateLocks)
{
- PREDICATELOCK *nextpredlock;
+ PREDICATELOCK *predlock =
+ dlist_container(PREDICATELOCK, xactLink, iter.cur);
PREDICATELOCKTAG tag;
- SHM_QUEUE *targetLink;
PREDICATELOCKTARGET *target;
PREDICATELOCKTARGETTAG targettag;
uint32 targettaghash;
LWLock *partitionLock;
- nextpredlock = (PREDICATELOCK *)
- SHMQueueNext(&(sxact->predicateLocks),
- &(predlock->xactLink),
- offsetof(PREDICATELOCK, xactLink));
-
tag = predlock->tag;
- targetLink = &(predlock->targetLink);
target = tag.myTarget;
targettag = target->tag;
targettaghash = PredicateLockTargetTagHashCode(&targettag);
@@ -3947,7 +3774,7 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
LWLockAcquire(partitionLock, LW_EXCLUSIVE);
- SHMQueueDelete(targetLink);
+ dlist_delete(&predlock->targetLink);
hash_search_with_hash_value(PredicateLockHash, &tag,
PredicateLockHashCodeFromTargetHashCode(&tag,
@@ -3977,10 +3804,10 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
}
else
{
- SHMQueueInsertBefore(&(target->predicateLocks),
- &(predlock->targetLink));
- SHMQueueInsertBefore(&(OldCommittedSxact->predicateLocks),
- &(predlock->xactLink));
+ dlist_push_tail(&target->predicateLocks,
+ &predlock->targetLink);
+ dlist_push_tail(&OldCommittedSxact->predicateLocks,
+ &predlock->xactLink);
predlock->commitSeqNo = sxact->commitSeqNo;
}
}
@@ -3988,15 +3815,13 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
RemoveTargetIfNoLongerUsed(target, targettaghash);
LWLockRelease(partitionLock);
-
- predlock = nextpredlock;
}
/*
* Rather than retail removal, just re-init the head after we've run
* through the list.
*/
- SHMQueueInit(&sxact->predicateLocks);
+ dlist_init(&sxact->predicateLocks);
if (IsInParallelMode())
LWLockRelease(&sxact->perXactPredicateListLock);
@@ -4008,38 +3833,26 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
/* Release all outConflicts (unless 'partial' is true) */
if (!partial)
{
- conflict = (RWConflict)
- SHMQueueNext(&sxact->outConflicts,
- &sxact->outConflicts,
- offsetof(RWConflictData, outLink));
- while (conflict)
+ dlist_foreach_modify(iter, &sxact->outConflicts)
{
- nextConflict = (RWConflict)
- SHMQueueNext(&sxact->outConflicts,
- &conflict->outLink,
- offsetof(RWConflictData, outLink));
+ RWConflict conflict =
+ dlist_container(RWConflictData, outLink, iter.cur);
+
if (summarize)
conflict->sxactIn->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
ReleaseRWConflict(conflict);
- conflict = nextConflict;
}
}
/* Release all inConflicts. */
- conflict = (RWConflict)
- SHMQueueNext(&sxact->inConflicts,
- &sxact->inConflicts,
- offsetof(RWConflictData, inLink));
- while (conflict)
+ dlist_foreach_modify(iter, &sxact->inConflicts)
{
- nextConflict = (RWConflict)
- SHMQueueNext(&sxact->inConflicts,
- &conflict->inLink,
- offsetof(RWConflictData, inLink));
+ RWConflict conflict =
+ dlist_container(RWConflictData, inLink, iter.cur);
+
if (summarize)
conflict->sxactOut->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
ReleaseRWConflict(conflict);
- conflict = nextConflict;
}
/* Finally, get rid of the xid and the record of the transaction itself. */
@@ -4165,7 +3978,7 @@ CheckForSerializableConflictOut(Relation relation, TransactionId xid, Snapshot s
errhint("The transaction might succeed if retried.")));
if (SxactHasSummaryConflictIn(MySerializableXact)
- || !SHMQueueEmpty(&MySerializableXact->inConflicts))
+ || !dlist_is_empty(&MySerializableXact->inConflicts))
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("could not serialize access due to read/write dependencies among transactions"),
@@ -4261,9 +4074,9 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
uint32 targettaghash;
LWLock *partitionLock;
PREDICATELOCKTARGET *target;
- PREDICATELOCK *predlock;
PREDICATELOCK *mypredlock = NULL;
PREDICATELOCKTAG mypredlocktag;
+ dlist_mutable_iter iter;
Assert(MySerializableXact != InvalidSerializableXact);
@@ -4288,24 +4101,14 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
* Each lock for an overlapping transaction represents a conflict: a
* rw-dependency in to this transaction.
*/
- predlock = (PREDICATELOCK *)
- SHMQueueNext(&(target->predicateLocks),
- &(target->predicateLocks),
- offsetof(PREDICATELOCK, targetLink));
LWLockAcquire(SerializableXactHashLock, LW_SHARED);
- while (predlock)
- {
- SHM_QUEUE *predlocktargetlink;
- PREDICATELOCK *nextpredlock;
- SERIALIZABLEXACT *sxact;
- predlocktargetlink = &(predlock->targetLink);
- nextpredlock = (PREDICATELOCK *)
- SHMQueueNext(&(target->predicateLocks),
- predlocktargetlink,
- offsetof(PREDICATELOCK, targetLink));
+ dlist_foreach_modify(iter, &target->predicateLocks)
+ {
+ PREDICATELOCK *predlock =
+ dlist_container(PREDICATELOCK, targetLink, iter.cur);
+ SERIALIZABLEXACT *sxact = predlock->tag.myXact;
- sxact = predlock->tag.myXact;
if (sxact == MySerializableXact)
{
/*
@@ -4350,8 +4153,6 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
LWLockRelease(SerializableXactHashLock);
LWLockAcquire(SerializableXactHashLock, LW_SHARED);
}
-
- predlock = nextpredlock;
}
LWLockRelease(SerializableXactHashLock);
LWLockRelease(partitionLock);
@@ -4391,8 +4192,8 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
{
Assert(rmpredlock == mypredlock);
- SHMQueueDelete(&(mypredlock->targetLink));
- SHMQueueDelete(&(mypredlock->xactLink));
+ dlist_delete(&(mypredlock->targetLink));
+ dlist_delete(&(mypredlock->xactLink));
rmpredlock = (PREDICATELOCK *)
hash_search_with_hash_value(PredicateLockHash,
@@ -4562,7 +4363,7 @@ CheckTableForSerializableConflictIn(Relation relation)
while ((target = (PREDICATELOCKTARGET *) hash_seq_search(&seqstat)))
{
- PREDICATELOCK *predlock;
+ dlist_mutable_iter iter;
/*
* Check whether this is a target which needs attention.
@@ -4575,26 +4376,16 @@ CheckTableForSerializableConflictIn(Relation relation)
/*
* Loop through locks for this target and flag conflicts.
*/
- predlock = (PREDICATELOCK *)
- SHMQueueNext(&(target->predicateLocks),
- &(target->predicateLocks),
- offsetof(PREDICATELOCK, targetLink));
- while (predlock)
+ dlist_foreach_modify(iter, &target->predicateLocks)
{
- PREDICATELOCK *nextpredlock;
-
- nextpredlock = (PREDICATELOCK *)
- SHMQueueNext(&(target->predicateLocks),
- &(predlock->targetLink),
- offsetof(PREDICATELOCK, targetLink));
+ PREDICATELOCK *predlock =
+ dlist_container(PREDICATELOCK, targetLink, iter.cur);
if (predlock->tag.myXact != MySerializableXact
&& !RWConflictExists(predlock->tag.myXact, MySerializableXact))
{
FlagRWConflict(predlock->tag.myXact, MySerializableXact);
}
-
- predlock = nextpredlock;
}
}
@@ -4652,7 +4443,6 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
SERIALIZABLEXACT *writer)
{
bool failure;
- RWConflict conflict;
Assert(LWLockHeldByMe(SerializableXactHashLock));
@@ -4692,20 +4482,16 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
* to abort.
*------------------------------------------------------------------------
*/
- if (!failure)
+ if (!failure && SxactHasSummaryConflictOut(writer))
+ failure = true;
+ else if (!failure)
{
- if (SxactHasSummaryConflictOut(writer))
- {
- failure = true;
- conflict = NULL;
- }
- else
- conflict = (RWConflict)
- SHMQueueNext(&writer->outConflicts,
- &writer->outConflicts,
- offsetof(RWConflictData, outLink));
- while (conflict)
+ dlist_iter iter;
+
+ dlist_foreach(iter, &writer->outConflicts)
{
+ RWConflict conflict =
+ dlist_container(RWConflictData, outLink, iter.cur);
SERIALIZABLEXACT *t2 = conflict->sxactIn;
if (SxactIsPrepared(t2)
@@ -4719,10 +4505,6 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
failure = true;
break;
}
- conflict = (RWConflict)
- SHMQueueNext(&writer->outConflicts,
- &conflict->outLink,
- offsetof(RWConflictData, outLink));
}
}
@@ -4744,30 +4526,31 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
if (SxactHasSummaryConflictIn(reader))
{
failure = true;
- conflict = NULL;
}
else
- conflict = (RWConflict)
- SHMQueueNext(&reader->inConflicts,
- &reader->inConflicts,
- offsetof(RWConflictData, inLink));
- while (conflict)
{
- SERIALIZABLEXACT *t0 = conflict->sxactOut;
+ dlist_iter iter;
- if (!SxactIsDoomed(t0)
- && (!SxactIsCommitted(t0)
- || t0->commitSeqNo >= writer->prepareSeqNo)
- && (!SxactIsReadOnly(t0)
- || t0->SeqNo.lastCommitBeforeSnapshot >= writer->prepareSeqNo))
+ /*
+ * The unconstify is needed as we have no const version of
+ * dlist_foreach().
+ */
+ dlist_foreach(iter, &unconstify(SERIALIZABLEXACT *, reader)->inConflicts)
{
- failure = true;
- break;
+ const RWConflict conflict =
+ dlist_container(RWConflictData, inLink, iter.cur);
+ const SERIALIZABLEXACT *t0 = conflict->sxactOut;
+
+ if (!SxactIsDoomed(t0)
+ && (!SxactIsCommitted(t0)
+ || t0->commitSeqNo >= writer->prepareSeqNo)
+ && (!SxactIsReadOnly(t0)
+ || t0->SeqNo.lastCommitBeforeSnapshot >= writer->prepareSeqNo))
+ {
+ failure = true;
+ break;
+ }
}
- conflict = (RWConflict)
- SHMQueueNext(&reader->inConflicts,
- &conflict->inLink,
- offsetof(RWConflictData, inLink));
}
}
@@ -4825,7 +4608,7 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
void
PreCommit_CheckForSerializationFailure(void)
{
- RWConflict nearConflict;
+ dlist_iter near_iter;
if (MySerializableXact == InvalidSerializableXact)
return;
@@ -4846,23 +4629,21 @@ PreCommit_CheckForSerializationFailure(void)
errhint("The transaction might succeed if retried.")));
}
- nearConflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->inConflicts,
- &MySerializableXact->inConflicts,
- offsetof(RWConflictData, inLink));
- while (nearConflict)
+ dlist_foreach(near_iter, &MySerializableXact->inConflicts)
{
+ RWConflict nearConflict =
+ dlist_container(RWConflictData, inLink, near_iter.cur);
+
if (!SxactIsCommitted(nearConflict->sxactOut)
&& !SxactIsDoomed(nearConflict->sxactOut))
{
- RWConflict farConflict;
+ dlist_iter far_iter;
- farConflict = (RWConflict)
- SHMQueueNext(&nearConflict->sxactOut->inConflicts,
- &nearConflict->sxactOut->inConflicts,
- offsetof(RWConflictData, inLink));
- while (farConflict)
+ dlist_foreach(far_iter, &nearConflict->sxactOut->inConflicts)
{
+ RWConflict farConflict =
+ dlist_container(RWConflictData, inLink, far_iter.cur);
+
if (farConflict->sxactOut == MySerializableXact
|| (!SxactIsCommitted(farConflict->sxactOut)
&& !SxactIsReadOnly(farConflict->sxactOut)
@@ -4886,17 +4667,8 @@ PreCommit_CheckForSerializationFailure(void)
nearConflict->sxactOut->flags |= SXACT_FLAG_DOOMED;
break;
}
- farConflict = (RWConflict)
- SHMQueueNext(&nearConflict->sxactOut->inConflicts,
- &farConflict->inLink,
- offsetof(RWConflictData, inLink));
}
}
-
- nearConflict = (RWConflict)
- SHMQueueNext(&MySerializableXact->inConflicts,
- &nearConflict->inLink,
- offsetof(RWConflictData, inLink));
}
MySerializableXact->prepareSeqNo = ++(PredXact->LastSxactCommitSeqNo);
@@ -4919,11 +4691,11 @@ PreCommit_CheckForSerializationFailure(void)
void
AtPrepare_PredicateLocks(void)
{
- PREDICATELOCK *predlock;
SERIALIZABLEXACT *sxact;
TwoPhasePredicateRecord record;
TwoPhasePredicateXactRecord *xactRecord;
TwoPhasePredicateLockRecord *lockRecord;
+ dlist_iter iter;
sxact = MySerializableXact;
xactRecord = &(record.data.xactRecord);
@@ -4963,23 +4735,16 @@ AtPrepare_PredicateLocks(void)
*/
Assert(!IsParallelWorker() && !ParallelContextActive());
- predlock = (PREDICATELOCK *)
- SHMQueueNext(&(sxact->predicateLocks),
- &(sxact->predicateLocks),
- offsetof(PREDICATELOCK, xactLink));
-
- while (predlock != NULL)
+ dlist_foreach(iter, &sxact->predicateLocks)
{
+ PREDICATELOCK *predlock =
+ dlist_container(PREDICATELOCK, xactLink, iter.cur);
+
record.type = TWOPHASEPREDICATERECORD_LOCK;
lockRecord->target = predlock->tag.myTarget->tag;
RegisterTwoPhaseRecord(TWOPHASE_RM_PREDICATELOCK_ID, 0,
&record, sizeof(record));
-
- predlock = (PREDICATELOCK *)
- SHMQueueNext(&(sxact->predicateLocks),
- &(predlock->xactLink),
- offsetof(PREDICATELOCK, xactLink));
}
LWLockRelease(SerializablePredicateListLock);
@@ -5091,10 +4856,10 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info,
* recovered xact started are still active, except possibly other
* prepared xacts and we don't care whether those are RO_SAFE or not.
*/
- SHMQueueInit(&(sxact->possibleUnsafeConflicts));
+ dlist_init(&(sxact->possibleUnsafeConflicts));
- SHMQueueInit(&(sxact->predicateLocks));
- SHMQueueElemInit(&(sxact->finishedLink));
+ dlist_init(&(sxact->predicateLocks));
+ dlist_node_init(&sxact->finishedLink);
sxact->topXid = xid;
sxact->xmin = xactRecord->xmin;
@@ -5112,8 +4877,8 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info,
* we'll conservatively assume that it had both a conflict in and a
* conflict out, and represent that with the summary conflict flags.
*/
- SHMQueueInit(&(sxact->outConflicts));
- SHMQueueInit(&(sxact->inConflicts));
+ dlist_init(&(sxact->outConflicts));
+ dlist_init(&(sxact->inConflicts));
sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
diff --git a/src/include/storage/predicate_internals.h b/src/include/storage/predicate_internals.h
index 9e42e1a72c5..142a195d0e0 100644
--- a/src/include/storage/predicate_internals.h
+++ b/src/include/storage/predicate_internals.h
@@ -14,6 +14,7 @@
#ifndef PREDICATE_INTERNALS_H
#define PREDICATE_INTERNALS_H
+#include "lib/ilist.h"
#include "storage/lock.h"
#include "storage/lwlock.h"
@@ -84,13 +85,14 @@ typedef struct SERIALIZABLEXACT
SerCommitSeqNo lastCommitBeforeSnapshot; /* when not committed or
* no conflict out */
} SeqNo;
- SHM_QUEUE outConflicts; /* list of write transactions whose data we
+ dlist_head outConflicts; /* list of write transactions whose data we
* couldn't read. */
- SHM_QUEUE inConflicts; /* list of read transactions which couldn't
+ dlist_head inConflicts; /* list of read transactions which couldn't
* see our write. */
- SHM_QUEUE predicateLocks; /* list of associated PREDICATELOCK objects */
- SHM_QUEUE finishedLink; /* list link in
+ dlist_head predicateLocks; /* list of associated PREDICATELOCK objects */
+ dlist_node finishedLink; /* list link in
* FinishedSerializableTransactions */
+ dlist_node xactLink; /* PredXact->activeList/availableList */
/*
* perXactPredicateListLock is only used in parallel queries: it protects
@@ -103,7 +105,7 @@ typedef struct SERIALIZABLEXACT
* for r/o transactions: list of concurrent r/w transactions that we could
* potentially have conflicts with, and vice versa for r/w transactions
*/
- SHM_QUEUE possibleUnsafeConflicts;
+ dlist_head possibleUnsafeConflicts;
TransactionId topXid; /* top level xid for the transaction, if one
* exists; else invalid */
@@ -139,28 +141,10 @@ typedef struct SERIALIZABLEXACT
*/
#define SXACT_FLAG_PARTIALLY_RELEASED 0x00000800
-/*
- * The following types are used to provide an ad hoc list for holding
- * SERIALIZABLEXACT objects. An HTAB is overkill, since there is no need to
- * access these by key -- there are direct pointers to these objects where
- * needed. If a shared memory list is created, these types can probably be
- * eliminated in favor of using the general solution.
- */
-typedef struct PredXactListElementData
-{
- SHM_QUEUE link;
- SERIALIZABLEXACT sxact;
-} PredXactListElementData;
-
-typedef struct PredXactListElementData *PredXactListElement;
-
-#define PredXactListElementDataSize \
- ((Size)MAXALIGN(sizeof(PredXactListElementData)))
-
typedef struct PredXactListData
{
- SHM_QUEUE availableList;
- SHM_QUEUE activeList;
+ dlist_head availableList;
+ dlist_head activeList;
/*
* These global variables are maintained when registering and cleaning up
@@ -187,7 +171,7 @@ typedef struct PredXactListData
* seq no */
SERIALIZABLEXACT *OldCommittedSxact; /* shared copy of dummy sxact */
- PredXactListElement element;
+ SERIALIZABLEXACT *element;
} PredXactListData;
typedef struct PredXactListData *PredXactList;
@@ -208,8 +192,8 @@ typedef struct PredXactListData *PredXactList;
*/
typedef struct RWConflictData
{
- SHM_QUEUE outLink; /* link for list of conflicts out from a sxact */
- SHM_QUEUE inLink; /* link for list of conflicts in to a sxact */
+ dlist_node outLink; /* link for list of conflicts out from a sxact */
+ dlist_node inLink; /* link for list of conflicts in to a sxact */
SERIALIZABLEXACT *sxactOut;
SERIALIZABLEXACT *sxactIn;
} RWConflictData;
@@ -221,7 +205,7 @@ typedef struct RWConflictData *RWConflict;
typedef struct RWConflictPoolHeaderData
{
- SHM_QUEUE availableList;
+ dlist_head availableList;
RWConflict element;
} RWConflictPoolHeaderData;
@@ -303,7 +287,7 @@ typedef struct PREDICATELOCKTARGET
PREDICATELOCKTARGETTAG tag; /* unique identifier of lockable object */
/* data */
- SHM_QUEUE predicateLocks; /* list of PREDICATELOCK objects assoc. with
+ dlist_head predicateLocks; /* list of PREDICATELOCK objects assoc. with
* predicate lock target */
} PREDICATELOCKTARGET;
@@ -336,9 +320,9 @@ typedef struct PREDICATELOCK
PREDICATELOCKTAG tag; /* unique identifier of lock */
/* data */
- SHM_QUEUE targetLink; /* list link in PREDICATELOCKTARGET's list of
+ dlist_node targetLink; /* list link in PREDICATELOCKTARGET's list of
* predicate locks */
- SHM_QUEUE xactLink; /* list link in SERIALIZABLEXACT's list of
+ dlist_node xactLink; /* list link in SERIALIZABLEXACT's list of
* predicate locks */
SerCommitSeqNo commitSeqNo; /* only used for summarized predicate locks */
} PREDICATELOCK;