diff options
Diffstat (limited to 'src/bin/pg_basebackup/pg_recvlogical.c')
-rw-r--r-- | src/bin/pg_basebackup/pg_recvlogical.c | 116 |
1 files changed, 27 insertions, 89 deletions
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c index a88ffacc06d..c48ceccf901 100644 --- a/src/bin/pg_basebackup/pg_recvlogical.c +++ b/src/bin/pg_basebackup/pg_recvlogical.c @@ -596,7 +596,6 @@ sighup_handler(int signum) int main(int argc, char **argv) { - PGresult *res; static struct option long_options[] = { /* general options */ {"file", required_argument, NULL, 'f'}, @@ -628,6 +627,7 @@ main(int argc, char **argv) int option_index; uint32 hi, lo; + char *db_name; progname = get_progname(argv[0]); set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_recvlogical")); @@ -834,124 +834,62 @@ main(int argc, char **argv) #endif /* - * don't really need this but it actually helps to get more precise error - * messages about authentication, required GUCs and such without starting - * to loop around connection attempts lateron. + * Obtain a connection to server. This is not really necessary but it + * helps to get more precise error messages about authentification, + * required GUC parameters and such. */ - { - conn = GetConnection(); - if (!conn) - /* Error message already written in GetConnection() */ - exit(1); + conn = GetConnection(); + if (!conn) + /* Error message already written in GetConnection() */ + exit(1); - /* - * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog - * position. - */ - res = PQexec(conn, "IDENTIFY_SYSTEM"); - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), - progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn)); - disconnect_and_exit(1); - } + /* + * Run IDENTIFY_SYSTEM to make sure we connected using a database specific + * replication connection. + */ + if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name)) + disconnect_and_exit(1); - if (PQntuples(res) != 1 || PQnfields(res) < 4) - { - fprintf(stderr, - _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"), - progname, PQntuples(res), PQnfields(res), 1, 4); - disconnect_and_exit(1); - } - PQclear(res); + if (db_name == NULL) + { + fprintf(stderr, + _("%s: failed to establish database specific replication connection\n"), + progname); + disconnect_and_exit(1); } - - /* - * drop a replication slot - */ + /* Drop a replication slot. */ if (do_drop_slot) { - char query[256]; - if (verbose) fprintf(stderr, _("%s: dropping replication slot \"%s\"\n"), progname, replication_slot); - snprintf(query, sizeof(query), "DROP_REPLICATION_SLOT \"%s\"", - replication_slot); - res = PQexec(conn, query); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - { - fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), - progname, query, PQerrorMessage(conn)); + if (!DropReplicationSlot(conn, replication_slot)) disconnect_and_exit(1); - } - - if (PQntuples(res) != 0 || PQnfields(res) != 0) - { - fprintf(stderr, - _("%s: could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"), - progname, replication_slot, PQntuples(res), PQnfields(res), 0, 0); - disconnect_and_exit(1); - } - - PQclear(res); - disconnect_and_exit(0); } - /* - * create a replication slot - */ + /* Create a replication slot. */ if (do_create_slot) { - char query[256]; - if (verbose) fprintf(stderr, _("%s: creating replication slot \"%s\"\n"), progname, replication_slot); - snprintf(query, sizeof(query), "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"", - replication_slot, plugin); - - res = PQexec(conn, query); - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), - progname, query, PQerrorMessage(conn)); + if (!CreateReplicationSlot(conn, replication_slot, plugin, + &startpos, false)) disconnect_and_exit(1); - } - - if (PQntuples(res) != 1 || PQnfields(res) != 4) - { - fprintf(stderr, - _("%s: could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"), - progname, replication_slot, PQntuples(res), PQnfields(res), 1, 4); - disconnect_and_exit(1); - } - - if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2) - { - fprintf(stderr, - _("%s: could not parse transaction log location \"%s\"\n"), - progname, PQgetvalue(res, 0, 1)); - disconnect_and_exit(1); - } - startpos = ((uint64) hi) << 32 | lo; - - replication_slot = strdup(PQgetvalue(res, 0, 0)); - PQclear(res); } - if (!do_start_slot) disconnect_and_exit(0); + /* Stream loop */ while (true) { - StreamLog(); + StreamLogicalLog(); if (time_to_abort) { /* |