aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
authorItagaki Takahiro <itagaki.takahiro@gmail.com>2009-12-11 03:34:57 +0000
committerItagaki Takahiro <itagaki.takahiro@gmail.com>2009-12-11 03:34:57 +0000
commitf1325ce213ae1843d2ee636ff6780c3f8ac9ada6 (patch)
tree2fab9db3d075fcca27a87e92a9be02263865b93a /src/backend
parent64579962bbe522bf9ced8e4ed712b9072fb89142 (diff)
downloadpostgresql-f1325ce213ae1843d2ee636ff6780c3f8ac9ada6.tar.gz
postgresql-f1325ce213ae1843d2ee636ff6780c3f8ac9ada6.zip
Add large object access control.
A new system catalog pg_largeobject_metadata manages ownership and access privileges of large objects. KaiGai Kohei, reviewed by Jaime Casanova.
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/catalog/Makefile8
-rw-r--r--src/backend/catalog/aclchk.c316
-rw-r--r--src/backend/catalog/dependency.c16
-rw-r--r--src/backend/catalog/pg_largeobject.c257
-rw-r--r--src/backend/catalog/pg_shdepend.c7
-rw-r--r--src/backend/commands/alter.c7
-rw-r--r--src/backend/commands/comment.c19
-rw-r--r--src/backend/commands/tablecmds.c3
-rw-r--r--src/backend/libpq/be-fsstubs.c48
-rw-r--r--src/backend/parser/gram.y23
-rw-r--r--src/backend/storage/large_object/inv_api.c93
-rw-r--r--src/backend/tcop/utility.c5
-rw-r--r--src/backend/utils/adt/acl.c7
-rw-r--r--src/backend/utils/misc/guc.c13
-rw-r--r--src/backend/utils/misc/postgresql.conf.sample1
15 files changed, 717 insertions, 106 deletions
diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile
index ec548990b10..02a2b01b815 100644
--- a/src/backend/catalog/Makefile
+++ b/src/backend/catalog/Makefile
@@ -2,7 +2,7 @@
#
# Makefile for backend/catalog
#
-# $PostgreSQL: pgsql/src/backend/catalog/Makefile,v 1.73 2009/10/07 22:14:16 alvherre Exp $
+# $PostgreSQL: pgsql/src/backend/catalog/Makefile,v 1.74 2009/12/11 03:34:55 itagaki Exp $
#
#-------------------------------------------------------------------------
@@ -29,9 +29,9 @@ POSTGRES_BKI_SRCS = $(addprefix $(top_srcdir)/src/include/catalog/,\
pg_proc.h pg_type.h pg_attribute.h pg_class.h \
pg_attrdef.h pg_constraint.h pg_inherits.h pg_index.h pg_operator.h \
pg_opfamily.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \
- pg_language.h pg_largeobject.h pg_aggregate.h pg_statistic.h \
- pg_rewrite.h pg_trigger.h pg_listener.h pg_description.h pg_cast.h \
- pg_enum.h pg_namespace.h pg_conversion.h pg_depend.h \
+ pg_language.h pg_largeobject_metadata.h pg_largeobject.h pg_aggregate.h \
+ pg_statistic.h pg_rewrite.h pg_trigger.h pg_listener.h pg_description.h \
+ pg_cast.h pg_enum.h pg_namespace.h pg_conversion.h pg_depend.h \
pg_database.h pg_db_role_setting.h pg_tablespace.h pg_pltemplate.h \
pg_authid.h pg_auth_members.h pg_shdepend.h pg_shdescription.h \
pg_ts_config.h pg_ts_config_map.h pg_ts_dict.h \
diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c
index 8b2599a99f5..3c2fdb0cf14 100644
--- a/src/backend/catalog/aclchk.c
+++ b/src/backend/catalog/aclchk.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/catalog/aclchk.c,v 1.156 2009/10/12 20:39:39 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/catalog/aclchk.c,v 1.157 2009/12/11 03:34:55 itagaki Exp $
*
* NOTES
* See acl.h.
@@ -31,6 +31,8 @@
#include "catalog/pg_foreign_data_wrapper.h"
#include "catalog/pg_foreign_server.h"
#include "catalog/pg_language.h"
+#include "catalog/pg_largeobject.h"
+#include "catalog/pg_largeobject_metadata.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_operator.h"
@@ -103,6 +105,7 @@ static void ExecGrant_Fdw(InternalGrant *grantStmt);
static void ExecGrant_ForeignServer(InternalGrant *grantStmt);
static void ExecGrant_Function(InternalGrant *grantStmt);
static void ExecGrant_Language(InternalGrant *grantStmt);
+static void ExecGrant_Largeobject(InternalGrant *grantStmt);
static void ExecGrant_Namespace(InternalGrant *grantStmt);
static void ExecGrant_Tablespace(InternalGrant *grantStmt);
@@ -251,6 +254,9 @@ restrict_and_check_grant(bool is_grant, AclMode avail_goptions, bool all_privs,
case ACL_KIND_LANGUAGE:
whole_mask = ACL_ALL_RIGHTS_LANGUAGE;
break;
+ case ACL_KIND_LARGEOBJECT:
+ whole_mask = ACL_ALL_RIGHTS_LARGEOBJECT;
+ break;
case ACL_KIND_NAMESPACE:
whole_mask = ACL_ALL_RIGHTS_NAMESPACE;
break;
@@ -410,6 +416,10 @@ ExecuteGrantStmt(GrantStmt *stmt)
all_privileges = ACL_ALL_RIGHTS_LANGUAGE;
errormsg = gettext_noop("invalid privilege type %s for language");
break;
+ case ACL_OBJECT_LARGEOBJECT:
+ all_privileges = ACL_ALL_RIGHTS_LARGEOBJECT;
+ errormsg = gettext_noop("invalid privilege type %s for large object");
+ break;
case ACL_OBJECT_NAMESPACE:
all_privileges = ACL_ALL_RIGHTS_NAMESPACE;
errormsg = gettext_noop("invalid privilege type %s for schema");
@@ -513,6 +523,9 @@ ExecGrantStmt_oids(InternalGrant *istmt)
case ACL_OBJECT_LANGUAGE:
ExecGrant_Language(istmt);
break;
+ case ACL_OBJECT_LARGEOBJECT:
+ ExecGrant_Largeobject(istmt);
+ break;
case ACL_OBJECT_NAMESPACE:
ExecGrant_Namespace(istmt);
break;
@@ -597,6 +610,20 @@ objectNamesToOids(GrantObjectType objtype, List *objnames)
ReleaseSysCache(tuple);
}
break;
+ case ACL_OBJECT_LARGEOBJECT:
+ foreach(cell, objnames)
+ {
+ Oid lobjOid = intVal(lfirst(cell));
+
+ if (!LargeObjectExists(lobjOid))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("large object %u does not exist",
+ lobjOid)));
+
+ objects = lappend_oid(objects, lobjOid);
+ }
+ break;
case ACL_OBJECT_NAMESPACE:
foreach(cell, objnames)
{
@@ -1279,6 +1306,9 @@ RemoveRoleFromObjectACL(Oid roleid, Oid classid, Oid objid)
case LanguageRelationId:
istmt.objtype = ACL_OBJECT_LANGUAGE;
break;
+ case LargeObjectRelationId:
+ istmt.objtype = ACL_OBJECT_LARGEOBJECT;
+ break;
case NamespaceRelationId:
istmt.objtype = ACL_OBJECT_NAMESPACE;
break;
@@ -2473,6 +2503,138 @@ ExecGrant_Language(InternalGrant *istmt)
}
static void
+ExecGrant_Largeobject(InternalGrant *istmt)
+{
+ Relation relation;
+ ListCell *cell;
+
+ if (istmt->all_privs && istmt->privileges == ACL_NO_RIGHTS)
+ istmt->privileges = ACL_ALL_RIGHTS_LARGEOBJECT;
+
+ relation = heap_open(LargeObjectMetadataRelationId,
+ RowExclusiveLock);
+
+ foreach(cell, istmt->objects)
+ {
+ Oid loid = lfirst_oid(cell);
+ Form_pg_largeobject_metadata form_lo_meta;
+ char loname[NAMEDATALEN];
+ Datum aclDatum;
+ bool isNull;
+ AclMode avail_goptions;
+ AclMode this_privileges;
+ Acl *old_acl;
+ Acl *new_acl;
+ Oid grantorId;
+ Oid ownerId;
+ HeapTuple newtuple;
+ Datum values[Natts_pg_largeobject_metadata];
+ bool nulls[Natts_pg_largeobject_metadata];
+ bool replaces[Natts_pg_largeobject_metadata];
+ int noldmembers;
+ int nnewmembers;
+ Oid *oldmembers;
+ Oid *newmembers;
+ ScanKeyData entry[1];
+ SysScanDesc scan;
+ HeapTuple tuple;
+
+ /* There's no syscache for pg_largeobject_metadata */
+ ScanKeyInit(&entry[0],
+ ObjectIdAttributeNumber,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(loid));
+
+ scan = systable_beginscan(relation,
+ LargeObjectMetadataOidIndexId, true,
+ SnapshotNow, 1, entry);
+
+ tuple = systable_getnext(scan);
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "cache lookup failed for large object %u", loid);
+
+ form_lo_meta = (Form_pg_largeobject_metadata) GETSTRUCT(tuple);
+
+ /*
+ * Get owner ID and working copy of existing ACL. If there's no ACL,
+ * substitute the proper default.
+ */
+ ownerId = form_lo_meta->lomowner;
+ aclDatum = heap_getattr(tuple,
+ Anum_pg_largeobject_metadata_lomacl,
+ RelationGetDescr(relation), &isNull);
+ if (isNull)
+ old_acl = acldefault(ACL_OBJECT_LARGEOBJECT, ownerId);
+ else
+ old_acl = DatumGetAclPCopy(aclDatum);
+
+ /* Determine ID to do the grant as, and available grant options */
+ select_best_grantor(GetUserId(), istmt->privileges,
+ old_acl, ownerId,
+ &grantorId, &avail_goptions);
+
+ /*
+ * Restrict the privileges to what we can actually grant, and emit the
+ * standards-mandated warning and error messages.
+ */
+ snprintf(loname, sizeof(loname), "large object %u", loid);
+ this_privileges =
+ restrict_and_check_grant(istmt->is_grant, avail_goptions,
+ istmt->all_privs, istmt->privileges,
+ loid, grantorId, ACL_KIND_LARGEOBJECT,
+ loname, 0, NULL);
+
+ /*
+ * Generate new ACL.
+ *
+ * We need the members of both old and new ACLs so we can correct the
+ * shared dependency information.
+ */
+ noldmembers = aclmembers(old_acl, &oldmembers);
+
+ new_acl = merge_acl_with_grant(old_acl, istmt->is_grant,
+ istmt->grant_option, istmt->behavior,
+ istmt->grantees, this_privileges,
+ grantorId, ownerId);
+
+ nnewmembers = aclmembers(new_acl, &newmembers);
+
+ /* finished building new ACL value, now insert it */
+ MemSet(values, 0, sizeof(values));
+ MemSet(nulls, false, sizeof(nulls));
+ MemSet(replaces, false, sizeof(replaces));
+
+ replaces[Anum_pg_largeobject_metadata_lomacl - 1] = true;
+ values[Anum_pg_largeobject_metadata_lomacl - 1]
+ = PointerGetDatum(new_acl);
+
+ newtuple = heap_modify_tuple(tuple, RelationGetDescr(relation),
+ values, nulls, replaces);
+
+ simple_heap_update(relation, &newtuple->t_self, newtuple);
+
+ /* keep the catalog indexes up to date */
+ CatalogUpdateIndexes(relation, newtuple);
+
+ /* Update the shared dependency ACL info */
+ updateAclDependencies(LargeObjectRelationId,
+ HeapTupleGetOid(tuple), 0,
+ ownerId, istmt->is_grant,
+ noldmembers, oldmembers,
+ nnewmembers, newmembers);
+
+ systable_endscan(scan);
+
+ pfree(new_acl);
+
+ /* prevent error when processing duplicate objects */
+ CommandCounterIncrement();
+ }
+
+ heap_close(relation, RowExclusiveLock);
+}
+
+static void
ExecGrant_Namespace(InternalGrant *istmt)
{
Relation relation;
@@ -2812,6 +2974,8 @@ static const char *const no_priv_msg[MAX_ACL_KIND] =
gettext_noop("permission denied for type %s"),
/* ACL_KIND_LANGUAGE */
gettext_noop("permission denied for language %s"),
+ /* ACL_KIND_LARGEOBJECT */
+ gettext_noop("permission denied for large object %s"),
/* ACL_KIND_NAMESPACE */
gettext_noop("permission denied for schema %s"),
/* ACL_KIND_OPCLASS */
@@ -2850,6 +3014,8 @@ static const char *const not_owner_msg[MAX_ACL_KIND] =
gettext_noop("must be owner of type %s"),
/* ACL_KIND_LANGUAGE */
gettext_noop("must be owner of language %s"),
+ /* ACL_KIND_LARGEOBJECT */
+ gettext_noop("must be owner of large object %s"),
/* ACL_KIND_NAMESPACE */
gettext_noop("must be owner of schema %s"),
/* ACL_KIND_OPCLASS */
@@ -2969,6 +3135,9 @@ pg_aclmask(AclObjectKind objkind, Oid table_oid, AttrNumber attnum, Oid roleid,
return pg_proc_aclmask(table_oid, roleid, mask, how);
case ACL_KIND_LANGUAGE:
return pg_language_aclmask(table_oid, roleid, mask, how);
+ case ACL_KIND_LARGEOBJECT:
+ return pg_largeobject_aclmask_snapshot(table_oid, roleid,
+ mask, how, SnapshotNow);
case ACL_KIND_NAMESPACE:
return pg_namespace_aclmask(table_oid, roleid, mask, how);
case ACL_KIND_TABLESPACE:
@@ -3352,6 +3521,90 @@ pg_language_aclmask(Oid lang_oid, Oid roleid,
}
/*
+ * Exported routine for examining a user's privileges for a largeobject
+ *
+ * The reason why this interface has an argument of snapshot is that
+ * we apply a snapshot available on lo_open(), not SnapshotNow, when
+ * it is opened as read-only mode.
+ * If we could see the metadata and data from inconsistent viewpoint,
+ * it will give us much confusion. So, we need to provide an interface
+ * which takes an argument of snapshot.
+ *
+ * If the caller refers a large object with a certain snapshot except
+ * for SnapshotNow, its permission checks should be also applied in
+ * the same snapshot.
+ */
+AclMode
+pg_largeobject_aclmask_snapshot(Oid lobj_oid, Oid roleid,
+ AclMode mask, AclMaskHow how,
+ Snapshot snapshot)
+{
+ AclMode result;
+ Relation pg_lo_meta;
+ ScanKeyData entry[1];
+ SysScanDesc scan;
+ HeapTuple tuple;
+ Datum aclDatum;
+ bool isNull;
+ Acl *acl;
+ Oid ownerId;
+
+ /* Superusers bypass all permission checking. */
+ if (superuser_arg(roleid))
+ return mask;
+
+ /*
+ * Get the largeobject's ACL from pg_language_metadata
+ */
+ pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
+ AccessShareLock);
+
+ ScanKeyInit(&entry[0],
+ ObjectIdAttributeNumber,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(lobj_oid));
+
+ scan = systable_beginscan(pg_lo_meta,
+ LargeObjectMetadataOidIndexId, true,
+ snapshot, 1, entry);
+
+ tuple = systable_getnext(scan);
+ if (!HeapTupleIsValid(tuple))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("large object %u does not exist", lobj_oid)));
+
+ ownerId = ((Form_pg_largeobject_metadata) GETSTRUCT(tuple))->lomowner;
+
+ aclDatum = heap_getattr(tuple, Anum_pg_largeobject_metadata_lomacl,
+ RelationGetDescr(pg_lo_meta), &isNull);
+
+ if (isNull)
+ {
+ /* No ACL, so build default ACL */
+ acl = acldefault(ACL_OBJECT_LARGEOBJECT, ownerId);
+ aclDatum = (Datum) 0;
+ }
+ else
+ {
+ /* detoast ACL if necessary */
+ acl = DatumGetAclP(aclDatum);
+ }
+
+ result = aclmask(acl, roleid, ownerId, mask, how);
+
+ /* if we have a detoasted copy, free it */
+ if (acl && (Pointer) acl != DatumGetPointer(aclDatum))
+ pfree(acl);
+
+ systable_endscan(scan);
+
+ heap_close(pg_lo_meta, AccessShareLock);
+
+ return result;
+}
+
+/*
* Exported routine for examining a user's privileges for a namespace
*/
AclMode
@@ -3802,6 +4055,20 @@ pg_language_aclcheck(Oid lang_oid, Oid roleid, AclMode mode)
}
/*
+ * Exported routine for checking a user's access privileges to a largeobject
+ */
+AclResult
+pg_largeobject_aclcheck_snapshot(Oid lobj_oid, Oid roleid, AclMode mode,
+ Snapshot snapshot)
+{
+ if (pg_largeobject_aclmask_snapshot(lobj_oid, roleid, mode,
+ ACLMASK_ANY, snapshot) != 0)
+ return ACLCHECK_OK;
+ else
+ return ACLCHECK_NO_PRIV;
+}
+
+/*
* Exported routine for checking a user's access privileges to a namespace
*/
AclResult
@@ -3992,6 +4259,53 @@ pg_language_ownercheck(Oid lan_oid, Oid roleid)
}
/*
+ * Ownership check for a largeobject (specified by OID)
+ *
+ * Note that we have no candidate to call this routine with a certain
+ * snapshot except for SnapshotNow, so we don't provide an interface
+ * with _snapshot() version now.
+ */
+bool
+pg_largeobject_ownercheck(Oid lobj_oid, Oid roleid)
+{
+ Relation pg_lo_meta;
+ ScanKeyData entry[1];
+ SysScanDesc scan;
+ HeapTuple tuple;
+ Oid ownerId;
+
+ /* Superusers bypass all permission checking. */
+ if (superuser_arg(roleid))
+ return true;
+
+ /* There's no syscache for pg_largeobject_metadata */
+ pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
+ AccessShareLock);
+
+ ScanKeyInit(&entry[0],
+ ObjectIdAttributeNumber,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(lobj_oid));
+
+ scan = systable_beginscan(pg_lo_meta,
+ LargeObjectMetadataOidIndexId, true,
+ SnapshotNow, 1, entry);
+
+ tuple = systable_getnext(scan);
+ if (!HeapTupleIsValid(tuple))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("large object %u does not exist", lobj_oid)));
+
+ ownerId = ((Form_pg_largeobject_metadata) GETSTRUCT(tuple))->lomowner;
+
+ systable_endscan(scan);
+ heap_close(pg_lo_meta, AccessShareLock);
+
+ return has_privs_of_role(roleid, ownerId);
+}
+
+/*
* Ownership check for a namespace (specified by OID).
*/
bool
diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c
index 8a07c69c7e8..4ef3bb3c56f 100644
--- a/src/backend/catalog/dependency.c
+++ b/src/backend/catalog/dependency.c
@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/catalog/dependency.c,v 1.92 2009/10/05 19:24:35 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/catalog/dependency.c,v 1.93 2009/12/11 03:34:55 itagaki Exp $
*
*-------------------------------------------------------------------------
*/
@@ -37,6 +37,7 @@
#include "catalog/pg_foreign_data_wrapper.h"
#include "catalog/pg_foreign_server.h"
#include "catalog/pg_language.h"
+#include "catalog/pg_largeobject.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_operator.h"
@@ -131,6 +132,7 @@ static const Oid object_classes[MAX_OCLASS] = {
ConversionRelationId, /* OCLASS_CONVERSION */
AttrDefaultRelationId, /* OCLASS_DEFAULT */
LanguageRelationId, /* OCLASS_LANGUAGE */
+ LargeObjectRelationId, /* OCLASS_LARGEOBJECT */
OperatorRelationId, /* OCLASS_OPERATOR */
OperatorClassRelationId, /* OCLASS_OPCLASS */
OperatorFamilyRelationId, /* OCLASS_OPFAMILY */
@@ -1074,6 +1076,10 @@ doDeletion(const ObjectAddress *object)
DropProceduralLanguageById(object->objectId);
break;
+ case OCLASS_LARGEOBJECT:
+ LargeObjectDrop(object->objectId);
+ break;
+
case OCLASS_OPERATOR:
RemoveOperatorById(object->objectId);
break;
@@ -1991,6 +1997,10 @@ getObjectClass(const ObjectAddress *object)
Assert(object->objectSubId == 0);
return OCLASS_LANGUAGE;
+ case LargeObjectRelationId:
+ Assert(object->objectSubId == 0);
+ return OCLASS_LARGEOBJECT;
+
case OperatorRelationId:
Assert(object->objectSubId == 0);
return OCLASS_OPERATOR;
@@ -2243,6 +2253,10 @@ getObjectDescription(const ObjectAddress *object)
ReleaseSysCache(langTup);
break;
}
+ case OCLASS_LARGEOBJECT:
+ appendStringInfo(&buffer, _("large object %u"),
+ object->objectId);
+ break;
case OCLASS_OPERATOR:
appendStringInfo(&buffer, _("operator %s"),
diff --git a/src/backend/catalog/pg_largeobject.c b/src/backend/catalog/pg_largeobject.c
index 313ccdd3f07..517a0f3932e 100644
--- a/src/backend/catalog/pg_largeobject.c
+++ b/src/backend/catalog/pg_largeobject.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/catalog/pg_largeobject.c,v 1.33 2009/08/04 16:08:36 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/catalog/pg_largeobject.c,v 1.34 2009/12/11 03:34:55 itagaki Exp $
*
*-------------------------------------------------------------------------
*/
@@ -16,8 +16,16 @@
#include "access/genam.h"
#include "access/heapam.h"
+#include "access/sysattr.h"
+#include "catalog/catalog.h"
+#include "catalog/dependency.h"
#include "catalog/indexing.h"
+#include "catalog/pg_authid.h"
#include "catalog/pg_largeobject.h"
+#include "catalog/pg_largeobject_metadata.h"
+#include "catalog/toasting.h"
+#include "miscadmin.h"
+#include "utils/acl.h"
#include "utils/bytea.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
@@ -27,113 +35,258 @@
/*
* Create a large object having the given LO identifier.
*
- * We do this by inserting an empty first page, so that the object will
- * appear to exist with size 0. Note that the unique index will reject
- * an attempt to create a duplicate page.
+ * We create a new large object by inserting an entry into
+ * pg_largeobject_metadata without any data pages, so that the object
+ * will appear to exist with size 0.
*/
-void
+Oid
LargeObjectCreate(Oid loid)
{
- Relation pg_largeobject;
+ Relation pg_lo_meta;
HeapTuple ntup;
- Datum values[Natts_pg_largeobject];
- bool nulls[Natts_pg_largeobject];
- int i;
+ Oid loid_new;
+ Datum values[Natts_pg_largeobject_metadata];
+ bool nulls[Natts_pg_largeobject_metadata];
- pg_largeobject = heap_open(LargeObjectRelationId, RowExclusiveLock);
+ pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
+ RowExclusiveLock);
/*
- * Form new tuple
+ * Insert metadata of the largeobject
*/
- for (i = 0; i < Natts_pg_largeobject; i++)
- {
- values[i] = (Datum) NULL;
- nulls[i] = false;
- }
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
- i = 0;
- values[i++] = ObjectIdGetDatum(loid);
- values[i++] = Int32GetDatum(0);
- values[i++] = DirectFunctionCall1(byteain,
- CStringGetDatum(""));
+ values[Anum_pg_largeobject_metadata_lomowner - 1]
+ = ObjectIdGetDatum(GetUserId());
+ nulls[Anum_pg_largeobject_metadata_lomacl - 1] = true;
- ntup = heap_form_tuple(pg_largeobject->rd_att, values, nulls);
+ ntup = heap_form_tuple(RelationGetDescr(pg_lo_meta),
+ values, nulls);
+ if (OidIsValid(loid))
+ HeapTupleSetOid(ntup, loid);
- /*
- * Insert it
- */
- simple_heap_insert(pg_largeobject, ntup);
-
- /* Update indexes */
- CatalogUpdateIndexes(pg_largeobject, ntup);
+ loid_new = simple_heap_insert(pg_lo_meta, ntup);
+ Assert(!OidIsValid(loid) || loid == loid_new);
- heap_close(pg_largeobject, RowExclusiveLock);
+ CatalogUpdateIndexes(pg_lo_meta, ntup);
heap_freetuple(ntup);
+
+ heap_close(pg_lo_meta, RowExclusiveLock);
+
+ return loid_new;
}
+/*
+ * Drop a large object having the given LO identifier.
+ *
+ * When we drop a large object, it is necessary to drop both of metadata
+ * and data pages in same time.
+ */
void
LargeObjectDrop(Oid loid)
{
- bool found = false;
+ Relation pg_lo_meta;
Relation pg_largeobject;
ScanKeyData skey[1];
- SysScanDesc sd;
+ SysScanDesc scan;
HeapTuple tuple;
+ pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
+ RowExclusiveLock);
+
+ pg_largeobject = heap_open(LargeObjectRelationId,
+ RowExclusiveLock);
+
+ /*
+ * Delete an entry from pg_largeobject_metadata
+ */
ScanKeyInit(&skey[0],
- Anum_pg_largeobject_loid,
+ ObjectIdAttributeNumber,
BTEqualStrategyNumber, F_OIDEQ,
- ObjectIdGetDatum(loid));
+ ObjectIdGetDatum(loid));
- pg_largeobject = heap_open(LargeObjectRelationId, RowExclusiveLock);
+ scan = systable_beginscan(pg_lo_meta,
+ LargeObjectMetadataOidIndexId, true,
+ SnapshotNow, 1, skey);
- sd = systable_beginscan(pg_largeobject, LargeObjectLOidPNIndexId, true,
- SnapshotNow, 1, skey);
+ tuple = systable_getnext(scan);
+ if (!HeapTupleIsValid(tuple))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("large object %u does not exist", loid)));
+
+ simple_heap_delete(pg_lo_meta, &tuple->t_self);
+
+ systable_endscan(scan);
+
+ /*
+ * Delete all the associated entries from pg_largeobject
+ */
+ ScanKeyInit(&skey[0],
+ Anum_pg_largeobject_loid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(loid));
- while ((tuple = systable_getnext(sd)) != NULL)
+ scan = systable_beginscan(pg_largeobject,
+ LargeObjectLOidPNIndexId, true,
+ SnapshotNow, 1, skey);
+ while (HeapTupleIsValid(tuple = systable_getnext(scan)))
{
simple_heap_delete(pg_largeobject, &tuple->t_self);
- found = true;
}
- systable_endscan(sd);
+ systable_endscan(scan);
heap_close(pg_largeobject, RowExclusiveLock);
- if (!found)
+ heap_close(pg_lo_meta, RowExclusiveLock);
+}
+
+/*
+ * LargeObjectAlterOwner
+ *
+ * Implementation of ALTER LARGE OBJECT statement
+ */
+void
+LargeObjectAlterOwner(Oid loid, Oid newOwnerId)
+{
+ Form_pg_largeobject_metadata form_lo_meta;
+ Relation pg_lo_meta;
+ ScanKeyData skey[1];
+ SysScanDesc scan;
+ HeapTuple oldtup;
+ HeapTuple newtup;
+
+ pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
+ RowExclusiveLock);
+
+ ScanKeyInit(&skey[0],
+ ObjectIdAttributeNumber,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(loid));
+
+ scan = systable_beginscan(pg_lo_meta,
+ LargeObjectMetadataOidIndexId, true,
+ SnapshotNow, 1, skey);
+
+ oldtup = systable_getnext(scan);
+ if (!HeapTupleIsValid(oldtup))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("large object %u does not exist", loid)));
+
+ form_lo_meta = (Form_pg_largeobject_metadata) GETSTRUCT(oldtup);
+ if (form_lo_meta->lomowner != newOwnerId)
+ {
+ Datum values[Natts_pg_largeobject_metadata];
+ bool nulls[Natts_pg_largeobject_metadata];
+ bool replaces[Natts_pg_largeobject_metadata];
+ Acl *newAcl;
+ Datum aclDatum;
+ bool isnull;
+
+ /* Superusers can always do it */
+ if (!superuser())
+ {
+ /*
+ * The 'lo_compat_privileges' is not checked here, because we
+ * don't have any access control features in the 8.4.x series
+ * or earlier release.
+ * So, it is not a place we can define a compatible behavior.
+ */
+
+ /* Otherwise, must be owner of the existing object */
+ if (!pg_largeobject_ownercheck(loid, GetUserId()))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("must be owner of large object %u", loid)));
+
+ /* Must be able to become new owner */
+ check_is_member_of_role(GetUserId(), newOwnerId);
+ }
+
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ memset(replaces, false, sizeof(nulls));
+
+ values[Anum_pg_largeobject_metadata_lomowner - 1]
+ = ObjectIdGetDatum(newOwnerId);
+ replaces[Anum_pg_largeobject_metadata_lomowner - 1] = true;
+
+ /*
+ * Determine the modified ACL for the new owner.
+ * This is only necessary when the ACL is non-null.
+ */
+ aclDatum = heap_getattr(oldtup,
+ Anum_pg_largeobject_metadata_lomacl,
+ RelationGetDescr(pg_lo_meta), &isnull);
+ if (!isnull)
+ {
+ newAcl = aclnewowner(DatumGetAclP(aclDatum),
+ form_lo_meta->lomowner, newOwnerId);
+ values[Anum_pg_largeobject_metadata_lomacl - 1]
+ = PointerGetDatum(newAcl);
+ replaces[Anum_pg_largeobject_metadata_lomacl - 1] = true;
+ }
+
+ newtup = heap_modify_tuple(oldtup, RelationGetDescr(pg_lo_meta),
+ values, nulls, replaces);
+
+ simple_heap_update(pg_lo_meta, &newtup->t_self, newtup);
+ CatalogUpdateIndexes(pg_lo_meta, newtup);
+
+ heap_freetuple(newtup);
+
+ /* Update owner dependency reference */
+ changeDependencyOnOwner(LargeObjectRelationId,
+ loid, newOwnerId);
+ }
+ systable_endscan(scan);
+
+ heap_close(pg_lo_meta, RowExclusiveLock);
}
+/*
+ * LargeObjectExists
+ *
+ * Currently, we don't use system cache to contain metadata of
+ * large objects, because massive number of large objects can
+ * consume not a small amount of process local memory.
+ *
+ * Note that LargeObjectExists always scans the system catalog
+ * with SnapshotNow, so it is unavailable to use to check
+ * existence in read-only accesses.
+ */
bool
LargeObjectExists(Oid loid)
{
+ Relation pg_lo_meta;
+ ScanKeyData skey[1];
+ SysScanDesc sd;
+ HeapTuple tuple;
bool retval = false;
- Relation pg_largeobject;
- ScanKeyData skey[1];
- SysScanDesc sd;
- /*
- * See if we can find any tuples belonging to the specified LO
- */
ScanKeyInit(&skey[0],
- Anum_pg_largeobject_loid,
+ ObjectIdAttributeNumber,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(loid));
- pg_largeobject = heap_open(LargeObjectRelationId, AccessShareLock);
+ pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
+ AccessShareLock);
- sd = systable_beginscan(pg_largeobject, LargeObjectLOidPNIndexId, true,
+ sd = systable_beginscan(pg_lo_meta,
+ LargeObjectMetadataOidIndexId, true,
SnapshotNow, 1, skey);
- if (systable_getnext(sd) != NULL)
+ tuple = systable_getnext(sd);
+ if (HeapTupleIsValid(tuple))
retval = true;
systable_endscan(sd);
- heap_close(pg_largeobject, AccessShareLock);
+ heap_close(pg_lo_meta, AccessShareLock);
return retval;
}
diff --git a/src/backend/catalog/pg_shdepend.c b/src/backend/catalog/pg_shdepend.c
index be70143ea27..7130444448e 100644
--- a/src/backend/catalog/pg_shdepend.c
+++ b/src/backend/catalog/pg_shdepend.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/catalog/pg_shdepend.c,v 1.36 2009/10/07 22:14:18 alvherre Exp $
+ * $PostgreSQL: pgsql/src/backend/catalog/pg_shdepend.c,v 1.37 2009/12/11 03:34:55 itagaki Exp $
*
*-------------------------------------------------------------------------
*/
@@ -25,6 +25,7 @@
#include "catalog/pg_database.h"
#include "catalog/pg_default_acl.h"
#include "catalog/pg_language.h"
+#include "catalog/pg_largeobject.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_operator.h"
#include "catalog/pg_proc.h"
@@ -1347,6 +1348,10 @@ shdepReassignOwned(List *roleids, Oid newrole)
AlterLanguageOwner_oid(sdepForm->objid, newrole);
break;
+ case LargeObjectRelationId:
+ LargeObjectAlterOwner(sdepForm->objid, newrole);
+ break;
+
case DefaultAclRelationId:
/*
* Ignore default ACLs; they should be handled by
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index b91f2205d1c..8ba630a83db 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -8,13 +8,14 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/commands/alter.c,v 1.31 2009/01/01 17:23:37 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/commands/alter.c,v 1.32 2009/12/11 03:34:55 itagaki Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/namespace.h"
+#include "catalog/pg_largeobject.h"
#include "commands/alter.h"
#include "commands/conversioncmds.h"
#include "commands/dbcommands.h"
@@ -233,6 +234,10 @@ ExecAlterOwnerStmt(AlterOwnerStmt *stmt)
AlterLanguageOwner(strVal(linitial(stmt->object)), newowner);
break;
+ case OBJECT_LARGEOBJECT:
+ LargeObjectAlterOwner(intVal(linitial(stmt->object)), newowner);
+ break;
+
case OBJECT_OPERATOR:
Assert(list_length(stmt->objarg) == 2);
AlterOperatorOwner(stmt->object,
diff --git a/src/backend/commands/comment.c b/src/backend/commands/comment.c
index 610816db6d0..d57ea25d9ca 100644
--- a/src/backend/commands/comment.c
+++ b/src/backend/commands/comment.c
@@ -7,7 +7,7 @@
* Copyright (c) 1996-2009, PostgreSQL Global Development Group
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/commands/comment.c,v 1.108 2009/10/12 19:49:24 adunstan Exp $
+ * $PostgreSQL: pgsql/src/backend/commands/comment.c,v 1.109 2009/12/11 03:34:55 itagaki Exp $
*
*-------------------------------------------------------------------------
*/
@@ -25,6 +25,7 @@
#include "catalog/pg_description.h"
#include "catalog/pg_language.h"
#include "catalog/pg_largeobject.h"
+#include "catalog/pg_largeobject_metadata.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_operator.h"
@@ -42,6 +43,7 @@
#include "commands/comment.h"
#include "commands/dbcommands.h"
#include "commands/tablespace.h"
+#include "libpq/be-fsstubs.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "parser/parse_func.h"
@@ -1435,7 +1437,20 @@ CommentLargeObject(List *qualname, char *comment)
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("large object %u does not exist", loid)));
- /* Call CreateComments() to create/drop the comments */
+ /* Permission checks */
+ if (!lo_compat_privileges &&
+ !pg_largeobject_ownercheck(loid, GetUserId()))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("must be owner of large object %u", loid)));
+
+ /*
+ * Call CreateComments() to create/drop the comments
+ *
+ * See the comment in the inv_create() which describes
+ * the reason why LargeObjectRelationId is used instead
+ * of the LargeObjectMetadataRelationId.
+ */
CreateComments(loid, LargeObjectRelationId, 0, comment);
}
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index c9188b2d7c6..2344b79547b 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.308 2009/12/09 21:57:50 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.309 2009/12/11 03:34:55 itagaki Exp $
*
*-------------------------------------------------------------------------
*/
@@ -6186,6 +6186,7 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
case OCLASS_CAST:
case OCLASS_CONVERSION:
case OCLASS_LANGUAGE:
+ case OCLASS_LARGEOBJECT:
case OCLASS_OPERATOR:
case OCLASS_OPCLASS:
case OCLASS_OPFAMILY:
diff --git a/src/backend/libpq/be-fsstubs.c b/src/backend/libpq/be-fsstubs.c
index 24605b5490d..0b816b6ff02 100644
--- a/src/backend/libpq/be-fsstubs.c
+++ b/src/backend/libpq/be-fsstubs.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/libpq/be-fsstubs.c,v 1.91 2009/06/11 14:48:58 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/libpq/be-fsstubs.c,v 1.92 2009/12/11 03:34:55 itagaki Exp $
*
* NOTES
* This should be moved to a more appropriate place. It is here
@@ -42,14 +42,20 @@
#include <sys/stat.h>
#include <unistd.h>
+#include "catalog/pg_largeobject_metadata.h"
#include "libpq/be-fsstubs.h"
#include "libpq/libpq-fs.h"
#include "miscadmin.h"
#include "storage/fd.h"
#include "storage/large_object.h"
+#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
+/*
+ * compatibility flag for permission checks
+ */
+bool lo_compat_privileges;
/*#define FSDB 1*/
#define BUFSIZE 8192
@@ -156,6 +162,17 @@ lo_read(int fd, char *buf, int len)
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("invalid large-object descriptor: %d", fd)));
+ /* Permission checks */
+ if (!lo_compat_privileges &&
+ pg_largeobject_aclcheck_snapshot(cookies[fd]->id,
+ GetUserId(),
+ ACL_SELECT,
+ cookies[fd]->snapshot) != ACLCHECK_OK)
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied for large object %u",
+ cookies[fd]->id)));
+
status = inv_read(cookies[fd], buf, len);
return status;
@@ -177,6 +194,17 @@ lo_write(int fd, const char *buf, int len)
errmsg("large object descriptor %d was not opened for writing",
fd)));
+ /* Permission checks */
+ if (!lo_compat_privileges &&
+ pg_largeobject_aclcheck_snapshot(cookies[fd]->id,
+ GetUserId(),
+ ACL_UPDATE,
+ cookies[fd]->snapshot) != ACLCHECK_OK)
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied for large object %u",
+ cookies[fd]->id)));
+
status = inv_write(cookies[fd], buf, len);
return status;
@@ -251,6 +279,13 @@ lo_unlink(PG_FUNCTION_ARGS)
{
Oid lobjId = PG_GETARG_OID(0);
+ /* Must be owner of the largeobject */
+ if (!lo_compat_privileges &&
+ !pg_largeobject_ownercheck(lobjId, GetUserId()))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("must be owner of large object %u", lobjId)));
+
/*
* If there are any open LO FDs referencing that ID, close 'em.
*/
@@ -482,6 +517,17 @@ lo_truncate(PG_FUNCTION_ARGS)
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("invalid large-object descriptor: %d", fd)));
+ /* Permission checks */
+ if (!lo_compat_privileges &&
+ pg_largeobject_aclcheck_snapshot(cookies[fd]->id,
+ GetUserId(),
+ ACL_UPDATE,
+ cookies[fd]->snapshot) != ACLCHECK_OK)
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied for large object %u",
+ cookies[fd]->id)));
+
inv_truncate(cookies[fd], len);
PG_RETURN_INT32(0);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 8dca2573605..3257ccbbf83 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -11,7 +11,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/parser/gram.y,v 2.695 2009/12/07 05:22:22 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/parser/gram.y,v 2.696 2009/12/11 03:34:55 itagaki Exp $
*
* HISTORY
* AUTHOR DATE MAJOR EVENT
@@ -397,6 +397,7 @@ static TypeName *TableFuncTypeName(List *columns);
%type <boolean> opt_varying opt_timezone
%type <ival> Iconst SignedIconst
+%type <list> Iconst_list
%type <str> Sconst comment_text
%type <str> RoleId opt_granted_by opt_boolean ColId_or_Sconst
%type <list> var_list
@@ -4576,6 +4577,14 @@ privilege_target:
n->objs = $2;
$$ = n;
}
+ | LARGE_P OBJECT_P Iconst_list
+ {
+ PrivTarget *n = (PrivTarget *) palloc(sizeof(PrivTarget));
+ n->targtype = ACL_TARGET_OBJECT;
+ n->objtype = ACL_OBJECT_LARGEOBJECT;
+ n->objs = $3;
+ $$ = n;
+ }
| SCHEMA name_list
{
PrivTarget *n = (PrivTarget *) palloc(sizeof(PrivTarget));
@@ -5851,6 +5860,14 @@ AlterOwnerStmt: ALTER AGGREGATE func_name aggr_args OWNER TO RoleId
n->newowner = $7;
$$ = (Node *)n;
}
+ | ALTER LARGE_P OBJECT_P Iconst OWNER TO RoleId
+ {
+ AlterOwnerStmt *n = makeNode(AlterOwnerStmt);
+ n->objectType = OBJECT_LARGEOBJECT;
+ n->object = list_make1(makeInteger($4));
+ n->newowner = $7;
+ $$ = (Node *)n;
+ }
| ALTER OPERATOR any_operator oper_argtypes OWNER TO RoleId
{
AlterOwnerStmt *n = makeNode(AlterOwnerStmt);
@@ -10542,6 +10559,10 @@ SignedIconst: Iconst { $$ = $1; }
| '-' Iconst { $$ = - $2; }
;
+Iconst_list: Iconst { $$ = list_make1(makeInteger($1)); }
+ | Iconst_list ',' Iconst { $$ = lappend($1, makeInteger($3)); }
+ ;
+
/*
* Name classification hierarchy.
*
diff --git a/src/backend/storage/large_object/inv_api.c b/src/backend/storage/large_object/inv_api.c
index 0def1decd0d..224971c03eb 100644
--- a/src/backend/storage/large_object/inv_api.c
+++ b/src/backend/storage/large_object/inv_api.c
@@ -24,7 +24,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/large_object/inv_api.c,v 1.138 2009/06/11 14:49:02 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/large_object/inv_api.c,v 1.139 2009/12/11 03:34:55 itagaki Exp $
*
*-------------------------------------------------------------------------
*/
@@ -32,18 +32,23 @@
#include "access/genam.h"
#include "access/heapam.h"
+#include "access/sysattr.h"
#include "access/tuptoaster.h"
#include "access/xact.h"
#include "catalog/catalog.h"
+#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/pg_largeobject.h"
+#include "catalog/pg_largeobject_metadata.h"
#include "commands/comment.h"
#include "libpq/libpq-fs.h"
+#include "miscadmin.h"
#include "storage/large_object.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
#include "utils/resowner.h"
#include "utils/snapmgr.h"
+#include "utils/syscache.h"
#include "utils/tqual.h"
@@ -139,30 +144,31 @@ close_lo_relation(bool isCommit)
static bool
myLargeObjectExists(Oid loid, Snapshot snapshot)
{
+ Relation pg_lo_meta;
+ ScanKeyData skey[1];
+ SysScanDesc sd;
+ HeapTuple tuple;
bool retval = false;
- Relation pg_largeobject;
- ScanKeyData skey[1];
- SysScanDesc sd;
- /*
- * See if we can find any tuples belonging to the specified LO
- */
ScanKeyInit(&skey[0],
- Anum_pg_largeobject_loid,
+ ObjectIdAttributeNumber,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(loid));
- pg_largeobject = heap_open(LargeObjectRelationId, AccessShareLock);
+ pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
+ AccessShareLock);
- sd = systable_beginscan(pg_largeobject, LargeObjectLOidPNIndexId, true,
+ sd = systable_beginscan(pg_lo_meta,
+ LargeObjectMetadataOidIndexId, true,
snapshot, 1, skey);
- if (systable_getnext(sd) != NULL)
+ tuple = systable_getnext(sd);
+ if (HeapTupleIsValid(tuple))
retval = true;
systable_endscan(sd);
- heap_close(pg_largeobject, AccessShareLock);
+ heap_close(pg_lo_meta, AccessShareLock);
return retval;
}
@@ -193,31 +199,31 @@ getbytealen(bytea *data)
Oid
inv_create(Oid lobjId)
{
+ Oid lobjId_new;
+
/*
- * Allocate an OID to be the LO's identifier, unless we were told what to
- * use. We can use the index on pg_largeobject for checking OID
- * uniqueness, even though it has additional columns besides OID.
+ * Create a new largeobject with empty data pages
*/
- if (!OidIsValid(lobjId))
- {
- open_lo_relation();
-
- lobjId = GetNewOidWithIndex(lo_heap_r, LargeObjectLOidPNIndexId,
- Anum_pg_largeobject_loid);
- }
+ lobjId_new = LargeObjectCreate(lobjId);
/*
- * Create the LO by writing an empty first page for it in pg_largeobject
- * (will fail if duplicate)
+ * dependency on the owner of largeobject
+ *
+ * The reason why we use LargeObjectRelationId instead of
+ * LargeObjectMetadataRelationId here is to provide backward
+ * compatibility to the applications which utilize a knowledge
+ * about internal layout of system catalogs.
+ * OID of pg_largeobject_metadata and loid of pg_largeobject
+ * are same value, so there are no actual differences here.
*/
- LargeObjectCreate(lobjId);
-
+ recordDependencyOnOwner(LargeObjectRelationId,
+ lobjId_new, GetUserId());
/*
* Advance command counter to make new tuple visible to later operations.
*/
CommandCounterIncrement();
- return lobjId;
+ return lobjId_new;
}
/*
@@ -292,10 +298,15 @@ inv_close(LargeObjectDesc *obj_desc)
int
inv_drop(Oid lobjId)
{
- LargeObjectDrop(lobjId);
+ ObjectAddress object;
- /* Delete any comments on the large object */
- DeleteComments(lobjId, LargeObjectRelationId, 0);
+ /*
+ * Delete any comments and dependencies on the large object
+ */
+ object.classId = LargeObjectRelationId;
+ object.objectId = lobjId;
+ object.objectSubId = 0;
+ performDeletion(&object, DROP_CASCADE);
/*
* Advance command counter so that tuple removal will be seen by later
@@ -315,7 +326,6 @@ inv_drop(Oid lobjId)
static uint32
inv_getsize(LargeObjectDesc *obj_desc)
{
- bool found = false;
uint32 lastbyte = 0;
ScanKeyData skey[1];
SysScanDesc sd;
@@ -339,13 +349,13 @@ inv_getsize(LargeObjectDesc *obj_desc)
* large object in reverse pageno order. So, it's sufficient to examine
* the first valid tuple (== last valid page).
*/
- while ((tuple = systable_getnext_ordered(sd, BackwardScanDirection)) != NULL)
+ tuple = systable_getnext_ordered(sd, BackwardScanDirection);
+ if (HeapTupleIsValid(tuple))
{
Form_pg_largeobject data;
bytea *datafield;
bool pfreeit;
- found = true;
if (HeapTupleHasNulls(tuple)) /* paranoia */
elog(ERROR, "null field found in pg_largeobject");
data = (Form_pg_largeobject) GETSTRUCT(tuple);
@@ -360,15 +370,10 @@ inv_getsize(LargeObjectDesc *obj_desc)
lastbyte = data->pageno * LOBLKSIZE + getbytealen(datafield);
if (pfreeit)
pfree(datafield);
- break;
}
systable_endscan_ordered(sd);
- if (!found)
- ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_OBJECT),
- errmsg("large object %u does not exist", obj_desc->id)));
return lastbyte;
}
@@ -545,6 +550,12 @@ inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
errmsg("large object %u was not opened for writing",
obj_desc->id)));
+ /* check existence of the target largeobject */
+ if (!LargeObjectExists(obj_desc->id))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("large object %u was already dropped", obj_desc->id)));
+
if (nbytes <= 0)
return 0;
@@ -736,6 +747,12 @@ inv_truncate(LargeObjectDesc *obj_desc, int len)
errmsg("large object %u was not opened for writing",
obj_desc->id)));
+ /* check existence of the target largeobject */
+ if (!LargeObjectExists(obj_desc->id))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("large object %u was already dropped", obj_desc->id)));
+
open_lo_relation();
indstate = CatalogOpenIndexes(lo_heap_r);
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 59576a25d8b..2fd4b9923f5 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -10,7 +10,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/tcop/utility.c,v 1.322 2009/12/09 21:57:51 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/tcop/utility.c,v 1.323 2009/12/11 03:34:55 itagaki Exp $
*
*-------------------------------------------------------------------------
*/
@@ -1638,6 +1638,9 @@ CreateCommandTag(Node *parsetree)
case OBJECT_LANGUAGE:
tag = "ALTER LANGUAGE";
break;
+ case OBJECT_LARGEOBJECT:
+ tag = "ALTER LARGEOBJECT";
+ break;
case OBJECT_OPERATOR:
tag = "ALTER OPERATOR";
break;
diff --git a/src/backend/utils/adt/acl.c b/src/backend/utils/adt/acl.c
index 142e06cf45f..31cbbe7c760 100644
--- a/src/backend/utils/adt/acl.c
+++ b/src/backend/utils/adt/acl.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/utils/adt/acl.c,v 1.151 2009/12/05 21:43:35 petere Exp $
+ * $PostgreSQL: pgsql/src/backend/utils/adt/acl.c,v 1.152 2009/12/11 03:34:55 itagaki Exp $
*
*-------------------------------------------------------------------------
*/
@@ -764,6 +764,11 @@ acldefault(GrantObjectType objtype, Oid ownerId)
world_default = ACL_USAGE;
owner_default = ACL_ALL_RIGHTS_LANGUAGE;
break;
+ case ACL_OBJECT_LARGEOBJECT:
+ /* Grant SELECT,UPDATE by default, for now */
+ world_default = ACL_NO_RIGHTS;
+ owner_default = ACL_ALL_RIGHTS_LARGEOBJECT;
+ break;
case ACL_OBJECT_NAMESPACE:
world_default = ACL_NO_RIGHTS;
owner_default = ACL_ALL_RIGHTS_NAMESPACE;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index edc5544f7f3..900c3662786 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -10,7 +10,7 @@
* Written by Peter Eisentraut <peter_e@gmx.net>.
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/utils/misc/guc.c,v 1.526 2009/12/09 21:57:51 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/utils/misc/guc.c,v 1.527 2009/12/11 03:34:56 itagaki Exp $
*
*--------------------------------------------------------------------
*/
@@ -38,6 +38,7 @@
#include "commands/trigger.h"
#include "funcapi.h"
#include "libpq/auth.h"
+#include "libpq/be-fsstubs.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "optimizer/cost.h"
@@ -1226,6 +1227,16 @@ static struct config_bool ConfigureNamesBool[] =
false, NULL, NULL
},
+ {
+ {"lo_compat_privileges", PGC_SUSET, COMPAT_OPTIONS_PREVIOUS,
+ gettext_noop("Enables backward compatibility in privilege checks on large objects"),
+ gettext_noop("When turned on, privilege checks on large objects perform "
+ "with backward compatibility as 8.4.x or earlier releases.")
+ },
+ &lo_compat_privileges,
+ false, NULL, NULL
+ },
+
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index f2accd263e6..d2da9b9c3d2 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -489,6 +489,7 @@
#backslash_quote = safe_encoding # on, off, or safe_encoding
#default_with_oids = off
#escape_string_warning = on
+#lo_compat_privileges = off
#sql_inheritance = on
#standard_conforming_strings = off
#synchronize_seqscans = on