aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands')
-rw-r--r--src/backend/commands/copy.c49
-rw-r--r--src/backend/commands/copyfrom.c48
-rw-r--r--src/backend/commands/copyfromparse.c17
3 files changed, 109 insertions, 5 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index fe4cf957d77..c36d7f1daaf 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -395,6 +395,42 @@ defGetCopyHeaderChoice(DefElem *def, bool is_from)
}
/*
+ * Extract a CopySaveErrorToChoice value from a DefElem.
+ */
+static CopySaveErrorToChoice
+defGetCopySaveErrorToChoice(DefElem *def, ParseState *pstate, bool is_from)
+{
+ char *sval;
+
+ if (!is_from)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("COPY SAVE_ERROR_TO cannot be used with COPY TO"),
+ parser_errposition(pstate, def->location)));
+
+ /*
+ * If no parameter value given, assume the default value.
+ */
+ if (def->arg == NULL)
+ return COPY_SAVE_ERROR_TO_ERROR;
+
+ /*
+ * Allow "error", or "none" values.
+ */
+ sval = defGetString(def);
+ if (pg_strcasecmp(sval, "error") == 0)
+ return COPY_SAVE_ERROR_TO_ERROR;
+ if (pg_strcasecmp(sval, "none") == 0)
+ return COPY_SAVE_ERROR_TO_NONE;
+
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("COPY save_error_to \"%s\" not recognized", sval),
+ parser_errposition(pstate, def->location)));
+ return COPY_SAVE_ERROR_TO_ERROR; /* keep compiler quiet */
+}
+
+/*
* Process the statement option list for COPY.
*
* Scan the options list (a list of DefElem) and transpose the information
@@ -419,6 +455,7 @@ ProcessCopyOptions(ParseState *pstate,
bool format_specified = false;
bool freeze_specified = false;
bool header_specified = false;
+ bool save_error_to_specified = false;
ListCell *option;
/* Support external use for option sanity checking */
@@ -571,6 +608,13 @@ ProcessCopyOptions(ParseState *pstate,
defel->defname),
parser_errposition(pstate, defel->location)));
}
+ else if (strcmp(defel->defname, "save_error_to") == 0)
+ {
+ if (save_error_to_specified)
+ errorConflictingDefElem(defel, pstate);
+ save_error_to_specified = true;
+ opts_out->save_error_to = defGetCopySaveErrorToChoice(defel, pstate, is_from);
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -598,6 +642,11 @@ ProcessCopyOptions(ParseState *pstate,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("cannot specify DEFAULT in BINARY mode")));
+ if (opts_out->binary && opts_out->save_error_to != COPY_SAVE_ERROR_TO_ERROR)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("cannot specify SAVE_ERROR_TO in BINARY mode")));
+
/* Set defaults for omitted options */
if (!opts_out->delim)
opts_out->delim = opts_out->csv_mode ? "," : "\t";
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 37836a769c7..46b23e345b8 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -42,6 +42,7 @@
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
+#include "nodes/miscnodes.h"
#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "rewrite/rewriteHandler.h"
@@ -656,6 +657,9 @@ CopyFrom(CopyFromState cstate)
Assert(cstate->rel);
Assert(list_length(cstate->range_table) == 1);
+ if (cstate->opts.save_error_to != COPY_SAVE_ERROR_TO_ERROR)
+ Assert(cstate->escontext);
+
/*
* The target must be a plain, foreign, or partitioned relation, or have
* an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
@@ -992,6 +996,25 @@ CopyFrom(CopyFromState cstate)
if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
break;
+ if (cstate->opts.save_error_to != COPY_SAVE_ERROR_TO_ERROR &&
+ cstate->escontext->error_occurred)
+ {
+ /*
+ * Soft error occured, skip this tuple and save error information
+ * according to SAVE_ERROR_TO.
+ */
+ if (cstate->opts.save_error_to == COPY_SAVE_ERROR_TO_NONE)
+
+ /*
+ * Just make ErrorSaveContext ready for the next NextCopyFrom.
+ * Since we don't set details_wanted and error_data is not to
+ * be filled, just resetting error_occurred is enough.
+ */
+ cstate->escontext->error_occurred = false;
+
+ continue;
+ }
+
ExecStoreVirtualTuple(myslot);
/*
@@ -1284,6 +1307,14 @@ CopyFrom(CopyFromState cstate)
/* Done, clean up */
error_context_stack = errcallback.previous;
+ if (cstate->opts.save_error_to != COPY_SAVE_ERROR_TO_ERROR &&
+ cstate->num_errors > 0)
+ ereport(NOTICE,
+ errmsg_plural("%zd row were skipped due to data type incompatibility",
+ "%zd rows were skipped due to data type incompatibility",
+ cstate->num_errors,
+ cstate->num_errors));
+
if (bistate != NULL)
FreeBulkInsertState(bistate);
@@ -1419,6 +1450,23 @@ BeginCopyFrom(ParseState *pstate,
}
}
+ /* Set up soft error handler for SAVE_ERROR_TO */
+ if (cstate->opts.save_error_to != COPY_SAVE_ERROR_TO_ERROR)
+ {
+ cstate->escontext = makeNode(ErrorSaveContext);
+ cstate->escontext->type = T_ErrorSaveContext;
+ cstate->escontext->error_occurred = false;
+
+ /*
+ * Currently we only support COPY_SAVE_ERROR_TO_NONE. We'll add other
+ * options later
+ */
+ if (cstate->opts.save_error_to == COPY_SAVE_ERROR_TO_NONE)
+ cstate->escontext->details_wanted = false;
+ }
+ else
+ cstate->escontext = NULL;
+
/* Convert FORCE_NULL name list to per-column flags, check validity */
cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
if (cstate->opts.force_null_all)
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index af4c36f6450..7207eb26983 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -70,6 +70,7 @@
#include "libpq/pqformat.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
+#include "nodes/miscnodes.h"
#include "pgstat.h"
#include "port/pg_bswap.h"
#include "utils/builtins.h"
@@ -955,11 +956,17 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
}
- else
- values[m] = InputFunctionCall(&in_functions[m],
- string,
- typioparams[m],
- att->atttypmod);
+ /* If SAVE_ERROR_TO is specified, skip rows with soft errors */
+ else if (!InputFunctionCallSafe(&in_functions[m],
+ string,
+ typioparams[m],
+ att->atttypmod,
+ (Node *) cstate->escontext,
+ &values[m]))
+ {
+ cstate->num_errors++;
+ return true;
+ }
cstate->cur_attname = NULL;
cstate->cur_attval = NULL;