aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/decode.c14
-rw-r--r--src/backend/replication/logical/reorderbuffer.c49
-rw-r--r--src/include/replication/reorderbuffer.h9
3 files changed, 48 insertions, 24 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 70670169acc..453efc51e16 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -1040,19 +1040,17 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (target_node.dbNode != ctx->slot->data.database)
return;
- /*
- * Super deletions are irrelevant for logical decoding, it's driven by the
- * confirmation records.
- */
- if (xlrec->flags & XLH_DELETE_IS_SUPER)
- return;
-
/* output plugin doesn't look for this origin, no need to queue */
if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
return;
change = ReorderBufferGetChange(ctx->reorder);
- change->action = REORDER_BUFFER_CHANGE_DELETE;
+
+ if (xlrec->flags & XLH_DELETE_IS_SUPER)
+ change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT;
+ else
+ change->action = REORDER_BUFFER_CHANGE_DELETE;
+
change->origin_id = XLogRecGetOrigin(r);
memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index f96029f15a4..19e96f3fd94 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -443,6 +443,9 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
txn->invalidations = NULL;
}
+ /* Reset the toast hash */
+ ReorderBufferToastReset(rb, txn);
+
pfree(txn);
}
@@ -520,6 +523,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
}
break;
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
+ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
break;
@@ -2211,8 +2215,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
change_done:
/*
- * Either speculative insertion was confirmed, or it was
- * unsuccessful and the record isn't needed anymore.
+ * If speculative insertion was confirmed, the record isn't
+ * needed anymore.
*/
if (specinsert != NULL)
{
@@ -2254,6 +2258,32 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
specinsert = change;
break;
+ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
+
+ /*
+ * Abort for speculative insertion arrived. So cleanup the
+ * specinsert tuple and toast hash.
+ *
+ * Note that we get the spec abort change for each toast
+ * entry but we need to perform the cleanup only the first
+ * time we get it for the main table.
+ */
+ if (specinsert != NULL)
+ {
+ /*
+ * We must clean the toast hash before processing a
+ * completely new tuple to avoid confusion about the
+ * previous tuple's toast chunks.
+ */
+ Assert(change->data.tp.clear_toast_afterwards);
+ ReorderBufferToastReset(rb, txn);
+
+ /* We don't need this record anymore. */
+ ReorderBufferReturnChange(rb, specinsert, true);
+ specinsert = NULL;
+ }
+ break;
+
case REORDER_BUFFER_CHANGE_TRUNCATE:
{
int i;
@@ -2360,16 +2390,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
}
}
- /*
- * There's a speculative insertion remaining, just clean in up, it
- * can't have been successful, otherwise we'd gotten a confirmation
- * record.
- */
- if (specinsert)
- {
- ReorderBufferReturnChange(rb, specinsert, true);
- specinsert = NULL;
- }
+ /* speculative insertion record must be freed by now */
+ Assert(!specinsert);
/* clean up the iterator */
ReorderBufferIterTXNFinish(rb, iterstate);
@@ -3754,6 +3776,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
break;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
+ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
/* ReorderBufferChange contains everything important */
@@ -4017,6 +4040,7 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
break;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
+ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
/* ReorderBufferChange contains everything important */
@@ -4315,6 +4339,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
break;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
+ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
break;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 0c6e9d1cb92..ba257d81b51 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -46,10 +46,10 @@ typedef struct ReorderBufferTupleBuf
* changes. Users of the decoding facilities will never see changes with
* *_INTERNAL_* actions.
*
- * The INTERNAL_SPEC_INSERT and INTERNAL_SPEC_CONFIRM changes concern
- * "speculative insertions", and their confirmation respectively. They're
- * used by INSERT .. ON CONFLICT .. UPDATE. Users of logical decoding don't
- * have to care about these.
+ * The INTERNAL_SPEC_INSERT and INTERNAL_SPEC_CONFIRM, and INTERNAL_SPEC_ABORT
+ * changes concern "speculative insertions", their confirmation, and abort
+ * respectively. They're used by INSERT .. ON CONFLICT .. UPDATE. Users of
+ * logical decoding don't have to care about these.
*/
enum ReorderBufferChangeType
{
@@ -63,6 +63,7 @@ enum ReorderBufferChangeType
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
+ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT,
REORDER_BUFFER_CHANGE_TRUNCATE
};