aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/commands/copy.c2
-rw-r--r--src/backend/commands/copyto.c31
-rw-r--r--src/include/commands/copy.h3
-rw-r--r--src/test/modules/Makefile1
-rw-r--r--src/test/modules/meson.build1
-rw-r--r--src/test/modules/test_copy_callbacks/.gitignore4
-rw-r--r--src/test/modules/test_copy_callbacks/Makefile23
-rw-r--r--src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out13
-rw-r--r--src/test/modules/test_copy_callbacks/meson.build34
-rw-r--r--src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql4
-rw-r--r--src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql8
-rw-r--r--src/test/modules/test_copy_callbacks/test_copy_callbacks.c51
-rw-r--r--src/test/modules/test_copy_callbacks/test_copy_callbacks.control4
-rw-r--r--src/tools/pgindent/typedefs.list1
14 files changed, 175 insertions, 5 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 49924e476af..db4c9dbc231 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -310,7 +310,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
cstate = BeginCopyTo(pstate, rel, query, relid,
stmt->filename, stmt->is_program,
- stmt->attlist, stmt->options);
+ NULL, stmt->attlist, stmt->options);
*processed = DoCopyTo(cstate); /* copy from database to file */
EndCopyTo(cstate);
}
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index fca29a9a105..2527e660598 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -51,6 +51,7 @@ typedef enum CopyDest
{
COPY_FILE, /* to file (or a piped program) */
COPY_FRONTEND, /* to frontend */
+ COPY_CALLBACK /* to callback function */
} CopyDest;
/*
@@ -85,6 +86,7 @@ typedef struct CopyToStateData
List *attnumlist; /* integer list of attnums to copy */
char *filename; /* filename, or NULL for STDOUT */
bool is_program; /* is 'filename' a program to popen? */
+ copy_data_dest_cb data_dest_cb; /* function for writing data */
CopyFormatOptions opts;
Node *whereClause; /* WHERE condition (or NULL) */
@@ -247,6 +249,9 @@ CopySendEndOfRow(CopyToState cstate)
/* Dump the accumulated row as one CopyData message */
(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
break;
+ case COPY_CALLBACK:
+ cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
+ break;
}
/* Update the progress */
@@ -336,6 +341,17 @@ EndCopy(CopyToState cstate)
/*
* Setup CopyToState to read tuples from a table or a query for COPY TO.
+ *
+ * 'rel': Relation to be copied
+ * 'raw_query': Query whose results are to be copied
+ * 'queryRelId': OID of base relation to convert to a query (for RLS)
+ * 'filename': Name of server-local file to write, NULL for STDOUT
+ * 'is_program': true if 'filename' is program to execute
+ * 'data_dest_cb': Callback that processes the output data
+ * 'attnamelist': List of char *, columns to include. NIL selects all cols.
+ * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
+ *
+ * Returns a CopyToState, to be passed to DoCopyTo() and related functions.
*/
CopyToState
BeginCopyTo(ParseState *pstate,
@@ -344,11 +360,12 @@ BeginCopyTo(ParseState *pstate,
Oid queryRelId,
const char *filename,
bool is_program,
+ copy_data_dest_cb data_dest_cb,
List *attnamelist,
List *options)
{
CopyToState cstate;
- bool pipe = (filename == NULL);
+ bool pipe = (filename == NULL && data_dest_cb == NULL);
TupleDesc tupDesc;
int num_phys_attrs;
MemoryContext oldcontext;
@@ -656,7 +673,13 @@ BeginCopyTo(ParseState *pstate,
cstate->copy_dest = COPY_FILE; /* default */
- if (pipe)
+ if (data_dest_cb)
+ {
+ progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
+ cstate->copy_dest = COPY_CALLBACK;
+ cstate->data_dest_cb = data_dest_cb;
+ }
+ else if (pipe)
{
progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
@@ -765,11 +788,13 @@ EndCopyTo(CopyToState cstate)
/*
* Copy from relation or query TO file.
+ *
+ * Returns the number of rows processed.
*/
uint64
DoCopyTo(CopyToState cstate)
{
- bool pipe = (cstate->filename == NULL);
+ bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
bool fe_copy = (pipe && whereToSendOutput == DestRemote);
TupleDesc tupDesc;
int num_phys_attrs;
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 3f6677b1327..b77b9350058 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -66,6 +66,7 @@ typedef struct CopyFromStateData *CopyFromState;
typedef struct CopyToStateData *CopyToState;
typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
+typedef void (*copy_data_dest_cb) (void *data, int len);
extern void DoCopy(ParseState *pstate, const CopyStmt *stmt,
int stmt_location, int stmt_len,
@@ -91,7 +92,7 @@ extern DestReceiver *CreateCopyDestReceiver(void);
*/
extern CopyToState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *raw_query,
Oid queryRelId, const char *filename, bool is_program,
- List *attnamelist, List *options);
+ copy_data_dest_cb data_dest_cb, List *attnamelist, List *options);
extern void EndCopyTo(CopyToState cstate);
extern uint64 DoCopyTo(CopyToState cstate);
extern List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 6c31c8707c2..7b3f2929652 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -15,6 +15,7 @@ SUBDIRS = \
snapshot_too_old \
spgist_name_ops \
test_bloomfilter \
+ test_copy_callbacks \
test_ddl_deparse \
test_extensions \
test_ginpostinglist \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index a80e6e2ce29..c2e5f5ffd5a 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -9,6 +9,7 @@ subdir('snapshot_too_old')
subdir('spgist_name_ops')
subdir('ssl_passphrase_callback')
subdir('test_bloomfilter')
+subdir('test_copy_callbacks')
subdir('test_ddl_deparse')
subdir('test_extensions')
subdir('test_ginpostinglist')
diff --git a/src/test/modules/test_copy_callbacks/.gitignore b/src/test/modules/test_copy_callbacks/.gitignore
new file mode 100644
index 00000000000..5dcb3ff9723
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/test_copy_callbacks/Makefile b/src/test/modules/test_copy_callbacks/Makefile
new file mode 100644
index 00000000000..82e890150d1
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/Makefile
@@ -0,0 +1,23 @@
+# src/test/modules/test_copy_callbacks/Makefile
+
+MODULE_big = test_copy_callbacks
+OBJS = \
+ $(WIN32RES) \
+ test_copy_callbacks.o
+PGFILEDESC = "test_copy_callbacks - test COPY callbacks"
+
+EXTENSION = test_copy_callbacks
+DATA = test_copy_callbacks--1.0.sql
+
+REGRESS = test_copy_callbacks
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_copy_callbacks
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out b/src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out
new file mode 100644
index 00000000000..93ebeef1301
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out
@@ -0,0 +1,13 @@
+CREATE EXTENSION test_copy_callbacks;
+CREATE TABLE public.test (a INT, b INT, c INT);
+INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
+SELECT test_copy_to_callback('public.test'::pg_catalog.regclass);
+NOTICE: COPY TO callback called with data "1 2 3" and length 5
+NOTICE: COPY TO callback called with data "12 34 56" and length 8
+NOTICE: COPY TO callback called with data "123 456 789" and length 11
+NOTICE: COPY TO callback has processed 3 rows
+ test_copy_to_callback
+-----------------------
+
+(1 row)
+
diff --git a/src/test/modules/test_copy_callbacks/meson.build b/src/test/modules/test_copy_callbacks/meson.build
new file mode 100644
index 00000000000..43eca8e3d9d
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/meson.build
@@ -0,0 +1,34 @@
+# FIXME: prevent install during main install, but not during test :/
+
+test_copy_callbacks_sources = files(
+ 'test_copy_callbacks.c',
+)
+
+if host_system == 'windows'
+ test_copy_callbacks_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+ '--NAME', 'test_copy_callbacks',
+ '--FILEDESC', 'test_copy_callbacks - test COPY callbacks',])
+endif
+
+test_copy_callbacks = shared_module('test_copy_callbacks',
+ test_copy_callbacks_sources,
+ kwargs: pg_mod_args,
+)
+testprep_targets += test_copy_callbacks
+
+install_data(
+ 'test_copy_callbacks.control',
+ 'test_copy_callbacks--1.0.sql',
+ kwargs: contrib_data_args,
+)
+
+tests += {
+ 'name': 'test_copy_callbacks',
+ 'sd': meson.current_source_dir(),
+ 'bd': meson.current_build_dir(),
+ 'regress': {
+ 'sql': [
+ 'test_copy_callbacks',
+ ],
+ },
+}
diff --git a/src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql b/src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql
new file mode 100644
index 00000000000..2deffba635c
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql
@@ -0,0 +1,4 @@
+CREATE EXTENSION test_copy_callbacks;
+CREATE TABLE public.test (a INT, b INT, c INT);
+INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
+SELECT test_copy_to_callback('public.test'::pg_catalog.regclass);
diff --git a/src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql b/src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql
new file mode 100644
index 00000000000..215cf3fad69
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql
@@ -0,0 +1,8 @@
+/* src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_copy_callbacks" to load this file. \quit
+
+CREATE FUNCTION test_copy_to_callback(pg_catalog.regclass)
+ RETURNS pg_catalog.void
+ AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_copy_callbacks/test_copy_callbacks.c b/src/test/modules/test_copy_callbacks/test_copy_callbacks.c
new file mode 100644
index 00000000000..ecdbe4eee1b
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/test_copy_callbacks.c
@@ -0,0 +1,51 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_copy_callbacks.c
+ * Code for testing COPY callbacks.
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/test/modules/test_copy_callbacks/test_copy_callbacks.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/table.h"
+#include "commands/copy.h"
+#include "fmgr.h"
+#include "utils/rel.h"
+
+PG_MODULE_MAGIC;
+
+static void
+to_cb(void *data, int len)
+{
+ ereport(NOTICE,
+ (errmsg("COPY TO callback called with data \"%s\" and length %d",
+ (char *) data, len)));
+}
+
+PG_FUNCTION_INFO_V1(test_copy_to_callback);
+Datum
+test_copy_to_callback(PG_FUNCTION_ARGS)
+{
+ Relation rel = table_open(PG_GETARG_OID(0), AccessShareLock);
+ CopyToState cstate;
+ int64 processed;
+
+ cstate = BeginCopyTo(NULL, rel, NULL, RelationGetRelid(rel), NULL, NULL,
+ to_cb, NIL, NIL);
+ processed = DoCopyTo(cstate);
+ EndCopyTo(cstate);
+
+ ereport(NOTICE, (errmsg("COPY TO callback has processed %lld rows",
+ (long long) processed)));
+
+ table_close(rel, NoLock);
+
+ PG_RETURN_VOID();
+}
diff --git a/src/test/modules/test_copy_callbacks/test_copy_callbacks.control b/src/test/modules/test_copy_callbacks/test_copy_callbacks.control
new file mode 100644
index 00000000000..b7ce3f12ff8
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/test_copy_callbacks.control
@@ -0,0 +1,4 @@
+comment = 'Test code for COPY callbacks'
+default_version = '1.0'
+module_pathname = '$libdir/test_copy_callbacks'
+relocatable = true
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 97c9bc18615..d9b839c9799 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3177,6 +3177,7 @@ compare_context
config_var_value
contain_aggs_of_level_context
convert_testexpr_context
+copy_data_dest_cb
copy_data_source_cb
core_YYSTYPE
core_yy_extra_type