diff options
-rw-r--r-- | doc/src/sgml/protocol.sgml | 4 | ||||
-rw-r--r-- | doc/src/sgml/ref/create_publication.sgml | 3 | ||||
-rw-r--r-- | src/backend/catalog/pg_publication.c | 10 | ||||
-rw-r--r-- | src/backend/replication/logical/proto.c | 63 | ||||
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 56 | ||||
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 28 | ||||
-rw-r--r-- | src/include/replication/logicalproto.h | 2 | ||||
-rw-r--r-- | src/test/regress/expected/publication.out | 6 | ||||
-rw-r--r-- | src/test/regress/sql/publication.sql | 6 | ||||
-rw-r--r-- | src/test/subscription/t/031_column_list.pl | 41 |
10 files changed, 145 insertions, 74 deletions
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 057c46f3f57..71b6b2a535f 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -6544,7 +6544,7 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" <para> Next, the following message part appears for each column included in - the publication (except generated columns): + the publication: </para> <variablelist> @@ -7477,7 +7477,7 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" </variablelist> <para> - Next, one of the following submessages appears for each column (except generated columns): + Next, one of the following submessages appears for each column: <variablelist> <varlistentry> diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index fd9c5deac95..d2cac06fd76 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -89,7 +89,8 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable> <para> When a column list is specified, only the named columns are replicated. - If no column list is specified, all columns of the table are replicated + The column list can contain generated columns as well. If no column list + is specified, all table columns (except generated columns) are replicated through this publication, including any columns added later. It has no effect on <literal>TRUNCATE</literal> commands. See <xref linkend="logical-replication-col-lists"/> for details about column diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 7e5e357fd9e..17a6093d069 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -500,8 +500,7 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri, * pub_collist_validate * Process and validate the 'columns' list and ensure the columns are all * valid to use for a publication. Checks for and raises an ERROR for - * any; unknown columns, system columns, duplicate columns or generated - * columns. + * any unknown columns, system columns, or duplicate columns. * * Looks up each column's attnum and returns a 0-based Bitmapset of the * corresponding attnums. @@ -511,7 +510,6 @@ pub_collist_validate(Relation targetrel, List *columns) { Bitmapset *set = NULL; ListCell *lc; - TupleDesc tupdesc = RelationGetDescr(targetrel); foreach(lc, columns) { @@ -530,12 +528,6 @@ pub_collist_validate(Relation targetrel, List *columns) errmsg("cannot use system column \"%s\" in publication column list", colname)); - if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated) - ereport(ERROR, - errcode(ERRCODE_INVALID_COLUMN_REFERENCE), - errmsg("cannot use generated column \"%s\" in publication column list", - colname)); - if (bms_is_member(attnum, set)) ereport(ERROR, errcode(ERRCODE_DUPLICATE_OBJECT), diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 980f6e27410..ac4af53feba 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -41,19 +41,6 @@ static void logicalrep_write_namespace(StringInfo out, Oid nspid); static const char *logicalrep_read_namespace(StringInfo in); /* - * Check if a column is covered by a column list. - * - * Need to be careful about NULL, which is treated as a column list covering - * all columns. - */ -static bool -column_in_column_list(int attnum, Bitmapset *columns) -{ - return (columns == NULL || bms_is_member(attnum, columns)); -} - - -/* * Write BEGIN to the output stream. */ void @@ -781,10 +768,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, { Form_pg_attribute att = TupleDescAttr(desc, i); - if (att->attisdropped || att->attgenerated) - continue; - - if (!column_in_column_list(att->attnum, columns)) + if (!logicalrep_should_publish_column(att, columns)) continue; nliveatts++; @@ -802,10 +786,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, Form_pg_type typclass; Form_pg_attribute att = TupleDescAttr(desc, i); - if (att->attisdropped || att->attgenerated) - continue; - - if (!column_in_column_list(att->attnum, columns)) + if (!logicalrep_should_publish_column(att, columns)) continue; if (isnull[i]) @@ -938,10 +919,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) { Form_pg_attribute att = TupleDescAttr(desc, i); - if (att->attisdropped || att->attgenerated) - continue; - - if (!column_in_column_list(att->attnum, columns)) + if (!logicalrep_should_publish_column(att, columns)) continue; nliveatts++; @@ -959,10 +937,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) Form_pg_attribute att = TupleDescAttr(desc, i); uint8 flags = 0; - if (att->attisdropped || att->attgenerated) - continue; - - if (!column_in_column_list(att->attnum, columns)) + if (!logicalrep_should_publish_column(att, columns)) continue; /* REPLICA IDENTITY FULL means all columns are sent as part of key. */ @@ -1269,3 +1244,33 @@ logicalrep_message_type(LogicalRepMsgType action) return err_unknown; } + +/* + * Check if the column 'att' of a table should be published. + * + * 'columns' represents the column list specified for that table in the + * publication. + * + * Note that generated columns can be present only in 'columns' list. + */ +bool +logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns) +{ + if (att->attisdropped) + return false; + + /* + * Skip publishing generated columns if they are not included in the + * column list. + */ + if (!columns && att->attgenerated) + return false; + + /* + * Check if a column is covered by a column list. + */ + if (columns && !bms_is_member(att->attnum, columns)) + return false; + + return true; +} diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index d4b5d210e3e..118503fcb76 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -787,23 +787,27 @@ copy_read_data(void *outbuf, int minread, int maxread) /* * Get information about remote relation in similar fashion the RELATION - * message provides during replication. This function also returns the relation - * qualifications to be used in the COPY command. + * message provides during replication. + * + * This function also returns (a) the relation qualifications to be used in + * the COPY command, and (b) whether the remote relation has published any + * generated column. */ static void -fetch_remote_table_info(char *nspname, char *relname, - LogicalRepRelation *lrel, List **qual) +fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, + List **qual, bool *gencol_published) { WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; Oid tableRow[] = {OIDOID, CHAROID, CHAROID}; - Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID}; + Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID}; Oid qualRow[] = {TEXTOID}; bool isnull; int natt; StringInfo pub_names = NULL; Bitmapset *included_cols = NULL; + int server_version = walrcv_server_version(LogRepWorkerWalRcvConn); lrel->nspname = nspname; lrel->relname = relname; @@ -851,7 +855,7 @@ fetch_remote_table_info(char *nspname, char *relname, * We need to do this before fetching info about column names and types, * so that we can skip columns that should not be replicated. */ - if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000) + if (server_version >= 150000) { WalRcvExecResult *pubres; TupleTableSlot *tslot; @@ -941,7 +945,13 @@ fetch_remote_table_info(char *nspname, char *relname, "SELECT a.attnum," " a.attname," " a.atttypid," - " a.attnum = ANY(i.indkey)" + " a.attnum = ANY(i.indkey)"); + + /* Generated columns can be replicated since version 18. */ + if (server_version >= 180000) + appendStringInfo(&cmd, ", a.attgenerated != ''"); + + appendStringInfo(&cmd, " FROM pg_catalog.pg_attribute a" " LEFT JOIN pg_catalog.pg_index i" " ON (i.indexrelid = pg_get_replica_identity_index(%u))" @@ -950,11 +960,11 @@ fetch_remote_table_info(char *nspname, char *relname, " AND a.attrelid = %u" " ORDER BY a.attnum", lrel->remoteid, - (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ? + (server_version >= 120000 && server_version < 180000 ? "AND a.attgenerated = ''" : ""), lrel->remoteid); res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, - lengthof(attrRow), attrRow); + server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow); if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, @@ -998,6 +1008,13 @@ fetch_remote_table_info(char *nspname, char *relname, if (DatumGetBool(slot_getattr(slot, 4, &isnull))) lrel->attkeys = bms_add_member(lrel->attkeys, natt); + /* Remember if the remote table has published any generated column. */ + if (server_version >= 180000 && !(*gencol_published)) + { + *gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull)); + Assert(!isnull); + } + /* Should never happen. */ if (++natt >= MaxTupleAttributeNumber) elog(ERROR, "too many columns in remote table \"%s.%s\"", @@ -1030,7 +1047,7 @@ fetch_remote_table_info(char *nspname, char *relname, * 3) one of the subscribed publications is declared as TABLES IN SCHEMA * that includes this relation */ - if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000) + if (server_version >= 150000) { /* Reuse the already-built pub_names. */ Assert(pub_names != NULL); @@ -1106,10 +1123,12 @@ copy_table(Relation rel) List *attnamelist; ParseState *pstate; List *options = NIL; + bool gencol_published = false; /* Get the publisher relation info. */ fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), - RelationGetRelationName(rel), &lrel, &qual); + RelationGetRelationName(rel), &lrel, &qual, + &gencol_published); /* Put the relation into relmap. */ logicalrep_relmap_update(&lrel); @@ -1121,8 +1140,8 @@ copy_table(Relation rel) /* Start copy on the publisher. */ initStringInfo(&cmd); - /* Regular table with no row filter */ - if (lrel.relkind == RELKIND_RELATION && qual == NIL) + /* Regular table with no row filter or generated columns */ + if (lrel.relkind == RELKIND_RELATION && qual == NIL && !gencol_published) { appendStringInfo(&cmd, "COPY %s", quote_qualified_identifier(lrel.nspname, lrel.relname)); @@ -1153,9 +1172,14 @@ copy_table(Relation rel) { /* * For non-tables and tables with row filters, we need to do COPY - * (SELECT ...), but we can't just do SELECT * because we need to not - * copy generated columns. For tables with any row filters, build a - * SELECT query with OR'ed row filters for COPY. + * (SELECT ...), but we can't just do SELECT * because we may need to + * copy only subset of columns including generated columns. For tables + * with any row filters, build a SELECT query with OR'ed row filters + * for COPY. + * + * We also need to use this same COPY (SELECT ...) syntax when + * generated columns are published, because copy of generated columns + * is not supported by the normal COPY. */ appendStringInfoString(&cmd, "COPY (SELECT "); for (int i = 0; i < lrel.natts; i++) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 00e7024563e..12c17359063 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -766,16 +766,12 @@ send_relation_and_attrs(Relation relation, TransactionId xid, { Form_pg_attribute att = TupleDescAttr(desc, i); - if (att->attisdropped || att->attgenerated) + if (!logicalrep_should_publish_column(att, columns)) continue; if (att->atttypid < FirstGenbkiObjectId) continue; - /* Skip this attribute if it's not present in the column list */ - if (columns != NULL && !bms_is_member(att->attnum, columns)) - continue; - OutputPluginPrepareWrite(ctx, false); logicalrep_write_typ(ctx->out, xid, att->atttypid); OutputPluginWrite(ctx, false); @@ -1074,6 +1070,7 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, int i; int nliveatts = 0; TupleDesc desc = RelationGetDescr(relation); + bool att_gen_present = false; pgoutput_ensure_entry_cxt(data, entry); @@ -1085,17 +1082,30 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, { Form_pg_attribute att = TupleDescAttr(desc, i); - if (att->attisdropped || att->attgenerated) + if (att->attisdropped) continue; + if (att->attgenerated) + { + /* + * Generated cols are skipped unless they are + * present in a column list. + */ + if (!bms_is_member(att->attnum, cols)) + continue; + + att_gen_present = true; + } + nliveatts++; } /* - * If column list includes all the columns of the table, - * set it to NULL. + * Generated attributes are published only when they are + * present in the column list. Otherwise, a NULL column + * list means publish all columns. */ - if (bms_num_members(cols) == nliveatts) + if (!att_gen_present && bms_num_members(cols) == nliveatts) { bms_free(cols); cols = NULL; diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index c409638a2ef..b219f226557 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -270,5 +270,7 @@ extern void logicalrep_read_stream_abort(StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info); extern const char *logicalrep_message_type(LogicalRepMsgType action); +extern bool logicalrep_should_publish_column(Form_pg_attribute att, + Bitmapset *columns); #endif /* LOGICAL_PROTO_H */ diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 660245ed0c4..d2ed1efc3bf 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -687,9 +687,6 @@ UPDATE testpub_tbl5 SET a = 1; ERROR: cannot update table "testpub_tbl5" DETAIL: Column list used by the publication does not cover the replica identity. ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5; --- error: generated column "d" can't be in list -ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d); -ERROR: cannot use generated column "d" in publication column list -- error: system attributes "ctid" not allowed in column list ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, ctid); ERROR: cannot use system column "ctid" in publication column list @@ -717,6 +714,9 @@ UPDATE testpub_tbl5 SET a = 1; ERROR: cannot update table "testpub_tbl5" DETAIL: Column list used by the publication does not cover the replica identity. ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5; +-- ok: generated column "d" can be in the list too +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d); +ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5; -- error: change the replica identity to "b", and column list to (a, c) -- then update fails, because (a, c) does not cover replica identity ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key; diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index f68a5b59862..12aea71c0f6 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -413,8 +413,6 @@ ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x); ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c); UPDATE testpub_tbl5 SET a = 1; ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5; --- error: generated column "d" can't be in list -ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d); -- error: system attributes "ctid" not allowed in column list ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, ctid); ALTER PUBLICATION testpub_fortable SET TABLE testpub_tbl1 (id, ctid); @@ -435,6 +433,10 @@ ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key; UPDATE testpub_tbl5 SET a = 1; ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5; +-- ok: generated column "d" can be in the list too +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d); +ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5; + -- error: change the replica identity to "b", and column list to (a, c) -- then update fails, because (a, c) does not cover replica identity ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key; diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl index 9a97fa50203..e54861b599d 100644 --- a/src/test/subscription/t/031_column_list.pl +++ b/src/test/subscription/t/031_column_list.pl @@ -1202,9 +1202,10 @@ $result = $node_publisher->safe_psql( is( $result, qq(t t), 'check the number of columns in the old tuple'); -# TEST: Generated and dropped columns are not considered for the column list. -# So, the publication having a column list except for those columns and a -# publication without any column (aka all columns as part of the columns +# TEST: Dropped columns are not considered for the column list, and generated +# columns are not replicated if they are not explicitly included in the column +# list. So, the publication having a column list except for those columns and a +# publication without any column list (aka all columns as part of the columns # list) are considered to have the same column list. $node_publisher->safe_psql( 'postgres', qq( @@ -1275,6 +1276,40 @@ ok( $stderr =~ qr/cannot use different column lists for table "public.test_mix_1" in different publications/, 'different column lists detected'); +# TEST: Generated columns are considered for the column list. +$node_publisher->safe_psql( + 'postgres', qq( + CREATE TABLE test_gen (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a + 1) STORED); + INSERT INTO test_gen VALUES (0); + CREATE PUBLICATION pub_gen FOR TABLE test_gen (a, b); +)); + +$node_subscriber->safe_psql( + 'postgres', qq( + CREATE TABLE test_gen (a int PRIMARY KEY, b int); + CREATE SUBSCRIPTION sub_gen CONNECTION '$publisher_connstr' PUBLICATION pub_gen; +)); + +$node_subscriber->wait_for_subscription_sync; + +is( $node_subscriber->safe_psql( + 'postgres', "SELECT * FROM test_gen ORDER BY a"), + qq(0|1), + 'initial replication with generated columns in column list'); + +$node_publisher->safe_psql( + 'postgres', qq( + INSERT INTO test_gen VALUES (1); +)); + +$node_publisher->wait_for_catchup('sub_gen'); + +is( $node_subscriber->safe_psql( + 'postgres', "SELECT * FROM test_gen ORDER BY a"), + qq(0|1 +1|2), + 'replication with generated columns in column list'); + # TEST: If the column list is changed after creating the subscription, we # should catch the error reported by walsender. |