aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
author: Peter Eisentraut <peter@eisentraut.org> 2020-04-06 15:15:52 +0200
committer: Peter Eisentraut <peter@eisentraut.org> 2020-04-06 15:15:52 +0200
commit: f1ac27bfda6ce8a399d8001843e9aefff5814f9b (patch)
tree: bd3a62808e434421ee2af2abe494a5308b465e7b /src/backend/replication/logical/worker.c
parent: b7ce6de93b59852c55d09acdaeebbf5aaf89114e (diff)
download: postgresql-f1ac27bfda6ce8a399d8001843e9aefff5814f9b.tar.gz
postgresql-f1ac27bfda6ce8a399d8001843e9aefff5814f9b.zip
Add logical replication support to replicate into partitioned tables
Mainly, this adds support code in logical/worker.c for applying replicated operations whose target is a partitioned table to its relevant partitions.

Author: Amit Langote <amitlangote09@gmail.com>
Reviewed-by: Rafia Sabih <rafia.pghackers@gmail.com>
Reviewed-by: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>
Reviewed-by: Petr Jelinek <petr@2ndquadrant.com>
Discussion: https://www.postgresql.org/message-id/flat/CA+HiwqH=Y85vRK3mOdjEkqFK+E=ST=eQiHdpj43L=_eJMOOznQ@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- src/backend/replication/logical/worker.c 316
1 files changed, 307 insertions, 9 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 673ebd211d1..a752a1224d6 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -29,11 +29,14 @@
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "catalog/namespace.h"
+#include "catalog/partition.h"
+#include "catalog/pg_inherits.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
+#include "executor/execPartition.h"
#include "executor/nodeModifyTable.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
@@ -126,6 +129,12 @@ static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
LogicalRepRelation *remoterel,
TupleTableSlot *remoteslot,
TupleTableSlot **localslot);
+static void apply_handle_tuple_routing(ResultRelInfo *relinfo,
+ EState *estate,
+ TupleTableSlot *remoteslot,
+ LogicalRepTupleData *newtup,
+ LogicalRepRelMapEntry *relmapentry,
+ CmdType operation);
/*
* Should this worker apply changes for given relation.
@@ -636,9 +645,13 @@ apply_handle_insert(StringInfo s)
slot_fill_defaults(rel, estate, remoteslot);
MemoryContextSwitchTo(oldctx);
- Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION);
- apply_handle_insert_internal(estate->es_result_relation_info, estate,
- remoteslot);
+ /* For a partitioned table, insert the tuple into a partition. */
+ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ apply_handle_tuple_routing(estate->es_result_relation_info, estate,
+ remoteslot, NULL, rel, CMD_INSERT);
+ else
+ apply_handle_insert_internal(estate->es_result_relation_info, estate,
+ remoteslot);
PopActiveSnapshot();
@@ -767,9 +780,13 @@ apply_handle_update(StringInfo s)
has_oldtup ? oldtup.values : newtup.values);
MemoryContextSwitchTo(oldctx);
- Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION);
- apply_handle_update_internal(estate->es_result_relation_info, estate,
- remoteslot, &newtup, rel);
+ /* For a partitioned table, apply update to correct partition. */
+ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ apply_handle_tuple_routing(estate->es_result_relation_info, estate,
+ remoteslot, &newtup, rel, CMD_UPDATE);
+ else
+ apply_handle_update_internal(estate->es_result_relation_info, estate,
+ remoteslot, &newtup, rel);
PopActiveSnapshot();
@@ -886,9 +903,13 @@ apply_handle_delete(StringInfo s)
slot_store_cstrings(remoteslot, rel, oldtup.values);
MemoryContextSwitchTo(oldctx);
- Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION);
- apply_handle_delete_internal(estate->es_result_relation_info, estate,
- remoteslot, &rel->remoterel);
+ /* For a partitioned table, apply delete to correct partition. */
+ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ apply_handle_tuple_routing(estate->es_result_relation_info, estate,
+ remoteslot, NULL, rel, CMD_DELETE);
+ else
+ apply_handle_delete_internal(estate->es_result_relation_info, estate,
+ remoteslot, &rel->remoterel);
PopActiveSnapshot();
@@ -976,6 +997,235 @@ FindReplTupleInLocalRel(EState *estate, Relation localrel,
}
/*
+ * This handles insert, update, delete on a partitioned table.
+ */
+static void
+apply_handle_tuple_routing(ResultRelInfo *relinfo,
+						   EState *estate,
+						   TupleTableSlot *remoteslot,
+						   LogicalRepTupleData *newtup,
+						   LogicalRepRelMapEntry *relmapentry,
+						   CmdType operation)
+{
+	/*
+	 * relinfo		result relation info of the partitioned (parent) table
+	 * estate		executor state; es_result_relation_info is repointed at
+	 *				the partition(s) operated on below
+	 * remoteslot	the "search tuple" in the parent's rowtype; also reused as
+	 *				scratch space when an UPDATE moves a row across partitions
+	 * newtup		new column values; only dereferenced for CMD_UPDATE (the
+	 *				INSERT and DELETE callers pass NULL)
+	 * relmapentry	relation map entry of the parent table
+	 * operation	CMD_INSERT, CMD_UPDATE or CMD_DELETE; anything else raises
+	 *				an ERROR
+	 */
+	Relation	parentrel = relinfo->ri_RelationDesc;
+	ModifyTableState *mtstate = NULL;
+	PartitionTupleRouting *proute = NULL;
+	ResultRelInfo *partrelinfo;
+	Relation	partrel;
+	TupleTableSlot *remoteslot_part;
+	PartitionRoutingInfo *partinfo;
+	TupleConversionMap *map;
+	MemoryContext oldctx;
+
+	/* ModifyTableState is needed for ExecFindPartition(). */
+	mtstate = makeNode(ModifyTableState);
+	mtstate->ps.plan = NULL;
+	mtstate->ps.state = estate;
+	mtstate->operation = operation;
+	mtstate->resultRelInfo = relinfo;
+	proute = ExecSetupPartitionTupleRouting(estate, mtstate, parentrel);
+
+	/*
+	 * Find the partition to which the "search tuple" belongs.
+	 */
+	Assert(remoteslot != NULL);
+	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+	partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
+									remoteslot, estate);
+	Assert(partrelinfo != NULL);
+	partrel = partrelinfo->ri_RelationDesc;
+
+	/*
+	 * To perform any of the operations below, the tuple must match the
+	 * partition's rowtype. Convert if needed or just copy, using a dedicated
+	 * slot to store the tuple in any case.
+	 */
+	partinfo = partrelinfo->ri_PartitionInfo;
+	remoteslot_part = partinfo->pi_PartitionTupleSlot;
+	if (remoteslot_part == NULL)
+		remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
+	map = partinfo->pi_RootToPartitionMap;
+	if (map != NULL)
+		remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
+												remoteslot_part);
+	else
+	{
+		remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
+		slot_getallattrs(remoteslot_part);
+	}
+	MemoryContextSwitchTo(oldctx);
+
+	estate->es_result_relation_info = partrelinfo;
+	switch (operation)
+	{
+		case CMD_INSERT:
+			apply_handle_insert_internal(partrelinfo, estate,
+										 remoteslot_part);
+			break;
+
+		case CMD_DELETE:
+			apply_handle_delete_internal(partrelinfo, estate,
+										 remoteslot_part,
+										 &relmapentry->remoterel);
+			break;
+
+		case CMD_UPDATE:
+
+			/*
+			 * For UPDATE, depending on whether or not the updated tuple
+			 * satisfies the partition's constraint, perform a simple UPDATE
+			 * of the partition or move the updated tuple into a different
+			 * suitable partition.
+			 */
+			{
+				AttrMap    *attrmap = map ? map->attrMap : NULL;
+				LogicalRepRelMapEntry *part_entry;
+				TupleTableSlot *localslot;
+				ResultRelInfo *partrelinfo_new;
+				bool		found;
+
+				part_entry = logicalrep_partition_open(relmapentry, partrel,
+													   attrmap);
+
+				/* Get the matching local tuple from the partition. */
+				found = FindReplTupleInLocalRel(estate, partrel,
+												&part_entry->remoterel,
+												remoteslot_part, &localslot);
+				if (!found)
+				{
+					/*
+					 * The tuple to be updated could not be found.  There is
+					 * no local row to modify or move, so do nothing except
+					 * emit a log message.  The break leaves the enclosing
+					 * switch, so the tuple routing cleanup below still runs.
+					 *
+					 * TODO what to do here, change the log level to LOG
+					 * perhaps?
+					 */
+					elog(DEBUG1,
+						 "logical replication did not find row for update "
+						 "in replication target relation \"%s\"",
+						 RelationGetRelationName(partrel));
+					break;
+				}
+
+				/*
+				 * Apply the update to the local tuple, putting the result in
+				 * remoteslot_part.  The context switch is strictly paired so
+				 * that nothing after this point runs in the per-tuple
+				 * context by accident.
+				 */
+				oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+				slot_modify_cstrings(remoteslot_part, localslot,
+									 part_entry,
+									 newtup->values, newtup->changed);
+				MemoryContextSwitchTo(oldctx);
+
+				/*
+				 * Does the updated tuple still satisfy the current
+				 * partition's constraint?
+				 */
+				if (partrelinfo->ri_PartitionCheck == NULL ||
+					ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
+									   false))
+				{
+					/*
+					 * Yes, so simply UPDATE the partition.  We don't call
+					 * apply_handle_update_internal() here, which would
+					 * normally do the following work, to avoid repeating some
+					 * work already done above to find the local tuple in the
+					 * partition.
+					 */
+					EPQState	epqstate;
+
+					EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
+					ExecOpenIndices(partrelinfo, false);
+
+					EvalPlanQualSetSlot(&epqstate, remoteslot_part);
+					ExecSimpleRelationUpdate(estate, &epqstate, localslot,
+											 remoteslot_part);
+					ExecCloseIndices(partrelinfo);
+					EvalPlanQualEnd(&epqstate);
+				}
+				else
+				{
+					/* Move the tuple into the new partition. */
+
+					/*
+					 * New partition will be found using tuple routing, which
+					 * can only occur via the parent table.  We might need to
+					 * convert the tuple to the parent's rowtype.  Note that
+					 * this is the tuple found in the partition, not the
+					 * original search tuple received by this function.
+					 */
+					if (map)
+					{
+						TupleConversionMap *PartitionToRootMap =
+						convert_tuples_by_name(RelationGetDescr(partrel),
+											   RelationGetDescr(parentrel));
+
+						remoteslot =
+							execute_attr_map_slot(PartitionToRootMap->attrMap,
+												  remoteslot_part, remoteslot);
+					}
+					else
+					{
+						remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
+						slot_getallattrs(remoteslot);
+					}
+
+
+					/* Find the new partition. */
+					oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+					partrelinfo_new = ExecFindPartition(mtstate, relinfo,
+														proute, remoteslot,
+														estate);
+					MemoryContextSwitchTo(oldctx);
+					Assert(partrelinfo_new != partrelinfo);
+
+					/* DELETE old tuple found in the old partition. */
+					estate->es_result_relation_info = partrelinfo;
+					apply_handle_delete_internal(partrelinfo, estate,
+												 localslot,
+												 &relmapentry->remoterel);
+
+					/* INSERT new tuple into the new partition. */
+
+					/*
+					 * Convert the replacement tuple to match the destination
+					 * partition rowtype.
+					 */
+					oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+					partrel = partrelinfo_new->ri_RelationDesc;
+					partinfo = partrelinfo_new->ri_PartitionInfo;
+					remoteslot_part = partinfo->pi_PartitionTupleSlot;
+					if (remoteslot_part == NULL)
+						remoteslot_part = table_slot_create(partrel,
+															&estate->es_tupleTable);
+					map = partinfo->pi_RootToPartitionMap;
+					if (map != NULL)
+					{
+						remoteslot_part = execute_attr_map_slot(map->attrMap,
+																remoteslot,
+																remoteslot_part);
+					}
+					else
+					{
+						remoteslot_part = ExecCopySlot(remoteslot_part,
+													   remoteslot);
+						/* Deform the freshly copied slot, as done above. */
+						slot_getallattrs(remoteslot_part);
+					}
+					MemoryContextSwitchTo(oldctx);
+					estate->es_result_relation_info = partrelinfo_new;
+					apply_handle_insert_internal(partrelinfo_new, estate,
+												 remoteslot_part);
+				}
+			}
+			break;
+
+		default:
+			elog(ERROR, "unrecognized CmdType: %d", (int) operation);
+			break;
+	}
+
+	ExecCleanupTupleRouting(mtstate, proute);
+}
+
+/*
* Handle TRUNCATE message.
*
* TODO: FDW support
@@ -988,6 +1238,7 @@ apply_handle_truncate(StringInfo s)
List *remote_relids = NIL;
List *remote_rels = NIL;
List *rels = NIL;
+ List *part_rels = NIL;
List *relids = NIL;
List *relids_logged = NIL;
ListCell *lc;
@@ -1017,6 +1268,47 @@ apply_handle_truncate(StringInfo s)
relids = lappend_oid(relids, rel->localreloid);
if (RelationIsLogicallyLogged(rel->localrel))
relids_logged = lappend_oid(relids_logged, rel->localreloid);
+
+ /*
+ * Truncate partitions if we got a message to truncate a partitioned
+ * table.
+ */
+ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ {
+ ListCell *child;
+ List *children = find_all_inheritors(rel->localreloid,
+ RowExclusiveLock,
+ NULL);
+
+ foreach(child, children)
+ {
+ Oid childrelid = lfirst_oid(child);
+ Relation childrel;
+
+ if (list_member_oid(relids, childrelid))
+ continue;
+
+ /* find_all_inheritors already got lock */
+ childrel = table_open(childrelid, NoLock);
+
+ /*
+ * Ignore temp tables of other backends. See similar code in
+ * ExecuteTruncate().
+ */
+ if (RELATION_IS_OTHER_TEMP(childrel))
+ {
+ table_close(childrel, RowExclusiveLock);
+ continue;
+ }
+
+ rels = lappend(rels, childrel);
+ part_rels = lappend(part_rels, childrel);
+ relids = lappend_oid(relids, childrelid);
+ /* Log this relation only if needed for logical decoding */
+ if (RelationIsLogicallyLogged(childrel))
+ relids_logged = lappend_oid(relids_logged, childrelid);
+ }
+ }
}
/*
@@ -1032,6 +1324,12 @@ apply_handle_truncate(StringInfo s)
logicalrep_rel_close(rel, NoLock);
}
+ foreach(lc, part_rels)
+ {
+ Relation rel = lfirst(lc);
+
+ table_close(rel, NoLock);
+ }
CommandCounterIncrement();
}