aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/async.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/async.c')
-rw-r--r--src/backend/commands/async.c328
1 files changed, 166 insertions, 162 deletions
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index c7b60de32a9..11c84e7f3c8 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.154 2010/02/20 21:24:02 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.155 2010/02/26 02:00:37 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@@ -149,7 +149,7 @@
*
* This struct declaration has the maximal length, but in a real queue entry
* the data area is only big enough for the actual channel and payload strings
- * (each null-terminated). AsyncQueueEntryEmptySize is the minimum possible
+ * (each null-terminated). AsyncQueueEntryEmptySize is the minimum possible
* entry size, if both channel and payload strings are empty (but note it
* doesn't include alignment padding).
*
@@ -158,11 +158,11 @@
*/
typedef struct AsyncQueueEntry
{
- int length; /* total allocated length of entry */
- Oid dboid; /* sender's database OID */
- TransactionId xid; /* sender's XID */
- int32 srcPid; /* sender's PID */
- char data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
+ int length; /* total allocated length of entry */
+ Oid dboid; /* sender's database OID */
+ TransactionId xid; /* sender's XID */
+ int32 srcPid; /* sender's PID */
+ char data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
} AsyncQueueEntry;
/* Currently, no field of AsyncQueueEntry requires more than int alignment */
@@ -175,8 +175,8 @@ typedef struct AsyncQueueEntry
*/
typedef struct QueuePosition
{
- int page; /* SLRU page number */
- int offset; /* byte offset within page */
+ int page; /* SLRU page number */
+ int offset; /* byte offset within page */
} QueuePosition;
#define QUEUE_POS_PAGE(x) ((x).page)
@@ -202,11 +202,11 @@ typedef struct QueuePosition
*/
typedef struct QueueBackendStatus
{
- int32 pid; /* either a PID or InvalidPid */
- QueuePosition pos; /* backend has read queue up to here */
+ int32 pid; /* either a PID or InvalidPid */
+ QueuePosition pos; /* backend has read queue up to here */
} QueueBackendStatus;
-#define InvalidPid (-1)
+#define InvalidPid (-1)
/*
* Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff)
@@ -230,15 +230,15 @@ typedef struct QueueBackendStatus
*/
typedef struct AsyncQueueControl
{
- QueuePosition head; /* head points to the next free location */
- QueuePosition tail; /* the global tail is equivalent to the
- tail of the "slowest" backend */
- TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
- QueueBackendStatus backend[1]; /* actually of length MaxBackends+1 */
+ QueuePosition head; /* head points to the next free location */
+ QueuePosition tail; /* the global tail is equivalent to the tail
+ * of the "slowest" backend */
+ TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
+ QueueBackendStatus backend[1]; /* actually of length MaxBackends+1 */
/* DO NOT ADD FURTHER STRUCT MEMBERS HERE */
} AsyncQueueControl;
-static AsyncQueueControl *asyncQueueControl;
+static AsyncQueueControl *asyncQueueControl;
#define QUEUE_HEAD (asyncQueueControl->head)
#define QUEUE_TAIL (asyncQueueControl->tail)
@@ -248,11 +248,11 @@ static AsyncQueueControl *asyncQueueControl;
/*
* The SLRU buffer area through which we access the notification queue
*/
-static SlruCtlData AsyncCtlData;
+static SlruCtlData AsyncCtlData;
#define AsyncCtl (&AsyncCtlData)
#define QUEUE_PAGESIZE BLCKSZ
-#define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */
+#define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */
/*
* slru.c currently assumes that all filenames are four characters of hex
@@ -265,7 +265,7 @@ static SlruCtlData AsyncCtlData;
*
* The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2
* pages, because more than that would confuse slru.c into thinking there
- * was a wraparound condition. With the default BLCKSZ this means there
+ * was a wraparound condition. With the default BLCKSZ this means there
* can be up to 8GB of queued-and-not-read data.
*
* Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of
@@ -309,7 +309,7 @@ static List *upperPendingActions = NIL; /* list of upper-xact lists */
/*
* State for outbound notifies consists of a list of all channels+payloads
- * NOTIFYed in the current transaction. We do not actually perform a NOTIFY
+ * NOTIFYed in the current transaction. We do not actually perform a NOTIFY
* until and unless the transaction commits. pendingNotifies is NIL if no
* NOTIFYs have been done in the current transaction.
*
@@ -325,11 +325,11 @@ static List *upperPendingActions = NIL; /* list of upper-xact lists */
*/
typedef struct Notification
{
- char *channel; /* channel name */
- char *payload; /* payload string (can be empty) */
+ char *channel; /* channel name */
+ char *payload; /* payload string (can be empty) */
} Notification;
-static List *pendingNotifies = NIL; /* list of Notifications */
+static List *pendingNotifies = NIL; /* list of Notifications */
static List *upperPendingNotifies = NIL; /* list of upper-xact lists */
@@ -348,8 +348,10 @@ static volatile sig_atomic_t notifyInterruptOccurred = 0;
/* True if we've registered an on_shmem_exit cleanup */
static bool unlistenExitRegistered = false;
+
/* has this backend sent notifications in the current transaction? */
static bool backendHasSentNotifications = false;
+
/* has this backend executed its first LISTEN in the current transaction? */
static bool backendHasExecutedInitialListen = false;
@@ -380,8 +382,8 @@ static bool asyncQueueProcessPageEntries(QueuePosition *current,
static void asyncQueueAdvanceTail(void);
static void ProcessIncomingNotify(void);
static void NotifyMyFrontEnd(const char *channel,
- const char *payload,
- int32 srcPid);
+ const char *payload,
+ int32 srcPid);
static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
static void ClearPendingActionsAndNotifies(void);
@@ -408,17 +410,17 @@ asyncQueuePagePrecedesLogically(int p, int q)
int diff;
/*
- * We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should
- * be in the range 0..QUEUE_MAX_PAGE.
+ * We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should be
+ * in the range 0..QUEUE_MAX_PAGE.
*/
Assert(p >= 0 && p <= QUEUE_MAX_PAGE);
Assert(q >= 0 && q <= QUEUE_MAX_PAGE);
diff = p - q;
- if (diff >= ((QUEUE_MAX_PAGE+1)/2))
- diff -= QUEUE_MAX_PAGE+1;
- else if (diff < -((QUEUE_MAX_PAGE+1)/2))
- diff += QUEUE_MAX_PAGE+1;
+ if (diff >= ((QUEUE_MAX_PAGE + 1) / 2))
+ diff -= QUEUE_MAX_PAGE + 1;
+ else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
+ diff += QUEUE_MAX_PAGE + 1;
return diff < 0;
}
@@ -428,7 +430,7 @@ asyncQueuePagePrecedesLogically(int p, int q)
Size
AsyncShmemSize(void)
{
- Size size;
+ Size size;
/* This had better match AsyncShmemInit */
size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
@@ -445,9 +447,9 @@ AsyncShmemSize(void)
void
AsyncShmemInit(void)
{
- bool found;
- int slotno;
- Size size;
+ bool found;
+ int slotno;
+ Size size;
/*
* Create or attach to the AsyncQueueControl structure.
@@ -468,7 +470,7 @@ AsyncShmemInit(void)
if (!found)
{
/* First time through, so initialize it */
- int i;
+ int i;
SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
@@ -598,8 +600,8 @@ Async_Notify(const char *channel, const char *payload)
n->payload = "";
/*
- * We want to preserve the order so we need to append every
- * notification. See comments at AsyncExistsPendingNotify().
+ * We want to preserve the order so we need to append every notification.
+ * See comments at AsyncExistsPendingNotify().
*/
pendingNotifies = lappend(pendingNotifies, n);
@@ -698,13 +700,13 @@ Async_UnlistenAll(void)
Datum
pg_listening_channels(PG_FUNCTION_ARGS)
{
- FuncCallContext *funcctx;
- ListCell **lcp;
+ FuncCallContext *funcctx;
+ ListCell **lcp;
/* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL())
{
- MemoryContext oldcontext;
+ MemoryContext oldcontext;
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();
@@ -726,7 +728,7 @@ pg_listening_channels(PG_FUNCTION_ARGS)
while (*lcp != NULL)
{
- char *channel = (char *) lfirst(*lcp);
+ char *channel = (char *) lfirst(*lcp);
*lcp = lnext(*lcp);
SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
@@ -818,9 +820,9 @@ PreCommit_Notify(void)
/*
* Make sure that we have an XID assigned to the current transaction.
- * GetCurrentTransactionId is cheap if we already have an XID, but
- * not so cheap if we don't, and we'd prefer not to do that work
- * while holding AsyncQueueLock.
+ * GetCurrentTransactionId is cheap if we already have an XID, but not
+ * so cheap if we don't, and we'd prefer not to do that work while
+ * holding AsyncQueueLock.
*/
(void) GetCurrentTransactionId();
@@ -850,7 +852,7 @@ PreCommit_Notify(void)
while (nextNotify != NULL)
{
/*
- * Add the pending notifications to the queue. We acquire and
+ * Add the pending notifications to the queue. We acquire and
* release AsyncQueueLock once per page, which might be overkill
* but it does allow readers to get in while we're doing this.
*
@@ -866,7 +868,7 @@ PreCommit_Notify(void)
if (asyncQueueIsFull())
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
- errmsg("too many notifications in the NOTIFY queue")));
+ errmsg("too many notifications in the NOTIFY queue")));
nextNotify = asyncQueueAddEntries(nextNotify);
LWLockRelease(AsyncQueueLock);
}
@@ -915,8 +917,8 @@ AtCommit_Notify(void)
}
/*
- * If we did an initial LISTEN, listenChannels now has the entry, so
- * we no longer need or want the flag to be set.
+ * If we did an initial LISTEN, listenChannels now has the entry, so we no
+ * longer need or want the flag to be set.
*/
backendHasExecutedInitialListen = false;
@@ -943,15 +945,15 @@ Exec_ListenPreCommit(void)
elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
/*
- * We need this variable to detect an aborted initial LISTEN.
- * In that case we would set up our pointer but not listen on any channel.
- * This flag gets cleared in AtCommit_Notify or AtAbort_Notify().
+ * We need this variable to detect an aborted initial LISTEN. In that case
+ * we would set up our pointer but not listen on any channel. This flag
+ * gets cleared in AtCommit_Notify or AtAbort_Notify().
*/
backendHasExecutedInitialListen = true;
/*
- * Before registering, make sure we will unlisten before dying.
- * (Note: this action does not get undone if we abort later.)
+ * Before registering, make sure we will unlisten before dying. (Note:
+ * this action does not get undone if we abort later.)
*/
if (!unlistenExitRegistered)
{
@@ -977,8 +979,8 @@ Exec_ListenPreCommit(void)
* already-committed notifications. Still, we could get notifications that
* have already committed before we started to LISTEN.
*
- * Note that we are not yet listening on anything, so we won't deliver
- * any notification to the frontend.
+ * Note that we are not yet listening on anything, so we won't deliver any
+ * notification to the frontend.
*
* This will also advance the global tail pointer if possible.
*/
@@ -1020,8 +1022,8 @@ Exec_ListenCommit(const char *channel)
static void
Exec_UnlistenCommit(const char *channel)
{
- ListCell *q;
- ListCell *prev;
+ ListCell *q;
+ ListCell *prev;
if (Trace_notify)
elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
@@ -1029,7 +1031,7 @@ Exec_UnlistenCommit(const char *channel)
prev = NULL;
foreach(q, listenChannels)
{
- char *lchan = (char *) lfirst(q);
+ char *lchan = (char *) lfirst(q);
if (strcmp(lchan, channel) == 0)
{
@@ -1078,12 +1080,12 @@ Exec_UnlistenAllCommit(void)
* The reason that this is not done in AtCommit_Notify is that there is
* a nonzero chance of errors here (for example, encoding conversion errors
* while trying to format messages to our frontend). An error during
- * AtCommit_Notify would be a PANIC condition. The timing is also arranged
+ * AtCommit_Notify would be a PANIC condition. The timing is also arranged
* to ensure that a transaction's self-notifies are delivered to the frontend
* before it gets the terminating ReadyForQuery message.
*
* Note that we send signals and process the queue even if the transaction
- * eventually aborted. This is because we need to clean out whatever got
+ * eventually aborted. This is because we need to clean out whatever got
* added to the queue.
*
* NOTE: we are outside of any transaction here.
@@ -1098,9 +1100,9 @@ ProcessCompletedNotifies(void)
return;
/*
- * We reset the flag immediately; otherwise, if any sort of error
- * occurs below, we'd be locked up in an infinite loop, because
- * control will come right back here after error cleanup.
+ * We reset the flag immediately; otherwise, if any sort of error occurs
+ * below, we'd be locked up in an infinite loop, because control will come
+ * right back here after error cleanup.
*/
backendHasSentNotifications = false;
@@ -1108,8 +1110,8 @@ ProcessCompletedNotifies(void)
elog(DEBUG1, "ProcessCompletedNotifies");
/*
- * We must run asyncQueueReadAllNotifications inside a transaction,
- * else bad things happen if it gets an error.
+ * We must run asyncQueueReadAllNotifications inside a transaction, else
+ * bad things happen if it gets an error.
*/
StartTransactionCommand();
@@ -1125,11 +1127,11 @@ ProcessCompletedNotifies(void)
{
/*
* If we found no other listening backends, and we aren't listening
- * ourselves, then we must execute asyncQueueAdvanceTail to flush
- * the queue, because ain't nobody else gonna do it. This prevents
- * queue overflow when we're sending useless notifies to nobody.
- * (A new listener could have joined since we looked, but if so this
- * is harmless.)
+ * ourselves, then we must execute asyncQueueAdvanceTail to flush the
+ * queue, because ain't nobody else gonna do it. This prevents queue
+ * overflow when we're sending useless notifies to nobody. (A new
+ * listener could have joined since we looked, but if so this is
+ * harmless.)
*/
asyncQueueAdvanceTail();
}
@@ -1164,14 +1166,14 @@ IsListeningOn(const char *channel)
/*
* Remove our entry from the listeners array when we are no longer listening
- * on any channel. NB: must not fail if we're already not listening.
+ * on any channel. NB: must not fail if we're already not listening.
*/
static void
asyncQueueUnregister(void)
{
- bool advanceTail;
+ bool advanceTail;
- Assert(listenChannels == NIL); /* else caller error */
+ Assert(listenChannels == NIL); /* else caller error */
LWLockAcquire(AsyncQueueLock, LW_SHARED);
/* check if entry is valid and oldest ... */
@@ -1200,7 +1202,7 @@ asyncQueueIsFull(void)
/*
* The queue is full if creating a new head page would create a page that
* logically precedes the current global tail pointer, ie, the head
- * pointer would wrap around compared to the tail. We cannot create such
+ * pointer would wrap around compared to the tail. We cannot create such
* a head page for fear of confusing slru.c. For safety we round the tail
* pointer back to a segment boundary (compare the truncation logic in
* asyncQueueAdvanceTail).
@@ -1219,15 +1221,15 @@ asyncQueueIsFull(void)
/*
* Advance the QueuePosition to the next entry, assuming that the current
- * entry is of length entryLength. If we jump to a new page the function
+ * entry is of length entryLength. If we jump to a new page the function
* returns true, else false.
*/
static bool
asyncQueueAdvance(QueuePosition *position, int entryLength)
{
- int pageno = QUEUE_POS_PAGE(*position);
- int offset = QUEUE_POS_OFFSET(*position);
- bool pageJump = false;
+ int pageno = QUEUE_POS_PAGE(*position);
+ int offset = QUEUE_POS_OFFSET(*position);
+ bool pageJump = false;
/*
* Move to the next writing position: First jump over what we have just
@@ -1245,7 +1247,7 @@ asyncQueueAdvance(QueuePosition *position, int entryLength)
{
pageno++;
if (pageno > QUEUE_MAX_PAGE)
- pageno = 0; /* wrap around */
+ pageno = 0; /* wrap around */
offset = 0;
pageJump = true;
}
@@ -1260,9 +1262,9 @@ asyncQueueAdvance(QueuePosition *position, int entryLength)
static void
asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
{
- size_t channellen = strlen(n->channel);
- size_t payloadlen = strlen(n->payload);
- int entryLength;
+ size_t channellen = strlen(n->channel);
+ size_t payloadlen = strlen(n->payload);
+ int entryLength;
Assert(channellen < NAMEDATALEN);
Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
@@ -1288,7 +1290,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
* the last byte which simplifies reading the page later.
*
* We are passed the list cell containing the next notification to write
- * and return the first still-unwritten cell back. Eventually we will return
+ * and return the first still-unwritten cell back. Eventually we will return
* NULL indicating all is done.
*
* We are holding AsyncQueueLock already from the caller and grab AsyncCtlLock
@@ -1297,10 +1299,10 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
static ListCell *
asyncQueueAddEntries(ListCell *nextNotify)
{
- AsyncQueueEntry qe;
- int pageno;
- int offset;
- int slotno;
+ AsyncQueueEntry qe;
+ int pageno;
+ int offset;
+ int slotno;
/* We hold both AsyncQueueLock and AsyncCtlLock during this operation */
LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
@@ -1313,7 +1315,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
while (nextNotify != NULL)
{
- Notification *n = (Notification *) lfirst(nextNotify);
+ Notification *n = (Notification *) lfirst(nextNotify);
/* Construct a valid queue entry in local variable qe */
asyncQueueNotificationToEntry(n, &qe);
@@ -1335,8 +1337,8 @@ asyncQueueAddEntries(ListCell *nextNotify)
*/
qe.length = QUEUE_PAGESIZE - offset;
qe.dboid = InvalidOid;
- qe.data[0] = '\0'; /* empty channel */
- qe.data[1] = '\0'; /* empty payload */
+ qe.data[0] = '\0'; /* empty channel */
+ qe.data[1] = '\0'; /* empty payload */
}
/* Now copy qe into the shared buffer page */
@@ -1348,12 +1350,12 @@ asyncQueueAddEntries(ListCell *nextNotify)
if (asyncQueueAdvance(&(QUEUE_HEAD), qe.length))
{
/*
- * Page is full, so we're done here, but first fill the next
- * page with zeroes. The reason to do this is to ensure that
- * slru.c's idea of the head page is always the same as ours,
- * which avoids boundary problems in SimpleLruTruncate. The
- * test in asyncQueueIsFull() ensured that there is room to
- * create this page without overrunning the queue.
+ * Page is full, so we're done here, but first fill the next page
+ * with zeroes. The reason to do this is to ensure that slru.c's
+ * idea of the head page is always the same as ours, which avoids
+ * boundary problems in SimpleLruTruncate. The test in
+ * asyncQueueIsFull() ensured that there is room to create this
+ * page without overrunning the queue.
*/
slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD));
/* And exit the loop */
@@ -1377,24 +1379,24 @@ asyncQueueAddEntries(ListCell *nextNotify)
static void
asyncQueueFillWarning(void)
{
- int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
- int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
- int occupied;
- double fillDegree;
- TimestampTz t;
+ int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
+ int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
+ int occupied;
+ double fillDegree;
+ TimestampTz t;
occupied = headPage - tailPage;
if (occupied == 0)
return; /* fast exit for common case */
-
+
if (occupied < 0)
{
/* head has wrapped around, tail not yet */
- occupied += QUEUE_MAX_PAGE+1;
+ occupied += QUEUE_MAX_PAGE + 1;
}
- fillDegree = (double) occupied / (double) ((QUEUE_MAX_PAGE+1)/2);
+ fillDegree = (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
if (fillDegree < 0.5)
return;
@@ -1404,9 +1406,9 @@ asyncQueueFillWarning(void)
if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
t, QUEUE_FULL_WARN_INTERVAL))
{
- QueuePosition min = QUEUE_HEAD;
- int32 minPid = InvalidPid;
- int i;
+ QueuePosition min = QUEUE_HEAD;
+ int32 minPid = InvalidPid;
+ int i;
for (i = 1; i <= MaxBackends; i++)
{
@@ -1455,13 +1457,13 @@ SignalBackends(void)
int32 pid;
/*
- * Identify all backends that are listening and not already up-to-date.
- * We don't want to send signals while holding the AsyncQueueLock, so
- * we just build a list of target PIDs.
+ * Identify all backends that are listening and not already up-to-date. We
+ * don't want to send signals while holding the AsyncQueueLock, so we just
+ * build a list of target PIDs.
*
- * XXX in principle these pallocs could fail, which would be bad.
- * Maybe preallocate the arrays? But in practice this is only run
- * in trivial transactions, so there should surely be space available.
+ * XXX in principle these pallocs could fail, which would be bad. Maybe
+ * preallocate the arrays? But in practice this is only run in trivial
+ * transactions, so there should surely be space available.
*/
pids = (int32 *) palloc(MaxBackends * sizeof(int32));
ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId));
@@ -1493,8 +1495,8 @@ SignalBackends(void)
/*
* Note: assuming things aren't broken, a signal failure here could
* only occur if the target backend exited since we released
- * AsyncQueueLock; which is unlikely but certainly possible.
- * So we just log a low-level debug message if it happens.
+ * AsyncQueueLock; which is unlikely but certainly possible. So we
+ * just log a low-level debug message if it happens.
*/
if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0)
elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
@@ -1521,8 +1523,8 @@ AtAbort_Notify(void)
{
/*
* If we LISTEN but then roll back the transaction we have set our pointer
- * but have not made any entry in listenChannels. In that case, remove
- * our pointer again.
+ * but have not made any entry in listenChannels. In that case, remove our
+ * pointer again.
*/
if (backendHasExecutedInitialListen)
{
@@ -1778,7 +1780,7 @@ EnableNotifyInterrupt(void)
* is disabled until the next EnableNotifyInterrupt call.
*
* The PROCSIG_CATCHUP_INTERRUPT signal handler also needs to call this,
- * so as to prevent conflicts if one signal interrupts the other. So we
+ * so as to prevent conflicts if one signal interrupts the other. So we
* must return the previous state of the flag.
*/
bool
@@ -1799,15 +1801,17 @@ DisableNotifyInterrupt(void)
static void
asyncQueueReadAllNotifications(void)
{
- QueuePosition pos;
- QueuePosition oldpos;
- QueuePosition head;
+ QueuePosition pos;
+ QueuePosition oldpos;
+ QueuePosition head;
bool advanceTail;
+
/* page_buffer must be adequately aligned, so use a union */
- union {
+ union
+ {
char buf[QUEUE_PAGESIZE];
AsyncQueueEntry align;
- } page_buffer;
+ } page_buffer;
/* Fetch current state */
LWLockAcquire(AsyncQueueLock, LW_SHARED);
@@ -1829,16 +1833,16 @@ asyncQueueReadAllNotifications(void)
* Especially we do not take into account different commit times.
* Consider the following example:
*
- * Backend 1: Backend 2:
+ * Backend 1: Backend 2:
*
* transaction starts
* NOTIFY foo;
* commit starts
- * transaction starts
- * LISTEN foo;
- * commit starts
+ * transaction starts
+ * LISTEN foo;
+ * commit starts
* commit to clog
- * commit to clog
+ * commit to clog
*
* It could happen that backend 2 sees the notification from backend 1 in
* the queue. Even though the notifying transaction committed before
@@ -1861,7 +1865,7 @@ asyncQueueReadAllNotifications(void)
{
bool reachedStop;
- do
+ do
{
int curpage = QUEUE_POS_PAGE(pos);
int curoffset = QUEUE_POS_OFFSET(pos);
@@ -1871,7 +1875,7 @@ asyncQueueReadAllNotifications(void)
/*
* We copy the data from SLRU into a local buffer, so as to avoid
* holding the AsyncCtlLock while we are examining the entries and
- * possibly transmitting them to our frontend. Copy only the part
+ * possibly transmitting them to our frontend. Copy only the part
* of the page we will actually inspect.
*/
slotno = SimpleLruReadPage_ReadOnly(AsyncCtl, curpage,
@@ -1881,7 +1885,7 @@ asyncQueueReadAllNotifications(void)
/* we only want to read as far as head */
copysize = QUEUE_POS_OFFSET(head) - curoffset;
if (copysize < 0)
- copysize = 0; /* just for safety */
+ copysize = 0; /* just for safety */
}
else
{
@@ -1899,9 +1903,9 @@ asyncQueueReadAllNotifications(void)
* uncommitted message.
*
* Our stop position is what we found to be the head's position
- * when we entered this function. It might have changed
- * already. But if it has, we will receive (or have already
- * received and queued) another signal and come here again.
+ * when we entered this function. It might have changed already.
+ * But if it has, we will receive (or have already received and
+ * queued) another signal and come here again.
*
* We are not holding AsyncQueueLock here! The queue can only
* extend beyond the head pointer (see above) and we leave our
@@ -1945,7 +1949,7 @@ asyncQueueReadAllNotifications(void)
* and deliver relevant ones to my frontend.
*
* The current page must have been fetched into page_buffer from shared
- * memory. (We could access the page right in shared memory, but that
+ * memory. (We could access the page right in shared memory, but that
* would imply holding the AsyncCtlLock throughout this routine.)
*
* We stop if we reach the "stop" position, or reach a notification from an
@@ -1963,11 +1967,11 @@ asyncQueueProcessPageEntries(QueuePosition *current,
{
bool reachedStop = false;
bool reachedEndOfPage;
- AsyncQueueEntry *qe;
+ AsyncQueueEntry *qe;
do
{
- QueuePosition thisentry = *current;
+ QueuePosition thisentry = *current;
if (QUEUE_POS_EQUAL(thisentry, stop))
break;
@@ -1975,9 +1979,9 @@ asyncQueueProcessPageEntries(QueuePosition *current,
qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
/*
- * Advance *current over this message, possibly to the next page.
- * As noted in the comments for asyncQueueReadAllNotifications, we
- * must do this before possibly failing while processing the message.
+ * Advance *current over this message, possibly to the next page. As
+ * noted in the comments for asyncQueueReadAllNotifications, we must
+ * do this before possibly failing while processing the message.
*/
reachedEndOfPage = asyncQueueAdvance(current, qe->length);
@@ -1987,12 +1991,12 @@ asyncQueueProcessPageEntries(QueuePosition *current,
if (TransactionIdDidCommit(qe->xid))
{
/* qe->data is the null-terminated channel name */
- char *channel = qe->data;
+ char *channel = qe->data;
if (IsListeningOn(channel))
{
/* payload follows channel name */
- char *payload = qe->data + strlen(channel) + 1;
+ char *payload = qe->data + strlen(channel) + 1;
NotifyMyFrontEnd(channel, payload, qe->srcPid);
}
@@ -2008,12 +2012,12 @@ asyncQueueProcessPageEntries(QueuePosition *current,
{
/*
* The transaction has neither committed nor aborted so far,
- * so we can't process its message yet. Break out of the loop,
- * but first back up *current so we will reprocess the message
- * next time. (Note: it is unlikely but not impossible for
- * TransactionIdDidCommit to fail, so we can't really avoid
- * this advance-then-back-up behavior when dealing with an
- * uncommitted message.)
+ * so we can't process its message yet. Break out of the
+ * loop, but first back up *current so we will reprocess the
+ * message next time. (Note: it is unlikely but not
+ * impossible for TransactionIdDidCommit to fail, so we can't
+ * really avoid this advance-then-back-up behavior when
+ * dealing with an uncommitted message.)
*/
*current = thisentry;
reachedStop = true;
@@ -2037,11 +2041,11 @@ asyncQueueProcessPageEntries(QueuePosition *current,
static void
asyncQueueAdvanceTail(void)
{
- QueuePosition min;
- int i;
- int oldtailpage;
- int newtailpage;
- int boundary;
+ QueuePosition min;
+ int i;
+ int oldtailpage;
+ int newtailpage;
+ int boundary;
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
min = QUEUE_HEAD;
@@ -2058,16 +2062,16 @@ asyncQueueAdvanceTail(void)
* We can truncate something if the global tail advanced across an SLRU
* segment boundary.
*
- * XXX it might be better to truncate only once every several segments,
- * to reduce the number of directory scans.
+ * XXX it might be better to truncate only once every several segments, to
+ * reduce the number of directory scans.
*/
newtailpage = QUEUE_POS_PAGE(min);
boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
if (asyncQueuePagePrecedesLogically(oldtailpage, boundary))
{
/*
- * SimpleLruTruncate() will ask for AsyncCtlLock but will also
- * release the lock again.
+ * SimpleLruTruncate() will ask for AsyncCtlLock but will also release
+ * the lock again.
*/
SimpleLruTruncate(AsyncCtl, newtailpage);
}
@@ -2104,8 +2108,8 @@ ProcessIncomingNotify(void)
notifyInterruptOccurred = 0;
/*
- * We must run asyncQueueReadAllNotifications inside a transaction,
- * else bad things happen if it gets an error.
+ * We must run asyncQueueReadAllNotifications inside a transaction, else
+ * bad things happen if it gets an error.
*/
StartTransactionCommand();