aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/reorderbuffer.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r--src/backend/replication/logical/reorderbuffer.c121
1 files changed, 121 insertions, 0 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 9d78c8c134e..52c6986dc0c 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -414,6 +414,14 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
change->data.tp.oldtuple = NULL;
}
break;
+ case REORDER_BUFFER_CHANGE_MESSAGE:
+ if (change->data.msg.prefix != NULL)
+ pfree(change->data.msg.prefix);
+ change->data.msg.prefix = NULL;
+ if (change->data.msg.message != NULL)
+ pfree(change->data.msg.message);
+ change->data.msg.message = NULL;
+ break;
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
if (change->data.snapshot)
{
@@ -627,6 +635,61 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
ReorderBufferCheckSerializeTXN(rb, txn);
}
+/*
+ * Queue message into a transaction so it can be processed upon commit.
+ */
+void
+ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
+ Snapshot snapshot, XLogRecPtr lsn,
+ bool transactional, const char *prefix,
+ Size message_size, const char *message)
+{
+ if (transactional)
+ {
+ MemoryContext oldcontext;
+ ReorderBufferChange *change;
+
+ Assert(xid != InvalidTransactionId);
+
+ oldcontext = MemoryContextSwitchTo(rb->context);
+
+ change = ReorderBufferGetChange(rb);
+ change->action = REORDER_BUFFER_CHANGE_MESSAGE;
+ change->data.msg.prefix = pstrdup(prefix);
+ change->data.msg.message_size = message_size;
+ change->data.msg.message = palloc(message_size);
+ memcpy(change->data.msg.message, message, message_size);
+
+ ReorderBufferQueueChange(rb, xid, lsn, change);
+
+ MemoryContextSwitchTo(oldcontext);
+ }
+ else
+ {
+ ReorderBufferTXN *txn = NULL;
+ volatile Snapshot snapshot_now = snapshot;
+
+ if (xid != InvalidTransactionId)
+ txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+ /* setup snapshot to allow catalog access */
+ SetupHistoricSnapshot(snapshot_now, NULL);
+ PG_TRY();
+ {
+ rb->message(rb, txn, lsn, false, prefix, message_size, message);
+
+ TeardownHistoricSnapshot(false);
+ }
+ PG_CATCH();
+ {
+ TeardownHistoricSnapshot(true);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ }
+}
+
+
static void
AssertTXNLsnOrder(ReorderBuffer *rb)
{
@@ -1493,6 +1556,13 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
specinsert = change;
break;
+ case REORDER_BUFFER_CHANGE_MESSAGE:
+ rb->message(rb, txn, change->lsn, true,
+ change->data.msg.prefix,
+ change->data.msg.message_size,
+ change->data.msg.message);
+ break;
+
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
/* get rid of the old */
TeardownHistoricSnapshot(false);
@@ -2159,6 +2229,33 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
}
break;
}
+ case REORDER_BUFFER_CHANGE_MESSAGE:
+ {
+ char *data;
+ Size prefix_size = strlen(change->data.msg.prefix) + 1;
+
+ sz += prefix_size + change->data.msg.message_size +
+ sizeof(Size) + sizeof(Size);
+ ReorderBufferSerializeReserve(rb, sz);
+
+ data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+
+ /* write the prefix including the size */
+ memcpy(data, &prefix_size, sizeof(Size));
+ data += sizeof(Size);
+ memcpy(data, change->data.msg.prefix,
+ prefix_size);
+ data += prefix_size;
+
+ /* write the message including the size */
+ memcpy(data, &change->data.msg.message_size, sizeof(Size));
+ data += sizeof(Size);
+ memcpy(data, change->data.msg.message,
+ change->data.msg.message_size);
+ data += change->data.msg.message_size;
+
+ break;
+ }
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
{
Snapshot snap;
@@ -2415,6 +2512,30 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
}
break;
+ case REORDER_BUFFER_CHANGE_MESSAGE:
+ {
+ Size prefix_size;
+
+ /* read prefix */
+ memcpy(&prefix_size, data, sizeof(Size));
+ data += sizeof(Size);
+ change->data.msg.prefix = MemoryContextAlloc(rb->context,
+ prefix_size);
+ memcpy(change->data.msg.prefix, data, prefix_size);
+ Assert(change->data.msg.prefix[prefix_size-1] == '\0');
+ data += prefix_size;
+
+ /* read the messsage */
+ memcpy(&change->data.msg.message_size, data, sizeof(Size));
+ data += sizeof(Size);
+ change->data.msg.message = MemoryContextAlloc(rb->context,
+ change->data.msg.message_size);
+ memcpy(change->data.msg.message, data,
+ change->data.msg.message_size);
+ data += change->data.msg.message_size;
+
+ break;
+ }
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
{
Snapshot oldsnap;