diff options
author | Peter Eisentraut <peter_e@gmx.net> | 2018-03-21 09:13:24 -0400 |
---|---|---|
committer | Peter Eisentraut <peter_e@gmx.net> | 2018-03-21 09:15:04 -0400 |
commit | 325f2ec5557fd1c9156c910102522e04cb42d99c (patch) | |
tree | d41044a9ebc9beec2809fe61289467b107802c25 /src | |
parent | be8a7a6866276b228b4ffaa3003e1dc2dd1d140a (diff) | |
download | postgresql-325f2ec5557fd1c9156c910102522e04cb42d99c.tar.gz postgresql-325f2ec5557fd1c9156c910102522e04cb42d99c.zip |
Handle heap rewrites even better in logical decoding
Logical decoding should not publish anything about tables created as
part of a heap rewrite during DDL. Those tables don't exist externally,
so consumers of logical decoding cannot do anything sensible with that
information. In ab28feae2bd3d4629bd73ae3548e671c57d785f0, we worked
around this for built-in logical replication, but that was hack.
This is a more proper fix: We mark such transient heaps using the new
field pg_class.relwrite, linking to the original relation OID. By
default, we ignore them in logical decoding before they get to the
output plugin. Optionally, a plugin can register their interest in
getting such changes, if they handle DDL specially, in which case the
new field will help them get information about the actual table.
Reviewed-by: Craig Ringer <craig@2ndquadrant.com>
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/bootstrap/bootparse.y | 1 | ||||
-rw-r--r-- | src/backend/catalog/heap.c | 4 | ||||
-rw-r--r-- | src/backend/catalog/toasting.c | 1 | ||||
-rw-r--r-- | src/backend/commands/cluster.c | 1 | ||||
-rw-r--r-- | src/backend/commands/tablecmds.c | 1 | ||||
-rw-r--r-- | src/backend/replication/logical/logical.c | 4 | ||||
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 7 | ||||
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 26 | ||||
-rw-r--r-- | src/include/catalog/catversion.h | 2 | ||||
-rw-r--r-- | src/include/catalog/heap.h | 1 | ||||
-rw-r--r-- | src/include/catalog/pg_class.h | 22 | ||||
-rw-r--r-- | src/include/replication/output_plugin.h | 1 | ||||
-rw-r--r-- | src/include/replication/reorderbuffer.h | 5 |
13 files changed, 39 insertions, 37 deletions
diff --git a/src/backend/bootstrap/bootparse.y b/src/backend/bootstrap/bootparse.y index ed7a55596f8..4ea3aa97cf7 100644 --- a/src/backend/bootstrap/bootparse.y +++ b/src/backend/bootstrap/bootparse.y @@ -257,6 +257,7 @@ Boot_CreateStmt: false, true, false, + InvalidOid, NULL); elog(DEBUG4, "relation created with OID %u", id); } diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c index 0497332e9d7..ca2c2f99520 100644 --- a/src/backend/catalog/heap.c +++ b/src/backend/catalog/heap.c @@ -806,6 +806,7 @@ InsertPgClassTuple(Relation pg_class_desc, values[Anum_pg_class_relispopulated - 1] = BoolGetDatum(rd_rel->relispopulated); values[Anum_pg_class_relreplident - 1] = CharGetDatum(rd_rel->relreplident); values[Anum_pg_class_relispartition - 1] = BoolGetDatum(rd_rel->relispartition); + values[Anum_pg_class_relrewrite - 1] = ObjectIdGetDatum(rd_rel->relrewrite); values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid); values[Anum_pg_class_relminmxid - 1] = MultiXactIdGetDatum(rd_rel->relminmxid); if (relacl != (Datum) 0) @@ -1038,6 +1039,7 @@ heap_create_with_catalog(const char *relname, bool use_user_acl, bool allow_system_table_mods, bool is_internal, + Oid relrewrite, ObjectAddress *typaddress) { Relation pg_class_desc; @@ -1176,6 +1178,8 @@ heap_create_with_catalog(const char *relname, Assert(relid == RelationGetRelid(new_rel_desc)); + new_rel_desc->rd_rel->relrewrite = relrewrite; + /* * Decide whether to create an array type over the relation's rowtype. We * do not create any array types for system catalogs (ie, those made diff --git a/src/backend/catalog/toasting.c b/src/backend/catalog/toasting.c index 8bf26985450..c4515e6c1d1 100644 --- a/src/backend/catalog/toasting.c +++ b/src/backend/catalog/toasting.c @@ -279,6 +279,7 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid, false, true, true, + InvalidOid, NULL); Assert(toast_relid != InvalidOid); diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c index 96a51bb7603..57f3917fdc4 100644 --- a/src/backend/commands/cluster.c +++ b/src/backend/commands/cluster.c @@ -692,6 +692,7 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, char relpersistence, false, true, true, + OIDOldHeap, NULL); Assert(OIDNewHeap != InvalidOid); diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 8f83aa46753..2ec99f99f93 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -764,6 +764,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, true, allowSystemTableMods, false, + InvalidOid, typaddress); /* Store inheritance information for new rel. */ diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 7637efc32e0..e2e39f45779 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -317,6 +317,8 @@ CreateInitDecodingContext(char *plugin, startup_cb_wrapper(ctx, &ctx->options, true); MemoryContextSwitchTo(old_context); + ctx->reorder->output_rewrites = ctx->options.receive_rewrites; + return ctx; } @@ -410,6 +412,8 @@ CreateDecodingContext(XLogRecPtr start_lsn, startup_cb_wrapper(ctx, &ctx->options, false); MemoryContextSwitchTo(old_context); + ctx->reorder->output_rewrites = ctx->options.receive_rewrites; + ereport(LOG, (errmsg("starting logical decoding for slot \"%s\"", NameStr(slot->data.name)), diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 7612cf5f04c..5ffe638b19c 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1403,6 +1403,13 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, goto change_done; /* + * Ignore temporary heaps created during DDL unless the + * plugin has asked for them. + */ + if (relation->rd_rel->relrewrite && !rb->output_rewrites) + goto change_done; + + /* * For now ignore sequence changes entirely. Most of the * time they don't log changes using records we * understand, so it doesn't make sense to handle the few diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index d538f25ede6..aa9cf5b54ed 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -21,7 +21,6 @@ #include "utils/inval.h" #include "utils/int8.h" -#include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/syscache.h" #include "utils/varlena.h" @@ -511,31 +510,6 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) { Publication *pub = lfirst(lc); - /* - * Skip tables that look like they are from a heap rewrite (see - * make_new_heap()). We need to skip them because the subscriber - * won't have a table by that name to receive the data. That - * means we won't ship the new data in, say, an added column with - * a DEFAULT, but if the user applies the same DDL manually on the - * subscriber, then this will work out for them. - * - * We only need to consider the alltables case, because such a - * transient heap won't be an explicit member of a publication. - */ - if (pub->alltables) - { - char *relname = get_rel_name(relid); - unsigned int u; - int n; - - if (sscanf(relname, "pg_temp_%u%n", &u, &n) == 1 && - relname[n] == '\0') - { - if (get_rel_relkind(u) == RELKIND_RELATION) - break; - } - } - if (pub->alltables || list_member_oid(pubids, pub->oid)) { entry->pubactions.pubinsert |= pub->pubactions.pubinsert; diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 96d3406abe0..3a3593be8dc 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 201803141 +#define CATALOG_VERSION_NO 201803211 #endif diff --git a/src/include/catalog/heap.h b/src/include/catalog/heap.h index 9bdc63ceb5a..3308fa3dfd5 100644 --- a/src/include/catalog/heap.h +++ b/src/include/catalog/heap.h @@ -71,6 +71,7 @@ extern Oid heap_create_with_catalog(const char *relname, bool use_user_acl, bool allow_system_table_mods, bool is_internal, + Oid relrewrite, ObjectAddress *typaddress); extern void heap_create_init_fork(Relation rel); diff --git a/src/include/catalog/pg_class.h b/src/include/catalog/pg_class.h index 7fc355acb89..85cdb99f1f6 100644 --- a/src/include/catalog/pg_class.h +++ b/src/include/catalog/pg_class.h @@ -70,6 +70,7 @@ CATALOG(pg_class,1259) BKI_BOOTSTRAP BKI_ROWTYPE_OID(83) BKI_SCHEMA_MACRO bool relispopulated; /* matview currently holds query results */ char relreplident; /* see REPLICA_IDENTITY_xxx constants */ bool relispartition; /* is relation a partition? */ + Oid relrewrite; /* heap for rewrite during DDL, link to original rel */ TransactionId relfrozenxid; /* all Xids < this are frozen in this rel */ TransactionId relminmxid; /* all multixacts in this rel are >= this. * this is really a MultiXactId */ @@ -98,7 +99,7 @@ typedef FormData_pg_class *Form_pg_class; * ---------------- */ -#define Natts_pg_class 32 +#define Natts_pg_class 33 #define Anum_pg_class_relname 1 #define Anum_pg_class_relnamespace 2 #define Anum_pg_class_reltype 3 @@ -126,11 +127,12 @@ typedef FormData_pg_class *Form_pg_class; #define Anum_pg_class_relispopulated 25 #define Anum_pg_class_relreplident 26 #define Anum_pg_class_relispartition 27 -#define Anum_pg_class_relfrozenxid 28 -#define Anum_pg_class_relminmxid 29 -#define Anum_pg_class_relacl 30 -#define Anum_pg_class_reloptions 31 -#define Anum_pg_class_relpartbound 32 +#define Anum_pg_class_relrewrite 28 +#define Anum_pg_class_relfrozenxid 29 +#define Anum_pg_class_relminmxid 30 +#define Anum_pg_class_relacl 31 +#define Anum_pg_class_reloptions 32 +#define Anum_pg_class_relpartbound 33 /* ---------------- * initial contents of pg_class @@ -145,13 +147,13 @@ typedef FormData_pg_class *Form_pg_class; * Note: "3" in the relfrozenxid column stands for FirstNormalTransactionId; * similarly, "1" in relminmxid stands for FirstMultiXactId */ -DATA(insert OID = 1247 ( pg_type PGNSP 71 0 PGUID 0 0 0 0 0 0 0 f f p r 30 0 t f f f f f t n f 3 1 _null_ _null_ _null_)); +DATA(insert OID = 1247 ( pg_type PGNSP 71 0 PGUID 0 0 0 0 0 0 0 f f p r 30 0 t f f f f f t n f 0 3 1 _null_ _null_ _null_)); DESCR(""); -DATA(insert OID = 1249 ( pg_attribute PGNSP 75 0 PGUID 0 0 0 0 0 0 0 f f p r 22 0 f f f f f f t n f 3 1 _null_ _null_ _null_)); +DATA(insert OID = 1249 ( pg_attribute PGNSP 75 0 PGUID 0 0 0 0 0 0 0 f f p r 22 0 f f f f f f t n f 0 3 1 _null_ _null_ _null_)); DESCR(""); -DATA(insert OID = 1255 ( pg_proc PGNSP 81 0 PGUID 0 0 0 0 0 0 0 f f p r 28 0 t f f f f f t n f 3 1 _null_ _null_ _null_)); +DATA(insert OID = 1255 ( pg_proc PGNSP 81 0 PGUID 0 0 0 0 0 0 0 f f p r 28 0 t f f f f f t n f 0 3 1 _null_ _null_ _null_)); DESCR(""); -DATA(insert OID = 1259 ( pg_class PGNSP 83 0 PGUID 0 0 0 0 0 0 0 f f p r 32 0 t f f f f f t n f 3 1 _null_ _null_ _null_)); +DATA(insert OID = 1259 ( pg_class PGNSP 83 0 PGUID 0 0 0 0 0 0 0 f f p r 33 0 t f f f f f t n f 0 3 1 _null_ _null_ _null_)); DESCR(""); diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 78fd38bb169..82875d6b3d5 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -26,6 +26,7 @@ typedef enum OutputPluginOutputType typedef struct OutputPluginOptions { OutputPluginOutputType output_type; + bool receive_rewrites; } OutputPluginOptions; /* diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 0970abca52a..aa430c843c0 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -337,6 +337,11 @@ struct ReorderBuffer void *private_data; /* + * Saved output plugin option + */ + bool output_rewrites; + + /* * Private memory context. */ MemoryContext context; |