diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/replication/logical/decode.c | 14 | ||||
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 49 |
2 files changed, 43 insertions, 20 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 70670169acc..453efc51e16 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -1040,19 +1040,17 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (target_node.dbNode != ctx->slot->data.database) return; - /* - * Super deletions are irrelevant for logical decoding, it's driven by the - * confirmation records. - */ - if (xlrec->flags & XLH_DELETE_IS_SUPER) - return; - /* output plugin doesn't look for this origin, no need to queue */ if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) return; change = ReorderBufferGetChange(ctx->reorder); - change->action = REORDER_BUFFER_CHANGE_DELETE; + + if (xlrec->flags & XLH_DELETE_IS_SUPER) + change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT; + else + change->action = REORDER_BUFFER_CHANGE_DELETE; + change->origin_id = XLogRecGetOrigin(r); memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index f96029f15a4..19e96f3fd94 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -443,6 +443,9 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) txn->invalidations = NULL; } + /* Reset the toast hash */ + ReorderBufferToastReset(rb, txn); + pfree(txn); } @@ -520,6 +523,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, } break; case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: break; @@ -2211,8 +2215,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, change_done: /* - * Either speculative insertion was confirmed, or it was - * unsuccessful and the record isn't needed anymore. + * If speculative insertion was confirmed, the record isn't + * needed anymore. */ if (specinsert != NULL) { @@ -2254,6 +2258,32 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, specinsert = change; break; + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: + + /* + * Abort for speculative insertion arrived. So cleanup the + * specinsert tuple and toast hash. + * + * Note that we get the spec abort change for each toast + * entry but we need to perform the cleanup only the first + * time we get it for the main table. + */ + if (specinsert != NULL) + { + /* + * We must clean the toast hash before processing a + * completely new tuple to avoid confusion about the + * previous tuple's toast chunks. + */ + Assert(change->data.tp.clear_toast_afterwards); + ReorderBufferToastReset(rb, txn); + + /* We don't need this record anymore. */ + ReorderBufferReturnChange(rb, specinsert, true); + specinsert = NULL; + } + break; + case REORDER_BUFFER_CHANGE_TRUNCATE: { int i; @@ -2360,16 +2390,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, } } - /* - * There's a speculative insertion remaining, just clean in up, it - * can't have been successful, otherwise we'd gotten a confirmation - * record. - */ - if (specinsert) - { - ReorderBufferReturnChange(rb, specinsert, true); - specinsert = NULL; - } + /* speculative insertion record must be freed by now */ + Assert(!specinsert); /* clean up the iterator */ ReorderBufferIterTXNFinish(rb, iterstate); @@ -3754,6 +3776,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: /* ReorderBufferChange contains everything important */ @@ -4017,6 +4040,7 @@ ReorderBufferChangeSize(ReorderBufferChange *change) break; } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: /* ReorderBufferChange contains everything important */ @@ -4315,6 +4339,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: break; |