diff options
Diffstat (limited to 'src/backend/storage/ipc/procarray.c')
-rw-r--r-- | src/backend/storage/ipc/procarray.c | 787 |
1 files changed, 787 insertions, 0 deletions
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c new file mode 100644 index 00000000000..85ae3c762ba --- /dev/null +++ b/src/backend/storage/ipc/procarray.c @@ -0,0 +1,787 @@ +/*------------------------------------------------------------------------- + * + * procarray.c + * POSTGRES process array code. + * + * + * This module maintains an unsorted array of the PGPROC structures for all + * active backends. Although there are several uses for this, the principal + * one is as a means of determining the set of currently running transactions. + * + * Because of various subtle race conditions it is critical that a backend + * hold the correct locks while setting or clearing its MyProc->xid field. + * See notes in GetSnapshotData. + * + * + * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * $PostgreSQL: pgsql/src/backend/storage/ipc/procarray.c,v 1.1 2005/05/19 21:35:46 tgl Exp $ + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/subtrans.h" +#include "miscadmin.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "utils/tqual.h" + + +/* Our shared memory area */ +typedef struct ProcArrayStruct +{ + int numProcs; /* number of valid procs entries */ + int maxProcs; /* allocated size of procs array */ + + /* + * We declare procs[] as 1 entry because C wants a fixed-size array, + * but actually it is maxProcs entries long. + */ + PGPROC *procs[1]; /* VARIABLE LENGTH ARRAY */ +} ProcArrayStruct; + +static ProcArrayStruct *procArray; + + +#ifdef XIDCACHE_DEBUG + +/* counters for XidCache measurement */ +static long xc_by_recent_xmin = 0; +static long xc_by_main_xid = 0; +static long xc_by_child_xid = 0; +static long xc_slow_answer = 0; + +#define xc_by_recent_xmin_inc() (xc_by_recent_xmin++) +#define xc_by_main_xid_inc() (xc_by_main_xid++) +#define xc_by_child_xid_inc() (xc_by_child_xid++) +#define xc_slow_answer_inc() (xc_slow_answer++) + +static void DisplayXidCache(void); + +#else /* !XIDCACHE_DEBUG */ + +#define xc_by_recent_xmin_inc() ((void) 0) +#define xc_by_main_xid_inc() ((void) 0) +#define xc_by_child_xid_inc() ((void) 0) +#define xc_slow_answer_inc() ((void) 0) + +#endif /* XIDCACHE_DEBUG */ + + +/* + * Report shared-memory space needed by CreateSharedProcArray. + */ +int +ProcArrayShmemSize(int maxBackends) +{ + /* sizeof(ProcArrayStruct) includes the first array element */ + return MAXALIGN(sizeof(ProcArrayStruct) + + (maxBackends - 1) * sizeof(PGPROC *)); +} + +/* + * Initialize the shared PGPROC array during postmaster startup. + */ +void +CreateSharedProcArray(int maxBackends) +{ + bool found; + + /* Create or attach to the ProcArray shared structure */ + procArray = (ProcArrayStruct *) + ShmemInitStruct("Proc Array", ProcArrayShmemSize(maxBackends), + &found); + + if (!found) + { + /* + * We're the first - initialize. + */ + procArray->numProcs = 0; + procArray->maxProcs = maxBackends; + } +} + +/* + * Add my own PGPROC (found in the global MyProc) to the shared array. + * + * This must be called during backend startup, after fully initializing + * the contents of MyProc. + */ +void +ProcArrayAddMyself(void) +{ + ProcArrayStruct *arrayP = procArray; + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + + if (arrayP->numProcs >= arrayP->maxProcs) + { + /* + * Ooops, no room. (This really shouldn't happen, since there is + * a fixed supply of PGPROC structs too, and so we should have + * failed earlier.) + */ + LWLockRelease(ProcArrayLock); + ereport(FATAL, + (errcode(ERRCODE_TOO_MANY_CONNECTIONS), + errmsg("sorry, too many clients already"))); + } + + arrayP->procs[arrayP->numProcs] = MyProc; + arrayP->numProcs++; + + LWLockRelease(ProcArrayLock); +} + +/* + * Remove my own PGPROC (found in the global MyProc) from the shared array. + * + * This must be called during backend shutdown. + */ +void +ProcArrayRemoveMyself(void) +{ + ProcArrayStruct *arrayP = procArray; + int index; + +#ifdef XIDCACHE_DEBUG + DisplayXidCache(); +#endif + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + + for (index = 0; index < arrayP->numProcs; index++) + { + if (arrayP->procs[index] == MyProc) + { + arrayP->procs[index] = arrayP->procs[arrayP->numProcs - 1]; + arrayP->numProcs--; + LWLockRelease(ProcArrayLock); + return; + } + } + + /* Ooops */ + LWLockRelease(ProcArrayLock); + + elog(LOG, "failed to find my own proc %p in ProcArray", MyProc); +} + + +/* + * TransactionIdIsInProgress -- is given transaction running in some backend + * + * There are three possibilities for finding a running transaction: + * + * 1. the given Xid is a main transaction Id. We will find this out cheaply + * by looking at the PGPROC struct for each backend. + * + * 2. the given Xid is one of the cached subxact Xids in the PGPROC array. + * We can find this out cheaply too. + * + * 3. Search the SubTrans tree to find the Xid's topmost parent, and then + * see if that is running according to PGPROC. This is the slowest, but + * sadly it has to be done always if the other two failed, unless we see + * that the cached subxact sets are complete (none have overflowed). + * + * ProcArrayLock has to be held while we do 1 and 2. If we save the top Xids + * while doing 1, we can release the ProcArrayLock while we do 3. This buys + * back some concurrency (we can't retrieve the main Xids from PGPROC again + * anyway; see GetNewTransactionId). + */ +bool +TransactionIdIsInProgress(TransactionId xid) +{ + bool result = false; + ProcArrayStruct *arrayP = procArray; + int i, + j; + int nxids = 0; + TransactionId *xids; + TransactionId topxid; + bool locked; + + /* + * Don't bother checking a transaction older than RecentXmin; it + * could not possibly still be running. + */ + if (TransactionIdPrecedes(xid, RecentXmin)) + { + xc_by_recent_xmin_inc(); + return false; + } + + /* Get workspace to remember main XIDs in */ + xids = (TransactionId *) palloc(sizeof(TransactionId) * arrayP->maxProcs); + + LWLockAcquire(ProcArrayLock, LW_SHARED); + locked = true; + + for (i = 0; i < arrayP->numProcs; i++) + { + PGPROC *proc = arrayP->procs[i]; + + /* Fetch xid just once - see GetNewTransactionId */ + TransactionId pxid = proc->xid; + + if (!TransactionIdIsValid(pxid)) + continue; + + /* + * Step 1: check the main Xid + */ + if (TransactionIdEquals(pxid, xid)) + { + xc_by_main_xid_inc(); + result = true; + goto result_known; + } + + /* + * We can ignore main Xids that are younger than the target + * Xid, since the target could not possibly be their child. + */ + if (TransactionIdPrecedes(xid, pxid)) + continue; + + /* + * Step 2: check the cached child-Xids arrays + */ + for (j = proc->subxids.nxids - 1; j >= 0; j--) + { + /* Fetch xid just once - see GetNewTransactionId */ + TransactionId cxid = proc->subxids.xids[j]; + + if (TransactionIdEquals(cxid, xid)) + { + xc_by_child_xid_inc(); + result = true; + goto result_known; + } + } + + /* + * Save the main Xid for step 3. We only need to remember + * main Xids that have uncached children. (Note: there is no + * race condition here because the overflowed flag cannot be + * cleared, only set, while we hold ProcArrayLock. So we can't + * miss an Xid that we need to worry about.) + */ + if (proc->subxids.overflowed) + xids[nxids++] = pxid; + } + + LWLockRelease(ProcArrayLock); + locked = false; + + /* + * If none of the relevant caches overflowed, we know the Xid is not + * running without looking at pg_subtrans. + */ + if (nxids == 0) + goto result_known; + + /* + * Step 3: have to check pg_subtrans. + * + * At this point, we know it's either a subtransaction of one of the Xids + * in xids[], or it's not running. If it's an already-failed + * subtransaction, we want to say "not running" even though its parent + * may still be running. So first, check pg_clog to see if it's been + * aborted. + */ + xc_slow_answer_inc(); + + if (TransactionIdDidAbort(xid)) + goto result_known; + + /* + * It isn't aborted, so check whether the transaction tree it belongs + * to is still running (or, more precisely, whether it was running + * when this routine started -- note that we already released + * ProcArrayLock). + */ + topxid = SubTransGetTopmostTransaction(xid); + Assert(TransactionIdIsValid(topxid)); + if (!TransactionIdEquals(topxid, xid)) + { + for (i = 0; i < nxids; i++) + { + if (TransactionIdEquals(xids[i], topxid)) + { + result = true; + break; + } + } + } + +result_known: + if (locked) + LWLockRelease(ProcArrayLock); + + pfree(xids); + + return result; +} + +/* + * GetOldestXmin -- returns oldest transaction that was running + * when any current transaction was started. + * + * If allDbs is TRUE then all backends are considered; if allDbs is FALSE + * then only backends running in my own database are considered. + * + * This is used by VACUUM to decide which deleted tuples must be preserved + * in a table. allDbs = TRUE is needed for shared relations, but allDbs = + * FALSE is sufficient for non-shared relations, since only backends in my + * own database could ever see the tuples in them. + * + * This is also used to determine where to truncate pg_subtrans. allDbs + * must be TRUE for that case. + * + * Note: we include the currently running xids in the set of considered xids. + * This ensures that if a just-started xact has not yet set its snapshot, + * when it does set the snapshot it cannot set xmin less than what we compute. + */ +TransactionId +GetOldestXmin(bool allDbs) +{ + ProcArrayStruct *arrayP = procArray; + TransactionId result; + int index; + + /* + * Normally we start the min() calculation with our own XID. But if + * called by checkpointer, we will not be inside a transaction, so use + * next XID as starting point for min() calculation. (Note that if + * there are no xacts running at all, that will be the subtrans + * truncation point!) + */ + if (IsTransactionState()) + result = GetTopTransactionId(); + else + result = ReadNewTransactionId(); + + LWLockAcquire(ProcArrayLock, LW_SHARED); + + for (index = 0; index < arrayP->numProcs; index++) + { + PGPROC *proc = arrayP->procs[index]; + + if (allDbs || proc->databaseId == MyDatabaseId) + { + /* Fetch xid just once - see GetNewTransactionId */ + TransactionId xid = proc->xid; + + if (TransactionIdIsNormal(xid)) + { + if (TransactionIdPrecedes(xid, result)) + result = xid; + xid = proc->xmin; + if (TransactionIdIsNormal(xid)) + if (TransactionIdPrecedes(xid, result)) + result = xid; + } + } + } + + LWLockRelease(ProcArrayLock); + + return result; +} + +/*---------- + * GetSnapshotData -- returns information about running transactions. + * + * The returned snapshot includes xmin (lowest still-running xact ID), + * xmax (next xact ID to be assigned), and a list of running xact IDs + * in the range xmin <= xid < xmax. It is used as follows: + * All xact IDs < xmin are considered finished. + * All xact IDs >= xmax are considered still running. + * For an xact ID xmin <= xid < xmax, consult list to see whether + * it is considered running or not. + * This ensures that the set of transactions seen as "running" by the + * current xact will not change after it takes the snapshot. + * + * Note that only top-level XIDs are included in the snapshot. We can + * still apply the xmin and xmax limits to subtransaction XIDs, but we + * need to work a bit harder to see if XIDs in [xmin..xmax) are running. + * + * We also update the following backend-global variables: + * TransactionXmin: the oldest xmin of any snapshot in use in the + * current transaction (this is the same as MyProc->xmin). This + * is just the xmin computed for the first, serializable snapshot. + * RecentXmin: the xmin computed for the most recent snapshot. XIDs + * older than this are known not running any more. + * RecentGlobalXmin: the global xmin (oldest TransactionXmin across all + * running transactions). This is the same computation done by + * GetOldestXmin(TRUE). + *---------- + */ +Snapshot +GetSnapshotData(Snapshot snapshot, bool serializable) +{ + ProcArrayStruct *arrayP = procArray; + TransactionId xmin; + TransactionId xmax; + TransactionId globalxmin; + int index; + int count = 0; + + Assert(snapshot != NULL); + + /* Serializable snapshot must be computed before any other... */ + Assert(serializable ? + !TransactionIdIsValid(MyProc->xmin) : + TransactionIdIsValid(MyProc->xmin)); + + /* + * Allocating space for MaxBackends xids is usually overkill; + * lastBackend would be sufficient. But it seems better to do the + * malloc while not holding the lock, so we can't look at lastBackend. + * + * This does open a possibility for avoiding repeated malloc/free: since + * MaxBackends does not change at runtime, we can simply reuse the + * previous xip array if any. (This relies on the fact that all + * callers pass static SnapshotData structs.) + */ + if (snapshot->xip == NULL) + { + /* + * First call for this snapshot + */ + snapshot->xip = (TransactionId *) + malloc(MaxBackends * sizeof(TransactionId)); + if (snapshot->xip == NULL) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + } + + globalxmin = xmin = GetTopTransactionId(); + + /* + * If we are going to set MyProc->xmin then we'd better get exclusive + * lock; if not, this is a read-only operation so it can be shared. + */ + LWLockAcquire(ProcArrayLock, serializable ? LW_EXCLUSIVE : LW_SHARED); + + /*-------------------- + * Unfortunately, we have to call ReadNewTransactionId() after acquiring + * ProcArrayLock above. It's not good because ReadNewTransactionId() does + * LWLockAcquire(XidGenLock), but *necessary*. We need to be sure that + * no transactions exit the set of currently-running transactions + * between the time we fetch xmax and the time we finish building our + * snapshot. Otherwise we could have a situation like this: + * + * 1. Tx Old is running (in Read Committed mode). + * 2. Tx S reads new transaction ID into xmax, then + * is swapped out before acquiring ProcArrayLock. + * 3. Tx New gets new transaction ID (>= S' xmax), + * makes changes and commits. + * 4. Tx Old changes some row R changed by Tx New and commits. + * 5. Tx S finishes getting its snapshot data. It sees Tx Old as + * done, but sees Tx New as still running (since New >= xmax). + * + * Now S will see R changed by both Tx Old and Tx New, *but* does not + * see other changes made by Tx New. If S is supposed to be in + * Serializable mode, this is wrong. + * + * By locking ProcArrayLock before we read xmax, we ensure that TX Old + * cannot exit the set of running transactions seen by Tx S. Therefore + * both Old and New will be seen as still running => no inconsistency. + *-------------------- + */ + + xmax = ReadNewTransactionId(); + + for (index = 0; index < arrayP->numProcs; index++) + { + PGPROC *proc = arrayP->procs[index]; + + /* Fetch xid just once - see GetNewTransactionId */ + TransactionId xid = proc->xid; + + /* + * Ignore my own proc (dealt with my xid above), procs not + * running a transaction, and xacts started since we read the + * next transaction ID. There's no need to store XIDs above + * what we got from ReadNewTransactionId, since we'll treat + * them as running anyway. We also assume that such xacts + * can't compute an xmin older than ours, so they needn't be + * considered in computing globalxmin. + */ + if (proc == MyProc || + !TransactionIdIsNormal(xid) || + TransactionIdFollowsOrEquals(xid, xmax)) + continue; + + if (TransactionIdPrecedes(xid, xmin)) + xmin = xid; + snapshot->xip[count] = xid; + count++; + + /* Update globalxmin to be the smallest valid xmin */ + xid = proc->xmin; + if (TransactionIdIsNormal(xid)) + if (TransactionIdPrecedes(xid, globalxmin)) + globalxmin = xid; + } + + if (serializable) + MyProc->xmin = TransactionXmin = xmin; + + LWLockRelease(ProcArrayLock); + + /* + * Update globalxmin to include actual process xids. This is a + * slightly different way of computing it than GetOldestXmin uses, but + * should give the same result. + */ + if (TransactionIdPrecedes(xmin, globalxmin)) + globalxmin = xmin; + + /* Update global variables too */ + RecentGlobalXmin = globalxmin; + RecentXmin = xmin; + + snapshot->xmin = xmin; + snapshot->xmax = xmax; + snapshot->xcnt = count; + + snapshot->curcid = GetCurrentCommandId(); + + return snapshot; +} + +/* + * DatabaseHasActiveBackends -- are there any backends running in the given DB + * + * If 'ignoreMyself' is TRUE, ignore this particular backend while checking + * for backends in the target database. + * + * This function is used to interlock DROP DATABASE against there being + * any active backends in the target DB --- dropping the DB while active + * backends remain would be a Bad Thing. Note that we cannot detect here + * the possibility of a newly-started backend that is trying to connect + * to the doomed database, so additional interlocking is needed during + * backend startup. + */ +bool +DatabaseHasActiveBackends(Oid databaseId, bool ignoreMyself) +{ + bool result = false; + ProcArrayStruct *arrayP = procArray; + int index; + + LWLockAcquire(ProcArrayLock, LW_SHARED); + + for (index = 0; index < arrayP->numProcs; index++) + { + PGPROC *proc = arrayP->procs[index]; + + if (proc->databaseId == databaseId) + { + if (ignoreMyself && proc == MyProc) + continue; + + result = true; + break; + } + } + + LWLockRelease(ProcArrayLock); + + return result; +} + +/* + * BackendPidGetProc -- get a backend's PGPROC given its PID + */ +struct PGPROC * +BackendPidGetProc(int pid) +{ + PGPROC *result = NULL; + ProcArrayStruct *arrayP = procArray; + int index; + + LWLockAcquire(ProcArrayLock, LW_SHARED); + + for (index = 0; index < arrayP->numProcs; index++) + { + PGPROC *proc = arrayP->procs[index]; + + if (proc->pid == pid) + { + result = proc; + break; + } + } + + LWLockRelease(ProcArrayLock); + + return result; +} + +/* + * IsBackendPid -- is a given pid a running backend + */ +bool +IsBackendPid(int pid) +{ + return (BackendPidGetProc(pid) != NULL); +} + +/* + * CountActiveBackends --- count backends (other than myself) that are in + * active transactions. This is used as a heuristic to decide if + * a pre-XLOG-flush delay is worthwhile during commit. + * + * An active transaction is something that has written at least one XLOG + * record; read-only transactions don't count. Also, do not count backends + * that are blocked waiting for locks, since they are not going to get to + * run until someone else commits. + */ +int +CountActiveBackends(void) +{ + ProcArrayStruct *arrayP = procArray; + int count = 0; + int index; + + /* + * Note: for speed, we don't acquire ProcArrayLock. This is a little bit + * bogus, but since we are only testing xrecoff for zero or nonzero, + * it should be OK. The result is only used for heuristic purposes + * anyway... + */ + for (index = 0; index < arrayP->numProcs; index++) + { + PGPROC *proc = arrayP->procs[index]; + + if (proc == MyProc) + continue; /* do not count myself */ + if (proc->logRec.xrecoff == 0) + continue; /* do not count if not in a transaction */ + if (proc->waitLock != NULL) + continue; /* do not count if blocked on a lock */ + count++; + } + + return count; +} + +/* + * CountEmptyBackendSlots - count empty slots in backend process table + * + * Acquiring the lock here is almost certainly overkill, but just in + * case fetching an int is not atomic on your machine ... + */ +int +CountEmptyBackendSlots(void) +{ + int count; + + LWLockAcquire(ProcArrayLock, LW_SHARED); + + count = procArray->maxProcs - procArray->numProcs; + + LWLockRelease(ProcArrayLock); + + return count; +} + +#define XidCacheRemove(i) \ + do { \ + MyProc->subxids.xids[i] = MyProc->subxids.xids[MyProc->subxids.nxids - 1]; \ + MyProc->subxids.nxids--; \ + } while (0) + +/* + * XidCacheRemoveRunningXids + * + * Remove a bunch of TransactionIds from the list of known-running + * subtransactions for my backend. Both the specified xid and those in + * the xids[] array (of length nxids) are removed from the subxids cache. + */ +void +XidCacheRemoveRunningXids(TransactionId xid, int nxids, TransactionId *xids) +{ + int i, + j; + + Assert(!TransactionIdEquals(xid, InvalidTransactionId)); + + /* + * We must hold ProcArrayLock exclusively in order to remove transactions + * from the PGPROC array. (See notes in GetSnapshotData.) It's + * possible this could be relaxed since we know this routine is only + * used to abort subtransactions, but pending closer analysis we'd + * best be conservative. + */ + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + + /* + * Under normal circumstances xid and xids[] will be in increasing + * order, as will be the entries in subxids. Scan backwards to avoid + * O(N^2) behavior when removing a lot of xids. + */ + for (i = nxids - 1; i >= 0; i--) + { + TransactionId anxid = xids[i]; + + for (j = MyProc->subxids.nxids - 1; j >= 0; j--) + { + if (TransactionIdEquals(MyProc->subxids.xids[j], anxid)) + { + XidCacheRemove(j); + break; + } + } + /* + * Ordinarily we should have found it, unless the cache has overflowed. + * However it's also possible for this routine to be invoked multiple + * times for the same subtransaction, in case of an error during + * AbortSubTransaction. So instead of Assert, emit a debug warning. + */ + if (j < 0 && !MyProc->subxids.overflowed) + elog(WARNING, "did not find subXID %u in MyProc", anxid); + } + + for (j = MyProc->subxids.nxids - 1; j >= 0; j--) + { + if (TransactionIdEquals(MyProc->subxids.xids[j], xid)) + { + XidCacheRemove(j); + break; + } + } + /* Ordinarily we should have found it, unless the cache has overflowed */ + if (j < 0 && !MyProc->subxids.overflowed) + elog(WARNING, "did not find subXID %u in MyProc", xid); + + LWLockRelease(ProcArrayLock); +} + +#ifdef XIDCACHE_DEBUG + +/* + * Print stats about effectiveness of XID cache + */ +static void +DisplayXidCache(void) +{ + fprintf(stderr, + "XidCache: xmin: %ld, mainxid: %ld, childxid: %ld, slow: %ld\n", + xc_by_recent_xmin, + xc_by_main_xid, + xc_by_child_xid, + xc_slow_answer); +} + +#endif /* XIDCACHE_DEBUG */ |