Diffstat (limited to 'src/backend/storage/buffer/localbuf.c')
-rw-r--r-- | src/backend/storage/buffer/localbuf.c | 156
1 file changed, 155 insertions, 1 deletion
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index c9ba5ee00ff..3846d3eaca4 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -49,6 +49,9 @@ static int	nextFreeLocalBufId = 0;
 
 static HTAB *LocalBufHash = NULL;
 
+/* number of local buffers pinned at least once */
+static int	NLocalPinnedBuffers = 0;
+
 
 static void InitLocalBuffers(void);
 static Block GetLocalBufferStorage(void);
@@ -273,6 +276,154 @@ GetLocalVictimBuffer(void)
 	return BufferDescriptorGetBuffer(bufHdr);
 }
 
+/* see LimitAdditionalPins() */
+static void
+LimitAdditionalLocalPins(uint32 *additional_pins)
+{
+	uint32		max_pins;
+
+	if (*additional_pins <= 1)
+		return;
+
+	/*
+	 * In contrast to LimitAdditionalPins() other backends don't play a role
+	 * here. We can allow up to NLocBuffer pins in total.
+	 */
+	max_pins = (NLocBuffer - NLocalPinnedBuffers);
+
+	if (*additional_pins >= max_pins)
+		*additional_pins = max_pins;
+}
+
+/*
+ * Implementation of ExtendBufferedRelBy() and ExtendBufferedRelTo() for
+ * temporary buffers.
+ */
+BlockNumber
+ExtendBufferedRelLocal(ExtendBufferedWhat eb,
+					   ForkNumber fork,
+					   uint32 flags,
+					   uint32 extend_by,
+					   BlockNumber extend_upto,
+					   Buffer *buffers,
+					   uint32 *extended_by)
+{
+	BlockNumber first_block;
+
+	/* Initialize local buffers if first request in this session */
+	if (LocalBufHash == NULL)
+		InitLocalBuffers();
+
+	LimitAdditionalLocalPins(&extend_by);
+
+	for (uint32 i = 0; i < extend_by; i++)
+	{
+		BufferDesc *buf_hdr;
+		Block		buf_block;
+
+		buffers[i] = GetLocalVictimBuffer();
+		buf_hdr = GetLocalBufferDescriptor(-buffers[i] - 1);
+		buf_block = LocalBufHdrGetBlock(buf_hdr);
+
+		/* new buffers are zero-filled */
+		MemSet((char *) buf_block, 0, BLCKSZ);
+	}
+
+	first_block = smgrnblocks(eb.smgr, fork);
+
+	if (extend_upto != InvalidBlockNumber)
+	{
+		/*
+		 * In contrast to shared relations, nothing could change the relation
+		 * size concurrently. Thus we shouldn't end up finding that we don't
+		 * need to do anything.
+		 */
+		Assert(first_block <= extend_upto);
+
+		Assert((uint64) first_block + extend_by <= extend_upto);
+	}
+
+	/* Fail if relation is already at maximum possible length */
+	if ((uint64) first_block + extend_by >= MaxBlockNumber)
+		ereport(ERROR,
+				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+				 errmsg("cannot extend relation %s beyond %u blocks",
+						relpath(eb.smgr->smgr_rlocator, fork),
+						MaxBlockNumber)));
+
+	for (int i = 0; i < extend_by; i++)
+	{
+		int			victim_buf_id;
+		BufferDesc *victim_buf_hdr;
+		BufferTag	tag;
+		LocalBufferLookupEnt *hresult;
+		bool		found;
+
+		victim_buf_id = -buffers[i] - 1;
+		victim_buf_hdr = GetLocalBufferDescriptor(victim_buf_id);
+
+		InitBufferTag(&tag, &eb.smgr->smgr_rlocator.locator, fork, first_block + i);
+
+		hresult = (LocalBufferLookupEnt *)
+			hash_search(LocalBufHash, (void *) &tag, HASH_ENTER, &found);
+		if (found)
+		{
+			BufferDesc *existing_hdr = GetLocalBufferDescriptor(hresult->id);
+			uint32		buf_state;
+
+			UnpinLocalBuffer(BufferDescriptorGetBuffer(victim_buf_hdr));
+
+			existing_hdr = GetLocalBufferDescriptor(hresult->id);
+			PinLocalBuffer(existing_hdr, false);
+			buffers[i] = BufferDescriptorGetBuffer(existing_hdr);
+
+			buf_state = pg_atomic_read_u32(&existing_hdr->state);
+			Assert(buf_state & BM_TAG_VALID);
+			Assert(!(buf_state & BM_DIRTY));
+			buf_state &= ~BM_VALID;
+			pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state);
+		}
+		else
+		{
+			uint32		buf_state = pg_atomic_read_u32(&victim_buf_hdr->state);
+
+			Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY | BM_JUST_DIRTIED)));
+
+			victim_buf_hdr->tag = tag;
+
+			buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
+
+			pg_atomic_unlocked_write_u32(&victim_buf_hdr->state, buf_state);
+
+			hresult->id = victim_buf_id;
+		}
+	}
+
+	/* actually extend relation */
+	smgrzeroextend(eb.smgr, fork, first_block, extend_by, false);
+
+	for (int i = 0; i < extend_by; i++)
+	{
+		Buffer		buf = buffers[i];
+		BufferDesc *buf_hdr;
+		uint32		buf_state;
+
+		buf_hdr = GetLocalBufferDescriptor(-buf - 1);
+
+		buf_state = pg_atomic_read_u32(&buf_hdr->state);
+		buf_state |= BM_VALID;
+		pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
+	}
+
+	*extended_by = extend_by;
+
+	pgBufferUsage.temp_blks_written += extend_by;
+	pgstat_count_io_op_n(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EXTEND,
+						 extend_by);
+
+	return first_block;
+}
+
 /*
  * MarkLocalBufferDirty -
  *		mark a local buffer dirty
@@ -492,6 +643,7 @@ PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
 
 	if (LocalRefCount[bufid] == 0)
 	{
+		NLocalPinnedBuffers++;
 		if (adjust_usagecount &&
 			BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
 		{
@@ -513,9 +665,11 @@ UnpinLocalBuffer(Buffer buffer)
 
 	Assert(BufferIsLocal(buffer));
 	Assert(LocalRefCount[buffid] > 0);
+	Assert(NLocalPinnedBuffers > 0);
 
 	ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
-	LocalRefCount[buffid]--;
+	if (--LocalRefCount[buffid] == 0)
+		NLocalPinnedBuffers--;
 }
 
 /*
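
For orientation, here is a minimal, hypothetical sketch (not part of the commit above) of how the new ExtendBufferedRelLocal() entry point could be driven for a temporary relation. In-core callers are expected to reach it through the generic ExtendBufferedRelBy()/ExtendBufferedRelTo() API, per the function's header comment; the wrapper name, the header list, and the ExtendBufferedWhat initializer below are assumptions made for illustration only.

/*
 * Illustrative sketch only: extend a temporary relation by up to
 * "want_blocks" zero-filled, pinned blocks using the function added above.
 * The wrapper name and the ExtendBufferedWhat initializer are assumptions;
 * real callers go via ExtendBufferedRelBy()/ExtendBufferedRelTo().
 */
#include "postgres.h"

#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "storage/smgr.h"

static BlockNumber
extend_temp_rel_sketch(SMgrRelation smgr, uint32 want_blocks,
					   Buffer *buffers, uint32 *extended_by)
{
	ExtendBufferedWhat eb = {.smgr = smgr};		/* assumed initializer */

	/*
	 * extend_upto = InvalidBlockNumber means no specific target block; the
	 * call may extend by fewer blocks than requested when
	 * LimitAdditionalLocalPins() caps the number of additional pins.  On
	 * return, buffers[0 .. *extended_by - 1] are pinned, zero-filled and
	 * marked BM_VALID.
	 */
	return ExtendBufferedRelLocal(eb, MAIN_FORKNUM, 0 /* flags */ ,
								  want_blocks, InvalidBlockNumber,
								  buffers, extended_by);
}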