aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/src/sgml/catalogs.sgml8
-rw-r--r--doc/src/sgml/logical-replication.sgml13
-rw-r--r--doc/src/sgml/protocol.sgml56
-rw-r--r--doc/src/sgml/ref/create_publication.sgml10
-rw-r--r--src/backend/catalog/pg_publication.c1
-rw-r--r--src/backend/commands/publicationcmds.c20
-rw-r--r--src/backend/replication/logical/proto.c55
-rw-r--r--src/backend/replication/logical/worker.c68
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c129
-rw-r--r--src/backend/utils/cache/relcache.c3
-rw-r--r--src/bin/pg_dump/pg_dump.c33
-rw-r--r--src/bin/pg_dump/pg_dump.h1
-rw-r--r--src/bin/pg_dump/t/002_pg_dump.pl2
-rw-r--r--src/bin/psql/describe.c26
-rw-r--r--src/include/catalog/catversion.h2
-rw-r--r--src/include/catalog/pg_publication.h7
-rw-r--r--src/include/replication/logicalproto.h4
-rw-r--r--src/test/regress/expected/publication.out84
-rw-r--r--src/test/subscription/t/010_truncate.pl161
19 files changed, 572 insertions, 111 deletions
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index d6a9d8c5808..e8efa13e8df 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -5518,6 +5518,14 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
<entry>If true, <command>DELETE</command> operations are replicated for
tables in the publication.</entry>
</row>
+
+ <row>
+ <entry><structfield>pubtruncate</structfield></entry>
+ <entry><type>bool</type></entry>
+ <entry></entry>
+ <entry>If true, <command>TRUNCATE</command> operations are replicated for
+ tables in the publication.</entry>
+ </row>
</tbody>
</tgroup>
</table>
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 75551d8ee1a..151e773fc2c 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -108,8 +108,8 @@
<para>
Publications can choose to limit the changes they produce to
- any combination of <command>INSERT</command>, <command>UPDATE</command>, and
- <command>DELETE</command>, similar to how triggers are fired by
+ any combination of <command>INSERT</command>, <command>UPDATE</command>,
+ <command>DELETE</command>, and <command>TRUNCATE</command>, similar to how triggers are fired by
particular event types. By default, all operation types are replicated.
</para>
@@ -366,15 +366,6 @@
<listitem>
<para>
- <command>TRUNCATE</command> commands are not replicated. This can, of
- course, be worked around by using <command>DELETE</command> instead. To
- avoid accidental <command>TRUNCATE</command> invocations, you can revoke
- the <literal>TRUNCATE</literal> privilege from tables.
- </para>
- </listitem>
-
- <listitem>
- <para>
Large objects (see <xref linkend="largeobjects"/>) are not replicated.
There is no workaround for that, other than storing data in normal
tables.
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index b94dd4ac654..004b36084f1 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -6774,6 +6774,62 @@ Delete
</listitem>
</varlistentry>
+<varlistentry>
+<term>
+Truncate
+</term>
+<listitem>
+<para>
+
+<variablelist>
+<varlistentry>
+<term>
+ Byte1('T')
+</term>
+<listitem>
+<para>
+ Identifies the message as a truncate message.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+ Int32
+</term>
+<listitem>
+<para>
+ Number of relations
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+ Int8
+</term>
+<listitem>
+<para>
+ Option bits for <command>TRUNCATE</command>:
+ 1 for <literal>CASCADE</literal>, 2 for <literal>RESTART IDENTITY</literal>
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+ Int32
+</term>
+<listitem>
+<para>
+ ID of the relation corresponding to the ID in the relation
+ message. This field is repeated for each relation.
+</para>
+</listitem>
+</varlistentry>
+
+</variablelist>
+</para>
+</listitem>
+</varlistentry>
+
</variablelist>
<para>
diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml
index bfe12d5f410..99f87ca3938 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -106,10 +106,11 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
This parameter determines which DML operations will be published by
the new publication to the subscribers. The value is
comma-separated list of operations. The allowed operations are
- <literal>insert</literal>, <literal>update</literal>, and
- <literal>delete</literal>. The default is to publish all actions,
+ <literal>insert</literal>, <literal>update</literal>,
+ <literal>delete</literal>, and <literal>truncate</literal>.
+ The default is to publish all actions,
and so the default value for this option is
- <literal>'insert, update, delete'</literal>.
+ <literal>'insert, update, delete, truncate'</literal>.
</para>
</listitem>
</varlistentry>
@@ -168,8 +169,7 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
</para>
<para>
- <command>TRUNCATE</command> and <acronym>DDL</acronym> operations
- are not published.
+ <acronym>DDL</acronym> operations are not published.
</para>
</refsect1>
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index ba18258ebb7..ec3bd1d22d2 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -376,6 +376,7 @@ GetPublication(Oid pubid)
pub->pubactions.pubinsert = pubform->pubinsert;
pub->pubactions.pubupdate = pubform->pubupdate;
pub->pubactions.pubdelete = pubform->pubdelete;
+ pub->pubactions.pubtruncate = pubform->pubtruncate;
ReleaseSysCache(tup);
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 9c5aa9ebc25..29992d4a0e2 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -62,7 +62,8 @@ parse_publication_options(List *options,
bool *publish_given,
bool *publish_insert,
bool *publish_update,
- bool *publish_delete)
+ bool *publish_delete,
+ bool *publish_truncate)
{
ListCell *lc;
@@ -72,6 +73,7 @@ parse_publication_options(List *options,
*publish_insert = true;
*publish_update = true;
*publish_delete = true;
+ *publish_truncate = true;
/* Parse options */
foreach(lc, options)
@@ -96,6 +98,7 @@ parse_publication_options(List *options,
*publish_insert = false;
*publish_update = false;
*publish_delete = false;
+ *publish_truncate = false;
*publish_given = true;
publish = defGetString(defel);
@@ -116,6 +119,8 @@ parse_publication_options(List *options,
*publish_update = true;
else if (strcmp(publish_opt, "delete") == 0)
*publish_delete = true;
+ else if (strcmp(publish_opt, "truncate") == 0)
+ *publish_truncate = true;
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -145,6 +150,7 @@ CreatePublication(CreatePublicationStmt *stmt)
bool publish_insert;
bool publish_update;
bool publish_delete;
+ bool publish_truncate;
AclResult aclresult;
/* must have CREATE privilege on database */
@@ -181,7 +187,8 @@ CreatePublication(CreatePublicationStmt *stmt)
parse_publication_options(stmt->options,
&publish_given, &publish_insert,
- &publish_update, &publish_delete);
+ &publish_update, &publish_delete,
+ &publish_truncate);
values[Anum_pg_publication_puballtables - 1] =
BoolGetDatum(stmt->for_all_tables);
@@ -191,6 +198,8 @@ CreatePublication(CreatePublicationStmt *stmt)
BoolGetDatum(publish_update);
values[Anum_pg_publication_pubdelete - 1] =
BoolGetDatum(publish_delete);
+ values[Anum_pg_publication_pubtruncate - 1] =
+ BoolGetDatum(publish_truncate);
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
@@ -237,11 +246,13 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
bool publish_insert;
bool publish_update;
bool publish_delete;
+ bool publish_truncate;
ObjectAddress obj;
parse_publication_options(stmt->options,
&publish_given, &publish_insert,
- &publish_update, &publish_delete);
+ &publish_update, &publish_delete,
+ &publish_truncate);
/* Everything ok, form a new tuple. */
memset(values, 0, sizeof(values));
@@ -258,6 +269,9 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete);
replaces[Anum_pg_publication_pubdelete - 1] = true;
+
+ values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(publish_truncate);
+ replaces[Anum_pg_publication_pubtruncate - 1] = true;
}
tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 948343e4aee..edc97a7662b 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -26,6 +26,9 @@
*/
#define LOGICALREP_IS_REPLICA_IDENTITY 1
+#define TRUNCATE_CASCADE (1<<0)
+#define TRUNCATE_RESTART_SEQS (1<<1)
+
static void logicalrep_write_attrs(StringInfo out, Relation rel);
static void logicalrep_write_tuple(StringInfo out, Relation rel,
HeapTuple tuple);
@@ -293,6 +296,58 @@ logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
}
/*
+ * Write TRUNCATE to the output stream.
+ */
+void
+logicalrep_write_truncate(StringInfo out,
+ int nrelids,
+ Oid relids[],
+ bool cascade, bool restart_seqs)
+{
+ int i;
+ uint8 flags = 0;
+
+ pq_sendbyte(out, 'T'); /* action TRUNCATE */
+
+ pq_sendint32(out, nrelids);
+
+ /* encode and send truncate flags */
+ if (cascade)
+ flags |= TRUNCATE_CASCADE;
+ if (restart_seqs)
+ flags |= TRUNCATE_RESTART_SEQS;
+ pq_sendint8(out, flags);
+
+ for (i = 0; i < nrelids; i++)
+ pq_sendint32(out, relids[i]);
+}
+
+/*
+ * Read TRUNCATE from stream.
+ */
+List *
+logicalrep_read_truncate(StringInfo in,
+ bool *cascade, bool *restart_seqs)
+{
+ int i;
+ int nrelids;
+ List *relids = NIL;
+ uint8 flags;
+
+ nrelids = pq_getmsgint(in, 4);
+
+ /* read and decode truncate flags */
+ flags = pq_getmsgint(in, 1);
+ *cascade = (flags & TRUNCATE_CASCADE) > 0;
+ *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
+
+ for (i = 0; i < nrelids; i++)
+ relids = lappend_oid(relids, pq_getmsgint(in, 4));
+
+ return relids;
+}
+
+/*
* Write relation description to the output stream.
*/
void
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b10857550a6..aa7e27179e8 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -30,10 +30,12 @@
#include "access/xact.h"
#include "access/xlog_internal.h"
+#include "catalog/catalog.h"
#include "catalog/namespace.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
+#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
@@ -83,6 +85,7 @@
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
+#include "utils/rel.h"
#include "utils/timeout.h"
#include "utils/tqual.h"
#include "utils/syscache.h"
@@ -888,6 +891,67 @@ apply_handle_delete(StringInfo s)
CommandCounterIncrement();
}
+/*
+ * Handle TRUNCATE message.
+ *
+ * TODO: FDW support
+ */
+static void
+apply_handle_truncate(StringInfo s)
+{
+ bool cascade = false;
+ bool restart_seqs = false;
+ List *remote_relids = NIL;
+ List *remote_rels = NIL;
+ List *rels = NIL;
+ List *relids = NIL;
+ List *relids_logged = NIL;
+ ListCell *lc;
+
+ ensure_transaction();
+
+ remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
+
+ foreach(lc, remote_relids)
+ {
+ LogicalRepRelId relid = lfirst_oid(lc);
+ LogicalRepRelMapEntry *rel;
+
+ rel = logicalrep_rel_open(relid, RowExclusiveLock);
+ if (!should_apply_changes_for_rel(rel))
+ {
+ /*
+ * The relation can't become interesting in the middle of the
+ * transaction so it's safe to unlock it.
+ */
+ logicalrep_rel_close(rel, RowExclusiveLock);
+ continue;
+ }
+
+ remote_rels = lappend(remote_rels, rel);
+ rels = lappend(rels, rel->localrel);
+ relids = lappend_oid(relids, rel->localreloid);
+ if (RelationIsLogicallyLogged(rel->localrel))
+ relids_logged = lappend_oid(relids, rel->localreloid);
+ }
+
+ /*
+ * Even if we used CASCADE on the upstream master we explicitly
+ * default to replaying changes without further cascading.
+ * This might be later changeable with a user specified option.
+ */
+ ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);
+
+ foreach(lc, remote_rels)
+ {
+ LogicalRepRelMapEntry *rel = lfirst(lc);
+
+ logicalrep_rel_close(rel, NoLock);
+ }
+
+ CommandCounterIncrement();
+}
+
/*
* Logical replication protocol message dispatcher.
@@ -919,6 +983,10 @@ apply_dispatch(StringInfo s)
case 'D':
apply_handle_delete(s);
break;
+ /* TRUNCATE */
+ case 'T':
+ apply_handle_truncate(s);
+ break;
/* RELATION */
case 'R':
apply_handle_relation(s);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index aa9cf5b54ed..06dfbc082f2 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -39,6 +39,9 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
static void pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, Relation rel,
ReorderBufferChange *change);
+static void pgoutput_truncate(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, int nrelations, Relation relations[],
+ ReorderBufferChange *change);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
@@ -77,6 +80,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->startup_cb = pgoutput_startup;
cb->begin_cb = pgoutput_begin_txn;
cb->change_cb = pgoutput_change;
+ cb->truncate_cb = pgoutput_truncate;
cb->commit_cb = pgoutput_commit_txn;
cb->filter_by_origin_cb = pgoutput_origin_filter;
cb->shutdown_cb = pgoutput_shutdown;
@@ -251,6 +255,46 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
/*
+ * Write the relation schema if the current schema hasn't been sent yet.
+ */
+static void
+maybe_send_schema(LogicalDecodingContext *ctx,
+ Relation relation, RelationSyncEntry *relentry)
+{
+ if (!relentry->schema_sent)
+ {
+ TupleDesc desc;
+ int i;
+
+ desc = RelationGetDescr(relation);
+
+ /*
+ * Write out type info if needed. We do that only for user created
+ * types.
+ */
+ for (i = 0; i < desc->natts; i++)
+ {
+ Form_pg_attribute att = TupleDescAttr(desc, i);
+
+ if (att->attisdropped)
+ continue;
+
+ if (att->atttypid < FirstNormalObjectId)
+ continue;
+
+ OutputPluginPrepareWrite(ctx, false);
+ logicalrep_write_typ(ctx->out, att->atttypid);
+ OutputPluginWrite(ctx, false);
+ }
+
+ OutputPluginPrepareWrite(ctx, false);
+ logicalrep_write_rel(ctx->out, relation);
+ OutputPluginWrite(ctx, false);
+ relentry->schema_sent = true;
+ }
+}
+
+/*
* Sends the decoded DML over wire.
*/
static void
@@ -288,40 +332,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
- /*
- * Write the relation schema if the current schema haven't been sent yet.
- */
- if (!relentry->schema_sent)
- {
- TupleDesc desc;
- int i;
-
- desc = RelationGetDescr(relation);
-
- /*
- * Write out type info if needed. We do that only for user created
- * types.
- */
- for (i = 0; i < desc->natts; i++)
- {
- Form_pg_attribute att = TupleDescAttr(desc, i);
-
- if (att->attisdropped)
- continue;
-
- if (att->atttypid < FirstNormalObjectId)
- continue;
-
- OutputPluginPrepareWrite(ctx, false);
- logicalrep_write_typ(ctx->out, att->atttypid);
- OutputPluginWrite(ctx, false);
- }
-
- OutputPluginPrepareWrite(ctx, false);
- logicalrep_write_rel(ctx->out, relation);
- OutputPluginWrite(ctx, false);
- relentry->schema_sent = true;
- }
+ maybe_send_schema(ctx, relation, relentry);
/* Send the data */
switch (change->action)
@@ -363,6 +374,51 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
MemoryContextReset(data->context);
}
+static void
+pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ int nrelations, Relation relations[], ReorderBufferChange *change)
+{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ MemoryContext old;
+ RelationSyncEntry *relentry;
+ int i;
+ int nrelids;
+ Oid *relids;
+
+ old = MemoryContextSwitchTo(data->context);
+
+ relids = palloc0(nrelations * sizeof(Oid));
+ nrelids = 0;
+
+ for (i = 0; i < nrelations; i++)
+ {
+ Relation relation = relations[i];
+ Oid relid = RelationGetRelid(relation);
+
+ if (!is_publishable_relation(relation))
+ continue;
+
+ relentry = get_rel_sync_entry(data, relid);
+
+ if (!relentry->pubactions.pubtruncate)
+ continue;
+
+ relids[nrelids++] = relid;
+ maybe_send_schema(ctx, relation, relentry);
+ }
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_truncate(ctx->out,
+ nrelids,
+ relids,
+ change->data.truncate.cascade,
+ change->data.truncate.restart_seqs);
+ OutputPluginWrite(ctx, true);
+
+ MemoryContextSwitchTo(old);
+ MemoryContextReset(data->context);
+}
+
/*
* Currently we always forward.
*/
@@ -504,7 +560,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
* we only need to consider ones that the subscriber requested.
*/
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
- entry->pubactions.pubdelete = false;
+ entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
foreach(lc, data->publications)
{
@@ -515,10 +571,11 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
+ entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
}
if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
- entry->pubactions.pubdelete)
+ entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
break;
}
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index b6ed06d5b3c..40a2c1df049 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -5339,6 +5339,7 @@ GetRelationPublicationActions(Relation relation)
pubactions->pubinsert |= pubform->pubinsert;
pubactions->pubupdate |= pubform->pubupdate;
pubactions->pubdelete |= pubform->pubdelete;
+ pubactions->pubtruncate |= pubform->pubtruncate;
ReleaseSysCache(tup);
@@ -5347,7 +5348,7 @@ GetRelationPublicationActions(Relation relation)
* other publications.
*/
if (pubactions->pubinsert && pubactions->pubupdate &&
- pubactions->pubdelete)
+ pubactions->pubdelete && pubactions->pubtruncate)
break;
}
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index d066f4f00b6..69016a6c4d3 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -3712,6 +3712,7 @@ getPublications(Archive *fout)
int i_pubinsert;
int i_pubupdate;
int i_pubdelete;
+ int i_pubtruncate;
int i,
ntups;
@@ -3723,12 +3724,20 @@ getPublications(Archive *fout)
resetPQExpBuffer(query);
/* Get the publications. */
- appendPQExpBuffer(query,
- "SELECT p.tableoid, p.oid, p.pubname, "
- "(%s p.pubowner) AS rolname, "
- "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete "
- "FROM pg_publication p",
- username_subquery);
+ if (fout->remoteVersion >= 110000)
+ appendPQExpBuffer(query,
+ "SELECT p.tableoid, p.oid, p.pubname, "
+ "(%s p.pubowner) AS rolname, "
+ "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate "
+ "FROM pg_publication p",
+ username_subquery);
+ else
+ appendPQExpBuffer(query,
+ "SELECT p.tableoid, p.oid, p.pubname, "
+ "(%s p.pubowner) AS rolname, "
+ "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate "
+ "FROM pg_publication p",
+ username_subquery);
res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
@@ -3742,6 +3751,7 @@ getPublications(Archive *fout)
i_pubinsert = PQfnumber(res, "pubinsert");
i_pubupdate = PQfnumber(res, "pubupdate");
i_pubdelete = PQfnumber(res, "pubdelete");
+ i_pubtruncate = PQfnumber(res, "pubtruncate");
pubinfo = pg_malloc(ntups * sizeof(PublicationInfo));
@@ -3762,6 +3772,8 @@ getPublications(Archive *fout)
(strcmp(PQgetvalue(res, i, i_pubupdate), "t") == 0);
pubinfo[i].pubdelete =
(strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0);
+ pubinfo[i].pubtruncate =
+ (strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0);
if (strlen(pubinfo[i].rolname) == 0)
write_msg(NULL, "WARNING: owner of publication \"%s\" appears to be invalid\n",
@@ -3829,6 +3841,15 @@ dumpPublication(Archive *fout, PublicationInfo *pubinfo)
first = false;
}
+ if (pubinfo->pubtruncate)
+ {
+ if (!first)
+ appendPQExpBufferStr(query, ", ");
+
+ appendPQExpBufferStr(query, "truncate");
+ first = false;
+ }
+
appendPQExpBufferStr(query, "');\n");
ArchiveEntry(fout, pubinfo->dobj.catId, pubinfo->dobj.dumpId,
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index a4d6d926a81..c2314758dea 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -595,6 +595,7 @@ typedef struct _PublicationInfo
bool pubinsert;
bool pubupdate;
bool pubdelete;
+ bool pubtruncate;
} PublicationInfo;
/*
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index acdfde2a1e8..25852b903c0 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -2038,7 +2038,7 @@ qr/CREATE TRANSFORM FOR integer LANGUAGE sql \(FROM SQL WITH FUNCTION pg_catalog
create_order => 50,
create_sql => 'CREATE PUBLICATION pub1;',
regexp => qr/^
- \QCREATE PUBLICATION pub1 WITH (publish = 'insert, update, delete');\E
+ \QCREATE PUBLICATION pub1 WITH (publish = 'insert, update, delete, truncate');\E
/xm,
like => {
%full_runs,
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 0c3be1f5046..75a1e42ceea 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -5187,7 +5187,7 @@ listPublications(const char *pattern)
PQExpBufferData buf;
PGresult *res;
printQueryOpt myopt = pset.popt;
- static const bool translate_columns[] = {false, false, false, false, false, false};
+ static const bool translate_columns[] = {false, false, false, false, false, false, false};
if (pset.sversion < 100000)
{
@@ -5207,13 +5207,17 @@ listPublications(const char *pattern)
" puballtables AS \"%s\",\n"
" pubinsert AS \"%s\",\n"
" pubupdate AS \"%s\",\n"
- " pubdelete AS \"%s\"\n",
+ " pubdelete AS \"%s\"",
gettext_noop("Name"),
gettext_noop("Owner"),
gettext_noop("All tables"),
gettext_noop("Inserts"),
gettext_noop("Updates"),
gettext_noop("Deletes"));
+ if (pset.sversion >= 110000)
+ appendPQExpBuffer(&buf,
+ ",\n pubtruncate AS \"%s\"",
+ gettext_noop("Truncates"));
appendPQExpBufferStr(&buf,
"\nFROM pg_catalog.pg_publication\n");
@@ -5254,6 +5258,7 @@ describePublications(const char *pattern)
PQExpBufferData buf;
int i;
PGresult *res;
+ bool has_pubtruncate;
if (pset.sversion < 100000)
{
@@ -5265,13 +5270,19 @@ describePublications(const char *pattern)
return true;
}
+ has_pubtruncate = (pset.sversion >= 110000);
+
initPQExpBuffer(&buf);
printfPQExpBuffer(&buf,
"SELECT oid, pubname,\n"
" pg_catalog.pg_get_userbyid(pubowner) AS owner,\n"
- " puballtables, pubinsert, pubupdate, pubdelete\n"
- "FROM pg_catalog.pg_publication\n");
+ " puballtables, pubinsert, pubupdate, pubdelete");
+ if (has_pubtruncate)
+ appendPQExpBuffer(&buf,
+ ", pubtruncate");
+ appendPQExpBuffer(&buf,
+ "\nFROM pg_catalog.pg_publication\n");
processSQLNamePattern(pset.db, &buf, pattern, false, false,
NULL, "pubname", NULL,
@@ -5317,6 +5328,9 @@ describePublications(const char *pattern)
printTableOpt myopt = pset.popt.topt;
printTableContent cont;
+ if (has_pubtruncate)
+ ncols++;
+
initPQExpBuffer(&title);
printfPQExpBuffer(&title, _("Publication %s"), pubname);
printTableInit(&cont, &myopt, title.data, ncols, nrows);
@@ -5326,12 +5340,16 @@ describePublications(const char *pattern)
printTableAddHeader(&cont, gettext_noop("Inserts"), true, align);
printTableAddHeader(&cont, gettext_noop("Updates"), true, align);
printTableAddHeader(&cont, gettext_noop("Deletes"), true, align);
+ if (has_pubtruncate)
+ printTableAddHeader(&cont, gettext_noop("Truncates"), true, align);
printTableAddCell(&cont, PQgetvalue(res, i, 2), false, false);
printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false);
printTableAddCell(&cont, PQgetvalue(res, i, 4), false, false);
printTableAddCell(&cont, PQgetvalue(res, i, 5), false, false);
printTableAddCell(&cont, PQgetvalue(res, i, 6), false, false);
+ if (has_pubtruncate)
+ printTableAddCell(&cont, PQgetvalue(res, i, 7), false, false);
if (!puballtables)
{
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 7a0c5d36db3..d88a6bb4c1c 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 201804061
+#define CATALOG_VERSION_NO 201804071
#endif
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 37e77b8be7e..b643c489cdf 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -49,6 +49,9 @@ CATALOG(pg_publication,6104)
/* true if deletes are published */
bool pubdelete;
+ /* true if truncates are published */
+ bool pubtruncate;
+
} FormData_pg_publication;
/* ----------------
@@ -63,19 +66,21 @@ typedef FormData_pg_publication *Form_pg_publication;
* ----------------
*/
-#define Natts_pg_publication 6
+#define Natts_pg_publication 7
#define Anum_pg_publication_pubname 1
#define Anum_pg_publication_pubowner 2
#define Anum_pg_publication_puballtables 3
#define Anum_pg_publication_pubinsert 4
#define Anum_pg_publication_pubupdate 5
#define Anum_pg_publication_pubdelete 6
+#define Anum_pg_publication_pubtruncate 7
typedef struct PublicationActions
{
bool pubinsert;
bool pubupdate;
bool pubdelete;
+ bool pubtruncate;
} PublicationActions;
typedef struct Publication
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 116f16f42d1..92e88d31279 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -97,6 +97,10 @@ extern void logicalrep_write_delete(StringInfo out, Relation rel,
HeapTuple oldtuple);
extern LogicalRepRelId logicalrep_read_delete(StringInfo in,
LogicalRepTupleData *oldtup);
+extern void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[],
+ bool cascade, bool restart_seqs);
+extern List *logicalrep_read_truncate(StringInfo in,
+ bool *cascade, bool *restart_seqs);
extern void logicalrep_write_rel(StringInfo out, Relation rel);
extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
extern void logicalrep_write_typ(StringInfo out, Oid typoid);
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 0c86c647bca..afbbdd543df 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -21,20 +21,20 @@ ERROR: unrecognized publication parameter: foo
CREATE PUBLICATION testpub_xxx WITH (publish = 'cluster, vacuum');
ERROR: unrecognized "publish" value: "cluster"
\dRp
- List of publications
- Name | Owner | All tables | Inserts | Updates | Deletes
---------------------+--------------------------+------------+---------+---------+---------
- testpib_ins_trunct | regress_publication_user | f | t | f | f
- testpub_default | regress_publication_user | f | f | t | f
+ List of publications
+ Name | Owner | All tables | Inserts | Updates | Deletes | Truncates
+--------------------+--------------------------+------------+---------+---------+---------+-----------
+ testpib_ins_trunct | regress_publication_user | f | t | f | f | f
+ testpub_default | regress_publication_user | f | f | t | f | f
(2 rows)
ALTER PUBLICATION testpub_default SET (publish = 'insert, update, delete');
\dRp
- List of publications
- Name | Owner | All tables | Inserts | Updates | Deletes
---------------------+--------------------------+------------+---------+---------+---------
- testpib_ins_trunct | regress_publication_user | f | t | f | f
- testpub_default | regress_publication_user | f | t | t | t
+ List of publications
+ Name | Owner | All tables | Inserts | Updates | Deletes | Truncates
+--------------------+--------------------------+------------+---------+---------+---------+-----------
+ testpib_ins_trunct | regress_publication_user | f | t | f | f | f
+ testpub_default | regress_publication_user | f | t | t | t | f
(2 rows)
--- adding tables
@@ -76,10 +76,10 @@ Publications:
"testpub_foralltables"
\dRp+ testpub_foralltables
- Publication testpub_foralltables
- Owner | All tables | Inserts | Updates | Deletes
---------------------------+------------+---------+---------+---------
- regress_publication_user | t | t | t | f
+ Publication testpub_foralltables
+ Owner | All tables | Inserts | Updates | Deletes | Truncates
+--------------------------+------------+---------+---------+---------+-----------
+ regress_publication_user | t | t | t | f | f
(1 row)
DROP TABLE testpub_tbl2;
@@ -89,19 +89,19 @@ CREATE TABLE testpub_tbl3a (b text) INHERITS (testpub_tbl3);
CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3;
CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3;
\dRp+ testpub3
- Publication testpub3
- Owner | All tables | Inserts | Updates | Deletes
---------------------------+------------+---------+---------+---------
- regress_publication_user | f | t | t | t
+ Publication testpub3
+ Owner | All tables | Inserts | Updates | Deletes | Truncates
+--------------------------+------------+---------+---------+---------+-----------
+ regress_publication_user | f | t | t | t | t
Tables:
"public.testpub_tbl3"
"public.testpub_tbl3a"
\dRp+ testpub4
- Publication testpub4
- Owner | All tables | Inserts | Updates | Deletes
---------------------------+------------+---------+---------+---------
- regress_publication_user | f | t | t | t
+ Publication testpub4
+ Owner | All tables | Inserts | Updates | Deletes | Truncates
+--------------------------+------------+---------+---------+---------+-----------
+ regress_publication_user | f | t | t | t | t
Tables:
"public.testpub_tbl3"
@@ -119,10 +119,10 @@ ERROR: relation "testpub_tbl1" is already member of publication "testpub_fortbl
CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1;
ERROR: publication "testpub_fortbl" already exists
\dRp+ testpub_fortbl
- Publication testpub_fortbl
- Owner | All tables | Inserts | Updates | Deletes
---------------------------+------------+---------+---------+---------
- regress_publication_user | f | t | t | t
+ Publication testpub_fortbl
+ Owner | All tables | Inserts | Updates | Deletes | Truncates
+--------------------------+------------+---------+---------+---------+-----------
+ regress_publication_user | f | t | t | t | t
Tables:
"pub_test.testpub_nopk"
"public.testpub_tbl1"
@@ -165,10 +165,10 @@ Publications:
"testpub_fortbl"
\dRp+ testpub_default
- Publication testpub_default
- Owner | All tables | Inserts | Updates | Deletes
---------------------------+------------+---------+---------+---------
- regress_publication_user | f | t | t | t
+ Publication testpub_default
+ Owner | All tables | Inserts | Updates | Deletes | Truncates
+--------------------------+------------+---------+---------+---------+-----------
+ regress_publication_user | f | t | t | t | f
Tables:
"pub_test.testpub_nopk"
"public.testpub_tbl1"
@@ -210,10 +210,10 @@ DROP TABLE testpub_parted;
DROP VIEW testpub_view;
DROP TABLE testpub_tbl1;
\dRp+ testpub_default
- Publication testpub_default
- Owner | All tables | Inserts | Updates | Deletes
---------------------------+------------+---------+---------+---------
- regress_publication_user | f | t | t | t
+ Publication testpub_default
+ Owner | All tables | Inserts | Updates | Deletes | Truncates
+--------------------------+------------+---------+---------+---------+-----------
+ regress_publication_user | f | t | t | t | f
(1 row)
-- fail - must be owner of publication
@@ -223,20 +223,20 @@ ERROR: must be owner of publication testpub_default
RESET ROLE;
ALTER PUBLICATION testpub_default RENAME TO testpub_foo;
\dRp testpub_foo
- List of publications
- Name | Owner | All tables | Inserts | Updates | Deletes
--------------+--------------------------+------------+---------+---------+---------
- testpub_foo | regress_publication_user | f | t | t | t
+ List of publications
+ Name | Owner | All tables | Inserts | Updates | Deletes | Truncates
+-------------+--------------------------+------------+---------+---------+---------+-----------
+ testpub_foo | regress_publication_user | f | t | t | t | f
(1 row)
-- rename back to keep the rest simple
ALTER PUBLICATION testpub_foo RENAME TO testpub_default;
ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2;
\dRp testpub_default
- List of publications
- Name | Owner | All tables | Inserts | Updates | Deletes
------------------+---------------------------+------------+---------+---------+---------
- testpub_default | regress_publication_user2 | f | t | t | t
+ List of publications
+ Name | Owner | All tables | Inserts | Updates | Deletes | Truncates
+-----------------+---------------------------+------------+---------+---------+---------+-----------
+ testpub_default | regress_publication_user2 | f | t | t | t | f
(1 row)
DROP PUBLICATION testpub_default;
diff --git a/src/test/subscription/t/010_truncate.pl b/src/test/subscription/t/010_truncate.pl
new file mode 100644
index 00000000000..8ea4ab624f0
--- /dev/null
+++ b/src/test/subscription/t/010_truncate.pl
@@ -0,0 +1,161 @@
+# Test TRUNCATE
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 9;
+
+# setup
+
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab1 (a int PRIMARY KEY)");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab1 (a int PRIMARY KEY)");
+
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab2 (a int PRIMARY KEY)");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab2 (a int PRIMARY KEY)");
+
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab3 (a int PRIMARY KEY)");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab3 (a int PRIMARY KEY)");
+
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab4 (x int PRIMARY KEY, y int REFERENCES tab3)");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab4 (x int PRIMARY KEY, y int REFERENCES tab3)");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE SEQUENCE seq1 OWNED BY tab1.a"
+);
+$node_subscriber->safe_psql('postgres',
+ "ALTER SEQUENCE seq1 START 101"
+);
+
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub1 FOR TABLE tab1");
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub2 FOR TABLE tab2 WITH (publish = insert)");
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub3 FOR TABLE tab3, tab4");
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr application_name=sub1' PUBLICATION pub1");
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr application_name=sub2' PUBLICATION pub2");
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION sub3 CONNECTION '$publisher_connstr application_name=sub3' PUBLICATION pub3");
+
+$node_publisher->wait_for_catchup('sub1');
+
+# insert data to truncate
+
+$node_subscriber->safe_psql('postgres', "INSERT INTO tab1 VALUES (1), (2), (3)");
+
+$node_publisher->wait_for_catchup('sub1');
+
+# truncate and check
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab1");
+
+$node_publisher->wait_for_catchup('sub1');
+
+my $result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab1");
+is($result, qq(0||),
+ 'truncate replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT nextval('seq1')");
+is($result, qq(1),
+ 'sequence not restarted');
+
+# truncate with restart identity
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab1 RESTART IDENTITY");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT nextval('seq1')");
+is($result, qq(101),
+ 'truncate restarted identities');
+
+# test publication that does not replicate truncate
+
+$node_subscriber->safe_psql('postgres', "INSERT INTO tab2 VALUES (1), (2), (3)");
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab2");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab2");
+is($result, qq(3|1|3),
+ 'truncate not replicated');
+
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION pub2 SET (publish = 'insert, truncate')");
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab2");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab2");
+is($result, qq(0||),
+ 'truncate replicated after publication change');
+
+# test multiple tables connected by foreign keys
+
+$node_subscriber->safe_psql('postgres', "INSERT INTO tab3 VALUES (1), (2), (3)");
+$node_subscriber->safe_psql('postgres', "INSERT INTO tab4 VALUES (11, 1), (111, 1), (22, 2)");
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab3, tab4");
+
+$node_publisher->wait_for_catchup('sub3');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab3");
+is($result, qq(0||),
+ 'truncate of multiple tables replicated');
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(x), max(x) FROM tab4");
+is($result, qq(0||),
+ 'truncate of multiple tables replicated');
+
+# test truncate of multiple tables, some of which are not published
+
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub2");
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION pub2");
+
+$node_subscriber->safe_psql('postgres', "INSERT INTO tab1 VALUES (1), (2), (3)");
+$node_subscriber->safe_psql('postgres', "INSERT INTO tab2 VALUES (1), (2), (3)");
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab1, tab2");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab1");
+is($result, qq(0||),
+ 'truncate of multiple tables some not published');
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab2");
+is($result, qq(3|1|3),
+ 'truncate of multiple tables some not published');