aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/commands/async.c39
-rw-r--r--src/backend/utils/time/tqual.c9
-rw-r--r--src/include/utils/tqual.h1
3 files changed, 34 insertions, 15 deletions
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index f642223d9f3..74e9ed23c67 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -133,7 +133,9 @@
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
+#include "utils/snapmgr.h"
#include "utils/timestamp.h"
+#include "utils/tqual.h"
/*
@@ -386,7 +388,8 @@ static bool SignalBackends(void);
static void asyncQueueReadAllNotifications(void);
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
- char *page_buffer);
+ char *page_buffer,
+ Snapshot snapshot);
static void asyncQueueAdvanceTail(void);
static void ProcessIncomingNotify(void);
static void NotifyMyFrontEnd(const char *channel,
@@ -797,7 +800,7 @@ PreCommit_Notify(void)
}
}
- /* Queue any pending notifies */
+ /* Queue any pending notifies (must happen after the above) */
if (pendingNotifies)
{
ListCell *nextNotify;
@@ -986,7 +989,9 @@ Exec_ListenPreCommit(void)
* have already committed before we started to LISTEN.
*
* Note that we are not yet listening on anything, so we won't deliver any
- * notification to the frontend.
+ * notification to the frontend. Also, although our transaction might
+ * have executed NOTIFY, those message(s) aren't queued yet so we can't
+ * see them in the queue.
*
* This will also advance the global tail pointer if possible.
*/
@@ -1835,6 +1840,7 @@ asyncQueueReadAllNotifications(void)
volatile QueuePosition pos;
QueuePosition oldpos;
QueuePosition head;
+ Snapshot snapshot;
bool advanceTail;
/* page_buffer must be adequately aligned, so use a union */
@@ -1858,6 +1864,9 @@ asyncQueueReadAllNotifications(void)
return;
}
+ /* Get snapshot we'll use to decide which xacts are still in progress */
+ snapshot = RegisterSnapshot(GetLatestSnapshot());
+
/*----------
* Note that we deliver everything that we see in the queue and that
* matches our _current_ listening state.
@@ -1945,7 +1954,8 @@ asyncQueueReadAllNotifications(void)
* while sending the notifications to the frontend.
*/
reachedStop = asyncQueueProcessPageEntries(&pos, head,
- page_buffer.buf);
+ page_buffer.buf,
+ snapshot);
} while (!reachedStop);
}
PG_CATCH();
@@ -1973,6 +1983,9 @@ asyncQueueReadAllNotifications(void)
/* If we were the laziest backend, try to advance the tail pointer */
if (advanceTail)
asyncQueueAdvanceTail();
+
+ /* Done with snapshot */
+ UnregisterSnapshot(snapshot);
}
/*
@@ -1994,7 +2007,8 @@ asyncQueueReadAllNotifications(void)
static bool
asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
- char *page_buffer)
+ char *page_buffer,
+ Snapshot snapshot)
{
bool reachedStop = false;
bool reachedEndOfPage;
@@ -2019,7 +2033,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
/* Ignore messages destined for other databases */
if (qe->dboid == MyDatabaseId)
{
- if (TransactionIdIsInProgress(qe->xid))
+ if (XidInMVCCSnapshot(qe->xid, snapshot))
{
/*
* The source transaction is still in progress, so we can't
@@ -2030,10 +2044,15 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
* this advance-then-back-up behavior when dealing with an
* uncommitted message.)
*
- * Note that we must test TransactionIdIsInProgress before we
- * test TransactionIdDidCommit, else we might return a message
- * from a transaction that is not yet visible to snapshots;
- * compare the comments at the head of tqual.c.
+ * Note that we must test XidInMVCCSnapshot before we test
+ * TransactionIdDidCommit, else we might return a message from
+ * a transaction that is not yet visible to snapshots; compare
+ * the comments at the head of tqual.c.
+ *
+ * Also, while our own xact won't be listed in the snapshot,
+ * we need not check for TransactionIdIsCurrentTransactionId
+ * because our transaction cannot (yet) have queued any
+ * messages.
*/
*current = thisentry;
reachedStop = true;
diff --git a/src/backend/utils/time/tqual.c b/src/backend/utils/time/tqual.c
index 2d98f377349..568f5176e97 100644
--- a/src/backend/utils/time/tqual.c
+++ b/src/backend/utils/time/tqual.c
@@ -73,8 +73,6 @@ SnapshotData SnapshotSelfData = {HeapTupleSatisfiesSelf};
SnapshotData SnapshotAnyData = {HeapTupleSatisfiesAny};
SnapshotData SnapshotToastData = {HeapTupleSatisfiesToast};
-/* local functions */
-static bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
/*
* SetHintBits()
@@ -1404,10 +1402,11 @@ HeapTupleIsSurelyDead(HeapTuple htup, TransactionId OldestXmin)
*
* Note: GetSnapshotData never stores either top xid or subxids of our own
* backend into a snapshot, so these xids will not be reported as "running"
- * by this function. This is OK for current uses, because we actually only
- * apply this for known-committed XIDs.
+ * by this function. This is OK for current uses, because we always check
+ * TransactionIdIsCurrentTransactionId first, except when it's known the
+ * XID could not be ours anyway.
*/
-static bool
+bool
XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
{
uint32 i;
diff --git a/src/include/utils/tqual.h b/src/include/utils/tqual.h
index e68ef0a4fdb..9e878a3df1b 100644
--- a/src/include/utils/tqual.h
+++ b/src/include/utils/tqual.h
@@ -85,6 +85,7 @@ extern HTSV_Result HeapTupleSatisfiesVacuum(HeapTuple htup,
TransactionId OldestXmin, Buffer buffer);
extern bool HeapTupleIsSurelyDead(HeapTuple htup,
TransactionId OldestXmin);
+extern bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
extern void HeapTupleSetHintBits(HeapTupleHeader tuple, Buffer buffer,
uint16 infomask, TransactionId xid);