aboutsummaryrefslogtreecommitdiff
path: root/src/backend/storage/buffer/localbuf.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/storage/buffer/localbuf.c')
-rw-r--r--src/backend/storage/buffer/localbuf.c156
1 files changed, 155 insertions, 1 deletions
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index c9ba5ee00ff..3846d3eaca4 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -49,6 +49,9 @@ static int nextFreeLocalBufId = 0;
static HTAB *LocalBufHash = NULL;
+/* number of local buffers pinned at least once */
+static int NLocalPinnedBuffers = 0;
+
static void InitLocalBuffers(void);
static Block GetLocalBufferStorage(void);
@@ -273,6 +276,154 @@ GetLocalVictimBuffer(void)
return BufferDescriptorGetBuffer(bufHdr);
}
+/* see LimitAdditionalPins() */
+static void
+LimitAdditionalLocalPins(uint32 *additional_pins)
+{
+ uint32 max_pins;
+
+ if (*additional_pins <= 1)
+ return;
+
+ /*
+ * In contrast to LimitAdditionalPins() other backends don't play a role
+ * here. We can allow up to NLocBuffer pins in total.
+ */
+ max_pins = (NLocBuffer - NLocalPinnedBuffers);
+
+ if (*additional_pins >= max_pins)
+ *additional_pins = max_pins;
+}
+
+/*
+ * Implementation of ExtendBufferedRelBy() and ExtendBufferedRelTo() for
+ * temporary buffers.
+ */
+BlockNumber
+ExtendBufferedRelLocal(ExtendBufferedWhat eb,
+ ForkNumber fork,
+ uint32 flags,
+ uint32 extend_by,
+ BlockNumber extend_upto,
+ Buffer *buffers,
+ uint32 *extended_by)
+{
+ BlockNumber first_block;
+
+ /* Initialize local buffers if first request in this session */
+ if (LocalBufHash == NULL)
+ InitLocalBuffers();
+
+ LimitAdditionalLocalPins(&extend_by);
+
+ for (uint32 i = 0; i < extend_by; i++)
+ {
+ BufferDesc *buf_hdr;
+ Block buf_block;
+
+ buffers[i] = GetLocalVictimBuffer();
+ buf_hdr = GetLocalBufferDescriptor(-buffers[i] - 1);
+ buf_block = LocalBufHdrGetBlock(buf_hdr);
+
+ /* new buffers are zero-filled */
+ MemSet((char *) buf_block, 0, BLCKSZ);
+ }
+
+ first_block = smgrnblocks(eb.smgr, fork);
+
+ if (extend_upto != InvalidBlockNumber)
+ {
+ /*
+ * In contrast to shared relations, nothing could change the relation
+ * size concurrently. Thus we shouldn't end up finding that we don't
+ * need to do anything.
+ */
+ Assert(first_block <= extend_upto);
+
+ Assert((uint64) first_block + extend_by <= extend_upto);
+ }
+
+ /* Fail if relation is already at maximum possible length */
+ if ((uint64) first_block + extend_by >= MaxBlockNumber)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+ errmsg("cannot extend relation %s beyond %u blocks",
+ relpath(eb.smgr->smgr_rlocator, fork),
+ MaxBlockNumber)));
+
+ for (int i = 0; i < extend_by; i++)
+ {
+ int victim_buf_id;
+ BufferDesc *victim_buf_hdr;
+ BufferTag tag;
+ LocalBufferLookupEnt *hresult;
+ bool found;
+
+ victim_buf_id = -buffers[i] - 1;
+ victim_buf_hdr = GetLocalBufferDescriptor(victim_buf_id);
+
+ InitBufferTag(&tag, &eb.smgr->smgr_rlocator.locator, fork, first_block + i);
+
+ hresult = (LocalBufferLookupEnt *)
+ hash_search(LocalBufHash, (void *) &tag, HASH_ENTER, &found);
+ if (found)
+ {
+ BufferDesc *existing_hdr = GetLocalBufferDescriptor(hresult->id);
+ uint32 buf_state;
+
+ UnpinLocalBuffer(BufferDescriptorGetBuffer(victim_buf_hdr));
+
+ existing_hdr = GetLocalBufferDescriptor(hresult->id);
+ PinLocalBuffer(existing_hdr, false);
+ buffers[i] = BufferDescriptorGetBuffer(existing_hdr);
+
+ buf_state = pg_atomic_read_u32(&existing_hdr->state);
+ Assert(buf_state & BM_TAG_VALID);
+ Assert(!(buf_state & BM_DIRTY));
+ buf_state &= BM_VALID;
+ pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state);
+ }
+ else
+ {
+ uint32 buf_state = pg_atomic_read_u32(&victim_buf_hdr->state);
+
+ Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY | BM_JUST_DIRTIED)));
+
+ victim_buf_hdr->tag = tag;
+
+ buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
+
+ pg_atomic_unlocked_write_u32(&victim_buf_hdr->state, buf_state);
+
+ hresult->id = victim_buf_id;
+ }
+ }
+
+ /* actually extend relation */
+ smgrzeroextend(eb.smgr, fork, first_block, extend_by, false);
+
+ for (int i = 0; i < extend_by; i++)
+ {
+ Buffer buf = buffers[i];
+ BufferDesc *buf_hdr;
+ uint32 buf_state;
+
+ buf_hdr = GetLocalBufferDescriptor(-buf - 1);
+
+ buf_state = pg_atomic_read_u32(&buf_hdr->state);
+ buf_state |= BM_VALID;
+ pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
+ }
+
+ *extended_by = extend_by;
+
+ pgBufferUsage.temp_blks_written += extend_by;
+ pgstat_count_io_op_n(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EXTEND,
+ extend_by);
+
+ return first_block;
+}
+
/*
* MarkLocalBufferDirty -
* mark a local buffer dirty
@@ -492,6 +643,7 @@ PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
if (LocalRefCount[bufid] == 0)
{
+ NLocalPinnedBuffers++;
if (adjust_usagecount &&
BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
{
@@ -513,9 +665,11 @@ UnpinLocalBuffer(Buffer buffer)
Assert(BufferIsLocal(buffer));
Assert(LocalRefCount[buffid] > 0);
+ Assert(NLocalPinnedBuffers > 0);
ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
- LocalRefCount[buffid]--;
+ if (--LocalRefCount[buffid] == 0)
+ NLocalPinnedBuffers--;
}
/*