aboutsummaryrefslogtreecommitdiff
path: root/src/fe_utils/parallel_slot.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fe_utils/parallel_slot.c')
-rw-r--r--src/fe_utils/parallel_slot.c91
1 files changed, 63 insertions, 28 deletions
diff --git a/src/fe_utils/parallel_slot.c b/src/fe_utils/parallel_slot.c
index 3987a4702b5..b625deb2545 100644
--- a/src/fe_utils/parallel_slot.c
+++ b/src/fe_utils/parallel_slot.c
@@ -30,7 +30,7 @@
static void init_slot(ParallelSlot *slot, PGconn *conn);
static int select_loop(int maxFd, fd_set *workerset);
-static bool processQueryResult(PGconn *conn, PGresult *result);
+static bool processQueryResult(ParallelSlot *slot, PGresult *result);
static void
init_slot(ParallelSlot *slot, PGconn *conn)
@@ -38,34 +38,24 @@ init_slot(ParallelSlot *slot, PGconn *conn)
slot->connection = conn;
/* Initially assume connection is idle */
slot->isFree = true;
+ ParallelSlotClearHandler(slot);
}
/*
- * Process (and delete) a query result. Returns true if there's no error,
- * false otherwise -- but errors about trying to work on a missing relation
- * are reported and subsequently ignored.
+ * Process (and delete) a query result. Returns true if there's no problem,
+ * false otherwise. It's up to the handler to decide what cosntitutes a
+ * problem.
*/
static bool
-processQueryResult(PGconn *conn, PGresult *result)
+processQueryResult(ParallelSlot *slot, PGresult *result)
{
- /*
- * If it's an error, report it. Errors about a missing table are harmless
- * so we continue processing; but die for other errors.
- */
- if (PQresultStatus(result) != PGRES_COMMAND_OK)
- {
- char *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE);
+ Assert(slot->handler != NULL);
- pg_log_error("processing of database \"%s\" failed: %s",
- PQdb(conn), PQerrorMessage(conn));
-
- if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
- {
- PQclear(result);
- return false;
- }
- }
+ /* On failure, the handler should return NULL after freeing the result */
+ if (!slot->handler(result, slot->connection, slot->handler_context))
+ return false;
+ /* Ok, we have to free it ourself */
PQclear(result);
return true;
}
@@ -76,15 +66,15 @@ processQueryResult(PGconn *conn, PGresult *result)
* Note that this will block if the connection is busy.
*/
static bool
-consumeQueryResult(PGconn *conn)
+consumeQueryResult(ParallelSlot *slot)
{
bool ok = true;
PGresult *result;
- SetCancelConn(conn);
- while ((result = PQgetResult(conn)) != NULL)
+ SetCancelConn(slot->connection);
+ while ((result = PQgetResult(slot->connection)) != NULL)
{
- if (!processQueryResult(conn, result))
+ if (!processQueryResult(slot, result))
ok = false;
}
ResetCancelConn();
@@ -227,14 +217,15 @@ ParallelSlotsGetIdle(ParallelSlot *slots, int numslots)
if (result != NULL)
{
- /* Check and discard the command result */
- if (!processQueryResult(slots[i].connection, result))
+ /* Handle and discard the command result */
+ if (!processQueryResult(slots + i, result))
return NULL;
}
else
{
/* This connection has become idle */
slots[i].isFree = true;
+ ParallelSlotClearHandler(slots + i);
if (firstFree < 0)
firstFree = i;
break;
@@ -329,8 +320,52 @@ ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots)
for (i = 0; i < numslots; i++)
{
- if (!consumeQueryResult((slots + i)->connection))
+ if (!consumeQueryResult(slots + i))
+ return false;
+ }
+
+ return true;
+}
+
+/*
+ * TableCommandResultHandler
+ *
+ * ParallelSlotResultHandler for results of commands (not queries) against
+ * tables.
+ *
+ * Requires that the result status is either PGRES_COMMAND_OK or an error about
+ * a missing table. This is useful for utilities that compile a list of tables
+ * to process and then run commands (vacuum, reindex, or whatever) against
+ * those tables, as there is a race condition between the time the list is
+ * compiled and the time the command attempts to open the table.
+ *
+ * For missing tables, logs an error but allows processing to continue.
+ *
+ * For all other errors, logs an error and terminates further processing.
+ *
+ * res: PGresult from the query executed on the slot's connection
+ * conn: connection belonging to the slot
+ * context: unused
+ */
+bool
+TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
+{
+ /*
+ * If it's an error, report it. Errors about a missing table are harmless
+ * so we continue processing; but die for other errors.
+ */
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
+
+ pg_log_error("processing of database \"%s\" failed: %s",
+ PQdb(conn), PQerrorMessage(conn));
+
+ if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
+ {
+ PQclear(res);
return false;
+ }
}
return true;