aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/storage/ipc/standby.c164
1 files changed, 87 insertions, 77 deletions
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 58699a16451..4558d6fd325 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -28,6 +28,8 @@
#include "storage/procarray.h"
#include "storage/sinvaladt.h"
#include "storage/standby.h"
+#include "utils/hsearch.h"
+#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
@@ -37,7 +39,7 @@ int vacuum_defer_cleanup_age;
int max_standby_archive_delay = 30 * 1000;
int max_standby_streaming_delay = 30 * 1000;
-static List *RecoveryLockList;
+static HTAB *RecoveryLockLists;
static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
ProcSignalReason reason);
@@ -46,6 +48,14 @@ static void SendRecoveryConflictWithBufferPin(ProcSignalReason reason);
static XLogRecPtr LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
+/*
+ * Keep track of all the locks owned by a given transaction.
+ */
+typedef struct RecoveryLockListsEntry
+{
+ TransactionId xid;
+ List *locks;
+} RecoveryLockListsEntry;
/*
* InitRecoveryTransactionEnvironment
@@ -63,6 +73,19 @@ void
InitRecoveryTransactionEnvironment(void)
{
VirtualTransactionId vxid;
+ HASHCTL hash_ctl;
+
+ /*
+ * Initialize the hash table for tracking the list of locks held by each
+ * transaction.
+ */
+ memset(&hash_ctl, 0, sizeof(hash_ctl));
+ hash_ctl.keysize = sizeof(TransactionId);
+ hash_ctl.entrysize = sizeof(RecoveryLockListsEntry);
+ RecoveryLockLists = hash_create("RecoveryLockLists",
+ 64,
+ &hash_ctl,
+ HASH_ELEM | HASH_BLOBS);
/*
* Initialize shared invalidation management for Startup process, being
@@ -107,6 +130,10 @@ ShutdownRecoveryTransactionEnvironment(void)
/* Release all locks the tracked transactions were holding */
StandbyReleaseAllLocks();
+ /* Destroy the hash table of locks. */
+ hash_destroy(RecoveryLockLists);
+ RecoveryLockLists = NULL;
+
/* Cleanup our VirtualTransaction */
VirtualXactLockTableCleanup();
}
@@ -549,8 +576,8 @@ StandbyTimeoutHandler(void)
* We only keep track of AccessExclusiveLocks, which are only ever held by
* one transaction on one relation, and don't worry about lock queuing.
*
- * We keep a single dynamically expandible list of locks in local memory,
- * RecoveryLockList, so we can keep track of the various entries made by
+ * We keep a hash table of lists of locks in local memory keyed by xid,
+ * RecoveryLockLists, so we can keep track of the various entries made by
* the Startup process's virtual xid in the shared lock table.
*
* We record the lock against the top-level xid, rather than individual
@@ -568,8 +595,10 @@ StandbyTimeoutHandler(void)
void
StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
{
+ RecoveryLockListsEntry *entry;
xl_standby_lock *newlock;
LOCKTAG locktag;
+ bool found;
/* Already processed? */
if (!TransactionIdIsValid(xid) ||
@@ -583,11 +612,19 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
/* dbOid is InvalidOid when we are locking a shared relation. */
Assert(OidIsValid(relOid));
+ /* Create a new list for this xid, if we don't have one already. */
+ entry = hash_search(RecoveryLockLists, &xid, HASH_ENTER, &found);
+ if (!found)
+ {
+ entry->xid = xid;
+ entry->locks = NIL;
+ }
+
newlock = palloc(sizeof(xl_standby_lock));
newlock->xid = xid;
newlock->dbOid = dbOid;
newlock->relOid = relOid;
- RecoveryLockList = lappend(RecoveryLockList, newlock);
+ entry->locks = lappend(entry->locks, newlock);
/*
* Attempt to acquire the lock as requested, if not resolve conflict
@@ -600,46 +637,48 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
}
static void
-StandbyReleaseLocks(TransactionId xid)
+StandbyReleaseLockList(List *locks)
{
- ListCell *cell,
- *prev,
- *next;
-
- /*
- * Release all matching locks and remove them from list
- */
- prev = NULL;
- for (cell = list_head(RecoveryLockList); cell; cell = next)
+ while (locks)
{
- xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
+ xl_standby_lock *lock = (xl_standby_lock *) linitial(locks);
+ LOCKTAG locktag;
+ elog(trace_recovery(DEBUG4),
+ "releasing recovery lock: xid %u db %u rel %u",
+ lock->xid, lock->dbOid, lock->relOid);
+ SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
+ if (!LockRelease(&locktag, AccessExclusiveLock, true))
+ {
+ elog(LOG,
+ "RecoveryLockLists contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
+ lock->xid, lock->dbOid, lock->relOid);
+ Assert(false);
+ }
+ pfree(lock);
+ locks = list_delete_first(locks);
+ }
+}
- next = lnext(cell);
+static void
+StandbyReleaseLocks(TransactionId xid)
+{
+ RecoveryLockListsEntry *entry;
- if (!TransactionIdIsValid(xid) || lock->xid == xid)
+ if (TransactionIdIsValid(xid))
+ {
+ if ((entry = hash_search(RecoveryLockLists, &xid, HASH_FIND, NULL)))
{
- LOCKTAG locktag;
-
- elog(trace_recovery(DEBUG4),
- "releasing recovery lock: xid %u db %u rel %u",
- lock->xid, lock->dbOid, lock->relOid);
- SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
- if (!LockRelease(&locktag, AccessExclusiveLock, true))
- elog(LOG,
- "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
- lock->xid, lock->dbOid, lock->relOid);
-
- RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
- pfree(lock);
+ StandbyReleaseLockList(entry->locks);
+ hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
}
- else
- prev = cell;
}
+ else
+ StandbyReleaseAllLocks();
}
/*
* Release locks for a transaction tree, starting at xid down, from
- * RecoveryLockList.
+ * RecoveryLockLists.
*
* Called during WAL replay of COMMIT/ROLLBACK when in hot standby mode,
* to remove any AccessExclusiveLocks requested by a transaction.
@@ -661,30 +700,16 @@ StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids)
void
StandbyReleaseAllLocks(void)
{
- ListCell *cell,
- *prev,
- *next;
- LOCKTAG locktag;
+ HASH_SEQ_STATUS status;
+ RecoveryLockListsEntry *entry;
elog(trace_recovery(DEBUG2), "release all standby locks");
- prev = NULL;
- for (cell = list_head(RecoveryLockList); cell; cell = next)
+ hash_seq_init(&status, RecoveryLockLists);
+ while ((entry = hash_seq_search(&status)))
{
- xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
-
- next = lnext(cell);
-
- elog(trace_recovery(DEBUG4),
- "releasing recovery lock: xid %u db %u rel %u",
- lock->xid, lock->dbOid, lock->relOid);
- SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
- if (!LockRelease(&locktag, AccessExclusiveLock, true))
- elog(LOG,
- "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
- lock->xid, lock->dbOid, lock->relOid);
- RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
- pfree(lock);
+ StandbyReleaseLockList(entry->locks);
+ hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
}
}
@@ -696,22 +721,17 @@ StandbyReleaseAllLocks(void)
void
StandbyReleaseOldLocks(int nxids, TransactionId *xids)
{
- ListCell *cell,
- *prev,
- *next;
- LOCKTAG locktag;
+ HASH_SEQ_STATUS status;
+ RecoveryLockListsEntry *entry;
- prev = NULL;
- for (cell = list_head(RecoveryLockList); cell; cell = next)
+ hash_seq_init(&status, RecoveryLockLists);
+ while ((entry = hash_seq_search(&status)))
{
- xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
bool remove = false;
- next = lnext(cell);
+ Assert(TransactionIdIsValid(entry->xid));
- Assert(TransactionIdIsValid(lock->xid));
-
- if (StandbyTransactionIdIsPrepared(lock->xid))
+ if (StandbyTransactionIdIsPrepared(entry->xid))
remove = false;
else
{
@@ -720,7 +740,7 @@ StandbyReleaseOldLocks(int nxids, TransactionId *xids)
for (i = 0; i < nxids; i++)
{
- if (lock->xid == xids[i])
+ if (entry->xid == xids[i])
{
found = true;
break;
@@ -736,19 +756,9 @@ StandbyReleaseOldLocks(int nxids, TransactionId *xids)
if (remove)
{
- elog(trace_recovery(DEBUG4),
- "releasing recovery lock: xid %u db %u rel %u",
- lock->xid, lock->dbOid, lock->relOid);
- SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
- if (!LockRelease(&locktag, AccessExclusiveLock, true))
- elog(LOG,
- "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
- lock->xid, lock->dbOid, lock->relOid);
- RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
- pfree(lock);
+ StandbyReleaseLockList(entry->locks);
+ hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
}
- else
- prev = cell;
}
}