diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/access/transam/xact.c | 99 | ||||
-rw-r--r-- | src/backend/access/transam/xlog.c | 255 | ||||
-rw-r--r-- | src/backend/access/transam/xlogutils.c | 33 | ||||
-rw-r--r-- | src/backend/storage/smgr/md.c | 5 | ||||
-rw-r--r-- | src/backend/storage/smgr/smgr.c | 157 |
5 files changed, 511 insertions, 38 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 5aeb70f2986..49283ed81ee 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/transam/xact.c,v 1.163 2004/02/10 03:42:43 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/xact.c,v 1.164 2004/02/11 22:55:24 tgl Exp $ * * NOTES * Transaction aborts can now occur two ways: @@ -519,19 +519,32 @@ RecordTransactionCommit(void) if (MyLastRecPtr.xrecoff != 0) { /* Need to emit a commit record */ - XLogRecData rdata; + XLogRecData rdata[2]; xl_xact_commit xlrec; + int nrels; + RelFileNode *rptr; + + nrels = smgrGetPendingDeletes(true, &rptr); xlrec.xtime = time(NULL); - rdata.buffer = InvalidBuffer; - rdata.data = (char *) (&xlrec); - rdata.len = SizeOfXactCommit; - rdata.next = NULL; + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char *) (&xlrec); + rdata[0].len = MinSizeOfXactCommit; + if (nrels > 0) + { + rdata[0].next = &(rdata[1]); + rdata[1].buffer = InvalidBuffer; + rdata[1].data = (char *) rptr; + rdata[1].len = nrels * sizeof(RelFileNode); + rdata[1].next = NULL; + } + else + rdata[0].next = NULL; - /* - * XXX SHOULD SAVE ARRAY OF RELFILENODE-s TO DROP - */ - recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, &rdata); + recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata); + + if (rptr) + pfree(rptr); } else { @@ -689,26 +702,42 @@ RecordTransactionAbort(void) * We only need to log the abort in XLOG if the transaction made * any transaction-controlled XLOG entries. (Otherwise, its XID * appears nowhere in permanent storage, so no one else will ever - * care if it committed.) We do not flush XLOG to disk in any - * case, since the default assumption after a crash would be that - * we aborted, anyway. + * care if it committed.) We do not flush XLOG to disk unless + * deleting files, since the default assumption after a crash + * would be that we aborted, anyway. */ if (MyLastRecPtr.xrecoff != 0) { - XLogRecData rdata; + XLogRecData rdata[2]; xl_xact_abort xlrec; + int nrels; + RelFileNode *rptr; XLogRecPtr recptr; + nrels = smgrGetPendingDeletes(false, &rptr); + xlrec.xtime = time(NULL); - rdata.buffer = InvalidBuffer; - rdata.data = (char *) (&xlrec); - rdata.len = SizeOfXactAbort; - rdata.next = NULL; + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char *) (&xlrec); + rdata[0].len = MinSizeOfXactAbort; + if (nrels > 0) + { + rdata[0].next = &(rdata[1]); + rdata[1].buffer = InvalidBuffer; + rdata[1].data = (char *) rptr; + rdata[1].len = nrels * sizeof(RelFileNode); + rdata[1].next = NULL; + } + else + rdata[0].next = NULL; - /* - * SHOULD SAVE ARRAY OF RELFILENODE-s TO DROP - */ - recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT, &rdata); + recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT, rdata); + + if (nrels > 0) + XLogFlush(recptr); + + if (rptr) + pfree(rptr); } /* @@ -1774,13 +1803,33 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record) if (info == XLOG_XACT_COMMIT) { + xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record); + int nfiles; + int i; + TransactionIdCommit(record->xl_xid); - /* SHOULD REMOVE FILES OF ALL DROPPED RELATIONS */ + /* Make sure files supposed to be dropped are dropped */ + nfiles = (record->xl_len - MinSizeOfXactCommit) / sizeof(RelFileNode); + for (i = 0; i < nfiles; i++) + { + XLogCloseRelation(xlrec->xnodes[i]); + smgrdounlink(smgropen(xlrec->xnodes[i]), false, true); + } } else if (info == XLOG_XACT_ABORT) { + xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record); + int nfiles; + int i; + TransactionIdAbort(record->xl_xid); - /* SHOULD REMOVE FILES OF ALL FAILED-TO-BE-CREATED RELATIONS */ + /* Make sure files supposed to be dropped are dropped */ + nfiles = (record->xl_len - MinSizeOfXactAbort) / sizeof(RelFileNode); + for (i = 0; i < nfiles; i++) + { + XLogCloseRelation(xlrec->xnodes[i]); + smgrdounlink(smgropen(xlrec->xnodes[i]), false, true); + } } else elog(PANIC, "xact_redo: unknown op code %u", info); @@ -1810,6 +1859,7 @@ xact_desc(char *buf, uint8 xl_info, char *rec) sprintf(buf + strlen(buf), "commit: %04u-%02u-%02u %02u:%02u:%02u", tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday, tm->tm_hour, tm->tm_min, tm->tm_sec); + /* XXX can't show RelFileNodes for lack of access to record length */ } else if (info == XLOG_XACT_ABORT) { @@ -1819,6 +1869,7 @@ xact_desc(char *buf, uint8 xl_info, char *rec) sprintf(buf + strlen(buf), "abort: %04u-%02u-%02u %02u:%02u:%02u", tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday, tm->tm_hour, tm->tm_min, tm->tm_sec); + /* XXX can't show RelFileNodes for lack of access to record length */ } else strcat(buf, "UNKNOWN"); diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 9056f0b4549..c0e328bf619 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.134 2004/02/10 01:55:24 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.135 2004/02/11 22:55:24 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -439,6 +439,7 @@ static bool InRedo = false; static bool AdvanceXLInsertBuffer(void); +static bool WasteXLInsertBuffer(void); static void XLogWrite(XLogwrtRqst WriteRqst); static int XLogFileInit(uint32 log, uint32 seg, bool *use_existent, bool use_lock); @@ -724,19 +725,51 @@ begin:; dtbuf_rdt[2 * i + 1].next = NULL; } - /* Insert record header */ + /* + * Determine exactly where we will place the new XLOG record. If there + * isn't enough space on the current XLOG page for a record header, + * advance to the next page (leaving the unused space as zeroes). + * If there isn't enough space in the current XLOG segment for the whole + * record, advance to the next segment (inserting wasted-space records). + * This avoids needing a continuation record at the start of a segment + * file, which would conflict with placing a FILE_HEADER record there. + * We assume that no XLOG record can be larger than a segment file... + */ updrqst = false; freespace = INSERT_FREESPACE(Insert); if (freespace < SizeOfXLogRecord) { updrqst = AdvanceXLInsertBuffer(); - freespace = BLCKSZ - SizeOfXLogPHD; + freespace = INSERT_FREESPACE(Insert); + } + + if (freespace < (uint32) (SizeOfXLogRecord + write_len)) + { + /* Doesn't fit on this page, so check for overrunning the file */ + uint32 avail; + + /* First figure the space available in remaining pages of file */ + avail = XLogSegSize - BLCKSZ - + (Insert->currpage->xlp_pageaddr.xrecoff % XLogSegSize); + avail /= BLCKSZ; /* convert to pages, then usable bytes */ + avail *= (BLCKSZ - SizeOfXLogPHD - SizeOfXLogContRecord); + avail += freespace; /* add in the current page too */ + if (avail < (uint32) (SizeOfXLogRecord + write_len)) + { + /* It overruns the file, so waste the rest of the file... */ + do { + updrqst = WasteXLInsertBuffer(); + } while ((Insert->currpage->xlp_pageaddr.xrecoff % XLogSegSize) != 0); + freespace = INSERT_FREESPACE(Insert); + } } curridx = Insert->curridx; record = (XLogRecord *) Insert->currpos; + /* Insert record header */ + record->xl_prev = Insert->PrevRecord; if (no_tran) { @@ -829,6 +862,8 @@ begin:; /* Use next buffer */ updrqst = AdvanceXLInsertBuffer(); curridx = Insert->curridx; + /* This assert checks we did not insert a file header record */ + Assert(INSERT_FREESPACE(Insert) == BLCKSZ - SizeOfXLogPHD); /* Insert cont-record header */ Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD; contrecord = (XLogContRecord *) Insert->currpos; @@ -991,17 +1026,109 @@ AdvanceXLInsertBuffer(void) */ MemSet((char *) NewPage, 0, BLCKSZ); - /* And fill the new page's header */ + /* + * Fill the new page's header + */ NewPage->xlp_magic = XLOG_PAGE_MAGIC; /* NewPage->xlp_info = 0; */ /* done by memset */ NewPage->xlp_sui = ThisStartUpID; NewPage->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid; NewPage->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - BLCKSZ; + /* + * If first page of an XLOG segment file, add a FILE_HEADER record. + */ + if ((NewPage->xlp_pageaddr.xrecoff % XLogSegSize) == 0) + { + XLogRecPtr RecPtr; + XLogRecord *record; + XLogFileHeaderData *fhdr; + crc64 crc; + + record = (XLogRecord *) Insert->currpos; + record->xl_prev = Insert->PrevRecord; + record->xl_xact_prev.xlogid = 0; + record->xl_xact_prev.xrecoff = 0; + record->xl_xid = InvalidTransactionId; + record->xl_len = SizeOfXLogFHD; + record->xl_info = XLOG_FILE_HEADER; + record->xl_rmid = RM_XLOG_ID; + fhdr = (XLogFileHeaderData *) XLogRecGetData(record); + fhdr->xlfhd_sysid = ControlFile->system_identifier; + fhdr->xlfhd_xlogid = NewPage->xlp_pageaddr.xlogid; + fhdr->xlfhd_segno = NewPage->xlp_pageaddr.xrecoff / XLogSegSize; + fhdr->xlfhd_seg_size = XLogSegSize; + + INIT_CRC64(crc); + COMP_CRC64(crc, fhdr, SizeOfXLogFHD); + COMP_CRC64(crc, (char *) record + sizeof(crc64), + SizeOfXLogRecord - sizeof(crc64)); + FIN_CRC64(crc); + record->xl_crc = crc; + + /* Compute record's XLOG location */ + INSERT_RECPTR(RecPtr, Insert, nextidx); + + /* Record begin of record in appropriate places */ + Insert->PrevRecord = RecPtr; + + Insert->currpos += SizeOfXLogRecord + SizeOfXLogFHD; + } + return update_needed; } /* + * Fill the remainder of the current XLOG page with an XLOG_WASTED_SPACE + * record, and advance to the next page. This has the same calling and + * result conditions as AdvanceXLInsertBuffer, except that + * AdvanceXLInsertBuffer expects the current page to be already filled. + */ +static bool +WasteXLInsertBuffer(void) +{ + XLogCtlInsert *Insert = &XLogCtl->Insert; + XLogRecord *record; + XLogRecPtr RecPtr; + uint32 freespace; + uint16 curridx; + crc64 rdata_crc; + + freespace = INSERT_FREESPACE(Insert); + Assert(freespace >= SizeOfXLogRecord); + freespace -= SizeOfXLogRecord; + + curridx = Insert->curridx; + record = (XLogRecord *) Insert->currpos; + + record->xl_prev = Insert->PrevRecord; + record->xl_xact_prev.xlogid = 0; + record->xl_xact_prev.xrecoff = 0; + + record->xl_xid = InvalidTransactionId; + record->xl_len = freespace; + record->xl_info = XLOG_WASTED_SPACE; + record->xl_rmid = RM_XLOG_ID; + + INIT_CRC64(rdata_crc); + COMP_CRC64(rdata_crc, XLogRecGetData(record), freespace); + COMP_CRC64(rdata_crc, (char *) record + sizeof(crc64), + SizeOfXLogRecord - sizeof(crc64)); + FIN_CRC64(rdata_crc); + record->xl_crc = rdata_crc; + + /* Compute record's XLOG location */ + INSERT_RECPTR(RecPtr, Insert, curridx); + + /* Record begin of record in appropriate places */ + Insert->PrevRecord = RecPtr; + + /* We needn't bother to advance Insert->currpos */ + + return AdvanceXLInsertBuffer(); +} + +/* * Write and/or fsync the log at least as far as WriteRqst indicates. * * Must be called with WALWriteLock held. @@ -2142,6 +2269,7 @@ WriteControlFile(void) ControlFile->catalog_version_no = CATALOG_VERSION_NO; ControlFile->blcksz = BLCKSZ; ControlFile->relseg_size = RELSEG_SIZE; + ControlFile->xlog_seg_size = XLOG_SEG_SIZE; ControlFile->nameDataLen = NAMEDATALEN; ControlFile->funcMaxArgs = FUNC_MAX_ARGS; @@ -2295,6 +2423,13 @@ ReadControlFile(void) " but the server was compiled with RELSEG_SIZE %d.", ControlFile->relseg_size, RELSEG_SIZE), errhint("It looks like you need to recompile or initdb."))); + if (ControlFile->xlog_seg_size != XLOG_SEG_SIZE) + ereport(FATAL, + (errmsg("database files are incompatible with server"), + errdetail("The database cluster was initialized with XLOG_SEG_SIZE %d," + " but the server was compiled with XLOG_SEG_SIZE %d.", + ControlFile->xlog_seg_size, XLOG_SEG_SIZE), + errhint("It looks like you need to recompile or initdb."))); if (ControlFile->nameDataLen != NAMEDATALEN) ereport(FATAL, (errmsg("database files are incompatible with server"), @@ -2484,15 +2619,36 @@ BootStrapXLOG(void) char *buffer; XLogPageHeader page; XLogRecord *record; + XLogFileHeaderData *fhdr; bool use_existent; + uint64 sysidentifier; + struct timeval tv; crc64 crc; + /* + * Select a hopefully-unique system identifier code for this installation. + * We use the result of gettimeofday(), including the fractional seconds + * field, as being about as unique as we can easily get. (Think not to + * use random(), since it hasn't been seeded and there's no portable way + * to seed it other than the system clock value...) The upper half of the + * uint64 value is just the tv_sec part, while the lower half is the XOR + * of tv_sec and tv_usec. This is to ensure that we don't lose uniqueness + * unnecessarily if "uint64" is really only 32 bits wide. A person + * knowing this encoding can determine the initialization time of the + * installation, which could perhaps be useful sometimes. + */ + gettimeofday(&tv, NULL); + sysidentifier = ((uint64) tv.tv_sec) << 32; + sysidentifier |= (uint32) (tv.tv_sec | tv.tv_usec); + /* Use malloc() to ensure buffer is MAXALIGNED */ buffer = (char *) malloc(BLCKSZ); page = (XLogPageHeader) buffer; + memset(buffer, 0, BLCKSZ); + /* Set up information for the initial checkpoint record */ checkPoint.redo.xlogid = 0; - checkPoint.redo.xrecoff = SizeOfXLogPHD; + checkPoint.redo.xrecoff = SizeOfXLogPHD + SizeOfXLogRecord + SizeOfXLogFHD; checkPoint.undo = checkPoint.redo; checkPoint.ThisStartUpID = 0; checkPoint.nextXid = FirstNormalTransactionId; @@ -2503,16 +2659,42 @@ BootStrapXLOG(void) ShmemVariableCache->nextOid = checkPoint.nextOid; ShmemVariableCache->oidCount = 0; - memset(buffer, 0, BLCKSZ); + /* Set up the XLOG page header */ page->xlp_magic = XLOG_PAGE_MAGIC; page->xlp_info = 0; page->xlp_sui = checkPoint.ThisStartUpID; page->xlp_pageaddr.xlogid = 0; page->xlp_pageaddr.xrecoff = 0; + + /* Insert the file header record */ record = (XLogRecord *) ((char *) page + SizeOfXLogPHD); record->xl_prev.xlogid = 0; record->xl_prev.xrecoff = 0; - record->xl_xact_prev = record->xl_prev; + record->xl_xact_prev.xlogid = 0; + record->xl_xact_prev.xrecoff = 0; + record->xl_xid = InvalidTransactionId; + record->xl_len = SizeOfXLogFHD; + record->xl_info = XLOG_FILE_HEADER; + record->xl_rmid = RM_XLOG_ID; + fhdr = (XLogFileHeaderData *) XLogRecGetData(record); + fhdr->xlfhd_sysid = sysidentifier; + fhdr->xlfhd_xlogid = 0; + fhdr->xlfhd_segno = 0; + fhdr->xlfhd_seg_size = XLogSegSize; + + INIT_CRC64(crc); + COMP_CRC64(crc, fhdr, SizeOfXLogFHD); + COMP_CRC64(crc, (char *) record + sizeof(crc64), + SizeOfXLogRecord - sizeof(crc64)); + FIN_CRC64(crc); + record->xl_crc = crc; + + /* Insert the initial checkpoint record */ + record = (XLogRecord *) ((char *) page + SizeOfXLogPHD + SizeOfXLogRecord + SizeOfXLogFHD); + record->xl_prev.xlogid = 0; + record->xl_prev.xrecoff = SizeOfXLogPHD; + record->xl_xact_prev.xlogid = 0; + record->xl_xact_prev.xrecoff = 0; record->xl_xid = InvalidTransactionId; record->xl_len = sizeof(checkPoint); record->xl_info = XLOG_CHECKPOINT_SHUTDOWN; @@ -2526,9 +2708,11 @@ BootStrapXLOG(void) FIN_CRC64(crc); record->xl_crc = crc; + /* Create first XLOG segment file */ use_existent = false; openLogFile = XLogFileInit(0, 0, &use_existent, false); + /* Write the first page with the initial records */ errno = 0; if (write(openLogFile, buffer, BLCKSZ) != BLCKSZ) { @@ -2552,8 +2736,11 @@ BootStrapXLOG(void) openLogFile = -1; + /* Now create pg_control */ + memset(ControlFile, 0, sizeof(ControlFileData)); /* Initialize pg_control status fields */ + ControlFile->system_identifier = sysidentifier; ControlFile->state = DB_SHUTDOWNED; ControlFile->time = checkPoint.time; ControlFile->logId = 0; @@ -2638,11 +2825,9 @@ StartupXLOG(void) /* This is just to allow attaching to startup process with a debugger */ #ifdef XLOG_REPLAY_DELAY -#ifdef WAL_DEBUG - if (XLOG_DEBUG && ControlFile->state != DB_SHUTDOWNED) + if (ControlFile->state != DB_SHUTDOWNED) sleep(60); #endif -#endif /* * Get the last valid checkpoint record. If the latest one according @@ -3241,7 +3426,7 @@ CreateCheckPoint(bool shutdown, bool force) { (void) AdvanceXLInsertBuffer(); /* OK to ignore update return flag, since we will do flush anyway */ - freespace = BLCKSZ - SizeOfXLogPHD; + freespace = INSERT_FREESPACE(Insert); } INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx); @@ -3468,6 +3653,38 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record) /* Any later WAL records should be run with the then-active SUI */ ThisStartUpID = checkPoint.ThisStartUpID; } + else if (info == XLOG_FILE_HEADER) + { + XLogFileHeaderData fhdr; + + memcpy(&fhdr, XLogRecGetData(record), sizeof(XLogFileHeaderData)); + if (fhdr.xlfhd_sysid != ControlFile->system_identifier) + { + char fhdrident_str[32]; + char sysident_str[32]; + + /* + * Format sysids separately to keep platform-dependent format + * code out of the translatable message string. + */ + snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT, + fhdr.xlfhd_sysid); + snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT, + ControlFile->system_identifier); + ereport(PANIC, + (errmsg("WAL file is from different system"), + errdetail("WAL file SYSID is %s, pg_control SYSID is %s", + fhdrident_str, sysident_str))); + } + if (fhdr.xlfhd_seg_size != XLogSegSize) + ereport(PANIC, + (errmsg("WAL file is from different system"), + errdetail("Incorrect XLOG_SEG_SIZE in file header."))); + } + else if (info == XLOG_WASTED_SPACE) + { + /* ignore */ + } } void @@ -3500,6 +3717,22 @@ xlog_desc(char *buf, uint8 xl_info, char *rec) memcpy(&nextOid, rec, sizeof(Oid)); sprintf(buf + strlen(buf), "nextOid: %u", nextOid); } + else if (info == XLOG_FILE_HEADER) + { + XLogFileHeaderData *fhdr = (XLogFileHeaderData *) rec; + + sprintf(buf + strlen(buf), + "file header: sysid " UINT64_FORMAT "; " + "xlogid %X segno %X; seg_size %X", + fhdr->xlfhd_sysid, + fhdr->xlfhd_xlogid, + fhdr->xlfhd_segno, + fhdr->xlfhd_seg_size); + } + else if (info == XLOG_WASTED_SPACE) + { + strcat(buf, "wasted space"); + } else strcat(buf, "UNKNOWN"); } diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 0271742ce0a..a7c8d3bf52b 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -6,7 +6,7 @@ * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/backend/access/transam/xlogutils.c,v 1.29 2004/02/10 01:55:24 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/xlogutils.c,v 1.30 2004/02/11 22:55:24 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -319,6 +319,9 @@ XLogCloseRelationCache(void) _xlrelarr = NULL; } +/* + * Open a relation during XLOG replay + */ Relation XLogOpenRelation(bool redo, RmgrId rmid, RelFileNode rnode) { @@ -386,3 +389,31 @@ XLogOpenRelation(bool redo, RmgrId rmid, RelFileNode rnode) return (&(res->reldata)); } + +/* + * Close a relation during XLOG replay + * + * This is called when the relation is about to be deleted; we need to ensure + * that there is no dangling smgr reference in the xlog relation cache. + * + * Currently, we don't bother to physically remove the relation from the + * cache, we just let it age out normally. + */ +void +XLogCloseRelation(RelFileNode rnode) +{ + XLogRelDesc *rdesc; + XLogRelCacheEntry *hentry; + + hentry = (XLogRelCacheEntry *) + hash_search(_xlrelcache, (void *) &rnode, HASH_FIND, NULL); + + if (!hentry) + return; /* not in cache so no work */ + + rdesc = hentry->rdesc; + + if (rdesc->reldata.rd_smgr != NULL) + smgrclose(rdesc->reldata.rd_smgr); + rdesc->reldata.rd_smgr = NULL; +} diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index 58629218a3c..7d27b9bde9d 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/smgr/md.c,v 1.102 2004/02/10 01:55:26 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/storage/smgr/md.c,v 1.103 2004/02/11 22:55:25 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -93,6 +93,9 @@ mdcreate(SMgrRelation reln, bool isRedo) char *path; File fd; + if (isRedo && reln->md_fd != NULL) + return true; /* created and opened already... */ + Assert(reln->md_fd == NULL); path = relpath(reln->smgr_rnode); diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c index 09ee4144c50..d242744a4d7 100644 --- a/src/backend/storage/smgr/smgr.c +++ b/src/backend/storage/smgr/smgr.c @@ -11,7 +11,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/smgr/smgr.c,v 1.69 2004/02/10 01:55:26 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/storage/smgr/smgr.c,v 1.70 2004/02/11 22:55:25 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -94,6 +94,29 @@ typedef struct PendingRelDelete static PendingRelDelete *pendingDeletes = NULL; /* head of linked list */ +/* + * Declarations for smgr-related XLOG records + * + * Note: we log file creation and truncation here, but logging of deletion + * actions is handled by xact.c, because it is part of transaction commit. + */ + +/* XLOG gives us high 4 bits */ +#define XLOG_SMGR_CREATE 0x10 +#define XLOG_SMGR_TRUNCATE 0x20 + +typedef struct xl_smgr_create +{ + RelFileNode rnode; +} xl_smgr_create; + +typedef struct xl_smgr_truncate +{ + BlockNumber blkno; + RelFileNode rnode; +} xl_smgr_truncate; + + /* local function prototypes */ static void smgrshutdown(int code, Datum arg); static void smgr_internal_unlink(RelFileNode rnode, int which, @@ -274,6 +297,9 @@ smgrclosenode(RelFileNode rnode) void smgrcreate(SMgrRelation reln, bool isTemp, bool isRedo) { + XLogRecPtr lsn; + XLogRecData rdata; + xl_smgr_create xlrec; PendingRelDelete *pending; if (! (*(smgrsw[reln->smgr_which].smgr_create)) (reln, isRedo)) @@ -286,6 +312,20 @@ smgrcreate(SMgrRelation reln, bool isTemp, bool isRedo) if (isRedo) return; + /* + * Make a non-transactional XLOG entry showing the file creation. It's + * non-transactional because we should replay it whether the transaction + * commits or not; if not, the file will be dropped at abort time. + */ + xlrec.rnode = reln->smgr_rnode; + + rdata.buffer = InvalidBuffer; + rdata.data = (char *) &xlrec; + rdata.len = sizeof(xlrec); + rdata.next = NULL; + + lsn = XLogInsert(RM_SMGR_ID, XLOG_SMGR_CREATE | XLOG_NO_TRAN, &rdata); + /* Add the relation to the list of stuff to delete at abort */ pending = (PendingRelDelete *) MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete)); @@ -488,6 +528,9 @@ BlockNumber smgrtruncate(SMgrRelation reln, BlockNumber nblocks) { BlockNumber newblks; + XLogRecPtr lsn; + XLogRecData rdata; + xl_smgr_truncate xlrec; /* * Tell the free space map to forget anything it may have stored @@ -496,6 +539,7 @@ smgrtruncate(SMgrRelation reln, BlockNumber nblocks) */ FreeSpaceMapTruncateRel(&reln->smgr_rnode, nblocks); + /* Do the truncation */ newblks = (*(smgrsw[reln->smgr_which].smgr_truncate)) (reln, nblocks); if (newblks == InvalidBlockNumber) ereport(ERROR, @@ -505,6 +549,21 @@ smgrtruncate(SMgrRelation reln, BlockNumber nblocks) reln->smgr_rnode.relNode, nblocks))); + /* + * Make a non-transactional XLOG entry showing the file truncation. It's + * non-transactional because we should replay it whether the transaction + * commits or not; the underlying file change is certainly not reversible. + */ + xlrec.blkno = newblks; + xlrec.rnode = reln->smgr_rnode; + + rdata.buffer = InvalidBuffer; + rdata.data = (char *) &xlrec; + rdata.len = sizeof(xlrec); + rdata.next = NULL; + + lsn = XLogInsert(RM_SMGR_ID, XLOG_SMGR_TRUNCATE | XLOG_NO_TRAN, &rdata); + return newblks; } @@ -529,6 +588,41 @@ smgrDoPendingDeletes(bool isCommit) } /* + * smgrGetPendingDeletes() -- Get a list of relations to be deleted. + * + * The return value is the number of relations scheduled for termination. + * *ptr is set to point to a freshly-palloc'd array of RelFileNodes. + * If there are no relations to be deleted, *ptr is set to NULL. + */ +int +smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr) +{ + int nrels; + RelFileNode *rptr; + PendingRelDelete *pending; + + nrels = 0; + for (pending = pendingDeletes; pending != NULL; pending = pending->next) + { + if (pending->atCommit == forCommit) + nrels++; + } + if (nrels == 0) + { + *ptr = NULL; + return 0; + } + rptr = (RelFileNode *) palloc(nrels * sizeof(RelFileNode)); + *ptr = rptr; + for (pending = pendingDeletes; pending != NULL; pending = pending->next) + { + if (pending->atCommit == forCommit) + *rptr++ = pending->relnode; + } + return nrels; +} + +/* * smgrcommit() -- Prepare to commit changes made during the current * transaction. * @@ -595,14 +689,75 @@ smgrsync(void) void smgr_redo(XLogRecPtr lsn, XLogRecord *record) { + uint8 info = record->xl_info & ~XLR_INFO_MASK; + + if (info == XLOG_SMGR_CREATE) + { + xl_smgr_create *xlrec = (xl_smgr_create *) XLogRecGetData(record); + SMgrRelation reln; + + reln = smgropen(xlrec->rnode); + smgrcreate(reln, false, true); + } + else if (info == XLOG_SMGR_TRUNCATE) + { + xl_smgr_truncate *xlrec = (xl_smgr_truncate *) XLogRecGetData(record); + SMgrRelation reln; + BlockNumber newblks; + + reln = smgropen(xlrec->rnode); + + /* Can't use smgrtruncate because it would try to xlog */ + + /* + * Tell the free space map to forget anything it may have stored + * for the about-to-be-deleted blocks. We want to be sure it + * won't return bogus block numbers later on. + */ + FreeSpaceMapTruncateRel(&reln->smgr_rnode, xlrec->blkno); + + /* Do the truncation */ + newblks = (*(smgrsw[reln->smgr_which].smgr_truncate)) (reln, + xlrec->blkno); + if (newblks == InvalidBlockNumber) + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not truncate relation %u/%u to %u blocks: %m", + reln->smgr_rnode.tblNode, + reln->smgr_rnode.relNode, + xlrec->blkno))); + } + else + elog(PANIC, "smgr_redo: unknown op code %u", info); } void smgr_undo(XLogRecPtr lsn, XLogRecord *record) { + /* Since we have no transactional WAL entries, should never undo */ + elog(PANIC, "smgr_undo: cannot undo"); } void smgr_desc(char *buf, uint8 xl_info, char *rec) { + uint8 info = xl_info & ~XLR_INFO_MASK; + + if (info == XLOG_SMGR_CREATE) + { + xl_smgr_create *xlrec = (xl_smgr_create *) rec; + + sprintf(buf + strlen(buf), "file create: %u/%u", + xlrec->rnode.tblNode, xlrec->rnode.relNode); + } + else if (info == XLOG_SMGR_TRUNCATE) + { + xl_smgr_truncate *xlrec = (xl_smgr_truncate *) rec; + + sprintf(buf + strlen(buf), "file truncate: %u/%u to %u blocks", + xlrec->rnode.tblNode, xlrec->rnode.relNode, + xlrec->blkno); + } + else + strcat(buf, "UNKNOWN"); } |