aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/reorderbuffer.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r--src/backend/replication/logical/reorderbuffer.c49
1 files changed, 37 insertions, 12 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index f96029f15a4..19e96f3fd94 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -443,6 +443,9 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
txn->invalidations = NULL;
}
+ /* Reset the toast hash */
+ ReorderBufferToastReset(rb, txn);
+
pfree(txn);
}
@@ -520,6 +523,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
}
break;
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
+ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
break;
@@ -2211,8 +2215,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
change_done:
/*
- * Either speculative insertion was confirmed, or it was
- * unsuccessful and the record isn't needed anymore.
+ * If speculative insertion was confirmed, the record isn't
+ * needed anymore.
*/
if (specinsert != NULL)
{
@@ -2254,6 +2258,32 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
specinsert = change;
break;
+ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
+
+ /*
+ * Abort for speculative insertion arrived. So cleanup the
+ * specinsert tuple and toast hash.
+ *
+ * Note that we get the spec abort change for each toast
+ * entry but we need to perform the cleanup only the first
+ * time we get it for the main table.
+ */
+ if (specinsert != NULL)
+ {
+ /*
+ * We must clean the toast hash before processing a
+ * completely new tuple to avoid confusion about the
+ * previous tuple's toast chunks.
+ */
+ Assert(change->data.tp.clear_toast_afterwards);
+ ReorderBufferToastReset(rb, txn);
+
+ /* We don't need this record anymore. */
+ ReorderBufferReturnChange(rb, specinsert, true);
+ specinsert = NULL;
+ }
+ break;
+
case REORDER_BUFFER_CHANGE_TRUNCATE:
{
int i;
@@ -2360,16 +2390,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
}
}
- /*
- * There's a speculative insertion remaining, just clean in up, it
- * can't have been successful, otherwise we'd gotten a confirmation
- * record.
- */
- if (specinsert)
- {
- ReorderBufferReturnChange(rb, specinsert, true);
- specinsert = NULL;
- }
+ /* speculative insertion record must be freed by now */
+ Assert(!specinsert);
/* clean up the iterator */
ReorderBufferIterTXNFinish(rb, iterstate);
@@ -3754,6 +3776,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
break;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
+ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
/* ReorderBufferChange contains everything important */
@@ -4017,6 +4040,7 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
break;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
+ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
/* ReorderBufferChange contains everything important */
@@ -4315,6 +4339,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
break;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
+ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
break;