aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/sequence.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/sequence.c')
-rw-r--r--src/backend/commands/sequence.c165
1 files changed, 95 insertions, 70 deletions
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index 34b74f6c384..fbb6489915e 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -38,7 +38,7 @@
/*
* We don't want to log each fetching of a value from a sequence,
* so we pre-log a few fetches in advance. In the event of
- * crash we can lose as much as we pre-logged.
+ * crash we can lose (skip over) as many values as we pre-logged.
*/
#define SEQ_LOG_VALS 32
@@ -73,7 +73,7 @@ typedef struct SeqTableData
int64 cached; /* last value already cached for nextval */
/* if last != cached, we have not used up all the cached values */
int64 increment; /* copy of sequence's increment field */
- /* note that increment is zero until we first do read_info() */
+ /* note that increment is zero until we first do read_seq_tuple() */
} SeqTableData;
typedef SeqTableData *SeqTable;
@@ -90,7 +90,8 @@ static void fill_seq_with_data(Relation rel, HeapTuple tuple);
static int64 nextval_internal(Oid relid);
static Relation open_share_lock(SeqTable seq);
static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel);
-static Form_pg_sequence read_info(SeqTable elm, Relation rel, Buffer *buf);
+static Form_pg_sequence read_seq_tuple(SeqTable elm, Relation rel,
+ Buffer *buf, HeapTuple seqtuple);
static void init_params(List *options, bool isInit,
Form_pg_sequence new, List **owned_by);
static void do_setval(Oid relid, int64 next, bool iscalled);
@@ -187,7 +188,7 @@ DefineSequence(CreateSeqStmt *seq)
case SEQ_COL_LOG:
coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
coldef->colname = "log_cnt";
- value[i - 1] = Int64GetDatum((int64) 1);
+ value[i - 1] = Int64GetDatum((int64) 0);
break;
case SEQ_COL_CYCLE:
coldef->typeName = makeTypeNameFromOid(BOOLOID, -1);
@@ -247,10 +248,8 @@ ResetSequence(Oid seq_relid)
SeqTable elm;
Form_pg_sequence seq;
Buffer buf;
- Page page;
+ HeapTupleData seqtuple;
HeapTuple tuple;
- HeapTupleData tupledata;
- ItemId lp;
/*
* Read the old sequence. This does a bit more work than really
@@ -258,18 +257,12 @@ ResetSequence(Oid seq_relid)
* indeed a sequence.
*/
init_sequence(seq_relid, &elm, &seq_rel);
- read_info(elm, seq_rel, &buf);
+ (void) read_seq_tuple(elm, seq_rel, &buf, &seqtuple);
/*
* Copy the existing sequence tuple.
*/
- page = BufferGetPage(buf);
- lp = PageGetItemId(page, FirstOffsetNumber);
- Assert(ItemIdIsNormal(lp));
-
- tupledata.t_data = (HeapTupleHeader) PageGetItem(page, lp);
- tupledata.t_len = ItemIdGetLength(lp);
- tuple = heap_copytuple(&tupledata);
+ tuple = heap_copytuple(&seqtuple);
/* Now we're done with the old page */
UnlockReleaseBuffer(buf);
@@ -281,7 +274,7 @@ ResetSequence(Oid seq_relid)
seq = (Form_pg_sequence) GETSTRUCT(tuple);
seq->last_value = seq->start_value;
seq->is_called = false;
- seq->log_cnt = 1;
+ seq->log_cnt = 0;
/*
* Create a new storage file for the sequence. We want to keep the
@@ -378,12 +371,6 @@ fill_seq_with_data(Relation rel, HeapTuple tuple)
xl_seq_rec xlrec;
XLogRecPtr recptr;
XLogRecData rdata[2];
- Form_pg_sequence newseq = (Form_pg_sequence) GETSTRUCT(tuple);
-
- /* We do not log first nextval call, so "advance" sequence here */
- /* Note we are scribbling on local tuple, not the disk buffer */
- newseq->is_called = true;
- newseq->log_cnt = 0;
xlrec.node = rel->rd_node;
rdata[0].data = (char *) &xlrec;
@@ -419,7 +406,7 @@ AlterSequence(AlterSeqStmt *stmt)
SeqTable elm;
Relation seqrel;
Buffer buf;
- Page page;
+ HeapTupleData seqtuple;
Form_pg_sequence seq;
FormData_pg_sequence new;
List *owned_by;
@@ -442,8 +429,7 @@ AlterSequence(AlterSeqStmt *stmt)
stmt->sequence->relname);
/* lock page' buffer and read tuple into new sequence structure */
- seq = read_info(elm, seqrel, &buf);
- page = BufferGetPage(buf);
+ seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
/* Copy old values of options into workspace */
memcpy(&new, seq, sizeof(FormData_pg_sequence));
@@ -456,10 +442,10 @@ AlterSequence(AlterSeqStmt *stmt)
elm->cached = elm->last;
/* Now okay to update the on-disk tuple */
- memcpy(seq, &new, sizeof(FormData_pg_sequence));
-
START_CRIT_SECTION();
+ memcpy(seq, &new, sizeof(FormData_pg_sequence));
+
MarkBufferDirty(buf);
/* XLOG stuff */
@@ -468,6 +454,7 @@ AlterSequence(AlterSeqStmt *stmt)
xl_seq_rec xlrec;
XLogRecPtr recptr;
XLogRecData rdata[2];
+ Page page = BufferGetPage(buf);
xlrec.node = seqrel->rd_node;
rdata[0].data = (char *) &xlrec;
@@ -475,9 +462,8 @@ AlterSequence(AlterSeqStmt *stmt)
rdata[0].buffer = InvalidBuffer;
rdata[0].next = &(rdata[1]);
- rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper;
- rdata[1].len = ((PageHeader) page)->pd_special -
- ((PageHeader) page)->pd_upper;
+ rdata[1].data = (char *) seqtuple.t_data;
+ rdata[1].len = seqtuple.t_len;
rdata[1].buffer = InvalidBuffer;
rdata[1].next = NULL;
@@ -541,6 +527,7 @@ nextval_internal(Oid relid)
Relation seqrel;
Buffer buf;
Page page;
+ HeapTupleData seqtuple;
Form_pg_sequence seq;
int64 incby,
maxv,
@@ -579,7 +566,7 @@ nextval_internal(Oid relid)
}
/* lock page' buffer and read tuple */
- seq = read_info(elm, seqrel, &buf);
+ seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
page = BufferGetPage(buf);
last = next = result = seq->last_value;
@@ -591,9 +578,8 @@ nextval_internal(Oid relid)
if (!seq->is_called)
{
- rescnt++; /* last_value if not called */
+ rescnt++; /* return last_value if not is_called */
fetch--;
- log--;
}
/*
@@ -606,7 +592,7 @@ nextval_internal(Oid relid)
* checkpoint would fail to advance the sequence past the logged values.
* In this case we may as well fetch extra values.
*/
- if (log < fetch)
+ if (log < fetch || !seq->is_called)
{
/* forced log to satisfy local demand for values */
fetch = log = fetch + SEQ_LOG_VALS;
@@ -697,8 +683,18 @@ nextval_internal(Oid relid)
last_used_seq = elm;
+ /* ready to change the on-disk (or really, in-buffer) tuple */
START_CRIT_SECTION();
+ /*
+ * We must mark the buffer dirty before doing XLogInsert(); see notes in
+ * SyncOneBuffer(). However, we don't apply the desired changes just yet.
+ * This looks like a violation of the buffer update protocol, but it is
+ * in fact safe because we hold exclusive lock on the buffer. Any other
+ * process, including a checkpoint, that tries to examine the buffer
+ * contents will block until we release the lock, and then will see the
+ * final state that we install below.
+ */
MarkBufferDirty(buf);
/* XLOG stuff */
@@ -708,20 +704,26 @@ nextval_internal(Oid relid)
XLogRecPtr recptr;
XLogRecData rdata[2];
- xlrec.node = seqrel->rd_node;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = sizeof(xl_seq_rec);
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
+ /*
+ * We don't log the current state of the tuple, but rather the state
+ * as it would appear after "log" more fetches. This lets us skip
+ * that many future WAL records, at the cost that we lose those
+ * sequence values if we crash.
+ */
/* set values that will be saved in xlog */
seq->last_value = next;
seq->is_called = true;
seq->log_cnt = 0;
- rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper;
- rdata[1].len = ((PageHeader) page)->pd_special -
- ((PageHeader) page)->pd_upper;
+ xlrec.node = seqrel->rd_node;
+ rdata[0].data = (char *) &xlrec;
+ rdata[0].len = sizeof(xl_seq_rec);
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].next = &(rdata[1]);
+
+ rdata[1].data = (char *) seqtuple.t_data;
+ rdata[1].len = seqtuple.t_len;
rdata[1].buffer = InvalidBuffer;
rdata[1].next = NULL;
@@ -731,7 +733,7 @@ nextval_internal(Oid relid)
PageSetTLI(page, ThisTimeLineID);
}
- /* update on-disk data */
+ /* Now update sequence tuple to the intended final state */
seq->last_value = last; /* last fetched number */
seq->is_called = true;
seq->log_cnt = log; /* how much is logged */
@@ -830,6 +832,7 @@ do_setval(Oid relid, int64 next, bool iscalled)
SeqTable elm;
Relation seqrel;
Buffer buf;
+ HeapTupleData seqtuple;
Form_pg_sequence seq;
/* open and AccessShareLock sequence */
@@ -846,7 +849,7 @@ do_setval(Oid relid, int64 next, bool iscalled)
PreventCommandIfReadOnly("setval()");
/* lock page' buffer and read tuple */
- seq = read_info(elm, seqrel, &buf);
+ seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
if ((next < seq->min_value) || (next > seq->max_value))
{
@@ -874,8 +877,13 @@ do_setval(Oid relid, int64 next, bool iscalled)
/* In any case, forget any future cached numbers */
elm->cached = elm->last;
+ /* ready to change the on-disk (or really, in-buffer) tuple */
START_CRIT_SECTION();
+ seq->last_value = next; /* last fetched number */
+ seq->is_called = iscalled;
+ seq->log_cnt = 0;
+
MarkBufferDirty(buf);
/* XLOG stuff */
@@ -892,14 +900,8 @@ do_setval(Oid relid, int64 next, bool iscalled)
rdata[0].buffer = InvalidBuffer;
rdata[0].next = &(rdata[1]);
- /* set values that will be saved in xlog */
- seq->last_value = next;
- seq->is_called = true;
- seq->log_cnt = 0;
-
- rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper;
- rdata[1].len = ((PageHeader) page)->pd_special -
- ((PageHeader) page)->pd_upper;
+ rdata[1].data = (char *) seqtuple.t_data;
+ rdata[1].len = seqtuple.t_len;
rdata[1].buffer = InvalidBuffer;
rdata[1].next = NULL;
@@ -909,11 +911,6 @@ do_setval(Oid relid, int64 next, bool iscalled)
PageSetTLI(page, ThisTimeLineID);
}
- /* save info in sequence relation */
- seq->last_value = next; /* last fetched number */
- seq->is_called = iscalled;
- seq->log_cnt = (iscalled) ? 0 : 1;
-
END_CRIT_SECTION();
UnlockReleaseBuffer(buf);
@@ -1066,13 +1063,20 @@ init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel)
}
-/* Given an opened relation, lock the page buffer and find the tuple */
+/*
+ * Given an opened sequence relation, lock the page buffer and find the tuple
+ *
+ * *buf receives the reference to the pinned-and-ex-locked buffer
+ * *seqtuple receives the reference to the sequence tuple proper
+ * (this arg should point to a local variable of type HeapTupleData)
+ *
+ * Function's return value points to the data payload of the tuple
+ */
static Form_pg_sequence
-read_info(SeqTable elm, Relation rel, Buffer *buf)
+read_seq_tuple(SeqTable elm, Relation rel, Buffer *buf, HeapTuple seqtuple)
{
Page page;
ItemId lp;
- HeapTupleData tuple;
sequence_magic *sm;
Form_pg_sequence seq;
@@ -1088,7 +1092,10 @@ read_info(SeqTable elm, Relation rel, Buffer *buf)
lp = PageGetItemId(page, FirstOffsetNumber);
Assert(ItemIdIsNormal(lp));
- tuple.t_data = (HeapTupleHeader) PageGetItem(page, lp);
+
+ /* Note we currently only bother to set these two fields of *seqtuple */
+ seqtuple->t_data = (HeapTupleHeader) PageGetItem(page, lp);
+ seqtuple->t_len = ItemIdGetLength(lp);
/*
* Previous releases of Postgres neglected to prevent SELECT FOR UPDATE on
@@ -1098,15 +1105,15 @@ read_info(SeqTable elm, Relation rel, Buffer *buf)
* bit update, ie, don't bother to WAL-log it, since we can certainly do
* this again if the update gets lost.
*/
- if (HeapTupleHeaderGetXmax(tuple.t_data) != InvalidTransactionId)
+ if (HeapTupleHeaderGetXmax(seqtuple->t_data) != InvalidTransactionId)
{
- HeapTupleHeaderSetXmax(tuple.t_data, InvalidTransactionId);
- tuple.t_data->t_infomask &= ~HEAP_XMAX_COMMITTED;
- tuple.t_data->t_infomask |= HEAP_XMAX_INVALID;
+ HeapTupleHeaderSetXmax(seqtuple->t_data, InvalidTransactionId);
+ seqtuple->t_data->t_infomask &= ~HEAP_XMAX_COMMITTED;
+ seqtuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
SetBufferCommitInfoNeedsSave(*buf);
}
- seq = (Form_pg_sequence) GETSTRUCT(&tuple);
+ seq = (Form_pg_sequence) GETSTRUCT(seqtuple);
/* this is a handy place to update our copy of the increment */
elm->increment = seq->increment_by;
@@ -1210,6 +1217,13 @@ init_params(List *options, bool isInit,
defel->defname);
}
+ /*
+ * We must reset log_cnt when isInit or when changing any parameters
+ * that would affect future nextval allocations.
+ */
+ if (isInit)
+ new->log_cnt = 0;
+
/* INCREMENT BY */
if (increment_by != NULL)
{
@@ -1218,6 +1232,7 @@ init_params(List *options, bool isInit,
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("INCREMENT must not be zero")));
+ new->log_cnt = 0;
}
else if (isInit)
new->increment_by = 1;
@@ -1227,30 +1242,39 @@ init_params(List *options, bool isInit,
{
new->is_cycled = intVal(is_cycled->arg);
Assert(BoolIsValid(new->is_cycled));
+ new->log_cnt = 0;
}
else if (isInit)
new->is_cycled = false;
/* MAXVALUE (null arg means NO MAXVALUE) */
if (max_value != NULL && max_value->arg)
+ {
new->max_value = defGetInt64(max_value);
+ new->log_cnt = 0;
+ }
else if (isInit || max_value != NULL)
{
if (new->increment_by > 0)
new->max_value = SEQ_MAXVALUE; /* ascending seq */
else
new->max_value = -1; /* descending seq */
+ new->log_cnt = 0;
}
/* MINVALUE (null arg means NO MINVALUE) */
if (min_value != NULL && min_value->arg)
+ {
new->min_value = defGetInt64(min_value);
+ new->log_cnt = 0;
+ }
else if (isInit || min_value != NULL)
{
if (new->increment_by > 0)
new->min_value = 1; /* ascending seq */
else
new->min_value = SEQ_MINVALUE; /* descending seq */
+ new->log_cnt = 0;
}
/* crosscheck min/max */
@@ -1312,13 +1336,12 @@ init_params(List *options, bool isInit,
else
new->last_value = new->start_value;
new->is_called = false;
- new->log_cnt = 1;
+ new->log_cnt = 0;
}
else if (isInit)
{
new->last_value = new->start_value;
new->is_called = false;
- new->log_cnt = 1;
}
/* crosscheck RESTART (or current value, if changing MIN/MAX) */
@@ -1361,6 +1384,7 @@ init_params(List *options, bool isInit,
errmsg("CACHE (%s) must be greater than zero",
buf)));
}
+ new->log_cnt = 0;
}
else if (isInit)
new->cache_value = 1;
@@ -1473,6 +1497,7 @@ pg_sequence_parameters(PG_FUNCTION_ARGS)
SeqTable elm;
Relation seqrel;
Buffer buf;
+ HeapTupleData seqtuple;
Form_pg_sequence seq;
/* open and AccessShareLock sequence */
@@ -1500,7 +1525,7 @@ pg_sequence_parameters(PG_FUNCTION_ARGS)
memset(isnull, 0, sizeof(isnull));
- seq = read_info(elm, seqrel, &buf);
+ seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
values[0] = Int64GetDatum(seq->start_value);
values[1] = Int64GetDatum(seq->min_value);
@@ -1555,7 +1580,7 @@ seq_redo(XLogRecPtr lsn, XLogRecord *record)
item = (char *) xlrec + sizeof(xl_seq_rec);
itemsz = record->xl_len - sizeof(xl_seq_rec);
- itemsz = MAXALIGN(itemsz);
+
if (PageAddItem(localpage, (Item) item, itemsz,
FirstOffsetNumber, false, false) == InvalidOffsetNumber)
elog(PANIC, "seq_redo: failed to add item to page");