Diffstat (limited to 'src/backend/access')
-rw-r--r--  src/backend/access/heap/heapam.c  484
1 file changed, 439 insertions, 45 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 81422afa2f8..72ed6325384 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -24,6 +24,7 @@ * heap_getnext - retrieve next tuple in scan * heap_fetch - retrieve tuple with given tid * heap_insert - insert tuple into a relation + * heap_multi_insert - insert multiple tuples into a relation * heap_delete - delete a tuple from a relation * heap_update - replace a tuple in a relation with another tuple * heap_markpos - mark scan position @@ -79,6 +80,8 @@ static HeapScanDesc heap_beginscan_internal(Relation relation, int nkeys, ScanKey key, bool allow_strat, bool allow_sync, bool is_bitmapscan); +static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup, + TransactionId xid, CommandId cid, int options); static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from, Buffer newbuf, HeapTuple newtup, bool all_visible_cleared, bool new_all_visible_cleared); @@ -1866,55 +1869,14 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, Buffer vmbuffer = InvalidBuffer; bool all_visible_cleared = false; - if (relation->rd_rel->relhasoids) - { -#ifdef NOT_USED - /* this is redundant with an Assert in HeapTupleSetOid */ - Assert(tup->t_data->t_infomask & HEAP_HASOID); -#endif - - /* - * If the object id of this tuple has already been assigned, trust the - * caller. There are a couple of ways this can happen. At initial db - * creation, the backend program sets oids for tuples. When we define - * an index, we set the oid. Finally, in the future, we may allow - * users to set their own object ids in order to support a persistent - * object store (objects need to contain pointers to one another). - */ - if (!OidIsValid(HeapTupleGetOid(tup))) - HeapTupleSetOid(tup, GetNewOid(relation)); - } - else - { - /* check there is not space for an OID */ - Assert(!(tup->t_data->t_infomask & HEAP_HASOID)); - } - - tup->t_data->t_infomask &= ~(HEAP_XACT_MASK); - tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK); - tup->t_data->t_infomask |= HEAP_XMAX_INVALID; - HeapTupleHeaderSetXmin(tup->t_data, xid); - HeapTupleHeaderSetCmin(tup->t_data, cid); - HeapTupleHeaderSetXmax(tup->t_data, 0); /* for cleanliness */ - tup->t_tableOid = RelationGetRelid(relation); - /* - * If the new tuple is too big for storage or contains already toasted - * out-of-line attributes from some other relation, invoke the toaster. + * Fill in tuple header fields, assign an OID, and toast the tuple if + * necessary. * * Note: below this point, heaptup is the data we actually intend to store * into the relation; tup is the caller's original untoasted data. */ - if (relation->rd_rel->relkind != RELKIND_RELATION) - { - /* toast table entries should never be recursively toasted */ - Assert(!HeapTupleHasExternal(tup)); - heaptup = tup; - } - else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD) - heaptup = toast_insert_or_update(relation, tup, NULL, options); - else - heaptup = tup; + heaptup = heap_prepare_insert(relation, tup, xid, cid, options); /* * We're about to do the actual insert -- but check for conflict first, @@ -2035,7 +1997,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, */ CacheInvalidateHeapTuple(relation, heaptup, NULL); - pgstat_count_heap_insert(relation); + pgstat_count_heap_insert(relation, 1); /* * If heaptup is a private copy, release it. 
Don't forget to copy t_self @@ -2051,6 +2013,285 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, } /* + * Subroutine for heap_insert(). Prepares a tuple for insertion. This sets the + * tuple header fields, assigns an OID, and toasts the tuple if necessary. + * Returns a toasted version of the tuple if it was toasted, or the original + * tuple if not. Note that in any case, the header fields are also set in + * the original tuple. + */ +static HeapTuple +heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, + CommandId cid, int options) +{ + if (relation->rd_rel->relhasoids) + { +#ifdef NOT_USED + /* this is redundant with an Assert in HeapTupleSetOid */ + Assert(tup->t_data->t_infomask & HEAP_HASOID); +#endif + + /* + * If the object id of this tuple has already been assigned, trust the + * caller. There are a couple of ways this can happen. At initial db + * creation, the backend program sets oids for tuples. When we define + * an index, we set the oid. Finally, in the future, we may allow + * users to set their own object ids in order to support a persistent + * object store (objects need to contain pointers to one another). + */ + if (!OidIsValid(HeapTupleGetOid(tup))) + HeapTupleSetOid(tup, GetNewOid(relation)); + } + else + { + /* check there is not space for an OID */ + Assert(!(tup->t_data->t_infomask & HEAP_HASOID)); + } + + tup->t_data->t_infomask &= ~(HEAP_XACT_MASK); + tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK); + tup->t_data->t_infomask |= HEAP_XMAX_INVALID; + HeapTupleHeaderSetXmin(tup->t_data, xid); + HeapTupleHeaderSetCmin(tup->t_data, cid); + HeapTupleHeaderSetXmax(tup->t_data, 0); /* for cleanliness */ + tup->t_tableOid = RelationGetRelid(relation); + + /* + * If the new tuple is too big for storage or contains already toasted + * out-of-line attributes from some other relation, invoke the toaster. + */ + if (relation->rd_rel->relkind != RELKIND_RELATION) + { + /* toast table entries should never be recursively toasted */ + Assert(!HeapTupleHasExternal(tup)); + return tup; + } + else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD) + return toast_insert_or_update(relation, tup, NULL, options); + else + return tup; +} + +/* + * heap_multi_insert - insert multiple tuple into a heap + * + * This is like heap_insert(), but inserts multiple tuples in one operation. + * That's faster than calling heap_insert() in a loop, because when multiple + * tuples can be inserted on a single page, we can write just a single WAL + * record covering all of them, and only need to lock/unlock the page once. + * + * Note: this leaks memory into the current memory context. You can create a + * temporary context before calling this, if that's a problem. + */ +void +heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, + CommandId cid, int options, BulkInsertState bistate) +{ + TransactionId xid = GetCurrentTransactionId(); + HeapTuple *heaptuples; + Buffer buffer; + Buffer vmbuffer = InvalidBuffer; + bool all_visible_cleared = false; + int i; + int ndone; + char *scratch = NULL; + Page page; + bool needwal; + + needwal = !(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation); + + /* Toast and set header data in all the tuples */ + heaptuples = palloc(ntuples * sizeof(HeapTuple)); + for (i = 0; i < ntuples; i++) + heaptuples[i] = heap_prepare_insert(relation, tuples[i], + xid, cid, options); + + /* + * Allocate some memory to use for constructing the WAL record. 
Using + * palloc() within a critical section is not safe, so we allocate this + * beforehand. + */ + if (needwal) + scratch = palloc(BLCKSZ); + + /* + * We're about to do the actual inserts -- but check for conflict first, + * to avoid possibly having to roll back work we've just done. + * + * For a heap insert, we only need to check for table-level SSI locks. + * Our new tuple can't possibly conflict with existing tuple locks, and + * heap page locks are only consolidated versions of tuple locks; they do + * not lock "gaps" as index page locks do. So we don't need to identify + * a buffer before making the call. + */ + CheckForSerializableConflictIn(relation, NULL, InvalidBuffer); + + ndone = 0; + while (ndone < ntuples) + { + int nthispage; + + /* + * Find buffer where at least the next tuple will fit. If the page + * is all-visible, this will also pin the requisite visibility map + * page. + */ + buffer = RelationGetBufferForTuple(relation, heaptuples[ndone]->t_len, + InvalidBuffer, options, bistate, + &vmbuffer, NULL); + page = BufferGetPage(buffer); + + if (PageIsAllVisible(page)) + { + all_visible_cleared = true; + PageClearAllVisible(page); + visibilitymap_clear(relation, + BufferGetBlockNumber(buffer), + vmbuffer); + } + + /* NO EREPORT(ERROR) from here till changes are logged */ + START_CRIT_SECTION(); + + /* Put as many tuples as fit on this page */ + for (nthispage = 0; ndone + nthispage < ntuples; nthispage++) + { + HeapTuple heaptup = heaptuples[ndone + nthispage]; + + if (PageGetHeapFreeSpace(page) < MAXALIGN(heaptup->t_len)) + break; + + RelationPutHeapTuple(relation, buffer, heaptup); + } + + /* + * XXX Should we set PageSetPrunable on this page ? See heap_insert() + */ + + MarkBufferDirty(buffer); + + /* XLOG stuff */ + if (needwal) + { + XLogRecPtr recptr; + xl_heap_multi_insert *xlrec; + XLogRecData rdata[2]; + uint8 info = XLOG_HEAP2_MULTI_INSERT; + char *tupledata; + int totaldatalen; + char *scratchptr = scratch; + bool init; + + /* + * If the page was previously empty, we can reinit the page + * instead of restoring the whole thing. + */ + init = (ItemPointerGetOffsetNumber(&(heaptuples[ndone]->t_self)) == FirstOffsetNumber && + PageGetMaxOffsetNumber(page) == FirstOffsetNumber + nthispage - 1); + + /* allocate xl_heap_multi_insert struct from the scratch area */ + xlrec = (xl_heap_multi_insert *) scratchptr; + scratchptr += SizeOfHeapMultiInsert; + + /* + * Allocate offsets array. Unless we're reinitializing the page, + * in that case the tuples are stored in order starting at + * FirstOffsetNumber and we don't need to store the offsets + * explicitly. + */ + if (!init) + scratchptr += nthispage * sizeof(OffsetNumber); + + /* the rest of the scratch space is used for tuple data */ + tupledata = scratchptr; + + xlrec->all_visible_cleared = all_visible_cleared; + xlrec->node = relation->rd_node; + xlrec->blkno = BufferGetBlockNumber(buffer); + xlrec->ntuples = nthispage; + + /* + * Write out an xl_multi_insert_tuple and the tuple data itself + * for each tuple. + */ + for (i = 0; i < nthispage; i++) + { + HeapTuple heaptup = heaptuples[ndone + i]; + xl_multi_insert_tuple *tuphdr; + int datalen; + + if (!init) + xlrec->offsets[i] = ItemPointerGetOffsetNumber(&heaptup->t_self); + /* xl_multi_insert_tuple needs two-byte alignment. 
*/ + tuphdr = (xl_multi_insert_tuple *) SHORTALIGN(scratchptr); + scratchptr = ((char *) tuphdr) + SizeOfMultiInsertTuple; + + tuphdr->t_infomask2 = heaptup->t_data->t_infomask2; + tuphdr->t_infomask = heaptup->t_data->t_infomask; + tuphdr->t_hoff = heaptup->t_data->t_hoff; + + /* write bitmap [+ padding] [+ oid] + data */ + datalen = heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits); + memcpy(scratchptr, + (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits), + datalen); + tuphdr->datalen = datalen; + scratchptr += datalen; + } + totaldatalen = scratchptr - tupledata; + Assert((scratchptr - scratch) < BLCKSZ); + + rdata[0].data = (char *) xlrec; + rdata[0].len = tupledata - scratch; + rdata[0].buffer = InvalidBuffer; + rdata[0].next = &rdata[1]; + + rdata[1].data = tupledata; + rdata[1].len = totaldatalen; + rdata[1].buffer = buffer; + rdata[1].buffer_std = true; + rdata[1].next = NULL; + + /* + * If we're going to reinitialize the whole page using the WAL + * record, hide buffer reference from XLogInsert. + */ + if (init) + { + rdata[1].buffer = InvalidBuffer; + info |= XLOG_HEAP_INIT_PAGE; + } + + recptr = XLogInsert(RM_HEAP2_ID, info, rdata); + + PageSetLSN(page, recptr); + PageSetTLI(page, ThisTimeLineID); + } + + END_CRIT_SECTION(); + + UnlockReleaseBuffer(buffer); + if (vmbuffer != InvalidBuffer) + ReleaseBuffer(vmbuffer); + + ndone += nthispage; + } + + /* + * If tuples are cachable, mark them for invalidation from the caches in + * case we abort. Note it is OK to do this after releasing the buffer, + * because the heaptuples data structure is all in local memory, not in + * the shared buffer. + */ + if (IsSystemRelation(relation)) + { + for (i = 0; i < ntuples; i++) + CacheInvalidateHeapTuple(relation, heaptuples[i], NULL); + } + + pgstat_count_heap_insert(relation, ntuples); +} + +/* * simple_heap_insert - insert a tuple * * Currently, this routine differs from heap_insert only in supplying @@ -4776,6 +5017,144 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record) } /* + * Handles MULTI_INSERT record type. + */ +static void +heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record) +{ + char *recdata = XLogRecGetData(record); + xl_heap_multi_insert *xlrec; + Buffer buffer; + Page page; + struct + { + HeapTupleHeaderData hdr; + char data[MaxHeapTupleSize]; + } tbuf; + HeapTupleHeader htup; + uint32 newlen; + Size freespace; + BlockNumber blkno; + int i; + bool isinit = (record->xl_info & XLOG_HEAP_INIT_PAGE) != 0; + + xlrec = (xl_heap_multi_insert *) recdata; + recdata += SizeOfHeapMultiInsert; + + /* + * If we're reinitializing the page, the tuples are stored in order from + * FirstOffsetNumber. Otherwise there's an array of offsets in the WAL + * record. + */ + if (!isinit) + recdata += sizeof(OffsetNumber) * xlrec->ntuples; + + blkno = xlrec->blkno; + + /* + * The visibility map may need to be fixed even if the heap page is + * already up-to-date. 
+ */ + if (xlrec->all_visible_cleared) + { + Relation reln = CreateFakeRelcacheEntry(xlrec->node); + Buffer vmbuffer = InvalidBuffer; + + visibilitymap_pin(reln, blkno, &vmbuffer); + visibilitymap_clear(reln, blkno, vmbuffer); + ReleaseBuffer(vmbuffer); + FreeFakeRelcacheEntry(reln); + } + + if (record->xl_info & XLR_BKP_BLOCK_1) + return; + + if (isinit) + { + buffer = XLogReadBuffer(xlrec->node, blkno, true); + Assert(BufferIsValid(buffer)); + page = (Page) BufferGetPage(buffer); + + PageInit(page, BufferGetPageSize(buffer), 0); + } + else + { + buffer = XLogReadBuffer(xlrec->node, blkno, false); + if (!BufferIsValid(buffer)) + return; + page = (Page) BufferGetPage(buffer); + + if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */ + { + UnlockReleaseBuffer(buffer); + return; + } + } + + for (i = 0; i < xlrec->ntuples; i++) + { + OffsetNumber offnum; + xl_multi_insert_tuple *xlhdr; + + if (isinit) + offnum = FirstOffsetNumber + i; + else + offnum = xlrec->offsets[i]; + if (PageGetMaxOffsetNumber(page) + 1 < offnum) + elog(PANIC, "heap_multi_insert_redo: invalid max offset number"); + + xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(recdata); + recdata += SizeOfMultiInsertTuple; + + newlen = xlhdr->datalen; + Assert(newlen <= MaxHeapTupleSize); + htup = &tbuf.hdr; + MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData)); + /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */ + memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits), + (char *) recdata, + newlen); + recdata += newlen; + + newlen += offsetof(HeapTupleHeaderData, t_bits); + htup->t_infomask2 = xlhdr->t_infomask2; + htup->t_infomask = xlhdr->t_infomask; + htup->t_hoff = xlhdr->t_hoff; + HeapTupleHeaderSetXmin(htup, record->xl_xid); + HeapTupleHeaderSetCmin(htup, FirstCommandId); + ItemPointerSetBlockNumber(&htup->t_ctid, blkno); + ItemPointerSetOffsetNumber(&htup->t_ctid, offnum); + + offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true); + if (offnum == InvalidOffsetNumber) + elog(PANIC, "heap_multi_insert_redo: failed to add tuple"); + } + + freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */ + + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + + if (xlrec->all_visible_cleared) + PageClearAllVisible(page); + + MarkBufferDirty(buffer); + UnlockReleaseBuffer(buffer); + + /* + * If the page is running low on free space, update the FSM as well. + * Arbitrarily, our definition of "low" is less than 20%. We can't do much + * better than that without knowing the fill-factor for the table. + * + * XXX: We don't get here if the page was restored from full page image. + * We don't bother to update the FSM in that case, it doesn't need to be + * totally accurate anyway. 
+ */ + if (freespace < BLCKSZ / 5) + XLogRecordPageWithFreeSpace(xlrec->node, blkno, freespace); +} + +/* * Handles UPDATE and HOT_UPDATE */ static void @@ -5164,6 +5543,9 @@ heap2_redo(XLogRecPtr lsn, XLogRecord *record) case XLOG_HEAP2_VISIBLE: heap_xlog_visible(lsn, record); break; + case XLOG_HEAP2_MULTI_INSERT: + heap_xlog_multi_insert(lsn, record); + break; default: elog(PANIC, "heap2_redo: unknown op code %u", info); } @@ -5301,6 +5683,18 @@ heap2_desc(StringInfo buf, uint8 xl_info, char *rec) xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode, xlrec->block); } + else if (info == XLOG_HEAP2_MULTI_INSERT) + { + xl_heap_multi_insert *xlrec = (xl_heap_multi_insert *) rec; + + if (xl_info & XLOG_HEAP_INIT_PAGE) + appendStringInfo(buf, "multi-insert (init): "); + else + appendStringInfo(buf, "multi-insert: "); + appendStringInfo(buf, "rel %u/%u/%u; blk %u; %d tuples", + xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode, + xlrec->blkno, xlrec->ntuples); + } else appendStringInfo(buf, "UNKNOWN"); } |
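
Note: the insert and redo paths above rely on two new WAL record layouts, xl_heap_multi_insert and xl_multi_insert_tuple, plus the SizeOfHeapMultiInsert and SizeOfMultiInsertTuple macros and the XLOG_HEAP2_MULTI_INSERT opcode. Their declarations belong to the companion header change, which falls outside the src/backend/access diffstat shown here. The following is only a sketch of the layout implied by how the code above reads and writes these records; the field order, exact types, and include paths are assumptions, not part of this diff.

#include "postgres.h"
#include "storage/block.h"
#include "storage/off.h"
#include "storage/relfilenode.h"

/*
 * Sketch of the multi-insert WAL record layout implied by heap_multi_insert()
 * and heap_xlog_multi_insert() above.  The authoritative declarations are in
 * the header portion of this commit, not in this diff.
 */
typedef struct xl_heap_multi_insert
{
	RelFileNode node;			/* target relation */
	BlockNumber blkno;			/* target block */
	bool		all_visible_cleared;	/* PD_ALL_VISIBLE was cleared */
	uint16		ntuples;		/* number of tuples on this page */

	/*
	 * Offsets of the inserted tuples follow, except when the record carries
	 * XLOG_HEAP_INIT_PAGE; in that case the tuples are known to occupy
	 * FirstOffsetNumber onwards and the array is omitted.
	 */
	OffsetNumber offsets[1];	/* variable length */
} xl_heap_multi_insert;

#define SizeOfHeapMultiInsert	offsetof(xl_heap_multi_insert, offsets)

/*
 * One of these precedes each tuple's data in the record, SHORTALIGN'ed as in
 * the code above.  datalen counts the bitmap [+ padding] [+ oid] + data bytes
 * copied starting at offsetof(HeapTupleHeaderData, t_bits).
 */
typedef struct xl_multi_insert_tuple
{
	uint16		datalen;		/* size of tuple data that follows */
	uint16		t_infomask2;
	uint16		t_infomask;
	uint8		t_hoff;
	/* tuple data follows */
} xl_multi_insert_tuple;

#define SizeOfMultiInsertTuple	(offsetof(xl_multi_insert_tuple, t_hoff) + sizeof(uint8))

This also explains the Assert((scratchptr - scratch) < BLCKSZ) in the insert path: because every tuple in one record fits on a single page, one BLCKSZ-sized scratch buffer is always enough to hold the record header, the offsets array, and all per-tuple headers and data.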
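
heap_multi_insert()'s header comment notes that it leaks memory (the toasted copies and the heaptuples array) into the current memory context. A bulk-loading caller, which is the COPY-style use case this function targets, would therefore normally stage each batch in a short-lived context and reset it between calls. Below is a minimal, hypothetical caller sketch under those assumptions; next_batch_of_tuples(), the batch size, and the context name are illustrative placeholders, and the heap_multi_insert() prototype is assumed to live in access/heapam.h via the companion header change.

#include "postgres.h"
#include "access/heapam.h"
#include "utils/memutils.h"

#define BATCH_SIZE 1000

/* Placeholder for whatever produces the tuples (e.g. COPY input parsing). */
extern int	next_batch_of_tuples(HeapTuple *tuples, int maxtuples);

static void
bulk_load_sketch(Relation rel, CommandId cid)
{
	BulkInsertState bistate = GetBulkInsertState();
	MemoryContext batchcxt;
	HeapTuple	tuples[BATCH_SIZE];
	int			ntuples;

	/* Per-batch context, so memory leaked by heap_multi_insert() is bounded */
	batchcxt = AllocSetContextCreate(CurrentMemoryContext,
									 "bulk load batch",
									 ALLOCSET_DEFAULT_MINSIZE,
									 ALLOCSET_DEFAULT_INITSIZE,
									 ALLOCSET_DEFAULT_MAXSIZE);

	for (;;)
	{
		MemoryContext oldcxt = MemoryContextSwitchTo(batchcxt);

		/* Build this batch's tuples in the per-batch context */
		ntuples = next_batch_of_tuples(tuples, BATCH_SIZE);

		/* One call per batch; WAL and page locking are amortized inside */
		if (ntuples > 0)
			heap_multi_insert(rel, tuples, ntuples, cid, 0, bistate);

		MemoryContextSwitchTo(oldcxt);
		MemoryContextReset(batchcxt);

		if (ntuples == 0)
			break;
	}

	MemoryContextDelete(batchcxt);
	FreeBulkInsertState(bistate);
}

A real COPY code path also has to deal with indexes, triggers, and per-row post-processing; the sketch only shows the memory-context discipline and the one-call-per-batch pattern that the function's header comment implies.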