aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical
diff options
context:
space:
mode:
authorTomas Vondra <tomas.vondra@postgresql.org>2022-03-26 00:45:21 +0100
committerTomas Vondra <tomas.vondra@postgresql.org>2022-03-26 01:01:27 +0100
commit923def9a533a7d986acfb524139d8b9e5466d0a5 (patch)
treeb6ce8d5bfe8d932e3cc89e52aba68519558e8033 /src/backend/replication/logical
parent05843b1aa49df2ecc9b97c693b755bd1b6f856a9 (diff)
downloadpostgresql-923def9a533a7d986acfb524139d8b9e5466d0a5.tar.gz
postgresql-923def9a533a7d986acfb524139d8b9e5466d0a5.zip
Allow specifying column lists for logical replication
This allows specifying an optional column list when adding a table to logical replication. The column list may be specified after the table name, enclosed in parentheses. Columns not included in this list are not sent to the subscriber, allowing the schema on the subscriber to be a subset of the publisher schema. For UPDATE/DELETE publications, the column list needs to cover all REPLICA IDENTITY columns. For INSERT publications, the column list is arbitrary and may omit some REPLICA IDENTITY columns. Furthermore, if the table uses REPLICA IDENTITY FULL, column list is not allowed. The column list can contain only simple column references. Complex expressions, function calls etc. are not allowed. This restriction could be relaxed in the future. During the initial table synchronization, only columns included in the column list are copied to the subscriber. If the subscription has several publications, containing the same table with different column lists, columns specified in any of the lists will be copied. This means all columns are replicated if the table has no column list at all (which is treated as column list with all columns), or when of the publications is defined as FOR ALL TABLES (possibly IN SCHEMA that matches the schema of the table). For partitioned tables, publish_via_partition_root determines whether the column list for the root or the leaf relation will be used. If the parameter is 'false' (the default), the list defined for the leaf relation is used. Otherwise, the column list for the root partition will be used. Psql commands \dRp+ and \d <table-name> now display any column lists. Author: Tomas Vondra, Alvaro Herrera, Rahila Syed Reviewed-by: Peter Eisentraut, Alvaro Herrera, Vignesh C, Ibrar Ahmed, Amit Kapila, Hou zj, Peter Smith, Wang wei, Tang, Shi yu Discussion: https://postgr.es/m/CAH2L28vddB_NFdRVpuyRBJEBWjz4BSyTB=_ektNRH8NJ1jf95g@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical')
-rw-r--r--src/backend/replication/logical/proto.c61
-rw-r--r--src/backend/replication/logical/tablesync.c153
2 files changed, 192 insertions, 22 deletions
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 3dbe85d61a2..18d3cbb9248 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -29,10 +29,11 @@
#define TRUNCATE_CASCADE (1<<0)
#define TRUNCATE_RESTART_SEQS (1<<1)
-static void logicalrep_write_attrs(StringInfo out, Relation rel);
+static void logicalrep_write_attrs(StringInfo out, Relation rel,
+ Bitmapset *columns);
static void logicalrep_write_tuple(StringInfo out, Relation rel,
TupleTableSlot *slot,
- bool binary);
+ bool binary, Bitmapset *columns);
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
@@ -40,6 +41,19 @@ static void logicalrep_write_namespace(StringInfo out, Oid nspid);
static const char *logicalrep_read_namespace(StringInfo in);
/*
+ * Check if a column is covered by a column list.
+ *
+ * Need to be careful about NULL, which is treated as a column list covering
+ * all columns.
+ */
+static bool
+column_in_column_list(int attnum, Bitmapset *columns)
+{
+ return (columns == NULL || bms_is_member(attnum, columns));
+}
+
+
+/*
* Write BEGIN to the output stream.
*/
void
@@ -398,7 +412,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
*/
void
logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
- TupleTableSlot *newslot, bool binary)
+ TupleTableSlot *newslot, bool binary, Bitmapset *columns)
{
pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
@@ -410,7 +424,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
pq_sendint32(out, RelationGetRelid(rel));
pq_sendbyte(out, 'N'); /* new tuple follows */
- logicalrep_write_tuple(out, rel, newslot, binary);
+ logicalrep_write_tuple(out, rel, newslot, binary, columns);
}
/*
@@ -443,7 +457,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
void
logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
TupleTableSlot *oldslot, TupleTableSlot *newslot,
- bool binary)
+ bool binary, Bitmapset *columns)
{
pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
@@ -464,11 +478,11 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
pq_sendbyte(out, 'O'); /* old tuple follows */
else
pq_sendbyte(out, 'K'); /* old key follows */
- logicalrep_write_tuple(out, rel, oldslot, binary);
+ logicalrep_write_tuple(out, rel, oldslot, binary, NULL);
}
pq_sendbyte(out, 'N'); /* new tuple follows */
- logicalrep_write_tuple(out, rel, newslot, binary);
+ logicalrep_write_tuple(out, rel, newslot, binary, columns);
}
/*
@@ -537,7 +551,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
else
pq_sendbyte(out, 'K'); /* old key follows */
- logicalrep_write_tuple(out, rel, oldslot, binary);
+ logicalrep_write_tuple(out, rel, oldslot, binary, NULL);
}
/*
@@ -702,7 +716,8 @@ logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata)
* Write relation description to the output stream.
*/
void
-logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
+logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
+ Bitmapset *columns)
{
char *relname;
@@ -724,7 +739,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
pq_sendbyte(out, rel->rd_rel->relreplident);
/* send the attribute info */
- logicalrep_write_attrs(out, rel);
+ logicalrep_write_attrs(out, rel, columns);
}
/*
@@ -801,7 +816,7 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
*/
static void
logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
- bool binary)
+ bool binary, Bitmapset *columns)
{
TupleDesc desc;
Datum *values;
@@ -813,8 +828,14 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
for (i = 0; i < desc->natts; i++)
{
- if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
+ Form_pg_attribute att = TupleDescAttr(desc, i);
+
+ if (att->attisdropped || att->attgenerated)
+ continue;
+
+ if (!column_in_column_list(att->attnum, columns))
continue;
+
nliveatts++;
}
pq_sendint16(out, nliveatts);
@@ -833,6 +854,9 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
if (att->attisdropped || att->attgenerated)
continue;
+ if (!column_in_column_list(att->attnum, columns))
+ continue;
+
if (isnull[i])
{
pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
@@ -954,7 +978,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
* Write relation attribute metadata to the stream.
*/
static void
-logicalrep_write_attrs(StringInfo out, Relation rel)
+logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
{
TupleDesc desc;
int i;
@@ -967,8 +991,14 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
/* send number of live attributes */
for (i = 0; i < desc->natts; i++)
{
- if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
+ Form_pg_attribute att = TupleDescAttr(desc, i);
+
+ if (att->attisdropped || att->attgenerated)
continue;
+
+ if (!column_in_column_list(att->attnum, columns))
+ continue;
+
nliveatts++;
}
pq_sendint16(out, nliveatts);
@@ -987,6 +1017,9 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
if (att->attisdropped || att->attgenerated)
continue;
+ if (!column_in_column_list(att->attnum, columns))
+ continue;
+
/* REPLICA IDENTITY FULL means all columns are sent as part of key. */
if (replidentfull ||
bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index d8b12d94bc3..697fb23634c 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -113,6 +113,7 @@
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "utils/acl.h"
+#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
@@ -702,12 +703,13 @@ fetch_remote_table_info(char *nspname, char *relname,
StringInfoData cmd;
TupleTableSlot *slot;
Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
- Oid attrRow[] = {TEXTOID, OIDOID, BOOLOID};
+ Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
Oid qualRow[] = {TEXTOID};
bool isnull;
int natt;
ListCell *lc;
bool first;
+ Bitmapset *included_cols = NULL;
lrel->nspname = nspname;
lrel->relname = relname;
@@ -748,10 +750,110 @@ fetch_remote_table_info(char *nspname, char *relname,
ExecDropSingleTupleTableSlot(slot);
walrcv_clear_result(res);
- /* Now fetch columns. */
+
+ /*
+ * Get column lists for each relation.
+ *
+ * For initial synchronization, column lists can be ignored in following
+ * cases:
+ *
+ * 1) one of the subscribed publications for the table hasn't specified
+ * any column list
+ *
+ * 2) one of the subscribed publications has puballtables set to true
+ *
+ * 3) one of the subscribed publications is declared as ALL TABLES IN
+ * SCHEMA that includes this relation
+ *
+ * We need to do this before fetching info about column names and types,
+ * so that we can skip columns that should not be replicated.
+ */
+ if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+ {
+ WalRcvExecResult *pubres;
+ TupleTableSlot *slot;
+ Oid attrsRow[] = {INT2OID};
+ StringInfoData pub_names;
+ bool first = true;
+
+ initStringInfo(&pub_names);
+ foreach(lc, MySubscription->publications)
+ {
+ if (!first)
+ appendStringInfo(&pub_names, ", ");
+ appendStringInfoString(&pub_names, quote_literal_cstr(strVal(lfirst(lc))));
+ first = false;
+ }
+
+ /*
+ * Fetch info about column lists for the relation (from all the
+ * publications). We unnest the int2vector values, because that
+ * makes it easier to combine lists by simply adding the attnums
+ * to a new bitmap (without having to parse the int2vector data).
+ * This preserves NULL values, so that if one of the publications
+ * has no column list, we'll know that.
+ */
+ resetStringInfo(&cmd);
+ appendStringInfo(&cmd,
+ "SELECT DISTINCT unnest"
+ " FROM pg_publication p"
+ " LEFT OUTER JOIN pg_publication_rel pr"
+ " ON (p.oid = pr.prpubid AND pr.prrelid = %u)"
+ " LEFT OUTER JOIN unnest(pr.prattrs) ON TRUE,"
+ " LATERAL pg_get_publication_tables(p.pubname) gpt"
+ " WHERE gpt.relid = %u"
+ " AND p.pubname IN ( %s )",
+ lrel->remoteid,
+ lrel->remoteid,
+ pub_names.data);
+
+ pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+ lengthof(attrsRow), attrsRow);
+
+ if (pubres->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
+ nspname, relname, pubres->err)));
+
+ /*
+ * Merge the column lists (from different publications) by creating
+ * a single bitmap with all the attnums. If we find a NULL value,
+ * that means one of the publications has no column list for the
+ * table we're syncing.
+ */
+ slot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot))
+ {
+ Datum cfval = slot_getattr(slot, 1, &isnull);
+
+ /* NULL means empty column list, so we're done. */
+ if (isnull)
+ {
+ bms_free(included_cols);
+ included_cols = NULL;
+ break;
+ }
+
+ included_cols = bms_add_member(included_cols,
+ DatumGetInt16(cfval));
+
+ ExecClearTuple(slot);
+ }
+ ExecDropSingleTupleTableSlot(slot);
+
+ walrcv_clear_result(pubres);
+
+ pfree(pub_names.data);
+ }
+
+ /*
+ * Now fetch column names and types.
+ */
resetStringInfo(&cmd);
appendStringInfo(&cmd,
- "SELECT a.attname,"
+ "SELECT a.attnum,"
+ " a.attname,"
" a.atttypid,"
" a.attnum = ANY(i.indkey)"
" FROM pg_catalog.pg_attribute a"
@@ -779,16 +881,35 @@ fetch_remote_table_info(char *nspname, char *relname,
lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
lrel->attkeys = NULL;
+ /*
+ * Store the columns as a list of names. Ignore those that are not
+ * present in the column list, if there is one.
+ */
natt = 0;
slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
{
- lrel->attnames[natt] =
- TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+ char *rel_colname;
+ AttrNumber attnum;
+
+ attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
Assert(!isnull);
- lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
+
+ /* If the column is not in the column list, skip it. */
+ if (included_cols != NULL && !bms_is_member(attnum, included_cols))
+ {
+ ExecClearTuple(slot);
+ continue;
+ }
+
+ rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+ Assert(!isnull);
+
+ lrel->attnames[natt] = rel_colname;
+ lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
Assert(!isnull);
- if (DatumGetBool(slot_getattr(slot, 3, &isnull)))
+
+ if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
lrel->attkeys = bms_add_member(lrel->attkeys, natt);
/* Should never happen. */
@@ -931,8 +1052,24 @@ copy_table(Relation rel)
/* Regular table with no row filter */
if (lrel.relkind == RELKIND_RELATION && qual == NIL)
- appendStringInfo(&cmd, "COPY %s TO STDOUT",
+ {
+ appendStringInfo(&cmd, "COPY %s (",
quote_qualified_identifier(lrel.nspname, lrel.relname));
+
+ /*
+ * XXX Do we need to list the columns in all cases? Maybe we're replicating
+ * all columns?
+ */
+ for (int i = 0; i < lrel.natts; i++)
+ {
+ if (i > 0)
+ appendStringInfoString(&cmd, ", ");
+
+ appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+ }
+
+ appendStringInfo(&cmd, ") TO STDOUT");
+ }
else
{
/*