diff options
author | Peter Eisentraut <peter_e@gmx.net> | 2018-04-07 11:24:53 -0400 |
---|---|---|
committer | Peter Eisentraut <peter_e@gmx.net> | 2018-04-07 11:34:11 -0400 |
commit | 039eb6e92f20499ac36cc74f8a5cef7430b706f6 (patch) | |
tree | 2cf52aeafb59917d5c7ed396acb6d86325b4a8b0 /src/backend/replication/pgoutput/pgoutput.c | |
parent | 5dfd1e5a6696b271a2cdee54143fbc209c88c02f (diff) | |
download | postgresql-039eb6e92f20499ac36cc74f8a5cef7430b706f6.tar.gz postgresql-039eb6e92f20499ac36cc74f8a5cef7430b706f6.zip |
Logical replication support for TRUNCATE
Update the built-in logical replication system to make use of the
previously added logical decoding for TRUNCATE support. Add the
required truncate callback to pgoutput and a new logical replication
protocol message.
Publications get a new attribute to determine whether to replicate
truncate actions. When updating a publication via pg_dump from an older
version, this is not set, thus preserving the previous behavior.
Author: Simon Riggs <simon@2ndquadrant.com>
Author: Marco Nenciarini <marco.nenciarini@2ndquadrant.it>
Author: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>
Reviewed-by: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org>
Diffstat (limited to 'src/backend/replication/pgoutput/pgoutput.c')
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 129 |
1 files changed, 93 insertions, 36 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index aa9cf5b54ed..06dfbc082f2 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -39,6 +39,9 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx, static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change); +static void pgoutput_truncate(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, int nrelations, Relation relations[], + ReorderBufferChange *change); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); @@ -77,6 +80,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->startup_cb = pgoutput_startup; cb->begin_cb = pgoutput_begin_txn; cb->change_cb = pgoutput_change; + cb->truncate_cb = pgoutput_truncate; cb->commit_cb = pgoutput_commit_txn; cb->filter_by_origin_cb = pgoutput_origin_filter; cb->shutdown_cb = pgoutput_shutdown; @@ -251,6 +255,46 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } /* + * Write the relation schema if the current schema hasn't been sent yet. + */ +static void +maybe_send_schema(LogicalDecodingContext *ctx, + Relation relation, RelationSyncEntry *relentry) +{ + if (!relentry->schema_sent) + { + TupleDesc desc; + int i; + + desc = RelationGetDescr(relation); + + /* + * Write out type info if needed. We do that only for user created + * types. + */ + for (i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); + + if (att->attisdropped) + continue; + + if (att->atttypid < FirstNormalObjectId) + continue; + + OutputPluginPrepareWrite(ctx, false); + logicalrep_write_typ(ctx->out, att->atttypid); + OutputPluginWrite(ctx, false); + } + + OutputPluginPrepareWrite(ctx, false); + logicalrep_write_rel(ctx->out, relation); + OutputPluginWrite(ctx, false); + relentry->schema_sent = true; + } +} + +/* * Sends the decoded DML over wire. */ static void @@ -288,40 +332,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); - /* - * Write the relation schema if the current schema haven't been sent yet. - */ - if (!relentry->schema_sent) - { - TupleDesc desc; - int i; - - desc = RelationGetDescr(relation); - - /* - * Write out type info if needed. We do that only for user created - * types. - */ - for (i = 0; i < desc->natts; i++) - { - Form_pg_attribute att = TupleDescAttr(desc, i); - - if (att->attisdropped) - continue; - - if (att->atttypid < FirstNormalObjectId) - continue; - - OutputPluginPrepareWrite(ctx, false); - logicalrep_write_typ(ctx->out, att->atttypid); - OutputPluginWrite(ctx, false); - } - - OutputPluginPrepareWrite(ctx, false); - logicalrep_write_rel(ctx->out, relation); - OutputPluginWrite(ctx, false); - relentry->schema_sent = true; - } + maybe_send_schema(ctx, relation, relentry); /* Send the data */ switch (change->action) @@ -363,6 +374,51 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, MemoryContextReset(data->context); } +static void +pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + int nrelations, Relation relations[], ReorderBufferChange *change) +{ + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + MemoryContext old; + RelationSyncEntry *relentry; + int i; + int nrelids; + Oid *relids; + + old = MemoryContextSwitchTo(data->context); + + relids = palloc0(nrelations * sizeof(Oid)); + nrelids = 0; + + for (i = 0; i < nrelations; i++) + { + Relation relation = relations[i]; + Oid relid = RelationGetRelid(relation); + + if (!is_publishable_relation(relation)) + continue; + + relentry = get_rel_sync_entry(data, relid); + + if (!relentry->pubactions.pubtruncate) + continue; + + relids[nrelids++] = relid; + maybe_send_schema(ctx, relation, relentry); + } + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_truncate(ctx->out, + nrelids, + relids, + change->data.truncate.cascade, + change->data.truncate.restart_seqs); + OutputPluginWrite(ctx, true); + + MemoryContextSwitchTo(old); + MemoryContextReset(data->context); +} + /* * Currently we always forward. */ @@ -504,7 +560,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) * we only need to consider ones that the subscriber requested. */ entry->pubactions.pubinsert = entry->pubactions.pubupdate = - entry->pubactions.pubdelete = false; + entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; foreach(lc, data->publications) { @@ -515,10 +571,11 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->pubactions.pubinsert |= pub->pubactions.pubinsert; entry->pubactions.pubupdate |= pub->pubactions.pubupdate; entry->pubactions.pubdelete |= pub->pubactions.pubdelete; + entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; } if (entry->pubactions.pubinsert && entry->pubactions.pubupdate && - entry->pubactions.pubdelete) + entry->pubactions.pubdelete && entry->pubactions.pubtruncate) break; } |