Diffstat (limited to 'src/backend/commands')
-rw-r--r--  src/backend/commands/analyze.c           |  30
-rw-r--r--  src/backend/commands/cluster.c           |   2
-rw-r--r--  src/backend/commands/copy.c              |  42
-rw-r--r--  src/backend/commands/copyfromparse.c     |  17
-rw-r--r--  src/backend/commands/copyto.c            |   2
-rw-r--r--  src/backend/commands/dbcommands.c        |  14
-rw-r--r--  src/backend/commands/explain.c           |  21
-rw-r--r--  src/backend/commands/foreigncmds.c       |   1
-rw-r--r--  src/backend/commands/indexcmds.c         |  42
-rw-r--r--  src/backend/commands/matview.c           |   3
-rw-r--r--  src/backend/commands/publicationcmds.c   |  24
-rw-r--r--  src/backend/commands/schemacmds.c        |   1
-rw-r--r--  src/backend/commands/subscriptioncmds.c  | 408
-rw-r--r--  src/backend/commands/tablecmds.c         |  82
-rw-r--r--  src/backend/commands/tablespace.c        |   2
-rw-r--r--  src/backend/commands/trigger.c           |  74
-rw-r--r--  src/backend/commands/typecmds.c          |  14
-rw-r--r--  src/backend/commands/vacuum.c            | 121
18 files changed, 675 insertions, 225 deletions
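
The copy.c and copyfromparse.c hunks below generalize COPY's HEADER option: instead of only a Boolean or "match", the option now also accepts a non-negative integer giving the number of header lines to skip on input, while COPY TO still allows at most one header line and negative values are rejected. The following standalone sketch illustrates those parsing rules only; the function name parse_header_option and the HEADER_* constants (including HEADER_MATCH = -1) are illustrative assumptions, not the actual defGetCopyHeaderOption()/CopyFormatOptions definitions.

/*
 * Standalone sketch of the new HEADER option parsing rules for COPY:
 * accept a boolean ("true"/"false"/"on"/"off"/"1"/"0"), a non-negative
 * integer (header lines to skip on COPY FROM), or the word "match".
 * Negative values are rejected, and COPY TO allows at most one header line.
 * Names and return values are illustrative only.
 */
#include <errno.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define HEADER_FALSE	0
#define HEADER_TRUE		1
#define HEADER_MATCH	(-1)	/* sentinel: skip one line and match names */

static int
parse_header_option(const char *val, bool is_from)
{
	char	   *end;
	long		ival;

	if (strcmp(val, "true") == 0 || strcmp(val, "on") == 0)
		return HEADER_TRUE;
	if (strcmp(val, "false") == 0 || strcmp(val, "off") == 0)
		return HEADER_FALSE;
	if (strcmp(val, "match") == 0)
		return HEADER_MATCH;

	errno = 0;
	ival = strtol(val, &end, 10);
	if (errno != 0 || end == val || *end != '\0')
	{
		fprintf(stderr, "HEADER requires a Boolean, a non-negative integer, or \"match\"\n");
		exit(1);
	}
	if (ival < 0)
	{
		fprintf(stderr, "a negative integer value cannot be specified for HEADER\n");
		exit(1);
	}
	if (!is_from && ival > 1)
	{
		fprintf(stderr, "cannot use multi-line header in COPY TO\n");
		exit(1);
	}
	return (int) ival;
}

int
main(void)
{
	printf("%d\n", parse_header_option("match", true));	/* -1 */
	printf("%d\n", parse_header_option("3", true));			/* skip 3 lines */
	printf("%d\n", parse_header_option("off", false));		/* 0 */
	return 0;
}

On the COPY FROM side, the copyfromparse.c hunk then loops over the requested number of header lines (with "match" counting as one line to skip and verify) before reading data rows.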
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c index 4fffb76e557..40d66537ad7 100644 --- a/src/backend/commands/analyze.c +++ b/src/backend/commands/analyze.c @@ -76,7 +76,7 @@ static BufferAccessStrategy vac_strategy; static void do_analyze_rel(Relation onerel, - VacuumParams *params, List *va_cols, + const VacuumParams params, List *va_cols, AcquireSampleRowsFunc acquirefunc, BlockNumber relpages, bool inh, bool in_outer_xact, int elevel); static void compute_index_stats(Relation onerel, double totalrows, @@ -107,7 +107,7 @@ static Datum ind_fetch_func(VacAttrStatsP stats, int rownum, bool *isNull); */ void analyze_rel(Oid relid, RangeVar *relation, - VacuumParams *params, List *va_cols, bool in_outer_xact, + const VacuumParams params, List *va_cols, bool in_outer_xact, BufferAccessStrategy bstrategy) { Relation onerel; @@ -116,7 +116,7 @@ analyze_rel(Oid relid, RangeVar *relation, BlockNumber relpages = 0; /* Select logging level */ - if (params->options & VACOPT_VERBOSE) + if (params.options & VACOPT_VERBOSE) elevel = INFO; else elevel = DEBUG2; @@ -138,8 +138,8 @@ analyze_rel(Oid relid, RangeVar *relation, * * Make sure to generate only logs for ANALYZE in this case. */ - onerel = vacuum_open_relation(relid, relation, params->options & ~(VACOPT_VACUUM), - params->log_min_duration >= 0, + onerel = vacuum_open_relation(relid, relation, params.options & ~(VACOPT_VACUUM), + params.log_min_duration >= 0, ShareUpdateExclusiveLock); /* leave if relation could not be opened or locked */ @@ -155,7 +155,7 @@ analyze_rel(Oid relid, RangeVar *relation, */ if (!vacuum_is_permitted_for_relation(RelationGetRelid(onerel), onerel->rd_rel, - params->options & ~VACOPT_VACUUM)) + params.options & ~VACOPT_VACUUM)) { relation_close(onerel, ShareUpdateExclusiveLock); return; @@ -227,7 +227,7 @@ analyze_rel(Oid relid, RangeVar *relation, else { /* No need for a WARNING if we already complained during VACUUM */ - if (!(params->options & VACOPT_VACUUM)) + if (!(params.options & VACOPT_VACUUM)) ereport(WARNING, (errmsg("skipping \"%s\" --- cannot analyze non-tables or special system tables", RelationGetRelationName(onerel)))); @@ -275,7 +275,7 @@ analyze_rel(Oid relid, RangeVar *relation, * appropriate acquirefunc for each child table. */ static void -do_analyze_rel(Relation onerel, VacuumParams *params, +do_analyze_rel(Relation onerel, const VacuumParams params, List *va_cols, AcquireSampleRowsFunc acquirefunc, BlockNumber relpages, bool inh, bool in_outer_xact, int elevel) @@ -309,9 +309,9 @@ do_analyze_rel(Relation onerel, VacuumParams *params, PgStat_Counter startreadtime = 0; PgStat_Counter startwritetime = 0; - verbose = (params->options & VACOPT_VERBOSE) != 0; + verbose = (params.options & VACOPT_VERBOSE) != 0; instrument = (verbose || (AmAutoVacuumWorkerProcess() && - params->log_min_duration >= 0)); + params.log_min_duration >= 0)); if (inh) ereport(elevel, (errmsg("analyzing \"%s.%s\" inheritance tree", @@ -690,8 +690,8 @@ do_analyze_rel(Relation onerel, VacuumParams *params, * only do it for inherited stats. (We're never called for not-inherited * stats on partitioned tables anyway.) * - * Reset the changes_since_analyze counter only if we analyzed all - * columns; otherwise, there is still work for auto-analyze to do. + * Reset the mod_since_analyze counter only if we analyzed all columns; + * otherwise, there is still work for auto-analyze to do. 
*/ if (!inh) pgstat_report_analyze(onerel, totalrows, totaldeadrows, @@ -706,7 +706,7 @@ do_analyze_rel(Relation onerel, VacuumParams *params, * amvacuumcleanup() when called in ANALYZE-only mode. The only exception * among core index AMs is GIN/ginvacuumcleanup(). */ - if (!(params->options & VACOPT_VACUUM)) + if (!(params.options & VACOPT_VACUUM)) { for (ind = 0; ind < nindexes; ind++) { @@ -736,9 +736,9 @@ do_analyze_rel(Relation onerel, VacuumParams *params, { TimestampTz endtime = GetCurrentTimestamp(); - if (verbose || params->log_min_duration == 0 || + if (verbose || params.log_min_duration == 0 || TimestampDifferenceExceeds(starttime, endtime, - params->log_min_duration)) + params.log_min_duration)) { long delay_in_ms; WalUsage walusage; diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c index 54a08e4102e..b55221d44cd 100644 --- a/src/backend/commands/cluster.c +++ b/src/backend/commands/cluster.c @@ -917,7 +917,7 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, bool verb * not to be aggressive about this. */ memset(¶ms, 0, sizeof(VacuumParams)); - vacuum_get_cutoffs(OldHeap, ¶ms, &cutoffs); + vacuum_get_cutoffs(OldHeap, params, &cutoffs); /* * FreezeXid will become the table's new relfrozenxid, and that mustn't go diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 74ae42b19a7..fae9c41db65 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -322,11 +322,13 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, } /* - * Extract a CopyHeaderChoice value from a DefElem. This is like - * defGetBoolean() but also accepts the special value "match". + * Extract the CopyFormatOptions.header_line value from a DefElem. + * + * Parses the HEADER option for COPY, which can be a boolean, a non-negative + * integer (number of lines to skip), or the special value "match". */ -static CopyHeaderChoice -defGetCopyHeaderChoice(DefElem *def, bool is_from) +static int +defGetCopyHeaderOption(DefElem *def, bool is_from) { /* * If no parameter value given, assume "true" is meant. @@ -335,20 +337,27 @@ defGetCopyHeaderChoice(DefElem *def, bool is_from) return COPY_HEADER_TRUE; /* - * Allow 0, 1, "true", "false", "on", "off", or "match". + * Allow 0, 1, "true", "false", "on", "off", a non-negative integer, or + * "match". 
*/ switch (nodeTag(def->arg)) { case T_Integer: - switch (intVal(def->arg)) { - case 0: - return COPY_HEADER_FALSE; - case 1: - return COPY_HEADER_TRUE; - default: - /* otherwise, error out below */ - break; + int ival = intVal(def->arg); + + if (ival < 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("a negative integer value cannot be " + "specified for %s", def->defname))); + + if (!is_from && ival > 1) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use multi-line header in COPY TO"))); + + return ival; } break; default: @@ -381,7 +390,8 @@ defGetCopyHeaderChoice(DefElem *def, bool is_from) } ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("%s requires a Boolean value or \"match\"", + errmsg("%s requires a Boolean value, a non-negative integer, " + "or the string \"match\"", def->defname))); return COPY_HEADER_FALSE; /* keep compiler quiet */ } @@ -566,7 +576,7 @@ ProcessCopyOptions(ParseState *pstate, if (header_specified) errorConflictingDefElem(defel, pstate); header_specified = true; - opts_out->header_line = defGetCopyHeaderChoice(defel, is_from); + opts_out->header_line = defGetCopyHeaderOption(defel, is_from); } else if (strcmp(defel->defname, "quote") == 0) { @@ -769,7 +779,7 @@ ProcessCopyOptions(ParseState *pstate, errmsg("COPY delimiter cannot be \"%s\"", opts_out->delim))); /* Check header */ - if (opts_out->binary && opts_out->header_line) + if (opts_out->binary && opts_out->header_line != COPY_HEADER_FALSE) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), /*- translator: %s is the name of a COPY option, e.g. ON_ERROR */ diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index f52f2477df1..b1ae97b833d 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -771,21 +771,30 @@ static pg_attribute_always_inline bool NextCopyFromRawFieldsInternal(CopyFromState cstate, char ***fields, int *nfields, bool is_csv) { int fldct; - bool done; + bool done = false; /* only available for text or csv input */ Assert(!cstate->opts.binary); /* on input check that the header line is correct if needed */ - if (cstate->cur_lineno == 0 && cstate->opts.header_line) + if (cstate->cur_lineno == 0 && cstate->opts.header_line != COPY_HEADER_FALSE) { ListCell *cur; TupleDesc tupDesc; + int lines_to_skip = cstate->opts.header_line; + + /* If set to "match", one header line is skipped */ + if (cstate->opts.header_line == COPY_HEADER_MATCH) + lines_to_skip = 1; tupDesc = RelationGetDescr(cstate->rel); - cstate->cur_lineno++; - done = CopyReadLine(cstate, is_csv); + for (int i = 0; i < lines_to_skip; i++) + { + cstate->cur_lineno++; + if ((done = CopyReadLine(cstate, is_csv))) + break; + } if (cstate->opts.header_line == COPY_HEADER_MATCH) { diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index ea6f18f2c80..67b94b91cae 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -199,7 +199,7 @@ CopyToTextLikeStart(CopyToState cstate, TupleDesc tupDesc) cstate->file_encoding); /* if a header has been requested send the line */ - if (cstate->opts.header_line) + if (cstate->opts.header_line == COPY_HEADER_TRUE) { ListCell *cur; bool hdr_delim = false; diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c index c95eb945016..502a45163c8 100644 --- a/src/backend/commands/dbcommands.c +++ b/src/backend/commands/dbcommands.c @@ -570,8 +570,8 @@ CreateDatabaseUsingFileCopy(Oid src_dboid, Oid 
dst_dboid, Oid src_tsid, * any CREATE DATABASE commands. */ if (!IsBinaryUpgrade) - RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | - CHECKPOINT_WAIT | CHECKPOINT_FLUSH_ALL); + RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | + CHECKPOINT_WAIT | CHECKPOINT_FLUSH_UNLOGGED); /* * Iterate through all tablespaces of the template database, and copy each @@ -673,7 +673,7 @@ CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid, Oid src_tsid, * strategy that avoids these problems. */ if (!IsBinaryUpgrade) - RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | + RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT); } @@ -1870,7 +1870,7 @@ dropdb(const char *dbname, bool missing_ok, bool force) * Force a checkpoint to make sure the checkpointer has received the * message sent by ForgetDatabaseSyncRequests. */ - RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT); + RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT); /* Close all smgr fds in all backends. */ WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE)); @@ -2120,8 +2120,8 @@ movedb(const char *dbname, const char *tblspcname) * On Windows, this also ensures that background procs don't hold any open * files, which would cause rmdir() to fail. */ - RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT - | CHECKPOINT_FLUSH_ALL); + RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT + | CHECKPOINT_FLUSH_UNLOGGED); /* Close all smgr fds in all backends. */ WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE)); @@ -2252,7 +2252,7 @@ movedb(const char *dbname, const char *tblspcname) * any unlogged operations done in the new DB tablespace before the * next checkpoint. 
*/ - RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT); + RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT); /* * Force synchronous commit, thus minimizing the window between diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 7e2792ead71..8345bc0264b 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -3582,6 +3582,7 @@ static void show_memoize_info(MemoizeState *mstate, List *ancestors, ExplainState *es) { Plan *plan = ((PlanState *) mstate)->plan; + Memoize *mplan = (Memoize *) plan; ListCell *lc; List *context; StringInfoData keystr; @@ -3602,7 +3603,7 @@ show_memoize_info(MemoizeState *mstate, List *ancestors, ExplainState *es) plan, ancestors); - foreach(lc, ((Memoize *) plan)->param_exprs) + foreach(lc, mplan->param_exprs) { Node *expr = (Node *) lfirst(lc); @@ -3618,6 +3619,24 @@ show_memoize_info(MemoizeState *mstate, List *ancestors, ExplainState *es) pfree(keystr.data); + if (es->costs) + { + if (es->format == EXPLAIN_FORMAT_TEXT) + { + ExplainIndentText(es); + appendStringInfo(es->str, "Estimates: capacity=%u distinct keys=%.0f lookups=%.0f hit percent=%.2f%%\n", + mplan->est_entries, mplan->est_unique_keys, + mplan->est_calls, mplan->est_hit_ratio * 100.0); + } + else + { + ExplainPropertyUInteger("Estimated Capacity", NULL, mplan->est_entries, es); + ExplainPropertyFloat("Estimated Distinct Lookup Keys", NULL, mplan->est_unique_keys, 0, es); + ExplainPropertyFloat("Estimated Lookups", NULL, mplan->est_calls, 0, es); + ExplainPropertyFloat("Estimated Hit Percent", NULL, mplan->est_hit_ratio * 100.0, 2, es); + } + } + if (!es->analyze) return; diff --git a/src/backend/commands/foreigncmds.c b/src/backend/commands/foreigncmds.c index 8d2d7431544..77f8461f42e 100644 --- a/src/backend/commands/foreigncmds.c +++ b/src/backend/commands/foreigncmds.c @@ -1588,6 +1588,7 @@ ImportForeignSchema(ImportForeignSchemaStmt *stmt) pstmt->utilityStmt = (Node *) cstmt; pstmt->stmt_location = rs->stmt_location; pstmt->stmt_len = rs->stmt_len; + pstmt->planOrigin = PLAN_STMT_INTERNAL; /* Execute statement */ ProcessUtility(pstmt, cmd, false, diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c index c3ec2076a52..6f753ab6d7a 100644 --- a/src/backend/commands/indexcmds.c +++ b/src/backend/commands/indexcmds.c @@ -2469,8 +2469,8 @@ GetOperatorFromCompareType(Oid opclass, Oid rhstype, CompareType cmptype, cmptype == COMPARE_EQ ? errmsg("could not identify an equality operator for type %s", format_type_be(opcintype)) : cmptype == COMPARE_OVERLAP ? errmsg("could not identify an overlaps operator for type %s", format_type_be(opcintype)) : cmptype == COMPARE_CONTAINED_BY ? errmsg("could not identify a contained-by operator for type %s", format_type_be(opcintype)) : 0, - errdetail("Could not translate compare type %d for operator family \"%s\", input type %s, access method \"%s\".", - cmptype, get_opfamily_name(opfamily, false), format_type_be(opcintype), get_am_name(amid))); + errdetail("Could not translate compare type %d for operator family \"%s\" of access method \"%s\".", + cmptype, get_opfamily_name(opfamily, false), get_am_name(amid))); /* * We parameterize rhstype so foreign keys can ask for a <@ operator @@ -2592,7 +2592,9 @@ makeObjectName(const char *name1, const char *name2, const char *label) * constraint names.) * * Note: it is theoretically possible to get a collision anyway, if someone - * else chooses the same name concurrently. 
This is fairly unlikely to be + * else chooses the same name concurrently. We shorten the race condition + * window by checking for conflicting relations using SnapshotDirty, but + * that doesn't close the window entirely. This is fairly unlikely to be * a problem in practice, especially if one is holding an exclusive lock on * the relation identified by name1. However, if choosing multiple names * within a single command, you'd better create the new object and do @@ -2608,15 +2610,45 @@ ChooseRelationName(const char *name1, const char *name2, int pass = 0; char *relname = NULL; char modlabel[NAMEDATALEN]; + SnapshotData SnapshotDirty; + Relation pgclassrel; + + /* prepare to search pg_class with a dirty snapshot */ + InitDirtySnapshot(SnapshotDirty); + pgclassrel = table_open(RelationRelationId, AccessShareLock); /* try the unmodified label first */ strlcpy(modlabel, label, sizeof(modlabel)); for (;;) { + ScanKeyData key[2]; + SysScanDesc scan; + bool collides; + relname = makeObjectName(name1, name2, modlabel); - if (!OidIsValid(get_relname_relid(relname, namespaceid))) + /* is there any conflicting relation name? */ + ScanKeyInit(&key[0], + Anum_pg_class_relname, + BTEqualStrategyNumber, F_NAMEEQ, + CStringGetDatum(relname)); + ScanKeyInit(&key[1], + Anum_pg_class_relnamespace, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(namespaceid)); + + scan = systable_beginscan(pgclassrel, ClassNameNspIndexId, + true /* indexOK */ , + &SnapshotDirty, + 2, key); + + collides = HeapTupleIsValid(systable_getnext(scan)); + + systable_endscan(scan); + + /* break out of loop if no conflict */ + if (!collides) { if (!isconstraint || !ConstraintNameExists(relname, namespaceid)) @@ -2628,6 +2660,8 @@ ChooseRelationName(const char *name1, const char *name2, snprintf(modlabel, sizeof(modlabel), "%s%d", label, ++pass); } + table_close(pgclassrel, AccessShareLock); + return relname; } diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index 27c2cb26ef5..188e26f0e6e 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -835,7 +835,8 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner, if (!foundUniqueIndex) ereport(ERROR, errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("could not find suitable unique index on materialized view")); + errmsg("could not find suitable unique index on materialized view \"%s\"", + RelationGetRelationName(matviewRel))); appendStringInfoString(&querybuf, " AND newdata.* OPERATOR(pg_catalog.*=) mv.*) " diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 0b23d94c38e..803c26ab216 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -2113,25 +2113,25 @@ AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId) static char defGetGeneratedColsOption(DefElem *def) { - char *sval; + char *sval = ""; /* - * If no parameter value given, assume "stored" is meant. + * A parameter value is required. 
*/ - if (!def->arg) - return PUBLISH_GENCOLS_STORED; - - sval = defGetString(def); + if (def->arg) + { + sval = defGetString(def); - if (pg_strcasecmp(sval, "none") == 0) - return PUBLISH_GENCOLS_NONE; - if (pg_strcasecmp(sval, "stored") == 0) - return PUBLISH_GENCOLS_STORED; + if (pg_strcasecmp(sval, "none") == 0) + return PUBLISH_GENCOLS_NONE; + if (pg_strcasecmp(sval, "stored") == 0) + return PUBLISH_GENCOLS_STORED; + } ereport(ERROR, errcode(ERRCODE_SYNTAX_ERROR), - errmsg("%s requires a \"none\" or \"stored\" value", - def->defname)); + errmsg("invalid value for publication parameter \"%s\": \"%s\"", def->defname, sval), + errdetail("Valid values are \"%s\" and \"%s\".", "none", "stored")); return PUBLISH_GENCOLS_NONE; /* keep compiler quiet */ } diff --git a/src/backend/commands/schemacmds.c b/src/backend/commands/schemacmds.c index 546160f0941..0f03d9743d2 100644 --- a/src/backend/commands/schemacmds.c +++ b/src/backend/commands/schemacmds.c @@ -215,6 +215,7 @@ CreateSchemaCommand(CreateSchemaStmt *stmt, const char *queryString, wrapper->utilityStmt = stmt; wrapper->stmt_location = stmt_location; wrapper->stmt_len = stmt_len; + wrapper->planOrigin = PLAN_STMT_INTERNAL; /* do this step */ ProcessUtility(wrapper, diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 4aec73bcc6b..cd6c3684482 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -14,6 +14,7 @@ #include "postgres.h" +#include "access/commit_ts.h" #include "access/htup_details.h" #include "access/table.h" #include "access/twophase.h" @@ -71,8 +72,9 @@ #define SUBOPT_PASSWORD_REQUIRED 0x00000800 #define SUBOPT_RUN_AS_OWNER 0x00001000 #define SUBOPT_FAILOVER 0x00002000 -#define SUBOPT_LSN 0x00004000 -#define SUBOPT_ORIGIN 0x00008000 +#define SUBOPT_RETAIN_DEAD_TUPLES 0x00004000 +#define SUBOPT_LSN 0x00008000 +#define SUBOPT_ORIGIN 0x00010000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -98,6 +100,7 @@ typedef struct SubOpts bool passwordrequired; bool runasowner; bool failover; + bool retaindeadtuples; char *origin; XLogRecPtr lsn; } SubOpts; @@ -105,8 +108,10 @@ typedef struct SubOpts static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); static void check_publications_origin(WalReceiverConn *wrconn, List *publications, bool copydata, - char *origin, Oid *subrel_local_oids, - int subrel_count, char *subname); + bool retain_dead_tuples, char *origin, + Oid *subrel_local_oids, int subrel_count, + char *subname); +static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); @@ -162,6 +167,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->runasowner = false; if (IsSet(supported_opts, SUBOPT_FAILOVER)) opts->failover = false; + if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES)) + opts->retaindeadtuples = false; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); @@ -210,7 +217,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, if (strcmp(opts->slot_name, "none") == 0) opts->slot_name = NULL; else - ReplicationSlotValidateName(opts->slot_name, ERROR); + ReplicationSlotValidateName(opts->slot_name, false, 
ERROR); } else if (IsSet(supported_opts, SUBOPT_COPY_DATA) && strcmp(defel->defname, "copy_data") == 0) @@ -307,6 +314,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_FAILOVER; opts->failover = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES) && + strcmp(defel->defname, "retain_dead_tuples") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_RETAIN_DEAD_TUPLES)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES; + opts->retaindeadtuples = defGetBoolean(defel); + } else if (IsSet(supported_opts, SUBOPT_ORIGIN) && strcmp(defel->defname, "origin") == 0) { @@ -563,7 +579,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | + SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -630,6 +647,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, stmt->subname))); } + /* Ensure that we can enable retain_dead_tuples */ + if (opts.retaindeadtuples) + CheckSubDeadTupleRetention(true, !opts.enabled, WARNING); + if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) && opts.slot_name == NULL) opts.slot_name = stmt->subname; @@ -670,6 +691,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired); values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner); values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover); + values[Anum_pg_subscription_subretaindeadtuples - 1] = + BoolGetDatum(opts.retaindeadtuples); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -722,7 +745,11 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, { check_publications(wrconn, publications); check_publications_origin(wrconn, publications, opts.copy_data, - opts.origin, NULL, 0, stmt->subname); + opts.retaindeadtuples, opts.origin, + NULL, 0, stmt->subname); + + if (opts.retaindeadtuples) + check_pub_dead_tuple_retention(wrconn); /* * Set sync state based on if we were asked to do data copy or @@ -881,8 +908,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, sizeof(Oid), oid_cmp); check_publications_origin(wrconn, sub->publications, copy_data, - sub->origin, subrel_local_oids, - subrel_count, sub->name); + sub->retaindeadtuples, sub->origin, + subrel_local_oids, subrel_count, sub->name); /* * Rels that we want to remove from subscription and drop any slots @@ -1040,18 +1067,22 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, } /* - * Common checks for altering failover and two_phase options. + * Common checks for altering failover, two_phase, and retain_dead_tuples + * options. */ static void CheckAlterSubOption(Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel) { + Assert(strcmp(option, "failover") == 0 || + strcmp(option, "two_phase") == 0 || + strcmp(option, "retain_dead_tuples") == 0); + /* - * The checks in this function are required only for failover and - * two_phase options. + * Altering the retain_dead_tuples option does not update the slot on the + * publisher. 
*/ - Assert(strcmp(option, "failover") == 0 || - strcmp(option, "two_phase") == 0); + Assert(!slot_needs_update || strcmp(option, "retain_dead_tuples") != 0); /* * Do not allow changing the option if the subscription is enabled. This @@ -1063,6 +1094,39 @@ CheckAlterSubOption(Subscription *sub, const char *option, * the publisher by the existing walsender, so we could have allowed that * even when the subscription is enabled. But we kept this restriction for * the sake of consistency and simplicity. + * + * Additionally, do not allow changing the retain_dead_tuples option when + * the subscription is enabled to prevent race conditions arising from the + * new option value being acknowledged asynchronously by the launcher and + * apply workers. + * + * Without the restriction, a race condition may arise when a user + * disables and immediately re-enables the retain_dead_tuples option. In + * this case, the launcher might drop the slot upon noticing the disabled + * action, while the apply worker may keep maintaining + * oldest_nonremovable_xid without noticing the option change. During this + * period, a transaction ID wraparound could falsely make this ID appear + * as if it originates from the future w.r.t the transaction ID stored in + * the slot maintained by launcher. + * + * Similarly, if the user enables retain_dead_tuples concurrently with the + * launcher starting the worker, the apply worker may start calculating + * oldest_nonremovable_xid before the launcher notices the enable action. + * Consequently, the launcher may update slot.xmin to a newer value than + * that maintained by the worker. In subsequent cycles, upon integrating + * the worker's oldest_nonremovable_xid, the launcher might detect a + * retreat in the calculated xmin, necessitating additional handling. + * + * XXX To address the above race conditions, we can define + * oldest_nonremovable_xid as FullTransactionID and adds the check to + * disallow retreating the conflict slot's xmin. For now, we kept the + * implementation simple by disallowing change to the retain_dead_tuples, + * but in the future we can change this after some more analysis. + * + * Note that we could restrict only the enabling of retain_dead_tuples to + * avoid the race conditions described above, but we maintain the + * restriction for both enable and disable operations for the sake of + * consistency. */ if (sub->enabled) ereport(ERROR, @@ -1110,6 +1174,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool update_tuple = false; bool update_failover = false; bool update_two_phase = false; + bool check_pub_rdt = false; + bool retain_dead_tuples; + char *origin; Subscription *sub; Form_pg_subscription form; bits32 supported_opts; @@ -1137,6 +1204,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, sub = GetSubscription(subid, false); + retain_dead_tuples = sub->retaindeadtuples; + origin = sub->origin; + /* * Don't allow non-superuser modification of a subscription with * password_required=false. 
@@ -1165,7 +1235,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | - SUBOPT_ORIGIN); + SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1267,7 +1337,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, IsSet(opts.specified_opts, SUBOPT_SLOT_NAME)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("slot_name and two_phase cannot be altered at the same time"))); + errmsg("\"slot_name\" and \"two_phase\" cannot be altered at the same time"))); /* * Note that workers may still survive even if the @@ -1283,7 +1353,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, if (logicalrep_workers_find(subid, true, true)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("cannot alter two_phase when logical replication worker is still running"), + errmsg("cannot alter \"two_phase\" when logical replication worker is still running"), errhint("Try again after some time."))); /* @@ -1297,7 +1367,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, LookupGXactBySubid(subid)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("cannot disable two_phase when prepared transactions are present"), + errmsg("cannot disable \"two_phase\" when prepared transactions exist"), errhint("Resolve these transactions and try again."))); /* Change system catalog accordingly */ @@ -1325,11 +1395,62 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_subfailover - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES)) + { + values[Anum_pg_subscription_subretaindeadtuples - 1] = + BoolGetDatum(opts.retaindeadtuples); + replaces[Anum_pg_subscription_subretaindeadtuples - 1] = true; + + CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel); + + /* + * Workers may continue running even after the + * subscription has been disabled. + * + * To prevent race conditions (as described in + * CheckAlterSubOption()), ensure that all worker + * processes have already exited before proceeding. + */ + if (logicalrep_workers_find(subid, true, true)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"), + errhint("Try again after some time."))); + + /* + * Remind the user that enabling subscription will prevent + * the accumulation of dead tuples. + */ + if (opts.retaindeadtuples) + CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE); + + /* + * Notify the launcher to manage the replication slot for + * conflict detection. This ensures that replication slot + * is efficiently handled (created, updated, or dropped) + * in response to any configuration changes. + */ + ApplyLauncherWakeupAtCommit(); + + check_pub_rdt = opts.retaindeadtuples; + retain_dead_tuples = opts.retaindeadtuples; + } + if (IsSet(opts.specified_opts, SUBOPT_ORIGIN)) { values[Anum_pg_subscription_suborigin - 1] = CStringGetTextDatum(opts.origin); replaces[Anum_pg_subscription_suborigin - 1] = true; + + /* + * Check if changes from different origins may be received + * from the publisher when the origin is changed to ANY + * and retain_dead_tuples is enabled. 
+ */ + check_pub_rdt = retain_dead_tuples && + pg_strcasecmp(opts.origin, LOGICALREP_ORIGIN_ANY) == 0; + + origin = opts.origin; } update_tuple = true; @@ -1347,6 +1468,15 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot enable subscription that does not have a slot name"))); + /* + * Check track_commit_timestamp only when enabling the + * subscription in case it was disabled after creation. See + * comments atop CheckSubDeadTupleRetention() for details. + */ + if (sub->retaindeadtuples) + CheckSubDeadTupleRetention(opts.enabled, !opts.enabled, + WARNING); + values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled); replaces[Anum_pg_subscription_subenabled - 1] = true; @@ -1355,6 +1485,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, ApplyLauncherWakeupAtCommit(); update_tuple = true; + + /* + * The subscription might be initially created with + * connect=false and retain_dead_tuples=true, meaning the + * remote server's status may not be checked. Ensure this + * check is conducted now. + */ + check_pub_rdt = sub->retaindeadtuples && opts.enabled; break; } @@ -1369,6 +1507,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, CStringGetTextDatum(stmt->conninfo); replaces[Anum_pg_subscription_subconninfo - 1] = true; update_tuple = true; + + /* + * Since the remote server configuration might have changed, + * perform a check to ensure it permits enabling + * retain_dead_tuples. + */ + check_pub_rdt = sub->retaindeadtuples; break; case ALTER_SUBSCRIPTION_SET_PUBLICATION: @@ -1539,7 +1684,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, if (!XLogRecPtrIsInvalid(remote_lsn) && opts.lsn < remote_lsn) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("skip WAL location (LSN %X/%X) must be greater than origin LSN %X/%X", + errmsg("skip WAL location (LSN %X/%08X) must be greater than origin LSN %X/%08X", LSN_FORMAT_ARGS(opts.lsn), LSN_FORMAT_ARGS(remote_lsn)))); } @@ -1568,14 +1713,15 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, } /* - * Try to acquire the connection necessary for altering the slot, if - * needed. + * Try to acquire the connection necessary either for modifying the slot + * or for checking if the remote server permits enabling + * retain_dead_tuples. * * This has to be at the end because otherwise if there is an error while * doing the database operations we won't be able to rollback altered * slot. */ - if (update_failover || update_two_phase) + if (update_failover || update_two_phase || check_pub_rdt) { bool must_use_password; char *err; @@ -1584,10 +1730,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, /* Load the library providing us libpq calls. */ load_file("libpqwalreceiver", false); - /* Try to connect to the publisher. */ + /* + * Try to connect to the publisher, using the new connection string if + * available. + */ must_use_password = sub->passwordrequired && !sub->ownersuperuser; - wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password, - sub->name, &err); + wrconn = walrcv_connect(stmt->conninfo ? stmt->conninfo : sub->conninfo, + true, true, must_use_password, sub->name, + &err); if (!wrconn) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), @@ -1596,9 +1746,17 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, PG_TRY(); { - walrcv_alter_slot(wrconn, sub->slotname, - update_failover ? 
&opts.failover : NULL, - update_two_phase ? &opts.twophase : NULL); + if (retain_dead_tuples) + check_pub_dead_tuple_retention(wrconn); + + check_publications_origin(wrconn, sub->publications, false, + retain_dead_tuples, origin, NULL, 0, + sub->name); + + if (update_failover || update_two_phase) + walrcv_alter_slot(wrconn, sub->slotname, + update_failover ? &opts.failover : NULL, + update_two_phase ? &opts.twophase : NULL); } PG_FINALLY(); { @@ -2086,20 +2244,29 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId) * Check and log a warning if the publisher has subscribed to the same table, * its partition ancestors (if it's a partition), or its partition children (if * it's a partitioned table), from some other publishers. This check is - * required only if "copy_data = true" and "origin = none" for CREATE - * SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements to notify the - * user that data having origin might have been copied. + * required in the following scenarios: * - * This check need not be performed on the tables that are already added - * because incremental sync for those tables will happen through WAL and the - * origin of the data can be identified from the WAL records. + * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements + * with "copy_data = true" and "origin = none": + * - Warn the user that data with an origin might have been copied. + * - This check is skipped for tables already added, as incremental sync via + * WAL allows origin tracking. The list of such tables is in + * subrel_local_oids. * - * subrel_local_oids contains the list of relation oids that are already - * present on the subscriber. + * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements + * with "retain_dead_tuples = true" and "origin = any", and for ALTER + * SUBSCRIPTION statements that modify retain_dead_tuples or origin, or + * when the publisher's status changes (e.g., due to a connection string + * update): + * - Warn the user that only conflict detection info for local changes on + * the publisher is retained. Data from other origins may lack sufficient + * details for reliable conflict detection. + * - See comments atop worker.c for more details. */ static void check_publications_origin(WalReceiverConn *wrconn, List *publications, - bool copydata, char *origin, Oid *subrel_local_oids, + bool copydata, bool retain_dead_tuples, + char *origin, Oid *subrel_local_oids, int subrel_count, char *subname) { WalRcvExecResult *res; @@ -2108,9 +2275,29 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications, Oid tableRow[1] = {TEXTOID}; List *publist = NIL; int i; + bool check_rdt; + bool check_table_sync; + bool origin_none = origin && + pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0; + + /* + * Enable retain_dead_tuples checks only when origin is set to 'any', + * since with origin='none' only local changes are replicated to the + * subscriber. + */ + check_rdt = retain_dead_tuples && !origin_none; + + /* + * Enable table synchronization checks only when origin is 'none', to + * ensure that data from other origins is not inadvertently copied. 
+ */ + check_table_sync = copydata && origin_none; - if (!copydata || !origin || - (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0)) + /* retain_dead_tuples and table sync checks occur separately */ + Assert(!(check_rdt && check_table_sync)); + + /* Return if no checks are required */ + if (!check_rdt && !check_table_sync) return; initStringInfo(&cmd); @@ -2129,16 +2316,23 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications, /* * In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains * the list of relation oids that are already present on the subscriber. - * This check should be skipped for these tables. + * This check should be skipped for these tables if checking for table + * sync scenario. However, when handling the retain_dead_tuples scenario, + * ensure all tables are checked, as some existing tables may now include + * changes from other origins due to newly created subscriptions on the + * publisher. */ - for (i = 0; i < subrel_count; i++) + if (check_table_sync) { - Oid relid = subrel_local_oids[i]; - char *schemaname = get_namespace_name(get_rel_namespace(relid)); - char *tablename = get_rel_name(relid); + for (i = 0; i < subrel_count; i++) + { + Oid relid = subrel_local_oids[i]; + char *schemaname = get_namespace_name(get_rel_namespace(relid)); + char *tablename = get_rel_name(relid); - appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n", - schemaname, tablename); + appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n", + schemaname, tablename); + } } res = walrcv_exec(wrconn, cmd.data, 1, tableRow); @@ -2173,22 +2367,37 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications, * XXX: For simplicity, we don't check whether the table has any data or * not. If the table doesn't have any data then we don't need to * distinguish between data having origin and data not having origin so we - * can avoid logging a warning in that case. + * can avoid logging a warning for table sync scenario. */ if (publist) { StringInfo pubnames = makeStringInfo(); + StringInfo err_msg = makeStringInfo(); + StringInfo err_hint = makeStringInfo(); /* Prepare the list of publication(s) for warning message. 
*/ GetPublicationsStr(publist, pubnames, false); + + if (check_table_sync) + { + appendStringInfo(err_msg, _("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin"), + subname); + appendStringInfoString(err_hint, _("Verify that initial data copied from the publisher tables did not come from other origins.")); + } + else + { + appendStringInfo(err_msg, _("subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins"), + subname); + appendStringInfoString(err_hint, _("Consider using origin = NONE or disabling retain_dead_tuples.")); + } + ereport(WARNING, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin", - subname), - errdetail_plural("The subscription being created subscribes to a publication (%s) that contains tables that are written to by other subscriptions.", - "The subscription being created subscribes to publications (%s) that contain tables that are written to by other subscriptions.", + errmsg_internal("%s", err_msg->data), + errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.", + "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.", list_length(publist), pubnames->data), - errhint("Verify that initial data copied from the publisher tables did not come from other origins.")); + errhint_internal("%s", err_hint->data)); } ExecDropSingleTupleTableSlot(slot); @@ -2197,6 +2406,101 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications, } /* + * Determine whether the retain_dead_tuples can be enabled based on the + * publisher's status. + * + * This option is disallowed if the publisher is running a version earlier + * than the PG19, or if the publisher is in recovery (i.e., it is a standby + * server). + * + * See comments atop worker.c for a detailed explanation. + */ +static void +check_pub_dead_tuple_retention(WalReceiverConn *wrconn) +{ + WalRcvExecResult *res; + Oid RecoveryRow[1] = {BOOLOID}; + TupleTableSlot *slot; + bool isnull; + bool remote_in_recovery; + + if (walrcv_server_version(wrconn) < 19000) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19")); + + res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not obtain recovery progress from the publisher: %s", + res->err))); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + elog(ERROR, "failed to fetch tuple for the recovery progress"); + + remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull)); + + if (remote_in_recovery) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot enable retain_dead_tuples if the publisher is in recovery.")); + + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); +} + +/* + * Check if the subscriber's configuration is adequate to enable the + * retain_dead_tuples option. + * + * Issue an ERROR if the wal_level does not support the use of replication + * slots when check_guc is set to true. 
+ * + * Issue a WARNING if track_commit_timestamp is not enabled when check_guc is + * set to true. This is only to highlight the importance of enabling + * track_commit_timestamp instead of catching all the misconfigurations, as + * this setting can be adjusted after subscription creation. Without it, the + * apply worker will simply skip conflict detection. + * + * Issue a WARNING or NOTICE if the subscription is disabled. Do not raise an + * ERROR since users can only modify retain_dead_tuples for disabled + * subscriptions. And as long as the subscription is enabled promptly, it will + * not pose issues. + */ +void +CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, + int elevel_for_sub_disabled) +{ + Assert(elevel_for_sub_disabled == NOTICE || + elevel_for_sub_disabled == WARNING); + + if (check_guc && wal_level < WAL_LEVEL_REPLICA) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"), + errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start.")); + + if (check_guc && !track_commit_timestamp) + ereport(WARNING, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"), + errhint("Consider setting \"%s\" to true.", + "track_commit_timestamp")); + + if (sub_disabled) + ereport(elevel_for_sub_disabled, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"), + (elevel_for_sub_disabled > NOTICE) + ? errhint("Consider setting %s to false.", + "retain_dead_tuples") : 0); +} + +/* * Get the list of tables which belong to specified publications on the * publisher connection. * diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index ea96947d813..cb811520c29 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -2711,8 +2711,7 @@ MergeAttributes(List *columns, const List *supers, char relpersistence, RelationGetRelationName(relation)))); /* If existing rel is temp, it must belong to this session */ - if (relation->rd_rel->relpersistence == RELPERSISTENCE_TEMP && - !relation->rd_islocaltemp) + if (RELATION_IS_OTHER_TEMP(relation)) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg(!is_partition @@ -7374,7 +7373,7 @@ ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel, /* make sure datatype is legal for a column */ CheckAttributeType(NameStr(attribute->attname), attribute->atttypid, attribute->attcollation, list_make1_oid(rel->rd_rel->reltype), - 0); + (attribute->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL ? 
CHKATYPE_IS_VIRTUAL : 0)); InsertPgAttributeTuples(attrdesc, tupdesc, myrelid, NULL, NULL); @@ -8609,7 +8608,7 @@ ATExecSetExpression(AlteredTableInfo *tab, Relation rel, const char *colName, rel->rd_att->constr && rel->rd_att->constr->num_check > 0) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("ALTER TABLE / SET EXPRESSION is not supported for virtual generated columns on tables with check constraints"), + errmsg("ALTER TABLE / SET EXPRESSION is not supported for virtual generated columns in tables with check constraints"), errdetail("Column \"%s\" of relation \"%s\" is a virtual generated column.", colName, RelationGetRelationName(rel)))); @@ -8627,7 +8626,7 @@ ATExecSetExpression(AlteredTableInfo *tab, Relation rel, const char *colName, GetRelationPublications(RelationGetRelid(rel)) != NIL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("ALTER TABLE / SET EXPRESSION is not supported for virtual generated columns on tables that are part of a publication"), + errmsg("ALTER TABLE / SET EXPRESSION is not supported for virtual generated columns in tables that are part of a publication"), errdetail("Column \"%s\" of relation \"%s\" is a virtual generated column.", colName, RelationGetRelationName(rel)))); @@ -10189,7 +10188,7 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, if (pk_has_without_overlaps && !with_period) ereport(ERROR, errcode(ERRCODE_INVALID_FOREIGN_KEY), - errmsg("foreign key must use PERIOD when referencing a primary using WITHOUT OVERLAPS")); + errmsg("foreign key must use PERIOD when referencing a primary key using WITHOUT OVERLAPS")); /* * Now we can check permissions. @@ -10330,8 +10329,8 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, for_overlaps ? errmsg("could not identify an overlaps operator for foreign key") : errmsg("could not identify an equality operator for foreign key"), - errdetail("Could not translate compare type %d for operator family \"%s\", input type %s, access method \"%s\".", - cmptype, get_opfamily_name(opfamily, false), format_type_be(opcintype), get_am_name(amid))); + errdetail("Could not translate compare type %d for operator family \"%s\" of access method \"%s\".", + cmptype, get_opfamily_name(opfamily, false), get_am_name(amid))); /* * There had better be a primary equality operator for the index. @@ -12913,8 +12912,9 @@ ATExecValidateConstraint(List **wqueue, Relation rel, char *constrName, con->contype != CONSTRAINT_NOTNULL) ereport(ERROR, errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("constraint \"%s\" of relation \"%s\" is not a foreign key, check, or not-null constraint", - constrName, RelationGetRelationName(rel))); + errmsg("cannot validate constraint \"%s\" of relation \"%s\"", + constrName, RelationGetRelationName(rel)), + errdetail("This operation is not supported for this type of constraint.")); if (!con->conenforced) ereport(ERROR, @@ -14426,7 +14426,7 @@ ATPrepAlterColumnType(List **wqueue, /* make sure datatype is legal for a column */ CheckAttributeType(colName, targettype, targetcollid, list_make1_oid(rel->rd_rel->reltype), - 0); + (attTup->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL ? CHKATYPE_IS_VIRTUAL : 0)); if (attTup->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL) { @@ -14484,6 +14484,9 @@ ATPrepAlterColumnType(List **wqueue, /* Fix collations after all else */ assign_expr_collations(pstate, transform); + /* Expand virtual generated columns in the expr. 
*/ + transform = expand_generated_columns_in_expr(transform, rel, 1); + /* Plan the expr now so we can accurately assess the need to rewrite. */ transform = (Node *) expression_planner((Expr *) transform); @@ -15411,9 +15414,12 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode) /* * Re-parse the index and constraint definitions, and attach them to the * appropriate work queue entries. We do this before dropping because in - * the case of a FOREIGN KEY constraint, we might not yet have exclusive - * lock on the table the constraint is attached to, and we need to get - * that before reparsing/dropping. + * the case of a constraint on another table, we might not yet have + * exclusive lock on the table the constraint is attached to, and we need + * to get that before reparsing/dropping. (That's possible at least for + * FOREIGN KEY, CHECK, and EXCLUSION constraints; in non-FK cases it + * requires a dependency on the target table's composite type in the other + * table's constraint expressions.) * * We can't rely on the output of deparsing to tell us which relation to * operate on, because concurrent activity might have made the name @@ -15429,7 +15435,6 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode) Form_pg_constraint con; Oid relid; Oid confrelid; - char contype; bool conislocal; tup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(oldId)); @@ -15446,7 +15451,6 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode) elog(ERROR, "could not identify relation associated with constraint %u", oldId); } confrelid = con->confrelid; - contype = con->contype; conislocal = con->conislocal; ReleaseSysCache(tup); @@ -15464,12 +15468,12 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode) continue; /* - * When rebuilding an FK constraint that references the table we're - * modifying, we might not yet have any lock on the FK's table, so get - * one now. We'll need AccessExclusiveLock for the DROP CONSTRAINT - * step, so there's no value in asking for anything weaker. + * When rebuilding another table's constraint that references the + * table we're modifying, we might not yet have any lock on the other + * table, so get one now. We'll need AccessExclusiveLock for the DROP + * CONSTRAINT step, so there's no value in asking for anything weaker. */ - if (relid != tab->relid && contype == CONSTRAINT_FOREIGN) + if (relid != tab->relid) LockRelationOid(relid, AccessExclusiveLock); ATPostAlterTypeParse(oldId, relid, confrelid, @@ -15483,6 +15487,14 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode) Oid relid; relid = IndexGetRelation(oldId, false); + + /* + * As above, make sure we have lock on the index's table if it's not + * the same table. + */ + if (relid != tab->relid) + LockRelationOid(relid, AccessExclusiveLock); + ATPostAlterTypeParse(oldId, relid, InvalidOid, (char *) lfirst(def_item), wqueue, lockmode, tab->rewrite); @@ -15499,6 +15511,20 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode) Oid relid; relid = StatisticsGetRelation(oldId, false); + + /* + * As above, make sure we have lock on the statistics object's table + * if it's not the same table. However, we take + * ShareUpdateExclusiveLock here, aligning with the lock level used in + * CreateStatistics and RemoveStatisticsById. 
+ * + * CAUTION: this should be done after all cases that grab + * AccessExclusiveLock, else we risk causing deadlock due to needing + * to promote our table lock. + */ + if (relid != tab->relid) + LockRelationOid(relid, ShareUpdateExclusiveLock); + ATPostAlterTypeParse(oldId, relid, InvalidOid, (char *) lfirst(def_item), wqueue, lockmode, tab->rewrite); @@ -15722,7 +15748,7 @@ ATPostAlterTypeParse(Oid oldId, Oid oldRelId, Oid refRelId, char *cmd, { AlterDomainStmt *stmt = (AlterDomainStmt *) stm; - if (stmt->subtype == 'C') /* ADD CONSTRAINT */ + if (stmt->subtype == AD_AddConstraint) { Constraint *con = castNode(Constraint, stmt->def); AlterTableCmd *cmd = makeNode(AlterTableCmd); @@ -17225,15 +17251,13 @@ ATExecAddInherit(Relation child_rel, RangeVar *parent, LOCKMODE lockmode) RelationGetRelationName(parent_rel)))); /* If parent rel is temp, it must belong to this session */ - if (parent_rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP && - !parent_rel->rd_islocaltemp) + if (RELATION_IS_OTHER_TEMP(parent_rel)) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot inherit from temporary relation of another session"))); /* Ditto for the child */ - if (child_rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP && - !child_rel->rd_islocaltemp) + if (RELATION_IS_OTHER_TEMP(child_rel)) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot inherit to temporary relation of another session"))); @@ -20304,15 +20328,13 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd, RelationGetRelationName(rel)))); /* If the parent is temp, it must belong to this session */ - if (rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP && - !rel->rd_islocaltemp) + if (RELATION_IS_OTHER_TEMP(rel)) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot attach as partition of temporary relation of another session"))); /* Ditto for the partition */ - if (attachrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP && - !attachrel->rd_islocaltemp) + if (RELATION_IS_OTHER_TEMP(attachrel)) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot attach temporary relation of another session as partition"))); diff --git a/src/backend/commands/tablespace.c b/src/backend/commands/tablespace.c index a9005cc7212..df31eace47a 100644 --- a/src/backend/commands/tablespace.c +++ b/src/backend/commands/tablespace.c @@ -500,7 +500,7 @@ DropTableSpace(DropTableSpaceStmt *stmt) * mustn't delete. So instead, we force a checkpoint which will clean * out any lingering files, and try again. 
*/ - RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT); + RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT); /* * On Windows, an unlinked file persists in the directory listing diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c index 67f8e70f9c1..7dc121f73f1 100644 --- a/src/backend/commands/trigger.c +++ b/src/backend/commands/trigger.c @@ -80,6 +80,7 @@ static bool GetTupleForTrigger(EState *estate, ItemPointer tid, LockTupleMode lockmode, TupleTableSlot *oldslot, + bool do_epq_recheck, TupleTableSlot **epqslot, TM_Result *tmresultp, TM_FailureData *tmfdp); @@ -2693,7 +2694,8 @@ ExecBRDeleteTriggers(EState *estate, EPQState *epqstate, HeapTuple fdw_trigtuple, TupleTableSlot **epqslot, TM_Result *tmresult, - TM_FailureData *tmfd) + TM_FailureData *tmfd, + bool is_merge_delete) { TupleTableSlot *slot = ExecGetTriggerOldSlot(estate, relinfo); TriggerDesc *trigdesc = relinfo->ri_TrigDesc; @@ -2708,9 +2710,17 @@ ExecBRDeleteTriggers(EState *estate, EPQState *epqstate, { TupleTableSlot *epqslot_candidate = NULL; + /* + * Get a copy of the on-disk tuple we are planning to delete. In + * general, if the tuple has been concurrently updated, we should + * recheck it using EPQ. However, if this is a MERGE DELETE action, + * we skip this EPQ recheck and leave it to the caller (it must do + * additional rechecking, and might end up executing a different + * action entirely). + */ if (!GetTupleForTrigger(estate, epqstate, relinfo, tupleid, - LockTupleExclusive, slot, &epqslot_candidate, - tmresult, tmfd)) + LockTupleExclusive, slot, !is_merge_delete, + &epqslot_candidate, tmresult, tmfd)) return false; /* @@ -2800,6 +2810,7 @@ ExecARDeleteTriggers(EState *estate, tupleid, LockTupleExclusive, slot, + false, NULL, NULL, NULL); @@ -2944,7 +2955,8 @@ ExecBRUpdateTriggers(EState *estate, EPQState *epqstate, HeapTuple fdw_trigtuple, TupleTableSlot *newslot, TM_Result *tmresult, - TM_FailureData *tmfd) + TM_FailureData *tmfd, + bool is_merge_update) { TriggerDesc *trigdesc = relinfo->ri_TrigDesc; TupleTableSlot *oldslot = ExecGetTriggerOldSlot(estate, relinfo); @@ -2965,10 +2977,17 @@ ExecBRUpdateTriggers(EState *estate, EPQState *epqstate, { TupleTableSlot *epqslot_candidate = NULL; - /* get a copy of the on-disk tuple we are planning to update */ + /* + * Get a copy of the on-disk tuple we are planning to update. In + * general, if the tuple has been concurrently updated, we should + * recheck it using EPQ. However, if this is a MERGE UPDATE action, + * we skip this EPQ recheck and leave it to the caller (it must do + * additional rechecking, and might end up executing a different + * action entirely). + */ if (!GetTupleForTrigger(estate, epqstate, relinfo, tupleid, - lockmode, oldslot, &epqslot_candidate, - tmresult, tmfd)) + lockmode, oldslot, !is_merge_update, + &epqslot_candidate, tmresult, tmfd)) return false; /* cancel the update action */ /* @@ -3142,6 +3161,7 @@ ExecARUpdateTriggers(EState *estate, ResultRelInfo *relinfo, tupleid, LockTupleExclusive, oldslot, + false, NULL, NULL, NULL); @@ -3298,6 +3318,7 @@ GetTupleForTrigger(EState *estate, ItemPointer tid, LockTupleMode lockmode, TupleTableSlot *oldslot, + bool do_epq_recheck, TupleTableSlot **epqslot, TM_Result *tmresultp, TM_FailureData *tmfdp) @@ -3357,29 +3378,30 @@ GetTupleForTrigger(EState *estate, if (tmfd.traversed) { /* - * Recheck the tuple using EPQ. 
For MERGE, we leave this
- * to the caller (it must do additional rechecking, and
- * might end up executing a different action entirely).
+ * Recheck the tuple using EPQ, if requested. Otherwise,
+ * just return that it was concurrently updated.
 */
- if (estate->es_plannedstmt->commandType == CMD_MERGE)
+ if (do_epq_recheck)
 {
- if (tmresultp)
- *tmresultp = TM_Updated;
- return false;
+ *epqslot = EvalPlanQual(epqstate,
+ relation,
+ relinfo->ri_RangeTableIndex,
+ oldslot);
+
+ /*
+ * If PlanQual failed for updated tuple - we must not
+ * process this tuple!
+ */
+ if (TupIsNull(*epqslot))
+ {
+ *epqslot = NULL;
+ return false;
+ }
 }
-
- *epqslot = EvalPlanQual(epqstate,
- relation,
- relinfo->ri_RangeTableIndex,
- oldslot);
-
- /*
- * If PlanQual failed for updated tuple - we must not
- * process this tuple!
- */
- if (TupIsNull(*epqslot))
+ else
 {
- *epqslot = NULL;
+ if (tmresultp)
+ *tmresultp = TM_Updated;
 return false;
 }
 }
diff --git a/src/backend/commands/typecmds.c b/src/backend/commands/typecmds.c
index 45ae7472ab5..26d985193ae 100644
--- a/src/backend/commands/typecmds.c
+++ b/src/backend/commands/typecmds.c
@@ -939,11 +939,19 @@ DefineDomain(ParseState *pstate, CreateDomainStmt *stmt)
 break;
 case CONSTR_NOTNULL:
- if (nullDefined && !typNotNull)
+ if (nullDefined)
+ {
+ if (!typNotNull)
+ ereport(ERROR,
+ errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting NULL/NOT NULL constraints"),
+ parser_errposition(pstate, constr->location));
+
 ereport(ERROR,
- errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("conflicting NULL/NOT NULL constraints"),
+ errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("redundant NOT NULL constraint definition"),
 parser_errposition(pstate, constr->location));
+ }
 if (constr->is_no_inherit)
 ereport(ERROR,
 errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 33a33bf6b1c..733ef40ae7c 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -56,6 +56,7 @@
 #include "utils/fmgroids.h"
 #include "utils/guc.h"
 #include "utils/guc_hooks.h"
+#include "utils/injection_point.h"
 #include "utils/memutils.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
@@ -123,7 +124,7 @@ static void vac_truncate_clog(TransactionId frozenXID,
 MultiXactId minMulti,
 TransactionId lastSaneFrozenXid,
 MultiXactId lastSaneMinMulti);
-static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
+static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams params,
 BufferAccessStrategy bstrategy);
 static double compute_parallel_delay(void);
 static VacOptValue get_vacoptval_from_boolean(DefElem *def);
@@ -464,7 +465,7 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
 }
 /* Now go through the common routine */
- vacuum(vacstmt->rels, &params, bstrategy, vac_context, isTopLevel);
+ vacuum(vacstmt->rels, params, bstrategy, vac_context, isTopLevel);
 /* Finally, clean up the vacuum memory context */
 MemoryContextDelete(vac_context);
@@ -493,7 +494,7 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
 * memory context that will not disappear at transaction commit.
*/ void -vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy, +vacuum(List *relations, const VacuumParams params, BufferAccessStrategy bstrategy, MemoryContext vac_context, bool isTopLevel) { static bool in_vacuum = false; @@ -502,9 +503,7 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy, volatile bool in_outer_xact, use_own_xacts; - Assert(params != NULL); - - stmttype = (params->options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE"; + stmttype = (params.options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE"; /* * We cannot run VACUUM inside a user transaction block; if we were inside @@ -514,7 +513,7 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy, * * ANALYZE (without VACUUM) can run either way. */ - if (params->options & VACOPT_VACUUM) + if (params.options & VACOPT_VACUUM) { PreventInTransactionBlock(isTopLevel, stmttype); in_outer_xact = false; @@ -537,7 +536,7 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy, * Build list of relation(s) to process, putting any new data in * vac_context for safekeeping. */ - if (params->options & VACOPT_ONLY_DATABASE_STATS) + if (params.options & VACOPT_ONLY_DATABASE_STATS) { /* We don't process any tables in this case */ Assert(relations == NIL); @@ -553,7 +552,7 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy, List *sublist; MemoryContext old_context; - sublist = expand_vacuum_rel(vrel, vac_context, params->options); + sublist = expand_vacuum_rel(vrel, vac_context, params.options); old_context = MemoryContextSwitchTo(vac_context); newrels = list_concat(newrels, sublist); MemoryContextSwitchTo(old_context); @@ -561,7 +560,7 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy, relations = newrels; } else - relations = get_all_vacuum_rels(vac_context, params->options); + relations = get_all_vacuum_rels(vac_context, params.options); /* * Decide whether we need to start/commit our own transactions. @@ -577,11 +576,11 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy, * transaction block, and also in an autovacuum worker, use own * transactions so we can release locks sooner. */ - if (params->options & VACOPT_VACUUM) + if (params.options & VACOPT_VACUUM) use_own_xacts = true; else { - Assert(params->options & VACOPT_ANALYZE); + Assert(params.options & VACOPT_ANALYZE); if (AmAutoVacuumWorkerProcess()) use_own_xacts = true; else if (in_outer_xact) @@ -632,13 +631,13 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy, { VacuumRelation *vrel = lfirst_node(VacuumRelation, cur); - if (params->options & VACOPT_VACUUM) + if (params.options & VACOPT_VACUUM) { if (!vacuum_rel(vrel->oid, vrel->relation, params, bstrategy)) continue; } - if (params->options & VACOPT_ANALYZE) + if (params.options & VACOPT_ANALYZE) { /* * If using separate xacts, start one for analyze. Otherwise, @@ -702,8 +701,8 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy, StartTransactionCommand(); } - if ((params->options & VACOPT_VACUUM) && - !(params->options & VACOPT_SKIP_DATABASE_STATS)) + if ((params.options & VACOPT_VACUUM) && + !(params.options & VACOPT_SKIP_DATABASE_STATS)) { /* * Update pg_database.datfrozenxid, and truncate pg_xact if possible. @@ -1101,7 +1100,7 @@ get_all_vacuum_rels(MemoryContext vac_context, int options) * minimum). 
*/
 bool
-vacuum_get_cutoffs(Relation rel, const VacuumParams *params,
+vacuum_get_cutoffs(Relation rel, const VacuumParams params,
 struct VacuumCutoffs *cutoffs)
 {
 int freeze_min_age,
@@ -1117,10 +1116,10 @@ vacuum_get_cutoffs(Relation rel, const VacuumParams *params,
 aggressiveMXIDCutoff;
 /* Use mutable copies of freeze age parameters */
- freeze_min_age = params->freeze_min_age;
- multixact_freeze_min_age = params->multixact_freeze_min_age;
- freeze_table_age = params->freeze_table_age;
- multixact_freeze_table_age = params->multixact_freeze_table_age;
+ freeze_min_age = params.freeze_min_age;
+ multixact_freeze_min_age = params.multixact_freeze_min_age;
+ freeze_table_age = params.freeze_table_age;
+ multixact_freeze_table_age = params.multixact_freeze_table_age;
 /* Set pg_class fields in cutoffs */
 cutoffs->relfrozenxid = rel->rd_rel->relfrozenxid;
@@ -1997,7 +1996,7 @@ vac_truncate_clog(TransactionId frozenXID,
 * At entry and exit, we are not inside a transaction.
 */
 static bool
-vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
+vacuum_rel(Oid relid, RangeVar *relation, VacuumParams params,
 BufferAccessStrategy bstrategy)
 {
 LOCKMODE lmode;
@@ -2008,13 +2007,18 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
 Oid save_userid;
 int save_sec_context;
 int save_nestlevel;
+ VacuumParams toast_vacuum_params;
- Assert(params != NULL);
+ /*
+ * This function scribbles on the parameters, so make a copy early to
+ * avoid affecting the TOAST table (if we do end up recursing to it).
+ */
+ memcpy(&toast_vacuum_params, &params, sizeof(VacuumParams));
 /* Begin a transaction for vacuuming this relation */
 StartTransactionCommand();
- if (!(params->options & VACOPT_FULL))
+ if (!(params.options & VACOPT_FULL))
 {
 /*
 * In lazy vacuum, we can set the PROC_IN_VACUUM flag, which lets
@@ -2040,7 +2044,7 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
 */
 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 MyProc->statusFlags |= PROC_IN_VACUUM;
- if (params->is_wraparound)
+ if (params.is_wraparound)
 MyProc->statusFlags |= PROC_VACUUM_FOR_WRAPAROUND;
 ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
 LWLockRelease(ProcArrayLock);
@@ -2064,12 +2068,12 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
 * vacuum, but just ShareUpdateExclusiveLock for concurrent vacuum. Either
 * way, we can be sure that no other backend is vacuuming the same table.
 */
- lmode = (params->options & VACOPT_FULL) ?
+ lmode = (params.options & VACOPT_FULL) ?
 AccessExclusiveLock : ShareUpdateExclusiveLock;
 /* open the relation and get the appropriate lock on it */
- rel = vacuum_open_relation(relid, relation, params->options,
- params->log_min_duration >= 0, lmode);
+ rel = vacuum_open_relation(relid, relation, params.options,
+ params.log_min_duration >= 0, lmode);
 /* leave if relation could not be opened or locked */
 if (!rel)
@@ -2084,8 +2088,8 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
 * This is only safe to do because we hold a session lock on the main
 * relation that prevents concurrent deletion.
*/ - if (OidIsValid(params->toast_parent)) - priv_relid = params->toast_parent; + if (OidIsValid(params.toast_parent)) + priv_relid = params.toast_parent; else priv_relid = RelationGetRelid(rel); @@ -2098,7 +2102,7 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, */ if (!vacuum_is_permitted_for_relation(priv_relid, rel->rd_rel, - params->options & ~VACOPT_ANALYZE)) + params.options & ~VACOPT_ANALYZE)) { relation_close(rel, lmode); PopActiveSnapshot(); @@ -2169,7 +2173,7 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, * Set index_cleanup option based on index_cleanup reloption if it wasn't * specified in VACUUM command, or when running in an autovacuum worker */ - if (params->index_cleanup == VACOPTVALUE_UNSPECIFIED) + if (params.index_cleanup == VACOPTVALUE_UNSPECIFIED) { StdRdOptIndexCleanup vacuum_index_cleanup; @@ -2180,56 +2184,74 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, ((StdRdOptions *) rel->rd_options)->vacuum_index_cleanup; if (vacuum_index_cleanup == STDRD_OPTION_VACUUM_INDEX_CLEANUP_AUTO) - params->index_cleanup = VACOPTVALUE_AUTO; + params.index_cleanup = VACOPTVALUE_AUTO; else if (vacuum_index_cleanup == STDRD_OPTION_VACUUM_INDEX_CLEANUP_ON) - params->index_cleanup = VACOPTVALUE_ENABLED; + params.index_cleanup = VACOPTVALUE_ENABLED; else { Assert(vacuum_index_cleanup == STDRD_OPTION_VACUUM_INDEX_CLEANUP_OFF); - params->index_cleanup = VACOPTVALUE_DISABLED; + params.index_cleanup = VACOPTVALUE_DISABLED; } } +#ifdef USE_INJECTION_POINTS + if (params.index_cleanup == VACOPTVALUE_AUTO) + INJECTION_POINT("vacuum-index-cleanup-auto", NULL); + else if (params.index_cleanup == VACOPTVALUE_DISABLED) + INJECTION_POINT("vacuum-index-cleanup-disabled", NULL); + else if (params.index_cleanup == VACOPTVALUE_ENABLED) + INJECTION_POINT("vacuum-index-cleanup-enabled", NULL); +#endif + /* * Check if the vacuum_max_eager_freeze_failure_rate table storage * parameter was specified. This overrides the GUC value. */ if (rel->rd_options != NULL && ((StdRdOptions *) rel->rd_options)->vacuum_max_eager_freeze_failure_rate >= 0) - params->max_eager_freeze_failure_rate = + params.max_eager_freeze_failure_rate = ((StdRdOptions *) rel->rd_options)->vacuum_max_eager_freeze_failure_rate; /* * Set truncate option based on truncate reloption or GUC if it wasn't * specified in VACUUM command, or when running in an autovacuum worker */ - if (params->truncate == VACOPTVALUE_UNSPECIFIED) + if (params.truncate == VACOPTVALUE_UNSPECIFIED) { StdRdOptions *opts = (StdRdOptions *) rel->rd_options; if (opts && opts->vacuum_truncate_set) { if (opts->vacuum_truncate) - params->truncate = VACOPTVALUE_ENABLED; + params.truncate = VACOPTVALUE_ENABLED; else - params->truncate = VACOPTVALUE_DISABLED; + params.truncate = VACOPTVALUE_DISABLED; } else if (vacuum_truncate) - params->truncate = VACOPTVALUE_ENABLED; + params.truncate = VACOPTVALUE_ENABLED; else - params->truncate = VACOPTVALUE_DISABLED; + params.truncate = VACOPTVALUE_DISABLED; } +#ifdef USE_INJECTION_POINTS + if (params.truncate == VACOPTVALUE_AUTO) + INJECTION_POINT("vacuum-truncate-auto", NULL); + else if (params.truncate == VACOPTVALUE_DISABLED) + INJECTION_POINT("vacuum-truncate-disabled", NULL); + else if (params.truncate == VACOPTVALUE_ENABLED) + INJECTION_POINT("vacuum-truncate-enabled", NULL); +#endif + /* * Remember the relation's TOAST relation for later, if the caller asked * us to process it. 
In VACUUM FULL, though, the toast table is * automatically rebuilt by cluster_rel so we shouldn't recurse to it, * unless PROCESS_MAIN is disabled. */ - if ((params->options & VACOPT_PROCESS_TOAST) != 0 && - ((params->options & VACOPT_FULL) == 0 || - (params->options & VACOPT_PROCESS_MAIN) == 0)) + if ((params.options & VACOPT_PROCESS_TOAST) != 0 && + ((params.options & VACOPT_FULL) == 0 || + (params.options & VACOPT_PROCESS_MAIN) == 0)) toast_relid = rel->rd_rel->reltoastrelid; else toast_relid = InvalidOid; @@ -2252,16 +2274,16 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, * table is required (e.g., PROCESS_TOAST is set), we force PROCESS_MAIN * to be set when we recurse to the TOAST table. */ - if (params->options & VACOPT_PROCESS_MAIN) + if (params.options & VACOPT_PROCESS_MAIN) { /* * Do the actual work --- either FULL or "lazy" vacuum */ - if (params->options & VACOPT_FULL) + if (params.options & VACOPT_FULL) { ClusterParams cluster_params = {0}; - if ((params->options & VACOPT_VERBOSE) != 0) + if ((params.options & VACOPT_VERBOSE) != 0) cluster_params.options |= CLUOPT_VERBOSE; /* VACUUM FULL is now a variant of CLUSTER; see cluster.c */ @@ -2299,19 +2321,16 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, */ if (toast_relid != InvalidOid) { - VacuumParams toast_vacuum_params; - /* * Force VACOPT_PROCESS_MAIN so vacuum_rel() processes it. Likewise, * set toast_parent so that the privilege checks are done on the main * relation. NB: This is only safe to do because we hold a session * lock on the main relation that prevents concurrent deletion. */ - memcpy(&toast_vacuum_params, params, sizeof(VacuumParams)); toast_vacuum_params.options |= VACOPT_PROCESS_MAIN; toast_vacuum_params.toast_parent = relid; - vacuum_rel(toast_relid, NULL, &toast_vacuum_params, bstrategy); + vacuum_rel(toast_relid, NULL, toast_vacuum_params, bstrategy); } /* |
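A note on the VacuumParams changes above: because vacuum_rel() now receives the struct by value, it can overwrite fields such as index_cleanup and truncate in its own copy without affecting the caller, provided the copy destined for the TOAST recursion is taken before any of that scribbling happens (hence the memcpy() moved to the top of the function). A minimal standalone sketch of the pattern, using invented stand-in names rather than the real VacuumParams definition:

#include <stdio.h>

/* illustrative stand-in, not the real VacuumParams */
typedef struct SketchParams
{
	int			options;
	int			index_cleanup;	/* -1 means "unspecified" */
} SketchParams;

static void
sketch_vacuum_rel(SketchParams params, int depth)
{
	/* copy early, before we scribble on our by-value copy */
	SketchParams toast_params = params;

	printf("depth %d received index_cleanup = %d\n", depth, params.index_cleanup);

	if (params.index_cleanup == -1)
		params.index_cleanup = 1;	/* pretend we resolved it from reloptions */

	if (depth == 0)
		sketch_vacuum_rel(toast_params, depth + 1);	/* recursion still sees -1 */
}

int
main(void)
{
	SketchParams p = {0, -1};

	sketch_vacuum_rel(p, 0);
	printf("caller still sees index_cleanup = %d\n", p.index_cleanup);	/* prints -1 */
	return 0;
}

In this sketch, as in the patched function, the recursive call sees the caller-supplied settings rather than values resolved from the main relation's reloptions.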
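Similarly, the trigger.c hunks replace GetTupleForTrigger()'s built-in CMD_MERGE test with a do_epq_recheck flag passed down from ExecBRDeleteTriggers()/ExecBRUpdateTriggers(), so MERGE actions skip the EPQ recheck and leave it to the caller. A compressed, self-contained sketch of that control flow; the types and helpers below are invented for illustration and are not the backend code:

#include <stdbool.h>
#include <stdio.h>

typedef enum SketchTMResult {SKETCH_TM_OK, SKETCH_TM_UPDATED} SketchTMResult;

/* stand-in for EvalPlanQual(): true if the requalified tuple still matches */
static bool
sketch_eval_plan_qual(bool still_matches)
{
	return still_matches;
}

/* stand-in for GetTupleForTrigger() once a concurrent update was detected */
static bool
sketch_get_tuple(bool do_epq_recheck, bool still_matches, SketchTMResult *result)
{
	if (do_epq_recheck)
	{
		/* ordinary UPDATE/DELETE: requalify the new tuple version here */
		return sketch_eval_plan_qual(still_matches);
	}

	/* MERGE action: just report the concurrent update; the caller rechecks */
	if (result)
		*result = SKETCH_TM_UPDATED;
	return false;
}

int
main(void)
{
	SketchTMResult r = SKETCH_TM_OK;

	printf("plain UPDATE, EPQ passes: %d\n", (int) sketch_get_tuple(true, true, &r));
	printf("MERGE action, EPQ skipped: %d (result=%d)\n",
		   (int) sketch_get_tuple(false, true, &r), (int) r);
	return 0;
}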
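Finally, several tablecmds.c hunks above swap an open-coded temp-relation test for RELATION_IS_OTHER_TEMP(). Judging from the code being removed, the macro is expected to be equivalent to the check sketched below ("a temporary relation belonging to some other session"); SketchRelation is a simplified stand-in, not the backend's Relation:

#include <stdbool.h>
#include <stdio.h>

#define SKETCH_RELPERSISTENCE_TEMP	't'	/* RELPERSISTENCE_TEMP in pg_class */

typedef struct SketchRelation
{
	char		relpersistence;
	bool		rd_islocaltemp; /* temp relation created by this session? */
} SketchRelation;

/* expected equivalent of RELATION_IS_OTHER_TEMP() for this sketch */
static bool
sketch_is_other_temp(const SketchRelation *rel)
{
	return rel->relpersistence == SKETCH_RELPERSISTENCE_TEMP &&
		!rel->rd_islocaltemp;
}

int
main(void)
{
	SketchRelation other_session_temp = {SKETCH_RELPERSISTENCE_TEMP, false};
	SketchRelation my_temp = {SKETCH_RELPERSISTENCE_TEMP, true};

	printf("%d %d\n", (int) sketch_is_other_temp(&other_session_temp),
		   (int) sketch_is_other_temp(&my_temp));	/* prints: 1 0 */
	return 0;
}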