aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/reorderbuffer.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r--src/backend/replication/logical/reorderbuffer.c49
1 files changed, 37 insertions, 12 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 1351b33011f..b904697f18f 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -397,6 +397,9 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
txn->invalidations = NULL;
}
+ /* Reset the toast hash */
+ ReorderBufferToastReset(rb, txn);
+
pfree(txn);
}
@@ -467,6 +470,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
}
break;
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
+ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
break;
@@ -1677,8 +1681,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
change_done:
/*
- * Either speculative insertion was confirmed, or it was
- * unsuccessful and the record isn't needed anymore.
+ * If speculative insertion was confirmed, the record isn't
+ * needed anymore.
*/
if (specinsert != NULL)
{
@@ -1720,6 +1724,32 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
specinsert = change;
break;
+ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
+
+ /*
+ * Abort for speculative insertion arrived. So cleanup the
+ * specinsert tuple and toast hash.
+ *
+ * Note that we get the spec abort change for each toast
+ * entry but we need to perform the cleanup only the first
+ * time we get it for the main table.
+ */
+ if (specinsert != NULL)
+ {
+ /*
+ * We must clean the toast hash before processing a
+ * completely new tuple to avoid confusion about the
+ * previous tuple's toast chunks.
+ */
+ Assert(change->data.tp.clear_toast_afterwards);
+ ReorderBufferToastReset(rb, txn);
+
+ /* We don't need this record anymore. */
+ ReorderBufferReturnChange(rb, specinsert);
+ specinsert = NULL;
+ }
+ break;
+
case REORDER_BUFFER_CHANGE_TRUNCATE:
{
int i;
@@ -1827,16 +1857,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
}
}
- /*
- * There's a speculative insertion remaining, just clean in up, it
- * can't have been successful, otherwise we'd gotten a confirmation
- * record.
- */
- if (specinsert)
- {
- ReorderBufferReturnChange(rb, specinsert);
- specinsert = NULL;
- }
+ /* speculative insertion record must be freed by now */
+ Assert(!specinsert);
/* clean up the iterator */
ReorderBufferIterTXNFinish(rb, iterstate);
@@ -2640,6 +2662,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
break;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
+ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
/* ReorderBufferChange contains everything important */
@@ -2747,6 +2770,7 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
break;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
+ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
/* ReorderBufferChange contains everything important */
@@ -3032,6 +3056,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
break;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
+ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
break;