diff options
Diffstat (limited to 'src/interfaces/python/pg.py')
-rw-r--r-- | src/interfaces/python/pg.py | 302 |
1 files changed, 0 insertions, 302 deletions
diff --git a/src/interfaces/python/pg.py b/src/interfaces/python/pg.py deleted file mode 100644 index 0380dc4de9a..00000000000 --- a/src/interfaces/python/pg.py +++ /dev/null @@ -1,302 +0,0 @@ -# pg.py -# Written by D'Arcy J.M. Cain - -# This library implements some basic database management stuff. It -# includes the pg module and builds on it. This is known as the -# "Classic" interface. For DB-API compliance use the pgdb module. - -from _pg import * -from types import * -import string, re, sys - -# utility function -# We expect int, seq, decimal, text or date (more later) -def _quote(d, t): - if d == None: - return "NULL" - - if t in ['int', 'seq']: - if d == "": return "NULL" - return "%d" % long(d) - - if t == 'decimal': - if d == "": return "NULL" - return "%f" % float(d) - - if t == 'money': - if d == "": return "NULL" - return "'%.2f'" % float(d) - - if t == 'bool': - # Can't run upper() on these - if d in (0, 1): return ("'f'", "'t'")[d] - - if string.upper(d) in ['T', 'TRUE', 'Y', 'YES', '1', 'ON']: - return "'t'" - else: - return "'f'" - - if t == 'date' and d == '': return "NULL" - if t in ('inet', 'cidr') and d == '': return "NULL" - - return "'%s'" % string.strip(re.sub("'", "''", \ - re.sub("\\\\", "\\\\\\\\", "%s" % d))) - -class DB: - """This class wraps the pg connection type""" - - def __init__(self, *args, **kw): - self.db = apply(connect, args, kw) - - # Create convience methods, in a way that is still overridable - # (members are not copied because they are actually functions) - for e in self.db.__methods__: - setattr(self, e, getattr(self.db, e)) - - self.__attnames__ = {} - self.__pkeys__ = {} - self.debug = None # For debugging scripts, set to output format - # that takes a single string arg. For example - # in a CGI set to "%s<BR>" - - def _do_debug(self, s): - if not self.debug: return - if type(self.debug) == StringType: print self.debug % s - if type(self.debug) == FunctionType: self.debug(s) - if type(self.debug) == FileType: print >> self.debug, s - - # wrap query for debugging - def query(self, qstr): - self._do_debug(qstr) - return self.db.query(qstr) - - def pkey(self, cl, newpkey = None): - """This method returns the primary key of a class. If newpkey - is set and is set and is not a dictionary then set that - value as the primary key of the class. If it is a dictionary - then replace the __pkeys__ dictionary with it.""" - # Get all the primary keys at once - if type(newpkey) == DictType: - self.__pkeys__ = newpkey - return - - if newpkey: - self.__pkeys__[cl] = newpkey - return newpkey - - if self.__pkeys__ == {}: - for rel, att in self.db.query("""SELECT - pg_class.relname, pg_attribute.attname - FROM pg_class, pg_attribute, pg_index - WHERE pg_class.oid = pg_attribute.attrelid AND - pg_class.oid = pg_index.indrelid AND - pg_index.indkey[0] = pg_attribute.attnum AND - pg_index.indisprimary = 't' AND - pg_attribute.attisdropped = 'f'""").getresult(): - self.__pkeys__[rel] = att - - # will raise an exception if primary key doesn't exist - return self.__pkeys__[cl] - - def get_databases(self): - l = [] - for n in self.db.query("SELECT datname FROM pg_database").getresult(): - l.append(n[0]) - return l - - def get_tables(self): - l = [] - for n in self.db.query("""SELECT relname FROM pg_class - WHERE relkind = 'r' AND - relname !~ '^Inv' AND - relname !~ '^pg_'""").getresult(): - l.append(n[0]) - return l - - def get_attnames(self, cl, newattnames = None): - """This method gets a list of attribute names for a class. If - the optional newattnames exists it must be a dictionary and - will become the new attribute names dictionary.""" - - if type(newattnames) == DictType: - self.__attnames__ = newattnames - return - elif newattnames: - raise error, "If supplied, newattnames must be a dictionary" - - # May as well cache them - if self.__attnames__.has_key(cl): - return self.__attnames__[cl] - - query = """SELECT pg_attribute.attname, pg_type.typname - FROM pg_class, pg_attribute, pg_type - WHERE pg_class.relname = '%s' AND - pg_attribute.attnum > 0 AND - pg_attribute.attrelid = pg_class.oid AND - pg_attribute.atttypid = pg_type.oid AND - pg_attribute.attisdropped = 'f'""" - - l = {} - for attname, typname in self.db.query(query % cl).getresult(): - if re.match("^int", typname): - l[attname] = 'int' - elif re.match("^oid", typname): - l[attname] = 'int' - elif re.match("^text", typname): - l[attname] = 'text' - elif re.match("^char", typname): - l[attname] = 'text' - elif re.match("^name", typname): - l[attname] = 'text' - elif re.match("^abstime", typname): - l[attname] = 'date' - elif re.match("^date", typname): - l[attname] = 'date' - elif re.match("^timestamp", typname): - l[attname] = 'date' - elif re.match("^bool", typname): - l[attname] = 'bool' - elif re.match("^float", typname): - l[attname] = 'decimal' - elif re.match("^money", typname): - l[attname] = 'money' - else: - l[attname] = 'text' - - l['oid'] = 'int' # every table has this - self.__attnames__[cl] = l # cache it - return self.__attnames__[cl] - - # return a tuple from a database - def get(self, cl, arg, keyname = None, view = 0): - if cl[-1] == '*': # need parent table name - xcl = cl[:-1] - else: - xcl = cl - - if keyname == None: # use the primary key by default - keyname = self.pkey(xcl) - - fnames = self.get_attnames(xcl) - - if type(arg) == DictType: - # To allow users to work with multiple tables we munge the - # name when the key is "oid" - if keyname == 'oid': k = arg['oid_%s' % xcl] - else: k = arg[keyname] - else: - k = arg - arg = {} - - # We want the oid for later updates if that isn't the key - if keyname == 'oid': - q = "SELECT * FROM %s WHERE oid = %s" % (cl, k) - elif view: - q = "SELECT * FROM %s WHERE %s = %s" % \ - (cl, keyname, _quote(k, fnames[keyname])) - else: - q = "SELECT oid AS oid_%s, %s FROM %s WHERE %s = %s" % \ - (xcl, string.join(fnames.keys(), ','),\ - cl, keyname, _quote(k, fnames[keyname])) - - self._do_debug(q) - res = self.db.query(q).dictresult() - if res == []: - raise error, \ - "No such record in %s where %s is %s" % \ - (cl, keyname, _quote(k, fnames[keyname])) - return None - - for k in res[0].keys(): - arg[k] = res[0][k] - - return arg - - # Inserts a new tuple into a table - # We currently don't support insert into views although PostgreSQL does - def insert(self, cl, a): - fnames = self.get_attnames(cl) - l = [] - n = [] - for f in fnames.keys(): - if f != 'oid' and a.has_key(f): - l.append(_quote(a[f], fnames[f])) - n.append(f) - - try: - q = "INSERT INTO %s (%s) VALUES (%s)" % \ - (cl, string.join(n, ','), string.join(l, ',')) - self._do_debug(q) - a['oid_%s' % cl] = self.db.query(q) - except: - raise error, "Error inserting into %s: %s" % (cl, sys.exc_value) - - # reload the dictionary to catch things modified by engine - # note that get() changes 'oid' below to oid_table - # if no read perms (it can and does happen) return None - try: return self.get(cl, a, 'oid') - except: return None - - # Update always works on the oid which get returns if available - # otherwise use the primary key. Fail if neither. - def update(self, cl, a): - self.pkey(cl) # make sure we have a self.__pkeys__ dictionary - - foid = 'oid_%s' % cl - if a.has_key(foid): - where = "oid = %s" % a[foid] - elif self.__pkeys__.has_key(cl) and a.has_key(self.__pkeys__[cl]): - where = "%s = '%s'" % (self.__pkeys__[cl], a[self.__pkeys__[cl]]) - else: - raise error, "Update needs primary key or oid as %s" % foid - - v = [] - k = 0 - fnames = self.get_attnames(cl) - - for ff in fnames.keys(): - if ff != 'oid' and a.has_key(ff): - v.append("%s = %s" % (ff, _quote(a[ff], fnames[ff]))) - - if v == []: - return None - - try: - q = "UPDATE %s SET %s WHERE %s" % \ - (cl, string.join(v, ','), where) - self._do_debug(q) - self.db.query(q) - except: - raise error, "Can't update %s: %s" % (cl, sys.exc_value) - - # reload the dictionary to catch things modified by engine - if a.has_key(foid): - return self.get(cl, a, 'oid') - else: - return self.get(cl, a) - - # At some point we will need a way to get defaults from a table - def clear(self, cl, a = {}): - fnames = self.get_attnames(cl) - for ff in fnames.keys(): - if fnames[ff] in ['int', 'decimal', 'seq', 'money']: - a[ff] = 0 - else: - a[ff] = "" - - a['oid'] = 0 - return a - - # Like update, delete works on the oid - # one day we will be testing that the record to be deleted - # isn't referenced somewhere (or else PostgreSQL will) - def delete(self, cl, a): - try: - q = "DELETE FROM %s WHERE oid = %s" % (cl, a['oid_%s' % cl]) - self._do_debug(q) - self.db.query(q) - except: - raise error, "Can't delete %s: %s" % (cl, sys.exc_value) - - return None - |