aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/src/sgml/protocol.sgml4
-rw-r--r--doc/src/sgml/ref/create_publication.sgml3
-rw-r--r--src/backend/catalog/pg_publication.c10
-rw-r--r--src/backend/replication/logical/proto.c63
-rw-r--r--src/backend/replication/logical/tablesync.c56
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c28
-rw-r--r--src/include/replication/logicalproto.h2
-rw-r--r--src/test/regress/expected/publication.out6
-rw-r--r--src/test/regress/sql/publication.sql6
-rw-r--r--src/test/subscription/t/031_column_list.pl41
10 files changed, 145 insertions, 74 deletions
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 057c46f3f57..71b6b2a535f 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -6544,7 +6544,7 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
<para>
Next, the following message part appears for each column included in
- the publication (except generated columns):
+ the publication:
</para>
<variablelist>
@@ -7477,7 +7477,7 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
</variablelist>
<para>
- Next, one of the following submessages appears for each column (except generated columns):
+ Next, one of the following submessages appears for each column:
<variablelist>
<varlistentry>
diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml
index fd9c5deac95..d2cac06fd76 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -89,7 +89,8 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
<para>
When a column list is specified, only the named columns are replicated.
- If no column list is specified, all columns of the table are replicated
+ The column list can contain generated columns as well. If no column list
+ is specified, all table columns (except generated columns) are replicated
through this publication, including any columns added later. It has no
effect on <literal>TRUNCATE</literal> commands. See
<xref linkend="logical-replication-col-lists"/> for details about column
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 7e5e357fd9e..17a6093d069 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -500,8 +500,7 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
* pub_collist_validate
* Process and validate the 'columns' list and ensure the columns are all
* valid to use for a publication. Checks for and raises an ERROR for
- * any; unknown columns, system columns, duplicate columns or generated
- * columns.
+ * any unknown columns, system columns, or duplicate columns.
*
* Looks up each column's attnum and returns a 0-based Bitmapset of the
* corresponding attnums.
@@ -511,7 +510,6 @@ pub_collist_validate(Relation targetrel, List *columns)
{
Bitmapset *set = NULL;
ListCell *lc;
- TupleDesc tupdesc = RelationGetDescr(targetrel);
foreach(lc, columns)
{
@@ -530,12 +528,6 @@ pub_collist_validate(Relation targetrel, List *columns)
errmsg("cannot use system column \"%s\" in publication column list",
colname));
- if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated)
- ereport(ERROR,
- errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
- errmsg("cannot use generated column \"%s\" in publication column list",
- colname));
-
if (bms_is_member(attnum, set))
ereport(ERROR,
errcode(ERRCODE_DUPLICATE_OBJECT),
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 980f6e27410..ac4af53feba 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -41,19 +41,6 @@ static void logicalrep_write_namespace(StringInfo out, Oid nspid);
static const char *logicalrep_read_namespace(StringInfo in);
/*
- * Check if a column is covered by a column list.
- *
- * Need to be careful about NULL, which is treated as a column list covering
- * all columns.
- */
-static bool
-column_in_column_list(int attnum, Bitmapset *columns)
-{
- return (columns == NULL || bms_is_member(attnum, columns));
-}
-
-
-/*
* Write BEGIN to the output stream.
*/
void
@@ -781,10 +768,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
{
Form_pg_attribute att = TupleDescAttr(desc, i);
- if (att->attisdropped || att->attgenerated)
- continue;
-
- if (!column_in_column_list(att->attnum, columns))
+ if (!logicalrep_should_publish_column(att, columns))
continue;
nliveatts++;
@@ -802,10 +786,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
Form_pg_type typclass;
Form_pg_attribute att = TupleDescAttr(desc, i);
- if (att->attisdropped || att->attgenerated)
- continue;
-
- if (!column_in_column_list(att->attnum, columns))
+ if (!logicalrep_should_publish_column(att, columns))
continue;
if (isnull[i])
@@ -938,10 +919,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
{
Form_pg_attribute att = TupleDescAttr(desc, i);
- if (att->attisdropped || att->attgenerated)
- continue;
-
- if (!column_in_column_list(att->attnum, columns))
+ if (!logicalrep_should_publish_column(att, columns))
continue;
nliveatts++;
@@ -959,10 +937,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
Form_pg_attribute att = TupleDescAttr(desc, i);
uint8 flags = 0;
- if (att->attisdropped || att->attgenerated)
- continue;
-
- if (!column_in_column_list(att->attnum, columns))
+ if (!logicalrep_should_publish_column(att, columns))
continue;
/* REPLICA IDENTITY FULL means all columns are sent as part of key. */
@@ -1269,3 +1244,33 @@ logicalrep_message_type(LogicalRepMsgType action)
return err_unknown;
}
+
+/*
+ * Check if the column 'att' of a table should be published.
+ *
+ * 'columns' represents the column list specified for that table in the
+ * publication.
+ *
+ * Note that generated columns can be present only in 'columns' list.
+ */
+bool
+logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns)
+{
+ if (att->attisdropped)
+ return false;
+
+ /*
+ * Skip publishing generated columns if they are not included in the
+ * column list.
+ */
+ if (!columns && att->attgenerated)
+ return false;
+
+ /*
+ * Check if a column is covered by a column list.
+ */
+ if (columns && !bms_is_member(att->attnum, columns))
+ return false;
+
+ return true;
+}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index d4b5d210e3e..118503fcb76 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -787,23 +787,27 @@ copy_read_data(void *outbuf, int minread, int maxread)
/*
* Get information about remote relation in similar fashion the RELATION
- * message provides during replication. This function also returns the relation
- * qualifications to be used in the COPY command.
+ * message provides during replication.
+ *
+ * This function also returns (a) the relation qualifications to be used in
+ * the COPY command, and (b) whether the remote relation has published any
+ * generated column.
*/
static void
-fetch_remote_table_info(char *nspname, char *relname,
- LogicalRepRelation *lrel, List **qual)
+fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel,
+ List **qual, bool *gencol_published)
{
WalRcvExecResult *res;
StringInfoData cmd;
TupleTableSlot *slot;
Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
- Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
+ Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
Oid qualRow[] = {TEXTOID};
bool isnull;
int natt;
StringInfo pub_names = NULL;
Bitmapset *included_cols = NULL;
+ int server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
lrel->nspname = nspname;
lrel->relname = relname;
@@ -851,7 +855,7 @@ fetch_remote_table_info(char *nspname, char *relname,
* We need to do this before fetching info about column names and types,
* so that we can skip columns that should not be replicated.
*/
- if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+ if (server_version >= 150000)
{
WalRcvExecResult *pubres;
TupleTableSlot *tslot;
@@ -941,7 +945,13 @@ fetch_remote_table_info(char *nspname, char *relname,
"SELECT a.attnum,"
" a.attname,"
" a.atttypid,"
- " a.attnum = ANY(i.indkey)"
+ " a.attnum = ANY(i.indkey)");
+
+ /* Generated columns can be replicated since version 18. */
+ if (server_version >= 180000)
+ appendStringInfo(&cmd, ", a.attgenerated != ''");
+
+ appendStringInfo(&cmd,
" FROM pg_catalog.pg_attribute a"
" LEFT JOIN pg_catalog.pg_index i"
" ON (i.indexrelid = pg_get_replica_identity_index(%u))"
@@ -950,11 +960,11 @@ fetch_remote_table_info(char *nspname, char *relname,
" AND a.attrelid = %u"
" ORDER BY a.attnum",
lrel->remoteid,
- (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
+ (server_version >= 120000 && server_version < 180000 ?
"AND a.attgenerated = ''" : ""),
lrel->remoteid);
res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
- lengthof(attrRow), attrRow);
+ server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
@@ -998,6 +1008,13 @@ fetch_remote_table_info(char *nspname, char *relname,
if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
lrel->attkeys = bms_add_member(lrel->attkeys, natt);
+ /* Remember if the remote table has published any generated column. */
+ if (server_version >= 180000 && !(*gencol_published))
+ {
+ *gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull));
+ Assert(!isnull);
+ }
+
/* Should never happen. */
if (++natt >= MaxTupleAttributeNumber)
elog(ERROR, "too many columns in remote table \"%s.%s\"",
@@ -1030,7 +1047,7 @@ fetch_remote_table_info(char *nspname, char *relname,
* 3) one of the subscribed publications is declared as TABLES IN SCHEMA
* that includes this relation
*/
- if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+ if (server_version >= 150000)
{
/* Reuse the already-built pub_names. */
Assert(pub_names != NULL);
@@ -1106,10 +1123,12 @@ copy_table(Relation rel)
List *attnamelist;
ParseState *pstate;
List *options = NIL;
+ bool gencol_published = false;
/* Get the publisher relation info. */
fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
- RelationGetRelationName(rel), &lrel, &qual);
+ RelationGetRelationName(rel), &lrel, &qual,
+ &gencol_published);
/* Put the relation into relmap. */
logicalrep_relmap_update(&lrel);
@@ -1121,8 +1140,8 @@ copy_table(Relation rel)
/* Start copy on the publisher. */
initStringInfo(&cmd);
- /* Regular table with no row filter */
- if (lrel.relkind == RELKIND_RELATION && qual == NIL)
+ /* Regular table with no row filter or generated columns */
+ if (lrel.relkind == RELKIND_RELATION && qual == NIL && !gencol_published)
{
appendStringInfo(&cmd, "COPY %s",
quote_qualified_identifier(lrel.nspname, lrel.relname));
@@ -1153,9 +1172,14 @@ copy_table(Relation rel)
{
/*
* For non-tables and tables with row filters, we need to do COPY
- * (SELECT ...), but we can't just do SELECT * because we need to not
- * copy generated columns. For tables with any row filters, build a
- * SELECT query with OR'ed row filters for COPY.
+ * (SELECT ...), but we can't just do SELECT * because we may need to
+ * copy only subset of columns including generated columns. For tables
+ * with any row filters, build a SELECT query with OR'ed row filters
+ * for COPY.
+ *
+ * We also need to use this same COPY (SELECT ...) syntax when
+ * generated columns are published, because copy of generated columns
+ * is not supported by the normal COPY.
*/
appendStringInfoString(&cmd, "COPY (SELECT ");
for (int i = 0; i < lrel.natts; i++)
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 00e7024563e..12c17359063 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -766,16 +766,12 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
{
Form_pg_attribute att = TupleDescAttr(desc, i);
- if (att->attisdropped || att->attgenerated)
+ if (!logicalrep_should_publish_column(att, columns))
continue;
if (att->atttypid < FirstGenbkiObjectId)
continue;
- /* Skip this attribute if it's not present in the column list */
- if (columns != NULL && !bms_is_member(att->attnum, columns))
- continue;
-
OutputPluginPrepareWrite(ctx, false);
logicalrep_write_typ(ctx->out, xid, att->atttypid);
OutputPluginWrite(ctx, false);
@@ -1074,6 +1070,7 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
int i;
int nliveatts = 0;
TupleDesc desc = RelationGetDescr(relation);
+ bool att_gen_present = false;
pgoutput_ensure_entry_cxt(data, entry);
@@ -1085,17 +1082,30 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
{
Form_pg_attribute att = TupleDescAttr(desc, i);
- if (att->attisdropped || att->attgenerated)
+ if (att->attisdropped)
continue;
+ if (att->attgenerated)
+ {
+ /*
+ * Generated cols are skipped unless they are
+ * present in a column list.
+ */
+ if (!bms_is_member(att->attnum, cols))
+ continue;
+
+ att_gen_present = true;
+ }
+
nliveatts++;
}
/*
- * If column list includes all the columns of the table,
- * set it to NULL.
+ * Generated attributes are published only when they are
+ * present in the column list. Otherwise, a NULL column
+ * list means publish all columns.
*/
- if (bms_num_members(cols) == nliveatts)
+ if (!att_gen_present && bms_num_members(cols) == nliveatts)
{
bms_free(cols);
cols = NULL;
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index c409638a2ef..b219f226557 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -270,5 +270,7 @@ extern void logicalrep_read_stream_abort(StringInfo in,
LogicalRepStreamAbortData *abort_data,
bool read_abort_info);
extern const char *logicalrep_message_type(LogicalRepMsgType action);
+extern bool logicalrep_should_publish_column(Form_pg_attribute att,
+ Bitmapset *columns);
#endif /* LOGICAL_PROTO_H */
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 660245ed0c4..d2ed1efc3bf 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -687,9 +687,6 @@ UPDATE testpub_tbl5 SET a = 1;
ERROR: cannot update table "testpub_tbl5"
DETAIL: Column list used by the publication does not cover the replica identity.
ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
--- error: generated column "d" can't be in list
-ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d);
-ERROR: cannot use generated column "d" in publication column list
-- error: system attributes "ctid" not allowed in column list
ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, ctid);
ERROR: cannot use system column "ctid" in publication column list
@@ -717,6 +714,9 @@ UPDATE testpub_tbl5 SET a = 1;
ERROR: cannot update table "testpub_tbl5"
DETAIL: Column list used by the publication does not cover the replica identity.
ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
+-- ok: generated column "d" can be in the list too
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d);
+ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
-- error: change the replica identity to "b", and column list to (a, c)
-- then update fails, because (a, c) does not cover replica identity
ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key;
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index f68a5b59862..12aea71c0f6 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -413,8 +413,6 @@ ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x);
ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c);
UPDATE testpub_tbl5 SET a = 1;
ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
--- error: generated column "d" can't be in list
-ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d);
-- error: system attributes "ctid" not allowed in column list
ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, ctid);
ALTER PUBLICATION testpub_fortable SET TABLE testpub_tbl1 (id, ctid);
@@ -435,6 +433,10 @@ ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key;
UPDATE testpub_tbl5 SET a = 1;
ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
+-- ok: generated column "d" can be in the list too
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d);
+ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
+
-- error: change the replica identity to "b", and column list to (a, c)
-- then update fails, because (a, c) does not cover replica identity
ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key;
diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl
index 9a97fa50203..e54861b599d 100644
--- a/src/test/subscription/t/031_column_list.pl
+++ b/src/test/subscription/t/031_column_list.pl
@@ -1202,9 +1202,10 @@ $result = $node_publisher->safe_psql(
is( $result, qq(t
t), 'check the number of columns in the old tuple');
-# TEST: Generated and dropped columns are not considered for the column list.
-# So, the publication having a column list except for those columns and a
-# publication without any column (aka all columns as part of the columns
+# TEST: Dropped columns are not considered for the column list, and generated
+# columns are not replicated if they are not explicitly included in the column
+# list. So, the publication having a column list except for those columns and a
+# publication without any column list (aka all columns as part of the columns
# list) are considered to have the same column list.
$node_publisher->safe_psql(
'postgres', qq(
@@ -1275,6 +1276,40 @@ ok( $stderr =~
qr/cannot use different column lists for table "public.test_mix_1" in different publications/,
'different column lists detected');
+# TEST: Generated columns are considered for the column list.
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ CREATE TABLE test_gen (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a + 1) STORED);
+ INSERT INTO test_gen VALUES (0);
+ CREATE PUBLICATION pub_gen FOR TABLE test_gen (a, b);
+));
+
+$node_subscriber->safe_psql(
+ 'postgres', qq(
+ CREATE TABLE test_gen (a int PRIMARY KEY, b int);
+ CREATE SUBSCRIPTION sub_gen CONNECTION '$publisher_connstr' PUBLICATION pub_gen;
+));
+
+$node_subscriber->wait_for_subscription_sync;
+
+is( $node_subscriber->safe_psql(
+ 'postgres', "SELECT * FROM test_gen ORDER BY a"),
+ qq(0|1),
+ 'initial replication with generated columns in column list');
+
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ INSERT INTO test_gen VALUES (1);
+));
+
+$node_publisher->wait_for_catchup('sub_gen');
+
+is( $node_subscriber->safe_psql(
+ 'postgres', "SELECT * FROM test_gen ORDER BY a"),
+ qq(0|1
+1|2),
+ 'replication with generated columns in column list');
+
# TEST: If the column list is changed after creating the subscription, we
# should catch the error reported by walsender.