aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/access/transam/parallel.c18
-rw-r--r--src/backend/commands/async.c5
-rw-r--r--src/backend/commands/variable.c24
-rw-r--r--src/backend/libpq/pqformat.c30
-rw-r--r--src/backend/libpq/pqmq.c2
-rw-r--r--src/include/commands/async.h4
-rw-r--r--src/include/libpq/pqformat.h1
7 files changed, 78 insertions, 6 deletions
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 088700e17cb..eef1dc2b184 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -810,7 +810,17 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
case 'A': /* NotifyResponse */
{
/* Propagate NotifyResponse. */
- pq_putmessage(msg->data[0], &msg->data[1], msg->len - 1);
+ int32 pid;
+ const char *channel;
+ const char *payload;
+
+ pid = pq_getmsgint(msg, 4);
+ channel = pq_getmsgrawstring(msg);
+ payload = pq_getmsgrawstring(msg);
+ pq_endmessage(msg);
+
+ NotifyMyFrontEnd(channel, payload, pid);
+
break;
}
@@ -988,6 +998,12 @@ ParallelWorkerMain(Datum main_arg)
BackgroundWorkerInitializeConnectionByOid(fps->database_id,
fps->authenticated_user_id);
+ /*
+ * Set the client encoding to the database encoding, since that is what
+ * the leader will expect.
+ */
+ SetClientEncoding(GetDatabaseEncoding());
+
/* Restore GUC values from launching backend. */
gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC);
Assert(gucspace != NULL);
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index c39ac3aeef0..716f1c33183 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -390,9 +390,6 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
char *page_buffer);
static void asyncQueueAdvanceTail(void);
static void ProcessIncomingNotify(void);
-static void NotifyMyFrontEnd(const char *channel,
- const char *payload,
- int32 srcPid);
static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
static void ClearPendingActionsAndNotifies(void);
@@ -2076,7 +2073,7 @@ ProcessIncomingNotify(void)
/*
* Send NOTIFY message to my front end.
*/
-static void
+void
NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
{
if (whereToSendOutput == DestRemote)
diff --git a/src/backend/commands/variable.c b/src/backend/commands/variable.c
index 962d75db6e4..4ad8266a51c 100644
--- a/src/backend/commands/variable.c
+++ b/src/backend/commands/variable.c
@@ -755,6 +755,30 @@ assign_client_encoding(const char *newval, void *extra)
{
int encoding = *((int *) extra);
+ /*
+ * Parallel workers send data to the leader, not the client. They always
+ * send data using the database encoding.
+ */
+ if (IsParallelWorker())
+ {
+ /*
+ * During parallel worker startup, we want to accept the leader's
+ * client_encoding setting so that anyone who looks at the value in
+ * the worker sees the same value that they would see in the leader.
+ */
+ if (InitializingParallelWorker)
+ return;
+
+ /*
+ * A change other than during startup, for example due to a SET clause
+ * attached to a function definition, should be rejected, as there is
+ * nothing we can do inside the worker to make it take effect.
+ */
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+ errmsg("cannot change client_encoding in a parallel worker")));
+ }
+
/* We do not expect an error if PrepareClientEncoding succeeded */
if (SetClientEncoding(encoding) < 0)
elog(LOG, "SetClientEncoding(%d) failed", encoding);
diff --git a/src/backend/libpq/pqformat.c b/src/backend/libpq/pqformat.c
index 4ddea8285fc..b5d9d64e547 100644
--- a/src/backend/libpq/pqformat.c
+++ b/src/backend/libpq/pqformat.c
@@ -65,6 +65,7 @@
* pq_copymsgbytes - copy raw data from a message buffer
* pq_getmsgtext - get a counted text string (with conversion)
* pq_getmsgstring - get a null-terminated text string (with conversion)
+ * pq_getmsgrawstring - get a null-terminated text string - NO conversion
* pq_getmsgend - verify message fully consumed
*/
@@ -640,6 +641,35 @@ pq_getmsgstring(StringInfo msg)
}
/* --------------------------------
+ * pq_getmsgrawstring - get a null-terminated text string - NO conversion
+ *
+ * Returns a pointer directly into the message buffer.
+ * --------------------------------
+ */
+const char *
+pq_getmsgrawstring(StringInfo msg)
+{
+ char *str;
+ int slen;
+
+ str = &msg->data[msg->cursor];
+
+ /*
+ * It's safe to use strlen() here because a StringInfo is guaranteed to
+ * have a trailing null byte. But check we found a null inside the
+ * message.
+ */
+ slen = strlen(str);
+ if (msg->cursor + slen >= msg->len)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid string in message")));
+ msg->cursor += slen + 1;
+
+ return str;
+}
+
+/* --------------------------------
* pq_getmsgend - verify message fully consumed
* --------------------------------
*/
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index 3225c1fa0e7..0dcdee03db5 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -232,7 +232,7 @@ pq_parse_errornotice(StringInfo msg, ErrorData *edata)
pq_getmsgend(msg);
break;
}
- value = pq_getmsgstring(msg);
+ value = pq_getmsgrawstring(msg);
switch (code)
{
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index b4c13fac4a3..95559df19fe 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -28,6 +28,10 @@ extern volatile sig_atomic_t notifyInterruptPending;
extern Size AsyncShmemSize(void);
extern void AsyncShmemInit(void);
+extern void NotifyMyFrontEnd(const char *channel,
+ const char *payload,
+ int32 srcPid);
+
/* notify-related SQL statements */
extern void Async_Notify(const char *channel, const char *payload);
extern void Async_Listen(const char *channel);
diff --git a/src/include/libpq/pqformat.h b/src/include/libpq/pqformat.h
index 65ebf37fbc0..3c0d4b2622b 100644
--- a/src/include/libpq/pqformat.h
+++ b/src/include/libpq/pqformat.h
@@ -44,6 +44,7 @@ extern const char *pq_getmsgbytes(StringInfo msg, int datalen);
extern void pq_copymsgbytes(StringInfo msg, char *buf, int datalen);
extern char *pq_getmsgtext(StringInfo msg, int rawbytes, int *nbytes);
extern const char *pq_getmsgstring(StringInfo msg);
+extern const char *pq_getmsgrawstring(StringInfo msg);
extern void pq_getmsgend(StringInfo msg);
#endif /* PQFORMAT_H */