diff options
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 103 |
1 files changed, 82 insertions, 21 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 4cb27f22244..7a8bf760791 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -235,7 +235,7 @@ static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state); static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state); -static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs); /* * --------------------------------------- @@ -486,6 +486,11 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, pfree(change->data.msg.message); change->data.msg.message = NULL; break; + case REORDER_BUFFER_CHANGE_INVALIDATION: + if (change->data.inval.invalidations) + pfree(change->data.inval.invalidations); + change->data.inval.invalidations = NULL; + break; case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: if (change->data.snapshot) { @@ -2194,6 +2199,13 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferApplyMessage(rb, txn, change, streaming); break; + case REORDER_BUFFER_CHANGE_INVALIDATION: + /* Execute the invalidation messages locally */ + ReorderBufferExecuteInvalidations( + change->data.inval.ninvalidations, + change->data.inval.invalidations); + break; + case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: /* get rid of the old */ TeardownHistoricSnapshot(false); @@ -2244,13 +2256,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, TeardownHistoricSnapshot(false); SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash); - - /* - * Every time the CommandId is incremented, we could - * see new catalog contents, so execute all - * invalidations. - */ - ReorderBufferExecuteInvalidations(rb, txn); } break; @@ -2317,7 +2322,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, AbortCurrentTransaction(); /* make sure there's no cache pollution */ - ReorderBufferExecuteInvalidations(rb, txn); + ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations); if (using_subtxn) RollbackAndReleaseCurrentSubTransaction(); @@ -2356,7 +2361,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, AbortCurrentTransaction(); /* make sure there's no cache pollution */ - ReorderBufferExecuteInvalidations(rb, txn); + ReorderBufferExecuteInvalidations(txn->ninvalidations, + txn->invalidations); if (using_subtxn) RollbackAndReleaseCurrentSubTransaction(); @@ -2813,10 +2819,13 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, * Setup the invalidation of the toplevel transaction. * * This needs to be called for each XLOG_XACT_INVALIDATIONS message and - * accumulates all the invalidation messages in the toplevel transaction. - * This is required because in some cases where we skip processing the - * transaction (see ReorderBufferForget), we need to execute all the - * invalidations together. + * accumulates all the invalidation messages in the toplevel transaction as + * well as in the form of change in reorder buffer. We require to record it in + * form of the change so that we can execute only the required invalidations + * instead of executing all the invalidations on each CommandId increment. We + * also need to accumulate these in the toplevel transaction because in some + * cases we skip processing the transaction (see ReorderBufferForget), we need + * to execute all the invalidations together. */ void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, @@ -2824,12 +2833,16 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, SharedInvalidationMessage *msgs) { ReorderBufferTXN *txn; + MemoryContext oldcontext; + ReorderBufferChange *change; txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + oldcontext = MemoryContextSwitchTo(rb->context); + /* - * We collect all the invalidations under the top transaction so that we - * can execute them all together. + * Collect all the invalidations under the top transaction so that we can + * execute them all together. See comment atop this function */ if (txn->toptxn) txn = txn->toptxn; @@ -2841,8 +2854,7 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, { txn->ninvalidations = nmsgs; txn->invalidations = (SharedInvalidationMessage *) - MemoryContextAlloc(rb->context, - sizeof(SharedInvalidationMessage) * nmsgs); + palloc(sizeof(SharedInvalidationMessage) * nmsgs); memcpy(txn->invalidations, msgs, sizeof(SharedInvalidationMessage) * nmsgs); } @@ -2856,6 +2868,18 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, nmsgs * sizeof(SharedInvalidationMessage)); txn->ninvalidations += nmsgs; } + + change = ReorderBufferGetChange(rb); + change->action = REORDER_BUFFER_CHANGE_INVALIDATION; + change->data.inval.ninvalidations = nmsgs; + change->data.inval.invalidations = (SharedInvalidationMessage *) + palloc(sizeof(SharedInvalidationMessage) * nmsgs); + memcpy(change->data.inval.invalidations, msgs, + sizeof(SharedInvalidationMessage) * nmsgs); + + ReorderBufferQueueChange(rb, xid, lsn, change, false); + + MemoryContextSwitchTo(oldcontext); } /* @@ -2863,12 +2887,12 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, * in the changestream but we don't know which those are. */ static void -ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn) +ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs) { int i; - for (i = 0; i < txn->ninvalidations; i++) - LocalExecuteInvalidationMessage(&txn->invalidations[i]); + for (i = 0; i < nmsgs; i++) + LocalExecuteInvalidationMessage(&msgs[i]); } /* @@ -3303,6 +3327,24 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } + case REORDER_BUFFER_CHANGE_INVALIDATION: + { + char *data; + Size inval_size = sizeof(SharedInvalidationMessage) * + change->data.inval.ninvalidations; + + sz += inval_size; + + ReorderBufferSerializeReserve(rb, sz); + data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); + + /* might have been reallocated above */ + ondisk = (ReorderBufferDiskChange *) rb->outbuf; + memcpy(data, change->data.inval.invalidations, inval_size); + data += inval_size; + + break; + } case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: { Snapshot snap; @@ -3578,6 +3620,12 @@ ReorderBufferChangeSize(ReorderBufferChange *change) break; } + case REORDER_BUFFER_CHANGE_INVALIDATION: + { + sz += sizeof(SharedInvalidationMessage) * + change->data.inval.ninvalidations; + break; + } case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: { Snapshot snap; @@ -3846,6 +3894,19 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } + case REORDER_BUFFER_CHANGE_INVALIDATION: + { + Size inval_size = sizeof(SharedInvalidationMessage) * + change->data.inval.ninvalidations; + + change->data.inval.invalidations = + MemoryContextAlloc(rb->context, inval_size); + + /* read the message */ + memcpy(change->data.inval.invalidations, data, inval_size); + + break; + } case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: { Snapshot oldsnap; |