Diffstat (limited to 'src/backend/access/heap/hio.c')
 src/backend/access/heap/hio.c | 48 +++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 47 insertions(+), 1 deletion(-)
diff --git a/src/backend/access/heap/hio.c b/src/backend/access/heap/hio.c
index 72a69e52b02..aee2a205aa4 100644
--- a/src/backend/access/heap/hio.c
+++ b/src/backend/access/heap/hio.c
@@ -17,6 +17,7 @@
#include "access/heapam.h"
#include "access/hio.h"
+#include "access/visibilitymap.h"
#include "storage/bufmgr.h"
#include "storage/freespace.h"
#include "storage/lmgr.h"
@@ -150,7 +151,8 @@ ReadBufferBI(Relation relation, BlockNumber targetBlock,
 Buffer
 RelationGetBufferForTuple(Relation relation, Size len,
 						  Buffer otherBuffer, int options,
-						  struct BulkInsertStateData * bistate)
+						  struct BulkInsertStateData * bistate,
+						  Buffer *vmbuffer)
 {
 	bool		use_fsm = !(options & HEAP_INSERT_SKIP_FSM);
 	Buffer		buffer = InvalidBuffer;
@@ -237,23 +239,37 @@ RelationGetBufferForTuple(Relation relation, Size len,
 	 * Read and exclusive-lock the target block, as well as the other
 	 * block if one was given, taking suitable care with lock ordering and
 	 * the possibility they are the same block.
+	 *
+	 * If the page-level all-visible flag is set, caller will need to clear
+	 * both that and the corresponding visibility map bit. However, by the
+	 * time we return, we'll have x-locked the buffer, and we don't want to
+	 * do any I/O while in that state. So we check the bit here before
+	 * taking the lock, and pin the page if it appears necessary.
+	 * Checking without the lock creates a risk of getting the wrong
+	 * answer, so we'll have to recheck after acquiring the lock.
 	 */
 	if (otherBuffer == InvalidBuffer)
 	{
 		/* easy case */
 		buffer = ReadBufferBI(relation, targetBlock, bistate);
+		if (PageIsAllVisible(BufferGetPage(buffer)))
+			visibilitymap_pin(relation, targetBlock, vmbuffer);
 		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 	}
 	else if (otherBlock == targetBlock)
 	{
 		/* also easy case */
 		buffer = otherBuffer;
+		if (PageIsAllVisible(BufferGetPage(buffer)))
+			visibilitymap_pin(relation, targetBlock, vmbuffer);
 		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 	}
 	else if (otherBlock < targetBlock)
 	{
 		/* lock other buffer first */
 		buffer = ReadBuffer(relation, targetBlock);
+		if (PageIsAllVisible(BufferGetPage(buffer)))
+			visibilitymap_pin(relation, targetBlock, vmbuffer);
 		LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
 		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 	}
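
Every branch in the hunk above repeats the same idiom: peek at the page's all-visible flag before taking the content lock, and pin the corresponding visibility map page if the flag appears set. The ordering matters because visibilitymap_pin may have to read the map page from disk, and that I/O should happen while the heap buffer is still unlocked. A minimal standalone sketch of the idiom, using only APIs that appear in this diff (the helper name pin_vm_before_lock is illustrative, not part of the patch):

	/*
	 * Illustrative sketch, not code from this patch.  Pin the visibility
	 * map page covering targetBlock before exclusive-locking the heap
	 * buffer, so any map-page I/O happens while the page is unlocked.
	 * The unlocked PageIsAllVisible() peek can return a stale answer,
	 * so callers must recheck once the lock is held.
	 */
	static void
	pin_vm_before_lock(Relation relation, BlockNumber targetBlock,
					   Buffer buffer, Buffer *vmbuffer)
	{
		if (PageIsAllVisible(BufferGetPage(buffer)))
			visibilitymap_pin(relation, targetBlock, vmbuffer);
		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
	}
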
@@ -261,11 +277,41 @@ RelationGetBufferForTuple(Relation relation, Size len,
 	{
 		/* lock target buffer first */
 		buffer = ReadBuffer(relation, targetBlock);
+		if (PageIsAllVisible(BufferGetPage(buffer)))
+			visibilitymap_pin(relation, targetBlock, vmbuffer);
 		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 		LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
 	}

 	/*
+	 * If the page is all visible but we don't have the right visibility
+	 * map page pinned, then give up our locks, go get the pin, and
+	 * re-lock. This is pretty painful, but hopefully shouldn't happen
+	 * often. Note that there's a small possibility that we didn't pin
+	 * the page above but still have the correct page pinned anyway,
+	 * either because we've already made a previous pass through this
+	 * loop or because the caller passed us the right page to begin with.
+	 *
+	 * Note also that it's possible that by the time we get the pin and
+	 * retake the buffer locks, the visibility map bit will have been
+	 * cleared by some other backend. In that case, we'll have done a
+	 * bit of extra work for no gain, but there's no real harm done.
+	 */
+	if (PageIsAllVisible(BufferGetPage(buffer))
+		&& !visibilitymap_pin_ok(targetBlock, *vmbuffer))
+	{
+		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+		if (otherBlock != targetBlock)
+			LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
+		visibilitymap_pin(relation, targetBlock, vmbuffer);
+		if (otherBuffer != InvalidBuffer && otherBlock < targetBlock)
+			LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
+		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+		if (otherBuffer != InvalidBuffer && otherBlock > targetBlock)
+			LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
+	}
+
+	/*
 	 * Now we can check to see if there's enough free space here. If so,
 	 * we're done.
 	 */
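
The net effect of the function's new contract: on return, *vmbuffer is pinned on the map page covering the returned heap block whenever that block might be all-visible, so the caller can clear both the page-level flag and the map bit without doing I/O while holding the exclusive lock. A hedged sketch of how a caller-side insertion path might consume that pin; the real callers (heap_insert and friends) are not shown in this diff, and the visibilitymap_clear signature used here is an assumption:

	/* Hypothetical caller fragment, assuming this patch's new signature. */
	Buffer		vmbuffer = InvalidBuffer;
	Buffer		buffer;

	buffer = RelationGetBufferForTuple(relation, len, InvalidBuffer,
									   options, bistate, &vmbuffer);
	/* ... place the tuple while holding the exclusive content lock ... */
	if (PageIsAllVisible(BufferGetPage(buffer)))
	{
		PageClearAllVisible(BufferGetPage(buffer));
		visibilitymap_clear(relation, BufferGetBlockNumber(buffer),
							vmbuffer);	/* assumed signature */
	}
	/* ... WAL-log the change, then unlock and release both buffers ... */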