diff options
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 49 |
1 files changed, 37 insertions, 12 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 1351b33011f..b904697f18f 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -397,6 +397,9 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) txn->invalidations = NULL; } + /* Reset the toast hash */ + ReorderBufferToastReset(rb, txn); + pfree(txn); } @@ -467,6 +470,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) } break; case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: break; @@ -1677,8 +1681,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, change_done: /* - * Either speculative insertion was confirmed, or it was - * unsuccessful and the record isn't needed anymore. + * If speculative insertion was confirmed, the record isn't + * needed anymore. */ if (specinsert != NULL) { @@ -1720,6 +1724,32 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, specinsert = change; break; + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: + + /* + * Abort for speculative insertion arrived. So cleanup the + * specinsert tuple and toast hash. + * + * Note that we get the spec abort change for each toast + * entry but we need to perform the cleanup only the first + * time we get it for the main table. + */ + if (specinsert != NULL) + { + /* + * We must clean the toast hash before processing a + * completely new tuple to avoid confusion about the + * previous tuple's toast chunks. + */ + Assert(change->data.tp.clear_toast_afterwards); + ReorderBufferToastReset(rb, txn); + + /* We don't need this record anymore. */ + ReorderBufferReturnChange(rb, specinsert); + specinsert = NULL; + } + break; + case REORDER_BUFFER_CHANGE_TRUNCATE: { int i; @@ -1827,16 +1857,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, } } - /* - * There's a speculative insertion remaining, just clean in up, it - * can't have been successful, otherwise we'd gotten a confirmation - * record. - */ - if (specinsert) - { - ReorderBufferReturnChange(rb, specinsert); - specinsert = NULL; - } + /* speculative insertion record must be freed by now */ + Assert(!specinsert); /* clean up the iterator */ ReorderBufferIterTXNFinish(rb, iterstate); @@ -2640,6 +2662,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: /* ReorderBufferChange contains everything important */ @@ -2747,6 +2770,7 @@ ReorderBufferChangeSize(ReorderBufferChange *change) break; } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: /* ReorderBufferChange contains everything important */ @@ -3032,6 +3056,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: break; |