diff options
author | Peter Eisentraut <peter_e@gmx.net> | 2018-04-07 11:24:53 -0400 |
---|---|---|
committer | Peter Eisentraut <peter_e@gmx.net> | 2018-04-07 11:34:11 -0400 |
commit | 039eb6e92f20499ac36cc74f8a5cef7430b706f6 (patch) | |
tree | 2cf52aeafb59917d5c7ed396acb6d86325b4a8b0 /src/backend | |
parent | 5dfd1e5a6696b271a2cdee54143fbc209c88c02f (diff) | |
download | postgresql-039eb6e92f20499ac36cc74f8a5cef7430b706f6.tar.gz postgresql-039eb6e92f20499ac36cc74f8a5cef7430b706f6.zip |
Logical replication support for TRUNCATE
Update the built-in logical replication system to make use of the
previously added logical decoding for TRUNCATE support. Add the
required truncate callback to pgoutput and a new logical replication
protocol message.
Publications get a new attribute to determine whether to replicate
truncate actions. When updating a publication via pg_dump from an older
version, this is not set, thus preserving the previous behavior.
Author: Simon Riggs <simon@2ndquadrant.com>
Author: Marco Nenciarini <marco.nenciarini@2ndquadrant.it>
Author: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>
Reviewed-by: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org>
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/catalog/pg_publication.c | 1 | ||||
-rw-r--r-- | src/backend/commands/publicationcmds.c | 20 | ||||
-rw-r--r-- | src/backend/replication/logical/proto.c | 55 | ||||
-rw-r--r-- | src/backend/replication/logical/worker.c | 68 | ||||
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 129 | ||||
-rw-r--r-- | src/backend/utils/cache/relcache.c | 3 |
6 files changed, 236 insertions, 40 deletions
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index ba18258ebb7..ec3bd1d22d2 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -376,6 +376,7 @@ GetPublication(Oid pubid) pub->pubactions.pubinsert = pubform->pubinsert; pub->pubactions.pubupdate = pubform->pubupdate; pub->pubactions.pubdelete = pubform->pubdelete; + pub->pubactions.pubtruncate = pubform->pubtruncate; ReleaseSysCache(tup); diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 9c5aa9ebc25..29992d4a0e2 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -62,7 +62,8 @@ parse_publication_options(List *options, bool *publish_given, bool *publish_insert, bool *publish_update, - bool *publish_delete) + bool *publish_delete, + bool *publish_truncate) { ListCell *lc; @@ -72,6 +73,7 @@ parse_publication_options(List *options, *publish_insert = true; *publish_update = true; *publish_delete = true; + *publish_truncate = true; /* Parse options */ foreach(lc, options) @@ -96,6 +98,7 @@ parse_publication_options(List *options, *publish_insert = false; *publish_update = false; *publish_delete = false; + *publish_truncate = false; *publish_given = true; publish = defGetString(defel); @@ -116,6 +119,8 @@ parse_publication_options(List *options, *publish_update = true; else if (strcmp(publish_opt, "delete") == 0) *publish_delete = true; + else if (strcmp(publish_opt, "truncate") == 0) + *publish_truncate = true; else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -145,6 +150,7 @@ CreatePublication(CreatePublicationStmt *stmt) bool publish_insert; bool publish_update; bool publish_delete; + bool publish_truncate; AclResult aclresult; /* must have CREATE privilege on database */ @@ -181,7 +187,8 @@ CreatePublication(CreatePublicationStmt *stmt) parse_publication_options(stmt->options, &publish_given, &publish_insert, - &publish_update, &publish_delete); + &publish_update, &publish_delete, + &publish_truncate); values[Anum_pg_publication_puballtables - 1] = BoolGetDatum(stmt->for_all_tables); @@ -191,6 +198,8 @@ CreatePublication(CreatePublicationStmt *stmt) BoolGetDatum(publish_update); values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete); + values[Anum_pg_publication_pubtruncate - 1] = + BoolGetDatum(publish_truncate); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -237,11 +246,13 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel, bool publish_insert; bool publish_update; bool publish_delete; + bool publish_truncate; ObjectAddress obj; parse_publication_options(stmt->options, &publish_given, &publish_insert, - &publish_update, &publish_delete); + &publish_update, &publish_delete, + &publish_truncate); /* Everything ok, form a new tuple. */ memset(values, 0, sizeof(values)); @@ -258,6 +269,9 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel, values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete); replaces[Anum_pg_publication_pubdelete - 1] = true; + + values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(publish_truncate); + replaces[Anum_pg_publication_pubtruncate - 1] = true; } tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 948343e4aee..edc97a7662b 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -26,6 +26,9 @@ */ #define LOGICALREP_IS_REPLICA_IDENTITY 1 +#define TRUNCATE_CASCADE (1<<0) +#define TRUNCATE_RESTART_SEQS (1<<1) + static void logicalrep_write_attrs(StringInfo out, Relation rel); static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple); @@ -293,6 +296,58 @@ logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup) } /* + * Write TRUNCATE to the output stream. + */ +void +logicalrep_write_truncate(StringInfo out, + int nrelids, + Oid relids[], + bool cascade, bool restart_seqs) +{ + int i; + uint8 flags = 0; + + pq_sendbyte(out, 'T'); /* action TRUNCATE */ + + pq_sendint32(out, nrelids); + + /* encode and send truncate flags */ + if (cascade) + flags |= TRUNCATE_CASCADE; + if (restart_seqs) + flags |= TRUNCATE_RESTART_SEQS; + pq_sendint8(out, flags); + + for (i = 0; i < nrelids; i++) + pq_sendint32(out, relids[i]); +} + +/* + * Read TRUNCATE from stream. + */ +List * +logicalrep_read_truncate(StringInfo in, + bool *cascade, bool *restart_seqs) +{ + int i; + int nrelids; + List *relids = NIL; + uint8 flags; + + nrelids = pq_getmsgint(in, 4); + + /* read and decode truncate flags */ + flags = pq_getmsgint(in, 1); + *cascade = (flags & TRUNCATE_CASCADE) > 0; + *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0; + + for (i = 0; i < nrelids; i++) + relids = lappend_oid(relids, pq_getmsgint(in, 4)); + + return relids; +} + +/* * Write relation description to the output stream. */ void diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index b10857550a6..aa7e27179e8 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -30,10 +30,12 @@ #include "access/xact.h" #include "access/xlog_internal.h" +#include "catalog/catalog.h" #include "catalog/namespace.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" +#include "commands/tablecmds.h" #include "commands/trigger.h" #include "executor/executor.h" @@ -83,6 +85,7 @@ #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/rel.h" #include "utils/timeout.h" #include "utils/tqual.h" #include "utils/syscache.h" @@ -888,6 +891,67 @@ apply_handle_delete(StringInfo s) CommandCounterIncrement(); } +/* + * Handle TRUNCATE message. + * + * TODO: FDW support + */ +static void +apply_handle_truncate(StringInfo s) +{ + bool cascade = false; + bool restart_seqs = false; + List *remote_relids = NIL; + List *remote_rels = NIL; + List *rels = NIL; + List *relids = NIL; + List *relids_logged = NIL; + ListCell *lc; + + ensure_transaction(); + + remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs); + + foreach(lc, remote_relids) + { + LogicalRepRelId relid = lfirst_oid(lc); + LogicalRepRelMapEntry *rel; + + rel = logicalrep_rel_open(relid, RowExclusiveLock); + if (!should_apply_changes_for_rel(rel)) + { + /* + * The relation can't become interesting in the middle of the + * transaction so it's safe to unlock it. + */ + logicalrep_rel_close(rel, RowExclusiveLock); + continue; + } + + remote_rels = lappend(remote_rels, rel); + rels = lappend(rels, rel->localrel); + relids = lappend_oid(relids, rel->localreloid); + if (RelationIsLogicallyLogged(rel->localrel)) + relids_logged = lappend_oid(relids, rel->localreloid); + } + + /* + * Even if we used CASCADE on the upstream master we explicitly + * default to replaying changes without further cascading. + * This might be later changeable with a user specified option. + */ + ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs); + + foreach(lc, remote_rels) + { + LogicalRepRelMapEntry *rel = lfirst(lc); + + logicalrep_rel_close(rel, NoLock); + } + + CommandCounterIncrement(); +} + /* * Logical replication protocol message dispatcher. @@ -919,6 +983,10 @@ apply_dispatch(StringInfo s) case 'D': apply_handle_delete(s); break; + /* TRUNCATE */ + case 'T': + apply_handle_truncate(s); + break; /* RELATION */ case 'R': apply_handle_relation(s); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index aa9cf5b54ed..06dfbc082f2 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -39,6 +39,9 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx, static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change); +static void pgoutput_truncate(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, int nrelations, Relation relations[], + ReorderBufferChange *change); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); @@ -77,6 +80,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->startup_cb = pgoutput_startup; cb->begin_cb = pgoutput_begin_txn; cb->change_cb = pgoutput_change; + cb->truncate_cb = pgoutput_truncate; cb->commit_cb = pgoutput_commit_txn; cb->filter_by_origin_cb = pgoutput_origin_filter; cb->shutdown_cb = pgoutput_shutdown; @@ -251,6 +255,46 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } /* + * Write the relation schema if the current schema hasn't been sent yet. + */ +static void +maybe_send_schema(LogicalDecodingContext *ctx, + Relation relation, RelationSyncEntry *relentry) +{ + if (!relentry->schema_sent) + { + TupleDesc desc; + int i; + + desc = RelationGetDescr(relation); + + /* + * Write out type info if needed. We do that only for user created + * types. + */ + for (i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); + + if (att->attisdropped) + continue; + + if (att->atttypid < FirstNormalObjectId) + continue; + + OutputPluginPrepareWrite(ctx, false); + logicalrep_write_typ(ctx->out, att->atttypid); + OutputPluginWrite(ctx, false); + } + + OutputPluginPrepareWrite(ctx, false); + logicalrep_write_rel(ctx->out, relation); + OutputPluginWrite(ctx, false); + relentry->schema_sent = true; + } +} + +/* * Sends the decoded DML over wire. */ static void @@ -288,40 +332,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); - /* - * Write the relation schema if the current schema haven't been sent yet. - */ - if (!relentry->schema_sent) - { - TupleDesc desc; - int i; - - desc = RelationGetDescr(relation); - - /* - * Write out type info if needed. We do that only for user created - * types. - */ - for (i = 0; i < desc->natts; i++) - { - Form_pg_attribute att = TupleDescAttr(desc, i); - - if (att->attisdropped) - continue; - - if (att->atttypid < FirstNormalObjectId) - continue; - - OutputPluginPrepareWrite(ctx, false); - logicalrep_write_typ(ctx->out, att->atttypid); - OutputPluginWrite(ctx, false); - } - - OutputPluginPrepareWrite(ctx, false); - logicalrep_write_rel(ctx->out, relation); - OutputPluginWrite(ctx, false); - relentry->schema_sent = true; - } + maybe_send_schema(ctx, relation, relentry); /* Send the data */ switch (change->action) @@ -363,6 +374,51 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, MemoryContextReset(data->context); } +static void +pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + int nrelations, Relation relations[], ReorderBufferChange *change) +{ + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + MemoryContext old; + RelationSyncEntry *relentry; + int i; + int nrelids; + Oid *relids; + + old = MemoryContextSwitchTo(data->context); + + relids = palloc0(nrelations * sizeof(Oid)); + nrelids = 0; + + for (i = 0; i < nrelations; i++) + { + Relation relation = relations[i]; + Oid relid = RelationGetRelid(relation); + + if (!is_publishable_relation(relation)) + continue; + + relentry = get_rel_sync_entry(data, relid); + + if (!relentry->pubactions.pubtruncate) + continue; + + relids[nrelids++] = relid; + maybe_send_schema(ctx, relation, relentry); + } + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_truncate(ctx->out, + nrelids, + relids, + change->data.truncate.cascade, + change->data.truncate.restart_seqs); + OutputPluginWrite(ctx, true); + + MemoryContextSwitchTo(old); + MemoryContextReset(data->context); +} + /* * Currently we always forward. */ @@ -504,7 +560,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) * we only need to consider ones that the subscriber requested. */ entry->pubactions.pubinsert = entry->pubactions.pubupdate = - entry->pubactions.pubdelete = false; + entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; foreach(lc, data->publications) { @@ -515,10 +571,11 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->pubactions.pubinsert |= pub->pubactions.pubinsert; entry->pubactions.pubupdate |= pub->pubactions.pubupdate; entry->pubactions.pubdelete |= pub->pubactions.pubdelete; + entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; } if (entry->pubactions.pubinsert && entry->pubactions.pubupdate && - entry->pubactions.pubdelete) + entry->pubactions.pubdelete && entry->pubactions.pubtruncate) break; } diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index b6ed06d5b3c..40a2c1df049 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -5339,6 +5339,7 @@ GetRelationPublicationActions(Relation relation) pubactions->pubinsert |= pubform->pubinsert; pubactions->pubupdate |= pubform->pubupdate; pubactions->pubdelete |= pubform->pubdelete; + pubactions->pubtruncate |= pubform->pubtruncate; ReleaseSysCache(tup); @@ -5347,7 +5348,7 @@ GetRelationPublicationActions(Relation relation) * other publications. */ if (pubactions->pubinsert && pubactions->pubupdate && - pubactions->pubdelete) + pubactions->pubdelete && pubactions->pubtruncate) break; } |