aboutsummaryrefslogtreecommitdiff
path: root/src/interfaces/python/pg.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/interfaces/python/pg.py')
-rw-r--r--src/interfaces/python/pg.py302
1 files changed, 0 insertions, 302 deletions
diff --git a/src/interfaces/python/pg.py b/src/interfaces/python/pg.py
deleted file mode 100644
index 0380dc4de9a..00000000000
--- a/src/interfaces/python/pg.py
+++ /dev/null
@@ -1,302 +0,0 @@
-# pg.py
-# Written by D'Arcy J.M. Cain
-
-# This library implements some basic database management stuff. It
-# includes the pg module and builds on it. This is known as the
-# "Classic" interface. For DB-API compliance use the pgdb module.
-
-from _pg import *
-from types import *
-import string, re, sys
-
-# utility function
-# We expect int, seq, decimal, text or date (more later)
-def _quote(d, t):
- if d == None:
- return "NULL"
-
- if t in ['int', 'seq']:
- if d == "": return "NULL"
- return "%d" % long(d)
-
- if t == 'decimal':
- if d == "": return "NULL"
- return "%f" % float(d)
-
- if t == 'money':
- if d == "": return "NULL"
- return "'%.2f'" % float(d)
-
- if t == 'bool':
- # Can't run upper() on these
- if d in (0, 1): return ("'f'", "'t'")[d]
-
- if string.upper(d) in ['T', 'TRUE', 'Y', 'YES', '1', 'ON']:
- return "'t'"
- else:
- return "'f'"
-
- if t == 'date' and d == '': return "NULL"
- if t in ('inet', 'cidr') and d == '': return "NULL"
-
- return "'%s'" % string.strip(re.sub("'", "''", \
- re.sub("\\\\", "\\\\\\\\", "%s" % d)))
-
-class DB:
- """This class wraps the pg connection type"""
-
- def __init__(self, *args, **kw):
- self.db = apply(connect, args, kw)
-
- # Create convience methods, in a way that is still overridable
- # (members are not copied because they are actually functions)
- for e in self.db.__methods__:
- setattr(self, e, getattr(self.db, e))
-
- self.__attnames__ = {}
- self.__pkeys__ = {}
- self.debug = None # For debugging scripts, set to output format
- # that takes a single string arg. For example
- # in a CGI set to "%s<BR>"
-
- def _do_debug(self, s):
- if not self.debug: return
- if type(self.debug) == StringType: print self.debug % s
- if type(self.debug) == FunctionType: self.debug(s)
- if type(self.debug) == FileType: print >> self.debug, s
-
- # wrap query for debugging
- def query(self, qstr):
- self._do_debug(qstr)
- return self.db.query(qstr)
-
- def pkey(self, cl, newpkey = None):
- """This method returns the primary key of a class. If newpkey
- is set and is set and is not a dictionary then set that
- value as the primary key of the class. If it is a dictionary
- then replace the __pkeys__ dictionary with it."""
- # Get all the primary keys at once
- if type(newpkey) == DictType:
- self.__pkeys__ = newpkey
- return
-
- if newpkey:
- self.__pkeys__[cl] = newpkey
- return newpkey
-
- if self.__pkeys__ == {}:
- for rel, att in self.db.query("""SELECT
- pg_class.relname, pg_attribute.attname
- FROM pg_class, pg_attribute, pg_index
- WHERE pg_class.oid = pg_attribute.attrelid AND
- pg_class.oid = pg_index.indrelid AND
- pg_index.indkey[0] = pg_attribute.attnum AND
- pg_index.indisprimary = 't' AND
- pg_attribute.attisdropped = 'f'""").getresult():
- self.__pkeys__[rel] = att
-
- # will raise an exception if primary key doesn't exist
- return self.__pkeys__[cl]
-
- def get_databases(self):
- l = []
- for n in self.db.query("SELECT datname FROM pg_database").getresult():
- l.append(n[0])
- return l
-
- def get_tables(self):
- l = []
- for n in self.db.query("""SELECT relname FROM pg_class
- WHERE relkind = 'r' AND
- relname !~ '^Inv' AND
- relname !~ '^pg_'""").getresult():
- l.append(n[0])
- return l
-
- def get_attnames(self, cl, newattnames = None):
- """This method gets a list of attribute names for a class. If
- the optional newattnames exists it must be a dictionary and
- will become the new attribute names dictionary."""
-
- if type(newattnames) == DictType:
- self.__attnames__ = newattnames
- return
- elif newattnames:
- raise error, "If supplied, newattnames must be a dictionary"
-
- # May as well cache them
- if self.__attnames__.has_key(cl):
- return self.__attnames__[cl]
-
- query = """SELECT pg_attribute.attname, pg_type.typname
- FROM pg_class, pg_attribute, pg_type
- WHERE pg_class.relname = '%s' AND
- pg_attribute.attnum > 0 AND
- pg_attribute.attrelid = pg_class.oid AND
- pg_attribute.atttypid = pg_type.oid AND
- pg_attribute.attisdropped = 'f'"""
-
- l = {}
- for attname, typname in self.db.query(query % cl).getresult():
- if re.match("^int", typname):
- l[attname] = 'int'
- elif re.match("^oid", typname):
- l[attname] = 'int'
- elif re.match("^text", typname):
- l[attname] = 'text'
- elif re.match("^char", typname):
- l[attname] = 'text'
- elif re.match("^name", typname):
- l[attname] = 'text'
- elif re.match("^abstime", typname):
- l[attname] = 'date'
- elif re.match("^date", typname):
- l[attname] = 'date'
- elif re.match("^timestamp", typname):
- l[attname] = 'date'
- elif re.match("^bool", typname):
- l[attname] = 'bool'
- elif re.match("^float", typname):
- l[attname] = 'decimal'
- elif re.match("^money", typname):
- l[attname] = 'money'
- else:
- l[attname] = 'text'
-
- l['oid'] = 'int' # every table has this
- self.__attnames__[cl] = l # cache it
- return self.__attnames__[cl]
-
- # return a tuple from a database
- def get(self, cl, arg, keyname = None, view = 0):
- if cl[-1] == '*': # need parent table name
- xcl = cl[:-1]
- else:
- xcl = cl
-
- if keyname == None: # use the primary key by default
- keyname = self.pkey(xcl)
-
- fnames = self.get_attnames(xcl)
-
- if type(arg) == DictType:
- # To allow users to work with multiple tables we munge the
- # name when the key is "oid"
- if keyname == 'oid': k = arg['oid_%s' % xcl]
- else: k = arg[keyname]
- else:
- k = arg
- arg = {}
-
- # We want the oid for later updates if that isn't the key
- if keyname == 'oid':
- q = "SELECT * FROM %s WHERE oid = %s" % (cl, k)
- elif view:
- q = "SELECT * FROM %s WHERE %s = %s" % \
- (cl, keyname, _quote(k, fnames[keyname]))
- else:
- q = "SELECT oid AS oid_%s, %s FROM %s WHERE %s = %s" % \
- (xcl, string.join(fnames.keys(), ','),\
- cl, keyname, _quote(k, fnames[keyname]))
-
- self._do_debug(q)
- res = self.db.query(q).dictresult()
- if res == []:
- raise error, \
- "No such record in %s where %s is %s" % \
- (cl, keyname, _quote(k, fnames[keyname]))
- return None
-
- for k in res[0].keys():
- arg[k] = res[0][k]
-
- return arg
-
- # Inserts a new tuple into a table
- # We currently don't support insert into views although PostgreSQL does
- def insert(self, cl, a):
- fnames = self.get_attnames(cl)
- l = []
- n = []
- for f in fnames.keys():
- if f != 'oid' and a.has_key(f):
- l.append(_quote(a[f], fnames[f]))
- n.append(f)
-
- try:
- q = "INSERT INTO %s (%s) VALUES (%s)" % \
- (cl, string.join(n, ','), string.join(l, ','))
- self._do_debug(q)
- a['oid_%s' % cl] = self.db.query(q)
- except:
- raise error, "Error inserting into %s: %s" % (cl, sys.exc_value)
-
- # reload the dictionary to catch things modified by engine
- # note that get() changes 'oid' below to oid_table
- # if no read perms (it can and does happen) return None
- try: return self.get(cl, a, 'oid')
- except: return None
-
- # Update always works on the oid which get returns if available
- # otherwise use the primary key. Fail if neither.
- def update(self, cl, a):
- self.pkey(cl) # make sure we have a self.__pkeys__ dictionary
-
- foid = 'oid_%s' % cl
- if a.has_key(foid):
- where = "oid = %s" % a[foid]
- elif self.__pkeys__.has_key(cl) and a.has_key(self.__pkeys__[cl]):
- where = "%s = '%s'" % (self.__pkeys__[cl], a[self.__pkeys__[cl]])
- else:
- raise error, "Update needs primary key or oid as %s" % foid
-
- v = []
- k = 0
- fnames = self.get_attnames(cl)
-
- for ff in fnames.keys():
- if ff != 'oid' and a.has_key(ff):
- v.append("%s = %s" % (ff, _quote(a[ff], fnames[ff])))
-
- if v == []:
- return None
-
- try:
- q = "UPDATE %s SET %s WHERE %s" % \
- (cl, string.join(v, ','), where)
- self._do_debug(q)
- self.db.query(q)
- except:
- raise error, "Can't update %s: %s" % (cl, sys.exc_value)
-
- # reload the dictionary to catch things modified by engine
- if a.has_key(foid):
- return self.get(cl, a, 'oid')
- else:
- return self.get(cl, a)
-
- # At some point we will need a way to get defaults from a table
- def clear(self, cl, a = {}):
- fnames = self.get_attnames(cl)
- for ff in fnames.keys():
- if fnames[ff] in ['int', 'decimal', 'seq', 'money']:
- a[ff] = 0
- else:
- a[ff] = ""
-
- a['oid'] = 0
- return a
-
- # Like update, delete works on the oid
- # one day we will be testing that the record to be deleted
- # isn't referenced somewhere (or else PostgreSQL will)
- def delete(self, cl, a):
- try:
- q = "DELETE FROM %s WHERE oid = %s" % (cl, a['oid_%s' % cl])
- self._do_debug(q)
- self.db.query(q)
- except:
- raise error, "Can't delete %s: %s" % (cl, sys.exc_value)
-
- return None
-