aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/storage/ipc/sinval.c59
-rw-r--r--src/backend/storage/ipc/sinvaladt.c921
-rw-r--r--src/include/storage/lock.h4
-rw-r--r--src/include/storage/sinvaladt.h152
4 files changed, 316 insertions, 820 deletions
diff --git a/src/backend/storage/ipc/sinval.c b/src/backend/storage/ipc/sinval.c
index e993cef74aa..c1a557033b6 100644
--- a/src/backend/storage/ipc/sinval.c
+++ b/src/backend/storage/ipc/sinval.c
@@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinval.c,v 1.17 1999/09/04 18:36:45 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinval.c,v 1.18 1999/09/06 19:37:38 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -21,12 +21,6 @@
#include "storage/sinval.h"
#include "storage/sinvaladt.h"
-extern SISeg *shmInvalBuffer; /* the shared buffer segment, set by
- * SISegmentAttach()
- */
-extern BackendId MyBackendId;
-extern BackendTag MyBackendTag;
-
SPINLOCK SInvalLock = (SPINLOCK) NULL;
/****************************************************************************/
@@ -39,11 +33,6 @@ CreateSharedInvalidationState(IPCKey key, int maxBackends)
{
int status;
- /*
- * REMOVED SISyncKill(IPCKeyGetSIBufferMemorySemaphoreKey(key));
- * SISyncInit(IPCKeyGetSIBufferMemorySemaphoreKey(key));
- */
-
/* SInvalLock gets set in spin.c, during spinlock init */
status = SISegmentInit(true, IPCKeyGetSIBufferMemoryBlock(key),
maxBackends);
@@ -53,9 +42,9 @@ CreateSharedInvalidationState(IPCKey key, int maxBackends)
}
/****************************************************************************/
-/* AttachSharedInvalidationState(key) Attach a buffer segment */
+/* AttachSharedInvalidationState(key) Attach to existing buffer segment */
/* */
-/* should be called only by the POSTMASTER */
+/* should be called by each backend during startup */
/****************************************************************************/
void
AttachSharedInvalidationState(IPCKey key)
@@ -74,6 +63,11 @@ AttachSharedInvalidationState(IPCKey key)
elog(FATAL, "AttachSharedInvalidationState: failed segment init");
}
+/*
+ * InitSharedInvalidationState
+ * Initialize new backend's state info in buffer segment.
+ * Must be called after AttachSharedInvalidationState().
+ */
void
InitSharedInvalidationState(void)
{
@@ -88,24 +82,19 @@ InitSharedInvalidationState(void)
/*
* RegisterSharedInvalid
- * Returns a new local cache invalidation state containing a new entry.
+ * Add a shared-cache-invalidation message to the global SI message queue.
*
* Note:
* Assumes hash index is valid.
* Assumes item pointer is valid.
*/
-/****************************************************************************/
-/* RegisterSharedInvalid(cacheId, hashIndex, pointer) */
-/* */
-/* register a message in the buffer */
-/* should be called by a backend */
-/****************************************************************************/
void
RegisterSharedInvalid(int cacheId, /* XXX */
Index hashIndex,
ItemPointer pointer)
{
- SharedInvalidData newInvalid;
+ SharedInvalidData newInvalid;
+ bool insertOK;
/*
* This code has been hacked to accept two types of messages. This
@@ -127,34 +116,16 @@ RegisterSharedInvalid(int cacheId, /* XXX */
ItemPointerSetInvalid(&newInvalid.pointerData);
SpinAcquire(SInvalLock);
- while (!SISetDataEntry(shmInvalBuffer, &newInvalid))
- {
- /* buffer full */
- /* release a message, mark process cache states to be invalid */
- SISetProcStateInvalid(shmInvalBuffer);
-
- if (!SIDelDataEntries(shmInvalBuffer, 1))
- {
- /* inconsistent buffer state -- shd never happen */
- SpinRelease(SInvalLock);
- elog(FATAL, "RegisterSharedInvalid: inconsistent buffer state");
- }
-
- /* loop around to try write again */
- }
+ insertOK = SIInsertDataEntry(shmInvalBuffer, &newInvalid);
SpinRelease(SInvalLock);
+ if (! insertOK)
+ elog(NOTICE, "RegisterSharedInvalid: SI buffer overflow");
}
/*
* InvalidateSharedInvalid
- * Processes all entries in a shared cache invalidation state.
+ * Process shared-cache-invalidation messages waiting for this backend
*/
-/****************************************************************************/
-/* InvalidateSharedInvalid(invalFunction, resetFunction) */
-/* */
-/* invalidate a message in the buffer (read and clean up) */
-/* should be called by a backend */
-/****************************************************************************/
void
InvalidateSharedInvalid(void (*invalFunction) (),
void (*resetFunction) ())
diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c
index 2e64d027f31..99426693cd1 100644
--- a/src/backend/storage/ipc/sinvaladt.c
+++ b/src/backend/storage/ipc/sinvaladt.c
@@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.24 1999/09/04 18:36:45 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.25 1999/09/06 19:37:38 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -16,648 +16,313 @@
#include "postgres.h"
+#include "miscadmin.h"
#include "storage/backendid.h"
#include "storage/lmgr.h"
+#include "storage/sinvaladt.h"
#include "utils/trace.h"
-/* ----------------
- * global variable notes
- *
- * SharedInvalidationSemaphore
- *
- * shmInvalBuffer
- * the shared buffer segment, set by SISegmentAttach()
- *
- * MyBackendId
- * might be removed later, used only for
- * debugging in debug routines (end of file)
- *
- * SIDbId
- * identification of buffer (disappears)
- *
- * SIRelId \
- * SIDummyOid \ identification of buffer
- * SIXidData /
- * SIXid /
- *
- * XXX This file really needs to be cleaned up. We switched to using
- * spinlocks to protect critical sections (as opposed to using fake
- * relations and going through the lock manager) and some of the old
- * cruft was 'ifdef'ed out, while other parts (now unused) are still
- * compiled into the system. -mer 5/24/92
- * ----------------
- */
-#ifdef HAS_TEST_AND_SET
-int SharedInvalidationLockId;
-
-#else
-IpcSemaphoreId SharedInvalidationSemaphore;
-
-#endif
-
SISeg *shmInvalBuffer;
-extern BackendId MyBackendId;
-static void CleanupInvalidationState(int status, SISeg *segInOutP);
-static BackendId SIAssignBackendId(SISeg *segInOutP, BackendTag backendTag);
-static int SIGetNumEntries(SISeg *segP);
+static void SISegmentAttach(IpcMemoryId shmid);
+static void SISegInit(SISeg *segP, int maxBackends);
+static void CleanupInvalidationState(int status, SISeg *segP);
+static void SISetProcStateInvalid(SISeg *segP);
-/************************************************************************/
-/* SISetActiveProcess(segP, backendId) set the backend status active */
-/* should be called only by the postmaster when creating a backend */
-/************************************************************************/
-/* XXX I suspect that the segP parameter is extraneous. -hirohama */
-static void
-SISetActiveProcess(SISeg *segInOutP, BackendId backendId)
-{
- /* mark all messages as read */
-
- /* Assert(segP->procState[backendId - 1].tag == MyBackendTag); */
-
- segInOutP->procState[backendId - 1].resetState = false;
- segInOutP->procState[backendId - 1].limit = SIGetNumEntries(segInOutP);
-}
-
-/****************************************************************************/
-/* SIBackendInit() initializes a backend to operate on the buffer */
-/****************************************************************************/
+/*
+ * SISegmentInit
+ * Create a new SI memory segment, or attach to an existing one
+ *
+ * This is called with createNewSegment = true by the postmaster (or by
+ * a standalone backend), and subsequently with createNewSegment = false
+ * by backends started by the postmaster.
+ *
+ * Note: maxBackends param is only valid when createNewSegment is true
+ */
int
-SIBackendInit(SISeg *segInOutP)
+SISegmentInit(bool createNewSegment, IPCKey key, int maxBackends)
{
- LockRelId LtCreateRelId();
- TransactionId LMITransactionIdCopy();
-
- Assert(MyBackendTag > 0);
-
- MyBackendId = SIAssignBackendId(segInOutP, MyBackendTag);
- if (MyBackendId == InvalidBackendTag)
- return 0;
-
-#ifdef INVALIDDEBUG
- elog(DEBUG, "SIBackendInit: backend tag %d; backend id %d.",
- MyBackendTag, MyBackendId);
-#endif /* INVALIDDEBUG */
+ int segSize;
+ IpcMemoryId shmId;
- SISetActiveProcess(segInOutP, MyBackendId);
- on_shmem_exit(CleanupInvalidationState, (caddr_t) segInOutP);
- return 1;
-}
+ if (createNewSegment)
+ {
+ /* Kill existing segment, if any */
+ IpcMemoryKill(key);
-/* ----------------
- * SIAssignBackendId
- * ----------------
- */
-static BackendId
-SIAssignBackendId(SISeg *segInOutP, BackendTag backendTag)
-{
- Index index;
- ProcState *stateP = NULL;
+ /* Figure space needed.
+ * Note sizeof(SISeg) includes the first ProcState entry.
+ */
+ segSize = sizeof(SISeg) + sizeof(ProcState) * (maxBackends - 1);
- for (index = 0; index < segInOutP->maxBackends; index++)
- {
- if (segInOutP->procState[index].tag == InvalidBackendTag ||
- segInOutP->procState[index].tag == backendTag)
+ /* Get a shared segment */
+ shmId = IpcMemoryCreate(key, segSize, IPCProtection);
+ if (shmId < 0)
{
- stateP = &segInOutP->procState[index];
- break;
+ perror("SISegmentInit: segment create failed");
+ return -1; /* an error */
}
- if (!PointerIsValid(stateP) ||
- (segInOutP->procState[index].resetState &&
- (!stateP->resetState ||
- stateP->tag < backendTag)) ||
- (!stateP->resetState &&
- (segInOutP->procState[index].limit <
- stateP->limit ||
- stateP->tag < backendTag)))
- stateP = &segInOutP->procState[index];
- }
-
- /* verify that all "procState" entries checked for matching tags */
+ /* Attach to the shared cache invalidation segment */
+ /* sets the global variable shmInvalBuffer */
+ SISegmentAttach(shmId);
- for (index++; index < segInOutP->maxBackends; index++)
- {
- if (segInOutP->procState[index].tag == backendTag)
- elog(FATAL, "SIAssignBackendId: tag %d found twice", backendTag);
+ /* Init shared memory contents */
+ SISegInit(shmInvalBuffer, maxBackends);
}
-
- Assert(stateP);
-
- if (stateP->tag != InvalidBackendTag)
+ else
{
- if (stateP->tag == backendTag)
- elog(NOTICE, "SIAssignBackendId: reusing tag %d", backendTag);
- else
+ /* find existing segment */
+ shmId = IpcMemoryIdGet(key, 0);
+ if (shmId < 0)
{
- elog(NOTICE, "SIAssignBackendId: discarding tag %d", stateP->tag);
- return InvalidBackendTag;
+ perror("SISegmentInit: segment get failed");
+ return -1; /* an error */
}
- }
-
- stateP->tag = backendTag;
-
- return 1 + stateP - &segInOutP->procState[0];
-}
-
-
-/************************************************************************/
-/* The following function should be called only by the postmaster !! */
-/************************************************************************/
-/************************************************************************/
-/* SISetDeadProcess(segP, backendId) set the backend status DEAD */
-/* should be called only by the postmaster when a backend died */
-/************************************************************************/
-static void
-SISetDeadProcess(SISeg *segP, int backendId)
-{
- /* XXX call me.... */
-
- segP->procState[backendId - 1].resetState = false;
- segP->procState[backendId - 1].limit = -1;
- segP->procState[backendId - 1].tag = InvalidBackendTag;
+ /* Attach to the shared cache invalidation segment */
+ /* sets the global variable shmInvalBuffer */
+ SISegmentAttach(shmId);
+ }
+ return 1;
}
/*
- * CleanupInvalidationState
- * Note:
- * This is a temporary hack. ExitBackend should call this instead
- * of exit (via on_shmem_exit).
+ * SISegmentAttach
+ * Attach to specified shared memory segment
*/
static void
-CleanupInvalidationState(int status, /* XXX */
- SISeg *segInOutP) /* XXX style */
-{
- Assert(PointerIsValid(segInOutP));
-
- SISetDeadProcess(segInOutP, MyBackendId);
-}
-
-
-/************************************************************************/
-/* SIComputeSize() - compute size and offsets for SI segment */
-/************************************************************************/
-static void
-SIComputeSize(SISegOffsets *oP, int maxBackends)
-{
- int A,
- B,
- a,
- b,
- totalSize;
-
- A = 0;
- /* sizeof(SISeg) includes the first ProcState entry */
- a = sizeof(SISeg) + sizeof(ProcState) * (maxBackends - 1);
- a = MAXALIGN(a); /* offset to first data entry */
- b = sizeof(SISegEntry) * MAXNUMMESSAGES;
- B = A + a + b;
- B = MAXALIGN(B);
- totalSize = B - A;
-
- oP->startSegment = A;
- oP->offsetToFirstEntry = a; /* relative to A */
- oP->offsetToEndOfSegment = totalSize; /* relative to A */
-}
-
-
-/************************************************************************/
-/* SISetStartEntrySection(segP, offset) - sets the offset */
-/************************************************************************/
-static void
-SISetStartEntrySection(SISeg *segP, Offset offset)
-{
- segP->startEntrySection = offset;
-}
-
-/************************************************************************/
-/* SIGetStartEntrySection(segP) - returnss the offset */
-/************************************************************************/
-static Offset
-SIGetStartEntrySection(SISeg *segP)
+SISegmentAttach(IpcMemoryId shmid)
{
- return segP->startEntrySection;
-}
+ shmInvalBuffer = (SISeg *) IpcMemoryAttach(shmid);
-
-/************************************************************************/
-/* SISetEndEntrySection(segP, offset) - sets the offset */
-/************************************************************************/
-static void
-SISetEndEntrySection(SISeg *segP, Offset offset)
-{
- segP->endEntrySection = offset;
+ if (shmInvalBuffer == IpcMemAttachFailed)
+ {
+ /* XXX use validity function */
+ elog(FATAL, "SISegmentAttach: Could not attach segment: %m");
+ }
}
-/************************************************************************/
-/* SISetEndEntryChain(segP, offset) - sets the offset */
-/************************************************************************/
+/*
+ * SISegInit
+ * Initialize contents of a new shared memory sinval segment
+ */
static void
-SISetEndEntryChain(SISeg *segP, Offset offset)
+SISegInit(SISeg *segP, int maxBackends)
{
- segP->endEntryChain = offset;
-}
-
-/************************************************************************/
-/* SIGetEndEntryChain(segP) - returnss the offset */
-/************************************************************************/
-static Offset
-SIGetEndEntryChain(SISeg *segP)
-{
- return segP->endEntryChain;
-}
+ int i;
-/************************************************************************/
-/* SISetStartEntryChain(segP, offset) - sets the offset */
-/************************************************************************/
-static void
-SISetStartEntryChain(SISeg *segP, Offset offset)
-{
- segP->startEntryChain = offset;
-}
+ /* Clear message counters, save size of procState array */
+ segP->minMsgNum = 0;
+ segP->maxMsgNum = 0;
+ segP->maxBackends = maxBackends;
-/************************************************************************/
-/* SIGetStartEntryChain(segP) - returns the offset */
-/************************************************************************/
-static Offset
-SIGetStartEntryChain(SISeg *segP)
-{
- return segP->startEntryChain;
-}
+ /* The buffer[] array is initially all unused, so we need not fill it */
-/************************************************************************/
-/* SISetNumEntries(segP, num) sets the current nuber of entries */
-/************************************************************************/
-static bool
-SISetNumEntries(SISeg *segP, int num)
-{
- if (num <= MAXNUMMESSAGES)
- {
- segP->numEntries = num;
- return true;
- }
- else
+ /* Mark all backends inactive */
+ for (i = 0; i < maxBackends; i++)
{
- return false; /* table full */
+ segP->procState[i].nextMsgNum = -1; /* inactive */
+ segP->procState[i].resetState = false;
+ segP->procState[i].tag = InvalidBackendTag;
}
}
-/************************************************************************/
-/* SIGetNumEntries(segP) - returns the current nuber of entries */
-/************************************************************************/
-static int
-SIGetNumEntries(SISeg *segP)
-{
- return segP->numEntries;
-}
-
-
-/************************************************************************/
-/* SISetMaxNumEntries(segP, num) sets the maximal number of entries */
-/************************************************************************/
-static bool
-SISetMaxNumEntries(SISeg *segP, int num)
+/*
+ * SIBackendInit
+ * Initialize a new backend to operate on the sinval buffer
+ *
+ * NB: this routine, and all following ones, must be executed with the
+ * SInvalLock spinlock held, since there may be multiple backends trying
+ * to access the buffer.
+ */
+int
+SIBackendInit(SISeg *segP)
{
- if (num <= MAXNUMMESSAGES)
- {
- segP->maxNumEntries = num;
- return true;
- }
- else
- {
- return false; /* wrong number */
- }
-}
-
-
-/************************************************************************/
-/* SIGetProcStateLimit(segP, i) returns the limit of read messages */
-/************************************************************************/
-
-#define SIGetProcStateLimit(segP,i) \
- ((segP)->procState[i].limit)
+ Index index;
+ ProcState *stateP = NULL;
-/************************************************************************/
-/* SIIncNumEntries(segP, num) increments the current nuber of entries */
-/************************************************************************/
-static bool
-SIIncNumEntries(SISeg *segP, int num)
-{
+ Assert(MyBackendTag > 0);
- /*
- * Try to prevent table overflow. When the table is 70% full send a
- * SIGUSR2 to the postmaster which will send it back to all the
- * backends. This will be handled by Async_NotifyHandler() with a
- * StartTransactionCommand() which will flush unread SI entries for
- * each backend. dz - 27 Jan 1998
- */
- if (segP->numEntries == (MAXNUMMESSAGES * 70 / 100))
+ /* Check for duplicate backend tags (should never happen) */
+ for (index = 0; index < segP->maxBackends; index++)
{
- TPRINTF(TRACE_VERBOSE,
- "SIIncNumEntries: table is 70%% full, signaling postmaster");
- kill(getppid(), SIGUSR2);
+ if (segP->procState[index].tag == MyBackendTag)
+ elog(FATAL, "SIBackendInit: tag %d already in use", MyBackendTag);
}
- if ((segP->numEntries + num) <= MAXNUMMESSAGES)
- {
- segP->numEntries = segP->numEntries + num;
- return true;
- }
- else
+ /* Look for a free entry in the procState array */
+ for (index = 0; index < segP->maxBackends; index++)
{
- return false; /* table full */
+ if (segP->procState[index].tag == InvalidBackendTag)
+ {
+ stateP = &segP->procState[index];
+ break;
+ }
}
-}
-/************************************************************************/
-/* SIDecNumEntries(segP, num) decrements the current nuber of entries */
-/************************************************************************/
-static bool
-SIDecNumEntries(SISeg *segP, int num)
-{
- if ((segP->numEntries - num) >= 0)
- {
- segP->numEntries = segP->numEntries - num;
- return true;
- }
- else
+ /* elog() with spinlock held is probably not too cool, but these
+ * conditions should never happen anyway.
+ */
+ if (stateP == NULL)
{
- return false; /* not enough entries in table */
+ elog(NOTICE, "SIBackendInit: no free procState slot available");
+ MyBackendId = InvalidBackendTag;
+ return 0;
}
-}
-/************************************************************************/
-/* SISetStartFreeSpace(segP, offset) - sets the offset */
-/************************************************************************/
-static void
-SISetStartFreeSpace(SISeg *segP, Offset offset)
-{
- segP->startFreeSpace = offset;
-}
-
-/************************************************************************/
-/* SIGetStartFreeSpace(segP) - returns the offset */
-/************************************************************************/
-static Offset
-SIGetStartFreeSpace(SISeg *segP)
-{
- return segP->startFreeSpace;
-}
+ MyBackendId = (stateP - &segP->procState[0]) + 1;
+#ifdef INVALIDDEBUG
+ elog(DEBUG, "SIBackendInit: backend tag %d; backend id %d.",
+ MyBackendTag, MyBackendId);
+#endif /* INVALIDDEBUG */
+ /* mark myself active, with all extant messages already read */
+ stateP->tag = MyBackendTag;
+ stateP->resetState = false;
+ stateP->nextMsgNum = segP->maxMsgNum;
-/************************************************************************/
-/* SIGetFirstDataEntry(segP) returns first data entry */
-/************************************************************************/
-static SISegEntry *
-SIGetFirstDataEntry(SISeg *segP)
-{
- SISegEntry *eP;
- Offset startChain;
-
- startChain = SIGetStartEntryChain(segP);
-
- if (startChain == InvalidOffset)
- return NULL;
-
- eP = (SISegEntry *) ((Pointer) segP +
- SIGetStartEntrySection(segP) +
- startChain);
- return eP;
-}
-
-
-/************************************************************************/
-/* SIGetLastDataEntry(segP) returns last data entry in the chain */
-/************************************************************************/
-static SISegEntry *
-SIGetLastDataEntry(SISeg *segP)
-{
- SISegEntry *eP;
- Offset endChain;
-
- endChain = SIGetEndEntryChain(segP);
-
- if (endChain == InvalidOffset)
- return NULL;
-
- eP = (SISegEntry *) ((Pointer) segP +
- SIGetStartEntrySection(segP) +
- endChain);
- return eP;
-}
-
-/************************************************************************/
-/* SIGetNextDataEntry(segP, offset) returns next data entry */
-/************************************************************************/
-#define SIGetNextDataEntry(segP,offset) \
- (((offset) == InvalidOffset) ? (SISegEntry *) NULL : \
- (SISegEntry *) ((Pointer) (segP) + \
- (segP)->startEntrySection + \
- (Offset) (offset)))
-
-/************************************************************************/
-/* SIGetNthDataEntry(segP, n) returns the n-th data entry in chain */
-/************************************************************************/
-static SISegEntry *
-SIGetNthDataEntry(SISeg *segP,
- int n) /* must range from 1 to MaxMessages */
-{
- SISegEntry *eP;
- int i;
-
- if (n <= 0)
- return NULL;
-
- eP = SIGetFirstDataEntry(segP);
- for (i = 1; i < n; i++)
- {
- /* skip one and get the next */
- eP = SIGetNextDataEntry(segP, eP->next);
- }
-
- return eP;
-}
-
-/************************************************************************/
-/* SIEntryOffset(segP, entryP) returns the offset for an pointer */
-/************************************************************************/
-static Offset
-SIEntryOffset(SISeg *segP, SISegEntry *entryP)
-{
- /* relative to B !! */
- return ((Offset) ((Pointer) entryP -
- (Pointer) segP -
- SIGetStartEntrySection(segP)));
-}
-
+ /* register exit routine to mark my entry inactive at exit */
+ on_shmem_exit(CleanupInvalidationState, (caddr_t) segP);
-/************************************************************************/
-/* SISetDataEntry(segP, data) - sets a message in the segemnt */
-/************************************************************************/
-bool
-SISetDataEntry(SISeg *segP, SharedInvalidData *data)
-{
- Offset offsetToNewData;
- SISegEntry *eP,
- *lastP;
-
- if (!SIIncNumEntries(segP, 1))
- return false; /* no space */
-
- /* get a free entry */
- offsetToNewData = SIGetStartFreeSpace(segP);
- eP = SIGetNextDataEntry(segP, offsetToNewData); /* it's a free one */
- SISetStartFreeSpace(segP, eP->next);
- /* fill it up */
- eP->entryData = *data;
- eP->isfree = false;
- eP->next = InvalidOffset;
-
- /* handle insertion point at the end of the chain !! */
- lastP = SIGetLastDataEntry(segP);
- if (lastP == NULL)
- {
- /* there is no chain, insert the first entry */
- SISetStartEntryChain(segP, SIEntryOffset(segP, eP));
- }
- else
- {
- /* there is a last entry in the chain */
- lastP->next = SIEntryOffset(segP, eP);
- }
- SISetEndEntryChain(segP, SIEntryOffset(segP, eP));
- return true;
+ return 1;
}
-
-/************************************************************************/
-/* SIDecProcLimit(segP, num) decrements all process limits */
-/************************************************************************/
+/*
+ * CleanupInvalidationState
+ * Mark the current backend as no longer active.
+ *
+ * This function is called via on_shmem_exit() during backend shutdown.
+ */
static void
-SIDecProcLimit(SISeg *segP, int num)
+CleanupInvalidationState(int status,
+ SISeg *segP)
{
- int i;
+ Assert(PointerIsValid(segP));
- for (i = 0; i < segP->maxBackends; i++)
- {
- /* decrement only, if there is a limit > 0 */
- if (segP->procState[i].limit > 0)
- {
- segP->procState[i].limit = segP->procState[i].limit - num;
- if (segP->procState[i].limit < 0)
- {
- /* limit was not high enough, reset to zero */
- /* negative means it's a dead backend */
- segP->procState[i].limit = 0;
- }
- }
- }
-}
+ /* XXX we probably oughta grab the SInval spinlock for this...
+ * but I think it is safe not to.
+ */
+ segP->procState[MyBackendId - 1].nextMsgNum = -1;
+ segP->procState[MyBackendId - 1].resetState = false;
+ segP->procState[MyBackendId - 1].tag = InvalidBackendTag;
+}
-/************************************************************************/
-/* SIDelDataEntries(segP, n) - free the FIRST n entries */
-/************************************************************************/
+/*
+ * SIInsertDataEntry
+ * Add a new invalidation message to the buffer.
+ *
+ * If we are unable to insert the message because the buffer is full,
+ * then clear the buffer and assert the "reset" flag to each backend.
+ * This will cause all the backends to discard *all* invalidatable state.
+ *
+ * Returns true for normal successful insertion, false if had to reset.
+ */
bool
-SIDelDataEntries(SISeg *segP, int n)
+SIInsertDataEntry(SISeg *segP, SharedInvalidData *data)
{
- int i;
-
- if (n <= 0)
- return false;
+ int numMsgs = segP->maxMsgNum - segP->minMsgNum;
- if (!SIDecNumEntries(segP, n))
+ /* Is the buffer full? */
+ if (numMsgs >= MAXNUMMESSAGES)
{
- /* not that many entries in buffer */
+ /* Yes, so force reset */
+ SISetProcStateInvalid(segP);
return false;
}
- for (i = 1; i <= n; i++)
+ /*
+ * Try to prevent table overflow. When the table is 70% full send a
+ * SIGUSR2 (ordinarily a NOTIFY signal) to the postmaster, which will
+ * send it back to all the backends. This will force idle backends to
+ * execute a transaction to look through pg_listener for NOTIFY messages,
+ * and as a byproduct of the transaction start they will read SI entries.
+ *
+ * This should never happen if all the backends are actively executing
+ * queries, but if a backend is sitting idle then it won't be starting
+ * transactions and so won't be reading SI entries.
+ *
+ * dz - 27 Jan 1998
+ */
+ if (numMsgs == (MAXNUMMESSAGES * 70 / 100) &&
+ IsUnderPostmaster)
{
- SISegEntry *e1P = SIGetFirstDataEntry(segP);
- SISetStartEntryChain(segP, e1P->next);
- if (SIGetStartEntryChain(segP) == InvalidOffset)
- {
- /* it was the last entry */
- SISetEndEntryChain(segP, InvalidOffset);
- }
- /* free the entry */
- e1P->isfree = true;
- e1P->next = SIGetStartFreeSpace(segP);
- SISetStartFreeSpace(segP, SIEntryOffset(segP, e1P));
+ TPRINTF(TRACE_VERBOSE,
+ "SIInsertDataEntry: table is 70%% full, signaling postmaster");
+ kill(getppid(), SIGUSR2);
}
- SIDecProcLimit(segP, n);
+ /*
+ * Insert new message into proper slot of circular buffer
+ */
+ segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data;
+ segP->maxMsgNum++;
+
return true;
}
-
-
-/************************************************************************/
-/* SISetProcStateInvalid(segP) checks and marks a backends state as */
-/* invalid */
-/************************************************************************/
-void
+/*
+ * SISetProcStateInvalid
+ * Flush pending messages from buffer, assert reset flag for each backend
+ *
+ * This is used only to recover from SI buffer overflow.
+ */
+static void
SISetProcStateInvalid(SISeg *segP)
{
int i;
+ segP->minMsgNum = 0;
+ segP->maxMsgNum = 0;
+
for (i = 0; i < segP->maxBackends; i++)
{
- if (segP->procState[i].limit == 0)
+ if (segP->procState[i].nextMsgNum >= 0) /* active backend? */
{
- /* backend i didn't read any message */
segP->procState[i].resetState = true;
-
- /*
- * XXX signal backend that it has to reset its internal cache
- * ?
- */
+ segP->procState[i].nextMsgNum = 0;
}
}
}
-/************************************************************************/
-/* SIGetDataEntry(segP, backendId, data) */
-/* get next SI message for specified backend, if there is one */
-/* */
-/* Possible return values: */
-/* 0: no SI message available */
-/* 1: next SI message has been extracted into *data */
-/* (there may be more messages available after this one!) */
-/* -1: SI reset message extracted */
-/************************************************************************/
+/*
+ * SIGetDataEntry
+ * get next SI message for specified backend, if there is one
+ *
+ * Possible return values:
+ * 0: no SI message available
+ * 1: next SI message has been extracted into *data
+ * (there may be more messages available after this one!)
+ * -1: SI reset message extracted
+ */
int
SIGetDataEntry(SISeg *segP, int backendId,
SharedInvalidData *data)
{
- SISegEntry *msg;
+ ProcState *stateP = & segP->procState[backendId - 1];
- Assert(segP->procState[backendId - 1].tag == MyBackendTag);
+ Assert(stateP->tag == MyBackendTag);
- if (segP->procState[backendId - 1].resetState)
+ if (stateP->resetState)
{
- /* new valid state--mark all messages "read" */
- segP->procState[backendId - 1].resetState = false;
- segP->procState[backendId - 1].limit = SIGetNumEntries(segP);
+ /* Force reset. We can say we have dealt with any messages added
+ * since the reset, as well...
+ */
+ stateP->resetState = false;
+ stateP->nextMsgNum = segP->maxMsgNum;
return -1;
}
- /* Get next message for this backend, if any */
-
- /* This is fairly inefficient if there are many messages,
- * but normally there should not be...
- */
- msg = SIGetNthDataEntry(segP,
- SIGetProcStateLimit(segP, backendId - 1) + 1);
-
- if (msg == NULL)
+ if (stateP->nextMsgNum >= segP->maxMsgNum)
return 0; /* nothing to read */
- *data = msg->entryData; /* return contents of message */
-
- segP->procState[backendId - 1].limit++; /* one more message read */
+ /*
+ * Retrieve message and advance my counter.
+ */
+ *data = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
+ stateP->nextMsgNum++;
/* There may be other backends that haven't read the message,
* so we cannot delete it here.
@@ -666,9 +331,10 @@ SIGetDataEntry(SISeg *segP, int backendId,
return 1; /* got a message */
}
-/************************************************************************/
-/* SIDelExpiredDataEntries (segP) - removes irrelevant messages */
-/************************************************************************/
+/*
+ * SIDelExpiredDataEntries
+ * Remove messages that have been consumed by all active backends
+ */
void
SIDelExpiredDataEntries(SISeg *segP)
{
@@ -676,161 +342,34 @@ SIDelExpiredDataEntries(SISeg *segP)
i,
h;
- min = 9999999;
+ min = segP->maxMsgNum;
+ if (min == segP->minMsgNum)
+ return; /* fast path if no messages exist */
+
+ /* Recompute minMsgNum = minimum of all backends' nextMsgNum */
+
for (i = 0; i < segP->maxBackends; i++)
{
- h = SIGetProcStateLimit(segP, i);
+ h = segP->procState[i].nextMsgNum;
if (h >= 0)
{ /* backend active */
if (h < min)
min = h;
}
}
- if (min < 9999999 && min > 0)
- {
- /* we can remove min messages */
- /* this adjusts also the state limits! */
- if (!SIDelDataEntries(segP, min))
- elog(FATAL, "SIDelExpiredDataEntries: Invalid segment state");
- }
-}
-
-
-
-/************************************************************************/
-/* SISegInit(segP) - initializes the segment */
-/************************************************************************/
-static void
-SISegInit(SISeg *segP, SISegOffsets *oP, int maxBackends)
-{
- int i;
- SISegEntry *eP;
-
- /* set semaphore ids in the segment */
- /* XXX */
- SISetStartEntrySection(segP, oP->offsetToFirstEntry);
- SISetEndEntrySection(segP, oP->offsetToEndOfSegment);
- SISetStartFreeSpace(segP, 0);
- SISetStartEntryChain(segP, InvalidOffset);
- SISetEndEntryChain(segP, InvalidOffset);
- SISetNumEntries(segP, 0);
- SISetMaxNumEntries(segP, MAXNUMMESSAGES);
- segP->maxBackends = maxBackends;
- for (i = 0; i < segP->maxBackends; i++)
- {
- segP->procState[i].limit = -1; /* no backend active !! */
- segP->procState[i].resetState = false;
- segP->procState[i].tag = InvalidBackendTag;
- }
- /* construct a chain of free entries */
- for (i = 1; i < MAXNUMMESSAGES; i++)
- {
- eP = (SISegEntry *) ((Pointer) segP +
- SIGetStartEntrySection(segP) +
- (i - 1) * sizeof(SISegEntry));
- eP->isfree = true;
- eP->next = i * sizeof(SISegEntry); /* relative to B */
- }
- /* handle the last free entry separate */
- eP = (SISegEntry *) ((Pointer) segP +
- SIGetStartEntrySection(segP) +
- (MAXNUMMESSAGES - 1) * sizeof(SISegEntry));
- eP->isfree = true;
- eP->next = InvalidOffset; /* it's the end of the chain !! */
-}
-
-
-
-/************************************************************************/
-/* SISegmentKill(key) - kill any segment */
-/************************************************************************/
-static void
-SISegmentKill(int key) /* the corresponding key for the segment */
-{
- IpcMemoryKill(key);
-}
-
-
-/************************************************************************/
-/* SISegmentGet(key, size) - get a shared segment of size <size> */
-/* returns a segment id */
-/************************************************************************/
-static IpcMemoryId
-SISegmentGet(int key, /* the corresponding key for the segment */
- int size, /* size of segment in bytes */
- bool create)
-{
- IpcMemoryId shmid;
-
- if (create)
- shmid = IpcMemoryCreate(key, size, IPCProtection);
- else
- shmid = IpcMemoryIdGet(key, size);
- return shmid;
-}
-
-/************************************************************************/
-/* SISegmentAttach(shmid) - attach a shared segment with id shmid */
-/************************************************************************/
-static void
-SISegmentAttach(IpcMemoryId shmid)
-{
- shmInvalBuffer = (struct SISeg *) IpcMemoryAttach(shmid);
- if (shmInvalBuffer == IpcMemAttachFailed)
- {
- /* XXX use validity function */
- elog(FATAL, "SISegmentAttach: Could not attach segment: %m");
- }
-}
-
-
-/************************************************************************/
-/* SISegmentInit() initialize SI segment */
-/* */
-/* NB: maxBackends param is only valid when killExistingSegment is true */
-/************************************************************************/
-int
-SISegmentInit(bool killExistingSegment, IPCKey key, int maxBackends)
-{
- SISegOffsets offsets;
- IpcMemoryId shmId;
- bool create;
-
- if (killExistingSegment)
- {
- /* Kill existing segment */
- /* set semaphore */
- SISegmentKill(key);
-
- /* Get a shared segment */
- SIComputeSize(&offsets, maxBackends);
- create = true;
- shmId = SISegmentGet(key, offsets.offsetToEndOfSegment, create);
- if (shmId < 0)
- {
- perror("SISegmentGet: failed");
- return -1; /* an error */
- }
+ segP->minMsgNum = min;
- /* Attach the shared cache invalidation segment */
- /* sets the global variable shmInvalBuffer */
- SISegmentAttach(shmId);
-
- /* Init shared memory table */
- SISegInit(shmInvalBuffer, &offsets, maxBackends);
- }
- else
+ /* When minMsgNum gets really large, decrement all message counters
+ * so as to forestall overflow of the counters.
+ */
+ if (min >= MSGNUMWRAPAROUND)
{
- /* use an existing segment */
- create = false;
- shmId = SISegmentGet(key, 0, create);
- if (shmId < 0)
+ segP->minMsgNum -= MSGNUMWRAPAROUND;
+ segP->maxMsgNum -= MSGNUMWRAPAROUND;
+ for (i = 0; i < segP->maxBackends; i++)
{
- perror("SISegmentGet: getting an existent segment failed");
- return -1; /* an error */
+ if (segP->procState[i].nextMsgNum >= 0)
+ segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
}
- /* Attach the shared cache invalidation segment */
- SISegmentAttach(shmId);
}
- return 1;
}
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index 87b8538212f..8f0f834e0f4 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -6,16 +6,16 @@
*
* Copyright (c) 1994, Regents of the University of California
*
- * $Id: lock.h,v 1.33 1999/07/16 17:07:38 momjian Exp $
+ * $Id: lock.h,v 1.34 1999/09/06 19:37:37 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#ifndef LOCK_H_
#define LOCK_H_
+#include "storage/ipc.h"
#include "storage/itemptr.h"
#include "storage/shmem.h"
-#include "storage/sinvaladt.h"
#include "utils/array.h"
extern SPINLOCK LockMgrLock;
diff --git a/src/include/storage/sinvaladt.h b/src/include/storage/sinvaladt.h
index e008e52d30f..b9d349a4c57 100644
--- a/src/include/storage/sinvaladt.h
+++ b/src/include/storage/sinvaladt.h
@@ -6,7 +6,7 @@
*
* Copyright (c) 1994, Regents of the University of California
*
- * $Id: sinvaladt.h,v 1.17 1999/09/04 18:36:44 tgl Exp $
+ * $Id: sinvaladt.h,v 1.18 1999/09/06 19:37:37 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -17,120 +17,106 @@
#include "storage/itemptr.h"
/*
- * The structure of the shared cache invaidation segment
+ * The shared cache invalidation manager is responsible for transmitting
+ * invalidation messages between backends. Any message sent by any backend
+ * must be delivered to all already-running backends before it can be
+ * forgotten.
*
+ * Conceptually, the messages are stored in an infinite array, where
+ * maxMsgNum is the next array subscript to store a submitted message in,
+ * minMsgNum is the smallest array subscript containing a message not yet
+ * read by all backends, and we always have maxMsgNum >= minMsgNum. (They
+ * are equal when there are no messages pending.) For each active backend,
+ * there is a nextMsgNum pointer indicating the next message it needs to read;
+ * we have maxMsgNum >= nextMsgNum >= minMsgNum for every backend.
+ *
+ * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
+ * entries. We translate MsgNum values into circular-buffer indexes by
+ * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
+ * MAXNUMMESSAGES is a constant and a power of 2). As long as maxMsgNum
+ * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
+ * in the buffer. If the buffer does overflow, we reset it to empty and
+ * force each backend to "reset", ie, discard all its invalidatable state.
+ *
+ * We would have problems if the MsgNum values overflow an integer, so
+ * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
+ * from all the MsgNum variables simultaneously. MSGNUMWRAPAROUND can be
+ * large so that we don't need to do this often. It must be a multiple of
+ * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
+ * to be moved when we do it.
*/
-/*
-A------------- Header info --------------
- criticalSectionSemaphoreId
- generalSemaphoreId
- startEntrySection (offset a)
- endEntrySection (offset a + b)
- startFreeSpace (offset relative to B)
- startEntryChain (offset relatiev to B)
- endEntryChain (offset relative to B)
- numEntries
- maxNumEntries
- maxBackends
- procState[maxBackends] --> limit
- resetState (bool)
-a tag (POSTID)
-B------------- Start entry section -------
- SISegEntry --> entryData --> ... (see SharedInvalidData!)
- isfree (bool)
- next (offset to next entry in chain )
-b .... (dynamically growing down)
-C----------------End shared segment -------
-*/
-/* Parameters (configurable) *******************************************/
-#define MAXNUMMESSAGES 4000 /* maximum number of messages in seg */
+/*
+ * Configurable parameters.
+ *
+ * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
+ * Must be a power of 2 for speed.
+ *
+ * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
+ * Must be a multiple of MAXNUMMESSAGES. Should be large.
+ */
+#define MAXNUMMESSAGES 4096
+#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 4096)
-#define InvalidOffset 1000000000 /* a invalid offset (End of
- * chain) */
+/* The content of one shared-invalidation message */
+typedef struct SharedInvalidData
+{
+ int cacheId; /* XXX */
+ Index hashIndex;
+ ItemPointerData pointerData;
+} SharedInvalidData;
+
+typedef SharedInvalidData *SharedInvalid;
+/* Per-backend state in shared invalidation structure */
typedef struct ProcState
{
- int limit; /* the number of read messages */
+ /* nextMsgNum is -1 in an inactive ProcState array entry. */
+ int nextMsgNum; /* next message number to read, or -1 */
bool resetState; /* true, if backend has to reset its state */
- int tag; /* special tag, recieved from the
- * postmaster */
+ int tag; /* backend tag received from postmaster */
} ProcState;
-
+/* Shared cache invalidation memory segment */
typedef struct SISeg
{
- IpcSemaphoreId criticalSectionSemaphoreId; /* semaphore id */
- IpcSemaphoreId generalSemaphoreId; /* semaphore id */
- Offset startEntrySection; /* (offset a) */
- Offset endEntrySection;/* (offset a + b) */
- Offset startFreeSpace; /* (offset relative to B) */
- Offset startEntryChain;/* (offset relative to B) */
- Offset endEntryChain; /* (offset relative to B) */
- int numEntries;
- int maxNumEntries;
+ /*
+ * General state information
+ */
+ int minMsgNum; /* oldest message still needed */
+ int maxMsgNum; /* next message number to be assigned */
int maxBackends; /* size of procState array */
/*
+ * Circular buffer holding shared-inval messages
+ */
+ SharedInvalidData buffer[MAXNUMMESSAGES];
+ /*
+ * Per-backend state info.
+ *
* We declare procState as 1 entry because C wants a fixed-size array,
* but actually it is maxBackends entries long.
*/
ProcState procState[1]; /* reflects the invalidation state */
- /*
- * The entry section begins after the end of the procState array.
- * Everything there is controlled by offsets.
- */
} SISeg;
-typedef struct SharedInvalidData
-{
- int cacheId; /* XXX */
- Index hashIndex;
- ItemPointerData pointerData;
-} SharedInvalidData;
-
-typedef SharedInvalidData *SharedInvalid;
-
-
-typedef struct SISegEntry
-{
- SharedInvalidData entryData;/* the message data */
- bool isfree; /* entry free? */
- Offset next; /* offset to next entry */
-} SISegEntry;
-
-typedef struct SISegOffsets
-{
- Offset startSegment; /* always 0 (for now) */
- Offset offsetToFirstEntry; /* A + a = B */
- Offset offsetToEndOfSegment; /* A + a + b */
-} SISegOffsets;
-
-
-/****************************************************************************/
-/* synchronization of the shared buffer access */
-/* access to the buffer is synchronized by the lock manager !! */
-/****************************************************************************/
-#define SI_LockStartValue 255
-#define SI_SharedLock (-1)
-#define SI_ExclusiveLock (-255)
+extern SISeg *shmInvalBuffer; /* pointer to the shared buffer segment,
+ * set by SISegmentAttach()
+ */
-extern SISeg *shmInvalBuffer;
/*
* prototypes for functions in sinvaladt.c
*/
-extern int SIBackendInit(SISeg *segInOutP);
-extern int SISegmentInit(bool killExistingSegment, IPCKey key,
+extern int SISegmentInit(bool createNewSegment, IPCKey key,
int maxBackends);
+extern int SIBackendInit(SISeg *segP);
-extern bool SISetDataEntry(SISeg *segP, SharedInvalidData *data);
-extern void SISetProcStateInvalid(SISeg *segP);
+extern bool SIInsertDataEntry(SISeg *segP, SharedInvalidData *data);
extern int SIGetDataEntry(SISeg *segP, int backendId,
SharedInvalidData *data);
-extern bool SIDelDataEntries(SISeg *segP, int n);
extern void SIDelExpiredDataEntries(SISeg *segP);
#endif /* SINVALADT_H */