Diffstat (limited to 'src/backend/access')
-rw-r--r--  src/backend/access/heap/heapam.c  484
1 file changed, 439 insertions, 45 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 81422afa2f8..72ed6325384 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -24,6 +24,7 @@
* heap_getnext - retrieve next tuple in scan
* heap_fetch - retrieve tuple with given tid
* heap_insert - insert tuple into a relation
+ * heap_multi_insert - insert multiple tuples into a relation
* heap_delete - delete a tuple from a relation
* heap_update - replace a tuple in a relation with another tuple
* heap_markpos - mark scan position
@@ -79,6 +80,8 @@ static HeapScanDesc heap_beginscan_internal(Relation relation,
int nkeys, ScanKey key,
bool allow_strat, bool allow_sync,
bool is_bitmapscan);
+static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
+ TransactionId xid, CommandId cid, int options);
static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
ItemPointerData from, Buffer newbuf, HeapTuple newtup,
bool all_visible_cleared, bool new_all_visible_cleared);
@@ -1866,55 +1869,14 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
Buffer vmbuffer = InvalidBuffer;
bool all_visible_cleared = false;
- if (relation->rd_rel->relhasoids)
- {
-#ifdef NOT_USED
- /* this is redundant with an Assert in HeapTupleSetOid */
- Assert(tup->t_data->t_infomask & HEAP_HASOID);
-#endif
-
- /*
- * If the object id of this tuple has already been assigned, trust the
- * caller. There are a couple of ways this can happen. At initial db
- * creation, the backend program sets oids for tuples. When we define
- * an index, we set the oid. Finally, in the future, we may allow
- * users to set their own object ids in order to support a persistent
- * object store (objects need to contain pointers to one another).
- */
- if (!OidIsValid(HeapTupleGetOid(tup)))
- HeapTupleSetOid(tup, GetNewOid(relation));
- }
- else
- {
- /* check there is not space for an OID */
- Assert(!(tup->t_data->t_infomask & HEAP_HASOID));
- }
-
- tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
- tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
- tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
- HeapTupleHeaderSetXmin(tup->t_data, xid);
- HeapTupleHeaderSetCmin(tup->t_data, cid);
- HeapTupleHeaderSetXmax(tup->t_data, 0); /* for cleanliness */
- tup->t_tableOid = RelationGetRelid(relation);
-
/*
- * If the new tuple is too big for storage or contains already toasted
- * out-of-line attributes from some other relation, invoke the toaster.
+ * Fill in tuple header fields, assign an OID, and toast the tuple if
+ * necessary.
*
* Note: below this point, heaptup is the data we actually intend to store
* into the relation; tup is the caller's original untoasted data.
*/
- if (relation->rd_rel->relkind != RELKIND_RELATION)
- {
- /* toast table entries should never be recursively toasted */
- Assert(!HeapTupleHasExternal(tup));
- heaptup = tup;
- }
- else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
- heaptup = toast_insert_or_update(relation, tup, NULL, options);
- else
- heaptup = tup;
+ heaptup = heap_prepare_insert(relation, tup, xid, cid, options);
/*
* We're about to do the actual insert -- but check for conflict first,
@@ -2035,7 +1997,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
*/
CacheInvalidateHeapTuple(relation, heaptup, NULL);
- pgstat_count_heap_insert(relation);
+ pgstat_count_heap_insert(relation, 1);
/*
* If heaptup is a private copy, release it. Don't forget to copy t_self
@@ -2051,6 +2013,285 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
}
/*
+ * Subroutine for heap_insert() and heap_multi_insert(). Prepares a tuple
+ * for insertion: sets the tuple header fields, assigns an OID, and toasts
+ * the tuple if necessary.
+ * Returns a toasted version of the tuple if it was toasted, or the original
+ * tuple if not. Note that in any case, the header fields are also set in
+ * the original tuple.
+ */
+static HeapTuple
+heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
+ CommandId cid, int options)
+{
+ if (relation->rd_rel->relhasoids)
+ {
+#ifdef NOT_USED
+ /* this is redundant with an Assert in HeapTupleSetOid */
+ Assert(tup->t_data->t_infomask & HEAP_HASOID);
+#endif
+
+ /*
+ * If the object id of this tuple has already been assigned, trust the
+ * caller. There are a couple of ways this can happen. At initial db
+ * creation, the backend program sets oids for tuples. When we define
+ * an index, we set the oid. Finally, in the future, we may allow
+ * users to set their own object ids in order to support a persistent
+ * object store (objects need to contain pointers to one another).
+ */
+ if (!OidIsValid(HeapTupleGetOid(tup)))
+ HeapTupleSetOid(tup, GetNewOid(relation));
+ }
+ else
+ {
+ /* check there is no space for an OID */
+ Assert(!(tup->t_data->t_infomask & HEAP_HASOID));
+ }
+
+ tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
+ tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
+ tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
+ HeapTupleHeaderSetXmin(tup->t_data, xid);
+ HeapTupleHeaderSetCmin(tup->t_data, cid);
+ HeapTupleHeaderSetXmax(tup->t_data, 0); /* for cleanliness */
+ tup->t_tableOid = RelationGetRelid(relation);
+
+ /*
+ * If the new tuple is too big for storage or contains already toasted
+ * out-of-line attributes from some other relation, invoke the toaster.
+ */
+ if (relation->rd_rel->relkind != RELKIND_RELATION)
+ {
+ /* toast table entries should never be recursively toasted */
+ Assert(!HeapTupleHasExternal(tup));
+ return tup;
+ }
+ else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
+ return toast_insert_or_update(relation, tup, NULL, options);
+ else
+ return tup;
+}
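A caller-side sketch of the contract described above (hypothetical code, not part of the patch): heap_prepare_insert() may hand back a toasted private copy, so the caller propagates t_self to the original tuple and frees the copy, just as heap_insert() does at its end.

	HeapTuple	heaptup;

	heaptup = heap_prepare_insert(relation, tup, xid, cid, options);

	/* ... place heaptup on a page, which fills in heaptup->t_self ... */

	if (heaptup != tup)
	{
		/* heaptup is a private toasted copy: copy t_self back, then free */
		tup->t_self = heaptup->t_self;
		heap_freetuple(heaptup);
	}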
+
+/*
+ * heap_multi_insert - insert multiple tuples into a heap
+ *
+ * This is like heap_insert(), but inserts multiple tuples in one operation.
+ * That's faster than calling heap_insert() in a loop, because when multiple
+ * tuples can be inserted on a single page, we can write just a single WAL
+ * record covering all of them, and only need to lock/unlock the page once.
+ *
+ * Note: this leaks memory into the current memory context. You can create a
+ * temporary context before calling this, if that's a problem.
+ */
+void
+heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
+ CommandId cid, int options, BulkInsertState bistate)
+{
+ TransactionId xid = GetCurrentTransactionId();
+ HeapTuple *heaptuples;
+ Buffer buffer;
+ Buffer vmbuffer = InvalidBuffer;
+ bool all_visible_cleared = false;
+ int i;
+ int ndone;
+ char *scratch = NULL;
+ Page page;
+ bool needwal;
+
+ needwal = !(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation);
+
+ /* Toast and set header data in all the tuples */
+ heaptuples = palloc(ntuples * sizeof(HeapTuple));
+ for (i = 0; i < ntuples; i++)
+ heaptuples[i] = heap_prepare_insert(relation, tuples[i],
+ xid, cid, options);
+
+ /*
+ * Allocate some memory to use for constructing the WAL record. Using
+ * palloc() within a critical section is not safe, so we allocate this
+ * beforehand.
+ */
+ if (needwal)
+ scratch = palloc(BLCKSZ);
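+ /*
+ * (A note not in the patch itself: BLCKSZ is a safe upper bound here
+ * because every tuple staged into the scratch area must also fit on
+ * the one target page, and the per-tuple WAL overhead is smaller than
+ * the per-tuple page overhead; the Assert in the WAL-building code
+ * below re-checks the bound.)
+ */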
+
+ /*
+ * We're about to do the actual inserts -- but check for conflict first,
+ * to avoid possibly having to roll back work we've just done.
+ *
+ * For a heap insert, we only need to check for table-level SSI locks.
+ * Our new tuple can't possibly conflict with existing tuple locks, and
+ * heap page locks are only consolidated versions of tuple locks; they do
+ * not lock "gaps" as index page locks do. So we don't need to identify
+ * a buffer before making the call.
+ */
+ CheckForSerializableConflictIn(relation, NULL, InvalidBuffer);
+
+ ndone = 0;
+ while (ndone < ntuples)
+ {
+ int nthispage;
+
+ /*
+ * Find buffer where at least the next tuple will fit. If the page
+ * is all-visible, this will also pin the requisite visibility map
+ * page.
+ */
+ buffer = RelationGetBufferForTuple(relation, heaptuples[ndone]->t_len,
+ InvalidBuffer, options, bistate,
+ &vmbuffer, NULL);
+ page = BufferGetPage(buffer);
+
+ if (PageIsAllVisible(page))
+ {
+ all_visible_cleared = true;
+ PageClearAllVisible(page);
+ visibilitymap_clear(relation,
+ BufferGetBlockNumber(buffer),
+ vmbuffer);
+ }
+
+ /* NO EREPORT(ERROR) from here till changes are logged */
+ START_CRIT_SECTION();
+
+ /*
+ * Put as many tuples as fit on this page. The first tuple is
+ * guaranteed to fit: RelationGetBufferForTuple chose the page for it.
+ */
+ for (nthispage = 0; ndone + nthispage < ntuples; nthispage++)
+ {
+ HeapTuple heaptup = heaptuples[ndone + nthispage];
+
+ if (PageGetHeapFreeSpace(page) < MAXALIGN(heaptup->t_len))
+ break;
+
+ RelationPutHeapTuple(relation, buffer, heaptup);
+ }
+
+ /*
+ * XXX Should we set PageSetPrunable on this page? See heap_insert()
+ */
+
+ MarkBufferDirty(buffer);
+
+ /* XLOG stuff */
+ if (needwal)
+ {
+ XLogRecPtr recptr;
+ xl_heap_multi_insert *xlrec;
+ XLogRecData rdata[2];
+ uint8 info = XLOG_HEAP2_MULTI_INSERT;
+ char *tupledata;
+ int totaldatalen;
+ char *scratchptr = scratch;
+ bool init;
+
+ /*
+ * If the page was previously empty, we can reinit the page
+ * instead of restoring the whole thing.
+ */
+ init = (ItemPointerGetOffsetNumber(&(heaptuples[ndone]->t_self)) == FirstOffsetNumber &&
+ PageGetMaxOffsetNumber(page) == FirstOffsetNumber + nthispage - 1);
+
+ /* allocate xl_heap_multi_insert struct from the scratch area */
+ xlrec = (xl_heap_multi_insert *) scratchptr;
+ scratchptr += SizeOfHeapMultiInsert;
+
+ /*
+ * Allocate the offsets array, unless we're reinitializing the page;
+ * in that case the tuples are stored in order starting at
+ * FirstOffsetNumber, so there is no need to store the offsets
+ * explicitly.
+ */
+ if (!init)
+ scratchptr += nthispage * sizeof(OffsetNumber);
+
+ /* the rest of the scratch space is used for tuple data */
+ tupledata = scratchptr;
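+ /*
+ * (Layout sketch, not in the patch itself. The scratch area now
+ * holds, in order:
+ *
+ * xl_heap_multi_insert struct
+ * offsets[nthispage] -- only if !init
+ * for each tuple: SHORTALIGN padding,
+ * xl_multi_insert_tuple struct, tuple data
+ *
+ * rdata[0] below covers everything up to the tuple data,
+ * rdata[1] the tuple data itself.)
+ */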
+
+ xlrec->all_visible_cleared = all_visible_cleared;
+ xlrec->node = relation->rd_node;
+ xlrec->blkno = BufferGetBlockNumber(buffer);
+ xlrec->ntuples = nthispage;
+
+ /*
+ * Write out an xl_multi_insert_tuple and the tuple data itself
+ * for each tuple.
+ */
+ for (i = 0; i < nthispage; i++)
+ {
+ HeapTuple heaptup = heaptuples[ndone + i];
+ xl_multi_insert_tuple *tuphdr;
+ int datalen;
+
+ if (!init)
+ xlrec->offsets[i] = ItemPointerGetOffsetNumber(&heaptup->t_self);
+ /* xl_multi_insert_tuple needs two-byte alignment. */
+ tuphdr = (xl_multi_insert_tuple *) SHORTALIGN(scratchptr);
+ scratchptr = ((char *) tuphdr) + SizeOfMultiInsertTuple;
+
+ tuphdr->t_infomask2 = heaptup->t_data->t_infomask2;
+ tuphdr->t_infomask = heaptup->t_data->t_infomask;
+ tuphdr->t_hoff = heaptup->t_data->t_hoff;
+
+ /* write bitmap [+ padding] [+ oid] + data */
+ datalen = heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits);
+ memcpy(scratchptr,
+ (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits),
+ datalen);
+ tuphdr->datalen = datalen;
+ scratchptr += datalen;
+ }
+ totaldatalen = scratchptr - tupledata;
+ Assert((scratchptr - scratch) < BLCKSZ);
+
+ rdata[0].data = (char *) xlrec;
+ rdata[0].len = tupledata - scratch;
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].next = &rdata[1];
+
+ rdata[1].data = tupledata;
+ rdata[1].len = totaldatalen;
+ rdata[1].buffer = buffer;
+ rdata[1].buffer_std = true;
+ rdata[1].next = NULL;
+
+ /*
+ * If we're going to reinitialize the whole page using the WAL
+ * record, hide the buffer reference from XLogInsert. No full-page
+ * image is needed then; redo recreates the page from the record alone.
+ */
+ if (init)
+ {
+ rdata[1].buffer = InvalidBuffer;
+ info |= XLOG_HEAP_INIT_PAGE;
+ }
+
+ recptr = XLogInsert(RM_HEAP2_ID, info, rdata);
+
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
+ }
+
+ END_CRIT_SECTION();
+
+ UnlockReleaseBuffer(buffer);
+ if (vmbuffer != InvalidBuffer)
+ ReleaseBuffer(vmbuffer);
+
+ ndone += nthispage;
+ }
+
+ /*
+ * If the tuples are cacheable, mark them for invalidation from the caches in
+ * case we abort. Note it is OK to do this after releasing the buffer,
+ * because the heaptuples data structure is all in local memory, not in
+ * the shared buffer.
+ */
+ if (IsSystemRelation(relation))
+ {
+ for (i = 0; i < ntuples; i++)
+ CacheInvalidateHeapTuple(relation, heaptuples[i], NULL);
+ }
+
+ pgstat_count_heap_insert(relation, ntuples);
+}
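For illustration, a hypothetical caller (a sketch only: read_next_tuple, rel, and the batch size are invented here, and the commit's real call site in COPY lies outside this filtered view) would buffer tuples and hand them over in batches:

	Relation	rel;			/* the open target relation */
	BulkInsertState bistate = GetBulkInsertState();
	HeapTuple	batch[1000];
	int			nbuffered = 0;
	HeapTuple	tup;

	while ((tup = read_next_tuple()) != NULL)
	{
		batch[nbuffered++] = tup;
		if (nbuffered == lengthof(batch))
		{
			heap_multi_insert(rel, batch, nbuffered,
							  GetCurrentCommandId(true), 0, bistate);
			nbuffered = 0;
		}
	}
	if (nbuffered > 0)
		heap_multi_insert(rel, batch, nbuffered,
						  GetCurrentCommandId(true), 0, bistate);
	FreeBulkInsertState(bistate);

Each heap_multi_insert() call then emits one XLOG_HEAP2_MULTI_INSERT record per page it fills, rather than one WAL record per tuple.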
+
+/*
* simple_heap_insert - insert a tuple
*
* Currently, this routine differs from heap_insert only in supplying
@@ -4776,6 +5017,144 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
}
/*
+ * Handles MULTI_INSERT record type.
+ */
+static void
+heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
+{
+ char *recdata = XLogRecGetData(record);
+ xl_heap_multi_insert *xlrec;
+ Buffer buffer;
+ Page page;
+ struct
+ {
+ HeapTupleHeaderData hdr;
+ char data[MaxHeapTupleSize];
+ } tbuf;
+ HeapTupleHeader htup;
+ uint32 newlen;
+ Size freespace;
+ BlockNumber blkno;
+ int i;
+ bool isinit = (record->xl_info & XLOG_HEAP_INIT_PAGE) != 0;
+
+ xlrec = (xl_heap_multi_insert *) recdata;
+ recdata += SizeOfHeapMultiInsert;
+
+ /*
+ * If we're reinitializing the page, the tuples are stored in order from
+ * FirstOffsetNumber. Otherwise there's an array of offsets in the WAL
+ * record.
+ */
+ if (!isinit)
+ recdata += sizeof(OffsetNumber) * xlrec->ntuples;
+
+ blkno = xlrec->blkno;
+
+ /*
+ * The visibility map may need to be fixed even if the heap page is
+ * already up-to-date.
+ */
+ if (xlrec->all_visible_cleared)
+ {
+ Relation reln = CreateFakeRelcacheEntry(xlrec->node);
+ Buffer vmbuffer = InvalidBuffer;
+
+ visibilitymap_pin(reln, blkno, &vmbuffer);
+ visibilitymap_clear(reln, blkno, vmbuffer);
+ ReleaseBuffer(vmbuffer);
+ FreeFakeRelcacheEntry(reln);
+ }
+
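+ /*
+ * (A note not in the patch itself: if the record carries a full-page
+ * image, the generic redo machinery has already restored the heap
+ * page, so nothing more needs to be done to it here.)
+ */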
+ if (record->xl_info & XLR_BKP_BLOCK_1)
+ return;
+
+ if (isinit)
+ {
+ buffer = XLogReadBuffer(xlrec->node, blkno, true);
+ Assert(BufferIsValid(buffer));
+ page = (Page) BufferGetPage(buffer);
+
+ PageInit(page, BufferGetPageSize(buffer), 0);
+ }
+ else
+ {
+ buffer = XLogReadBuffer(xlrec->node, blkno, false);
+ if (!BufferIsValid(buffer))
+ return;
+ page = (Page) BufferGetPage(buffer);
+
+ if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
+ {
+ UnlockReleaseBuffer(buffer);
+ return;
+ }
+ }
+
+ for (i = 0; i < xlrec->ntuples; i++)
+ {
+ OffsetNumber offnum;
+ xl_multi_insert_tuple *xlhdr;
+
+ if (isinit)
+ offnum = FirstOffsetNumber + i;
+ else
+ offnum = xlrec->offsets[i];
+ if (PageGetMaxOffsetNumber(page) + 1 < offnum)
+ elog(PANIC, "heap_multi_insert_redo: invalid max offset number");
+
+ xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(recdata);
+ recdata += SizeOfMultiInsertTuple;
+
+ newlen = xlhdr->datalen;
+ Assert(newlen <= MaxHeapTupleSize);
+ htup = &tbuf.hdr;
+ MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
+ /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
+ memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
+ (char *) recdata,
+ newlen);
+ recdata += newlen;
+
+ newlen += offsetof(HeapTupleHeaderData, t_bits);
+ htup->t_infomask2 = xlhdr->t_infomask2;
+ htup->t_infomask = xlhdr->t_infomask;
+ htup->t_hoff = xlhdr->t_hoff;
+ HeapTupleHeaderSetXmin(htup, record->xl_xid);
+ HeapTupleHeaderSetCmin(htup, FirstCommandId);
+ ItemPointerSetBlockNumber(&htup->t_ctid, blkno);
+ ItemPointerSetOffsetNumber(&htup->t_ctid, offnum);
+
+ offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
+ if (offnum == InvalidOffsetNumber)
+ elog(PANIC, "heap_multi_insert_redo: failed to add tuple");
+ }
+
+ freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+
+ if (xlrec->all_visible_cleared)
+ PageClearAllVisible(page);
+
+ MarkBufferDirty(buffer);
+ UnlockReleaseBuffer(buffer);
+
+ /*
+ * If the page is running low on free space, update the FSM as well.
+ * Arbitrarily, our definition of "low" is less than 20%. We can't do much
+ * better than that without knowing the fill-factor for the table.
+ *
+ * XXX: We don't get here if the page was restored from a full-page
+ * image. We don't bother to update the FSM in that case; it doesn't
+ * need to be totally accurate anyway.
+ */
+ if (freespace < BLCKSZ / 5)
+ XLogRecordPageWithFreeSpace(xlrec->node, blkno, freespace);
+}
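The xl_heap_multi_insert and xl_multi_insert_tuple formats used above are declared in a header change that falls outside this filtered view. Reconstructed from how the insert and redo paths walk the record (so the exact field order is an assumption), they look roughly like this:

	typedef struct xl_heap_multi_insert
	{
		RelFileNode node;			/* target relation */
		BlockNumber blkno;			/* target page */
		bool		all_visible_cleared;	/* PD_ALL_VISIBLE was cleared */
		uint16		ntuples;		/* tuples on this page */
		/* offsets follow, but are omitted with XLOG_HEAP_INIT_PAGE */
		OffsetNumber offsets[1];	/* variable length */
	} xl_heap_multi_insert;

	#define SizeOfHeapMultiInsert	offsetof(xl_heap_multi_insert, offsets)

	/* one per tuple, SHORTALIGNed, immediately followed by the tuple data */
	typedef struct xl_multi_insert_tuple
	{
		uint16		datalen;		/* length of the data that follows */
		uint16		t_infomask2;
		uint16		t_infomask;
		uint8		t_hoff;
	} xl_multi_insert_tuple;

	#define SizeOfMultiInsertTuple \
		(offsetof(xl_multi_insert_tuple, t_hoff) + sizeof(uint8))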
+
+/*
* Handles UPDATE and HOT_UPDATE
*/
static void
@@ -5164,6 +5543,9 @@ heap2_redo(XLogRecPtr lsn, XLogRecord *record)
case XLOG_HEAP2_VISIBLE:
heap_xlog_visible(lsn, record);
break;
+ case XLOG_HEAP2_MULTI_INSERT:
+ heap_xlog_multi_insert(lsn, record);
+ break;
default:
elog(PANIC, "heap2_redo: unknown op code %u", info);
}
@@ -5301,6 +5683,18 @@ heap2_desc(StringInfo buf, uint8 xl_info, char *rec)
xlrec->node.spcNode, xlrec->node.dbNode,
xlrec->node.relNode, xlrec->block);
}
+ else if (info == XLOG_HEAP2_MULTI_INSERT)
+ {
+ xl_heap_multi_insert *xlrec = (xl_heap_multi_insert *) rec;
+
+ if (xl_info & XLOG_HEAP_INIT_PAGE)
+ appendStringInfo(buf, "multi-insert (init): ");
+ else
+ appendStringInfo(buf, "multi-insert: ");
+ appendStringInfo(buf, "rel %u/%u/%u; blk %u; %d tuples",
+ xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode,
+ xlrec->blkno, xlrec->ntuples);
+ }
else
appendStringInfo(buf, "UNKNOWN");
}