aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands')
-rw-r--r--src/backend/commands/analyze.c30
-rw-r--r--src/backend/commands/cluster.c2
-rw-r--r--src/backend/commands/copy.c42
-rw-r--r--src/backend/commands/copyfromparse.c17
-rw-r--r--src/backend/commands/copyto.c2
-rw-r--r--src/backend/commands/dbcommands.c14
-rw-r--r--src/backend/commands/explain.c21
-rw-r--r--src/backend/commands/foreigncmds.c1
-rw-r--r--src/backend/commands/indexcmds.c42
-rw-r--r--src/backend/commands/matview.c3
-rw-r--r--src/backend/commands/publicationcmds.c24
-rw-r--r--src/backend/commands/schemacmds.c1
-rw-r--r--src/backend/commands/subscriptioncmds.c408
-rw-r--r--src/backend/commands/tablecmds.c82
-rw-r--r--src/backend/commands/tablespace.c2
-rw-r--r--src/backend/commands/trigger.c74
-rw-r--r--src/backend/commands/typecmds.c14
-rw-r--r--src/backend/commands/vacuum.c121
18 files changed, 675 insertions, 225 deletions
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index 4fffb76e557..40d66537ad7 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -76,7 +76,7 @@ static BufferAccessStrategy vac_strategy;
static void do_analyze_rel(Relation onerel,
- VacuumParams *params, List *va_cols,
+ const VacuumParams params, List *va_cols,
AcquireSampleRowsFunc acquirefunc, BlockNumber relpages,
bool inh, bool in_outer_xact, int elevel);
static void compute_index_stats(Relation onerel, double totalrows,
@@ -107,7 +107,7 @@ static Datum ind_fetch_func(VacAttrStatsP stats, int rownum, bool *isNull);
*/
void
analyze_rel(Oid relid, RangeVar *relation,
- VacuumParams *params, List *va_cols, bool in_outer_xact,
+ const VacuumParams params, List *va_cols, bool in_outer_xact,
BufferAccessStrategy bstrategy)
{
Relation onerel;
@@ -116,7 +116,7 @@ analyze_rel(Oid relid, RangeVar *relation,
BlockNumber relpages = 0;
/* Select logging level */
- if (params->options & VACOPT_VERBOSE)
+ if (params.options & VACOPT_VERBOSE)
elevel = INFO;
else
elevel = DEBUG2;
@@ -138,8 +138,8 @@ analyze_rel(Oid relid, RangeVar *relation,
*
* Make sure to generate only logs for ANALYZE in this case.
*/
- onerel = vacuum_open_relation(relid, relation, params->options & ~(VACOPT_VACUUM),
- params->log_min_duration >= 0,
+ onerel = vacuum_open_relation(relid, relation, params.options & ~(VACOPT_VACUUM),
+ params.log_min_duration >= 0,
ShareUpdateExclusiveLock);
/* leave if relation could not be opened or locked */
@@ -155,7 +155,7 @@ analyze_rel(Oid relid, RangeVar *relation,
*/
if (!vacuum_is_permitted_for_relation(RelationGetRelid(onerel),
onerel->rd_rel,
- params->options & ~VACOPT_VACUUM))
+ params.options & ~VACOPT_VACUUM))
{
relation_close(onerel, ShareUpdateExclusiveLock);
return;
@@ -227,7 +227,7 @@ analyze_rel(Oid relid, RangeVar *relation,
else
{
/* No need for a WARNING if we already complained during VACUUM */
- if (!(params->options & VACOPT_VACUUM))
+ if (!(params.options & VACOPT_VACUUM))
ereport(WARNING,
(errmsg("skipping \"%s\" --- cannot analyze non-tables or special system tables",
RelationGetRelationName(onerel))));
@@ -275,7 +275,7 @@ analyze_rel(Oid relid, RangeVar *relation,
* appropriate acquirefunc for each child table.
*/
static void
-do_analyze_rel(Relation onerel, VacuumParams *params,
+do_analyze_rel(Relation onerel, const VacuumParams params,
List *va_cols, AcquireSampleRowsFunc acquirefunc,
BlockNumber relpages, bool inh, bool in_outer_xact,
int elevel)
@@ -309,9 +309,9 @@ do_analyze_rel(Relation onerel, VacuumParams *params,
PgStat_Counter startreadtime = 0;
PgStat_Counter startwritetime = 0;
- verbose = (params->options & VACOPT_VERBOSE) != 0;
+ verbose = (params.options & VACOPT_VERBOSE) != 0;
instrument = (verbose || (AmAutoVacuumWorkerProcess() &&
- params->log_min_duration >= 0));
+ params.log_min_duration >= 0));
if (inh)
ereport(elevel,
(errmsg("analyzing \"%s.%s\" inheritance tree",
@@ -690,8 +690,8 @@ do_analyze_rel(Relation onerel, VacuumParams *params,
* only do it for inherited stats. (We're never called for not-inherited
* stats on partitioned tables anyway.)
*
- * Reset the changes_since_analyze counter only if we analyzed all
- * columns; otherwise, there is still work for auto-analyze to do.
+ * Reset the mod_since_analyze counter only if we analyzed all columns;
+ * otherwise, there is still work for auto-analyze to do.
*/
if (!inh)
pgstat_report_analyze(onerel, totalrows, totaldeadrows,
@@ -706,7 +706,7 @@ do_analyze_rel(Relation onerel, VacuumParams *params,
* amvacuumcleanup() when called in ANALYZE-only mode. The only exception
* among core index AMs is GIN/ginvacuumcleanup().
*/
- if (!(params->options & VACOPT_VACUUM))
+ if (!(params.options & VACOPT_VACUUM))
{
for (ind = 0; ind < nindexes; ind++)
{
@@ -736,9 +736,9 @@ do_analyze_rel(Relation onerel, VacuumParams *params,
{
TimestampTz endtime = GetCurrentTimestamp();
- if (verbose || params->log_min_duration == 0 ||
+ if (verbose || params.log_min_duration == 0 ||
TimestampDifferenceExceeds(starttime, endtime,
- params->log_min_duration))
+ params.log_min_duration))
{
long delay_in_ms;
WalUsage walusage;
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index 54a08e4102e..b55221d44cd 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -917,7 +917,7 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, bool verb
* not to be aggressive about this.
*/
memset(&params, 0, sizeof(VacuumParams));
- vacuum_get_cutoffs(OldHeap, &params, &cutoffs);
+ vacuum_get_cutoffs(OldHeap, params, &cutoffs);
/*
* FreezeXid will become the table's new relfrozenxid, and that mustn't go
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 74ae42b19a7..fae9c41db65 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -322,11 +322,13 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
}
/*
- * Extract a CopyHeaderChoice value from a DefElem. This is like
- * defGetBoolean() but also accepts the special value "match".
+ * Extract the CopyFormatOptions.header_line value from a DefElem.
+ *
+ * Parses the HEADER option for COPY, which can be a boolean, a non-negative
+ * integer (number of lines to skip), or the special value "match".
*/
-static CopyHeaderChoice
-defGetCopyHeaderChoice(DefElem *def, bool is_from)
+static int
+defGetCopyHeaderOption(DefElem *def, bool is_from)
{
/*
* If no parameter value given, assume "true" is meant.
@@ -335,20 +337,27 @@ defGetCopyHeaderChoice(DefElem *def, bool is_from)
return COPY_HEADER_TRUE;
/*
- * Allow 0, 1, "true", "false", "on", "off", or "match".
+ * Allow 0, 1, "true", "false", "on", "off", a non-negative integer, or
+ * "match".
*/
switch (nodeTag(def->arg))
{
case T_Integer:
- switch (intVal(def->arg))
{
- case 0:
- return COPY_HEADER_FALSE;
- case 1:
- return COPY_HEADER_TRUE;
- default:
- /* otherwise, error out below */
- break;
+ int ival = intVal(def->arg);
+
+ if (ival < 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("a negative integer value cannot be "
+ "specified for %s", def->defname)));
+
+ if (!is_from && ival > 1)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot use multi-line header in COPY TO")));
+
+ return ival;
}
break;
default:
@@ -381,7 +390,8 @@ defGetCopyHeaderChoice(DefElem *def, bool is_from)
}
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("%s requires a Boolean value or \"match\"",
+ errmsg("%s requires a Boolean value, a non-negative integer, "
+ "or the string \"match\"",
def->defname)));
return COPY_HEADER_FALSE; /* keep compiler quiet */
}
@@ -566,7 +576,7 @@ ProcessCopyOptions(ParseState *pstate,
if (header_specified)
errorConflictingDefElem(defel, pstate);
header_specified = true;
- opts_out->header_line = defGetCopyHeaderChoice(defel, is_from);
+ opts_out->header_line = defGetCopyHeaderOption(defel, is_from);
}
else if (strcmp(defel->defname, "quote") == 0)
{
@@ -769,7 +779,7 @@ ProcessCopyOptions(ParseState *pstate,
errmsg("COPY delimiter cannot be \"%s\"", opts_out->delim)));
/* Check header */
- if (opts_out->binary && opts_out->header_line)
+ if (opts_out->binary && opts_out->header_line != COPY_HEADER_FALSE)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
/*- translator: %s is the name of a COPY option, e.g. ON_ERROR */
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index f52f2477df1..b1ae97b833d 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -771,21 +771,30 @@ static pg_attribute_always_inline bool
NextCopyFromRawFieldsInternal(CopyFromState cstate, char ***fields, int *nfields, bool is_csv)
{
int fldct;
- bool done;
+ bool done = false;
/* only available for text or csv input */
Assert(!cstate->opts.binary);
/* on input check that the header line is correct if needed */
- if (cstate->cur_lineno == 0 && cstate->opts.header_line)
+ if (cstate->cur_lineno == 0 && cstate->opts.header_line != COPY_HEADER_FALSE)
{
ListCell *cur;
TupleDesc tupDesc;
+ int lines_to_skip = cstate->opts.header_line;
+
+ /* If set to "match", one header line is skipped */
+ if (cstate->opts.header_line == COPY_HEADER_MATCH)
+ lines_to_skip = 1;
tupDesc = RelationGetDescr(cstate->rel);
- cstate->cur_lineno++;
- done = CopyReadLine(cstate, is_csv);
+ for (int i = 0; i < lines_to_skip; i++)
+ {
+ cstate->cur_lineno++;
+ if ((done = CopyReadLine(cstate, is_csv)))
+ break;
+ }
if (cstate->opts.header_line == COPY_HEADER_MATCH)
{
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index ea6f18f2c80..67b94b91cae 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -199,7 +199,7 @@ CopyToTextLikeStart(CopyToState cstate, TupleDesc tupDesc)
cstate->file_encoding);
/* if a header has been requested send the line */
- if (cstate->opts.header_line)
+ if (cstate->opts.header_line == COPY_HEADER_TRUE)
{
ListCell *cur;
bool hdr_delim = false;
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index c95eb945016..502a45163c8 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -570,8 +570,8 @@ CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
* any CREATE DATABASE commands.
*/
if (!IsBinaryUpgrade)
- RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE |
- CHECKPOINT_WAIT | CHECKPOINT_FLUSH_ALL);
+ RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE |
+ CHECKPOINT_WAIT | CHECKPOINT_FLUSH_UNLOGGED);
/*
* Iterate through all tablespaces of the template database, and copy each
@@ -673,7 +673,7 @@ CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
* strategy that avoids these problems.
*/
if (!IsBinaryUpgrade)
- RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE |
+ RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE |
CHECKPOINT_WAIT);
}
@@ -1870,7 +1870,7 @@ dropdb(const char *dbname, bool missing_ok, bool force)
* Force a checkpoint to make sure the checkpointer has received the
* message sent by ForgetDatabaseSyncRequests.
*/
- RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
+ RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
/* Close all smgr fds in all backends. */
WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
@@ -2120,8 +2120,8 @@ movedb(const char *dbname, const char *tblspcname)
* On Windows, this also ensures that background procs don't hold any open
* files, which would cause rmdir() to fail.
*/
- RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
- | CHECKPOINT_FLUSH_ALL);
+ RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT
+ | CHECKPOINT_FLUSH_UNLOGGED);
/* Close all smgr fds in all backends. */
WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
@@ -2252,7 +2252,7 @@ movedb(const char *dbname, const char *tblspcname)
* any unlogged operations done in the new DB tablespace before the
* next checkpoint.
*/
- RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
+ RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
/*
* Force synchronous commit, thus minimizing the window between
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 7e2792ead71..8345bc0264b 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -3582,6 +3582,7 @@ static void
show_memoize_info(MemoizeState *mstate, List *ancestors, ExplainState *es)
{
Plan *plan = ((PlanState *) mstate)->plan;
+ Memoize *mplan = (Memoize *) plan;
ListCell *lc;
List *context;
StringInfoData keystr;
@@ -3602,7 +3603,7 @@ show_memoize_info(MemoizeState *mstate, List *ancestors, ExplainState *es)
plan,
ancestors);
- foreach(lc, ((Memoize *) plan)->param_exprs)
+ foreach(lc, mplan->param_exprs)
{
Node *expr = (Node *) lfirst(lc);
@@ -3618,6 +3619,24 @@ show_memoize_info(MemoizeState *mstate, List *ancestors, ExplainState *es)
pfree(keystr.data);
+ if (es->costs)
+ {
+ if (es->format == EXPLAIN_FORMAT_TEXT)
+ {
+ ExplainIndentText(es);
+ appendStringInfo(es->str, "Estimates: capacity=%u distinct keys=%.0f lookups=%.0f hit percent=%.2f%%\n",
+ mplan->est_entries, mplan->est_unique_keys,
+ mplan->est_calls, mplan->est_hit_ratio * 100.0);
+ }
+ else
+ {
+ ExplainPropertyUInteger("Estimated Capacity", NULL, mplan->est_entries, es);
+ ExplainPropertyFloat("Estimated Distinct Lookup Keys", NULL, mplan->est_unique_keys, 0, es);
+ ExplainPropertyFloat("Estimated Lookups", NULL, mplan->est_calls, 0, es);
+ ExplainPropertyFloat("Estimated Hit Percent", NULL, mplan->est_hit_ratio * 100.0, 2, es);
+ }
+ }
+
if (!es->analyze)
return;
diff --git a/src/backend/commands/foreigncmds.c b/src/backend/commands/foreigncmds.c
index 8d2d7431544..77f8461f42e 100644
--- a/src/backend/commands/foreigncmds.c
+++ b/src/backend/commands/foreigncmds.c
@@ -1588,6 +1588,7 @@ ImportForeignSchema(ImportForeignSchemaStmt *stmt)
pstmt->utilityStmt = (Node *) cstmt;
pstmt->stmt_location = rs->stmt_location;
pstmt->stmt_len = rs->stmt_len;
+ pstmt->planOrigin = PLAN_STMT_INTERNAL;
/* Execute statement */
ProcessUtility(pstmt, cmd, false,
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index c3ec2076a52..6f753ab6d7a 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -2469,8 +2469,8 @@ GetOperatorFromCompareType(Oid opclass, Oid rhstype, CompareType cmptype,
cmptype == COMPARE_EQ ? errmsg("could not identify an equality operator for type %s", format_type_be(opcintype)) :
cmptype == COMPARE_OVERLAP ? errmsg("could not identify an overlaps operator for type %s", format_type_be(opcintype)) :
cmptype == COMPARE_CONTAINED_BY ? errmsg("could not identify a contained-by operator for type %s", format_type_be(opcintype)) : 0,
- errdetail("Could not translate compare type %d for operator family \"%s\", input type %s, access method \"%s\".",
- cmptype, get_opfamily_name(opfamily, false), format_type_be(opcintype), get_am_name(amid)));
+ errdetail("Could not translate compare type %d for operator family \"%s\" of access method \"%s\".",
+ cmptype, get_opfamily_name(opfamily, false), get_am_name(amid)));
/*
* We parameterize rhstype so foreign keys can ask for a <@ operator
@@ -2592,7 +2592,9 @@ makeObjectName(const char *name1, const char *name2, const char *label)
* constraint names.)
*
* Note: it is theoretically possible to get a collision anyway, if someone
- * else chooses the same name concurrently. This is fairly unlikely to be
+ * else chooses the same name concurrently. We shorten the race condition
+ * window by checking for conflicting relations using SnapshotDirty, but
+ * that doesn't close the window entirely. This is fairly unlikely to be
* a problem in practice, especially if one is holding an exclusive lock on
* the relation identified by name1. However, if choosing multiple names
* within a single command, you'd better create the new object and do
@@ -2608,15 +2610,45 @@ ChooseRelationName(const char *name1, const char *name2,
int pass = 0;
char *relname = NULL;
char modlabel[NAMEDATALEN];
+ SnapshotData SnapshotDirty;
+ Relation pgclassrel;
+
+ /* prepare to search pg_class with a dirty snapshot */
+ InitDirtySnapshot(SnapshotDirty);
+ pgclassrel = table_open(RelationRelationId, AccessShareLock);
/* try the unmodified label first */
strlcpy(modlabel, label, sizeof(modlabel));
for (;;)
{
+ ScanKeyData key[2];
+ SysScanDesc scan;
+ bool collides;
+
relname = makeObjectName(name1, name2, modlabel);
- if (!OidIsValid(get_relname_relid(relname, namespaceid)))
+ /* is there any conflicting relation name? */
+ ScanKeyInit(&key[0],
+ Anum_pg_class_relname,
+ BTEqualStrategyNumber, F_NAMEEQ,
+ CStringGetDatum(relname));
+ ScanKeyInit(&key[1],
+ Anum_pg_class_relnamespace,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(namespaceid));
+
+ scan = systable_beginscan(pgclassrel, ClassNameNspIndexId,
+ true /* indexOK */ ,
+ &SnapshotDirty,
+ 2, key);
+
+ collides = HeapTupleIsValid(systable_getnext(scan));
+
+ systable_endscan(scan);
+
+ /* break out of loop if no conflict */
+ if (!collides)
{
if (!isconstraint ||
!ConstraintNameExists(relname, namespaceid))
@@ -2628,6 +2660,8 @@ ChooseRelationName(const char *name1, const char *name2,
snprintf(modlabel, sizeof(modlabel), "%s%d", label, ++pass);
}
+ table_close(pgclassrel, AccessShareLock);
+
return relname;
}
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 27c2cb26ef5..188e26f0e6e 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -835,7 +835,8 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
if (!foundUniqueIndex)
ereport(ERROR,
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("could not find suitable unique index on materialized view"));
+ errmsg("could not find suitable unique index on materialized view \"%s\"",
+ RelationGetRelationName(matviewRel)));
appendStringInfoString(&querybuf,
" AND newdata.* OPERATOR(pg_catalog.*=) mv.*) "
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 0b23d94c38e..803c26ab216 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -2113,25 +2113,25 @@ AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId)
static char
defGetGeneratedColsOption(DefElem *def)
{
- char *sval;
+ char *sval = "";
/*
- * If no parameter value given, assume "stored" is meant.
+ * A parameter value is required.
*/
- if (!def->arg)
- return PUBLISH_GENCOLS_STORED;
-
- sval = defGetString(def);
+ if (def->arg)
+ {
+ sval = defGetString(def);
- if (pg_strcasecmp(sval, "none") == 0)
- return PUBLISH_GENCOLS_NONE;
- if (pg_strcasecmp(sval, "stored") == 0)
- return PUBLISH_GENCOLS_STORED;
+ if (pg_strcasecmp(sval, "none") == 0)
+ return PUBLISH_GENCOLS_NONE;
+ if (pg_strcasecmp(sval, "stored") == 0)
+ return PUBLISH_GENCOLS_STORED;
+ }
ereport(ERROR,
errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("%s requires a \"none\" or \"stored\" value",
- def->defname));
+ errmsg("invalid value for publication parameter \"%s\": \"%s\"", def->defname, sval),
+ errdetail("Valid values are \"%s\" and \"%s\".", "none", "stored"));
return PUBLISH_GENCOLS_NONE; /* keep compiler quiet */
}
diff --git a/src/backend/commands/schemacmds.c b/src/backend/commands/schemacmds.c
index 546160f0941..0f03d9743d2 100644
--- a/src/backend/commands/schemacmds.c
+++ b/src/backend/commands/schemacmds.c
@@ -215,6 +215,7 @@ CreateSchemaCommand(CreateSchemaStmt *stmt, const char *queryString,
wrapper->utilityStmt = stmt;
wrapper->stmt_location = stmt_location;
wrapper->stmt_len = stmt_len;
+ wrapper->planOrigin = PLAN_STMT_INTERNAL;
/* do this step */
ProcessUtility(wrapper,
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 4aec73bcc6b..cd6c3684482 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -14,6 +14,7 @@
#include "postgres.h"
+#include "access/commit_ts.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/twophase.h"
@@ -71,8 +72,9 @@
#define SUBOPT_PASSWORD_REQUIRED 0x00000800
#define SUBOPT_RUN_AS_OWNER 0x00001000
#define SUBOPT_FAILOVER 0x00002000
-#define SUBOPT_LSN 0x00004000
-#define SUBOPT_ORIGIN 0x00008000
+#define SUBOPT_RETAIN_DEAD_TUPLES 0x00004000
+#define SUBOPT_LSN 0x00008000
+#define SUBOPT_ORIGIN 0x00010000
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@@ -98,6 +100,7 @@ typedef struct SubOpts
bool passwordrequired;
bool runasowner;
bool failover;
+ bool retaindeadtuples;
char *origin;
XLogRecPtr lsn;
} SubOpts;
@@ -105,8 +108,10 @@ typedef struct SubOpts
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
static void check_publications_origin(WalReceiverConn *wrconn,
List *publications, bool copydata,
- char *origin, Oid *subrel_local_oids,
- int subrel_count, char *subname);
+ bool retain_dead_tuples, char *origin,
+ Oid *subrel_local_oids, int subrel_count,
+ char *subname);
+static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn);
static void check_duplicates_in_publist(List *publist, Datum *datums);
static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
@@ -162,6 +167,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->runasowner = false;
if (IsSet(supported_opts, SUBOPT_FAILOVER))
opts->failover = false;
+ if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES))
+ opts->retaindeadtuples = false;
if (IsSet(supported_opts, SUBOPT_ORIGIN))
opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
@@ -210,7 +217,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
if (strcmp(opts->slot_name, "none") == 0)
opts->slot_name = NULL;
else
- ReplicationSlotValidateName(opts->slot_name, ERROR);
+ ReplicationSlotValidateName(opts->slot_name, false, ERROR);
}
else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
strcmp(defel->defname, "copy_data") == 0)
@@ -307,6 +314,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->specified_opts |= SUBOPT_FAILOVER;
opts->failover = defGetBoolean(defel);
}
+ else if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES) &&
+ strcmp(defel->defname, "retain_dead_tuples") == 0)
+ {
+ if (IsSet(opts->specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
+ errorConflictingDefElem(defel, pstate);
+
+ opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES;
+ opts->retaindeadtuples = defGetBoolean(defel);
+ }
else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
strcmp(defel->defname, "origin") == 0)
{
@@ -563,7 +579,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
- SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN);
+ SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
+ SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN);
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
/*
@@ -630,6 +647,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
stmt->subname)));
}
+ /* Ensure that we can enable retain_dead_tuples */
+ if (opts.retaindeadtuples)
+ CheckSubDeadTupleRetention(true, !opts.enabled, WARNING);
+
if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
opts.slot_name == NULL)
opts.slot_name = stmt->subname;
@@ -670,6 +691,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
+ values[Anum_pg_subscription_subretaindeadtuples - 1] =
+ BoolGetDatum(opts.retaindeadtuples);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
if (opts.slot_name)
@@ -722,7 +745,11 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
{
check_publications(wrconn, publications);
check_publications_origin(wrconn, publications, opts.copy_data,
- opts.origin, NULL, 0, stmt->subname);
+ opts.retaindeadtuples, opts.origin,
+ NULL, 0, stmt->subname);
+
+ if (opts.retaindeadtuples)
+ check_pub_dead_tuple_retention(wrconn);
/*
* Set sync state based on if we were asked to do data copy or
@@ -881,8 +908,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
sizeof(Oid), oid_cmp);
check_publications_origin(wrconn, sub->publications, copy_data,
- sub->origin, subrel_local_oids,
- subrel_count, sub->name);
+ sub->retaindeadtuples, sub->origin,
+ subrel_local_oids, subrel_count, sub->name);
/*
* Rels that we want to remove from subscription and drop any slots
@@ -1040,18 +1067,22 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
}
/*
- * Common checks for altering failover and two_phase options.
+ * Common checks for altering failover, two_phase, and retain_dead_tuples
+ * options.
*/
static void
CheckAlterSubOption(Subscription *sub, const char *option,
bool slot_needs_update, bool isTopLevel)
{
+ Assert(strcmp(option, "failover") == 0 ||
+ strcmp(option, "two_phase") == 0 ||
+ strcmp(option, "retain_dead_tuples") == 0);
+
/*
- * The checks in this function are required only for failover and
- * two_phase options.
+ * Altering the retain_dead_tuples option does not update the slot on the
+ * publisher.
*/
- Assert(strcmp(option, "failover") == 0 ||
- strcmp(option, "two_phase") == 0);
+ Assert(!slot_needs_update || strcmp(option, "retain_dead_tuples") != 0);
/*
* Do not allow changing the option if the subscription is enabled. This
@@ -1063,6 +1094,39 @@ CheckAlterSubOption(Subscription *sub, const char *option,
* the publisher by the existing walsender, so we could have allowed that
* even when the subscription is enabled. But we kept this restriction for
* the sake of consistency and simplicity.
+ *
+ * Additionally, do not allow changing the retain_dead_tuples option when
+ * the subscription is enabled to prevent race conditions arising from the
+ * new option value being acknowledged asynchronously by the launcher and
+ * apply workers.
+ *
+ * Without the restriction, a race condition may arise when a user
+ * disables and immediately re-enables the retain_dead_tuples option. In
+ * this case, the launcher might drop the slot upon noticing the disabled
+ * action, while the apply worker may keep maintaining
+ * oldest_nonremovable_xid without noticing the option change. During this
+ * period, a transaction ID wraparound could falsely make this ID appear
+ * as if it originates from the future w.r.t the transaction ID stored in
+ * the slot maintained by launcher.
+ *
+ * Similarly, if the user enables retain_dead_tuples concurrently with the
+ * launcher starting the worker, the apply worker may start calculating
+ * oldest_nonremovable_xid before the launcher notices the enable action.
+ * Consequently, the launcher may update slot.xmin to a newer value than
+ * that maintained by the worker. In subsequent cycles, upon integrating
+ * the worker's oldest_nonremovable_xid, the launcher might detect a
+ * retreat in the calculated xmin, necessitating additional handling.
+ *
+ * XXX To address the above race conditions, we can define
+ * oldest_nonremovable_xid as FullTransactionID and adds the check to
+ * disallow retreating the conflict slot's xmin. For now, we kept the
+ * implementation simple by disallowing change to the retain_dead_tuples,
+ * but in the future we can change this after some more analysis.
+ *
+ * Note that we could restrict only the enabling of retain_dead_tuples to
+ * avoid the race conditions described above, but we maintain the
+ * restriction for both enable and disable operations for the sake of
+ * consistency.
*/
if (sub->enabled)
ereport(ERROR,
@@ -1110,6 +1174,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
bool update_tuple = false;
bool update_failover = false;
bool update_two_phase = false;
+ bool check_pub_rdt = false;
+ bool retain_dead_tuples;
+ char *origin;
Subscription *sub;
Form_pg_subscription form;
bits32 supported_opts;
@@ -1137,6 +1204,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
sub = GetSubscription(subid, false);
+ retain_dead_tuples = sub->retaindeadtuples;
+ origin = sub->origin;
+
/*
* Don't allow non-superuser modification of a subscription with
* password_required=false.
@@ -1165,7 +1235,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
SUBOPT_DISABLE_ON_ERR |
SUBOPT_PASSWORD_REQUIRED |
SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
- SUBOPT_ORIGIN);
+ SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN);
parse_subscription_options(pstate, stmt->options,
supported_opts, &opts);
@@ -1267,7 +1337,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("slot_name and two_phase cannot be altered at the same time")));
+ errmsg("\"slot_name\" and \"two_phase\" cannot be altered at the same time")));
/*
* Note that workers may still survive even if the
@@ -1283,7 +1353,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
if (logicalrep_workers_find(subid, true, true))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("cannot alter two_phase when logical replication worker is still running"),
+ errmsg("cannot alter \"two_phase\" when logical replication worker is still running"),
errhint("Try again after some time.")));
/*
@@ -1297,7 +1367,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
LookupGXactBySubid(subid))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("cannot disable two_phase when prepared transactions are present"),
+ errmsg("cannot disable \"two_phase\" when prepared transactions exist"),
errhint("Resolve these transactions and try again.")));
/* Change system catalog accordingly */
@@ -1325,11 +1395,62 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
replaces[Anum_pg_subscription_subfailover - 1] = true;
}
+ if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
+ {
+ values[Anum_pg_subscription_subretaindeadtuples - 1] =
+ BoolGetDatum(opts.retaindeadtuples);
+ replaces[Anum_pg_subscription_subretaindeadtuples - 1] = true;
+
+ CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
+
+ /*
+ * Workers may continue running even after the
+ * subscription has been disabled.
+ *
+ * To prevent race conditions (as described in
+ * CheckAlterSubOption()), ensure that all worker
+ * processes have already exited before proceeding.
+ */
+ if (logicalrep_workers_find(subid, true, true))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"),
+ errhint("Try again after some time.")));
+
+ /*
+ * Remind the user that enabling subscription will prevent
+ * the accumulation of dead tuples.
+ */
+ if (opts.retaindeadtuples)
+ CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE);
+
+ /*
+ * Notify the launcher to manage the replication slot for
+ * conflict detection. This ensures that replication slot
+ * is efficiently handled (created, updated, or dropped)
+ * in response to any configuration changes.
+ */
+ ApplyLauncherWakeupAtCommit();
+
+ check_pub_rdt = opts.retaindeadtuples;
+ retain_dead_tuples = opts.retaindeadtuples;
+ }
+
if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
{
values[Anum_pg_subscription_suborigin - 1] =
CStringGetTextDatum(opts.origin);
replaces[Anum_pg_subscription_suborigin - 1] = true;
+
+ /*
+ * Check if changes from different origins may be received
+ * from the publisher when the origin is changed to ANY
+ * and retain_dead_tuples is enabled.
+ */
+ check_pub_rdt = retain_dead_tuples &&
+ pg_strcasecmp(opts.origin, LOGICALREP_ORIGIN_ANY) == 0;
+
+ origin = opts.origin;
}
update_tuple = true;
@@ -1347,6 +1468,15 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot enable subscription that does not have a slot name")));
+ /*
+ * Check track_commit_timestamp only when enabling the
+ * subscription in case it was disabled after creation. See
+ * comments atop CheckSubDeadTupleRetention() for details.
+ */
+ if (sub->retaindeadtuples)
+ CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
+ WARNING);
+
values[Anum_pg_subscription_subenabled - 1] =
BoolGetDatum(opts.enabled);
replaces[Anum_pg_subscription_subenabled - 1] = true;
@@ -1355,6 +1485,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
ApplyLauncherWakeupAtCommit();
update_tuple = true;
+
+ /*
+ * The subscription might be initially created with
+ * connect=false and retain_dead_tuples=true, meaning the
+ * remote server's status may not be checked. Ensure this
+ * check is conducted now.
+ */
+ check_pub_rdt = sub->retaindeadtuples && opts.enabled;
break;
}
@@ -1369,6 +1507,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
CStringGetTextDatum(stmt->conninfo);
replaces[Anum_pg_subscription_subconninfo - 1] = true;
update_tuple = true;
+
+ /*
+ * Since the remote server configuration might have changed,
+ * perform a check to ensure it permits enabling
+ * retain_dead_tuples.
+ */
+ check_pub_rdt = sub->retaindeadtuples;
break;
case ALTER_SUBSCRIPTION_SET_PUBLICATION:
@@ -1539,7 +1684,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
if (!XLogRecPtrIsInvalid(remote_lsn) && opts.lsn < remote_lsn)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("skip WAL location (LSN %X/%X) must be greater than origin LSN %X/%X",
+ errmsg("skip WAL location (LSN %X/%08X) must be greater than origin LSN %X/%08X",
LSN_FORMAT_ARGS(opts.lsn),
LSN_FORMAT_ARGS(remote_lsn))));
}
@@ -1568,14 +1713,15 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
}
/*
- * Try to acquire the connection necessary for altering the slot, if
- * needed.
+ * Try to acquire the connection necessary either for modifying the slot
+ * or for checking if the remote server permits enabling
+ * retain_dead_tuples.
*
* This has to be at the end because otherwise if there is an error while
* doing the database operations we won't be able to rollback altered
* slot.
*/
- if (update_failover || update_two_phase)
+ if (update_failover || update_two_phase || check_pub_rdt)
{
bool must_use_password;
char *err;
@@ -1584,10 +1730,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
/* Load the library providing us libpq calls. */
load_file("libpqwalreceiver", false);
- /* Try to connect to the publisher. */
+ /*
+ * Try to connect to the publisher, using the new connection string if
+ * available.
+ */
must_use_password = sub->passwordrequired && !sub->ownersuperuser;
- wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
- sub->name, &err);
+ wrconn = walrcv_connect(stmt->conninfo ? stmt->conninfo : sub->conninfo,
+ true, true, must_use_password, sub->name,
+ &err);
if (!wrconn)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
@@ -1596,9 +1746,17 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
PG_TRY();
{
- walrcv_alter_slot(wrconn, sub->slotname,
- update_failover ? &opts.failover : NULL,
- update_two_phase ? &opts.twophase : NULL);
+ if (retain_dead_tuples)
+ check_pub_dead_tuple_retention(wrconn);
+
+ check_publications_origin(wrconn, sub->publications, false,
+ retain_dead_tuples, origin, NULL, 0,
+ sub->name);
+
+ if (update_failover || update_two_phase)
+ walrcv_alter_slot(wrconn, sub->slotname,
+ update_failover ? &opts.failover : NULL,
+ update_two_phase ? &opts.twophase : NULL);
}
PG_FINALLY();
{
@@ -2086,20 +2244,29 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
* Check and log a warning if the publisher has subscribed to the same table,
* its partition ancestors (if it's a partition), or its partition children (if
* it's a partitioned table), from some other publishers. This check is
- * required only if "copy_data = true" and "origin = none" for CREATE
- * SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements to notify the
- * user that data having origin might have been copied.
+ * required in the following scenarios:
*
- * This check need not be performed on the tables that are already added
- * because incremental sync for those tables will happen through WAL and the
- * origin of the data can be identified from the WAL records.
+ * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements
+ * with "copy_data = true" and "origin = none":
+ * - Warn the user that data with an origin might have been copied.
+ * - This check is skipped for tables already added, as incremental sync via
+ * WAL allows origin tracking. The list of such tables is in
+ * subrel_local_oids.
*
- * subrel_local_oids contains the list of relation oids that are already
- * present on the subscriber.
+ * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements
+ * with "retain_dead_tuples = true" and "origin = any", and for ALTER
+ * SUBSCRIPTION statements that modify retain_dead_tuples or origin, or
+ * when the publisher's status changes (e.g., due to a connection string
+ * update):
+ * - Warn the user that only conflict detection info for local changes on
+ * the publisher is retained. Data from other origins may lack sufficient
+ * details for reliable conflict detection.
+ * - See comments atop worker.c for more details.
*/
static void
check_publications_origin(WalReceiverConn *wrconn, List *publications,
- bool copydata, char *origin, Oid *subrel_local_oids,
+ bool copydata, bool retain_dead_tuples,
+ char *origin, Oid *subrel_local_oids,
int subrel_count, char *subname)
{
WalRcvExecResult *res;
@@ -2108,9 +2275,29 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
Oid tableRow[1] = {TEXTOID};
List *publist = NIL;
int i;
+ bool check_rdt;
+ bool check_table_sync;
+ bool origin_none = origin &&
+ pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0;
+
+ /*
+ * Enable retain_dead_tuples checks only when origin is set to 'any',
+ * since with origin='none' only local changes are replicated to the
+ * subscriber.
+ */
+ check_rdt = retain_dead_tuples && !origin_none;
+
+ /*
+ * Enable table synchronization checks only when origin is 'none', to
+ * ensure that data from other origins is not inadvertently copied.
+ */
+ check_table_sync = copydata && origin_none;
- if (!copydata || !origin ||
- (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0))
+ /* retain_dead_tuples and table sync checks occur separately */
+ Assert(!(check_rdt && check_table_sync));
+
+ /* Return if no checks are required */
+ if (!check_rdt && !check_table_sync)
return;
initStringInfo(&cmd);
@@ -2129,16 +2316,23 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
/*
* In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains
* the list of relation oids that are already present on the subscriber.
- * This check should be skipped for these tables.
+ * This check should be skipped for these tables if checking for table
+ * sync scenario. However, when handling the retain_dead_tuples scenario,
+ * ensure all tables are checked, as some existing tables may now include
+ * changes from other origins due to newly created subscriptions on the
+ * publisher.
*/
- for (i = 0; i < subrel_count; i++)
+ if (check_table_sync)
{
- Oid relid = subrel_local_oids[i];
- char *schemaname = get_namespace_name(get_rel_namespace(relid));
- char *tablename = get_rel_name(relid);
+ for (i = 0; i < subrel_count; i++)
+ {
+ Oid relid = subrel_local_oids[i];
+ char *schemaname = get_namespace_name(get_rel_namespace(relid));
+ char *tablename = get_rel_name(relid);
- appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
- schemaname, tablename);
+ appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
+ schemaname, tablename);
+ }
}
res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
@@ -2173,22 +2367,37 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
* XXX: For simplicity, we don't check whether the table has any data or
* not. If the table doesn't have any data then we don't need to
* distinguish between data having origin and data not having origin so we
- * can avoid logging a warning in that case.
+ * can avoid logging a warning for table sync scenario.
*/
if (publist)
{
StringInfo pubnames = makeStringInfo();
+ StringInfo err_msg = makeStringInfo();
+ StringInfo err_hint = makeStringInfo();
/* Prepare the list of publication(s) for warning message. */
GetPublicationsStr(publist, pubnames, false);
+
+ if (check_table_sync)
+ {
+ appendStringInfo(err_msg, _("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin"),
+ subname);
+ appendStringInfoString(err_hint, _("Verify that initial data copied from the publisher tables did not come from other origins."));
+ }
+ else
+ {
+ appendStringInfo(err_msg, _("subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins"),
+ subname);
+ appendStringInfoString(err_hint, _("Consider using origin = NONE or disabling retain_dead_tuples."));
+ }
+
ereport(WARNING,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
- subname),
- errdetail_plural("The subscription being created subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
- "The subscription being created subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
+ errmsg_internal("%s", err_msg->data),
+ errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
+ "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
list_length(publist), pubnames->data),
- errhint("Verify that initial data copied from the publisher tables did not come from other origins."));
+ errhint_internal("%s", err_hint->data));
}
ExecDropSingleTupleTableSlot(slot);
@@ -2197,6 +2406,101 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
}
/*
+ * Determine whether the retain_dead_tuples can be enabled based on the
+ * publisher's status.
+ *
+ * This option is disallowed if the publisher is running a version earlier
+ * than the PG19, or if the publisher is in recovery (i.e., it is a standby
+ * server).
+ *
+ * See comments atop worker.c for a detailed explanation.
+ */
+static void
+check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
+{
+ WalRcvExecResult *res;
+ Oid RecoveryRow[1] = {BOOLOID};
+ TupleTableSlot *slot;
+ bool isnull;
+ bool remote_in_recovery;
+
+ if (walrcv_server_version(wrconn) < 19000)
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19"));
+
+ res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow);
+
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not obtain recovery progress from the publisher: %s",
+ res->err)));
+
+ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+ elog(ERROR, "failed to fetch tuple for the recovery progress");
+
+ remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull));
+
+ if (remote_in_recovery)
+ ereport(ERROR,
+ errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot enable retain_dead_tuples if the publisher is in recovery."));
+
+ ExecDropSingleTupleTableSlot(slot);
+
+ walrcv_clear_result(res);
+}
+
+/*
+ * Check if the subscriber's configuration is adequate to enable the
+ * retain_dead_tuples option.
+ *
+ * Issue an ERROR if the wal_level does not support the use of replication
+ * slots when check_guc is set to true.
+ *
+ * Issue a WARNING if track_commit_timestamp is not enabled when check_guc is
+ * set to true. This is only to highlight the importance of enabling
+ * track_commit_timestamp instead of catching all the misconfigurations, as
+ * this setting can be adjusted after subscription creation. Without it, the
+ * apply worker will simply skip conflict detection.
+ *
+ * Issue a WARNING or NOTICE if the subscription is disabled. Do not raise an
+ * ERROR since users can only modify retain_dead_tuples for disabled
+ * subscriptions. And as long as the subscription is enabled promptly, it will
+ * not pose issues.
+ */
+void
+CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
+ int elevel_for_sub_disabled)
+{
+ Assert(elevel_for_sub_disabled == NOTICE ||
+ elevel_for_sub_disabled == WARNING);
+
+ if (check_guc && wal_level < WAL_LEVEL_REPLICA)
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
+ errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
+
+ if (check_guc && !track_commit_timestamp)
+ ereport(WARNING,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
+ errhint("Consider setting \"%s\" to true.",
+ "track_commit_timestamp"));
+
+ if (sub_disabled)
+ ereport(elevel_for_sub_disabled,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
+ (elevel_for_sub_disabled > NOTICE)
+ ? errhint("Consider setting %s to false.",
+ "retain_dead_tuples") : 0);
+}
+
+/*
* Get the list of tables which belong to specified publications on the
* publisher connection.
*
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index ea96947d813..cb811520c29 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -2711,8 +2711,7 @@ MergeAttributes(List *columns, const List *supers, char relpersistence,
RelationGetRelationName(relation))));
/* If existing rel is temp, it must belong to this session */
- if (relation->rd_rel->relpersistence == RELPERSISTENCE_TEMP &&
- !relation->rd_islocaltemp)
+ if (RELATION_IS_OTHER_TEMP(relation))
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg(!is_partition
@@ -7374,7 +7373,7 @@ ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel,
/* make sure datatype is legal for a column */
CheckAttributeType(NameStr(attribute->attname), attribute->atttypid, attribute->attcollation,
list_make1_oid(rel->rd_rel->reltype),
- 0);
+ (attribute->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL ? CHKATYPE_IS_VIRTUAL : 0));
InsertPgAttributeTuples(attrdesc, tupdesc, myrelid, NULL, NULL);
@@ -8609,7 +8608,7 @@ ATExecSetExpression(AlteredTableInfo *tab, Relation rel, const char *colName,
rel->rd_att->constr && rel->rd_att->constr->num_check > 0)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("ALTER TABLE / SET EXPRESSION is not supported for virtual generated columns on tables with check constraints"),
+ errmsg("ALTER TABLE / SET EXPRESSION is not supported for virtual generated columns in tables with check constraints"),
errdetail("Column \"%s\" of relation \"%s\" is a virtual generated column.",
colName, RelationGetRelationName(rel))));
@@ -8627,7 +8626,7 @@ ATExecSetExpression(AlteredTableInfo *tab, Relation rel, const char *colName,
GetRelationPublications(RelationGetRelid(rel)) != NIL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("ALTER TABLE / SET EXPRESSION is not supported for virtual generated columns on tables that are part of a publication"),
+ errmsg("ALTER TABLE / SET EXPRESSION is not supported for virtual generated columns in tables that are part of a publication"),
errdetail("Column \"%s\" of relation \"%s\" is a virtual generated column.",
colName, RelationGetRelationName(rel))));
@@ -10189,7 +10188,7 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
if (pk_has_without_overlaps && !with_period)
ereport(ERROR,
errcode(ERRCODE_INVALID_FOREIGN_KEY),
- errmsg("foreign key must use PERIOD when referencing a primary using WITHOUT OVERLAPS"));
+ errmsg("foreign key must use PERIOD when referencing a primary key using WITHOUT OVERLAPS"));
/*
* Now we can check permissions.
@@ -10330,8 +10329,8 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
for_overlaps
? errmsg("could not identify an overlaps operator for foreign key")
: errmsg("could not identify an equality operator for foreign key"),
- errdetail("Could not translate compare type %d for operator family \"%s\", input type %s, access method \"%s\".",
- cmptype, get_opfamily_name(opfamily, false), format_type_be(opcintype), get_am_name(amid)));
+ errdetail("Could not translate compare type %d for operator family \"%s\" of access method \"%s\".",
+ cmptype, get_opfamily_name(opfamily, false), get_am_name(amid)));
/*
* There had better be a primary equality operator for the index.
@@ -12913,8 +12912,9 @@ ATExecValidateConstraint(List **wqueue, Relation rel, char *constrName,
con->contype != CONSTRAINT_NOTNULL)
ereport(ERROR,
errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("constraint \"%s\" of relation \"%s\" is not a foreign key, check, or not-null constraint",
- constrName, RelationGetRelationName(rel)));
+ errmsg("cannot validate constraint \"%s\" of relation \"%s\"",
+ constrName, RelationGetRelationName(rel)),
+ errdetail("This operation is not supported for this type of constraint."));
if (!con->conenforced)
ereport(ERROR,
@@ -14426,7 +14426,7 @@ ATPrepAlterColumnType(List **wqueue,
/* make sure datatype is legal for a column */
CheckAttributeType(colName, targettype, targetcollid,
list_make1_oid(rel->rd_rel->reltype),
- 0);
+ (attTup->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL ? CHKATYPE_IS_VIRTUAL : 0));
if (attTup->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
{
@@ -14484,6 +14484,9 @@ ATPrepAlterColumnType(List **wqueue,
/* Fix collations after all else */
assign_expr_collations(pstate, transform);
+ /* Expand virtual generated columns in the expr. */
+ transform = expand_generated_columns_in_expr(transform, rel, 1);
+
/* Plan the expr now so we can accurately assess the need to rewrite. */
transform = (Node *) expression_planner((Expr *) transform);
@@ -15411,9 +15414,12 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode)
/*
* Re-parse the index and constraint definitions, and attach them to the
* appropriate work queue entries. We do this before dropping because in
- * the case of a FOREIGN KEY constraint, we might not yet have exclusive
- * lock on the table the constraint is attached to, and we need to get
- * that before reparsing/dropping.
+ * the case of a constraint on another table, we might not yet have
+ * exclusive lock on the table the constraint is attached to, and we need
+ * to get that before reparsing/dropping. (That's possible at least for
+ * FOREIGN KEY, CHECK, and EXCLUSION constraints; in non-FK cases it
+ * requires a dependency on the target table's composite type in the other
+ * table's constraint expressions.)
*
* We can't rely on the output of deparsing to tell us which relation to
* operate on, because concurrent activity might have made the name
@@ -15429,7 +15435,6 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode)
Form_pg_constraint con;
Oid relid;
Oid confrelid;
- char contype;
bool conislocal;
tup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(oldId));
@@ -15446,7 +15451,6 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode)
elog(ERROR, "could not identify relation associated with constraint %u", oldId);
}
confrelid = con->confrelid;
- contype = con->contype;
conislocal = con->conislocal;
ReleaseSysCache(tup);
@@ -15464,12 +15468,12 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode)
continue;
/*
- * When rebuilding an FK constraint that references the table we're
- * modifying, we might not yet have any lock on the FK's table, so get
- * one now. We'll need AccessExclusiveLock for the DROP CONSTRAINT
- * step, so there's no value in asking for anything weaker.
+ * When rebuilding another table's constraint that references the
+ * table we're modifying, we might not yet have any lock on the other
+ * table, so get one now. We'll need AccessExclusiveLock for the DROP
+ * CONSTRAINT step, so there's no value in asking for anything weaker.
*/
- if (relid != tab->relid && contype == CONSTRAINT_FOREIGN)
+ if (relid != tab->relid)
LockRelationOid(relid, AccessExclusiveLock);
ATPostAlterTypeParse(oldId, relid, confrelid,
@@ -15483,6 +15487,14 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode)
Oid relid;
relid = IndexGetRelation(oldId, false);
+
+ /*
+ * As above, make sure we have lock on the index's table if it's not
+ * the same table.
+ */
+ if (relid != tab->relid)
+ LockRelationOid(relid, AccessExclusiveLock);
+
ATPostAlterTypeParse(oldId, relid, InvalidOid,
(char *) lfirst(def_item),
wqueue, lockmode, tab->rewrite);
@@ -15499,6 +15511,20 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode)
Oid relid;
relid = StatisticsGetRelation(oldId, false);
+
+ /*
+ * As above, make sure we have lock on the statistics object's table
+ * if it's not the same table. However, we take
+ * ShareUpdateExclusiveLock here, aligning with the lock level used in
+ * CreateStatistics and RemoveStatisticsById.
+ *
+ * CAUTION: this should be done after all cases that grab
+ * AccessExclusiveLock, else we risk causing deadlock due to needing
+ * to promote our table lock.
+ */
+ if (relid != tab->relid)
+ LockRelationOid(relid, ShareUpdateExclusiveLock);
+
ATPostAlterTypeParse(oldId, relid, InvalidOid,
(char *) lfirst(def_item),
wqueue, lockmode, tab->rewrite);
@@ -15722,7 +15748,7 @@ ATPostAlterTypeParse(Oid oldId, Oid oldRelId, Oid refRelId, char *cmd,
{
AlterDomainStmt *stmt = (AlterDomainStmt *) stm;
- if (stmt->subtype == 'C') /* ADD CONSTRAINT */
+ if (stmt->subtype == AD_AddConstraint)
{
Constraint *con = castNode(Constraint, stmt->def);
AlterTableCmd *cmd = makeNode(AlterTableCmd);
@@ -17225,15 +17251,13 @@ ATExecAddInherit(Relation child_rel, RangeVar *parent, LOCKMODE lockmode)
RelationGetRelationName(parent_rel))));
/* If parent rel is temp, it must belong to this session */
- if (parent_rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP &&
- !parent_rel->rd_islocaltemp)
+ if (RELATION_IS_OTHER_TEMP(parent_rel))
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot inherit from temporary relation of another session")));
/* Ditto for the child */
- if (child_rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP &&
- !child_rel->rd_islocaltemp)
+ if (RELATION_IS_OTHER_TEMP(child_rel))
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot inherit to temporary relation of another session")));
@@ -20304,15 +20328,13 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd,
RelationGetRelationName(rel))));
/* If the parent is temp, it must belong to this session */
- if (rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP &&
- !rel->rd_islocaltemp)
+ if (RELATION_IS_OTHER_TEMP(rel))
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot attach as partition of temporary relation of another session")));
/* Ditto for the partition */
- if (attachrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP &&
- !attachrel->rd_islocaltemp)
+ if (RELATION_IS_OTHER_TEMP(attachrel))
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot attach temporary relation of another session as partition")));
diff --git a/src/backend/commands/tablespace.c b/src/backend/commands/tablespace.c
index a9005cc7212..df31eace47a 100644
--- a/src/backend/commands/tablespace.c
+++ b/src/backend/commands/tablespace.c
@@ -500,7 +500,7 @@ DropTableSpace(DropTableSpaceStmt *stmt)
* mustn't delete. So instead, we force a checkpoint which will clean
* out any lingering files, and try again.
*/
- RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
+ RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
/*
* On Windows, an unlinked file persists in the directory listing
diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c
index 67f8e70f9c1..7dc121f73f1 100644
--- a/src/backend/commands/trigger.c
+++ b/src/backend/commands/trigger.c
@@ -80,6 +80,7 @@ static bool GetTupleForTrigger(EState *estate,
ItemPointer tid,
LockTupleMode lockmode,
TupleTableSlot *oldslot,
+ bool do_epq_recheck,
TupleTableSlot **epqslot,
TM_Result *tmresultp,
TM_FailureData *tmfdp);
@@ -2693,7 +2694,8 @@ ExecBRDeleteTriggers(EState *estate, EPQState *epqstate,
HeapTuple fdw_trigtuple,
TupleTableSlot **epqslot,
TM_Result *tmresult,
- TM_FailureData *tmfd)
+ TM_FailureData *tmfd,
+ bool is_merge_delete)
{
TupleTableSlot *slot = ExecGetTriggerOldSlot(estate, relinfo);
TriggerDesc *trigdesc = relinfo->ri_TrigDesc;
@@ -2708,9 +2710,17 @@ ExecBRDeleteTriggers(EState *estate, EPQState *epqstate,
{
TupleTableSlot *epqslot_candidate = NULL;
+ /*
+ * Get a copy of the on-disk tuple we are planning to delete. In
+ * general, if the tuple has been concurrently updated, we should
+ * recheck it using EPQ. However, if this is a MERGE DELETE action,
+ * we skip this EPQ recheck and leave it to the caller (it must do
+ * additional rechecking, and might end up executing a different
+ * action entirely).
+ */
if (!GetTupleForTrigger(estate, epqstate, relinfo, tupleid,
- LockTupleExclusive, slot, &epqslot_candidate,
- tmresult, tmfd))
+ LockTupleExclusive, slot, !is_merge_delete,
+ &epqslot_candidate, tmresult, tmfd))
return false;
/*
@@ -2800,6 +2810,7 @@ ExecARDeleteTriggers(EState *estate,
tupleid,
LockTupleExclusive,
slot,
+ false,
NULL,
NULL,
NULL);
@@ -2944,7 +2955,8 @@ ExecBRUpdateTriggers(EState *estate, EPQState *epqstate,
HeapTuple fdw_trigtuple,
TupleTableSlot *newslot,
TM_Result *tmresult,
- TM_FailureData *tmfd)
+ TM_FailureData *tmfd,
+ bool is_merge_update)
{
TriggerDesc *trigdesc = relinfo->ri_TrigDesc;
TupleTableSlot *oldslot = ExecGetTriggerOldSlot(estate, relinfo);
@@ -2965,10 +2977,17 @@ ExecBRUpdateTriggers(EState *estate, EPQState *epqstate,
{
TupleTableSlot *epqslot_candidate = NULL;
- /* get a copy of the on-disk tuple we are planning to update */
+ /*
+ * Get a copy of the on-disk tuple we are planning to update. In
+ * general, if the tuple has been concurrently updated, we should
+ * recheck it using EPQ. However, if this is a MERGE UPDATE action,
+ * we skip this EPQ recheck and leave it to the caller (it must do
+ * additional rechecking, and might end up executing a different
+ * action entirely).
+ */
if (!GetTupleForTrigger(estate, epqstate, relinfo, tupleid,
- lockmode, oldslot, &epqslot_candidate,
- tmresult, tmfd))
+ lockmode, oldslot, !is_merge_update,
+ &epqslot_candidate, tmresult, tmfd))
return false; /* cancel the update action */
/*
@@ -3142,6 +3161,7 @@ ExecARUpdateTriggers(EState *estate, ResultRelInfo *relinfo,
tupleid,
LockTupleExclusive,
oldslot,
+ false,
NULL,
NULL,
NULL);
@@ -3298,6 +3318,7 @@ GetTupleForTrigger(EState *estate,
ItemPointer tid,
LockTupleMode lockmode,
TupleTableSlot *oldslot,
+ bool do_epq_recheck,
TupleTableSlot **epqslot,
TM_Result *tmresultp,
TM_FailureData *tmfdp)
@@ -3357,29 +3378,30 @@ GetTupleForTrigger(EState *estate,
if (tmfd.traversed)
{
/*
- * Recheck the tuple using EPQ. For MERGE, we leave this
- * to the caller (it must do additional rechecking, and
- * might end up executing a different action entirely).
+ * Recheck the tuple using EPQ, if requested. Otherwise,
+ * just return that it was concurrently updated.
*/
- if (estate->es_plannedstmt->commandType == CMD_MERGE)
+ if (do_epq_recheck)
{
- if (tmresultp)
- *tmresultp = TM_Updated;
- return false;
+ *epqslot = EvalPlanQual(epqstate,
+ relation,
+ relinfo->ri_RangeTableIndex,
+ oldslot);
+
+ /*
+ * If PlanQual failed for updated tuple - we must not
+ * process this tuple!
+ */
+ if (TupIsNull(*epqslot))
+ {
+ *epqslot = NULL;
+ return false;
+ }
}
-
- *epqslot = EvalPlanQual(epqstate,
- relation,
- relinfo->ri_RangeTableIndex,
- oldslot);
-
- /*
- * If PlanQual failed for updated tuple - we must not
- * process this tuple!
- */
- if (TupIsNull(*epqslot))
+ else
{
- *epqslot = NULL;
+ if (tmresultp)
+ *tmresultp = TM_Updated;
return false;
}
}
diff --git a/src/backend/commands/typecmds.c b/src/backend/commands/typecmds.c
index 45ae7472ab5..26d985193ae 100644
--- a/src/backend/commands/typecmds.c
+++ b/src/backend/commands/typecmds.c
@@ -939,11 +939,19 @@ DefineDomain(ParseState *pstate, CreateDomainStmt *stmt)
break;
case CONSTR_NOTNULL:
- if (nullDefined && !typNotNull)
+ if (nullDefined)
+ {
+ if (!typNotNull)
+ ereport(ERROR,
+ errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting NULL/NOT NULL constraints"),
+ parser_errposition(pstate, constr->location));
+
ereport(ERROR,
- errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("conflicting NULL/NOT NULL constraints"),
+ errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("redundant NOT NULL constraint definition"),
parser_errposition(pstate, constr->location));
+ }
if (constr->is_no_inherit)
ereport(ERROR,
errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 33a33bf6b1c..733ef40ae7c 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -56,6 +56,7 @@
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/guc_hooks.h"
+#include "utils/injection_point.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
@@ -123,7 +124,7 @@ static void vac_truncate_clog(TransactionId frozenXID,
MultiXactId minMulti,
TransactionId lastSaneFrozenXid,
MultiXactId lastSaneMinMulti);
-static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
+static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams params,
BufferAccessStrategy bstrategy);
static double compute_parallel_delay(void);
static VacOptValue get_vacoptval_from_boolean(DefElem *def);
@@ -464,7 +465,7 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
}
/* Now go through the common routine */
- vacuum(vacstmt->rels, &params, bstrategy, vac_context, isTopLevel);
+ vacuum(vacstmt->rels, params, bstrategy, vac_context, isTopLevel);
/* Finally, clean up the vacuum memory context */
MemoryContextDelete(vac_context);
@@ -493,7 +494,7 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
* memory context that will not disappear at transaction commit.
*/
void
-vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy,
+vacuum(List *relations, const VacuumParams params, BufferAccessStrategy bstrategy,
MemoryContext vac_context, bool isTopLevel)
{
static bool in_vacuum = false;
@@ -502,9 +503,7 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy,
volatile bool in_outer_xact,
use_own_xacts;
- Assert(params != NULL);
-
- stmttype = (params->options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE";
+ stmttype = (params.options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE";
/*
* We cannot run VACUUM inside a user transaction block; if we were inside
@@ -514,7 +513,7 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy,
*
* ANALYZE (without VACUUM) can run either way.
*/
- if (params->options & VACOPT_VACUUM)
+ if (params.options & VACOPT_VACUUM)
{
PreventInTransactionBlock(isTopLevel, stmttype);
in_outer_xact = false;
@@ -537,7 +536,7 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy,
* Build list of relation(s) to process, putting any new data in
* vac_context for safekeeping.
*/
- if (params->options & VACOPT_ONLY_DATABASE_STATS)
+ if (params.options & VACOPT_ONLY_DATABASE_STATS)
{
/* We don't process any tables in this case */
Assert(relations == NIL);
@@ -553,7 +552,7 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy,
List *sublist;
MemoryContext old_context;
- sublist = expand_vacuum_rel(vrel, vac_context, params->options);
+ sublist = expand_vacuum_rel(vrel, vac_context, params.options);
old_context = MemoryContextSwitchTo(vac_context);
newrels = list_concat(newrels, sublist);
MemoryContextSwitchTo(old_context);
@@ -561,7 +560,7 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy,
relations = newrels;
}
else
- relations = get_all_vacuum_rels(vac_context, params->options);
+ relations = get_all_vacuum_rels(vac_context, params.options);
/*
* Decide whether we need to start/commit our own transactions.
@@ -577,11 +576,11 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy,
* transaction block, and also in an autovacuum worker, use own
* transactions so we can release locks sooner.
*/
- if (params->options & VACOPT_VACUUM)
+ if (params.options & VACOPT_VACUUM)
use_own_xacts = true;
else
{
- Assert(params->options & VACOPT_ANALYZE);
+ Assert(params.options & VACOPT_ANALYZE);
if (AmAutoVacuumWorkerProcess())
use_own_xacts = true;
else if (in_outer_xact)
@@ -632,13 +631,13 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy,
{
VacuumRelation *vrel = lfirst_node(VacuumRelation, cur);
- if (params->options & VACOPT_VACUUM)
+ if (params.options & VACOPT_VACUUM)
{
if (!vacuum_rel(vrel->oid, vrel->relation, params, bstrategy))
continue;
}
- if (params->options & VACOPT_ANALYZE)
+ if (params.options & VACOPT_ANALYZE)
{
/*
* If using separate xacts, start one for analyze. Otherwise,
@@ -702,8 +701,8 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy,
StartTransactionCommand();
}
- if ((params->options & VACOPT_VACUUM) &&
- !(params->options & VACOPT_SKIP_DATABASE_STATS))
+ if ((params.options & VACOPT_VACUUM) &&
+ !(params.options & VACOPT_SKIP_DATABASE_STATS))
{
/*
* Update pg_database.datfrozenxid, and truncate pg_xact if possible.
@@ -1101,7 +1100,7 @@ get_all_vacuum_rels(MemoryContext vac_context, int options)
* minimum).
*/
bool
-vacuum_get_cutoffs(Relation rel, const VacuumParams *params,
+vacuum_get_cutoffs(Relation rel, const VacuumParams params,
struct VacuumCutoffs *cutoffs)
{
int freeze_min_age,
@@ -1117,10 +1116,10 @@ vacuum_get_cutoffs(Relation rel, const VacuumParams *params,
aggressiveMXIDCutoff;
/* Use mutable copies of freeze age parameters */
- freeze_min_age = params->freeze_min_age;
- multixact_freeze_min_age = params->multixact_freeze_min_age;
- freeze_table_age = params->freeze_table_age;
- multixact_freeze_table_age = params->multixact_freeze_table_age;
+ freeze_min_age = params.freeze_min_age;
+ multixact_freeze_min_age = params.multixact_freeze_min_age;
+ freeze_table_age = params.freeze_table_age;
+ multixact_freeze_table_age = params.multixact_freeze_table_age;
/* Set pg_class fields in cutoffs */
cutoffs->relfrozenxid = rel->rd_rel->relfrozenxid;
@@ -1997,7 +1996,7 @@ vac_truncate_clog(TransactionId frozenXID,
* At entry and exit, we are not inside a transaction.
*/
static bool
-vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
+vacuum_rel(Oid relid, RangeVar *relation, VacuumParams params,
BufferAccessStrategy bstrategy)
{
LOCKMODE lmode;
@@ -2008,13 +2007,18 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
Oid save_userid;
int save_sec_context;
int save_nestlevel;
+ VacuumParams toast_vacuum_params;
- Assert(params != NULL);
+ /*
+ * This function scribbles on the parameters, so make a copy early to
+ * avoid affecting the TOAST table (if we do end up recursing to it).
+ */
+ memcpy(&toast_vacuum_params, &params, sizeof(VacuumParams));
/* Begin a transaction for vacuuming this relation */
StartTransactionCommand();
- if (!(params->options & VACOPT_FULL))
+ if (!(params.options & VACOPT_FULL))
{
/*
* In lazy vacuum, we can set the PROC_IN_VACUUM flag, which lets
@@ -2040,7 +2044,7 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
*/
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
MyProc->statusFlags |= PROC_IN_VACUUM;
- if (params->is_wraparound)
+ if (params.is_wraparound)
MyProc->statusFlags |= PROC_VACUUM_FOR_WRAPAROUND;
ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
LWLockRelease(ProcArrayLock);
@@ -2064,12 +2068,12 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
* vacuum, but just ShareUpdateExclusiveLock for concurrent vacuum. Either
* way, we can be sure that no other backend is vacuuming the same table.
*/
- lmode = (params->options & VACOPT_FULL) ?
+ lmode = (params.options & VACOPT_FULL) ?
AccessExclusiveLock : ShareUpdateExclusiveLock;
/* open the relation and get the appropriate lock on it */
- rel = vacuum_open_relation(relid, relation, params->options,
- params->log_min_duration >= 0, lmode);
+ rel = vacuum_open_relation(relid, relation, params.options,
+ params.log_min_duration >= 0, lmode);
/* leave if relation could not be opened or locked */
if (!rel)
@@ -2084,8 +2088,8 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
* This is only safe to do because we hold a session lock on the main
* relation that prevents concurrent deletion.
*/
- if (OidIsValid(params->toast_parent))
- priv_relid = params->toast_parent;
+ if (OidIsValid(params.toast_parent))
+ priv_relid = params.toast_parent;
else
priv_relid = RelationGetRelid(rel);
@@ -2098,7 +2102,7 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
*/
if (!vacuum_is_permitted_for_relation(priv_relid,
rel->rd_rel,
- params->options & ~VACOPT_ANALYZE))
+ params.options & ~VACOPT_ANALYZE))
{
relation_close(rel, lmode);
PopActiveSnapshot();
@@ -2169,7 +2173,7 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
* Set index_cleanup option based on index_cleanup reloption if it wasn't
* specified in VACUUM command, or when running in an autovacuum worker
*/
- if (params->index_cleanup == VACOPTVALUE_UNSPECIFIED)
+ if (params.index_cleanup == VACOPTVALUE_UNSPECIFIED)
{
StdRdOptIndexCleanup vacuum_index_cleanup;
@@ -2180,56 +2184,74 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
((StdRdOptions *) rel->rd_options)->vacuum_index_cleanup;
if (vacuum_index_cleanup == STDRD_OPTION_VACUUM_INDEX_CLEANUP_AUTO)
- params->index_cleanup = VACOPTVALUE_AUTO;
+ params.index_cleanup = VACOPTVALUE_AUTO;
else if (vacuum_index_cleanup == STDRD_OPTION_VACUUM_INDEX_CLEANUP_ON)
- params->index_cleanup = VACOPTVALUE_ENABLED;
+ params.index_cleanup = VACOPTVALUE_ENABLED;
else
{
Assert(vacuum_index_cleanup ==
STDRD_OPTION_VACUUM_INDEX_CLEANUP_OFF);
- params->index_cleanup = VACOPTVALUE_DISABLED;
+ params.index_cleanup = VACOPTVALUE_DISABLED;
}
}
+#ifdef USE_INJECTION_POINTS
+ if (params.index_cleanup == VACOPTVALUE_AUTO)
+ INJECTION_POINT("vacuum-index-cleanup-auto", NULL);
+ else if (params.index_cleanup == VACOPTVALUE_DISABLED)
+ INJECTION_POINT("vacuum-index-cleanup-disabled", NULL);
+ else if (params.index_cleanup == VACOPTVALUE_ENABLED)
+ INJECTION_POINT("vacuum-index-cleanup-enabled", NULL);
+#endif
+
/*
* Check if the vacuum_max_eager_freeze_failure_rate table storage
* parameter was specified. This overrides the GUC value.
*/
if (rel->rd_options != NULL &&
((StdRdOptions *) rel->rd_options)->vacuum_max_eager_freeze_failure_rate >= 0)
- params->max_eager_freeze_failure_rate =
+ params.max_eager_freeze_failure_rate =
((StdRdOptions *) rel->rd_options)->vacuum_max_eager_freeze_failure_rate;
/*
* Set truncate option based on truncate reloption or GUC if it wasn't
* specified in VACUUM command, or when running in an autovacuum worker
*/
- if (params->truncate == VACOPTVALUE_UNSPECIFIED)
+ if (params.truncate == VACOPTVALUE_UNSPECIFIED)
{
StdRdOptions *opts = (StdRdOptions *) rel->rd_options;
if (opts && opts->vacuum_truncate_set)
{
if (opts->vacuum_truncate)
- params->truncate = VACOPTVALUE_ENABLED;
+ params.truncate = VACOPTVALUE_ENABLED;
else
- params->truncate = VACOPTVALUE_DISABLED;
+ params.truncate = VACOPTVALUE_DISABLED;
}
else if (vacuum_truncate)
- params->truncate = VACOPTVALUE_ENABLED;
+ params.truncate = VACOPTVALUE_ENABLED;
else
- params->truncate = VACOPTVALUE_DISABLED;
+ params.truncate = VACOPTVALUE_DISABLED;
}
+#ifdef USE_INJECTION_POINTS
+ if (params.truncate == VACOPTVALUE_AUTO)
+ INJECTION_POINT("vacuum-truncate-auto", NULL);
+ else if (params.truncate == VACOPTVALUE_DISABLED)
+ INJECTION_POINT("vacuum-truncate-disabled", NULL);
+ else if (params.truncate == VACOPTVALUE_ENABLED)
+ INJECTION_POINT("vacuum-truncate-enabled", NULL);
+#endif
+
/*
* Remember the relation's TOAST relation for later, if the caller asked
* us to process it. In VACUUM FULL, though, the toast table is
* automatically rebuilt by cluster_rel so we shouldn't recurse to it,
* unless PROCESS_MAIN is disabled.
*/
- if ((params->options & VACOPT_PROCESS_TOAST) != 0 &&
- ((params->options & VACOPT_FULL) == 0 ||
- (params->options & VACOPT_PROCESS_MAIN) == 0))
+ if ((params.options & VACOPT_PROCESS_TOAST) != 0 &&
+ ((params.options & VACOPT_FULL) == 0 ||
+ (params.options & VACOPT_PROCESS_MAIN) == 0))
toast_relid = rel->rd_rel->reltoastrelid;
else
toast_relid = InvalidOid;
@@ -2252,16 +2274,16 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
* table is required (e.g., PROCESS_TOAST is set), we force PROCESS_MAIN
* to be set when we recurse to the TOAST table.
*/
- if (params->options & VACOPT_PROCESS_MAIN)
+ if (params.options & VACOPT_PROCESS_MAIN)
{
/*
* Do the actual work --- either FULL or "lazy" vacuum
*/
- if (params->options & VACOPT_FULL)
+ if (params.options & VACOPT_FULL)
{
ClusterParams cluster_params = {0};
- if ((params->options & VACOPT_VERBOSE) != 0)
+ if ((params.options & VACOPT_VERBOSE) != 0)
cluster_params.options |= CLUOPT_VERBOSE;
/* VACUUM FULL is now a variant of CLUSTER; see cluster.c */
@@ -2299,19 +2321,16 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
*/
if (toast_relid != InvalidOid)
{
- VacuumParams toast_vacuum_params;
-
/*
* Force VACOPT_PROCESS_MAIN so vacuum_rel() processes it. Likewise,
* set toast_parent so that the privilege checks are done on the main
* relation. NB: This is only safe to do because we hold a session
* lock on the main relation that prevents concurrent deletion.
*/
- memcpy(&toast_vacuum_params, params, sizeof(VacuumParams));
toast_vacuum_params.options |= VACOPT_PROCESS_MAIN;
toast_vacuum_params.toast_parent = relid;
- vacuum_rel(toast_relid, NULL, &toast_vacuum_params, bstrategy);
+ vacuum_rel(toast_relid, NULL, toast_vacuum_params, bstrategy);
}
/*