diff options
Diffstat (limited to 'src/backend/storage')
-rw-r--r-- | src/backend/storage/buffer/bufmgr.c | 200 | ||||
-rw-r--r-- | src/backend/storage/buffer/localbuf.c | 17 | ||||
-rw-r--r-- | src/backend/storage/buffer/xlog_bufmgr.c | 142 | ||||
-rw-r--r-- | src/backend/storage/lmgr/lock.c | 177 | ||||
-rw-r--r-- | src/backend/storage/smgr/md.c | 162 | ||||
-rw-r--r-- | src/backend/storage/smgr/mm.c | 6 | ||||
-rw-r--r-- | src/backend/storage/smgr/smgr.c | 134 |
7 files changed, 556 insertions, 282 deletions
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 9c9bda5035c..8d40e8d952f 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.92 2000/10/28 16:20:55 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.93 2000/11/08 22:09:59 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -709,23 +709,28 @@ refcount = %ld, file: %s, line: %d\n", #endif /* - * FlushBuffer -- like WriteBuffer, but force the page to disk. + * FlushBuffer -- like WriteBuffer, but write the page immediately, + * rather than just marking it dirty. On success return, the buffer will + * no longer be dirty. * * 'buffer' is known to be dirty/pinned, so there should not be a * problem reading the BufferDesc members without the BufMgrLock * (nobody should be able to change tags out from under us). * - * Unpin if 'release' is TRUE. + * If 'sync' is true, a synchronous write is wanted (wait for buffer to hit + * the disk). Otherwise it's sufficient to issue the kernel write call. + * + * Unpin buffer if 'release' is true. */ int -FlushBuffer(Buffer buffer, bool release) +FlushBuffer(Buffer buffer, bool sync, bool release) { BufferDesc *bufHdr; Relation bufrel; int status; if (BufferIsLocal(buffer)) - return FlushLocalBuffer(buffer, release) ? STATUS_OK : STATUS_ERROR; + return FlushLocalBuffer(buffer, sync, release) ? STATUS_OK : STATUS_ERROR; if (BAD_BUFFER_ID(buffer)) return STATUS_ERROR; @@ -755,12 +760,16 @@ FlushBuffer(Buffer buffer, bool release) */ LockBuffer(BufferDescriptorGetBuffer(bufHdr), BUFFER_LOCK_SHARE); - status = smgrflush(DEFAULT_SMGR, bufrel, bufHdr->tag.blockNum, - (char *) MAKE_PTR(bufHdr->data)); + if (sync) + status = smgrflush(DEFAULT_SMGR, bufrel, bufHdr->tag.blockNum, + (char *) MAKE_PTR(bufHdr->data)); + else + status = smgrwrite(DEFAULT_SMGR, bufrel, bufHdr->tag.blockNum, + (char *) MAKE_PTR(bufHdr->data)); LockBuffer(BufferDescriptorGetBuffer(bufHdr), BUFFER_LOCK_UNLOCK); - /* drop relcache refcnt incremented by RelationIdCacheGetRelation */ + /* drop relcache refcnt incremented by RelationNodeCacheGetRelation */ RelationDecrementReferenceCount(bufrel); if (status == SM_FAIL) @@ -926,7 +935,7 @@ SetBufferDirtiedByMe(Buffer buffer, BufferDesc *bufHdr) /* * drop relcache refcnt incremented by - * RelationIdCacheGetRelation + * RelationNodeCacheGetRelation */ RelationDecrementReferenceCount(reln); } @@ -1123,7 +1132,7 @@ BufferSync() bufHdr->flags &= ~BM_DIRTY; } - /* drop refcnt obtained by RelationIdCacheGetRelation */ + /* drop refcnt obtained by RelationNodeCacheGetRelation */ if (reln != (Relation) NULL) RelationDecrementReferenceCount(reln); } @@ -1154,7 +1163,7 @@ BufferSync() /* * drop relcache refcnt incremented by - * RelationIdCacheGetRelation + * RelationNodeCacheGetRelation */ RelationDecrementReferenceCount(reln); @@ -1458,7 +1467,7 @@ BufferReplace(BufferDesc *bufHdr) SpinAcquire(BufMgrLock); - /* drop relcache refcnt incremented by RelationIdCacheGetRelation */ + /* drop relcache refcnt incremented by RelationNodeCacheGetRelation */ if (reln != (Relation) NULL) RelationDecrementReferenceCount(reln); @@ -1495,21 +1504,23 @@ RelationGetNumberOfBlocks(Relation relation) } /* --------------------------------------------------------------------- - * ReleaseRelationBuffers + * DropRelationBuffers * * This function removes all the buffered pages for a relation * from the buffer pool. Dirty pages are simply dropped, without - * bothering to write them out first. This is used when the - * relation is about to be deleted. We assume that the caller - * holds an exclusive lock on the relation, which should assure - * that no new buffers will be acquired for the rel meanwhile. + * bothering to write them out first. This is NOT rollback-able, + * and so should be used only with extreme caution! + * + * We assume that the caller holds an exclusive lock on the relation, + * which should assure that no new buffers will be acquired for the rel + * meanwhile. * * XXX currently it sequentially searches the buffer pool, should be * changed to more clever ways of searching. * -------------------------------------------------------------------- */ void -ReleaseRelationBuffers(Relation rel) +DropRelationBuffers(Relation rel) { int i; BufferDesc *bufHdr; @@ -1589,7 +1600,104 @@ recheck: * this rel, since we hold exclusive lock on this rel. */ if (RelFileNodeEquals(rel->rd_node, - BufferTagLastDirtied[i - 1].rnode)) + BufferTagLastDirtied[i - 1].rnode)) + BufferDirtiedByMe[i - 1] = false; + } + + SpinRelease(BufMgrLock); +} + +/* --------------------------------------------------------------------- + * DropRelFileNodeBuffers + * + * This is the same as DropRelationBuffers, except that the target + * relation is specified by RelFileNode. + * + * This is NOT rollback-able. One legitimate use is to clear the + * buffer cache of buffers for a relation that is being deleted + * during transaction abort. + * -------------------------------------------------------------------- + */ +void +DropRelFileNodeBuffers(RelFileNode rnode) +{ + int i; + BufferDesc *bufHdr; + + /* We have to search both local and shared buffers... */ + + for (i = 0; i < NLocBuffer; i++) + { + bufHdr = &LocalBufferDescriptors[i]; + if (RelFileNodeEquals(bufHdr->tag.rnode, rnode)) + { + bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED); + LocalRefCount[i] = 0; + bufHdr->tag.rnode.relNode = InvalidOid; + } + } + + SpinAcquire(BufMgrLock); + for (i = 1; i <= NBuffers; i++) + { + bufHdr = &BufferDescriptors[i - 1]; +recheck: + if (RelFileNodeEquals(bufHdr->tag.rnode, rnode)) + { + + /* + * If there is I/O in progress, better wait till it's done; + * don't want to delete the relation out from under someone + * who's just trying to flush the buffer! + */ + if (bufHdr->flags & BM_IO_IN_PROGRESS) + { + WaitIO(bufHdr, BufMgrLock); + + /* + * By now, the buffer very possibly belongs to some other + * rel, so check again before proceeding. + */ + goto recheck; + } + /* Now we can do what we came for */ + bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED); + + /* + * Release any refcount we may have. + * + * This is very probably dead code, and if it isn't then it's + * probably wrong. I added the Assert to find out --- tgl + * 11/99. + */ + if (!(bufHdr->flags & BM_FREE)) + { + /* Assert checks that buffer will actually get freed! */ + Assert(PrivateRefCount[i - 1] == 1 && + bufHdr->refcount == 1); + /* ReleaseBuffer expects we do not hold the lock at entry */ + SpinRelease(BufMgrLock); + ReleaseBuffer(i); + SpinAcquire(BufMgrLock); + } + /* + * And mark the buffer as no longer occupied by this rel. + */ + BufTableDelete(bufHdr); + } + + /* + * Also check to see if BufferDirtiedByMe info for this buffer + * refers to the target relation, and clear it if so. This is + * independent of whether the current contents of the buffer + * belong to the target relation! + * + * NOTE: we have no way to clear BufferDirtiedByMe info in other + * backends, but hopefully there are none with that bit set for + * this rel, since we hold exclusive lock on this rel. + */ + if (RelFileNodeEquals(rnode, + BufferTagLastDirtied[i - 1].rnode)) BufferDirtiedByMe[i - 1] = false; } @@ -1604,7 +1712,7 @@ recheck: * bothering to write them out first. This is used when we destroy a * database, to avoid trying to flush data to disk when the directory * tree no longer exists. Implementation is pretty similar to - * ReleaseRelationBuffers() which is for destroying just one relation. + * DropRelationBuffers() which is for destroying just one relation. * -------------------------------------------------------------------- */ void @@ -1757,33 +1865,32 @@ BufferPoolBlowaway() /* --------------------------------------------------------------------- * FlushRelationBuffers * - * This function flushes all dirty pages of a relation out to disk. + * This function writes all dirty pages of a relation out to disk. * Furthermore, pages that have blocknumber >= firstDelBlock are * actually removed from the buffer pool. An error code is returned * if we fail to dump a dirty buffer or if we find one of * the target pages is pinned into the cache. * - * This is used by VACUUM before truncating the relation to the given - * number of blocks. (TRUNCATE TABLE also uses it in the same way.) - * It might seem unnecessary to flush dirty pages before firstDelBlock, - * since VACUUM should already have committed its changes. However, - * it is possible for there still to be dirty pages: if some page - * had unwritten on-row tuple status updates from a prior transaction, - * and VACUUM had no additional changes to make to that page, then - * VACUUM won't have written it. This is harmless in most cases but - * will break pg_upgrade, which relies on VACUUM to ensure that *all* - * tuples have correct on-row status. So, we check and flush all - * dirty pages of the rel regardless of block number. - * - * This is also used by RENAME TABLE (with firstDelBlock = 0) - * to clear out the buffer cache before renaming the physical files of - * a relation. Without that, some other backend might try to do a - * blind write of a buffer page (relying on the BlindId of the buffer) - * and fail because it's not got the right filename anymore. + * This is called by DROP TABLE to clear buffers for the relation + * from the buffer pool. Note that we must write dirty buffers, + * rather than just dropping the changes, because our transaction + * might abort later on; we want to roll back safely in that case. + * + * This is also called by VACUUM before truncating the relation to the + * given number of blocks. It might seem unnecessary for VACUUM to + * write dirty pages before firstDelBlock, since VACUUM should already + * have committed its changes. However, it is possible for there still + * to be dirty pages: if some page had unwritten on-row tuple status + * updates from a prior transaction, and VACUUM had no additional + * changes to make to that page, then VACUUM won't have written it. + * This is harmless in most cases but will break pg_upgrade, which + * relies on VACUUM to ensure that *all* tuples have correct on-row + * status. So, we check and flush all dirty pages of the rel + * regardless of block number. * * In all cases, the caller should be holding AccessExclusiveLock on * the target relation to ensure that no other backend is busy reading - * more blocks of the relation. + * more blocks of the relation (or might do so before we commit). * * Formerly, we considered it an error condition if we found dirty * buffers here. However, since BufferSync no longer forces out all @@ -1812,7 +1919,7 @@ FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock) { if (bufHdr->flags & BM_DIRTY) { - if (FlushBuffer(-i - 1, false) != STATUS_OK) + if (FlushBuffer(-i - 1, false, false) != STATUS_OK) { elog(NOTICE, "FlushRelationBuffers(%s (local), %u): block %u is dirty, could not flush it", RelationGetRelationName(rel), firstDelBlock, @@ -1840,15 +1947,17 @@ FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock) for (i = 0; i < NBuffers; i++) { bufHdr = &BufferDescriptors[i]; -recheck: if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node)) { if (bufHdr->flags & BM_DIRTY) { PinBuffer(bufHdr); SpinRelease(BufMgrLock); - if (FlushBuffer(i + 1, true) != STATUS_OK) + if (FlushBuffer(i + 1, false, false) != STATUS_OK) { + SpinAcquire(BufMgrLock); + UnpinBuffer(bufHdr); + SpinRelease(BufMgrLock); elog(NOTICE, "FlushRelationBuffers(%s, %u): block %u is dirty (private %ld, global %d), could not flush it", RelationGetRelationName(rel), firstDelBlock, bufHdr->tag.blockNum, @@ -1856,12 +1965,7 @@ recheck: return -1; } SpinAcquire(BufMgrLock); - - /* - * Buffer could already be reassigned, so must recheck - * whether it still belongs to rel before freeing it! - */ - goto recheck; + UnpinBuffer(bufHdr); } if (!(bufHdr->flags & BM_FREE)) { diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c index faa3304b4f6..352f519bdc0 100644 --- a/src/backend/storage/buffer/localbuf.c +++ b/src/backend/storage/buffer/localbuf.c @@ -16,7 +16,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/buffer/localbuf.c,v 1.33 2000/10/28 16:20:56 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/buffer/localbuf.c,v 1.34 2000/11/08 22:09:59 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -183,7 +183,7 @@ WriteLocalBuffer(Buffer buffer, bool release) * flushes a local buffer */ int -FlushLocalBuffer(Buffer buffer, bool release) +FlushLocalBuffer(Buffer buffer, bool sync, bool release) { int bufid; Relation bufrel; @@ -199,13 +199,18 @@ FlushLocalBuffer(Buffer buffer, bool release) bufHdr = &LocalBufferDescriptors[bufid]; bufHdr->flags &= ~BM_DIRTY; bufrel = RelationNodeCacheGetRelation(bufHdr->tag.rnode); - Assert(bufrel != NULL); - smgrflush(DEFAULT_SMGR, bufrel, bufHdr->tag.blockNum, - (char *) MAKE_PTR(bufHdr->data)); + + if (sync) + smgrflush(DEFAULT_SMGR, bufrel, bufHdr->tag.blockNum, + (char *) MAKE_PTR(bufHdr->data)); + else + smgrwrite(DEFAULT_SMGR, bufrel, bufHdr->tag.blockNum, + (char *) MAKE_PTR(bufHdr->data)); + LocalBufferFlushCount++; - /* drop relcache refcount incremented by RelationIdCacheGetRelation */ + /* drop relcache refcount incremented by RelationNodeCacheGetRelation */ RelationDecrementReferenceCount(bufrel); if (release) diff --git a/src/backend/storage/buffer/xlog_bufmgr.c b/src/backend/storage/buffer/xlog_bufmgr.c index dcd377b7eb3..15c4321405e 100644 --- a/src/backend/storage/buffer/xlog_bufmgr.c +++ b/src/backend/storage/buffer/xlog_bufmgr.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/buffer/Attic/xlog_bufmgr.c,v 1.1 2000/10/28 16:20:56 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/buffer/Attic/xlog_bufmgr.c,v 1.2 2000/11/08 22:09:59 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -838,7 +838,7 @@ BufferSync() SpinRelease(BufMgrLock); - /* drop refcnt obtained by RelationIdCacheGetRelation */ + /* drop refcnt obtained by RelationNodeCacheGetRelation */ if (reln != (Relation) NULL) { RelationDecrementReferenceCount(reln); @@ -1128,7 +1128,7 @@ BufferReplace(BufferDesc *bufHdr) false); /* no fsync */ } - /* drop relcache refcnt incremented by RelationIdCacheGetRelation */ + /* drop relcache refcnt incremented by RelationNodeCacheGetRelation */ if (reln != (Relation) NULL) RelationDecrementReferenceCount(reln); @@ -1159,21 +1159,23 @@ RelationGetNumberOfBlocks(Relation relation) } /* --------------------------------------------------------------------- - * ReleaseRelationBuffers + * DropRelationBuffers * * This function removes all the buffered pages for a relation * from the buffer pool. Dirty pages are simply dropped, without - * bothering to write them out first. This is used when the - * relation is about to be deleted. We assume that the caller - * holds an exclusive lock on the relation, which should assure - * that no new buffers will be acquired for the rel meanwhile. + * bothering to write them out first. This is NOT rollback-able, + * and so should be used only with extreme caution! + * + * We assume that the caller holds an exclusive lock on the relation, + * which should assure that no new buffers will be acquired for the rel + * meanwhile. * * XXX currently it sequentially searches the buffer pool, should be * changed to more clever ways of searching. * -------------------------------------------------------------------- */ void -ReleaseRelationBuffers(Relation rel) +DropRelationBuffers(Relation rel) { int i; BufferDesc *bufHdr; @@ -1249,6 +1251,91 @@ recheck: } /* --------------------------------------------------------------------- + * DropRelFileNodeBuffers + * + * This is the same as DropRelationBuffers, except that the target + * relation is specified by RelFileNode. + * + * This is NOT rollback-able. One legitimate use is to clear the + * buffer cache of buffers for a relation that is being deleted + * during transaction abort. + * -------------------------------------------------------------------- + */ +void +DropRelFileNodeBuffers(RelFileNode rnode) +{ + int i; + BufferDesc *bufHdr; + + /* We have to search both local and shared buffers... */ + + for (i = 0; i < NLocBuffer; i++) + { + bufHdr = &LocalBufferDescriptors[i]; + if (RelFileNodeEquals(bufHdr->tag.rnode, rnode)) + { + bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED); + bufHdr->cntxDirty = false; + LocalRefCount[i] = 0; + bufHdr->tag.rnode.relNode = InvalidOid; + } + } + + SpinAcquire(BufMgrLock); + for (i = 1; i <= NBuffers; i++) + { + bufHdr = &BufferDescriptors[i - 1]; +recheck: + if (RelFileNodeEquals(bufHdr->tag.rnode, rnode)) + { + + /* + * If there is I/O in progress, better wait till it's done; + * don't want to delete the relation out from under someone + * who's just trying to flush the buffer! + */ + if (bufHdr->flags & BM_IO_IN_PROGRESS) + { + WaitIO(bufHdr, BufMgrLock); + + /* + * By now, the buffer very possibly belongs to some other + * rel, so check again before proceeding. + */ + goto recheck; + } + /* Now we can do what we came for */ + bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED); + bufHdr->cntxDirty = false; + + /* + * Release any refcount we may have. + * + * This is very probably dead code, and if it isn't then it's + * probably wrong. I added the Assert to find out --- tgl + * 11/99. + */ + if (!(bufHdr->flags & BM_FREE)) + { + /* Assert checks that buffer will actually get freed! */ + Assert(PrivateRefCount[i - 1] == 1 && + bufHdr->refcount == 1); + /* ReleaseBuffer expects we do not hold the lock at entry */ + SpinRelease(BufMgrLock); + ReleaseBuffer(i); + SpinAcquire(BufMgrLock); + } + /* + * And mark the buffer as no longer occupied by this rel. + */ + BufTableDelete(bufHdr); + } + } + + SpinRelease(BufMgrLock); +} + +/* --------------------------------------------------------------------- * DropBuffers * * This function removes all the buffers in the buffer cache for a @@ -1256,7 +1343,7 @@ recheck: * bothering to write them out first. This is used when we destroy a * database, to avoid trying to flush data to disk when the directory * tree no longer exists. Implementation is pretty similar to - * ReleaseRelationBuffers() which is for destroying just one relation. + * DropRelationBuffers() which is for destroying just one relation. * -------------------------------------------------------------------- */ void @@ -1399,33 +1486,32 @@ BufferPoolBlowaway() /* --------------------------------------------------------------------- * FlushRelationBuffers * - * This function flushes all dirty pages of a relation out to disk. + * This function writes all dirty pages of a relation out to disk. * Furthermore, pages that have blocknumber >= firstDelBlock are * actually removed from the buffer pool. An error code is returned * if we fail to dump a dirty buffer or if we find one of * the target pages is pinned into the cache. * - * This is used by VACUUM before truncating the relation to the given - * number of blocks. (TRUNCATE TABLE also uses it in the same way.) - * It might seem unnecessary to flush dirty pages before firstDelBlock, - * since VACUUM should already have committed its changes. However, - * it is possible for there still to be dirty pages: if some page - * had unwritten on-row tuple status updates from a prior transaction, - * and VACUUM had no additional changes to make to that page, then - * VACUUM won't have written it. This is harmless in most cases but - * will break pg_upgrade, which relies on VACUUM to ensure that *all* - * tuples have correct on-row status. So, we check and flush all - * dirty pages of the rel regardless of block number. + * This is called by DROP TABLE to clear buffers for the relation + * from the buffer pool. Note that we must write dirty buffers, + * rather than just dropping the changes, because our transaction + * might abort later on; we want to roll back safely in that case. * - * This is also used by RENAME TABLE (with firstDelBlock = 0) - * to clear out the buffer cache before renaming the physical files of - * a relation. Without that, some other backend might try to do a - * blind write of a buffer page (relying on the BlindId of the buffer) - * and fail because it's not got the right filename anymore. + * This is also called by VACUUM before truncating the relation to the + * given number of blocks. It might seem unnecessary for VACUUM to + * write dirty pages before firstDelBlock, since VACUUM should already + * have committed its changes. However, it is possible for there still + * to be dirty pages: if some page had unwritten on-row tuple status + * updates from a prior transaction, and VACUUM had no additional + * changes to make to that page, then VACUUM won't have written it. + * This is harmless in most cases but will break pg_upgrade, which + * relies on VACUUM to ensure that *all* tuples have correct on-row + * status. So, we check and flush all dirty pages of the rel + * regardless of block number. * * In all cases, the caller should be holding AccessExclusiveLock on * the target relation to ensure that no other backend is busy reading - * more blocks of the relation. + * more blocks of the relation (or might do so before we commit). * * Formerly, we considered it an error condition if we found dirty * buffers here. However, since BufferSync no longer forces out all diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 23a2dcf1e24..14325e53183 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.71 2000/07/17 03:05:08 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.72 2000/11/08 22:10:00 tgl Exp $ * * NOTES * Outside modules can create a lock table and acquire/release @@ -453,7 +453,7 @@ LockMethodTableRename(LOCKMETHOD lockmethod) bool LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) { - XIDLookupEnt *result, + XIDLookupEnt *xident, item; HTAB *xidTable; bool found; @@ -559,9 +559,9 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) /* * Find or create an xid entry with this tag */ - result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item, + xident = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item, HASH_ENTER, &found); - if (!result) + if (!xident) { SpinRelease(masterLock); elog(NOTICE, "LockAcquire: xid table corrupted"); @@ -573,16 +573,41 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) */ if (!found) { - result->nHolding = 0; - MemSet((char *) result->holders, 0, sizeof(int) * MAX_LOCKMODES); - ProcAddLock(&result->queue); - XID_PRINT("LockAcquire: new", result); + xident->nHolding = 0; + MemSet((char *) xident->holders, 0, sizeof(int) * MAX_LOCKMODES); + ProcAddLock(&xident->queue); + XID_PRINT("LockAcquire: new", xident); } else { - XID_PRINT("LockAcquire: found", result); - Assert((result->nHolding > 0) && (result->holders[lockmode] >= 0)); - Assert(result->nHolding <= lock->nActive); + int i; + + XID_PRINT("LockAcquire: found", xident); + Assert((xident->nHolding > 0) && (xident->holders[lockmode] >= 0)); + Assert(xident->nHolding <= lock->nActive); + /* + * Issue warning if we already hold a lower-level lock on this + * object and do not hold a lock of the requested level or higher. + * This indicates a deadlock-prone coding practice (eg, we'd have + * a deadlock if another backend were following the same code path + * at about the same time). + * + * XXX Doing numeric comparison on the lockmodes is a hack; + * it'd be better to use a table. For now, though, this works. + */ + for (i = lockMethodTable->ctl->numLockModes; i > 0; i--) + { + if (xident->holders[i] > 0) + { + if (i >= (int) lockmode) + break; /* safe: we have a lock >= req level */ + elog(DEBUG, "Deadlock risk: raising lock level" + " from %s to %s on object %u/%u/%u", + lock_types[i], lock_types[lockmode], + lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno); + break; + } + } } /* ---------------- @@ -601,12 +626,12 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) * hold this lock. * -------------------- */ - if (result->nHolding == lock->nActive || result->holders[lockmode] != 0) + if (xident->nHolding == lock->nActive || xident->holders[lockmode] != 0) { - result->holders[lockmode]++; - result->nHolding++; - XID_PRINT("LockAcquire: owning", result); - Assert((result->nHolding > 0) && (result->holders[lockmode] > 0)); + xident->holders[lockmode]++; + xident->nHolding++; + XID_PRINT("LockAcquire: owning", xident); + Assert((xident->nHolding > 0) && (xident->holders[lockmode] > 0)); GrantLock(lock, lockmode); SpinRelease(masterLock); return TRUE; @@ -623,27 +648,27 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) * If I don't hold locks or my locks don't conflict with waiters * then force to sleep. */ - if (result->nHolding > 0) + if (xident->nHolding > 0) { for (; i <= lockMethodTable->ctl->numLockModes; i++) { - if (result->holders[i] > 0 && + if (xident->holders[i] > 0 && lockMethodTable->ctl->conflictTab[i] & lock->waitMask) break; /* conflict */ } } - if (result->nHolding == 0 || i > lockMethodTable->ctl->numLockModes) + if (xident->nHolding == 0 || i > lockMethodTable->ctl->numLockModes) { XID_PRINT("LockAcquire: higher priority proc waiting", - result); + xident); status = STATUS_FOUND; } else - status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result); + status = LockResolveConflicts(lockmethod, lock, lockmode, xid, xident); } else - status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result); + status = LockResolveConflicts(lockmethod, lock, lockmode, xid, xident); if (status == STATUS_OK) GrantLock(lock, lockmode); @@ -657,17 +682,17 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) */ if (lockmethod == USER_LOCKMETHOD) { - if (!result->nHolding) + if (!xident->nHolding) { - SHMQueueDelete(&result->queue); - result = (XIDLookupEnt *) hash_search(xidTable, - (Pointer) result, + SHMQueueDelete(&xident->queue); + xident = (XIDLookupEnt *) hash_search(xidTable, + (Pointer) xident, HASH_REMOVE, &found); - if (!result || !found) + if (!xident || !found) elog(NOTICE, "LockAcquire: remove xid, table corrupted"); } else - XID_PRINT("LockAcquire: NHOLDING", result); + XID_PRINT("LockAcquire: NHOLDING", xident); lock->nHolding--; lock->holders[lockmode]--; LOCK_PRINT("LockAcquire: user lock failed", lock, lockmode); @@ -682,7 +707,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) * Construct bitmask of locks we hold before going to sleep. */ MyProc->holdLock = 0; - if (result->nHolding > 0) + if (xident->nHolding > 0) { int i, tmpMask = 2; @@ -690,7 +715,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) for (i = 1; i <= lockMethodTable->ctl->numLockModes; i++, tmpMask <<= 1) { - if (result->holders[i] > 0) + if (xident->holders[i] > 0) MyProc->holdLock |= tmpMask; } Assert(MyProc->holdLock != 0); @@ -702,15 +727,15 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) * Check the xid entry status, in case something in the ipc * communication doesn't work correctly. */ - if (!((result->nHolding > 0) && (result->holders[lockmode] > 0))) + if (!((xident->nHolding > 0) && (xident->holders[lockmode] > 0))) { - XID_PRINT("LockAcquire: INCONSISTENT", result); + XID_PRINT("LockAcquire: INCONSISTENT", xident); LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode); /* Should we retry ? */ SpinRelease(masterLock); return FALSE; } - XID_PRINT("LockAcquire: granted", result); + XID_PRINT("LockAcquire: granted", xident); LOCK_PRINT("LockAcquire: granted", lock, lockmode); } @@ -738,7 +763,7 @@ LockResolveConflicts(LOCKMETHOD lockmethod, TransactionId xid, XIDLookupEnt *xidentP) /* xident ptr or NULL */ { - XIDLookupEnt *result, + XIDLookupEnt *xident, item; int *myHolders; int numLockModes; @@ -758,7 +783,7 @@ LockResolveConflicts(LOCKMETHOD lockmethod, * A pointer to the xid entry was supplied from the caller. * Actually only LockAcquire can do it. */ - result = xidentP; + xident = xidentP; } else { @@ -788,9 +813,9 @@ LockResolveConflicts(LOCKMETHOD lockmethod, /* * Find or create an xid entry with this tag */ - result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item, + xident = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item, HASH_ENTER, &found); - if (!result) + if (!xident) { elog(NOTICE, "LockResolveConflicts: xid table corrupted"); return STATUS_ERROR; @@ -808,14 +833,14 @@ LockResolveConflicts(LOCKMETHOD lockmethod, * the lock stats. * --------------- */ - MemSet(result->holders, 0, numLockModes * sizeof(*(lock->holders))); - result->nHolding = 0; - XID_PRINT("LockResolveConflicts: NOT FOUND", result); + MemSet(xident->holders, 0, numLockModes * sizeof(*(lock->holders))); + xident->nHolding = 0; + XID_PRINT("LockResolveConflicts: NOT FOUND", xident); } else - XID_PRINT("LockResolveConflicts: found", result); + XID_PRINT("LockResolveConflicts: found", xident); } - Assert((result->nHolding >= 0) && (result->holders[lockmode] >= 0)); + Assert((xident->nHolding >= 0) && (xident->holders[lockmode] >= 0)); /* ---------------------------- * first check for global conflicts: If no locks conflict @@ -829,10 +854,10 @@ LockResolveConflicts(LOCKMETHOD lockmethod, */ if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & lock->mask)) { - result->holders[lockmode]++; - result->nHolding++; - XID_PRINT("LockResolveConflicts: no conflict", result); - Assert((result->nHolding > 0) && (result->holders[lockmode] > 0)); + xident->holders[lockmode]++; + xident->nHolding++; + XID_PRINT("LockResolveConflicts: no conflict", xident); + Assert((xident->nHolding > 0) && (xident->holders[lockmode] > 0)); return STATUS_OK; } @@ -842,7 +867,7 @@ LockResolveConflicts(LOCKMETHOD lockmethod, * that does not reflect our own locks. * ------------------------ */ - myHolders = result->holders; + myHolders = xident->holders; bitmask = 0; tmpMask = 2; for (i = 1; i <= numLockModes; i++, tmpMask <<= 1) @@ -861,14 +886,14 @@ LockResolveConflicts(LOCKMETHOD lockmethod, if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & bitmask)) { /* no conflict. Get the lock and go on */ - result->holders[lockmode]++; - result->nHolding++; - XID_PRINT("LockResolveConflicts: resolved", result); - Assert((result->nHolding > 0) && (result->holders[lockmode] > 0)); + xident->holders[lockmode]++; + xident->nHolding++; + XID_PRINT("LockResolveConflicts: resolved", xident); + Assert((xident->nHolding > 0) && (xident->holders[lockmode] > 0)); return STATUS_OK; } - XID_PRINT("LockResolveConflicts: conflicting", result); + XID_PRINT("LockResolveConflicts: conflicting", xident); return STATUS_FOUND; } @@ -965,7 +990,7 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) SPINLOCK masterLock; bool found; LOCKMETHODTABLE *lockMethodTable; - XIDLookupEnt *result, + XIDLookupEnt *xident, item; HTAB *xidTable; TransactionId xid; @@ -1053,9 +1078,9 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) * Find an xid entry with this tag */ xidTable = lockMethodTable->xidHash; - result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item, + xident = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item, HASH_FIND_SAVE, &found); - if (!result || !found) + if (!xident || !found) { SpinRelease(masterLock); #ifdef USER_LOCKS @@ -1066,23 +1091,23 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) elog(NOTICE, "LockRelease: xid table corrupted"); return FALSE; } - XID_PRINT("LockRelease: found", result); - Assert(result->tag.lock == MAKE_OFFSET(lock)); + XID_PRINT("LockRelease: found", xident); + Assert(xident->tag.lock == MAKE_OFFSET(lock)); /* * Check that we are actually holding a lock of the type we want to * release. */ - if (!(result->holders[lockmode] > 0)) + if (!(xident->holders[lockmode] > 0)) { SpinRelease(masterLock); - XID_PRINT("LockAcquire: WRONGTYPE", result); + XID_PRINT("LockAcquire: WRONGTYPE", xident); elog(NOTICE, "LockRelease: you don't own a lock of type %s", lock_types[lockmode]); - Assert(result->holders[lockmode] >= 0); + Assert(xident->holders[lockmode] >= 0); return FALSE; } - Assert(result->nHolding > 0); + Assert(xident->nHolding > 0); /* * fix the general lock stats @@ -1147,27 +1172,27 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) * now check to see if I have any private locks. If I do, decrement * the counts associated with them. */ - result->holders[lockmode]--; - result->nHolding--; - XID_PRINT("LockRelease: updated", result); - Assert((result->nHolding >= 0) && (result->holders[lockmode] >= 0)); + xident->holders[lockmode]--; + xident->nHolding--; + XID_PRINT("LockRelease: updated", xident); + Assert((xident->nHolding >= 0) && (xident->holders[lockmode] >= 0)); /* * If this was my last hold on this lock, delete my entry in the XID * table. */ - if (!result->nHolding) + if (!xident->nHolding) { - if (result->queue.prev == INVALID_OFFSET) + if (xident->queue.prev == INVALID_OFFSET) elog(NOTICE, "LockRelease: xid.prev == INVALID_OFFSET"); - if (result->queue.next == INVALID_OFFSET) + if (xident->queue.next == INVALID_OFFSET) elog(NOTICE, "LockRelease: xid.next == INVALID_OFFSET"); - if (result->queue.next != INVALID_OFFSET) - SHMQueueDelete(&result->queue); - XID_PRINT("LockRelease: deleting", result); - result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &result, + if (xident->queue.next != INVALID_OFFSET) + SHMQueueDelete(&xident->queue); + XID_PRINT("LockRelease: deleting", xident); + xident = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &xident, HASH_REMOVE_SAVED, &found); - if (!result || !found) + if (!xident || !found) { SpinRelease(masterLock); elog(NOTICE, "LockRelease: remove xid, table corrupted"); @@ -1196,7 +1221,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue) int done; XIDLookupEnt *xidLook = NULL; XIDLookupEnt *tmp = NULL; - XIDLookupEnt *result; + XIDLookupEnt *xident; SHMEM_OFFSET end = MAKE_OFFSET(lockQueue); SPINLOCK masterLock; LOCKMETHODTABLE *lockMethodTable; @@ -1371,11 +1396,11 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue) */ XID_PRINT("LockReleaseAll: deleting", xidLook); - result = (XIDLookupEnt *) hash_search(lockMethodTable->xidHash, + xident = (XIDLookupEnt *) hash_search(lockMethodTable->xidHash, (Pointer) xidLook, HASH_REMOVE, &found); - if (!result || !found) + if (!xident || !found) { SpinRelease(masterLock); elog(NOTICE, "LockReleaseAll: xid table corrupted"); diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index da466afe9f8..c97a46ba4b4 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -8,17 +8,17 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/smgr/md.c,v 1.77 2000/10/28 16:20:57 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/smgr/md.c,v 1.78 2000/11/08 22:10:00 tgl Exp $ * *------------------------------------------------------------------------- */ +#include "postgres.h" + #include <errno.h> #include <unistd.h> #include <fcntl.h> #include <sys/file.h> -#include "postgres.h" - #include "catalog/catalog.h" #include "miscadmin.h" #include "storage/smgr.h" @@ -123,63 +123,39 @@ mdinit() int mdcreate(Relation reln) { + char *path; int fd, vfd; - char *path; - Assert(reln->rd_unlinked && reln->rd_fd < 0); + Assert(reln->rd_fd < 0); path = relpath(reln->rd_node); - fd = FileNameOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, 0600); - /* - * For cataloged relations, pg_class is guaranteed to have a unique - * record with the same relname by the unique index. So we are able to - * reuse existent files for new cataloged relations. Currently we reuse - * them in the following cases. 1. they are empty. 2. they are used - * for Index relations and their size == BLCKSZ * 2. - * - * During bootstrap processing, we skip that check, because pg_time, - * pg_variable, and pg_log get created before their .bki file entries - * are processed. - */ + fd = FileNameOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, 0600); if (fd < 0) { int save_errno = errno; - if (!IsBootstrapProcessingMode() && - reln->rd_rel->relkind == RELKIND_UNCATALOGED) - return -1; - - fd = FileNameOpenFile(path, O_RDWR | PG_BINARY, 0600); + /* + * During bootstrap, there are cases where a system relation will be + * accessed (by internal backend processes) before the bootstrap + * script nominally creates it. Therefore, allow the file to exist + * already, but in bootstrap mode only. (See also mdopen) + */ + if (IsBootstrapProcessingMode()) + fd = FileNameOpenFile(path, O_RDWR | PG_BINARY, 0600); if (fd < 0) { + pfree(path); /* be sure to return the error reported by create, not open */ errno = save_errno; return -1; } - if (!IsBootstrapProcessingMode()) - { - bool reuse = false; - long len = FileSeek(fd, 0L, SEEK_END); - - if (len == 0) - reuse = true; - else if (reln->rd_rel->relkind == RELKIND_INDEX && - len == BLCKSZ * 2) - reuse = true; - if (!reuse) - { - FileClose(fd); - /* be sure to return the error reported by create */ - errno = save_errno; - return -1; - } - } errno = 0; } - reln->rd_unlinked = false; + + pfree(path); vfd = _fdvec_alloc(); if (vfd < 0) @@ -187,12 +163,10 @@ mdcreate(Relation reln) Md_fdvec[vfd].mdfd_vfd = fd; Md_fdvec[vfd].mdfd_flags = (uint16) 0; + Md_fdvec[vfd].mdfd_lstbcnt = 0; #ifndef LET_OS_MANAGE_FILESIZE Md_fdvec[vfd].mdfd_chain = (MdfdVec *) NULL; #endif - Md_fdvec[vfd].mdfd_lstbcnt = 0; - - pfree(path); return vfd; } @@ -201,65 +175,50 @@ mdcreate(Relation reln) * mdunlink() -- Unlink a relation. */ int -mdunlink(Relation reln) +mdunlink(RelFileNode rnode) { - int nblocks; - int fd; - MdfdVec *v; - - /* - * If the relation is already unlinked,we have nothing to do any more. - */ - if (reln->rd_unlinked && reln->rd_fd < 0) - return SM_SUCCESS; - - /* - * Force all segments of the relation to be opened, so that we won't - * miss deleting any of them. - */ - nblocks = mdnblocks(reln); + int status = SM_SUCCESS; + int save_errno = 0; + char *path; - /* - * Clean out the mdfd vector, letting fd.c unlink the physical files. - * - * NOTE: We truncate the file(s) before deleting 'em, because if other - * backends are holding the files open, the unlink will fail on some - * platforms (think Microsoft). Better a zero-size file gets left - * around than a big file. Those other backends will be forced to - * close the relation by cache invalidation, but that probably hasn't - * happened yet. - */ - fd = RelationGetFile(reln); - if (fd < 0) /* should not happen */ - elog(ERROR, "mdunlink: mdnblocks didn't open relation"); + path = relpath(rnode); - Md_fdvec[fd].mdfd_flags = (uint16) 0; + /* Delete the first segment, or only segment if not doing segmenting */ + if (unlink(path) < 0) + { + status = SM_FAIL; + save_errno = errno; + } #ifndef LET_OS_MANAGE_FILESIZE - for (v = &Md_fdvec[fd]; v != (MdfdVec *) NULL;) + /* Get the additional segments, if any */ + if (status == SM_SUCCESS) { - MdfdVec *ov = v; + char *segpath = (char *) palloc(strlen(path) + 12); + int segno; - FileTruncate(v->mdfd_vfd, 0); - FileUnlink(v->mdfd_vfd); - v = v->mdfd_chain; - if (ov != &Md_fdvec[fd]) - pfree(ov); + for (segno = 1; ; segno++) + { + sprintf(segpath, "%s.%d", path, segno); + if (unlink(segpath) < 0) + { + /* ENOENT is expected after the last segment... */ + if (errno != ENOENT) + { + status = SM_FAIL; + save_errno = errno; + } + break; + } + } + pfree(segpath); } - Md_fdvec[fd].mdfd_chain = (MdfdVec *) NULL; -#else - v = &Md_fdvec[fd]; - FileTruncate(v->mdfd_vfd, 0); - FileUnlink(v->mdfd_vfd); #endif - _fdvec_free(fd); - - /* be sure to mark relation closed && unlinked */ - reln->rd_fd = -1; - reln->rd_unlinked = true; + pfree(path); - return SM_SUCCESS; + errno = save_errno; + return status; } /* @@ -327,24 +286,29 @@ mdopen(Relation reln) int vfd; Assert(reln->rd_fd < 0); + path = relpath(reln->rd_node); fd = FileNameOpenFile(path, O_RDWR | PG_BINARY, 0600); + if (fd < 0) { - /* in bootstrap mode, accept mdopen as substitute for mdcreate */ + /* + * During bootstrap, there are cases where a system relation will be + * accessed (by internal backend processes) before the bootstrap + * script nominally creates it. Therefore, accept mdopen() as a + * substitute for mdcreate() in bootstrap mode only. (See mdcreate) + */ if (IsBootstrapProcessingMode()) fd = FileNameOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, 0600); if (fd < 0) { - elog(NOTICE, "mdopen: couldn't open %s: %m", path); - /* mark relation closed and unlinked */ - reln->rd_fd = -1; - reln->rd_unlinked = true; + pfree(path); return -1; } } - reln->rd_unlinked = false; + + pfree(path); vfd = _fdvec_alloc(); if (vfd < 0) @@ -362,8 +326,6 @@ mdopen(Relation reln) #endif #endif - pfree(path); - return vfd; } diff --git a/src/backend/storage/smgr/mm.c b/src/backend/storage/smgr/mm.c index a5b22cbcc5c..d64aeb6a418 100644 --- a/src/backend/storage/smgr/mm.c +++ b/src/backend/storage/smgr/mm.c @@ -11,7 +11,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/smgr/Attic/mm.c,v 1.19 2000/04/10 23:41:51 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/smgr/Attic/mm.c,v 1.20 2000/11/08 22:10:00 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -204,9 +204,11 @@ mmcreate(Relation reln) /* * mmunlink() -- Unlink a relation. + * + * XXX currently broken: needs to accept RelFileNode, not Relation */ int -mmunlink(Relation reln) +mmunlink(RelFileNode rnode) { int i; Oid reldbid; diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c index d2a940a76e5..01a7877e80a 100644 --- a/src/backend/storage/smgr/smgr.c +++ b/src/backend/storage/smgr/smgr.c @@ -11,13 +11,16 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/smgr/smgr.c,v 1.42 2000/10/28 16:20:57 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/smgr/smgr.c,v 1.43 2000/11/08 22:10:00 tgl Exp $ * *------------------------------------------------------------------------- */ #include "postgres.h" +#include "storage/bufmgr.h" #include "storage/smgr.h" +#include "utils/memutils.h" + static void smgrshutdown(void); @@ -26,7 +29,7 @@ typedef struct f_smgr int (*smgr_init) (void); /* may be NULL */ int (*smgr_shutdown) (void); /* may be NULL */ int (*smgr_create) (Relation reln); - int (*smgr_unlink) (Relation reln); + int (*smgr_unlink) (RelFileNode rnode); int (*smgr_extend) (Relation reln, char *buffer); int (*smgr_open) (Relation reln); int (*smgr_close) (Relation reln); @@ -60,10 +63,11 @@ static f_smgr smgrsw[] = { {mdinit, NULL, mdcreate, mdunlink, mdextend, mdopen, mdclose, mdread, mdwrite, mdflush, mdblindwrt, mdmarkdirty, mdblindmarkdirty, #ifdef XLOG - mdnblocks, mdtruncate, mdcommit, mdabort, mdsync}, + mdnblocks, mdtruncate, mdcommit, mdabort, mdsync #else - mdnblocks, mdtruncate, mdcommit, mdabort}, + mdnblocks, mdtruncate, mdcommit, mdabort #endif + }, #ifdef STABLE_MEMORY_STORAGE /* main memory */ @@ -94,6 +98,31 @@ static bool smgrwo[] = { static int NSmgr = lengthof(smgrsw); /* + * We keep a list of all relations (represented as RelFileNode values) + * that have been created or deleted in the current transaction. When + * a relation is created, we create the physical file immediately, but + * remember it so that we can delete the file again if the current + * transaction is aborted. Conversely, a deletion request is NOT + * executed immediately, but is just entered in the list. When and if + * the transaction commits, we can delete the physical file. + * + * NOTE: the list is kept in TopMemoryContext to be sure it won't disappear + * unbetimes. It'd probably be OK to keep it in TopTransactionContext, + * but I'm being paranoid. + */ + +typedef struct PendingRelDelete +{ + RelFileNode relnode; /* relation that may need to be deleted */ + int16 which; /* which storage manager? */ + bool atCommit; /* T=delete at commit; F=delete at abort */ + struct PendingRelDelete *next; /* linked-list link */ +} PendingRelDelete; + +static PendingRelDelete *pendingDeletes = NULL; /* head of linked list */ + + +/* * smgrinit(), smgrshutdown() -- Initialize or shut down all storage * managers. * @@ -147,27 +176,58 @@ int smgrcreate(int16 which, Relation reln) { int fd; + PendingRelDelete *pending; if ((fd = (*(smgrsw[which].smgr_create)) (reln)) < 0) elog(ERROR, "cannot create %s: %m", RelationGetRelationName(reln)); + /* Add the relation to the list of stuff to delete at abort */ + pending = (PendingRelDelete *) + MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete)); + pending->relnode = reln->rd_node; + pending->which = which; + pending->atCommit = false; /* delete if abort */ + pending->next = pendingDeletes; + pendingDeletes = pending; + return fd; } /* * smgrunlink() -- Unlink a relation. * - * The relation is removed from the store. + * The relation is removed from the store. Actually, we just remember + * that we want to do this at transaction commit. */ int smgrunlink(int16 which, Relation reln) { - int status; - - if ((status = (*(smgrsw[which].smgr_unlink)) (reln)) == SM_FAIL) - elog(ERROR, "cannot unlink %s: %m", RelationGetRelationName(reln)); + PendingRelDelete *pending; + + /* Make sure the file is closed */ + if (reln->rd_fd >= 0) + smgrclose(which, reln); + + /* Add the relation to the list of stuff to delete at commit */ + pending = (PendingRelDelete *) + MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete)); + pending->relnode = reln->rd_node; + pending->which = which; + pending->atCommit = true; /* delete if commit */ + pending->next = pendingDeletes; + pendingDeletes = pending; + + /* + * NOTE: if the relation was created in this transaction, it will now + * be present in the pending-delete list twice, once with atCommit true + * and once with atCommit false. Hence, it will be physically deleted + * at end of xact in either case (and the other entry will be ignored + * by smgrDoPendingDeletes, so no error will occur). We could instead + * remove the existing list entry and delete the physical file + * immediately, but for now I'll keep the logic simple. + */ - return status; + return SM_SUCCESS; } /* @@ -193,17 +253,18 @@ smgrextend(int16 which, Relation reln, char *buffer) /* * smgropen() -- Open a relation using a particular storage manager. * - * Returns the fd for the open relation on success, aborts the - * transaction on failure. + * Returns the fd for the open relation on success. + * + * On failure, returns -1 if failOK, else aborts the transaction. */ int -smgropen(int16 which, Relation reln) +smgropen(int16 which, Relation reln, bool failOK) { int fd; - if ((fd = (*(smgrsw[which].smgr_open)) (reln)) < 0 && - !reln->rd_unlinked) - elog(ERROR, "cannot open %s: %m", RelationGetRelationName(reln)); + if ((fd = (*(smgrsw[which].smgr_open)) (reln)) < 0) + if (! failOK) + elog(ERROR, "cannot open %s: %m", RelationGetRelationName(reln)); return fd; } @@ -211,12 +272,6 @@ smgropen(int16 which, Relation reln) /* * smgrclose() -- Close a relation. * - * NOTE: underlying manager should allow case where relation is - * already closed. Indeed relation may have been unlinked! - * This is currently called only from RelationFlushRelation() when - * the relation cache entry is about to be dropped; could be doing - * simple relation cache clear, or finishing up DROP TABLE. - * * Returns SM_SUCCESS on success, aborts on failure. */ int @@ -412,6 +467,41 @@ smgrtruncate(int16 which, Relation reln, int nblocks) } /* + * smgrDoPendingDeletes() -- take care of relation deletes at end of xact. + */ +int +smgrDoPendingDeletes(bool isCommit) +{ + while (pendingDeletes != NULL) + { + PendingRelDelete *pending = pendingDeletes; + + pendingDeletes = pending->next; + if (pending->atCommit == isCommit) + { + /* + * Get rid of any leftover buffers for the rel (shouldn't be + * any in the commit case, but there can be in the abort case). + */ + DropRelFileNodeBuffers(pending->relnode); + /* + * And delete the physical files. + * + * Note: we treat deletion failure as a NOTICE, not an error, + * because we've already decided to commit or abort the current + * xact. + */ + if ((*(smgrsw[pending->which].smgr_unlink)) (pending->relnode) == SM_FAIL) + elog(NOTICE, "cannot unlink %u/%u: %m", + pending->relnode.tblNode, pending->relnode.relNode); + } + pfree(pending); + } + + return SM_SUCCESS; +} + +/* * smgrcommit(), smgrabort() -- Commit or abort changes made during the * current transaction. */ |