diff options
author | Peter Eisentraut <peter_e@gmx.net> | 2017-01-19 12:00:00 -0500 |
---|---|---|
committer | Peter Eisentraut <peter_e@gmx.net> | 2017-01-20 09:04:49 -0500 |
commit | 665d1fad99e7b11678b0d5fa24d2898424243cd6 (patch) | |
tree | eefe3eb528f840780aef6c09939a1844dbabb30a /src/backend/commands/subscriptioncmds.c | |
parent | ba61a04bc7fefeee03416d9911eb825c4897c223 (diff) | |
download | postgresql-665d1fad99e7b11678b0d5fa24d2898424243cd6.tar.gz postgresql-665d1fad99e7b11678b0d5fa24d2898424243cd6.zip |
Logical replication
- Add PUBLICATION catalogs and DDL
- Add SUBSCRIPTION catalog and DDL
- Define logical replication protocol and output plugin
- Add logical replication workers
From: Petr Jelinek <petr@2ndquadrant.com>
Reviewed-by: Steve Singer <steve@ssinger.info>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Erik Rijkers <er@xs4all.nl>
Reviewed-by: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 643 |
1 files changed, 643 insertions, 0 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c new file mode 100644 index 00000000000..1448ee3beea --- /dev/null +++ b/src/backend/commands/subscriptioncmds.c @@ -0,0 +1,643 @@ +/*------------------------------------------------------------------------- + * + * subscriptioncmds.c + * subscription catalog manipulation functions + * + * Copyright (c) 2015, PostgreSQL Global Development Group + * + * IDENTIFICATION + * subscriptioncmds.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "miscadmin.h" + +#include "access/heapam.h" +#include "access/htup_details.h" + +#include "catalog/indexing.h" +#include "catalog/objectaccess.h" +#include "catalog/objectaddress.h" +#include "catalog/pg_type.h" +#include "catalog/pg_subscription.h" + +#include "commands/defrem.h" +#include "commands/event_trigger.h" +#include "commands/subscriptioncmds.h" + +#include "replication/logicallauncher.h" +#include "replication/origin.h" +#include "replication/walreceiver.h" +#include "replication/worker_internal.h" + +#include "storage/lmgr.h" + +#include "utils/builtins.h" +#include "utils/memutils.h" +#include "utils/syscache.h" + +/* + * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands. + * + * Since not all options can be specified in both commands, this function + * will report an error on options if the target output pointer is NULL to + * accomodate that. + */ +static void +parse_subscription_options(List *options, char **conninfo, + List **publications, bool *enabled_given, + bool *enabled, bool *create_slot, char **slot_name) +{ + ListCell *lc; + bool create_slot_given = false; + + if (conninfo) + *conninfo = NULL; + if (publications) + *publications = NIL; + if (enabled) + { + *enabled_given = false; + *enabled = true; + } + if (create_slot) + *create_slot = true; + if (slot_name) + *slot_name = NULL; + + /* Parse options */ + foreach (lc, options) + { + DefElem *defel = (DefElem *) lfirst(lc); + + if (strcmp(defel->defname, "conninfo") == 0 && conninfo) + { + if (*conninfo) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + *conninfo = defGetString(defel); + } + else if (strcmp(defel->defname, "publication") == 0 && publications) + { + if (*publications) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + *publications = defGetStringList(defel); + } + else if (strcmp(defel->defname, "enabled") == 0 && enabled) + { + if (*enabled_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + *enabled_given = true; + *enabled = defGetBoolean(defel); + } + else if (strcmp(defel->defname, "disabled") == 0 && enabled) + { + if (*enabled_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + *enabled_given = true; + *enabled = !defGetBoolean(defel); + } + else if (strcmp(defel->defname, "create slot") == 0 && create_slot) + { + if (create_slot_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + create_slot_given = true; + *create_slot = defGetBoolean(defel); + } + else if (strcmp(defel->defname, "nocreate slot") == 0 && create_slot) + { + if (create_slot_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + create_slot_given = true; + *create_slot = !defGetBoolean(defel); + } + else if (strcmp(defel->defname, "slot name") == 0 && slot_name) + { + if (*slot_name) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + *slot_name = defGetString(defel); + } + else + elog(ERROR, "unrecognized option: %s", defel->defname); + } +} + +/* + * Auxiliary function to return a text array out of a list of String nodes. + */ +static Datum +publicationListToArray(List *publist) +{ + ArrayType *arr; + Datum *datums; + int j = 0; + ListCell *cell; + MemoryContext memcxt; + MemoryContext oldcxt; + + /* Create memory context for temporary allocations. */ + memcxt = AllocSetContextCreate(CurrentMemoryContext, + "publicationListToArray to array", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + oldcxt = MemoryContextSwitchTo(memcxt); + + datums = palloc(sizeof(text *) * list_length(publist)); + foreach(cell, publist) + { + char *name = strVal(lfirst(cell)); + ListCell *pcell; + + /* Check for duplicates. */ + foreach(pcell, publist) + { + char *pname = strVal(lfirst(cell)); + + if (name == pname) + break; + + if (strcmp(name, pname) == 0) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("publication name \"%s\" used more than once", + pname))); + } + + datums[j++] = CStringGetTextDatum(name); + } + + MemoryContextSwitchTo(oldcxt); + + arr = construct_array(datums, list_length(publist), + TEXTOID, -1, false, 'i'); + MemoryContextDelete(memcxt); + + return PointerGetDatum(arr); +} + +/* + * Create new subscription. + */ +ObjectAddress +CreateSubscription(CreateSubscriptionStmt *stmt) +{ + Relation rel; + ObjectAddress myself; + Oid subid; + bool nulls[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + HeapTuple tup; + bool enabled_given; + bool enabled; + char *conninfo; + char *slotname; + char originname[NAMEDATALEN]; + bool create_slot; + List *publications; + + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + (errmsg("must be superuser to create subscriptions")))); + + rel = heap_open(SubscriptionRelationId, RowExclusiveLock); + + /* Check if name is used */ + subid = GetSysCacheOid2(SUBSCRIPTIONNAME, MyDatabaseId, + CStringGetDatum(stmt->subname)); + if (OidIsValid(subid)) + { + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("subscription \"%s\" already exists", + stmt->subname))); + } + + /* + * Parse and check options. + * Connection and publication should not be specified here. + */ + parse_subscription_options(stmt->options, NULL, NULL, + &enabled_given, &enabled, + &create_slot, &slotname); + if (slotname == NULL) + slotname = stmt->subname; + + conninfo = stmt->conninfo; + publications = stmt->publication; + + /* Load the library providing us libpq calls. */ + load_file("libpqwalreceiver", false); + + /* Check the connection info string. */ + walrcv_check_conninfo(conninfo); + + /* Everything ok, form a new tuple. */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + + values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId); + values[Anum_pg_subscription_subname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(stmt->subname)); + values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(GetUserId()); + values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); + values[Anum_pg_subscription_subconninfo - 1] = + CStringGetTextDatum(conninfo); + values[Anum_pg_subscription_subslotname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(slotname)); + values[Anum_pg_subscription_subpublications - 1] = + publicationListToArray(publications); + + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); + + /* Insert tuple into catalog. */ + subid = simple_heap_insert(rel, tup); + CatalogUpdateIndexes(rel, tup); + heap_freetuple(tup); + + snprintf(originname, sizeof(originname), "pg_%u", subid); + replorigin_create(originname); + + /* + * If requested, create the replication slot on remote side for our + * newly created subscription. + */ + if (create_slot) + { + XLogRecPtr lsn; + char *err; + WalReceiverConn *wrconn; + + /* Try to connect to the publisher. */ + wrconn = walrcv_connect(conninfo, true, stmt->subname, &err); + if (!wrconn) + ereport(ERROR, + (errmsg("could not connect to the publisher: %s", err))); + + walrcv_create_slot(wrconn, slotname, false, &lsn); + ereport(NOTICE, + (errmsg("created replication slot \"%s\" on publisher", + slotname))); + + /* And we are done with the remote side. */ + walrcv_disconnect(wrconn); + } + + heap_close(rel, RowExclusiveLock); + + ApplyLauncherWakeupAtCommit(); + + ObjectAddressSet(myself, SubscriptionRelationId, subid); + + InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0); + + return myself; +} + +/* + * Alter the existing subscription. + */ +ObjectAddress +AlterSubscription(AlterSubscriptionStmt *stmt) +{ + Relation rel; + ObjectAddress myself; + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + HeapTuple tup; + Oid subid; + bool enabled_given; + bool enabled; + char *conninfo; + char *slot_name; + List *publications; + + rel = heap_open(SubscriptionRelationId, RowExclusiveLock); + + /* Fetch the existing tuple. */ + tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId, + CStringGetDatum(stmt->subname)); + + if (!HeapTupleIsValid(tup)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("subscription \"%s\" does not exist", + stmt->subname))); + + /* must be owner */ + if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION, + stmt->subname); + + subid = HeapTupleGetOid(tup); + + /* Parse options. */ + parse_subscription_options(stmt->options, &conninfo, &publications, + &enabled_given, &enabled, + NULL, &slot_name); + + /* Form a new tuple. */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + if (enabled_given) + { + values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); + replaces[Anum_pg_subscription_subenabled - 1] = true; + } + if (conninfo) + { + values[Anum_pg_subscription_subconninfo - 1] = + CStringGetTextDatum(conninfo); + replaces[Anum_pg_subscription_subconninfo - 1] = true; + } + if (slot_name) + { + values[Anum_pg_subscription_subslotname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(slot_name)); + replaces[Anum_pg_subscription_subslotname - 1] = true; + } + if (publications != NIL) + { + values[Anum_pg_subscription_subpublications - 1] = + publicationListToArray(publications); + replaces[Anum_pg_subscription_subpublications - 1] = true; + } + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + + /* Update the catalog. */ + simple_heap_update(rel, &tup->t_self, tup); + CatalogUpdateIndexes(rel, tup); + + ObjectAddressSet(myself, SubscriptionRelationId, subid); + + /* Cleanup. */ + heap_freetuple(tup); + heap_close(rel, RowExclusiveLock); + + InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0); + + return myself; +} + +/* + * Drop a subscription + */ +void +DropSubscription(DropSubscriptionStmt *stmt) +{ + Relation rel; + ObjectAddress myself; + HeapTuple tup; + Oid subid; + Datum datum; + bool isnull; + char *subname; + char *conninfo; + char *slotname; + char originname[NAMEDATALEN]; + char *err = NULL; + RepOriginId originid; + WalReceiverConn *wrconn = NULL; + StringInfoData cmd; + + rel = heap_open(SubscriptionRelationId, RowExclusiveLock); + + tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId, + CStringGetDatum(stmt->subname)); + + if (!HeapTupleIsValid(tup)) + { + heap_close(rel, NoLock); + + if (!stmt->missing_ok) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("subscription \"%s\" does not exist", + stmt->subname))); + else + ereport(NOTICE, + (errmsg("subscription \"%s\" does not exist, skipping", + stmt->subname))); + + return; + } + + subid = HeapTupleGetOid(tup); + + /* must be owner */ + if (!pg_subscription_ownercheck(subid, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION, + stmt->subname); + + /* DROP hook for the subscription being removed */ + InvokeObjectDropHook(SubscriptionRelationId, subid, 0); + + /* + * Lock the subscription so noboby else can do anything with it + * (including the replication workers). + */ + LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock); + + /* Get subname */ + datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup, + Anum_pg_subscription_subname, &isnull); + Assert(!isnull); + subname = pstrdup(NameStr(*DatumGetName(datum))); + + /* Get conninfo */ + datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup, + Anum_pg_subscription_subconninfo, &isnull); + Assert(!isnull); + conninfo = pstrdup(TextDatumGetCString(datum)); + + /* Get slotname */ + datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup, + Anum_pg_subscription_subslotname, &isnull); + Assert(!isnull); + slotname = pstrdup(NameStr(*DatumGetName(datum))); + + ObjectAddressSet(myself, SubscriptionRelationId, subid); + EventTriggerSQLDropAddObject(&myself, true, true); + + /* Remove the tuple from catalog. */ + simple_heap_delete(rel, &tup->t_self); + + ReleaseSysCache(tup); + + /* Protect against launcher restarting the worker. */ + LWLockAcquire(LogicalRepLauncherLock, LW_EXCLUSIVE); + + /* Kill the apply worker so that the slot becomes accessible. */ + logicalrep_worker_stop(subid); + + /* Remove the origin tracking if exists. */ + snprintf(originname, sizeof(originname), "pg_%u", subid); + originid = replorigin_by_name(originname, true); + if (originid != InvalidRepOriginId) + replorigin_drop(originid); + + /* If the user asked to not drop the slot, we are done mow.*/ + if (!stmt->drop_slot) + { + heap_close(rel, NoLock); + return; + } + + /* + * Otherwise drop the replication slot at the publisher node using + * the replication connection. + */ + load_file("libpqwalreceiver", false); + + initStringInfo(&cmd); + appendStringInfo(&cmd, "DROP_REPLICATION_SLOT \"%s\"", slotname); + + wrconn = walrcv_connect(conninfo, true, subname, &err); + if (wrconn == NULL) + ereport(ERROR, + (errmsg("could not connect to publisher when attempting to " + "drop the replication slot \"%s\"", slotname), + errdetail("The error was: %s", err))); + + if (!walrcv_command(wrconn, cmd.data, &err)) + ereport(ERROR, + (errmsg("count not drop the replication slot \"%s\" on publisher", + slotname), + errdetail("The error was: %s", err))); + else + ereport(NOTICE, + (errmsg("dropped replication slot \"%s\" on publisher", + slotname))); + + walrcv_disconnect(wrconn); + + pfree(cmd.data); + + heap_close(rel, NoLock); +} + +/* + * Internal workhorse for changing a subscription owner + */ +static void +AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId) +{ + Form_pg_subscription form; + + form = (Form_pg_subscription) GETSTRUCT(tup); + + if (form->subowner == newOwnerId) + return; + + if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION, + NameStr(form->subname)); + + /* New owner must be a superuser */ + if (!superuser_arg(newOwnerId)) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied to change owner of subscription \"%s\"", + NameStr(form->subname)), + errhint("The owner of an subscription must be a superuser."))); + + form->subowner = newOwnerId; + simple_heap_update(rel, &tup->t_self, tup); + CatalogUpdateIndexes(rel, tup); + + /* Update owner dependency reference */ + changeDependencyOnOwner(SubscriptionRelationId, + HeapTupleGetOid(tup), + newOwnerId); + + InvokeObjectPostAlterHook(SubscriptionRelationId, + HeapTupleGetOid(tup), 0); +} + +/* + * Change subscription owner -- by name + */ +ObjectAddress +AlterSubscriptionOwner(const char *name, Oid newOwnerId) +{ + Oid subid; + HeapTuple tup; + Relation rel; + ObjectAddress address; + + rel = heap_open(SubscriptionRelationId, RowExclusiveLock); + + tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId, + CStringGetDatum(name)); + + if (!HeapTupleIsValid(tup)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("subscription \"%s\" does not exist", name))); + + subid = HeapTupleGetOid(tup); + + AlterSubscriptionOwner_internal(rel, tup, newOwnerId); + + ObjectAddressSet(address, SubscriptionRelationId, subid); + + heap_freetuple(tup); + + heap_close(rel, RowExclusiveLock); + + return address; +} + +/* + * Change subscription owner -- by OID + */ +void +AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId) +{ + HeapTuple tup; + Relation rel; + + rel = heap_open(SubscriptionRelationId, RowExclusiveLock); + + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid)); + + if (!HeapTupleIsValid(tup)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("subscription with OID %u does not exist", subid))); + + AlterSubscriptionOwner_internal(rel, tup, newOwnerId); + + heap_freetuple(tup); + + heap_close(rel, RowExclusiveLock); +} |