aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
authorPeter Eisentraut <peter_e@gmx.net>2018-04-07 11:24:53 -0400
committerPeter Eisentraut <peter_e@gmx.net>2018-04-07 11:34:11 -0400
commit039eb6e92f20499ac36cc74f8a5cef7430b706f6 (patch)
tree2cf52aeafb59917d5c7ed396acb6d86325b4a8b0 /src/backend
parent5dfd1e5a6696b271a2cdee54143fbc209c88c02f (diff)
downloadpostgresql-039eb6e92f20499ac36cc74f8a5cef7430b706f6.tar.gz
postgresql-039eb6e92f20499ac36cc74f8a5cef7430b706f6.zip
Logical replication support for TRUNCATE
Update the built-in logical replication system to make use of the previously added logical decoding for TRUNCATE support. Add the required truncate callback to pgoutput and a new logical replication protocol message. Publications get a new attribute to determine whether to replicate truncate actions. When updating a publication via pg_dump from an older version, this is not set, thus preserving the previous behavior. Author: Simon Riggs <simon@2ndquadrant.com> Author: Marco Nenciarini <marco.nenciarini@2ndquadrant.it> Author: Peter Eisentraut <peter.eisentraut@2ndquadrant.com> Reviewed-by: Petr Jelinek <petr.jelinek@2ndquadrant.com> Reviewed-by: Andres Freund <andres@anarazel.de> Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org>
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/catalog/pg_publication.c1
-rw-r--r--src/backend/commands/publicationcmds.c20
-rw-r--r--src/backend/replication/logical/proto.c55
-rw-r--r--src/backend/replication/logical/worker.c68
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c129
-rw-r--r--src/backend/utils/cache/relcache.c3
6 files changed, 236 insertions, 40 deletions
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index ba18258ebb7..ec3bd1d22d2 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -376,6 +376,7 @@ GetPublication(Oid pubid)
pub->pubactions.pubinsert = pubform->pubinsert;
pub->pubactions.pubupdate = pubform->pubupdate;
pub->pubactions.pubdelete = pubform->pubdelete;
+ pub->pubactions.pubtruncate = pubform->pubtruncate;
ReleaseSysCache(tup);
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 9c5aa9ebc25..29992d4a0e2 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -62,7 +62,8 @@ parse_publication_options(List *options,
bool *publish_given,
bool *publish_insert,
bool *publish_update,
- bool *publish_delete)
+ bool *publish_delete,
+ bool *publish_truncate)
{
ListCell *lc;
@@ -72,6 +73,7 @@ parse_publication_options(List *options,
*publish_insert = true;
*publish_update = true;
*publish_delete = true;
+ *publish_truncate = true;
/* Parse options */
foreach(lc, options)
@@ -96,6 +98,7 @@ parse_publication_options(List *options,
*publish_insert = false;
*publish_update = false;
*publish_delete = false;
+ *publish_truncate = false;
*publish_given = true;
publish = defGetString(defel);
@@ -116,6 +119,8 @@ parse_publication_options(List *options,
*publish_update = true;
else if (strcmp(publish_opt, "delete") == 0)
*publish_delete = true;
+ else if (strcmp(publish_opt, "truncate") == 0)
+ *publish_truncate = true;
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -145,6 +150,7 @@ CreatePublication(CreatePublicationStmt *stmt)
bool publish_insert;
bool publish_update;
bool publish_delete;
+ bool publish_truncate;
AclResult aclresult;
/* must have CREATE privilege on database */
@@ -181,7 +187,8 @@ CreatePublication(CreatePublicationStmt *stmt)
parse_publication_options(stmt->options,
&publish_given, &publish_insert,
- &publish_update, &publish_delete);
+ &publish_update, &publish_delete,
+ &publish_truncate);
values[Anum_pg_publication_puballtables - 1] =
BoolGetDatum(stmt->for_all_tables);
@@ -191,6 +198,8 @@ CreatePublication(CreatePublicationStmt *stmt)
BoolGetDatum(publish_update);
values[Anum_pg_publication_pubdelete - 1] =
BoolGetDatum(publish_delete);
+ values[Anum_pg_publication_pubtruncate - 1] =
+ BoolGetDatum(publish_truncate);
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
@@ -237,11 +246,13 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
bool publish_insert;
bool publish_update;
bool publish_delete;
+ bool publish_truncate;
ObjectAddress obj;
parse_publication_options(stmt->options,
&publish_given, &publish_insert,
- &publish_update, &publish_delete);
+ &publish_update, &publish_delete,
+ &publish_truncate);
/* Everything ok, form a new tuple. */
memset(values, 0, sizeof(values));
@@ -258,6 +269,9 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete);
replaces[Anum_pg_publication_pubdelete - 1] = true;
+
+ values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(publish_truncate);
+ replaces[Anum_pg_publication_pubtruncate - 1] = true;
}
tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 948343e4aee..edc97a7662b 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -26,6 +26,9 @@
*/
#define LOGICALREP_IS_REPLICA_IDENTITY 1
+#define TRUNCATE_CASCADE (1<<0)
+#define TRUNCATE_RESTART_SEQS (1<<1)
+
static void logicalrep_write_attrs(StringInfo out, Relation rel);
static void logicalrep_write_tuple(StringInfo out, Relation rel,
HeapTuple tuple);
@@ -293,6 +296,58 @@ logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
}
/*
+ * Write TRUNCATE to the output stream.
+ */
+void
+logicalrep_write_truncate(StringInfo out,
+ int nrelids,
+ Oid relids[],
+ bool cascade, bool restart_seqs)
+{
+ int i;
+ uint8 flags = 0;
+
+ pq_sendbyte(out, 'T'); /* action TRUNCATE */
+
+ pq_sendint32(out, nrelids);
+
+ /* encode and send truncate flags */
+ if (cascade)
+ flags |= TRUNCATE_CASCADE;
+ if (restart_seqs)
+ flags |= TRUNCATE_RESTART_SEQS;
+ pq_sendint8(out, flags);
+
+ for (i = 0; i < nrelids; i++)
+ pq_sendint32(out, relids[i]);
+}
+
+/*
+ * Read TRUNCATE from stream.
+ */
+List *
+logicalrep_read_truncate(StringInfo in,
+ bool *cascade, bool *restart_seqs)
+{
+ int i;
+ int nrelids;
+ List *relids = NIL;
+ uint8 flags;
+
+ nrelids = pq_getmsgint(in, 4);
+
+ /* read and decode truncate flags */
+ flags = pq_getmsgint(in, 1);
+ *cascade = (flags & TRUNCATE_CASCADE) > 0;
+ *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
+
+ for (i = 0; i < nrelids; i++)
+ relids = lappend_oid(relids, pq_getmsgint(in, 4));
+
+ return relids;
+}
+
+/*
* Write relation description to the output stream.
*/
void
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b10857550a6..aa7e27179e8 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -30,10 +30,12 @@
#include "access/xact.h"
#include "access/xlog_internal.h"
+#include "catalog/catalog.h"
#include "catalog/namespace.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
+#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
@@ -83,6 +85,7 @@
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
+#include "utils/rel.h"
#include "utils/timeout.h"
#include "utils/tqual.h"
#include "utils/syscache.h"
@@ -888,6 +891,67 @@ apply_handle_delete(StringInfo s)
CommandCounterIncrement();
}
+/*
+ * Handle TRUNCATE message.
+ *
+ * TODO: FDW support
+ */
+static void
+apply_handle_truncate(StringInfo s)
+{
+ bool cascade = false;
+ bool restart_seqs = false;
+ List *remote_relids = NIL;
+ List *remote_rels = NIL;
+ List *rels = NIL;
+ List *relids = NIL;
+ List *relids_logged = NIL;
+ ListCell *lc;
+
+ ensure_transaction();
+
+ remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
+
+ foreach(lc, remote_relids)
+ {
+ LogicalRepRelId relid = lfirst_oid(lc);
+ LogicalRepRelMapEntry *rel;
+
+ rel = logicalrep_rel_open(relid, RowExclusiveLock);
+ if (!should_apply_changes_for_rel(rel))
+ {
+ /*
+ * The relation can't become interesting in the middle of the
+ * transaction so it's safe to unlock it.
+ */
+ logicalrep_rel_close(rel, RowExclusiveLock);
+ continue;
+ }
+
+ remote_rels = lappend(remote_rels, rel);
+ rels = lappend(rels, rel->localrel);
+ relids = lappend_oid(relids, rel->localreloid);
+ if (RelationIsLogicallyLogged(rel->localrel))
+ relids_logged = lappend_oid(relids, rel->localreloid);
+ }
+
+ /*
+ * Even if we used CASCADE on the upstream master we explicitly
+ * default to replaying changes without further cascading.
+ * This might be later changeable with a user specified option.
+ */
+ ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);
+
+ foreach(lc, remote_rels)
+ {
+ LogicalRepRelMapEntry *rel = lfirst(lc);
+
+ logicalrep_rel_close(rel, NoLock);
+ }
+
+ CommandCounterIncrement();
+}
+
/*
* Logical replication protocol message dispatcher.
@@ -919,6 +983,10 @@ apply_dispatch(StringInfo s)
case 'D':
apply_handle_delete(s);
break;
+ /* TRUNCATE */
+ case 'T':
+ apply_handle_truncate(s);
+ break;
/* RELATION */
case 'R':
apply_handle_relation(s);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index aa9cf5b54ed..06dfbc082f2 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -39,6 +39,9 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
static void pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, Relation rel,
ReorderBufferChange *change);
+static void pgoutput_truncate(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, int nrelations, Relation relations[],
+ ReorderBufferChange *change);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
@@ -77,6 +80,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->startup_cb = pgoutput_startup;
cb->begin_cb = pgoutput_begin_txn;
cb->change_cb = pgoutput_change;
+ cb->truncate_cb = pgoutput_truncate;
cb->commit_cb = pgoutput_commit_txn;
cb->filter_by_origin_cb = pgoutput_origin_filter;
cb->shutdown_cb = pgoutput_shutdown;
@@ -251,6 +255,46 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
/*
+ * Write the relation schema if the current schema hasn't been sent yet.
+ */
+static void
+maybe_send_schema(LogicalDecodingContext *ctx,
+ Relation relation, RelationSyncEntry *relentry)
+{
+ if (!relentry->schema_sent)
+ {
+ TupleDesc desc;
+ int i;
+
+ desc = RelationGetDescr(relation);
+
+ /*
+ * Write out type info if needed. We do that only for user created
+ * types.
+ */
+ for (i = 0; i < desc->natts; i++)
+ {
+ Form_pg_attribute att = TupleDescAttr(desc, i);
+
+ if (att->attisdropped)
+ continue;
+
+ if (att->atttypid < FirstNormalObjectId)
+ continue;
+
+ OutputPluginPrepareWrite(ctx, false);
+ logicalrep_write_typ(ctx->out, att->atttypid);
+ OutputPluginWrite(ctx, false);
+ }
+
+ OutputPluginPrepareWrite(ctx, false);
+ logicalrep_write_rel(ctx->out, relation);
+ OutputPluginWrite(ctx, false);
+ relentry->schema_sent = true;
+ }
+}
+
+/*
* Sends the decoded DML over wire.
*/
static void
@@ -288,40 +332,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
- /*
- * Write the relation schema if the current schema haven't been sent yet.
- */
- if (!relentry->schema_sent)
- {
- TupleDesc desc;
- int i;
-
- desc = RelationGetDescr(relation);
-
- /*
- * Write out type info if needed. We do that only for user created
- * types.
- */
- for (i = 0; i < desc->natts; i++)
- {
- Form_pg_attribute att = TupleDescAttr(desc, i);
-
- if (att->attisdropped)
- continue;
-
- if (att->atttypid < FirstNormalObjectId)
- continue;
-
- OutputPluginPrepareWrite(ctx, false);
- logicalrep_write_typ(ctx->out, att->atttypid);
- OutputPluginWrite(ctx, false);
- }
-
- OutputPluginPrepareWrite(ctx, false);
- logicalrep_write_rel(ctx->out, relation);
- OutputPluginWrite(ctx, false);
- relentry->schema_sent = true;
- }
+ maybe_send_schema(ctx, relation, relentry);
/* Send the data */
switch (change->action)
@@ -363,6 +374,51 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
MemoryContextReset(data->context);
}
+static void
+pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ int nrelations, Relation relations[], ReorderBufferChange *change)
+{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ MemoryContext old;
+ RelationSyncEntry *relentry;
+ int i;
+ int nrelids;
+ Oid *relids;
+
+ old = MemoryContextSwitchTo(data->context);
+
+ relids = palloc0(nrelations * sizeof(Oid));
+ nrelids = 0;
+
+ for (i = 0; i < nrelations; i++)
+ {
+ Relation relation = relations[i];
+ Oid relid = RelationGetRelid(relation);
+
+ if (!is_publishable_relation(relation))
+ continue;
+
+ relentry = get_rel_sync_entry(data, relid);
+
+ if (!relentry->pubactions.pubtruncate)
+ continue;
+
+ relids[nrelids++] = relid;
+ maybe_send_schema(ctx, relation, relentry);
+ }
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_truncate(ctx->out,
+ nrelids,
+ relids,
+ change->data.truncate.cascade,
+ change->data.truncate.restart_seqs);
+ OutputPluginWrite(ctx, true);
+
+ MemoryContextSwitchTo(old);
+ MemoryContextReset(data->context);
+}
+
/*
* Currently we always forward.
*/
@@ -504,7 +560,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
* we only need to consider ones that the subscriber requested.
*/
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
- entry->pubactions.pubdelete = false;
+ entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
foreach(lc, data->publications)
{
@@ -515,10 +571,11 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
+ entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
}
if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
- entry->pubactions.pubdelete)
+ entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
break;
}
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index b6ed06d5b3c..40a2c1df049 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -5339,6 +5339,7 @@ GetRelationPublicationActions(Relation relation)
pubactions->pubinsert |= pubform->pubinsert;
pubactions->pubupdate |= pubform->pubupdate;
pubactions->pubdelete |= pubform->pubdelete;
+ pubactions->pubtruncate |= pubform->pubtruncate;
ReleaseSysCache(tup);
@@ -5347,7 +5348,7 @@ GetRelationPublicationActions(Relation relation)
* other publications.
*/
if (pubactions->pubinsert && pubactions->pubupdate &&
- pubactions->pubdelete)
+ pubactions->pubdelete && pubactions->pubtruncate)
break;
}