diff options
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 304 |
1 files changed, 217 insertions, 87 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index c1447a513b1..5f4aa071310 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -165,6 +165,8 @@ static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top); +static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, + ReorderBufferTXN *subtxn); static void AssertTXNLsnOrder(ReorderBuffer *rb); @@ -271,6 +273,7 @@ ReorderBufferAllocate(void) buffer->current_restart_decoding_lsn = InvalidXLogRecPtr; dlist_init(&buffer->toplevel_by_lsn); + dlist_init(&buffer->txns_by_base_snapshot_lsn); /* * Ensure there's no stale data from prior uses of this slot, in case some @@ -462,7 +465,6 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool found; Assert(TransactionIdIsValid(xid)); - Assert(!create || lsn != InvalidXLogRecPtr); /* * Check the one-entry lookup cache first @@ -506,6 +508,7 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, { /* initialize the new entry, if creation was requested */ Assert(ent != NULL); + Assert(lsn != InvalidXLogRecPtr); ent->txn = ReorderBufferGetTXN(rb); ent->txn->xid = xid; @@ -607,43 +610,80 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, } } - +/* + * AssertTXNLsnOrder + * Verify LSN ordering of transaction lists in the reorderbuffer + * + * Other LSN-related invariants are checked too. + * + * No-op if assertions are not in use. + */ static void AssertTXNLsnOrder(ReorderBuffer *rb) { #ifdef USE_ASSERT_CHECKING dlist_iter iter; XLogRecPtr prev_first_lsn = InvalidXLogRecPtr; + XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr; dlist_foreach(iter, &rb->toplevel_by_lsn) { - ReorderBufferTXN *cur_txn; + ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node, + iter.cur); - cur_txn = dlist_container(ReorderBufferTXN, node, iter.cur); + /* start LSN must be set */ Assert(cur_txn->first_lsn != InvalidXLogRecPtr); + /* If there is an end LSN, it must be higher than start LSN */ if (cur_txn->end_lsn != InvalidXLogRecPtr) Assert(cur_txn->first_lsn <= cur_txn->end_lsn); + /* Current initial LSN must be strictly higher than previous */ if (prev_first_lsn != InvalidXLogRecPtr) Assert(prev_first_lsn < cur_txn->first_lsn); + /* known-as-subtxn txns must not be listed */ Assert(!cur_txn->is_known_as_subxact); + prev_first_lsn = cur_txn->first_lsn; } + + dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn) + { + ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, + base_snapshot_node, + iter.cur); + + /* base snapshot (and its LSN) must be set */ + Assert(cur_txn->base_snapshot != NULL); + Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr); + + /* current LSN must be strictly higher than previous */ + if (prev_base_snap_lsn != InvalidXLogRecPtr) + Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn); + + /* known-as-subtxn txns must not be listed */ + Assert(!cur_txn->is_known_as_subxact); + + prev_base_snap_lsn = cur_txn->base_snapshot_lsn; + } #endif } +/* + * ReorderBufferGetOldestTXN + * Return oldest transaction in reorderbuffer + */ ReorderBufferTXN * ReorderBufferGetOldestTXN(ReorderBuffer *rb) { ReorderBufferTXN *txn; + AssertTXNLsnOrder(rb); + if (dlist_is_empty(&rb->toplevel_by_lsn)) return NULL; - AssertTXNLsnOrder(rb); - txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn); Assert(!txn->is_known_as_subxact); @@ -651,12 +691,44 @@ ReorderBufferGetOldestTXN(ReorderBuffer *rb) return txn; } +/* + * ReorderBufferGetOldestXmin + * Return oldest Xmin in reorderbuffer + * + * Returns oldest possibly running Xid from the point of view of snapshots + * used in the transactions kept by reorderbuffer, or InvalidTransactionId if + * there are none. + * + * Since snapshots are assigned monotonically, this equals the Xmin of the + * base snapshot with minimal base_snapshot_lsn. + */ +TransactionId +ReorderBufferGetOldestXmin(ReorderBuffer *rb) +{ + ReorderBufferTXN *txn; + + AssertTXNLsnOrder(rb); + + if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn)) + return InvalidTransactionId; + + txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node, + &rb->txns_by_base_snapshot_lsn); + return txn->base_snapshot->xmin; +} + void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr) { rb->current_restart_decoding_lsn = ptr; } +/* + * ReorderBufferAssignChild + * + * Make note that we know that subxid is a subtransaction of xid, seen as of + * the given lsn. + */ void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn) @@ -669,32 +741,107 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true); subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false); - if (new_sub) + if (new_top && !new_sub) + elog(ERROR, "subtransaction logged without previous top-level txn record"); + + if (!new_sub) { - /* - * we assign subtransactions to top level transaction even if we don't - * have data for it yet, assignment records frequently reference xids - * that have not yet produced any records. Knowing those aren't top - * level xids allows us to make processing cheaper in some places. - */ - dlist_push_tail(&txn->subtxns, &subtxn->node); - txn->nsubtxns++; + if (subtxn->is_known_as_subxact) + { + /* already associated, nothing to do */ + return; + } + else + { + /* + * We already saw this transaction, but initially added it to the list + * of top-level txns. Now that we know it's not top-level, remove + * it from there. + */ + dlist_delete(&subtxn->node); + } } - else if (!subtxn->is_known_as_subxact) - { - subtxn->is_known_as_subxact = true; - Assert(subtxn->nsubtxns == 0); - /* remove from lsn order list of top-level transactions */ - dlist_delete(&subtxn->node); + subtxn->is_known_as_subxact = true; + subtxn->toplevel_xid = xid; + Assert(subtxn->nsubtxns == 0); - /* add to toplevel transaction */ - dlist_push_tail(&txn->subtxns, &subtxn->node); - txn->nsubtxns++; - } - else if (new_top) + /* add to subtransaction list */ + dlist_push_tail(&txn->subtxns, &subtxn->node); + txn->nsubtxns++; + + /* Possibly transfer the subtxn's snapshot to its top-level txn. */ + ReorderBufferTransferSnapToParent(txn, subtxn); + + /* Verify LSN-ordering invariant */ + AssertTXNLsnOrder(rb); +} + +/* + * ReorderBufferTransferSnapToParent + * Transfer base snapshot from subtxn to top-level txn, if needed + * + * This is done if the top-level txn doesn't have a base snapshot, or if the + * subtxn's base snapshot has an earlier LSN than the top-level txn's base + * snapshot's LSN. This can happen if there are no changes in the toplevel + * txn but there are some in the subtxn, or the first change in subtxn has + * earlier LSN than first change in the top-level txn and we learned about + * their kinship only now. + * + * The subtransaction's snapshot is cleared regardless of the transfer + * happening, since it's not needed anymore in either case. + * + * We do this as soon as we become aware of their kinship, to avoid queueing + * extra snapshots to txns known-as-subtxns -- only top-level txns will + * receive further snapshots. + */ +static void +ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, + ReorderBufferTXN *subtxn) +{ + Assert(subtxn->toplevel_xid == txn->xid); + + if (subtxn->base_snapshot != NULL) { - elog(ERROR, "existing subxact assigned to unknown toplevel xact"); + if (txn->base_snapshot == NULL || + subtxn->base_snapshot_lsn < txn->base_snapshot_lsn) + { + /* + * If the toplevel transaction already has a base snapshot but + * it's newer than the subxact's, purge it. + */ + if (txn->base_snapshot != NULL) + { + SnapBuildSnapDecRefcount(txn->base_snapshot); + dlist_delete(&txn->base_snapshot_node); + } + + /* + * The snapshot is now the top transaction's; transfer it, and + * adjust the list position of the top transaction in the list by + * moving it to where the subtransaction is. + */ + txn->base_snapshot = subtxn->base_snapshot; + txn->base_snapshot_lsn = subtxn->base_snapshot_lsn; + dlist_insert_before(&subtxn->base_snapshot_node, + &txn->base_snapshot_node); + + /* + * The subtransaction doesn't have a snapshot anymore (so it + * mustn't be in the list.) + */ + subtxn->base_snapshot = NULL; + subtxn->base_snapshot_lsn = InvalidXLogRecPtr; + dlist_delete(&subtxn->base_snapshot_node); + } + else + { + /* Base snap of toplevel is fine, so subxact's is not needed */ + SnapBuildSnapDecRefcount(subtxn->base_snapshot); + dlist_delete(&subtxn->base_snapshot_node); + subtxn->base_snapshot = NULL; + subtxn->base_snapshot_lsn = InvalidXLogRecPtr; + } } } @@ -707,7 +854,6 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn) { - ReorderBufferTXN *txn; ReorderBufferTXN *subtxn; subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL, @@ -719,42 +865,14 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, if (!subtxn) return; - txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true); - - if (txn == NULL) - elog(ERROR, "subxact logged without previous toplevel record"); - - /* - * Pass our base snapshot to the parent transaction if it doesn't have - * one, or ours is older. That can happen if there are no changes in the - * toplevel transaction but in one of the child transactions. This allows - * the parent to simply use its base snapshot initially. - */ - if (subtxn->base_snapshot != NULL && - (txn->base_snapshot == NULL || - txn->base_snapshot_lsn > subtxn->base_snapshot_lsn)) - { - txn->base_snapshot = subtxn->base_snapshot; - txn->base_snapshot_lsn = subtxn->base_snapshot_lsn; - subtxn->base_snapshot = NULL; - subtxn->base_snapshot_lsn = InvalidXLogRecPtr; - } - subtxn->final_lsn = commit_lsn; subtxn->end_lsn = end_lsn; - if (!subtxn->is_known_as_subxact) - { - subtxn->is_known_as_subxact = true; - Assert(subtxn->nsubtxns == 0); - - /* remove from lsn order list of top-level transactions */ - dlist_delete(&subtxn->node); - - /* add to subtransaction list */ - dlist_push_tail(&txn->subtxns, &subtxn->node); - txn->nsubtxns++; - } + /* + * Assign this subxact as a child of the toplevel xact (no-op if already + * done.) + */ + ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr); } @@ -1078,11 +1196,13 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferReturnChange(rb, change); } + /* + * Cleanup the base snapshot, if set. + */ if (txn->base_snapshot != NULL) { SnapBuildSnapDecRefcount(txn->base_snapshot); - txn->base_snapshot = NULL; - txn->base_snapshot_lsn = InvalidXLogRecPtr; + dlist_delete(&txn->base_snapshot_node); } /* @@ -1257,17 +1377,17 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap) } /* - * Perform the replay of a transaction and it's non-aborted subtransactions. + * Perform the replay of a transaction and its non-aborted subtransactions. * * Subtransactions previously have to be processed by * ReorderBufferCommitChild(), even if previously assigned to the toplevel * transaction with ReorderBufferAssignChild. * - * We currently can only decode a transaction's contents in when their commit - * record is read because that's currently the only place where we know about - * cache invalidations. Thus, once a toplevel commit is read, we iterate over - * the top and subtransactions (using a k-way merge) and replay the changes in - * lsn order. + * We currently can only decode a transaction's contents when its commit + * record is read because that's the only place where we know about cache + * invalidations. Thus, once a toplevel commit is read, we iterate over the top + * and subtransactions (using a k-way merge) and replay the changes in lsn + * order. */ void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, @@ -1295,10 +1415,10 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, txn->origin_lsn = origin_lsn; /* - * If this transaction didn't have any real changes in our database, it's - * OK not to have a snapshot. Note that ReorderBufferCommitChild will have - * transferred its snapshot to this transaction if it had one and the - * toplevel tx didn't. + * If this transaction has no snapshot, it didn't make any changes to the + * database, so there's nothing to decode. Note that + * ReorderBufferCommitChild will have transferred any snapshots from + * subtransactions if there were any. */ if (txn->base_snapshot == NULL) { @@ -1861,12 +1981,10 @@ ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, } /* - * Setup the base snapshot of a transaction. The base snapshot is the snapshot - * that is used to decode all changes until either this transaction modifies - * the catalog or another catalog modifying transaction commits. + * Set up the transaction's base snapshot. * - * Needs to be called before any changes are added with - * ReorderBufferQueueChange(). + * If we know that xid is a subtransaction, set the base snapshot on the + * top-level transaction instead. */ void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, @@ -1875,12 +1993,23 @@ ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, ReorderBufferTXN *txn; bool is_new; + AssertArg(snap != NULL); + + /* + * Fetch the transaction to operate on. If we know it's a subtransaction, + * operate on its top-level transaction instead. + */ txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true); + if (txn->is_known_as_subxact) + txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false, + NULL, InvalidXLogRecPtr, false); Assert(txn->base_snapshot == NULL); - Assert(snap != NULL); txn->base_snapshot = snap; txn->base_snapshot_lsn = lsn; + dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node); + + AssertTXNLsnOrder(rb); } /* @@ -1999,25 +2128,26 @@ ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid) } /* - * Have we already added the first snapshot? + * ReorderBufferXidHasBaseSnapshot + * Have we already set the base snapshot for the given txn/subtxn? */ bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid) { ReorderBufferTXN *txn; - txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, - false); + txn = ReorderBufferTXNByXid(rb, xid, false, + NULL, InvalidXLogRecPtr, false); /* transaction isn't known yet, ergo no snapshot */ if (txn == NULL) return false; - /* - * TODO: It would be a nice improvement if we would check the toplevel - * transaction in subtransactions, but we'd need to keep track of a bit - * more state. - */ + /* a known subtxn? operate on top-level txn instead */ + if (txn->is_known_as_subxact) + txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false, + NULL, InvalidXLogRecPtr, false); + return txn->base_snapshot != NULL; } |