diff options
author | Andres Freund <andres@anarazel.de> | 2017-05-31 16:39:27 -0700 |
---|---|---|
committer | Andres Freund <andres@anarazel.de> | 2017-06-01 14:19:33 -0700 |
commit | 3d79013b970d4cc336c06eb77ed526b44308c03e (patch) | |
tree | 2565996996eb72029e8ba1658ee90c8b673fcfae /src/backend/commands/sequence.c | |
parent | e9a3c047a5fc701d2efb776be2b351645ea001c8 (diff) | |
download | postgresql-3d79013b970d4cc336c06eb77ed526b44308c03e.tar.gz postgresql-3d79013b970d4cc336c06eb77ed526b44308c03e.zip |
Make ALTER SEQUENCE, including RESTART, fully transactional.
Previously the changes to the "data" part of the sequence, i.e. the
one containing the current value, were not transactional, whereas the
definition, including minimum and maximum value were. That leads to
odd behaviour if a schema change is rolled back, with the potential
that out-of-bound sequence values can be returned.
To avoid the issue create a new relfilenode fork whenever ALTER
SEQUENCE is executed, similar to how TRUNCATE ... RESTART IDENTITY
already is already handled.
This commit also makes ALTER SEQUENCE RESTART transactional, as it
seems to be too confusing to have some forms of ALTER SEQUENCE behave
transactionally, some forms not. This way setval() and nextval() are
not transactional, but DDL is, which seems to make sense.
This commit also rolls back parts of the changes made in 3d092fe540
and f8dc1985f as they're now not needed anymore.
Author: Andres Freund
Discussion: https://postgr.es/m/20170522154227.nvafbsm62sjpbxvd@alap3.anarazel.de
Backpatch: Bug is in master/v10 only
Diffstat (limited to 'src/backend/commands/sequence.c')
-rw-r--r-- | src/backend/commands/sequence.c | 123 |
1 files changed, 32 insertions, 91 deletions
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index 568b3022f2d..4a56f03e8b6 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -98,11 +98,9 @@ static void create_seq_hashtable(void); static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel); static Form_pg_sequence_data read_seq_tuple(Relation rel, Buffer *buf, HeapTuple seqdatatuple); -static LOCKMODE alter_sequence_get_lock_level(List *options); static void init_params(ParseState *pstate, List *options, bool for_identity, bool isInit, Form_pg_sequence seqform, - bool *changed_seqform, Form_pg_sequence_data seqdataform, List **owned_by); static void do_setval(Oid relid, int64 next, bool iscalled); static void process_owned_by(Relation seqrel, List *owned_by, bool for_identity); @@ -117,7 +115,6 @@ DefineSequence(ParseState *pstate, CreateSeqStmt *seq) { FormData_pg_sequence seqform; FormData_pg_sequence_data seqdataform; - bool changed_seqform = false; /* not used here */ List *owned_by; CreateStmt *stmt = makeNode(CreateStmt); Oid seqoid; @@ -156,7 +153,7 @@ DefineSequence(ParseState *pstate, CreateSeqStmt *seq) } /* Check and set all option values */ - init_params(pstate, seq->options, seq->for_identity, true, &seqform, &changed_seqform, &seqdataform, &owned_by); + init_params(pstate, seq->options, seq->for_identity, true, &seqform, &seqdataform, &owned_by); /* * Create relation (and fill value[] and null[] for the tuple) @@ -417,19 +414,18 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt) SeqTable elm; Relation seqrel; Buffer buf; - HeapTupleData seqdatatuple; + HeapTupleData datatuple; Form_pg_sequence seqform; - Form_pg_sequence_data seqdata; - FormData_pg_sequence_data newseqdata; - bool changed_seqform = false; + Form_pg_sequence_data newdataform; List *owned_by; ObjectAddress address; Relation rel; - HeapTuple tuple; + HeapTuple seqtuple; + HeapTuple newdatatuple; /* Open and lock sequence. */ relid = RangeVarGetRelid(stmt->sequence, - alter_sequence_get_lock_level(stmt->options), + ShareRowExclusiveLock, stmt->missing_ok); if (relid == InvalidOid) { @@ -447,22 +443,26 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt) stmt->sequence->relname); rel = heap_open(SequenceRelationId, RowExclusiveLock); - tuple = SearchSysCacheCopy1(SEQRELID, - ObjectIdGetDatum(relid)); - if (!HeapTupleIsValid(tuple)) + seqtuple = SearchSysCacheCopy1(SEQRELID, + ObjectIdGetDatum(relid)); + if (!HeapTupleIsValid(seqtuple)) elog(ERROR, "cache lookup failed for sequence %u", relid); - seqform = (Form_pg_sequence) GETSTRUCT(tuple); + seqform = (Form_pg_sequence) GETSTRUCT(seqtuple); /* lock page's buffer and read tuple into new sequence structure */ - seqdata = read_seq_tuple(seqrel, &buf, &seqdatatuple); + (void) read_seq_tuple(seqrel, &buf, &datatuple); - /* Copy old sequence data into workspace */ - memcpy(&newseqdata, seqdata, sizeof(FormData_pg_sequence_data)); + /* copy the existing sequence data tuple, so it can be modified localy */ + newdatatuple = heap_copytuple(&datatuple); + newdataform = (Form_pg_sequence_data) GETSTRUCT(newdatatuple); + + UnlockReleaseBuffer(buf); /* Check and set new values */ - init_params(pstate, stmt->options, stmt->for_identity, false, seqform, &changed_seqform, &newseqdata, &owned_by); + init_params(pstate, stmt->options, stmt->for_identity, false, seqform, + newdataform, &owned_by); /* Clear local cache so that we don't think we have cached numbers */ /* Note that we do not change the currval() state */ @@ -472,36 +472,19 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt) if (RelationNeedsWAL(seqrel)) GetTopTransactionId(); - /* Now okay to update the on-disk tuple */ - START_CRIT_SECTION(); - - memcpy(seqdata, &newseqdata, sizeof(FormData_pg_sequence_data)); - - MarkBufferDirty(buf); - - /* XLOG stuff */ - if (RelationNeedsWAL(seqrel)) - { - xl_seq_rec xlrec; - XLogRecPtr recptr; - Page page = BufferGetPage(buf); - - XLogBeginInsert(); - XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT); - - xlrec.node = seqrel->rd_node; - XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec)); - - XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len); - - recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG); - - PageSetLSN(page, recptr); - } - - END_CRIT_SECTION(); + /* + * Create a new storage file for the sequence, making the state changes + * transactional. We want to keep the sequence's relfrozenxid at 0, since + * it won't contain any unfrozen XIDs. Same with relminmxid, since a + * sequence will never contain multixacts. + */ + RelationSetNewRelfilenode(seqrel, seqrel->rd_rel->relpersistence, + InvalidTransactionId, InvalidMultiXactId); - UnlockReleaseBuffer(buf); + /* + * Insert the modified tuple into the new storage file. + */ + fill_seq_with_data(seqrel, newdatatuple); /* process OWNED BY if given */ if (owned_by) @@ -511,10 +494,9 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt) ObjectAddressSet(address, RelationRelationId, relid); - if (changed_seqform) - CatalogTupleUpdate(rel, &tuple->t_self, tuple); - heap_close(rel, RowExclusiveLock); + CatalogTupleUpdate(rel, &seqtuple->t_self, seqtuple); + heap_close(rel, RowExclusiveLock); relation_close(seqrel, NoLock); return address; @@ -1220,30 +1202,6 @@ read_seq_tuple(Relation rel, Buffer *buf, HeapTuple seqdatatuple) } /* - * Check the sequence options list and return the appropriate lock level for - * ALTER SEQUENCE. - * - * Most sequence option changes require a self-exclusive lock and should block - * concurrent nextval() et al. But RESTART does not, because it's not - * transactional. Also take a lower lock if no option at all is present. - */ -static LOCKMODE -alter_sequence_get_lock_level(List *options) -{ - ListCell *option; - - foreach(option, options) - { - DefElem *defel = (DefElem *) lfirst(option); - - if (strcmp(defel->defname, "restart") != 0) - return ShareRowExclusiveLock; - } - - return RowExclusiveLock; -} - -/* * init_params: process the options list of CREATE or ALTER SEQUENCE, and * store the values into appropriate fields of seqform, for changes that go * into the pg_sequence catalog, and seqdataform for changes to the sequence @@ -1258,7 +1216,6 @@ static void init_params(ParseState *pstate, List *options, bool for_identity, bool isInit, Form_pg_sequence seqform, - bool *changed_seqform, Form_pg_sequence_data seqdataform, List **owned_by) { @@ -1378,8 +1335,6 @@ init_params(ParseState *pstate, List *options, bool for_identity, defel->defname); } - *changed_seqform = false; - /* * We must reset log_cnt when isInit or when changing any parameters that * would affect future nextval allocations. @@ -1420,19 +1375,16 @@ init_params(ParseState *pstate, List *options, bool for_identity, } seqform->seqtypid = newtypid; - *changed_seqform = true; } else if (isInit) { seqform->seqtypid = INT8OID; - *changed_seqform = true; } /* INCREMENT BY */ if (increment_by != NULL) { seqform->seqincrement = defGetInt64(increment_by); - *changed_seqform = true; if (seqform->seqincrement == 0) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -1442,28 +1394,24 @@ init_params(ParseState *pstate, List *options, bool for_identity, else if (isInit) { seqform->seqincrement = 1; - *changed_seqform = true; } /* CYCLE */ if (is_cycled != NULL) { seqform->seqcycle = intVal(is_cycled->arg); - *changed_seqform = true; Assert(BoolIsValid(seqform->seqcycle)); seqdataform->log_cnt = 0; } else if (isInit) { seqform->seqcycle = false; - *changed_seqform = true; } /* MAXVALUE (null arg means NO MAXVALUE) */ if (max_value != NULL && max_value->arg) { seqform->seqmax = defGetInt64(max_value); - *changed_seqform = true; seqdataform->log_cnt = 0; } else if (isInit || max_value != NULL || reset_max_value) @@ -1480,7 +1428,6 @@ init_params(ParseState *pstate, List *options, bool for_identity, } else seqform->seqmax = -1; /* descending seq */ - *changed_seqform = true; seqdataform->log_cnt = 0; } @@ -1502,7 +1449,6 @@ init_params(ParseState *pstate, List *options, bool for_identity, if (min_value != NULL && min_value->arg) { seqform->seqmin = defGetInt64(min_value); - *changed_seqform = true; seqdataform->log_cnt = 0; } else if (isInit || min_value != NULL || reset_min_value) @@ -1519,7 +1465,6 @@ init_params(ParseState *pstate, List *options, bool for_identity, } else seqform->seqmin = 1; /* ascending seq */ - *changed_seqform = true; seqdataform->log_cnt = 0; } @@ -1555,7 +1500,6 @@ init_params(ParseState *pstate, List *options, bool for_identity, if (start_value != NULL) { seqform->seqstart = defGetInt64(start_value); - *changed_seqform = true; } else if (isInit) { @@ -1563,7 +1507,6 @@ init_params(ParseState *pstate, List *options, bool for_identity, seqform->seqstart = seqform->seqmin; /* ascending seq */ else seqform->seqstart = seqform->seqmax; /* descending seq */ - *changed_seqform = true; } /* crosscheck START */ @@ -1638,7 +1581,6 @@ init_params(ParseState *pstate, List *options, bool for_identity, if (cache_value != NULL) { seqform->seqcache = defGetInt64(cache_value); - *changed_seqform = true; if (seqform->seqcache <= 0) { char buf[100]; @@ -1654,7 +1596,6 @@ init_params(ParseState *pstate, List *options, bool for_identity, else if (isInit) { seqform->seqcache = 1; - *changed_seqform = true; } } |