aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/reorderbuffer.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r--src/backend/replication/logical/reorderbuffer.c103
1 files changed, 82 insertions, 21 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 4cb27f22244..7a8bf760791 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -235,7 +235,7 @@ static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
ReorderBufferIterTXNState *state);
-static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs);
/*
* ---------------------------------------
@@ -486,6 +486,11 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
pfree(change->data.msg.message);
change->data.msg.message = NULL;
break;
+ case REORDER_BUFFER_CHANGE_INVALIDATION:
+ if (change->data.inval.invalidations)
+ pfree(change->data.inval.invalidations);
+ change->data.inval.invalidations = NULL;
+ break;
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
if (change->data.snapshot)
{
@@ -2194,6 +2199,13 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
ReorderBufferApplyMessage(rb, txn, change, streaming);
break;
+ case REORDER_BUFFER_CHANGE_INVALIDATION:
+ /* Execute the invalidation messages locally */
+ ReorderBufferExecuteInvalidations(
+ change->data.inval.ninvalidations,
+ change->data.inval.invalidations);
+ break;
+
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
/* get rid of the old */
TeardownHistoricSnapshot(false);
@@ -2244,13 +2256,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
TeardownHistoricSnapshot(false);
SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
-
- /*
- * Every time the CommandId is incremented, we could
- * see new catalog contents, so execute all
- * invalidations.
- */
- ReorderBufferExecuteInvalidations(rb, txn);
}
break;
@@ -2317,7 +2322,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
AbortCurrentTransaction();
/* make sure there's no cache pollution */
- ReorderBufferExecuteInvalidations(rb, txn);
+ ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
if (using_subtxn)
RollbackAndReleaseCurrentSubTransaction();
@@ -2356,7 +2361,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
AbortCurrentTransaction();
/* make sure there's no cache pollution */
- ReorderBufferExecuteInvalidations(rb, txn);
+ ReorderBufferExecuteInvalidations(txn->ninvalidations,
+ txn->invalidations);
if (using_subtxn)
RollbackAndReleaseCurrentSubTransaction();
@@ -2813,10 +2819,13 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
* Setup the invalidation of the toplevel transaction.
*
* This needs to be called for each XLOG_XACT_INVALIDATIONS message and
- * accumulates all the invalidation messages in the toplevel transaction.
- * This is required because in some cases where we skip processing the
- * transaction (see ReorderBufferForget), we need to execute all the
- * invalidations together.
+ * accumulates all the invalidation messages in the toplevel transaction as
+ * well as in the form of change in reorder buffer. We require to record it in
+ * form of the change so that we can execute only the required invalidations
+ * instead of executing all the invalidations on each CommandId increment. We
+ * also need to accumulate these in the toplevel transaction because in some
+ * cases we skip processing the transaction (see ReorderBufferForget), we need
+ * to execute all the invalidations together.
*/
void
ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
@@ -2824,12 +2833,16 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
SharedInvalidationMessage *msgs)
{
ReorderBufferTXN *txn;
+ MemoryContext oldcontext;
+ ReorderBufferChange *change;
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+ oldcontext = MemoryContextSwitchTo(rb->context);
+
/*
- * We collect all the invalidations under the top transaction so that we
- * can execute them all together.
+ * Collect all the invalidations under the top transaction so that we can
+ * execute them all together. See comment atop this function
*/
if (txn->toptxn)
txn = txn->toptxn;
@@ -2841,8 +2854,7 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
{
txn->ninvalidations = nmsgs;
txn->invalidations = (SharedInvalidationMessage *)
- MemoryContextAlloc(rb->context,
- sizeof(SharedInvalidationMessage) * nmsgs);
+ palloc(sizeof(SharedInvalidationMessage) * nmsgs);
memcpy(txn->invalidations, msgs,
sizeof(SharedInvalidationMessage) * nmsgs);
}
@@ -2856,6 +2868,18 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
nmsgs * sizeof(SharedInvalidationMessage));
txn->ninvalidations += nmsgs;
}
+
+ change = ReorderBufferGetChange(rb);
+ change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
+ change->data.inval.ninvalidations = nmsgs;
+ change->data.inval.invalidations = (SharedInvalidationMessage *)
+ palloc(sizeof(SharedInvalidationMessage) * nmsgs);
+ memcpy(change->data.inval.invalidations, msgs,
+ sizeof(SharedInvalidationMessage) * nmsgs);
+
+ ReorderBufferQueueChange(rb, xid, lsn, change, false);
+
+ MemoryContextSwitchTo(oldcontext);
}
/*
@@ -2863,12 +2887,12 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
* in the changestream but we don't know which those are.
*/
static void
-ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
+ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
{
int i;
- for (i = 0; i < txn->ninvalidations; i++)
- LocalExecuteInvalidationMessage(&txn->invalidations[i]);
+ for (i = 0; i < nmsgs; i++)
+ LocalExecuteInvalidationMessage(&msgs[i]);
}
/*
@@ -3303,6 +3327,24 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
break;
}
+ case REORDER_BUFFER_CHANGE_INVALIDATION:
+ {
+ char *data;
+ Size inval_size = sizeof(SharedInvalidationMessage) *
+ change->data.inval.ninvalidations;
+
+ sz += inval_size;
+
+ ReorderBufferSerializeReserve(rb, sz);
+ data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+
+ /* might have been reallocated above */
+ ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+ memcpy(data, change->data.inval.invalidations, inval_size);
+ data += inval_size;
+
+ break;
+ }
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
{
Snapshot snap;
@@ -3578,6 +3620,12 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
break;
}
+ case REORDER_BUFFER_CHANGE_INVALIDATION:
+ {
+ sz += sizeof(SharedInvalidationMessage) *
+ change->data.inval.ninvalidations;
+ break;
+ }
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
{
Snapshot snap;
@@ -3846,6 +3894,19 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
break;
}
+ case REORDER_BUFFER_CHANGE_INVALIDATION:
+ {
+ Size inval_size = sizeof(SharedInvalidationMessage) *
+ change->data.inval.ninvalidations;
+
+ change->data.inval.invalidations =
+ MemoryContextAlloc(rb->context, inval_size);
+
+ /* read the message */
+ memcpy(change->data.inval.invalidations, data, inval_size);
+
+ break;
+ }
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
{
Snapshot oldsnap;