diff options
Diffstat (limited to 'src/backend/commands/async.c')
-rw-r--r-- | src/backend/commands/async.c | 328 |
1 files changed, 166 insertions, 162 deletions
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index c7b60de32a9..11c84e7f3c8 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -7,7 +7,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.154 2010/02/20 21:24:02 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.155 2010/02/26 02:00:37 momjian Exp $ * *------------------------------------------------------------------------- */ @@ -149,7 +149,7 @@ * * This struct declaration has the maximal length, but in a real queue entry * the data area is only big enough for the actual channel and payload strings - * (each null-terminated). AsyncQueueEntryEmptySize is the minimum possible + * (each null-terminated). AsyncQueueEntryEmptySize is the minimum possible * entry size, if both channel and payload strings are empty (but note it * doesn't include alignment padding). * @@ -158,11 +158,11 @@ */ typedef struct AsyncQueueEntry { - int length; /* total allocated length of entry */ - Oid dboid; /* sender's database OID */ - TransactionId xid; /* sender's XID */ - int32 srcPid; /* sender's PID */ - char data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH]; + int length; /* total allocated length of entry */ + Oid dboid; /* sender's database OID */ + TransactionId xid; /* sender's XID */ + int32 srcPid; /* sender's PID */ + char data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH]; } AsyncQueueEntry; /* Currently, no field of AsyncQueueEntry requires more than int alignment */ @@ -175,8 +175,8 @@ typedef struct AsyncQueueEntry */ typedef struct QueuePosition { - int page; /* SLRU page number */ - int offset; /* byte offset within page */ + int page; /* SLRU page number */ + int offset; /* byte offset within page */ } QueuePosition; #define QUEUE_POS_PAGE(x) ((x).page) @@ -202,11 +202,11 @@ typedef struct QueuePosition */ typedef struct QueueBackendStatus { - int32 pid; /* either a PID or InvalidPid */ - QueuePosition pos; /* backend has read queue up to here */ + int32 pid; /* either a PID or InvalidPid */ + QueuePosition pos; /* backend has read queue up to here */ } QueueBackendStatus; -#define InvalidPid (-1) +#define InvalidPid (-1) /* * Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff) @@ -230,15 +230,15 @@ typedef struct QueueBackendStatus */ typedef struct AsyncQueueControl { - QueuePosition head; /* head points to the next free location */ - QueuePosition tail; /* the global tail is equivalent to the - tail of the "slowest" backend */ - TimestampTz lastQueueFillWarn; /* time of last queue-full msg */ - QueueBackendStatus backend[1]; /* actually of length MaxBackends+1 */ + QueuePosition head; /* head points to the next free location */ + QueuePosition tail; /* the global tail is equivalent to the tail + * of the "slowest" backend */ + TimestampTz lastQueueFillWarn; /* time of last queue-full msg */ + QueueBackendStatus backend[1]; /* actually of length MaxBackends+1 */ /* DO NOT ADD FURTHER STRUCT MEMBERS HERE */ } AsyncQueueControl; -static AsyncQueueControl *asyncQueueControl; +static AsyncQueueControl *asyncQueueControl; #define QUEUE_HEAD (asyncQueueControl->head) #define QUEUE_TAIL (asyncQueueControl->tail) @@ -248,11 +248,11 @@ static AsyncQueueControl *asyncQueueControl; /* * The SLRU buffer area through which we access the notification queue */ -static SlruCtlData AsyncCtlData; +static SlruCtlData AsyncCtlData; #define AsyncCtl (&AsyncCtlData) #define QUEUE_PAGESIZE BLCKSZ -#define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */ +#define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */ /* * slru.c currently assumes that all filenames are four characters of hex @@ -265,7 +265,7 @@ static SlruCtlData AsyncCtlData; * * The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2 * pages, because more than that would confuse slru.c into thinking there - * was a wraparound condition. With the default BLCKSZ this means there + * was a wraparound condition. With the default BLCKSZ this means there * can be up to 8GB of queued-and-not-read data. * * Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of @@ -309,7 +309,7 @@ static List *upperPendingActions = NIL; /* list of upper-xact lists */ /* * State for outbound notifies consists of a list of all channels+payloads - * NOTIFYed in the current transaction. We do not actually perform a NOTIFY + * NOTIFYed in the current transaction. We do not actually perform a NOTIFY * until and unless the transaction commits. pendingNotifies is NIL if no * NOTIFYs have been done in the current transaction. * @@ -325,11 +325,11 @@ static List *upperPendingActions = NIL; /* list of upper-xact lists */ */ typedef struct Notification { - char *channel; /* channel name */ - char *payload; /* payload string (can be empty) */ + char *channel; /* channel name */ + char *payload; /* payload string (can be empty) */ } Notification; -static List *pendingNotifies = NIL; /* list of Notifications */ +static List *pendingNotifies = NIL; /* list of Notifications */ static List *upperPendingNotifies = NIL; /* list of upper-xact lists */ @@ -348,8 +348,10 @@ static volatile sig_atomic_t notifyInterruptOccurred = 0; /* True if we've registered an on_shmem_exit cleanup */ static bool unlistenExitRegistered = false; + /* has this backend sent notifications in the current transaction? */ static bool backendHasSentNotifications = false; + /* has this backend executed its first LISTEN in the current transaction? */ static bool backendHasExecutedInitialListen = false; @@ -380,8 +382,8 @@ static bool asyncQueueProcessPageEntries(QueuePosition *current, static void asyncQueueAdvanceTail(void); static void ProcessIncomingNotify(void); static void NotifyMyFrontEnd(const char *channel, - const char *payload, - int32 srcPid); + const char *payload, + int32 srcPid); static bool AsyncExistsPendingNotify(const char *channel, const char *payload); static void ClearPendingActionsAndNotifies(void); @@ -408,17 +410,17 @@ asyncQueuePagePrecedesLogically(int p, int q) int diff; /* - * We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should - * be in the range 0..QUEUE_MAX_PAGE. + * We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should be + * in the range 0..QUEUE_MAX_PAGE. */ Assert(p >= 0 && p <= QUEUE_MAX_PAGE); Assert(q >= 0 && q <= QUEUE_MAX_PAGE); diff = p - q; - if (diff >= ((QUEUE_MAX_PAGE+1)/2)) - diff -= QUEUE_MAX_PAGE+1; - else if (diff < -((QUEUE_MAX_PAGE+1)/2)) - diff += QUEUE_MAX_PAGE+1; + if (diff >= ((QUEUE_MAX_PAGE + 1) / 2)) + diff -= QUEUE_MAX_PAGE + 1; + else if (diff < -((QUEUE_MAX_PAGE + 1) / 2)) + diff += QUEUE_MAX_PAGE + 1; return diff < 0; } @@ -428,7 +430,7 @@ asyncQueuePagePrecedesLogically(int p, int q) Size AsyncShmemSize(void) { - Size size; + Size size; /* This had better match AsyncShmemInit */ size = mul_size(MaxBackends, sizeof(QueueBackendStatus)); @@ -445,9 +447,9 @@ AsyncShmemSize(void) void AsyncShmemInit(void) { - bool found; - int slotno; - Size size; + bool found; + int slotno; + Size size; /* * Create or attach to the AsyncQueueControl structure. @@ -468,7 +470,7 @@ AsyncShmemInit(void) if (!found) { /* First time through, so initialize it */ - int i; + int i; SET_QUEUE_POS(QUEUE_HEAD, 0, 0); SET_QUEUE_POS(QUEUE_TAIL, 0, 0); @@ -598,8 +600,8 @@ Async_Notify(const char *channel, const char *payload) n->payload = ""; /* - * We want to preserve the order so we need to append every - * notification. See comments at AsyncExistsPendingNotify(). + * We want to preserve the order so we need to append every notification. + * See comments at AsyncExistsPendingNotify(). */ pendingNotifies = lappend(pendingNotifies, n); @@ -698,13 +700,13 @@ Async_UnlistenAll(void) Datum pg_listening_channels(PG_FUNCTION_ARGS) { - FuncCallContext *funcctx; - ListCell **lcp; + FuncCallContext *funcctx; + ListCell **lcp; /* stuff done only on the first call of the function */ if (SRF_IS_FIRSTCALL()) { - MemoryContext oldcontext; + MemoryContext oldcontext; /* create a function context for cross-call persistence */ funcctx = SRF_FIRSTCALL_INIT(); @@ -726,7 +728,7 @@ pg_listening_channels(PG_FUNCTION_ARGS) while (*lcp != NULL) { - char *channel = (char *) lfirst(*lcp); + char *channel = (char *) lfirst(*lcp); *lcp = lnext(*lcp); SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel)); @@ -818,9 +820,9 @@ PreCommit_Notify(void) /* * Make sure that we have an XID assigned to the current transaction. - * GetCurrentTransactionId is cheap if we already have an XID, but - * not so cheap if we don't, and we'd prefer not to do that work - * while holding AsyncQueueLock. + * GetCurrentTransactionId is cheap if we already have an XID, but not + * so cheap if we don't, and we'd prefer not to do that work while + * holding AsyncQueueLock. */ (void) GetCurrentTransactionId(); @@ -850,7 +852,7 @@ PreCommit_Notify(void) while (nextNotify != NULL) { /* - * Add the pending notifications to the queue. We acquire and + * Add the pending notifications to the queue. We acquire and * release AsyncQueueLock once per page, which might be overkill * but it does allow readers to get in while we're doing this. * @@ -866,7 +868,7 @@ PreCommit_Notify(void) if (asyncQueueIsFull()) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), - errmsg("too many notifications in the NOTIFY queue"))); + errmsg("too many notifications in the NOTIFY queue"))); nextNotify = asyncQueueAddEntries(nextNotify); LWLockRelease(AsyncQueueLock); } @@ -915,8 +917,8 @@ AtCommit_Notify(void) } /* - * If we did an initial LISTEN, listenChannels now has the entry, so - * we no longer need or want the flag to be set. + * If we did an initial LISTEN, listenChannels now has the entry, so we no + * longer need or want the flag to be set. */ backendHasExecutedInitialListen = false; @@ -943,15 +945,15 @@ Exec_ListenPreCommit(void) elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid); /* - * We need this variable to detect an aborted initial LISTEN. - * In that case we would set up our pointer but not listen on any channel. - * This flag gets cleared in AtCommit_Notify or AtAbort_Notify(). + * We need this variable to detect an aborted initial LISTEN. In that case + * we would set up our pointer but not listen on any channel. This flag + * gets cleared in AtCommit_Notify or AtAbort_Notify(). */ backendHasExecutedInitialListen = true; /* - * Before registering, make sure we will unlisten before dying. - * (Note: this action does not get undone if we abort later.) + * Before registering, make sure we will unlisten before dying. (Note: + * this action does not get undone if we abort later.) */ if (!unlistenExitRegistered) { @@ -977,8 +979,8 @@ Exec_ListenPreCommit(void) * already-committed notifications. Still, we could get notifications that * have already committed before we started to LISTEN. * - * Note that we are not yet listening on anything, so we won't deliver - * any notification to the frontend. + * Note that we are not yet listening on anything, so we won't deliver any + * notification to the frontend. * * This will also advance the global tail pointer if possible. */ @@ -1020,8 +1022,8 @@ Exec_ListenCommit(const char *channel) static void Exec_UnlistenCommit(const char *channel) { - ListCell *q; - ListCell *prev; + ListCell *q; + ListCell *prev; if (Trace_notify) elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid); @@ -1029,7 +1031,7 @@ Exec_UnlistenCommit(const char *channel) prev = NULL; foreach(q, listenChannels) { - char *lchan = (char *) lfirst(q); + char *lchan = (char *) lfirst(q); if (strcmp(lchan, channel) == 0) { @@ -1078,12 +1080,12 @@ Exec_UnlistenAllCommit(void) * The reason that this is not done in AtCommit_Notify is that there is * a nonzero chance of errors here (for example, encoding conversion errors * while trying to format messages to our frontend). An error during - * AtCommit_Notify would be a PANIC condition. The timing is also arranged + * AtCommit_Notify would be a PANIC condition. The timing is also arranged * to ensure that a transaction's self-notifies are delivered to the frontend * before it gets the terminating ReadyForQuery message. * * Note that we send signals and process the queue even if the transaction - * eventually aborted. This is because we need to clean out whatever got + * eventually aborted. This is because we need to clean out whatever got * added to the queue. * * NOTE: we are outside of any transaction here. @@ -1098,9 +1100,9 @@ ProcessCompletedNotifies(void) return; /* - * We reset the flag immediately; otherwise, if any sort of error - * occurs below, we'd be locked up in an infinite loop, because - * control will come right back here after error cleanup. + * We reset the flag immediately; otherwise, if any sort of error occurs + * below, we'd be locked up in an infinite loop, because control will come + * right back here after error cleanup. */ backendHasSentNotifications = false; @@ -1108,8 +1110,8 @@ ProcessCompletedNotifies(void) elog(DEBUG1, "ProcessCompletedNotifies"); /* - * We must run asyncQueueReadAllNotifications inside a transaction, - * else bad things happen if it gets an error. + * We must run asyncQueueReadAllNotifications inside a transaction, else + * bad things happen if it gets an error. */ StartTransactionCommand(); @@ -1125,11 +1127,11 @@ ProcessCompletedNotifies(void) { /* * If we found no other listening backends, and we aren't listening - * ourselves, then we must execute asyncQueueAdvanceTail to flush - * the queue, because ain't nobody else gonna do it. This prevents - * queue overflow when we're sending useless notifies to nobody. - * (A new listener could have joined since we looked, but if so this - * is harmless.) + * ourselves, then we must execute asyncQueueAdvanceTail to flush the + * queue, because ain't nobody else gonna do it. This prevents queue + * overflow when we're sending useless notifies to nobody. (A new + * listener could have joined since we looked, but if so this is + * harmless.) */ asyncQueueAdvanceTail(); } @@ -1164,14 +1166,14 @@ IsListeningOn(const char *channel) /* * Remove our entry from the listeners array when we are no longer listening - * on any channel. NB: must not fail if we're already not listening. + * on any channel. NB: must not fail if we're already not listening. */ static void asyncQueueUnregister(void) { - bool advanceTail; + bool advanceTail; - Assert(listenChannels == NIL); /* else caller error */ + Assert(listenChannels == NIL); /* else caller error */ LWLockAcquire(AsyncQueueLock, LW_SHARED); /* check if entry is valid and oldest ... */ @@ -1200,7 +1202,7 @@ asyncQueueIsFull(void) /* * The queue is full if creating a new head page would create a page that * logically precedes the current global tail pointer, ie, the head - * pointer would wrap around compared to the tail. We cannot create such + * pointer would wrap around compared to the tail. We cannot create such * a head page for fear of confusing slru.c. For safety we round the tail * pointer back to a segment boundary (compare the truncation logic in * asyncQueueAdvanceTail). @@ -1219,15 +1221,15 @@ asyncQueueIsFull(void) /* * Advance the QueuePosition to the next entry, assuming that the current - * entry is of length entryLength. If we jump to a new page the function + * entry is of length entryLength. If we jump to a new page the function * returns true, else false. */ static bool asyncQueueAdvance(QueuePosition *position, int entryLength) { - int pageno = QUEUE_POS_PAGE(*position); - int offset = QUEUE_POS_OFFSET(*position); - bool pageJump = false; + int pageno = QUEUE_POS_PAGE(*position); + int offset = QUEUE_POS_OFFSET(*position); + bool pageJump = false; /* * Move to the next writing position: First jump over what we have just @@ -1245,7 +1247,7 @@ asyncQueueAdvance(QueuePosition *position, int entryLength) { pageno++; if (pageno > QUEUE_MAX_PAGE) - pageno = 0; /* wrap around */ + pageno = 0; /* wrap around */ offset = 0; pageJump = true; } @@ -1260,9 +1262,9 @@ asyncQueueAdvance(QueuePosition *position, int entryLength) static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe) { - size_t channellen = strlen(n->channel); - size_t payloadlen = strlen(n->payload); - int entryLength; + size_t channellen = strlen(n->channel); + size_t payloadlen = strlen(n->payload); + int entryLength; Assert(channellen < NAMEDATALEN); Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH); @@ -1288,7 +1290,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe) * the last byte which simplifies reading the page later. * * We are passed the list cell containing the next notification to write - * and return the first still-unwritten cell back. Eventually we will return + * and return the first still-unwritten cell back. Eventually we will return * NULL indicating all is done. * * We are holding AsyncQueueLock already from the caller and grab AsyncCtlLock @@ -1297,10 +1299,10 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe) static ListCell * asyncQueueAddEntries(ListCell *nextNotify) { - AsyncQueueEntry qe; - int pageno; - int offset; - int slotno; + AsyncQueueEntry qe; + int pageno; + int offset; + int slotno; /* We hold both AsyncQueueLock and AsyncCtlLock during this operation */ LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE); @@ -1313,7 +1315,7 @@ asyncQueueAddEntries(ListCell *nextNotify) while (nextNotify != NULL) { - Notification *n = (Notification *) lfirst(nextNotify); + Notification *n = (Notification *) lfirst(nextNotify); /* Construct a valid queue entry in local variable qe */ asyncQueueNotificationToEntry(n, &qe); @@ -1335,8 +1337,8 @@ asyncQueueAddEntries(ListCell *nextNotify) */ qe.length = QUEUE_PAGESIZE - offset; qe.dboid = InvalidOid; - qe.data[0] = '\0'; /* empty channel */ - qe.data[1] = '\0'; /* empty payload */ + qe.data[0] = '\0'; /* empty channel */ + qe.data[1] = '\0'; /* empty payload */ } /* Now copy qe into the shared buffer page */ @@ -1348,12 +1350,12 @@ asyncQueueAddEntries(ListCell *nextNotify) if (asyncQueueAdvance(&(QUEUE_HEAD), qe.length)) { /* - * Page is full, so we're done here, but first fill the next - * page with zeroes. The reason to do this is to ensure that - * slru.c's idea of the head page is always the same as ours, - * which avoids boundary problems in SimpleLruTruncate. The - * test in asyncQueueIsFull() ensured that there is room to - * create this page without overrunning the queue. + * Page is full, so we're done here, but first fill the next page + * with zeroes. The reason to do this is to ensure that slru.c's + * idea of the head page is always the same as ours, which avoids + * boundary problems in SimpleLruTruncate. The test in + * asyncQueueIsFull() ensured that there is room to create this + * page without overrunning the queue. */ slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD)); /* And exit the loop */ @@ -1377,24 +1379,24 @@ asyncQueueAddEntries(ListCell *nextNotify) static void asyncQueueFillWarning(void) { - int headPage = QUEUE_POS_PAGE(QUEUE_HEAD); - int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL); - int occupied; - double fillDegree; - TimestampTz t; + int headPage = QUEUE_POS_PAGE(QUEUE_HEAD); + int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL); + int occupied; + double fillDegree; + TimestampTz t; occupied = headPage - tailPage; if (occupied == 0) return; /* fast exit for common case */ - + if (occupied < 0) { /* head has wrapped around, tail not yet */ - occupied += QUEUE_MAX_PAGE+1; + occupied += QUEUE_MAX_PAGE + 1; } - fillDegree = (double) occupied / (double) ((QUEUE_MAX_PAGE+1)/2); + fillDegree = (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2); if (fillDegree < 0.5) return; @@ -1404,9 +1406,9 @@ asyncQueueFillWarning(void) if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn, t, QUEUE_FULL_WARN_INTERVAL)) { - QueuePosition min = QUEUE_HEAD; - int32 minPid = InvalidPid; - int i; + QueuePosition min = QUEUE_HEAD; + int32 minPid = InvalidPid; + int i; for (i = 1; i <= MaxBackends; i++) { @@ -1455,13 +1457,13 @@ SignalBackends(void) int32 pid; /* - * Identify all backends that are listening and not already up-to-date. - * We don't want to send signals while holding the AsyncQueueLock, so - * we just build a list of target PIDs. + * Identify all backends that are listening and not already up-to-date. We + * don't want to send signals while holding the AsyncQueueLock, so we just + * build a list of target PIDs. * - * XXX in principle these pallocs could fail, which would be bad. - * Maybe preallocate the arrays? But in practice this is only run - * in trivial transactions, so there should surely be space available. + * XXX in principle these pallocs could fail, which would be bad. Maybe + * preallocate the arrays? But in practice this is only run in trivial + * transactions, so there should surely be space available. */ pids = (int32 *) palloc(MaxBackends * sizeof(int32)); ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId)); @@ -1493,8 +1495,8 @@ SignalBackends(void) /* * Note: assuming things aren't broken, a signal failure here could * only occur if the target backend exited since we released - * AsyncQueueLock; which is unlikely but certainly possible. - * So we just log a low-level debug message if it happens. + * AsyncQueueLock; which is unlikely but certainly possible. So we + * just log a low-level debug message if it happens. */ if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0) elog(DEBUG3, "could not signal backend with PID %d: %m", pid); @@ -1521,8 +1523,8 @@ AtAbort_Notify(void) { /* * If we LISTEN but then roll back the transaction we have set our pointer - * but have not made any entry in listenChannels. In that case, remove - * our pointer again. + * but have not made any entry in listenChannels. In that case, remove our + * pointer again. */ if (backendHasExecutedInitialListen) { @@ -1778,7 +1780,7 @@ EnableNotifyInterrupt(void) * is disabled until the next EnableNotifyInterrupt call. * * The PROCSIG_CATCHUP_INTERRUPT signal handler also needs to call this, - * so as to prevent conflicts if one signal interrupts the other. So we + * so as to prevent conflicts if one signal interrupts the other. So we * must return the previous state of the flag. */ bool @@ -1799,15 +1801,17 @@ DisableNotifyInterrupt(void) static void asyncQueueReadAllNotifications(void) { - QueuePosition pos; - QueuePosition oldpos; - QueuePosition head; + QueuePosition pos; + QueuePosition oldpos; + QueuePosition head; bool advanceTail; + /* page_buffer must be adequately aligned, so use a union */ - union { + union + { char buf[QUEUE_PAGESIZE]; AsyncQueueEntry align; - } page_buffer; + } page_buffer; /* Fetch current state */ LWLockAcquire(AsyncQueueLock, LW_SHARED); @@ -1829,16 +1833,16 @@ asyncQueueReadAllNotifications(void) * Especially we do not take into account different commit times. * Consider the following example: * - * Backend 1: Backend 2: + * Backend 1: Backend 2: * * transaction starts * NOTIFY foo; * commit starts - * transaction starts - * LISTEN foo; - * commit starts + * transaction starts + * LISTEN foo; + * commit starts * commit to clog - * commit to clog + * commit to clog * * It could happen that backend 2 sees the notification from backend 1 in * the queue. Even though the notifying transaction committed before @@ -1861,7 +1865,7 @@ asyncQueueReadAllNotifications(void) { bool reachedStop; - do + do { int curpage = QUEUE_POS_PAGE(pos); int curoffset = QUEUE_POS_OFFSET(pos); @@ -1871,7 +1875,7 @@ asyncQueueReadAllNotifications(void) /* * We copy the data from SLRU into a local buffer, so as to avoid * holding the AsyncCtlLock while we are examining the entries and - * possibly transmitting them to our frontend. Copy only the part + * possibly transmitting them to our frontend. Copy only the part * of the page we will actually inspect. */ slotno = SimpleLruReadPage_ReadOnly(AsyncCtl, curpage, @@ -1881,7 +1885,7 @@ asyncQueueReadAllNotifications(void) /* we only want to read as far as head */ copysize = QUEUE_POS_OFFSET(head) - curoffset; if (copysize < 0) - copysize = 0; /* just for safety */ + copysize = 0; /* just for safety */ } else { @@ -1899,9 +1903,9 @@ asyncQueueReadAllNotifications(void) * uncommitted message. * * Our stop position is what we found to be the head's position - * when we entered this function. It might have changed - * already. But if it has, we will receive (or have already - * received and queued) another signal and come here again. + * when we entered this function. It might have changed already. + * But if it has, we will receive (or have already received and + * queued) another signal and come here again. * * We are not holding AsyncQueueLock here! The queue can only * extend beyond the head pointer (see above) and we leave our @@ -1945,7 +1949,7 @@ asyncQueueReadAllNotifications(void) * and deliver relevant ones to my frontend. * * The current page must have been fetched into page_buffer from shared - * memory. (We could access the page right in shared memory, but that + * memory. (We could access the page right in shared memory, but that * would imply holding the AsyncCtlLock throughout this routine.) * * We stop if we reach the "stop" position, or reach a notification from an @@ -1963,11 +1967,11 @@ asyncQueueProcessPageEntries(QueuePosition *current, { bool reachedStop = false; bool reachedEndOfPage; - AsyncQueueEntry *qe; + AsyncQueueEntry *qe; do { - QueuePosition thisentry = *current; + QueuePosition thisentry = *current; if (QUEUE_POS_EQUAL(thisentry, stop)) break; @@ -1975,9 +1979,9 @@ asyncQueueProcessPageEntries(QueuePosition *current, qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry)); /* - * Advance *current over this message, possibly to the next page. - * As noted in the comments for asyncQueueReadAllNotifications, we - * must do this before possibly failing while processing the message. + * Advance *current over this message, possibly to the next page. As + * noted in the comments for asyncQueueReadAllNotifications, we must + * do this before possibly failing while processing the message. */ reachedEndOfPage = asyncQueueAdvance(current, qe->length); @@ -1987,12 +1991,12 @@ asyncQueueProcessPageEntries(QueuePosition *current, if (TransactionIdDidCommit(qe->xid)) { /* qe->data is the null-terminated channel name */ - char *channel = qe->data; + char *channel = qe->data; if (IsListeningOn(channel)) { /* payload follows channel name */ - char *payload = qe->data + strlen(channel) + 1; + char *payload = qe->data + strlen(channel) + 1; NotifyMyFrontEnd(channel, payload, qe->srcPid); } @@ -2008,12 +2012,12 @@ asyncQueueProcessPageEntries(QueuePosition *current, { /* * The transaction has neither committed nor aborted so far, - * so we can't process its message yet. Break out of the loop, - * but first back up *current so we will reprocess the message - * next time. (Note: it is unlikely but not impossible for - * TransactionIdDidCommit to fail, so we can't really avoid - * this advance-then-back-up behavior when dealing with an - * uncommitted message.) + * so we can't process its message yet. Break out of the + * loop, but first back up *current so we will reprocess the + * message next time. (Note: it is unlikely but not + * impossible for TransactionIdDidCommit to fail, so we can't + * really avoid this advance-then-back-up behavior when + * dealing with an uncommitted message.) */ *current = thisentry; reachedStop = true; @@ -2037,11 +2041,11 @@ asyncQueueProcessPageEntries(QueuePosition *current, static void asyncQueueAdvanceTail(void) { - QueuePosition min; - int i; - int oldtailpage; - int newtailpage; - int boundary; + QueuePosition min; + int i; + int oldtailpage; + int newtailpage; + int boundary; LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE); min = QUEUE_HEAD; @@ -2058,16 +2062,16 @@ asyncQueueAdvanceTail(void) * We can truncate something if the global tail advanced across an SLRU * segment boundary. * - * XXX it might be better to truncate only once every several segments, - * to reduce the number of directory scans. + * XXX it might be better to truncate only once every several segments, to + * reduce the number of directory scans. */ newtailpage = QUEUE_POS_PAGE(min); boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT); if (asyncQueuePagePrecedesLogically(oldtailpage, boundary)) { /* - * SimpleLruTruncate() will ask for AsyncCtlLock but will also - * release the lock again. + * SimpleLruTruncate() will ask for AsyncCtlLock but will also release + * the lock again. */ SimpleLruTruncate(AsyncCtl, newtailpage); } @@ -2104,8 +2108,8 @@ ProcessIncomingNotify(void) notifyInterruptOccurred = 0; /* - * We must run asyncQueueReadAllNotifications inside a transaction, - * else bad things happen if it gets an error. + * We must run asyncQueueReadAllNotifications inside a transaction, else + * bad things happen if it gets an error. */ StartTransactionCommand(); |