about | summary | refs | log | tree | commit | diff
path: root/src/backend/storage
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/storage')
-rw-r--r-- src/backend/storage/buffer/bufmgr.c 200
-rw-r--r-- src/backend/storage/buffer/localbuf.c 17
-rw-r--r-- src/backend/storage/buffer/xlog_bufmgr.c 142
-rw-r--r-- src/backend/storage/lmgr/lock.c 177
-rw-r--r-- src/backend/storage/smgr/md.c 162
-rw-r--r-- src/backend/storage/smgr/mm.c 6
-rw-r--r-- src/backend/storage/smgr/smgr.c 134
7 files changed, 556 insertions, 282 deletions
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 9c9bda5035c..8d40e8d952f 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.92 2000/10/28 16:20:55 vadim Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.93 2000/11/08 22:09:59 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -709,23 +709,28 @@ refcount = %ld, file: %s, line: %d\n",
#endif
/*
- * FlushBuffer -- like WriteBuffer, but force the page to disk.
+ * FlushBuffer -- like WriteBuffer, but write the page immediately,
+ * rather than just marking it dirty. On success return, the buffer will
+ * no longer be dirty.
*
* 'buffer' is known to be dirty/pinned, so there should not be a
* problem reading the BufferDesc members without the BufMgrLock
* (nobody should be able to change tags out from under us).
*
- * Unpin if 'release' is TRUE.
+ * If 'sync' is true, a synchronous write is wanted (wait for buffer to hit
+ * the disk). Otherwise it's sufficient to issue the kernel write call.
+ *
+ * Unpin buffer if 'release' is true.
*/
int
-FlushBuffer(Buffer buffer, bool release)
+FlushBuffer(Buffer buffer, bool sync, bool release)
{
BufferDesc *bufHdr;
Relation bufrel;
int status;
if (BufferIsLocal(buffer))
- return FlushLocalBuffer(buffer, release) ? STATUS_OK : STATUS_ERROR;
+ return FlushLocalBuffer(buffer, sync, release) ? STATUS_OK : STATUS_ERROR;
if (BAD_BUFFER_ID(buffer))
return STATUS_ERROR;
@@ -755,12 +760,16 @@ FlushBuffer(Buffer buffer, bool release)
*/
LockBuffer(BufferDescriptorGetBuffer(bufHdr), BUFFER_LOCK_SHARE);
- status = smgrflush(DEFAULT_SMGR, bufrel, bufHdr->tag.blockNum,
- (char *) MAKE_PTR(bufHdr->data));
+ if (sync)
+ status = smgrflush(DEFAULT_SMGR, bufrel, bufHdr->tag.blockNum,
+ (char *) MAKE_PTR(bufHdr->data));
+ else
+ status = smgrwrite(DEFAULT_SMGR, bufrel, bufHdr->tag.blockNum,
+ (char *) MAKE_PTR(bufHdr->data));
LockBuffer(BufferDescriptorGetBuffer(bufHdr), BUFFER_LOCK_UNLOCK);
- /* drop relcache refcnt incremented by RelationIdCacheGetRelation */
+ /* drop relcache refcnt incremented by RelationNodeCacheGetRelation */
RelationDecrementReferenceCount(bufrel);
if (status == SM_FAIL)
@@ -926,7 +935,7 @@ SetBufferDirtiedByMe(Buffer buffer, BufferDesc *bufHdr)
/*
* drop relcache refcnt incremented by
- * RelationIdCacheGetRelation
+ * RelationNodeCacheGetRelation
*/
RelationDecrementReferenceCount(reln);
}
@@ -1123,7 +1132,7 @@ BufferSync()
bufHdr->flags &= ~BM_DIRTY;
}
- /* drop refcnt obtained by RelationIdCacheGetRelation */
+ /* drop refcnt obtained by RelationNodeCacheGetRelation */
if (reln != (Relation) NULL)
RelationDecrementReferenceCount(reln);
}
@@ -1154,7 +1163,7 @@ BufferSync()
/*
* drop relcache refcnt incremented by
- * RelationIdCacheGetRelation
+ * RelationNodeCacheGetRelation
*/
RelationDecrementReferenceCount(reln);
@@ -1458,7 +1467,7 @@ BufferReplace(BufferDesc *bufHdr)
SpinAcquire(BufMgrLock);
- /* drop relcache refcnt incremented by RelationIdCacheGetRelation */
+ /* drop relcache refcnt incremented by RelationNodeCacheGetRelation */
if (reln != (Relation) NULL)
RelationDecrementReferenceCount(reln);
@@ -1495,21 +1504,23 @@ RelationGetNumberOfBlocks(Relation relation)
}
/* ---------------------------------------------------------------------
- * ReleaseRelationBuffers
+ * DropRelationBuffers
*
* This function removes all the buffered pages for a relation
* from the buffer pool. Dirty pages are simply dropped, without
- * bothering to write them out first. This is used when the
- * relation is about to be deleted. We assume that the caller
- * holds an exclusive lock on the relation, which should assure
- * that no new buffers will be acquired for the rel meanwhile.
+ * bothering to write them out first. This is NOT rollback-able,
+ * and so should be used only with extreme caution!
+ *
+ * We assume that the caller holds an exclusive lock on the relation,
+ * which should assure that no new buffers will be acquired for the rel
+ * meanwhile.
*
* XXX currently it sequentially searches the buffer pool, should be
* changed to more clever ways of searching.
* --------------------------------------------------------------------
*/
void
-ReleaseRelationBuffers(Relation rel)
+DropRelationBuffers(Relation rel)
{
int i;
BufferDesc *bufHdr;
@@ -1589,7 +1600,104 @@ recheck:
* this rel, since we hold exclusive lock on this rel.
*/
if (RelFileNodeEquals(rel->rd_node,
- BufferTagLastDirtied[i - 1].rnode))
+ BufferTagLastDirtied[i - 1].rnode))
+ BufferDirtiedByMe[i - 1] = false;
+ }
+
+ SpinRelease(BufMgrLock);
+}
+
+/* ---------------------------------------------------------------------
+ * DropRelFileNodeBuffers
+ *
+ * This is the same as DropRelationBuffers, except that the target
+ * relation is specified by RelFileNode.
+ *
+ * This is NOT rollback-able. One legitimate use is to clear the
+ * buffer cache of buffers for a relation that is being deleted
+ * during transaction abort.
+ * --------------------------------------------------------------------
+ */
+void
+DropRelFileNodeBuffers(RelFileNode rnode)
+{
+ int i;
+ BufferDesc *bufHdr;
+
+ /* We have to search both local and shared buffers... */
+
+ for (i = 0; i < NLocBuffer; i++)
+ {
+ bufHdr = &LocalBufferDescriptors[i];
+ if (RelFileNodeEquals(bufHdr->tag.rnode, rnode))
+ {
+ bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
+ LocalRefCount[i] = 0;
+ bufHdr->tag.rnode.relNode = InvalidOid;
+ }
+ }
+
+ SpinAcquire(BufMgrLock);
+ for (i = 1; i <= NBuffers; i++)
+ {
+ bufHdr = &BufferDescriptors[i - 1];
+recheck:
+ if (RelFileNodeEquals(bufHdr->tag.rnode, rnode))
+ {
+
+ /*
+ * If there is I/O in progress, better wait till it's done;
+ * don't want to delete the relation out from under someone
+ * who's just trying to flush the buffer!
+ */
+ if (bufHdr->flags & BM_IO_IN_PROGRESS)
+ {
+ WaitIO(bufHdr, BufMgrLock);
+
+ /*
+ * By now, the buffer very possibly belongs to some other
+ * rel, so check again before proceeding.
+ */
+ goto recheck;
+ }
+ /* Now we can do what we came for */
+ bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
+
+ /*
+ * Release any refcount we may have.
+ *
+ * This is very probably dead code, and if it isn't then it's
+ * probably wrong. I added the Assert to find out --- tgl
+ * 11/99.
+ */
+ if (!(bufHdr->flags & BM_FREE))
+ {
+ /* Assert checks that buffer will actually get freed! */
+ Assert(PrivateRefCount[i - 1] == 1 &&
+ bufHdr->refcount == 1);
+ /* ReleaseBuffer expects we do not hold the lock at entry */
+ SpinRelease(BufMgrLock);
+ ReleaseBuffer(i);
+ SpinAcquire(BufMgrLock);
+ }
+ /*
+ * And mark the buffer as no longer occupied by this rel.
+ */
+ BufTableDelete(bufHdr);
+ }
+
+ /*
+ * Also check to see if BufferDirtiedByMe info for this buffer
+ * refers to the target relation, and clear it if so. This is
+ * independent of whether the current contents of the buffer
+ * belong to the target relation!
+ *
+ * NOTE: we have no way to clear BufferDirtiedByMe info in other
+ * backends, but hopefully there are none with that bit set for
+ * this rel, since we hold exclusive lock on this rel.
+ */
+ if (RelFileNodeEquals(rnode,
+ BufferTagLastDirtied[i - 1].rnode))
BufferDirtiedByMe[i - 1] = false;
}
@@ -1604,7 +1712,7 @@ recheck:
* bothering to write them out first. This is used when we destroy a
* database, to avoid trying to flush data to disk when the directory
* tree no longer exists. Implementation is pretty similar to
- * ReleaseRelationBuffers() which is for destroying just one relation.
+ * DropRelationBuffers() which is for destroying just one relation.
* --------------------------------------------------------------------
*/
void
@@ -1757,33 +1865,32 @@ BufferPoolBlowaway()
/* ---------------------------------------------------------------------
* FlushRelationBuffers
*
- * This function flushes all dirty pages of a relation out to disk.
+ * This function writes all dirty pages of a relation out to disk.
* Furthermore, pages that have blocknumber >= firstDelBlock are
* actually removed from the buffer pool. An error code is returned
* if we fail to dump a dirty buffer or if we find one of
* the target pages is pinned into the cache.
*
- * This is used by VACUUM before truncating the relation to the given
- * number of blocks. (TRUNCATE TABLE also uses it in the same way.)
- * It might seem unnecessary to flush dirty pages before firstDelBlock,
- * since VACUUM should already have committed its changes. However,
- * it is possible for there still to be dirty pages: if some page
- * had unwritten on-row tuple status updates from a prior transaction,
- * and VACUUM had no additional changes to make to that page, then
- * VACUUM won't have written it. This is harmless in most cases but
- * will break pg_upgrade, which relies on VACUUM to ensure that *all*
- * tuples have correct on-row status. So, we check and flush all
- * dirty pages of the rel regardless of block number.
- *
- * This is also used by RENAME TABLE (with firstDelBlock = 0)
- * to clear out the buffer cache before renaming the physical files of
- * a relation. Without that, some other backend might try to do a
- * blind write of a buffer page (relying on the BlindId of the buffer)
- * and fail because it's not got the right filename anymore.
+ * This is called by DROP TABLE to clear buffers for the relation
+ * from the buffer pool. Note that we must write dirty buffers,
+ * rather than just dropping the changes, because our transaction
+ * might abort later on; we want to roll back safely in that case.
+ *
+ * This is also called by VACUUM before truncating the relation to the
+ * given number of blocks. It might seem unnecessary for VACUUM to
+ * write dirty pages before firstDelBlock, since VACUUM should already
+ * have committed its changes. However, it is possible for there still
+ * to be dirty pages: if some page had unwritten on-row tuple status
+ * updates from a prior transaction, and VACUUM had no additional
+ * changes to make to that page, then VACUUM won't have written it.
+ * This is harmless in most cases but will break pg_upgrade, which
+ * relies on VACUUM to ensure that *all* tuples have correct on-row
+ * status. So, we check and write all dirty pages of the rel
+ * regardless of block number.
*
* In all cases, the caller should be holding AccessExclusiveLock on
* the target relation to ensure that no other backend is busy reading
- * more blocks of the relation.
+ * more blocks of the relation (or might do so before we commit).
*
* Formerly, we considered it an error condition if we found dirty
* buffers here. However, since BufferSync no longer forces out all
@@ -1812,7 +1919,7 @@ FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock)
{
if (bufHdr->flags & BM_DIRTY)
{
- if (FlushBuffer(-i - 1, false) != STATUS_OK)
+ if (FlushBuffer(-i - 1, false, false) != STATUS_OK)
{
elog(NOTICE, "FlushRelationBuffers(%s (local), %u): block %u is dirty, could not flush it",
RelationGetRelationName(rel), firstDelBlock,
@@ -1840,15 +1947,17 @@ FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock)
for (i = 0; i < NBuffers; i++)
{
bufHdr = &BufferDescriptors[i];
-recheck:
if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
{
if (bufHdr->flags & BM_DIRTY)
{
PinBuffer(bufHdr);
SpinRelease(BufMgrLock);
- if (FlushBuffer(i + 1, true) != STATUS_OK)
+ if (FlushBuffer(i + 1, false, false) != STATUS_OK)
{
+ SpinAcquire(BufMgrLock);
+ UnpinBuffer(bufHdr);
+ SpinRelease(BufMgrLock);
elog(NOTICE, "FlushRelationBuffers(%s, %u): block %u is dirty (private %ld, global %d), could not flush it",
RelationGetRelationName(rel), firstDelBlock,
bufHdr->tag.blockNum,
@@ -1856,12 +1965,7 @@ recheck:
return -1;
}
SpinAcquire(BufMgrLock);
-
- /*
- * Buffer could already be reassigned, so must recheck
- * whether it still belongs to rel before freeing it!
- */
- goto recheck;
+ UnpinBuffer(bufHdr);
}
if (!(bufHdr->flags & BM_FREE))
{
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index faa3304b4f6..352f519bdc0 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -16,7 +16,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/buffer/localbuf.c,v 1.33 2000/10/28 16:20:56 vadim Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/buffer/localbuf.c,v 1.34 2000/11/08 22:09:59 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -183,7 +183,7 @@ WriteLocalBuffer(Buffer buffer, bool release)
* flushes a local buffer
*/
int
-FlushLocalBuffer(Buffer buffer, bool release)
+FlushLocalBuffer(Buffer buffer, bool sync, bool release)
{
int bufid;
Relation bufrel;
@@ -199,13 +199,18 @@ FlushLocalBuffer(Buffer buffer, bool release)
bufHdr = &LocalBufferDescriptors[bufid];
bufHdr->flags &= ~BM_DIRTY;
bufrel = RelationNodeCacheGetRelation(bufHdr->tag.rnode);
-
Assert(bufrel != NULL);
- smgrflush(DEFAULT_SMGR, bufrel, bufHdr->tag.blockNum,
- (char *) MAKE_PTR(bufHdr->data));
+
+ if (sync)
+ smgrflush(DEFAULT_SMGR, bufrel, bufHdr->tag.blockNum,
+ (char *) MAKE_PTR(bufHdr->data));
+ else
+ smgrwrite(DEFAULT_SMGR, bufrel, bufHdr->tag.blockNum,
+ (char *) MAKE_PTR(bufHdr->data));
+
LocalBufferFlushCount++;
- /* drop relcache refcount incremented by RelationIdCacheGetRelation */
+ /* drop relcache refcount incremented by RelationNodeCacheGetRelation */
RelationDecrementReferenceCount(bufrel);
if (release)
diff --git a/src/backend/storage/buffer/xlog_bufmgr.c b/src/backend/storage/buffer/xlog_bufmgr.c
index dcd377b7eb3..15c4321405e 100644
--- a/src/backend/storage/buffer/xlog_bufmgr.c
+++ b/src/backend/storage/buffer/xlog_bufmgr.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/buffer/Attic/xlog_bufmgr.c,v 1.1 2000/10/28 16:20:56 vadim Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/buffer/Attic/xlog_bufmgr.c,v 1.2 2000/11/08 22:09:59 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -838,7 +838,7 @@ BufferSync()
SpinRelease(BufMgrLock);
- /* drop refcnt obtained by RelationIdCacheGetRelation */
+ /* drop refcnt obtained by RelationNodeCacheGetRelation */
if (reln != (Relation) NULL)
{
RelationDecrementReferenceCount(reln);
@@ -1128,7 +1128,7 @@ BufferReplace(BufferDesc *bufHdr)
false); /* no fsync */
}
- /* drop relcache refcnt incremented by RelationIdCacheGetRelation */
+ /* drop relcache refcnt incremented by RelationNodeCacheGetRelation */
if (reln != (Relation) NULL)
RelationDecrementReferenceCount(reln);
@@ -1159,21 +1159,23 @@ RelationGetNumberOfBlocks(Relation relation)
}
/* ---------------------------------------------------------------------
- * ReleaseRelationBuffers
+ * DropRelationBuffers
*
* This function removes all the buffered pages for a relation
* from the buffer pool. Dirty pages are simply dropped, without
- * bothering to write them out first. This is used when the
- * relation is about to be deleted. We assume that the caller
- * holds an exclusive lock on the relation, which should assure
- * that no new buffers will be acquired for the rel meanwhile.
+ * bothering to write them out first. This is NOT rollback-able,
+ * and so should be used only with extreme caution!
+ *
+ * We assume that the caller holds an exclusive lock on the relation,
+ * which should assure that no new buffers will be acquired for the rel
+ * meanwhile.
*
* XXX currently it sequentially searches the buffer pool, should be
* changed to more clever ways of searching.
* --------------------------------------------------------------------
*/
void
-ReleaseRelationBuffers(Relation rel)
+DropRelationBuffers(Relation rel)
{
int i;
BufferDesc *bufHdr;
@@ -1249,6 +1251,91 @@ recheck:
}
/* ---------------------------------------------------------------------
+ * DropRelFileNodeBuffers
+ *
+ * This is the same as DropRelationBuffers, except that the target
+ * relation is specified by RelFileNode.
+ *
+ * This is NOT rollback-able. One legitimate use is to clear the
+ * buffer cache of buffers for a relation that is being deleted
+ * during transaction abort.
+ * --------------------------------------------------------------------
+ */
+void
+DropRelFileNodeBuffers(RelFileNode rnode)
+{
+ int i;
+ BufferDesc *bufHdr;
+
+ /* Search the local buffers first, then the shared pool (under BufMgrLock) */
+
+ for (i = 0; i < NLocBuffer; i++)
+ {
+ bufHdr = &LocalBufferDescriptors[i];
+ if (RelFileNodeEquals(bufHdr->tag.rnode, rnode))
+ {
+ bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED); /* discard pending changes unwritten */
+ bufHdr->cntxDirty = false;
+ LocalRefCount[i] = 0; /* forcibly zeroed; no unpin call is made */
+ bufHdr->tag.rnode.relNode = InvalidOid; /* presumably marks the slot as holding no relation */
+ }
+ }
+
+ SpinAcquire(BufMgrLock);
+ for (i = 1; i <= NBuffers; i++) /* i is the 1-based Buffer number */
+ {
+ bufHdr = &BufferDescriptors[i - 1];
+recheck:
+ if (RelFileNodeEquals(bufHdr->tag.rnode, rnode))
+ {
+
+ /*
+ * If there is I/O in progress, better wait till it's done;
+ * don't want to delete the relation out from under someone
+ * who's just trying to flush the buffer!
+ */
+ if (bufHdr->flags & BM_IO_IN_PROGRESS)
+ {
+ WaitIO(bufHdr, BufMgrLock);
+
+ /*
+ * By now, the buffer very possibly belongs to some other
+ * rel, so check again before proceeding.
+ */
+ goto recheck;
+ }
+ /* No I/O in progress: discard any pending changes to this buffer */
+ bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
+ bufHdr->cntxDirty = false;
+
+ /*
+ * Release any refcount we may have.
+ *
+ * This is very probably dead code, and if it isn't then it's
+ * probably wrong. I added the Assert to find out --- tgl
+ * 11/99.
+ */
+ if (!(bufHdr->flags & BM_FREE))
+ {
+ /* Assert checks that buffer will actually get freed! */
+ Assert(PrivateRefCount[i - 1] == 1 &&
+ bufHdr->refcount == 1);
+ /* ReleaseBuffer expects we do not hold the lock at entry */
+ SpinRelease(BufMgrLock);
+ ReleaseBuffer(i);
+ SpinAcquire(BufMgrLock);
+ }
+ /*
+ * And mark the buffer as no longer occupied by this rel.
+ */
+ BufTableDelete(bufHdr);
+ }
+ }
+
+ SpinRelease(BufMgrLock);
+}
+
+/* ---------------------------------------------------------------------
* DropBuffers
*
* This function removes all the buffers in the buffer cache for a
@@ -1256,7 +1343,7 @@ recheck:
* bothering to write them out first. This is used when we destroy a
* database, to avoid trying to flush data to disk when the directory
* tree no longer exists. Implementation is pretty similar to
- * ReleaseRelationBuffers() which is for destroying just one relation.
+ * DropRelationBuffers() which is for destroying just one relation.
* --------------------------------------------------------------------
*/
void
@@ -1399,33 +1486,32 @@ BufferPoolBlowaway()
/* ---------------------------------------------------------------------
* FlushRelationBuffers
*
- * This function flushes all dirty pages of a relation out to disk.
+ * This function writes all dirty pages of a relation out to disk.
* Furthermore, pages that have blocknumber >= firstDelBlock are
* actually removed from the buffer pool. An error code is returned
* if we fail to dump a dirty buffer or if we find one of
* the target pages is pinned into the cache.
*
- * This is used by VACUUM before truncating the relation to the given
- * number of blocks. (TRUNCATE TABLE also uses it in the same way.)
- * It might seem unnecessary to flush dirty pages before firstDelBlock,
- * since VACUUM should already have committed its changes. However,
- * it is possible for there still to be dirty pages: if some page
- * had unwritten on-row tuple status updates from a prior transaction,
- * and VACUUM had no additional changes to make to that page, then
- * VACUUM won't have written it. This is harmless in most cases but
- * will break pg_upgrade, which relies on VACUUM to ensure that *all*
- * tuples have correct on-row status. So, we check and flush all
- * dirty pages of the rel regardless of block number.
+ * This is called by DROP TABLE to clear buffers for the relation
+ * from the buffer pool. Note that we must write dirty buffers,
+ * rather than just dropping the changes, because our transaction
+ * might abort later on; we want to roll back safely in that case.
*
- * This is also used by RENAME TABLE (with firstDelBlock = 0)
- * to clear out the buffer cache before renaming the physical files of
- * a relation. Without that, some other backend might try to do a
- * blind write of a buffer page (relying on the BlindId of the buffer)
- * and fail because it's not got the right filename anymore.
+ * This is also called by VACUUM before truncating the relation to the
+ * given number of blocks. It might seem unnecessary for VACUUM to
+ * write dirty pages before firstDelBlock, since VACUUM should already
+ * have committed its changes. However, it is possible for there still
+ * to be dirty pages: if some page had unwritten on-row tuple status
+ * updates from a prior transaction, and VACUUM had no additional
+ * changes to make to that page, then VACUUM won't have written it.
+ * This is harmless in most cases but will break pg_upgrade, which
+ * relies on VACUUM to ensure that *all* tuples have correct on-row
+ * status. So, we check and write all dirty pages of the rel
+ * regardless of block number.
*
* In all cases, the caller should be holding AccessExclusiveLock on
* the target relation to ensure that no other backend is busy reading
- * more blocks of the relation.
+ * more blocks of the relation (or might do so before we commit).
*
* Formerly, we considered it an error condition if we found dirty
* buffers here. However, since BufferSync no longer forces out all
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 23a2dcf1e24..14325e53183 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.71 2000/07/17 03:05:08 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.72 2000/11/08 22:10:00 tgl Exp $
*
* NOTES
* Outside modules can create a lock table and acquire/release
@@ -453,7 +453,7 @@ LockMethodTableRename(LOCKMETHOD lockmethod)
bool
LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
{
- XIDLookupEnt *result,
+ XIDLookupEnt *xident,
item;
HTAB *xidTable;
bool found;
@@ -559,9 +559,9 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
/*
* Find or create an xid entry with this tag
*/
- result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
+ xident = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
HASH_ENTER, &found);
- if (!result)
+ if (!xident)
{
SpinRelease(masterLock);
elog(NOTICE, "LockAcquire: xid table corrupted");
@@ -573,16 +573,41 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
*/
if (!found)
{
- result->nHolding = 0;
- MemSet((char *) result->holders, 0, sizeof(int) * MAX_LOCKMODES);
- ProcAddLock(&result->queue);
- XID_PRINT("LockAcquire: new", result);
+ xident->nHolding = 0;
+ MemSet((char *) xident->holders, 0, sizeof(int) * MAX_LOCKMODES);
+ ProcAddLock(&xident->queue);
+ XID_PRINT("LockAcquire: new", xident);
}
else
{
- XID_PRINT("LockAcquire: found", result);
- Assert((result->nHolding > 0) && (result->holders[lockmode] >= 0));
- Assert(result->nHolding <= lock->nActive);
+ int i;
+
+ XID_PRINT("LockAcquire: found", xident);
+ Assert((xident->nHolding > 0) && (xident->holders[lockmode] >= 0));
+ Assert(xident->nHolding <= lock->nActive);
+ /*
+ * Issue warning if we already hold a lower-level lock on this
+ * object and do not hold a lock of the requested level or higher.
+ * This indicates a deadlock-prone coding practice (eg, we'd have
+ * a deadlock if another backend were following the same code path
+ * at about the same time).
+ *
+ * XXX Doing numeric comparison on the lockmodes is a hack;
+ * it'd be better to use a table. For now, though, this works.
+ */
+ for (i = lockMethodTable->ctl->numLockModes; i > 0; i--)
+ {
+ if (xident->holders[i] > 0)
+ {
+ if (i >= (int) lockmode)
+ break; /* safe: we have a lock >= req level */
+ elog(DEBUG, "Deadlock risk: raising lock level"
+ " from %s to %s on object %u/%u/%u",
+ lock_types[i], lock_types[lockmode],
+ lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno);
+ break;
+ }
+ }
}
/* ----------------
@@ -601,12 +626,12 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
* hold this lock.
* --------------------
*/
- if (result->nHolding == lock->nActive || result->holders[lockmode] != 0)
+ if (xident->nHolding == lock->nActive || xident->holders[lockmode] != 0)
{
- result->holders[lockmode]++;
- result->nHolding++;
- XID_PRINT("LockAcquire: owning", result);
- Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
+ xident->holders[lockmode]++;
+ xident->nHolding++;
+ XID_PRINT("LockAcquire: owning", xident);
+ Assert((xident->nHolding > 0) && (xident->holders[lockmode] > 0));
GrantLock(lock, lockmode);
SpinRelease(masterLock);
return TRUE;
@@ -623,27 +648,27 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
* If I don't hold locks or my locks don't conflict with waiters
* then force to sleep.
*/
- if (result->nHolding > 0)
+ if (xident->nHolding > 0)
{
for (; i <= lockMethodTable->ctl->numLockModes; i++)
{
- if (result->holders[i] > 0 &&
+ if (xident->holders[i] > 0 &&
lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
break; /* conflict */
}
}
- if (result->nHolding == 0 || i > lockMethodTable->ctl->numLockModes)
+ if (xident->nHolding == 0 || i > lockMethodTable->ctl->numLockModes)
{
XID_PRINT("LockAcquire: higher priority proc waiting",
- result);
+ xident);
status = STATUS_FOUND;
}
else
- status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result);
+ status = LockResolveConflicts(lockmethod, lock, lockmode, xid, xident);
}
else
- status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result);
+ status = LockResolveConflicts(lockmethod, lock, lockmode, xid, xident);
if (status == STATUS_OK)
GrantLock(lock, lockmode);
@@ -657,17 +682,17 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
*/
if (lockmethod == USER_LOCKMETHOD)
{
- if (!result->nHolding)
+ if (!xident->nHolding)
{
- SHMQueueDelete(&result->queue);
- result = (XIDLookupEnt *) hash_search(xidTable,
- (Pointer) result,
+ SHMQueueDelete(&xident->queue);
+ xident = (XIDLookupEnt *) hash_search(xidTable,
+ (Pointer) xident,
HASH_REMOVE, &found);
- if (!result || !found)
+ if (!xident || !found)
elog(NOTICE, "LockAcquire: remove xid, table corrupted");
}
else
- XID_PRINT("LockAcquire: NHOLDING", result);
+ XID_PRINT("LockAcquire: NHOLDING", xident);
lock->nHolding--;
lock->holders[lockmode]--;
LOCK_PRINT("LockAcquire: user lock failed", lock, lockmode);
@@ -682,7 +707,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
* Construct bitmask of locks we hold before going to sleep.
*/
MyProc->holdLock = 0;
- if (result->nHolding > 0)
+ if (xident->nHolding > 0)
{
int i,
tmpMask = 2;
@@ -690,7 +715,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
for (i = 1; i <= lockMethodTable->ctl->numLockModes;
i++, tmpMask <<= 1)
{
- if (result->holders[i] > 0)
+ if (xident->holders[i] > 0)
MyProc->holdLock |= tmpMask;
}
Assert(MyProc->holdLock != 0);
@@ -702,15 +727,15 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
* Check the xid entry status, in case something in the ipc
* communication doesn't work correctly.
*/
- if (!((result->nHolding > 0) && (result->holders[lockmode] > 0)))
+ if (!((xident->nHolding > 0) && (xident->holders[lockmode] > 0)))
{
- XID_PRINT("LockAcquire: INCONSISTENT", result);
+ XID_PRINT("LockAcquire: INCONSISTENT", xident);
LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
/* Should we retry ? */
SpinRelease(masterLock);
return FALSE;
}
- XID_PRINT("LockAcquire: granted", result);
+ XID_PRINT("LockAcquire: granted", xident);
LOCK_PRINT("LockAcquire: granted", lock, lockmode);
}
@@ -738,7 +763,7 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
TransactionId xid,
XIDLookupEnt *xidentP) /* xident ptr or NULL */
{
- XIDLookupEnt *result,
+ XIDLookupEnt *xident,
item;
int *myHolders;
int numLockModes;
@@ -758,7 +783,7 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
* A pointer to the xid entry was supplied from the caller.
* Actually only LockAcquire can do it.
*/
- result = xidentP;
+ xident = xidentP;
}
else
{
@@ -788,9 +813,9 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
/*
* Find or create an xid entry with this tag
*/
- result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
+ xident = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
HASH_ENTER, &found);
- if (!result)
+ if (!xident)
{
elog(NOTICE, "LockResolveConflicts: xid table corrupted");
return STATUS_ERROR;
@@ -808,14 +833,14 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
* the lock stats.
* ---------------
*/
- MemSet(result->holders, 0, numLockModes * sizeof(*(lock->holders)));
- result->nHolding = 0;
- XID_PRINT("LockResolveConflicts: NOT FOUND", result);
+ MemSet(xident->holders, 0, numLockModes * sizeof(*(lock->holders)));
+ xident->nHolding = 0;
+ XID_PRINT("LockResolveConflicts: NOT FOUND", xident);
}
else
- XID_PRINT("LockResolveConflicts: found", result);
+ XID_PRINT("LockResolveConflicts: found", xident);
}
- Assert((result->nHolding >= 0) && (result->holders[lockmode] >= 0));
+ Assert((xident->nHolding >= 0) && (xident->holders[lockmode] >= 0));
/* ----------------------------
* first check for global conflicts: If no locks conflict
@@ -829,10 +854,10 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
*/
if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & lock->mask))
{
- result->holders[lockmode]++;
- result->nHolding++;
- XID_PRINT("LockResolveConflicts: no conflict", result);
- Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
+ xident->holders[lockmode]++;
+ xident->nHolding++;
+ XID_PRINT("LockResolveConflicts: no conflict", xident);
+ Assert((xident->nHolding > 0) && (xident->holders[lockmode] > 0));
return STATUS_OK;
}
@@ -842,7 +867,7 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
* that does not reflect our own locks.
* ------------------------
*/
- myHolders = result->holders;
+ myHolders = xident->holders;
bitmask = 0;
tmpMask = 2;
for (i = 1; i <= numLockModes; i++, tmpMask <<= 1)
@@ -861,14 +886,14 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & bitmask))
{
/* no conflict. Get the lock and go on */
- result->holders[lockmode]++;
- result->nHolding++;
- XID_PRINT("LockResolveConflicts: resolved", result);
- Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
+ xident->holders[lockmode]++;
+ xident->nHolding++;
+ XID_PRINT("LockResolveConflicts: resolved", xident);
+ Assert((xident->nHolding > 0) && (xident->holders[lockmode] > 0));
return STATUS_OK;
}
- XID_PRINT("LockResolveConflicts: conflicting", result);
+ XID_PRINT("LockResolveConflicts: conflicting", xident);
return STATUS_FOUND;
}
@@ -965,7 +990,7 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
SPINLOCK masterLock;
bool found;
LOCKMETHODTABLE *lockMethodTable;
- XIDLookupEnt *result,
+ XIDLookupEnt *xident,
item;
HTAB *xidTable;
TransactionId xid;
@@ -1053,9 +1078,9 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
* Find an xid entry with this tag
*/
xidTable = lockMethodTable->xidHash;
- result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
+ xident = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
HASH_FIND_SAVE, &found);
- if (!result || !found)
+ if (!xident || !found)
{
SpinRelease(masterLock);
#ifdef USER_LOCKS
@@ -1066,23 +1091,23 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
elog(NOTICE, "LockRelease: xid table corrupted");
return FALSE;
}
- XID_PRINT("LockRelease: found", result);
- Assert(result->tag.lock == MAKE_OFFSET(lock));
+ XID_PRINT("LockRelease: found", xident);
+ Assert(xident->tag.lock == MAKE_OFFSET(lock));
/*
* Check that we are actually holding a lock of the type we want to
* release.
*/
- if (!(result->holders[lockmode] > 0))
+ if (!(xident->holders[lockmode] > 0))
{
SpinRelease(masterLock);
- XID_PRINT("LockAcquire: WRONGTYPE", result);
+ XID_PRINT("LockAcquire: WRONGTYPE", xident);
elog(NOTICE, "LockRelease: you don't own a lock of type %s",
lock_types[lockmode]);
- Assert(result->holders[lockmode] >= 0);
+ Assert(xident->holders[lockmode] >= 0);
return FALSE;
}
- Assert(result->nHolding > 0);
+ Assert(xident->nHolding > 0);
/*
* fix the general lock stats
@@ -1147,27 +1172,27 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
* now check to see if I have any private locks. If I do, decrement
* the counts associated with them.
*/
- result->holders[lockmode]--;
- result->nHolding--;
- XID_PRINT("LockRelease: updated", result);
- Assert((result->nHolding >= 0) && (result->holders[lockmode] >= 0));
+ xident->holders[lockmode]--;
+ xident->nHolding--;
+ XID_PRINT("LockRelease: updated", xident);
+ Assert((xident->nHolding >= 0) && (xident->holders[lockmode] >= 0));
/*
* If this was my last hold on this lock, delete my entry in the XID
* table.
*/
- if (!result->nHolding)
+ if (!xident->nHolding)
{
- if (result->queue.prev == INVALID_OFFSET)
+ if (xident->queue.prev == INVALID_OFFSET)
elog(NOTICE, "LockRelease: xid.prev == INVALID_OFFSET");
- if (result->queue.next == INVALID_OFFSET)
+ if (xident->queue.next == INVALID_OFFSET)
elog(NOTICE, "LockRelease: xid.next == INVALID_OFFSET");
- if (result->queue.next != INVALID_OFFSET)
- SHMQueueDelete(&result->queue);
- XID_PRINT("LockRelease: deleting", result);
- result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &result,
+ if (xident->queue.next != INVALID_OFFSET)
+ SHMQueueDelete(&xident->queue);
+ XID_PRINT("LockRelease: deleting", xident);
+ xident = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &xident,
HASH_REMOVE_SAVED, &found);
- if (!result || !found)
+ if (!xident || !found)
{
SpinRelease(masterLock);
elog(NOTICE, "LockRelease: remove xid, table corrupted");
@@ -1196,7 +1221,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
int done;
XIDLookupEnt *xidLook = NULL;
XIDLookupEnt *tmp = NULL;
- XIDLookupEnt *result;
+ XIDLookupEnt *xident;
SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
SPINLOCK masterLock;
LOCKMETHODTABLE *lockMethodTable;
@@ -1371,11 +1396,11 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
*/
XID_PRINT("LockReleaseAll: deleting", xidLook);
- result = (XIDLookupEnt *) hash_search(lockMethodTable->xidHash,
+ xident = (XIDLookupEnt *) hash_search(lockMethodTable->xidHash,
(Pointer) xidLook,
HASH_REMOVE,
&found);
- if (!result || !found)
+ if (!xident || !found)
{
SpinRelease(masterLock);
elog(NOTICE, "LockReleaseAll: xid table corrupted");
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index da466afe9f8..c97a46ba4b4 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -8,17 +8,17 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/smgr/md.c,v 1.77 2000/10/28 16:20:57 vadim Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/smgr/md.c,v 1.78 2000/11/08 22:10:00 tgl Exp $
*
*-------------------------------------------------------------------------
*/
+#include "postgres.h"
+
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/file.h>
-#include "postgres.h"
-
#include "catalog/catalog.h"
#include "miscadmin.h"
#include "storage/smgr.h"
@@ -123,63 +123,39 @@ mdinit()
int
mdcreate(Relation reln)
{
+ char *path;
int fd,
vfd;
- char *path;
- Assert(reln->rd_unlinked && reln->rd_fd < 0);
+ Assert(reln->rd_fd < 0);
path = relpath(reln->rd_node);
- fd = FileNameOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, 0600);
- /*
- * For cataloged relations, pg_class is guaranteed to have a unique
- * record with the same relname by the unique index. So we are able to
- * reuse existent files for new cataloged relations. Currently we reuse
- * them in the following cases. 1. they are empty. 2. they are used
- * for Index relations and their size == BLCKSZ * 2.
- *
- * During bootstrap processing, we skip that check, because pg_time,
- * pg_variable, and pg_log get created before their .bki file entries
- * are processed.
- */
+ fd = FileNameOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, 0600);
if (fd < 0)
{
int save_errno = errno;
- if (!IsBootstrapProcessingMode() &&
- reln->rd_rel->relkind == RELKIND_UNCATALOGED)
- return -1;
-
- fd = FileNameOpenFile(path, O_RDWR | PG_BINARY, 0600);
+ /*
+ * During bootstrap, there are cases where a system relation will be
+ * accessed (by internal backend processes) before the bootstrap
+ * script nominally creates it. Therefore, allow the file to exist
+ * already, but in bootstrap mode only. (See also mdopen)
+ */
+ if (IsBootstrapProcessingMode())
+ fd = FileNameOpenFile(path, O_RDWR | PG_BINARY, 0600);
if (fd < 0)
{
+ pfree(path);
/* be sure to return the error reported by create, not open */
errno = save_errno;
return -1;
}
- if (!IsBootstrapProcessingMode())
- {
- bool reuse = false;
- long len = FileSeek(fd, 0L, SEEK_END);
-
- if (len == 0)
- reuse = true;
- else if (reln->rd_rel->relkind == RELKIND_INDEX &&
- len == BLCKSZ * 2)
- reuse = true;
- if (!reuse)
- {
- FileClose(fd);
- /* be sure to return the error reported by create */
- errno = save_errno;
- return -1;
- }
- }
errno = 0;
}
- reln->rd_unlinked = false;
+
+ pfree(path);
vfd = _fdvec_alloc();
if (vfd < 0)
@@ -187,12 +163,10 @@ mdcreate(Relation reln)
Md_fdvec[vfd].mdfd_vfd = fd;
Md_fdvec[vfd].mdfd_flags = (uint16) 0;
+ Md_fdvec[vfd].mdfd_lstbcnt = 0;
#ifndef LET_OS_MANAGE_FILESIZE
Md_fdvec[vfd].mdfd_chain = (MdfdVec *) NULL;
#endif
- Md_fdvec[vfd].mdfd_lstbcnt = 0;
-
- pfree(path);
return vfd;
}
@@ -201,65 +175,50 @@ mdcreate(Relation reln)
* mdunlink() -- Unlink a relation.
+ *
+ * The relation is identified only by its RelFileNode; the path of its
+ * first (or only) segment file is obtained from relpath() and that file
+ * is unlinked directly. Unless LET_OS_MANAGE_FILESIZE is defined, and
+ * provided the first unlink succeeded, the additional segment files
+ * "<path>.1", "<path>.2", ... are then unlinked in order until an
+ * unlink() call fails; ENOENT at that point just means we have run
+ * past the last segment and is not treated as an error.
+ *
+ * Returns SM_SUCCESS or SM_FAIL. On return errno holds the errno of
+ * the unlink() that caused SM_FAIL, or 0 if there was no failure.
*/
int
-mdunlink(Relation reln)
+mdunlink(RelFileNode rnode)
{
- int nblocks;
- int fd;
- MdfdVec *v;
-
- /*
- * If the relation is already unlinked,we have nothing to do any more.
- */
- if (reln->rd_unlinked && reln->rd_fd < 0)
- return SM_SUCCESS;
-
- /*
- * Force all segments of the relation to be opened, so that we won't
- * miss deleting any of them.
- */
- nblocks = mdnblocks(reln);
+ int status = SM_SUCCESS;
+ int save_errno = 0;
+ char *path;
- /*
- * Clean out the mdfd vector, letting fd.c unlink the physical files.
- *
- * NOTE: We truncate the file(s) before deleting 'em, because if other
- * backends are holding the files open, the unlink will fail on some
- * platforms (think Microsoft). Better a zero-size file gets left
- * around than a big file. Those other backends will be forced to
- * close the relation by cache invalidation, but that probably hasn't
- * happened yet.
- */
- fd = RelationGetFile(reln);
- if (fd < 0) /* should not happen */
- elog(ERROR, "mdunlink: mdnblocks didn't open relation");
+ path = relpath(rnode);
- Md_fdvec[fd].mdfd_flags = (uint16) 0;
+ /* Delete the first segment, or only segment if not doing segmenting */
+ if (unlink(path) < 0)
+ {
+ status = SM_FAIL;
+ save_errno = errno;
+ }
#ifndef LET_OS_MANAGE_FILESIZE
- for (v = &Md_fdvec[fd]; v != (MdfdVec *) NULL;)
+ /* Delete the additional segments, if any */
+ if (status == SM_SUCCESS)
{
- MdfdVec *ov = v;
+ char *segpath = (char *) palloc(strlen(path) + 12);
+ int segno;
- FileTruncate(v->mdfd_vfd, 0);
- FileUnlink(v->mdfd_vfd);
- v = v->mdfd_chain;
- if (ov != &Md_fdvec[fd])
- pfree(ov);
+ for (segno = 1; ; segno++)
+ {
+ sprintf(segpath, "%s.%d", path, segno);
+ if (unlink(segpath) < 0)
+ {
+ /* ENOENT is expected after the last segment... */
+ if (errno != ENOENT)
+ {
+ status = SM_FAIL;
+ save_errno = errno;
+ }
+ break;
+ }
+ }
+ pfree(segpath);
}
- Md_fdvec[fd].mdfd_chain = (MdfdVec *) NULL;
-#else
- v = &Md_fdvec[fd];
- FileTruncate(v->mdfd_vfd, 0);
- FileUnlink(v->mdfd_vfd);
#endif
- _fdvec_free(fd);
-
- /* be sure to mark relation closed && unlinked */
- reln->rd_fd = -1;
- reln->rd_unlinked = true;
+ pfree(path);
- return SM_SUCCESS;
+ errno = save_errno;
+ return status;
}
/*
@@ -327,24 +286,29 @@ mdopen(Relation reln)
int vfd;
Assert(reln->rd_fd < 0);
+
path = relpath(reln->rd_node);
fd = FileNameOpenFile(path, O_RDWR | PG_BINARY, 0600);
+
if (fd < 0)
{
- /* in bootstrap mode, accept mdopen as substitute for mdcreate */
+ /*
+ * During bootstrap, there are cases where a system relation will be
+ * accessed (by internal backend processes) before the bootstrap
+ * script nominally creates it. Therefore, accept mdopen() as a
+ * substitute for mdcreate() in bootstrap mode only. (See mdcreate)
+ */
if (IsBootstrapProcessingMode())
fd = FileNameOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, 0600);
if (fd < 0)
{
- elog(NOTICE, "mdopen: couldn't open %s: %m", path);
- /* mark relation closed and unlinked */
- reln->rd_fd = -1;
- reln->rd_unlinked = true;
+ pfree(path);
return -1;
}
}
- reln->rd_unlinked = false;
+
+ pfree(path);
vfd = _fdvec_alloc();
if (vfd < 0)
@@ -362,8 +326,6 @@ mdopen(Relation reln)
#endif
#endif
- pfree(path);
-
return vfd;
}
diff --git a/src/backend/storage/smgr/mm.c b/src/backend/storage/smgr/mm.c
index a5b22cbcc5c..d64aeb6a418 100644
--- a/src/backend/storage/smgr/mm.c
+++ b/src/backend/storage/smgr/mm.c
@@ -11,7 +11,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/smgr/Attic/mm.c,v 1.19 2000/04/10 23:41:51 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/smgr/Attic/mm.c,v 1.20 2000/11/08 22:10:00 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -204,9 +204,11 @@ mmcreate(Relation reln)
/*
* mmunlink() -- Unlink a relation.
+ *
+ * XXX currently broken: the body still needs to be converted to work with
+ * a RelFileNode argument rather than a Relation
*/
int
-mmunlink(Relation reln)
+mmunlink(RelFileNode rnode)
{
int i;
Oid reldbid;
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index d2a940a76e5..01a7877e80a 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -11,13 +11,16 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/smgr/smgr.c,v 1.42 2000/10/28 16:20:57 vadim Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/smgr/smgr.c,v 1.43 2000/11/08 22:10:00 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
+#include "storage/bufmgr.h"
#include "storage/smgr.h"
+#include "utils/memutils.h"
+
static void smgrshutdown(void);
@@ -26,7 +29,7 @@ typedef struct f_smgr
int (*smgr_init) (void); /* may be NULL */
int (*smgr_shutdown) (void); /* may be NULL */
int (*smgr_create) (Relation reln);
- int (*smgr_unlink) (Relation reln);
+ int (*smgr_unlink) (RelFileNode rnode);
int (*smgr_extend) (Relation reln, char *buffer);
int (*smgr_open) (Relation reln);
int (*smgr_close) (Relation reln);
@@ -60,10 +63,11 @@ static f_smgr smgrsw[] = {
{mdinit, NULL, mdcreate, mdunlink, mdextend, mdopen, mdclose,
mdread, mdwrite, mdflush, mdblindwrt, mdmarkdirty, mdblindmarkdirty,
#ifdef XLOG
- mdnblocks, mdtruncate, mdcommit, mdabort, mdsync},
+ mdnblocks, mdtruncate, mdcommit, mdabort, mdsync
#else
- mdnblocks, mdtruncate, mdcommit, mdabort},
+ mdnblocks, mdtruncate, mdcommit, mdabort
#endif
+ },
#ifdef STABLE_MEMORY_STORAGE
/* main memory */
@@ -94,6 +98,31 @@ static bool smgrwo[] = {
static int NSmgr = lengthof(smgrsw);
/*
+ * We keep a list of all relations (represented as RelFileNode values)
+ * that have been created or deleted in the current transaction. When
+ * a relation is created, we create the physical file immediately, but
+ * remember it so that we can delete the file again if the current
+ * transaction is aborted. Conversely, a deletion request is NOT
+ * executed immediately, but is just entered in the list. When and if
+ * the transaction commits, we can delete the physical file.
+ *
+ * The list is singly linked through the 'next' field, with
+ * 'pendingDeletes' pointing at its head (NULL when the list is empty).
+ * Each entry records the file's RelFileNode, the storage manager that
+ * owns it, and whether the deletion is to happen at commit or at abort.
+ *
+ * NOTE: the list is kept in TopMemoryContext to be sure it won't disappear
+ * prematurely. It'd probably be OK to keep it in TopTransactionContext,
+ * but I'm being paranoid.
+ */
+
+typedef struct PendingRelDelete
+{
+ RelFileNode relnode; /* relation that may need to be deleted */
+ int16 which; /* which storage manager? */
+ bool atCommit; /* T=delete at commit; F=delete at abort */
+ struct PendingRelDelete *next; /* linked-list link */
+} PendingRelDelete;
+
+static PendingRelDelete *pendingDeletes = NULL; /* head of linked list */
+
+
+/*
* smgrinit(), smgrshutdown() -- Initialize or shut down all storage
* managers.
*
@@ -147,27 +176,58 @@ int
smgrcreate(int16 which, Relation reln)
{
int fd;
+ PendingRelDelete *pending;
if ((fd = (*(smgrsw[which].smgr_create)) (reln)) < 0)
elog(ERROR, "cannot create %s: %m", RelationGetRelationName(reln));
+ /* Add the relation to the list of stuff to delete at abort */
+ pending = (PendingRelDelete *)
+ MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+ pending->relnode = reln->rd_node;
+ pending->which = which;
+ pending->atCommit = false; /* delete if abort */
+ pending->next = pendingDeletes;
+ pendingDeletes = pending;
+
return fd;
}
/*
* smgrunlink() -- Unlink a relation.
*
- * The relation is removed from the store.
+ * The relation is not removed from the store immediately; we just
+ * remember that we want to do this at transaction commit.
+ *
+ * If the relation's file is currently open (rd_fd >= 0) it is closed
+ * first. The relation's RelFileNode and storage manager number are
+ * then pushed onto the head of the pendingDeletes list with
+ * atCommit = true.
+ *
+ * Returns SM_SUCCESS.
*/
int
smgrunlink(int16 which, Relation reln)
{
- int status;
-
- if ((status = (*(smgrsw[which].smgr_unlink)) (reln)) == SM_FAIL)
- elog(ERROR, "cannot unlink %s: %m", RelationGetRelationName(reln));
+ PendingRelDelete *pending;
+
+ /* Make sure the file is closed */
+ if (reln->rd_fd >= 0)
+ smgrclose(which, reln);
+
+ /* Add the relation to the list of stuff to delete at commit */
+ pending = (PendingRelDelete *)
+ MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+ pending->relnode = reln->rd_node;
+ pending->which = which;
+ pending->atCommit = true; /* delete if commit */
+ pending->next = pendingDeletes;
+ pendingDeletes = pending;
+
+ /*
+ * NOTE: if the relation was created in this transaction, it will now
+ * be present in the pending-delete list twice, once with atCommit true
+ * and once with atCommit false. Hence, it will be physically deleted
+ * at end of xact in either case (and the other entry will be ignored
+ * by smgrDoPendingDeletes, so no error will occur). We could instead
+ * remove the existing list entry and delete the physical file
+ * immediately, but for now I'll keep the logic simple.
+ */
- return status;
+ return SM_SUCCESS;
}
/*
@@ -193,17 +253,18 @@ smgrextend(int16 which, Relation reln, char *buffer)
/*
* smgropen() -- Open a relation using a particular storage manager.
*
- * Returns the fd for the open relation on success, aborts the
- * transaction on failure.
+ * 'which' selects the storage manager (it indexes smgrsw[]) and
+ * 'reln' is the relation to open.
+ *
+ * Returns the fd for the open relation on success.
+ *
+ * On failure (the storage manager's open routine returned a negative
+ * value), that negative value is returned if failOK is true; otherwise
+ * the transaction is aborted via elog(ERROR).
*/
int
-smgropen(int16 which, Relation reln)
+smgropen(int16 which, Relation reln, bool failOK)
{
int fd;
- if ((fd = (*(smgrsw[which].smgr_open)) (reln)) < 0 &&
- !reln->rd_unlinked)
- elog(ERROR, "cannot open %s: %m", RelationGetRelationName(reln));
+ if ((fd = (*(smgrsw[which].smgr_open)) (reln)) < 0)
+ if (! failOK)
+ elog(ERROR, "cannot open %s: %m", RelationGetRelationName(reln));
return fd;
}
@@ -211,12 +272,6 @@ smgropen(int16 which, Relation reln)
/*
* smgrclose() -- Close a relation.
*
- * NOTE: underlying manager should allow case where relation is
- * already closed. Indeed relation may have been unlinked!
- * This is currently called only from RelationFlushRelation() when
- * the relation cache entry is about to be dropped; could be doing
- * simple relation cache clear, or finishing up DROP TABLE.
- *
* Returns SM_SUCCESS on success, aborts on failure.
*/
int
@@ -412,6 +467,41 @@ smgrtruncate(int16 which, Relation reln, int nblocks)
}
/*
+ * smgrDoPendingDeletes() -- take care of relation deletes at end of xact.
+ *
+ * 'isCommit' tells whether the transaction is committing (true) or
+ * aborting (false). Every entry is unlinked from the pendingDeletes
+ * list and freed, so the list is empty on return. Entries whose
+ * atCommit flag equals isCommit additionally have their buffers
+ * dropped and their physical files unlinked through the storage
+ * manager recorded in the entry; entries for the other outcome are
+ * simply discarded.
+ *
+ * Returns SM_SUCCESS; a failed unlink is only reported as a NOTICE.
+ */
+int
+smgrDoPendingDeletes(bool isCommit)
+{
+ while (pendingDeletes != NULL)
+ {
+ PendingRelDelete *pending = pendingDeletes;
+
+ pendingDeletes = pending->next;
+ if (pending->atCommit == isCommit)
+ {
+ /*
+ * Get rid of any leftover buffers for the rel (shouldn't be
+ * any in the commit case, but there can be in the abort case).
+ */
+ DropRelFileNodeBuffers(pending->relnode);
+ /*
+ * And delete the physical files.
+ *
+ * Note: we treat deletion failure as a NOTICE, not an error,
+ * because we've already decided to commit or abort the current
+ * xact.
+ */
+ if ((*(smgrsw[pending->which].smgr_unlink)) (pending->relnode) == SM_FAIL)
+ elog(NOTICE, "cannot unlink %u/%u: %m",
+ pending->relnode.tblNode, pending->relnode.relNode);
+ }
+ pfree(pending);
+ }
+
+ return SM_SUCCESS;
+}
+
+/*
* smgrcommit(), smgrabort() -- Commit or abort changes made during the
* current transaction.
*/