author | Robert Haas <rhaas@postgresql.org> | 2013-12-10 18:33:45 -0500 |
---|---|---|
committer | Robert Haas <rhaas@postgresql.org> | 2013-12-10 19:01:40 -0500 |
commit | e55704d8b2fe522fbc9435acbb5bc59033478bd5 (patch) | |
tree | 720602fc06bf251eb65dd7c4628d047027122ac8 /src/backend/access/heap/heapam.c | |
parent | 9ec6199d18d6235cc4b4d5e4e8986e73b55b14d8 (diff) | |
download | postgresql-e55704d8b2fe522fbc9435acbb5bc59033478bd5.tar.gz postgresql-e55704d8b2fe522fbc9435acbb5bc59033478bd5.zip |
Add new wal_level, logical, sufficient for logical decoding.
When wal_level=logical, we'll log columns from the old tuple as
configured by the REPLICA IDENTITY facility added in commit
07cacba983ef79be4a84fcd0e0ca3b5fcb85dd65. This makes it possible
for a properly-configured logical replication solution to correctly
follow table updates even if they change the chosen key columns,
or, with REPLICA IDENTITY FULL, even if the table has no key at
all. Note that updates which do not modify the replica identity
column won't log anything extra, making the choice of a good key
(i.e. one that will rarely be changed) important to performance
when wal_level=logical is configured.
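For context, the configuration described above looks roughly like the following SQL sketch. This is illustrative only and not part of the commit; the table name "accounts" and index name "accounts_pkey" are hypothetical:

```sql
-- postgresql.conf: enable the new WAL level added by this commit
-- wal_level = logical

-- Log old-tuple columns from a chosen unique index on UPDATE/DELETE
-- (REPLICA IDENTITY syntax was added in commit 07cacba983ef):
ALTER TABLE accounts REPLICA IDENTITY USING INDEX accounts_pkey;

-- Or log the entire old tuple, usable even when the table has no key:
ALTER TABLE accounts REPLICA IDENTITY FULL;
```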
Each insert, update, or delete to a catalog table will also log
the CMIN and/or CMAX values stamped by the current transaction.
This is necessary because logical decoding will require access to
historical snapshots of the catalog in order to decode some data
types, and the CMIN/CMAX values that we may need in order to judge
row visibility may have been overwritten by the time we need them.
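As a minimal illustration of how such an overwrite arises (table name hypothetical; not part of this commit): a transaction that both creates and drops the same table inserts and then deletes the same pg_class row, so that row's CMIN and CMAX are collapsed into a single combo CID, and only the WAL record added here (XLOG_HEAP2_NEW_CID) preserves the original values for historical catalog snapshots:

```sql
BEGIN;
CREATE TABLE scratch (id int);  -- inserts a pg_class row (sets CMIN)
DROP TABLE scratch;             -- deletes the same row (sets CMAX -> combo CID)
COMMIT;
```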
Andres Freund, reviewed in various versions by myself, Heikki
Linnakangas, KONDO Mitsumasa, and many others.
Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r-- | src/backend/access/heap/heapam.c | 603
1 file changed, 500 insertions(+), 103 deletions(-)
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 2035a2158f1..249fffeb061 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -85,12 +85,14 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
 				TransactionId xid, CommandId cid, int options);
 static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
 				Buffer newbuf, HeapTuple oldtup,
-				HeapTuple newtup, bool all_visible_cleared,
-				bool new_all_visible_cleared);
+				HeapTuple newtup, HeapTuple old_key_tup,
+				bool all_visible_cleared, bool new_all_visible_cleared);
 static void HeapSatisfiesHOTandKeyUpdate(Relation relation,
-				Bitmapset *hot_attrs, Bitmapset *key_attrs,
-				bool *satisfies_hot, bool *satisfies_key,
-				HeapTuple oldtup, HeapTuple newtup);
+				Bitmapset *hot_attrs,
+				Bitmapset *key_attrs, Bitmapset *id_attrs,
+				bool *satisfies_hot, bool *satisfies_key,
+				bool *satisfies_id,
+				HeapTuple oldtup, HeapTuple newtup);
 static void compute_new_xmax_infomask(TransactionId xmax, uint16 old_infomask,
 				uint16 old_infomask2, TransactionId add_to_xmax,
 				LockTupleMode mode, bool is_update,
@@ -108,6 +110,9 @@ static void MultiXactIdWait(MultiXactId multi, MultiXactStatus status,
 static bool ConditionalMultiXactIdWait(MultiXactId multi,
 				MultiXactStatus status, int *remaining,
 				uint16 infomask);
+static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
+static HeapTuple ExtractReplicaIdentity(Relation rel, HeapTuple tup, bool key_modified,
+				bool *copy);
 
 
 /*
@@ -2103,11 +2108,24 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 		xl_heap_insert xlrec;
 		xl_heap_header xlhdr;
 		XLogRecPtr	recptr;
-		XLogRecData rdata[3];
+		XLogRecData rdata[4];
 		Page		page = BufferGetPage(buffer);
 		uint8		info = XLOG_HEAP_INSERT;
+		bool		need_tuple_data;
+
+		/*
+		 * For logical decoding, we need the tuple even if we're doing a
+		 * full page write, so make sure to log it separately. (XXX We could
+		 * alternatively store a pointer into the FPW).
+		 *
+		 * Also, if this is a catalog, we need to transmit combocids to
+		 * properly decode, so log that as well.
+		 */
+		need_tuple_data = RelationIsLogicallyLogged(relation);
+		if (RelationIsAccessibleInLogicalDecoding(relation))
+			log_heap_new_cid(relation, heaptup);
 
-		xlrec.all_visible_cleared = all_visible_cleared;
+		xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
 		xlrec.target.node = relation->rd_node;
 		xlrec.target.tid = heaptup->t_self;
 		rdata[0].data = (char *) &xlrec;
@@ -2126,18 +2144,36 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 		 */
 		rdata[1].data = (char *) &xlhdr;
 		rdata[1].len = SizeOfHeapHeader;
-		rdata[1].buffer = buffer;
+		rdata[1].buffer = need_tuple_data ? InvalidBuffer : buffer;
 		rdata[1].buffer_std = true;
 		rdata[1].next = &(rdata[2]);
 
 		/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
 		rdata[2].data = (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits);
 		rdata[2].len = heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits);
-		rdata[2].buffer = buffer;
+		rdata[2].buffer = need_tuple_data ? InvalidBuffer : buffer;
 		rdata[2].buffer_std = true;
 		rdata[2].next = NULL;
 
 		/*
+		 * Make a separate rdata entry for the tuple's buffer if we're
+		 * doing logical decoding, so that an eventual FPW doesn't
+		 * remove the tuple's data.
+		 */
+		if (need_tuple_data)
+		{
+			rdata[2].next = &(rdata[3]);
+
+			rdata[3].data = NULL;
+			rdata[3].len = 0;
+			rdata[3].buffer = buffer;
+			rdata[3].buffer_std = true;
+			rdata[3].next = NULL;
+
+			xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
+		}
+
+		/*
 		 * If this is the single and first tuple on page, we can reinit the
 		 * page instead of restoring the whole thing. Set flag, and hide
 		 * buffer references from XLogInsert.
@@ -2146,7 +2182,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 			PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
 		{
 			info |= XLOG_HEAP_INIT_PAGE;
-			rdata[1].buffer = rdata[2].buffer = InvalidBuffer;
+			rdata[1].buffer = rdata[2].buffer = rdata[3].buffer = InvalidBuffer;
 		}
 
 		recptr = XLogInsert(RM_HEAP_ID, info, rdata);
@@ -2272,6 +2308,8 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 	Page		page;
 	bool		needwal;
 	Size		saveFreeSpace;
+	bool		need_tuple_data = RelationIsLogicallyLogged(relation);
+	bool		need_cids = RelationIsAccessibleInLogicalDecoding(relation);
 
 	needwal = !(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation);
 	saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
@@ -2358,7 +2396,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 		{
 			XLogRecPtr	recptr;
 			xl_heap_multi_insert *xlrec;
-			XLogRecData rdata[2];
+			XLogRecData rdata[3];
 			uint8		info = XLOG_HEAP2_MULTI_INSERT;
 			char	   *tupledata;
 			int			totaldatalen;
@@ -2388,7 +2426,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 			/* the rest of the scratch space is used for tuple data */
 			tupledata = scratchptr;
 
-			xlrec->all_visible_cleared = all_visible_cleared;
+			xlrec->flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
 			xlrec->node = relation->rd_node;
 			xlrec->blkno = BufferGetBlockNumber(buffer);
 			xlrec->ntuples = nthispage;
@@ -2420,6 +2458,13 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 					   datalen);
 				tuphdr->datalen = datalen;
 				scratchptr += datalen;
+
+				/*
+				 * We don't use heap_multi_insert for catalog tuples yet, but
+				 * better be prepared...
+				 */
+				if (need_cids)
+					log_heap_new_cid(relation, heaptup);
 			}
 			totaldatalen = scratchptr - tupledata;
 			Assert((scratchptr - scratch) < BLCKSZ);
@@ -2431,17 +2476,34 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 
 			rdata[1].data = tupledata;
 			rdata[1].len = totaldatalen;
-			rdata[1].buffer = buffer;
+			rdata[1].buffer = need_tuple_data ? InvalidBuffer : buffer;
 			rdata[1].buffer_std = true;
 			rdata[1].next = NULL;
 
 			/*
+			 * Make a separate rdata entry for the tuple's buffer if
+			 * we're doing logical decoding, so that an eventual FPW
+			 * doesn't remove the tuple's data.
+			 */
+			if (need_tuple_data)
+			{
+				rdata[1].next = &(rdata[2]);
+
+				rdata[2].data = NULL;
+				rdata[2].len = 0;
+				rdata[2].buffer = buffer;
+				rdata[2].buffer_std = true;
+				rdata[2].next = NULL;
+				xlrec->flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
+			}
+
+			/*
 			 * If we're going to reinitialize the whole page using the WAL
 			 * record, hide buffer reference from XLogInsert.
 			 */
 			if (init)
 			{
-				rdata[1].buffer = InvalidBuffer;
+				rdata[1].buffer = rdata[2].buffer = InvalidBuffer;
 				info |= XLOG_HEAP_INIT_PAGE;
 			}
 
@@ -2561,6 +2623,8 @@ heap_delete(Relation relation, ItemPointer tid,
 	bool		have_tuple_lock = false;
 	bool		iscombo;
 	bool		all_visible_cleared = false;
+	HeapTuple	old_key_tuple = NULL;	/* replica identity of the tuple */
+	bool		old_key_copied = false;
 
 	Assert(ItemPointerIsValid(tid));
 
@@ -2734,6 +2798,12 @@ l1:
 	/* replace cid with a combo cid if necessary */
 	HeapTupleHeaderAdjustCmax(tp.t_data, &cid, &iscombo);
 
+	/*
+	 * Compute replica identity tuple before entering the critical section so
+	 * we don't PANIC upon a memory allocation failure.
+	 */
+	old_key_tuple = ExtractReplicaIdentity(relation, &tp, true, &old_key_copied);
+
 	START_CRIT_SECTION();
 
 	/*
@@ -2786,9 +2856,13 @@ l1:
 	{
 		xl_heap_delete xlrec;
 		XLogRecPtr	recptr;
-		XLogRecData rdata[2];
+		XLogRecData rdata[4];
+
+		/* For logical decode we need combocids to properly decode the catalog */
+		if (RelationIsAccessibleInLogicalDecoding(relation))
+			log_heap_new_cid(relation, &tp);
 
-		xlrec.all_visible_cleared = all_visible_cleared;
+		xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
 		xlrec.infobits_set = compute_infobits(tp.t_data->t_infomask,
 											  tp.t_data->t_infomask2);
 		xlrec.target.node = relation->rd_node;
@@ -2805,6 +2879,37 @@ l1:
 		rdata[1].buffer_std = true;
 		rdata[1].next = NULL;
 
+		/*
+		 * Log replica identity of the deleted tuple if there is one
+		 */
+		if (old_key_tuple != NULL)
+		{
+			xl_heap_header xlhdr;
+
+			xlhdr.t_infomask2 = old_key_tuple->t_data->t_infomask2;
+			xlhdr.t_infomask = old_key_tuple->t_data->t_infomask;
+			xlhdr.t_hoff = old_key_tuple->t_data->t_hoff;
+
+			rdata[1].next = &(rdata[2]);
+			rdata[2].data = (char*)&xlhdr;
+			rdata[2].len = SizeOfHeapHeader;
+			rdata[2].buffer = InvalidBuffer;
+			rdata[2].next = NULL;
+
+			rdata[2].next = &(rdata[3]);
+			rdata[3].data = (char *) old_key_tuple->t_data
+				+ offsetof(HeapTupleHeaderData, t_bits);
+			rdata[3].len = old_key_tuple->t_len
+				- offsetof(HeapTupleHeaderData, t_bits);
+			rdata[3].buffer = InvalidBuffer;
+			rdata[3].next = NULL;
+
+			if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+				xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE;
+			else
+				xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY;
+		}
+
 		recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, rdata);
 
 		PageSetLSN(page, recptr);
@@ -2850,6 +2955,9 @@ l1:
 
 	pgstat_count_heap_delete(relation);
 
+	if (old_key_tuple != NULL && old_key_copied)
+		heap_freetuple(old_key_tuple);
+
 	return HeapTupleMayBeUpdated;
 }
 
@@ -2934,9 +3042,12 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 	TransactionId xid = GetCurrentTransactionId();
 	Bitmapset  *hot_attrs;
 	Bitmapset  *key_attrs;
+	Bitmapset  *id_attrs;
 	ItemId		lp;
 	HeapTupleData oldtup;
 	HeapTuple	heaptup;
+	HeapTuple	old_key_tuple = NULL;
+	bool		old_key_copied = false;
 	Page		page;
 	BlockNumber block;
 	MultiXactStatus mxact_status;
@@ -2952,6 +3063,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 	bool		iscombo;
 	bool		satisfies_hot;
 	bool		satisfies_key;
+	bool		satisfies_id;
 	bool		use_hot_update = false;
 	bool		key_intact;
 	bool		all_visible_cleared = false;
@@ -2979,8 +3091,10 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 	 * Note that we get a copy here, so we need not worry about relcache flush
 	 * happening midway through.
 	 */
-	hot_attrs = RelationGetIndexAttrBitmap(relation, false);
-	key_attrs = RelationGetIndexAttrBitmap(relation, true);
+	hot_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_ALL);
+	key_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_KEY);
+	id_attrs = RelationGetIndexAttrBitmap(relation,
+										  INDEX_ATTR_BITMAP_IDENTITY_KEY);
 
 	block = ItemPointerGetBlockNumber(otid);
 	buffer = ReadBuffer(relation, block);
@@ -3038,9 +3152,9 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 	 * is updates that don't manipulate key columns, not those that
 	 * serendipitiously arrive at the same key values.
 	 */
-	HeapSatisfiesHOTandKeyUpdate(relation, hot_attrs, key_attrs,
+	HeapSatisfiesHOTandKeyUpdate(relation, hot_attrs, key_attrs, id_attrs,
 								 &satisfies_hot, &satisfies_key,
-								 &oldtup, newtup);
+								 &satisfies_id, &oldtup, newtup);
 	if (satisfies_key)
 	{
 		*lockmode = LockTupleNoKeyExclusive;
@@ -3514,6 +3628,14 @@ l2:
 		PageSetFull(page);
 	}
 
+	/*
+	 * Compute replica identity tuple before entering the critical section so
+	 * we don't PANIC upon a memory allocation failure.
+	 * ExtractReplicaIdentity() will return NULL if nothing needs to be
+	 * logged.
+	 */
+	old_key_tuple = ExtractReplicaIdentity(relation, &oldtup, !satisfies_id, &old_key_copied);
+
 	/* NO EREPORT(ERROR) from here till changes are logged */
 	START_CRIT_SECTION();
 
@@ -3589,11 +3711,23 @@ l2:
 	/* XLOG stuff */
 	if (RelationNeedsWAL(relation))
 	{
-		XLogRecPtr	recptr = log_heap_update(relation, buffer,
-											 newbuf, &oldtup, heaptup,
-											 all_visible_cleared,
-											 all_visible_cleared_new);
+		XLogRecPtr	recptr;
+		/*
+		 * For logical decoding we need combocids to properly decode the
+		 * catalog.
+		 */
+		if (RelationIsAccessibleInLogicalDecoding(relation))
+		{
+			log_heap_new_cid(relation, &oldtup);
+			log_heap_new_cid(relation, heaptup);
+		}
+
+		recptr = log_heap_update(relation, buffer,
+								 newbuf, &oldtup, heaptup,
+								 old_key_tuple,
+								 all_visible_cleared,
+								 all_visible_cleared_new);
 
 		if (newbuf != buffer)
 		{
 			PageSetLSN(BufferGetPage(newbuf), recptr);
@@ -3644,6 +3778,9 @@ l2:
 		heap_freetuple(heaptup);
 	}
 
+	if (old_key_tuple != NULL && old_key_copied)
+		heap_freetuple(old_key_tuple);
+
 	bms_free(hot_attrs);
 	bms_free(key_attrs);
 
@@ -3731,63 +3868,72 @@ heap_tuple_attr_equals(TupleDesc tupdesc, int attrnum,
 /*
  * Check which columns are being updated.
  *
- * This simultaneously checks conditions for HOT updates and for FOR KEY
- * SHARE updates.  Since much of the time they will be checking very similar
- * sets of columns, and doing the same tests on them, it makes sense to
- * optimize and do them together.
+ * This simultaneously checks conditions for HOT updates, for FOR KEY
+ * SHARE updates, and REPLICA IDENTITY concerns.  Since much of the time they
+ * will be checking very similar sets of columns, and doing the same tests on
+ * them, it makes sense to optimize and do them together.
  *
- * We receive two bitmapsets comprising the two sets of columns we're
+ * We receive three bitmapsets comprising the three sets of columns we're
  * interested in.  Note these are destructively modified; that is OK since
  * this is invoked at most once in heap_update.
  *
 * hot_result is set to TRUE if it's okay to do a HOT update (i.e. it does not
 * modified indexed columns); key_result is set to TRUE if the update does not
- * modify columns used in the key.
+ * modify columns used in the key; id_result is set to TRUE if the update does
+ * not modify columns in any index marked as the REPLICA IDENTITY.
 */
 static void
-HeapSatisfiesHOTandKeyUpdate(Relation relation,
-							 Bitmapset *hot_attrs, Bitmapset *key_attrs,
+HeapSatisfiesHOTandKeyUpdate(Relation relation, Bitmapset *hot_attrs,
+							 Bitmapset *key_attrs, Bitmapset *id_attrs,
 							 bool *satisfies_hot, bool *satisfies_key,
+							 bool *satisfies_id,
 							 HeapTuple oldtup, HeapTuple newtup)
 {
 	int			next_hot_attnum;
 	int			next_key_attnum;
+	int			next_id_attnum;
 	bool		hot_result = true;
 	bool		key_result = true;
-	bool		key_done = false;
-	bool		hot_done = false;
+	bool		id_result = true;
 
-	next_hot_attnum = bms_first_member(hot_attrs);
-	if (next_hot_attnum == -1)
-		hot_done = true;
-	else
-		/* Adjust for system attributes */
-		next_hot_attnum += FirstLowInvalidHeapAttributeNumber;
+	/* If REPLICA IDENTITY is set to FULL, id_attrs will be empty. */
+	Assert(bms_is_subset(id_attrs, key_attrs));
+	Assert(bms_is_subset(key_attrs, hot_attrs));
 
+	/*
+	 * If one of these sets contains no remaining bits, bms_first_member will
+	 * return -1, and after adding FirstLowInvalidHeapAttributeNumber (which
+	 * is negative!) we'll get an attribute number that can't possibly be
+	 * real, and thus won't match any actual attribute number.
+	 */
+	next_hot_attnum = bms_first_member(hot_attrs);
+	next_hot_attnum += FirstLowInvalidHeapAttributeNumber;
 	next_key_attnum = bms_first_member(key_attrs);
-	if (next_key_attnum == -1)
-		key_done = true;
-	else
-		/* Adjust for system attributes */
-		next_key_attnum += FirstLowInvalidHeapAttributeNumber;
+	next_key_attnum += FirstLowInvalidHeapAttributeNumber;
+	next_id_attnum = bms_first_member(id_attrs);
+	next_id_attnum += FirstLowInvalidHeapAttributeNumber;
 
 	for (;;)
 	{
-		int			check_now;
 		bool		changed;
+		int			check_now;
 
-		/* both bitmapsets are now empty */
-		if (key_done && hot_done)
-			break;
-
-		/* XXX there's probably an easier way ... */
-		if (hot_done)
-			check_now = next_key_attnum;
-		if (key_done)
+		/*
+		 * Since the HOT attributes are a superset of the key attributes and
+		 * the key attributes are a superset of the id attributes, this logic
+		 * is guaranteed to identify the next column that needs to be
+		 * checked.
+		 */
+		if (hot_result && next_hot_attnum > FirstLowInvalidHeapAttributeNumber)
 			check_now = next_hot_attnum;
+		else if (key_result && next_key_attnum > FirstLowInvalidHeapAttributeNumber)
+			check_now = next_key_attnum;
+		else if (id_result && next_id_attnum > FirstLowInvalidHeapAttributeNumber)
+			check_now = next_id_attnum;
 		else
-			check_now = Min(next_hot_attnum, next_key_attnum);
+			break;
 
+		/* See whether it changed. */
 		changed = !heap_tuple_attr_equals(RelationGetDescr(relation),
 										  check_now, oldtup, newtup);
 		if (changed)
@@ -3796,34 +3942,42 @@ HeapSatisfiesHOTandKeyUpdate(Relation relation,
 				hot_result = false;
 			if (check_now == next_key_attnum)
 				key_result = false;
-		}
+			if (check_now == next_id_attnum)
+				id_result = false;
 
-		/* if both are false now, we can stop checking */
-		if (!hot_result && !key_result)
-			break;
+			/* if all are false now, we can stop checking */
+			if (!hot_result && !key_result && !id_result)
+				break;
+		}
 
-		if (check_now == next_hot_attnum)
+		/*
+		 * Advance the next attribute numbers for the sets that contain
+		 * the attribute we just checked.  As we work our way through the
+		 * columns, the next_attnum values will rise; but when each set
+		 * becomes empty, bms_first_member() will return -1 and the attribute
+		 * number will end up with a value less than
+		 * FirstLowInvalidHeapAttributeNumber.
+		 */
+		if (hot_result && check_now == next_hot_attnum)
 		{
 			next_hot_attnum = bms_first_member(hot_attrs);
-			if (next_hot_attnum == -1)
-				hot_done = true;
-			else
-				/* Adjust for system attributes */
-				next_hot_attnum += FirstLowInvalidHeapAttributeNumber;
+			next_hot_attnum += FirstLowInvalidHeapAttributeNumber;
 		}
-		if (check_now == next_key_attnum)
+		if (key_result && check_now == next_key_attnum)
 		{
 			next_key_attnum = bms_first_member(key_attrs);
-			if (next_key_attnum == -1)
-				key_done = true;
-			else
-				/* Adjust for system attributes */
-				next_key_attnum += FirstLowInvalidHeapAttributeNumber;
+			next_key_attnum += FirstLowInvalidHeapAttributeNumber;
+		}
+		if (id_result && check_now == next_id_attnum)
+		{
+			next_id_attnum = bms_first_member(id_attrs);
+			next_id_attnum += FirstLowInvalidHeapAttributeNumber;
 		}
 	}
 
 	*satisfies_hot = hot_result;
 	*satisfies_key = key_result;
+	*satisfies_id = id_result;
 }
 
 /*
@@ -6140,14 +6294,17 @@ log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
 static XLogRecPtr
 log_heap_update(Relation reln, Buffer oldbuf,
 				Buffer newbuf, HeapTuple oldtup, HeapTuple newtup,
+				HeapTuple old_key_tuple,
 				bool all_visible_cleared, bool new_all_visible_cleared)
 {
 	xl_heap_update xlrec;
-	xl_heap_header xlhdr;
+	xl_heap_header_len xlhdr;
+	xl_heap_header_len xlhdr_idx;
 	uint8		info;
 	XLogRecPtr	recptr;
-	XLogRecData rdata[4];
+	XLogRecData rdata[7];
 	Page		page = BufferGetPage(newbuf);
+	bool		need_tuple_data = RelationIsLogicallyLogged(reln);
 
 	/* Caller should not call me on a non-WAL-logged relation */
 	Assert(RelationNeedsWAL(reln));
@@ -6163,9 +6320,12 @@ log_heap_update(Relation reln, Buffer oldbuf,
 	xlrec.old_infobits_set = compute_infobits(oldtup->t_data->t_infomask,
 											  oldtup->t_data->t_infomask2);
 	xlrec.new_xmax = HeapTupleHeaderGetRawXmax(newtup->t_data);
-	xlrec.all_visible_cleared = all_visible_cleared;
+	xlrec.flags = 0;
+	if (all_visible_cleared)
+		xlrec.flags |= XLOG_HEAP_ALL_VISIBLE_CLEARED;
 	xlrec.newtid = newtup->t_self;
-	xlrec.new_all_visible_cleared = new_all_visible_cleared;
+	if (new_all_visible_cleared)
+		xlrec.flags |= XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED;
 
 	rdata[0].data = (char *) &xlrec;
 	rdata[0].len = SizeOfHeapUpdate;
@@ -6178,33 +6338,86 @@ log_heap_update(Relation reln, Buffer oldbuf,
 	rdata[1].buffer_std = true;
 	rdata[1].next = &(rdata[2]);
 
-	xlhdr.t_infomask2 = newtup->t_data->t_infomask2;
-	xlhdr.t_infomask = newtup->t_data->t_infomask;
-	xlhdr.t_hoff = newtup->t_data->t_hoff;
+	xlhdr.header.t_infomask2 = newtup->t_data->t_infomask2;
+	xlhdr.header.t_infomask = newtup->t_data->t_infomask;
+	xlhdr.header.t_hoff = newtup->t_data->t_hoff;
+	xlhdr.t_len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
 
 	/*
-	 * As with insert records, we need not store the rdata[2] segment if we
-	 * decide to store the whole buffer instead.
+	 * As with insert records, we need not store the rdata[2] segment
+	 * if we decide to store the whole buffer instead unless we're
+	 * doing logical decoding.
 	 */
 	rdata[2].data = (char *) &xlhdr;
-	rdata[2].len = SizeOfHeapHeader;
-	rdata[2].buffer = newbuf;
+	rdata[2].len = SizeOfHeapHeaderLen;
+	rdata[2].buffer = need_tuple_data ? InvalidBuffer : newbuf;
 	rdata[2].buffer_std = true;
 	rdata[2].next = &(rdata[3]);
 
 	/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
-	rdata[3].data = (char *) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits);
+	rdata[3].data = (char *) newtup->t_data
+		+ offsetof(HeapTupleHeaderData, t_bits);
 	rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
-	rdata[3].buffer = newbuf;
+	rdata[3].buffer = need_tuple_data ? InvalidBuffer : newbuf;
 	rdata[3].buffer_std = true;
 	rdata[3].next = NULL;
 
+	/*
+	 * Separate storage for the FPW buffer reference of the new page in the
+	 * wal_level >= logical case.
+	 */
+	if (need_tuple_data)
+	{
+		rdata[3].next = &(rdata[4]);
+
+		rdata[4].data = NULL,
+		rdata[4].len = 0;
+		rdata[4].buffer = newbuf;
+		rdata[4].buffer_std = true;
+		rdata[4].next = NULL;
+		xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
+
+		/* We need to log a tuple identity */
+		if (old_key_tuple)
+		{
+			/* don't really need this, but its more comfy to decode */
+			xlhdr_idx.header.t_infomask2 = old_key_tuple->t_data->t_infomask2;
+			xlhdr_idx.header.t_infomask = old_key_tuple->t_data->t_infomask;
+			xlhdr_idx.header.t_hoff = old_key_tuple->t_data->t_hoff;
+			xlhdr_idx.t_len = old_key_tuple->t_len;
+
+			rdata[4].next = &(rdata[5]);
+			rdata[5].data = (char *) &xlhdr_idx;
+			rdata[5].len = SizeOfHeapHeaderLen;
+			rdata[5].buffer = InvalidBuffer;
+			rdata[5].next = &(rdata[6]);
+
+			/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
+			rdata[6].data = (char *) old_key_tuple->t_data
+				+ offsetof(HeapTupleHeaderData, t_bits);
+			rdata[6].len = old_key_tuple->t_len
+				- offsetof(HeapTupleHeaderData, t_bits);
+			rdata[6].buffer = InvalidBuffer;
+			rdata[6].next = NULL;
+
+			if (reln->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+				xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE;
+			else
+				xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY;
+		}
+	}
+
 	/* If new tuple is the single and first tuple on page... */
 	if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber &&
 		PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
 	{
+		XLogRecData *rcur = &rdata[2];
 		info |= XLOG_HEAP_INIT_PAGE;
-		rdata[2].buffer = rdata[3].buffer = InvalidBuffer;
+		while (rcur != NULL)
+		{
+			rcur->buffer = InvalidBuffer;
+			rcur = rcur->next;
+		}
 	}
 
 	recptr = XLogInsert(RM_HEAP_ID, info, rdata);
@@ -6340,6 +6553,184 @@ log_newpage_buffer(Buffer buffer, bool page_std)
 }
 
 /*
+ * Perform XLogInsert of a XLOG_HEAP2_NEW_CID record
+ *
+ * This is only used in wal_level >= WAL_LEVEL_LOGICAL, and only for catalog
+ * tuples.
+ */
+static XLogRecPtr
+log_heap_new_cid(Relation relation, HeapTuple tup)
+{
+	xl_heap_new_cid xlrec;
+
+	XLogRecPtr	recptr;
+	XLogRecData rdata[1];
+	HeapTupleHeader hdr = tup->t_data;
+
+	Assert(ItemPointerIsValid(&tup->t_self));
+	Assert(tup->t_tableOid != InvalidOid);
+
+	xlrec.top_xid = GetTopTransactionId();
+	xlrec.target.node = relation->rd_node;
+	xlrec.target.tid = tup->t_self;
+
+	/*
+	 * If the tuple got inserted & deleted in the same TX we definitely have a
+	 * combocid, set cmin and cmax.
+	 */
+	if (hdr->t_infomask & HEAP_COMBOCID)
+	{
+		Assert(!(hdr->t_infomask & HEAP_XMAX_INVALID));
+		Assert(!(hdr->t_infomask & HEAP_XMIN_INVALID));
+		xlrec.cmin = HeapTupleHeaderGetCmin(hdr);
+		xlrec.cmax = HeapTupleHeaderGetCmax(hdr);
+		xlrec.combocid = HeapTupleHeaderGetRawCommandId(hdr);
+	}
+	/* No combocid, so only cmin or cmax can be set by this TX */
+	else
+	{
+		/*
+		 * Tuple inserted.
+		 *
+		 * We need to check for LOCK ONLY because multixacts might be
+		 * transferred to the new tuple in case of FOR KEY SHARE updates in
+		 * which case there will be a xmax, although the tuple just got
+		 * inserted.
+		 */
+		if (hdr->t_infomask & HEAP_XMAX_INVALID ||
+			HEAP_XMAX_IS_LOCKED_ONLY(hdr->t_infomask))
+		{
+			xlrec.cmin = HeapTupleHeaderGetRawCommandId(hdr);
+			xlrec.cmax = InvalidCommandId;
+		}
+		/* Tuple from a different tx updated or deleted. */
+		else
+		{
+			xlrec.cmin = InvalidCommandId;
+			xlrec.cmax = HeapTupleHeaderGetRawCommandId(hdr);
+
+		}
+		xlrec.combocid = InvalidCommandId;
+	}
+
+	rdata[0].data = (char *) &xlrec;
+	rdata[0].len = SizeOfHeapNewCid;
+	rdata[0].buffer = InvalidBuffer;
+	rdata[0].next = NULL;
+
+	recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_NEW_CID, rdata);
+
+	return recptr;
+}
+
+/*
+ * Build a heap tuple representing the configured REPLICA IDENTITY to represent
+ * the old tuple in a UPDATE or DELETE.
+ *
+ * Returns NULL if there's no need to log a identity or if there's no suitable
+ * key in the Relation relation.
+ */
+static HeapTuple
+ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool *copy)
+{
+	TupleDesc	desc = RelationGetDescr(relation);
+	Relation	idx_rel;
+	TupleDesc	idx_desc;
+	char		replident = relation->rd_rel->relreplident;
+	HeapTuple	key_tuple = NULL;
+	bool		copy_oid = false;
+	bool		nulls[MaxHeapAttributeNumber];
+	Datum		values[MaxHeapAttributeNumber];
+	int			natt;
+
+	*copy = false;
+
+	if (!RelationIsLogicallyLogged(relation))
+		return NULL;
+
+	if (replident == REPLICA_IDENTITY_NOTHING)
+		return NULL;
+
+	if (replident == REPLICA_IDENTITY_FULL)
+	{
+		/*
+		 * When logging the entire old tuple, it very well could contain
+		 * toasted columns. If so, force them to be inlined.
+		 */
+		if (HeapTupleHasExternal(tp))
+		{
+			*copy = true;
+			tp = toast_flatten_tuple(tp, RelationGetDescr(relation));
+		}
+		return tp;
+	}
+
+	/* if the key hasn't changed and we're only logging the key, we're done */
+	if (!key_changed)
+		return NULL;
+
+	/* needs to already have been fetched? */
+	if (relation->rd_indexvalid == 0)
+		RelationGetIndexList(relation);
+
+	if (!OidIsValid(relation->rd_replidindex))
+	{
+		elog(DEBUG4, "Could not find configured replica identity for table \"%s\"",
+			 RelationGetRelationName(relation));
+		return NULL;
+	}
+
+	idx_rel = RelationIdGetRelation(relation->rd_replidindex);
+	idx_desc = RelationGetDescr(idx_rel);
+
+	/* deform tuple, so we have fast access to columns */
+	heap_deform_tuple(tp, desc, values, nulls);
+
+	/* set all columns to NULL, regardless of whether they actually are */
+	memset(nulls, 1, sizeof(nulls));
+
+	/*
+	 * Now set all columns contained in the index to NOT NULL, they cannot
+	 * currently be NULL.
+	 */
+	for (natt = 0; natt < idx_desc->natts; natt++)
+	{
+		int			attno = idx_rel->rd_index->indkey.values[natt];
+
+		if (attno == ObjectIdAttributeNumber)
+			copy_oid = true;
+		else if (attno < 0)
+			elog(ERROR, "system column in index");
+		else
+			nulls[attno - 1] = false;
+	}
+
+	key_tuple = heap_form_tuple(desc, values, nulls);
+	*copy = true;
+	RelationClose(idx_rel);
+
+	/* XXX: we could also do this unconditionally, the space is used anyway */
+	if (copy_oid)
+		HeapTupleSetOid(key_tuple, HeapTupleGetOid(tp));
+
+	/*
+	 * If the tuple, which by here only contains indexed columns, still has
+	 * toasted columns, force them to be inlined. This is somewhat unlikely
+	 * since there's limits on the size of indexed columns, so we don't
+	 * duplicate toast_flatten_tuple()s functionality in the above loop over
+	 * the indexed columns, even if it would be more efficient.
+	 */
+	if (HeapTupleHasExternal(key_tuple))
+	{
+		HeapTuple	oldtup = key_tuple;
+		key_tuple = toast_flatten_tuple(oldtup, RelationGetDescr(relation));
+		heap_freetuple(oldtup);
+	}
+
+	return key_tuple;
+}
+
+/*
  * Handles CLEANUP_INFO
  */
 static void
@@ -6714,7 +7105,7 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
 	 * The visibility map may need to be fixed even if the heap page is
 	 * already up-to-date.
 	 */
-	if (xlrec->all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
 	{
 		Relation	reln = CreateFakeRelcacheEntry(xlrec->target.node);
 		Buffer		vmbuffer = InvalidBuffer;
@@ -6763,7 +7154,7 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
 	/* Mark the page as a candidate for pruning */
 	PageSetPrunable(page, record->xl_xid);
 
-	if (xlrec->all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
 		PageClearAllVisible(page);
 
 	/* Make sure there is no forward chain link in t_ctid */
@@ -6797,7 +7188,7 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
 	 * The visibility map may need to be fixed even if the heap page is
 	 * already up-to-date.
 	 */
-	if (xlrec->all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
 	{
 		Relation	reln = CreateFakeRelcacheEntry(xlrec->target.node);
 		Buffer		vmbuffer = InvalidBuffer;
@@ -6868,7 +7259,7 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
 
 	PageSetLSN(page, lsn);
 
-	if (xlrec->all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
 		PageClearAllVisible(page);
 
 	MarkBufferDirty(buffer);
@@ -6931,7 +7322,7 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
 	 * The visibility map may need to be fixed even if the heap page is
 	 * already up-to-date.
 	 */
-	if (xlrec->all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
 	{
 		Relation	reln = CreateFakeRelcacheEntry(xlrec->node);
 		Buffer		vmbuffer = InvalidBuffer;
@@ -7014,7 +7405,7 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
 
 	PageSetLSN(page, lsn);
 
-	if (xlrec->all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
 		PageClearAllVisible(page);
 
 	MarkBufferDirty(buffer);
@@ -7053,7 +7444,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
 		HeapTupleHeaderData hdr;
 		char		data[MaxHeapTupleSize];
 	}			tbuf;
-	xl_heap_header xlhdr;
+	xl_heap_header_len xlhdr;
 	int			hsize;
 	uint32		newlen;
 	Size		freespace;
@@ -7062,7 +7453,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
 	 * The visibility map may need to be fixed even if the heap page is
 	 * already up-to-date.
 	 */
-	if (xlrec->all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
 	{
 		Relation	reln = CreateFakeRelcacheEntry(xlrec->target.node);
 		BlockNumber block = ItemPointerGetBlockNumber(&xlrec->target.tid);
@@ -7140,7 +7531,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
 	/* Mark the page as a candidate for pruning */
 	PageSetPrunable(page, record->xl_xid);
 
-	if (xlrec->all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
 		PageClearAllVisible(page);
 
 	/*
@@ -7164,7 +7555,7 @@ newt:;
 	 * The visibility map may need to be fixed even if the heap page is
 	 * already up-to-date.
 	 */
-	if (xlrec->new_all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED)
 	{
 		Relation	reln = CreateFakeRelcacheEntry(xlrec->target.node);
 		BlockNumber block = ItemPointerGetBlockNumber(&xlrec->newtid);
@@ -7222,13 +7613,13 @@ newsame:;
 	if (PageGetMaxOffsetNumber(page) + 1 < offnum)
 		elog(PANIC, "heap_update_redo: invalid max offset number");
 
-	hsize = SizeOfHeapUpdate + SizeOfHeapHeader;
+	hsize = SizeOfHeapUpdate + SizeOfHeapHeaderLen;
 
-	newlen = record->xl_len - hsize;
-	Assert(newlen <= MaxHeapTupleSize);
 	memcpy((char *) &xlhdr,
 		   (char *) xlrec + SizeOfHeapUpdate,
-		   SizeOfHeapHeader);
+		   SizeOfHeapHeaderLen);
+	newlen = xlhdr.t_len;
+	Assert(newlen <= MaxHeapTupleSize);
 	htup = &tbuf.hdr;
 	MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
 	/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
@@ -7236,9 +7627,9 @@ newsame:;
 		   (char *) xlrec + hsize,
 		   newlen);
 	newlen += offsetof(HeapTupleHeaderData, t_bits);
-	htup->t_infomask2 = xlhdr.t_infomask2;
-	htup->t_infomask = xlhdr.t_infomask;
-	htup->t_hoff = xlhdr.t_hoff;
+	htup->t_infomask2 = xlhdr.header.t_infomask2;
+	htup->t_infomask = xlhdr.header.t_infomask;
+	htup->t_hoff = xlhdr.header.t_hoff;
 
 	HeapTupleHeaderSetXmin(htup, record->xl_xid);
 	HeapTupleHeaderSetCmin(htup, FirstCommandId);
@@ -7250,7 +7641,7 @@ newsame:;
 	if (offnum == InvalidOffsetNumber)
 		elog(PANIC, "heap_update_redo: failed to add tuple");
 
-	if (xlrec->new_all_visible_cleared)
+	if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED)
 		PageClearAllVisible(page);
 
 	freespace = PageGetHeapFreeSpace(page);		/* needed to update FSM below */
@@ -7501,6 +7892,12 @@ heap2_redo(XLogRecPtr lsn, XLogRecord *record)
 		case XLOG_HEAP2_LOCK_UPDATED:
 			heap_xlog_lock_updated(lsn, record);
 			break;
+		case XLOG_HEAP2_NEW_CID:
+			/*
+			 * Nothing to do on a real replay, only used during logical
+			 * decoding.
+			 */
+			break;
 		default:
 			elog(PANIC, "heap2_redo: unknown op code %u", info);
 	}