diff options
author | Peter Eisentraut <peter@eisentraut.org> | 2020-04-06 15:15:52 +0200 |
---|---|---|
committer | Peter Eisentraut <peter@eisentraut.org> | 2020-04-06 15:15:52 +0200 |
commit | f1ac27bfda6ce8a399d8001843e9aefff5814f9b (patch) | |
tree | bd3a62808e434421ee2af2abe494a5308b465e7b /src/backend/replication/logical/worker.c | |
parent | b7ce6de93b59852c55d09acdaeebbf5aaf89114e (diff) | |
download | postgresql-f1ac27bfda6ce8a399d8001843e9aefff5814f9b.tar.gz postgresql-f1ac27bfda6ce8a399d8001843e9aefff5814f9b.zip |
Add logical replication support to replicate into partitioned tables
Mainly, this adds support code in logical/worker.c for applying
replicated operations whose target is a partitioned table to its
relevant partitions.
Author: Amit Langote <amitlangote09@gmail.com>
Reviewed-by: Rafia Sabih <rafia.pghackers@gmail.com>
Reviewed-by: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>
Reviewed-by: Petr Jelinek <petr@2ndquadrant.com>
Discussion: https://www.postgresql.org/message-id/flat/CA+HiwqH=Y85vRK3mOdjEkqFK+E=ST=eQiHdpj43L=_eJMOOznQ@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 316 |
1 files changed, 307 insertions, 9 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 673ebd211d1..a752a1224d6 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -29,11 +29,14 @@ #include "access/xlog_internal.h" #include "catalog/catalog.h" #include "catalog/namespace.h" +#include "catalog/partition.h" +#include "catalog/pg_inherits.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "commands/tablecmds.h" #include "commands/trigger.h" #include "executor/executor.h" +#include "executor/execPartition.h" #include "executor/nodeModifyTable.h" #include "funcapi.h" #include "libpq/pqformat.h" @@ -126,6 +129,12 @@ static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, LogicalRepRelation *remoterel, TupleTableSlot *remoteslot, TupleTableSlot **localslot); +static void apply_handle_tuple_routing(ResultRelInfo *relinfo, + EState *estate, + TupleTableSlot *remoteslot, + LogicalRepTupleData *newtup, + LogicalRepRelMapEntry *relmapentry, + CmdType operation); /* * Should this worker apply changes for given relation. @@ -636,9 +645,13 @@ apply_handle_insert(StringInfo s) slot_fill_defaults(rel, estate, remoteslot); MemoryContextSwitchTo(oldctx); - Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION); - apply_handle_insert_internal(estate->es_result_relation_info, estate, - remoteslot); + /* For a partitioned table, insert the tuple into a partition. */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + apply_handle_tuple_routing(estate->es_result_relation_info, estate, + remoteslot, NULL, rel, CMD_INSERT); + else + apply_handle_insert_internal(estate->es_result_relation_info, estate, + remoteslot); PopActiveSnapshot(); @@ -767,9 +780,13 @@ apply_handle_update(StringInfo s) has_oldtup ? oldtup.values : newtup.values); MemoryContextSwitchTo(oldctx); - Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION); - apply_handle_update_internal(estate->es_result_relation_info, estate, - remoteslot, &newtup, rel); + /* For a partitioned table, apply update to correct partition. */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + apply_handle_tuple_routing(estate->es_result_relation_info, estate, + remoteslot, &newtup, rel, CMD_UPDATE); + else + apply_handle_update_internal(estate->es_result_relation_info, estate, + remoteslot, &newtup, rel); PopActiveSnapshot(); @@ -886,9 +903,13 @@ apply_handle_delete(StringInfo s) slot_store_cstrings(remoteslot, rel, oldtup.values); MemoryContextSwitchTo(oldctx); - Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION); - apply_handle_delete_internal(estate->es_result_relation_info, estate, - remoteslot, &rel->remoterel); + /* For a partitioned table, apply delete to correct partition. */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + apply_handle_tuple_routing(estate->es_result_relation_info, estate, + remoteslot, NULL, rel, CMD_DELETE); + else + apply_handle_delete_internal(estate->es_result_relation_info, estate, + remoteslot, &rel->remoterel); PopActiveSnapshot(); @@ -976,6 +997,235 @@ FindReplTupleInLocalRel(EState *estate, Relation localrel, } /* + * This handles insert, update, delete on a partitioned table. + */ +static void +apply_handle_tuple_routing(ResultRelInfo *relinfo, + EState *estate, + TupleTableSlot *remoteslot, + LogicalRepTupleData *newtup, + LogicalRepRelMapEntry *relmapentry, + CmdType operation) +{ + Relation parentrel = relinfo->ri_RelationDesc; + ModifyTableState *mtstate = NULL; + PartitionTupleRouting *proute = NULL; + ResultRelInfo *partrelinfo; + Relation partrel; + TupleTableSlot *remoteslot_part; + PartitionRoutingInfo *partinfo; + TupleConversionMap *map; + MemoryContext oldctx; + + /* ModifyTableState is needed for ExecFindPartition(). */ + mtstate = makeNode(ModifyTableState); + mtstate->ps.plan = NULL; + mtstate->ps.state = estate; + mtstate->operation = operation; + mtstate->resultRelInfo = relinfo; + proute = ExecSetupPartitionTupleRouting(estate, mtstate, parentrel); + + /* + * Find the partition to which the "search tuple" belongs. + */ + Assert(remoteslot != NULL); + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + partrelinfo = ExecFindPartition(mtstate, relinfo, proute, + remoteslot, estate); + Assert(partrelinfo != NULL); + partrel = partrelinfo->ri_RelationDesc; + + /* + * To perform any of the operations below, the tuple must match the + * partition's rowtype. Convert if needed or just copy, using a dedicated + * slot to store the tuple in any case. + */ + partinfo = partrelinfo->ri_PartitionInfo; + remoteslot_part = partinfo->pi_PartitionTupleSlot; + if (remoteslot_part == NULL) + remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable); + map = partinfo->pi_RootToPartitionMap; + if (map != NULL) + remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot, + remoteslot_part); + else + { + remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot); + slot_getallattrs(remoteslot_part); + } + MemoryContextSwitchTo(oldctx); + + estate->es_result_relation_info = partrelinfo; + switch (operation) + { + case CMD_INSERT: + apply_handle_insert_internal(partrelinfo, estate, + remoteslot_part); + break; + + case CMD_DELETE: + apply_handle_delete_internal(partrelinfo, estate, + remoteslot_part, + &relmapentry->remoterel); + break; + + case CMD_UPDATE: + + /* + * For UPDATE, depending on whether or not the updated tuple + * satisfies the partition's constraint, perform a simple UPDATE + * of the partition or move the updated tuple into a different + * suitable partition. + */ + { + AttrMap *attrmap = map ? map->attrMap : NULL; + LogicalRepRelMapEntry *part_entry; + TupleTableSlot *localslot; + ResultRelInfo *partrelinfo_new; + bool found; + + part_entry = logicalrep_partition_open(relmapentry, partrel, + attrmap); + + /* Get the matching local tuple from the partition. */ + found = FindReplTupleInLocalRel(estate, partrel, + &part_entry->remoterel, + remoteslot_part, &localslot); + + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + if (found) + { + /* Apply the update. */ + slot_modify_cstrings(remoteslot_part, localslot, + part_entry, + newtup->values, newtup->changed); + MemoryContextSwitchTo(oldctx); + } + else + { + /* + * The tuple to be updated could not be found. + * + * TODO what to do here, change the log level to LOG + * perhaps? + */ + elog(DEBUG1, + "logical replication did not find row for update " + "in replication target relation \"%s\"", + RelationGetRelationName(partrel)); + } + + /* + * Does the updated tuple still satisfy the current + * partition's constraint? + */ + if (partrelinfo->ri_PartitionCheck == NULL || + ExecPartitionCheck(partrelinfo, remoteslot_part, estate, + false)) + { + /* + * Yes, so simply UPDATE the partition. We don't call + * apply_handle_update_internal() here, which would + * normally do the following work, to avoid repeating some + * work already done above to find the local tuple in the + * partition. + */ + EPQState epqstate; + + EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); + ExecOpenIndices(partrelinfo, false); + + EvalPlanQualSetSlot(&epqstate, remoteslot_part); + ExecSimpleRelationUpdate(estate, &epqstate, localslot, + remoteslot_part); + ExecCloseIndices(partrelinfo); + EvalPlanQualEnd(&epqstate); + } + else + { + /* Move the tuple into the new partition. */ + + /* + * New partition will be found using tuple routing, which + * can only occur via the parent table. We might need to + * convert the tuple to the parent's rowtype. Note that + * this is the tuple found in the partition, not the + * original search tuple received by this function. + */ + if (map) + { + TupleConversionMap *PartitionToRootMap = + convert_tuples_by_name(RelationGetDescr(partrel), + RelationGetDescr(parentrel)); + + remoteslot = + execute_attr_map_slot(PartitionToRootMap->attrMap, + remoteslot_part, remoteslot); + } + else + { + remoteslot = ExecCopySlot(remoteslot, remoteslot_part); + slot_getallattrs(remoteslot); + } + + + /* Find the new partition. */ + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + partrelinfo_new = ExecFindPartition(mtstate, relinfo, + proute, remoteslot, + estate); + MemoryContextSwitchTo(oldctx); + Assert(partrelinfo_new != partrelinfo); + + /* DELETE old tuple found in the old partition. */ + estate->es_result_relation_info = partrelinfo; + apply_handle_delete_internal(partrelinfo, estate, + localslot, + &relmapentry->remoterel); + + /* INSERT new tuple into the new partition. */ + + /* + * Convert the replacement tuple to match the destination + * partition rowtype. + */ + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + partrel = partrelinfo_new->ri_RelationDesc; + partinfo = partrelinfo_new->ri_PartitionInfo; + remoteslot_part = partinfo->pi_PartitionTupleSlot; + if (remoteslot_part == NULL) + remoteslot_part = table_slot_create(partrel, + &estate->es_tupleTable); + map = partinfo->pi_RootToPartitionMap; + if (map != NULL) + { + remoteslot_part = execute_attr_map_slot(map->attrMap, + remoteslot, + remoteslot_part); + } + else + { + remoteslot_part = ExecCopySlot(remoteslot_part, + remoteslot); + slot_getallattrs(remoteslot); + } + MemoryContextSwitchTo(oldctx); + estate->es_result_relation_info = partrelinfo_new; + apply_handle_insert_internal(partrelinfo_new, estate, + remoteslot_part); + } + } + break; + + default: + elog(ERROR, "unrecognized CmdType: %d", (int) operation); + break; + } + + ExecCleanupTupleRouting(mtstate, proute); +} + +/* * Handle TRUNCATE message. * * TODO: FDW support @@ -988,6 +1238,7 @@ apply_handle_truncate(StringInfo s) List *remote_relids = NIL; List *remote_rels = NIL; List *rels = NIL; + List *part_rels = NIL; List *relids = NIL; List *relids_logged = NIL; ListCell *lc; @@ -1017,6 +1268,47 @@ apply_handle_truncate(StringInfo s) relids = lappend_oid(relids, rel->localreloid); if (RelationIsLogicallyLogged(rel->localrel)) relids_logged = lappend_oid(relids_logged, rel->localreloid); + + /* + * Truncate partitions if we got a message to truncate a partitioned + * table. + */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + ListCell *child; + List *children = find_all_inheritors(rel->localreloid, + RowExclusiveLock, + NULL); + + foreach(child, children) + { + Oid childrelid = lfirst_oid(child); + Relation childrel; + + if (list_member_oid(relids, childrelid)) + continue; + + /* find_all_inheritors already got lock */ + childrel = table_open(childrelid, NoLock); + + /* + * Ignore temp tables of other backends. See similar code in + * ExecuteTruncate(). + */ + if (RELATION_IS_OTHER_TEMP(childrel)) + { + table_close(childrel, RowExclusiveLock); + continue; + } + + rels = lappend(rels, childrel); + part_rels = lappend(part_rels, childrel); + relids = lappend_oid(relids, childrelid); + /* Log this relation only if needed for logical decoding */ + if (RelationIsLogicallyLogged(childrel)) + relids_logged = lappend_oid(relids_logged, childrelid); + } + } } /* @@ -1032,6 +1324,12 @@ apply_handle_truncate(StringInfo s) logicalrep_rel_close(rel, NoLock); } + foreach(lc, part_rels) + { + Relation rel = lfirst(lc); + + table_close(rel, NoLock); + } CommandCounterIncrement(); } |