aboutsummaryrefslogtreecommitdiff
path: root/src/backend/storage/large_object/inv_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/storage/large_object/inv_api.c')
-rw-r--r--src/backend/storage/large_object/inv_api.c131
1 files changed, 85 insertions, 46 deletions
diff --git a/src/backend/storage/large_object/inv_api.c b/src/backend/storage/large_object/inv_api.c
index 299e0756d1f..38734a5f9c3 100644
--- a/src/backend/storage/large_object/inv_api.c
+++ b/src/backend/storage/large_object/inv_api.c
@@ -9,7 +9,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/large_object/inv_api.c,v 1.110 2005/04/14 20:03:25 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/large_object/inv_api.c,v 1.111 2005/06/13 02:26:49 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -114,6 +114,42 @@ close_lo_relation(bool isCommit)
}
+/*
+ * Same as pg_largeobject.c's LargeObjectExists(), except snapshot to
+ * read with can be specified.
+ */
+static bool
+myLargeObjectExists(Oid loid, Snapshot snapshot)
+{
+ bool retval = false;
+ Relation pg_largeobject;
+ ScanKeyData skey[1];
+ SysScanDesc sd;
+
+ /*
+ * See if we can find any tuples belonging to the specified LO
+ */
+ ScanKeyInit(&skey[0],
+ Anum_pg_largeobject_loid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(loid));
+
+ pg_largeobject = heap_open(LargeObjectRelationId, AccessShareLock);
+
+ sd = systable_beginscan(pg_largeobject, LargeObjectLOidPNIndexId, true,
+ snapshot, 1, skey);
+
+ if (systable_getnext(sd) != NULL)
+ retval = true;
+
+ systable_endscan(sd);
+
+ heap_close(pg_largeobject, AccessShareLock);
+
+ return retval;
+}
+
+
static int32
getbytealen(bytea *data)
{
@@ -125,58 +161,44 @@ getbytealen(bytea *data)
/*
- * inv_create -- create a new large object.
+ * inv_create -- create a new large object
*
- * Arguments:
- * flags
+ * Arguments:
+ * lobjId - OID to use for new large object, or InvalidOid to pick one
*
- * Returns:
- * large object descriptor, appropriately filled in.
+ * Returns:
+ * OID of new object
+ *
+ * If lobjId is not InvalidOid, then an error occurs if the OID is already
+ * in use.
*/
-LargeObjectDesc *
-inv_create(int flags)
+Oid
+inv_create(Oid lobjId)
{
- Oid file_oid;
- LargeObjectDesc *retval;
-
/*
- * Allocate an OID to be the LO's identifier.
+ * Allocate an OID to be the LO's identifier, unless we were told
+ * what to use. In event of collision with an existing ID, loop
+ * to find a free one.
*/
- file_oid = newoid();
-
- /* Check for duplicate (shouldn't happen) */
- if (LargeObjectExists(file_oid))
- elog(ERROR, "large object %u already exists", file_oid);
+ if (!OidIsValid(lobjId))
+ {
+ do {
+ lobjId = newoid();
+ } while (LargeObjectExists(lobjId));
+ }
/*
* Create the LO by writing an empty first page for it in
- * pg_largeobject
+ * pg_largeobject (will fail if duplicate)
*/
- LargeObjectCreate(file_oid);
+ LargeObjectCreate(lobjId);
/*
- * Advance command counter so that new tuple will be seen by later
- * large-object operations in this transaction.
+ * Advance command counter to make new tuple visible to later operations.
*/
CommandCounterIncrement();
- /*
- * Prepare LargeObjectDesc data structure for accessing LO
- */
- retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));
-
- retval->id = file_oid;
- retval->subid = GetCurrentSubTransactionId();
- retval->offset = 0;
-
- if (flags & INV_WRITE)
- retval->flags = IFS_WRLOCK | IFS_RDLOCK;
- else if (flags & INV_READ)
- retval->flags = IFS_RDLOCK;
- else
- elog(ERROR, "invalid flags: %d", flags);
-
- return retval;
+ return lobjId;
}
/*
@@ -190,11 +212,6 @@ inv_open(Oid lobjId, int flags)
{
LargeObjectDesc *retval;
- if (!LargeObjectExists(lobjId))
- ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_OBJECT),
- errmsg("large object %u does not exist", lobjId)));
-
retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));
retval->id = lobjId;
@@ -202,12 +219,25 @@ inv_open(Oid lobjId, int flags)
retval->offset = 0;
if (flags & INV_WRITE)
+ {
+ retval->snapshot = SnapshotNow;
retval->flags = IFS_WRLOCK | IFS_RDLOCK;
+ }
else if (flags & INV_READ)
+ {
+ /* be sure to copy snap into fscxt */
+ retval->snapshot = CopySnapshot(ActiveSnapshot);
retval->flags = IFS_RDLOCK;
+ }
else
elog(ERROR, "invalid flags: %d", flags);
+ /* Can't use LargeObjectExists here because it always uses SnapshotNow */
+ if (!myLargeObjectExists(lobjId, retval->snapshot))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("large object %u does not exist", lobjId)));
+
return retval;
}
@@ -218,6 +248,8 @@ void
inv_close(LargeObjectDesc *obj_desc)
{
Assert(PointerIsValid(obj_desc));
+ if (obj_desc->snapshot != SnapshotNow)
+ FreeSnapshot(obj_desc->snapshot);
pfree(obj_desc);
}
@@ -268,7 +300,7 @@ inv_getsize(LargeObjectDesc *obj_desc)
ObjectIdGetDatum(obj_desc->id));
sd = index_beginscan(lo_heap_r, lo_index_r,
- SnapshotNow, 1, skey);
+ obj_desc->snapshot, 1, skey);
/*
* Because the pg_largeobject index is on both loid and pageno, but we
@@ -379,7 +411,7 @@ inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
Int32GetDatum(pageno));
sd = index_beginscan(lo_heap_r, lo_index_r,
- SnapshotNow, 2, skey);
+ obj_desc->snapshot, 2, skey);
while ((tuple = index_getnext(sd, ForwardScanDirection)) != NULL)
{
@@ -470,6 +502,13 @@ inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes)
Assert(PointerIsValid(obj_desc));
Assert(buf != NULL);
+ /* enforce writability because snapshot is probably wrong otherwise */
+ if ((obj_desc->flags & IFS_WRLOCK) == 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("large object %u was not opened for writing",
+ obj_desc->id)));
+
if (nbytes <= 0)
return 0;
@@ -488,7 +527,7 @@ inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes)
Int32GetDatum(pageno));
sd = index_beginscan(lo_heap_r, lo_index_r,
- SnapshotNow, 2, skey);
+ obj_desc->snapshot, 2, skey);
oldtuple = NULL;
olddata = NULL;