aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/commands/subscriptioncmds.c29
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c60
-rw-r--r--src/backend/replication/logical/tablesync.c21
-rw-r--r--src/backend/replication/logical/worker.c6
-rw-r--r--src/backend/replication/walreceiver.c19
5 files changed, 88 insertions, 47 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 8aa6de17850..75e195f286e 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -468,7 +468,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
if (!wrconn)
ereport(ERROR,
- (errmsg("could not connect to the publisher: %s", err)));
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not connect to the publisher: %s", err)));
PG_TRY();
{
@@ -565,7 +566,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
if (!wrconn)
ereport(ERROR,
- (errmsg("could not connect to the publisher: %s", err)));
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not connect to the publisher: %s", err)));
PG_TRY();
{
@@ -820,7 +822,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
{
if (sub->enabled && !slotname)
ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot set %s for enabled subscription",
"slot_name = NONE")));
@@ -876,7 +878,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
if (!sub->slotname && enabled)
ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot enable subscription that does not have a slot name")));
values[Anum_pg_subscription_subenabled - 1] =
@@ -928,7 +930,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
{
if (!sub->enabled)
ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
@@ -976,7 +978,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
{
if (!sub->enabled)
ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
@@ -997,7 +999,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
if (!sub->enabled)
ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
parse_subscription_options(stmt->options,
@@ -1354,7 +1356,8 @@ ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missi
{
/* ERROR. */
ereport(ERROR,
- (errmsg("could not drop replication slot \"%s\" on publisher: %s",
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not drop replication slot \"%s\" on publisher: %s",
slotname, res->err)));
}
@@ -1505,7 +1508,8 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
- (errmsg("could not receive list of replicated tables from the publisher: %s",
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not receive list of replicated tables from the publisher: %s",
res->err)));
/* Process tables. */
@@ -1569,7 +1573,8 @@ ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
}
ereport(ERROR,
- (errmsg("could not connect to publisher when attempting to "
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not connect to publisher when attempting to "
"drop replication slot \"%s\": %s", slotname, err),
/* translator: %s is an SQL ALTER command */
errhint("Use %s to disassociate the subscription from the slot.",
@@ -1601,7 +1606,7 @@ check_duplicates_in_publist(List *publist, Datum *datums)
if (strcmp(name, pname) == 0)
ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
+ (errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("publication name \"%s\" used more than once",
pname)));
}
@@ -1659,7 +1664,7 @@ merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *
oldpublist = lappend(oldpublist, makeString(name));
else if (!addpub && !found)
ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
+ (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("publication \"%s\" is not in subscription \"%s\"",
name, subname)));
}
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 021c1b36f3e..6eaa84a0315 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -278,7 +278,8 @@ libpqrcv_get_conninfo(WalReceiverConn *conn)
if (conn_opts == NULL)
ereport(ERROR,
- (errmsg("could not parse connection string: %s",
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("could not parse connection string: %s",
_("out of memory"))));
/* build a clean connection string from pieces */
@@ -350,7 +351,8 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
{
PQclear(res);
ereport(ERROR,
- (errmsg("could not receive database system identifier and timeline ID from "
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("could not receive database system identifier and timeline ID from "
"the primary server: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
}
@@ -361,7 +363,8 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
PQclear(res);
ereport(ERROR,
- (errmsg("invalid response from primary server"),
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid response from primary server"),
errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
ntuples, nfields, 3, 1)));
}
@@ -437,13 +440,15 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
if (!pubnames_str)
ereport(ERROR,
- (errmsg("could not start WAL streaming: %s",
+ (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
+ errmsg("could not start WAL streaming: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
strlen(pubnames_str));
if (!pubnames_literal)
ereport(ERROR,
- (errmsg("could not start WAL streaming: %s",
+ (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
+ errmsg("could not start WAL streaming: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
PQfreemem(pubnames_literal);
@@ -472,7 +477,8 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
{
PQclear(res);
ereport(ERROR,
- (errmsg("could not start WAL streaming: %s",
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("could not start WAL streaming: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
}
PQclear(res);
@@ -495,7 +501,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
PQflush(conn->streamConn))
ereport(ERROR,
- (errmsg("could not send end-of-streaming message to primary: %s",
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not send end-of-streaming message to primary: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
*next_tli = 0;
@@ -517,7 +524,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
*/
if (PQnfields(res) < 2 || PQntuples(res) != 1)
ereport(ERROR,
- (errmsg("unexpected result set after end-of-streaming")));
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("unexpected result set after end-of-streaming")));
*next_tli = pg_strtoint32(PQgetvalue(res, 0, 0));
PQclear(res);
@@ -531,7 +539,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
/* End the copy */
if (PQendcopy(conn->streamConn))
ereport(ERROR,
- (errmsg("error while shutting down streaming COPY: %s",
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("error while shutting down streaming COPY: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
/* CommandComplete should follow */
@@ -540,7 +549,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
if (PQresultStatus(res) != PGRES_COMMAND_OK)
ereport(ERROR,
- (errmsg("error reading result of streaming command: %s",
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("error reading result of streaming command: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
PQclear(res);
@@ -548,7 +558,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
res = libpqrcv_PQgetResult(conn->streamConn);
if (res != NULL)
ereport(ERROR,
- (errmsg("unexpected result after CommandComplete: %s",
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("unexpected result after CommandComplete: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
}
@@ -574,7 +585,8 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
{
PQclear(res);
ereport(ERROR,
- (errmsg("could not receive timeline history file from "
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("could not receive timeline history file from "
"the primary server: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
}
@@ -585,7 +597,8 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
PQclear(res);
ereport(ERROR,
- (errmsg("invalid response from primary server"),
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid response from primary server"),
errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
ntuples, nfields)));
}
@@ -746,7 +759,8 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
/* Try consuming some data. */
if (PQconsumeInput(conn->streamConn) == 0)
ereport(ERROR,
- (errmsg("could not receive data from WAL stream: %s",
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not receive data from WAL stream: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
/* Now that we've consumed some input, try again */
@@ -782,7 +796,8 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
return -1;
ereport(ERROR,
- (errmsg("unexpected result after CommandComplete: %s",
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("unexpected result after CommandComplete: %s",
PQerrorMessage(conn->streamConn))));
}
@@ -797,13 +812,15 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
{
PQclear(res);
ereport(ERROR,
- (errmsg("could not receive data from WAL stream: %s",
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("could not receive data from WAL stream: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
}
}
if (rawlen < -1)
ereport(ERROR,
- (errmsg("could not receive data from WAL stream: %s",
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("could not receive data from WAL stream: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
/* Return received messages to caller */
@@ -822,7 +839,8 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
PQflush(conn->streamConn))
ereport(ERROR,
- (errmsg("could not send data to WAL stream: %s",
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not send data to WAL stream: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
}
@@ -875,7 +893,8 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
{
PQclear(res);
ereport(ERROR,
- (errmsg("could not create replication slot \"%s\": %s",
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("could not create replication slot \"%s\": %s",
slotname, pchomp(PQerrorMessage(conn->streamConn)))));
}
@@ -920,7 +939,8 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
/* Make sure we got expected number of fields. */
if (nfields != nRetTypes)
ereport(ERROR,
- (errmsg("invalid query response"),
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid query response"),
errdetail("Expected %d fields, got %d fields.",
nRetTypes, nfields)));
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 67f907cdd96..cc50eb875b1 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -723,13 +723,15 @@ fetch_remote_table_info(char *nspname, char *relname,
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
- (errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
nspname, relname, res->err)));
slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
ereport(ERROR,
- (errmsg("table \"%s.%s\" not found on publisher",
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("table \"%s.%s\" not found on publisher",
nspname, relname)));
lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
@@ -764,7 +766,8 @@ fetch_remote_table_info(char *nspname, char *relname,
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
- (errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
nspname, relname, res->err)));
/* We don't know the number of rows coming, so allocate enough space. */
@@ -851,7 +854,8 @@ copy_table(Relation rel)
pfree(cmd.data);
if (res->status != WALRCV_OK_COPY_OUT)
ereport(ERROR,
- (errmsg("could not start initial contents copy for table \"%s.%s\": %s",
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not start initial contents copy for table \"%s.%s\": %s",
lrel.nspname, lrel.relname, res->err)));
walrcv_clear_result(res);
@@ -967,7 +971,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
walrcv_connect(MySubscription->conninfo, true, slotname, &err);
if (LogRepWorkerWalRcvConn == NULL)
ereport(ERROR,
- (errmsg("could not connect to the publisher: %s", err)));
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not connect to the publisher: %s", err)));
Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
@@ -1050,7 +1055,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
0, NULL);
if (res->status != WALRCV_OK_COMMAND)
ereport(ERROR,
- (errmsg("table copy could not start transaction on publisher: %s",
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("table copy could not start transaction on publisher: %s",
res->err)));
walrcv_clear_result(res);
@@ -1110,7 +1116,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
if (res->status != WALRCV_OK_COMMAND)
ereport(ERROR,
- (errmsg("table copy could not finish transaction on publisher: %s",
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("table copy could not finish transaction on publisher: %s",
res->err)));
walrcv_clear_result(res);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 4b112593c65..bbb659dad06 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2388,7 +2388,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
if (now >= timeout)
ereport(ERROR,
- (errmsg("terminating logical replication worker due to timeout")));
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("terminating logical replication worker due to timeout")));
/* Check to see if it's time for a ping. */
if (!ping_sent)
@@ -3207,7 +3208,8 @@ ApplyWorkerMain(Datum main_arg)
MySubscription->name, &err);
if (LogRepWorkerWalRcvConn == NULL)
ereport(ERROR,
- (errmsg("could not connect to the publisher: %s", err)));
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not connect to the publisher: %s", err)));
/*
* We don't really use the output identify_system for anything but it
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index b94910bfe9a..faeea9f0cc5 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -279,10 +279,13 @@ WalReceiverMain(void)
PG_SETMASK(&UnBlockSig);
/* Establish the connection to the primary for XLOG streaming */
- wrconn = walrcv_connect(conninfo, false, cluster_name[0] ? cluster_name : "walreceiver", &err);
+ wrconn = walrcv_connect(conninfo, false,
+ cluster_name[0] ? cluster_name : "walreceiver",
+ &err);
if (!wrconn)
ereport(ERROR,
- (errmsg("could not connect to the primary server: %s", err)));
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not connect to the primary server: %s", err)));
/*
* Save user-visible connection string. This clobbers the original
@@ -328,7 +331,8 @@ WalReceiverMain(void)
if (strcmp(primary_sysid, standby_sysid) != 0)
{
ereport(ERROR,
- (errmsg("database system identifier differs between the primary and standby"),
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("database system identifier differs between the primary and standby"),
errdetail("The primary's identifier is %s, the standby's identifier is %s.",
primary_sysid, standby_sysid)));
}
@@ -339,7 +343,8 @@ WalReceiverMain(void)
*/
if (primaryTLI < startpointTLI)
ereport(ERROR,
- (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("highest timeline %u of the primary is behind recovery timeline %u",
primaryTLI, startpointTLI)));
/*
@@ -425,7 +430,8 @@ WalReceiverMain(void)
*/
if (!RecoveryInProgress())
ereport(FATAL,
- (errmsg("cannot continue WAL streaming, recovery has already ended")));
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot continue WAL streaming, recovery has already ended")));
/* Process any requests or signals received recently */
ProcessWalRcvInterrupts();
@@ -551,7 +557,8 @@ WalReceiverMain(void)
if (now >= timeout)
ereport(ERROR,
- (errmsg("terminating walreceiver due to timeout")));
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("terminating walreceiver due to timeout")));
/*
* We didn't receive anything new, for half of