diff options
author | Simon Riggs <simon@2ndQuadrant.com> | 2016-04-06 10:05:41 +0100 |
---|---|---|
committer | Simon Riggs <simon@2ndQuadrant.com> | 2016-04-06 10:05:41 +0100 |
commit | 3fe3511d05127cc024b221040db2eeb352e7d716 (patch) | |
tree | b17a084bec318a70a1c0fcd755596b771871bce7 /src/backend/replication/logical/reorderbuffer.c | |
parent | 989be0810dffd08b54e1caecec0677608211c339 (diff) | |
download | postgresql-3fe3511d05127cc024b221040db2eeb352e7d716.tar.gz postgresql-3fe3511d05127cc024b221040db2eeb352e7d716.zip |
Generic Messages for Logical Decoding
API and mechanism to allow generic messages to be inserted into WAL that are
intended to be read by logical decoding plugins. This commit adds an optional
new callback to the logical decoding API.
Messages are either text or bytea. Messages can be transactional, or not, and
are identified by a prefix to allow multiple concurrent decoding plugins.
(Not to be confused with Generic WAL records, which are intended to allow crash
recovery of extensible objects.)
Author: Petr Jelinek and Andres Freund
Reviewers: Artur Zakirov, Tomas Vondra, Simon Riggs
Discussion: 5685F999.6010202@2ndquadrant.com
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 121 |
1 files changed, 121 insertions, 0 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 9d78c8c134e..52c6986dc0c 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -414,6 +414,14 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) change->data.tp.oldtuple = NULL; } break; + case REORDER_BUFFER_CHANGE_MESSAGE: + if (change->data.msg.prefix != NULL) + pfree(change->data.msg.prefix); + change->data.msg.prefix = NULL; + if (change->data.msg.message != NULL) + pfree(change->data.msg.message); + change->data.msg.message = NULL; + break; case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: if (change->data.snapshot) { @@ -627,6 +635,61 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferCheckSerializeTXN(rb, txn); } +/* + * Queue message into a transaction so it can be processed upon commit. + */ +void +ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, + Snapshot snapshot, XLogRecPtr lsn, + bool transactional, const char *prefix, + Size message_size, const char *message) +{ + if (transactional) + { + MemoryContext oldcontext; + ReorderBufferChange *change; + + Assert(xid != InvalidTransactionId); + + oldcontext = MemoryContextSwitchTo(rb->context); + + change = ReorderBufferGetChange(rb); + change->action = REORDER_BUFFER_CHANGE_MESSAGE; + change->data.msg.prefix = pstrdup(prefix); + change->data.msg.message_size = message_size; + change->data.msg.message = palloc(message_size); + memcpy(change->data.msg.message, message, message_size); + + ReorderBufferQueueChange(rb, xid, lsn, change); + + MemoryContextSwitchTo(oldcontext); + } + else + { + ReorderBufferTXN *txn = NULL; + volatile Snapshot snapshot_now = snapshot; + + if (xid != InvalidTransactionId) + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + + /* setup snapshot to allow catalog access */ + SetupHistoricSnapshot(snapshot_now, NULL); + PG_TRY(); + { + rb->message(rb, txn, lsn, false, prefix, message_size, message); + + TeardownHistoricSnapshot(false); + } + PG_CATCH(); + { + TeardownHistoricSnapshot(true); + PG_RE_THROW(); + } + PG_END_TRY(); + } +} + + static void AssertTXNLsnOrder(ReorderBuffer *rb) { @@ -1493,6 +1556,13 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, specinsert = change; break; + case REORDER_BUFFER_CHANGE_MESSAGE: + rb->message(rb, txn, change->lsn, true, + change->data.msg.prefix, + change->data.msg.message_size, + change->data.msg.message); + break; + case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: /* get rid of the old */ TeardownHistoricSnapshot(false); @@ -2159,6 +2229,33 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, } break; } + case REORDER_BUFFER_CHANGE_MESSAGE: + { + char *data; + Size prefix_size = strlen(change->data.msg.prefix) + 1; + + sz += prefix_size + change->data.msg.message_size + + sizeof(Size) + sizeof(Size); + ReorderBufferSerializeReserve(rb, sz); + + data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); + + /* write the prefix including the size */ + memcpy(data, &prefix_size, sizeof(Size)); + data += sizeof(Size); + memcpy(data, change->data.msg.prefix, + prefix_size); + data += prefix_size; + + /* write the message including the size */ + memcpy(data, &change->data.msg.message_size, sizeof(Size)); + data += sizeof(Size); + memcpy(data, change->data.msg.message, + change->data.msg.message_size); + data += change->data.msg.message_size; + + break; + } case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: { Snapshot snap; @@ -2415,6 +2512,30 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, } break; + case REORDER_BUFFER_CHANGE_MESSAGE: + { + Size prefix_size; + + /* read prefix */ + memcpy(&prefix_size, data, sizeof(Size)); + data += sizeof(Size); + change->data.msg.prefix = MemoryContextAlloc(rb->context, + prefix_size); + memcpy(change->data.msg.prefix, data, prefix_size); + Assert(change->data.msg.prefix[prefix_size-1] == '\0'); + data += prefix_size; + + /* read the messsage */ + memcpy(&change->data.msg.message_size, data, sizeof(Size)); + data += sizeof(Size); + change->data.msg.message = MemoryContextAlloc(rb->context, + change->data.msg.message_size); + memcpy(change->data.msg.message, data, + change->data.msg.message_size); + data += change->data.msg.message_size; + + break; + } case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: { Snapshot oldsnap; |