diff options
Diffstat (limited to 'src/interfaces/python/pgdb.py')
-rw-r--r-- | src/interfaces/python/pgdb.py | 452 |
1 files changed, 0 insertions, 452 deletions
diff --git a/src/interfaces/python/pgdb.py b/src/interfaces/python/pgdb.py deleted file mode 100644 index 848cb509407..00000000000 --- a/src/interfaces/python/pgdb.py +++ /dev/null @@ -1,452 +0,0 @@ -""" pgdb - DB-SIG compliant module for PygreSQL. - - (c) 1999, Pascal Andre <andre@via.ecp.fr>. - See package documentation for further information on copyright. - - Inline documentation is sparse. See DB-SIG 2.0 specification for - usage information. - - basic usage: - - pgdb.connect(connect_string) -> connection - connect_string = 'host:database:user:password:opt:tty' - All parts are optional. You may also pass host through - password as keyword arguments. To pass a port, pass it in - the host keyword parameter: - pgdb.connect(host='localhost:5432') - - connection.cursor() -> cursor - - connection.commit() - - connection.close() - - connection.rollback() - - cursor.execute(query[, params]) - execute a query, binding params (a dictionary) if it is - passed. The binding syntax is the same as the % operator - for dictionaries, and no quoting is done. - - cursor.executemany(query, list of params) - execute a query many times, binding each param dictionary - from the list. - - cursor.fetchone() -> [value, value, ...] - - cursor.fetchall() -> [[value, value, ...], ...] - - cursor.fetchmany([size]) -> [[value, value, ...], ...] - returns size or cursor.arraysize number of rows from result - set. Default cursor.arraysize is 1. - - cursor.description -> [(column_name, type_name, display_size, - internal_size, precision, scale, null_ok), ...] - - Note that precision, scale and null_ok are not implemented. - - cursor.rowcount - number of rows available in the result set. Available after - a call to execute. - - cursor.close() - -""" - -import _pg -import string -import exceptions -import types -import time -import types - -# Marc-Andre is changing where DateTime goes. This handles it either way. -try: from mx import DateTime -except ImportError: import DateTime - -### module constants - -# compliant with DB SIG 2.0 -apilevel = '2.0' - -# module may be shared, but not connections -threadsafety = 1 - -# this module use extended python format codes -paramstyle = 'pyformat' - -### exception hierarchy - -class Warning(StandardError): - pass - -class Error(StandardError): - pass - -class InterfaceError(Error): - pass - -class DatabaseError(Error): - pass - -class DataError(DatabaseError): - pass - -class OperationalError(DatabaseError): - pass - -class IntegrityError(DatabaseError): - pass - -class InternalError(DatabaseError): - pass - -class ProgrammingError(DatabaseError): - pass - -class NotSupportedError(DatabaseError): - pass - -### internal type handling class -class pgdbTypeCache: - - def __init__(self, cnx): - self.__source = cnx.source() - self.__type_cache = {} - - def typecast(self, typ, value): - # for NULL values, no typecast is necessary - if value == None: - return value - - if typ == STRING: - pass - elif typ == BINARY: - pass - elif typ == BOOL: - value = (value[:1] in ['t','T']) - elif typ == INTEGER: - value = int(value) - elif typ == LONG: - value = long(value) - elif typ == FLOAT: - value = float(value) - elif typ == MONEY: - value = string.replace(value, "$", "") - value = string.replace(value, ",", "") - value = float(value) - elif typ == DATETIME: - # format may differ ... we'll give string - pass - elif typ == ROWID: - value = long(value) - return value - - def getdescr(self, oid): - try: - return self.__type_cache[oid] - except: - self.__source.execute( - "SELECT typname, typlen " - "FROM pg_type WHERE oid = %s" % oid - ) - res = self.__source.fetch(1)[0] - # column name is omitted from the return value. It will - # have to be prepended by the caller. - res = ( - res[0], - None, string.atoi(res[1]), - None, None, None - ) - self.__type_cache[oid] = res - return res - -### cursor object - -class pgdbCursor: - - def __init__(self, src, cache): - self.__cache = cache - self.__source = src - self.description = None - self.rowcount = -1 - self.arraysize = 1 - - def close(self): - self.__source.close() - self.description = None - self.rowcount = -1 - - def execute(self, operation, params = None): - # "The parameters may also be specified as list of - # tuples to e.g. insert multiple rows in a single - # operation, but this kind of usage is deprecated: - if params and type(params) == types.ListType and \ - type(params[0]) == types.TupleType: - self.executemany(operation, params) - else: - # not a list of tuples - self.executemany(operation, (params,)) - - def executemany(self, operation, param_seq): - self.description = None - self.rowcount = -1 - - # first try to execute all queries - totrows = 0 - sql = "INIT" - try: - for params in param_seq: - if params != None: - sql = _quoteparams(operation, params) - else: - sql = operation - rows = self.__source.execute(sql) - if rows != None: # true is __source is NOT a DQL - totrows = totrows + rows - except _pg.error, msg: - raise DatabaseError, "error '%s' in '%s'" % ( msg, sql ) - except: - raise OperationalError, "internal error in '%s'" % sql - - # then initialize result raw count and description - if self.__source.resulttype == _pg.RESULT_DQL: - self.rowcount = self.__source.ntuples - d = [] - for typ in self.__source.listinfo(): - # listinfo is a sequence of - # (index, column_name, type_oid) - # getdescr returns all items needed for a - # description tuple except the column_name. - desc = typ[1:2]+self.__cache.getdescr(typ[2]) - d.append(desc) - self.description = d - else: - self.rowcount = totrows - self.description = None - - def fetchone(self): - res = self.fetchmany(1, 0) - try: - return res[0] - except: - return None - - def fetchall(self): - return self.fetchmany(-1, 0) - - def fetchmany(self, size = None, keep = 1): - if size == None: - size = self.arraysize - if keep == 1: - self.arraysize = size - - try: res = self.__source.fetch(size) - except _pg.error, e: raise DatabaseError, str(e) - - result = [] - for r in res: - row = [] - for i in range(len(r)): - row.append(self.__cache.typecast( - self.description[i][1], - r[i] - ) - ) - result.append(row) - return result - - def nextset(self): - raise NotSupportedError, "nextset() is not supported" - - def setinputsizes(self, sizes): - pass - - def setoutputsize(self, size, col = 0): - pass - - -try: - _quote = _pg.quote_fast - _quoteparams = _pg.quoteparams_fast -except (NameError, AttributeError): - def _quote(x): - if type(x) == DateTime.DateTimeType: - x = str(x) - if type(x) == types.StringType: - x = "'" + string.replace( - string.replace(str(x), '\\', '\\\\'), "'", "''") + "'" - - elif type(x) in (types.IntType, types.LongType, types.FloatType): - pass - elif x is None: - x = 'NULL' - elif type(x) in (types.ListType, types.TupleType): - x = '(%s)' % string.join(map(lambda x: str(_quote(x)), x), ',') - elif hasattr(x, '__pg_repr__'): - x = x.__pg_repr__() - else: - raise InterfaceError, 'do not know how to handle type %s' % type(x) - - return x - - def _quoteparams(s, params): - if hasattr(params, 'has_key'): - x = {} - for k, v in params.items(): - x[k] = _quote(v) - params = x - else: - params = tuple(map(_quote, params)) - - return s % params - -### connection object - -class pgdbCnx: - - def __init__(self, cnx): - self.__cnx = cnx - self.__cache = pgdbTypeCache(cnx) - try: - src = self.__cnx.source() - src.execute("BEGIN") - except: - raise OperationalError, "invalid connection." - - def close(self): - self.__cnx.close() - - def commit(self): - try: - src = self.__cnx.source() - src.execute("COMMIT") - src.execute("BEGIN") - except: - raise OperationalError, "can't commit." - - def rollback(self): - try: - src = self.__cnx.source() - src.execute("ROLLBACK") - src.execute("BEGIN") - except: - raise OperationalError, "can't rollback." - - def cursor(self): - try: - src = self.__cnx.source() - return pgdbCursor(src, self.__cache) - except: - raise pgOperationalError, "invalid connection." - -### module interface - -# connects to a database -def connect(dsn = None, user = None, password = None, host = None, database = None): - # first get params from DSN - dbport = -1 - dbhost = "" - dbbase = "" - dbuser = "" - dbpasswd = "" - dbopt = "" - dbtty = "" - try: - params = string.split(dsn, ":") - dbhost = params[0] - dbbase = params[1] - dbuser = params[2] - dbpasswd = params[3] - dbopt = params[4] - dbtty = params[5] - except: - pass - - # override if necessary - if user != None: - dbuser = user - if password != None: - dbpasswd = password - if database != None: - dbbase = database - if host != None: - try: - params = string.split(host, ":") - dbhost = params[0] - dbport = int(params[1]) - except: - pass - - # empty host is localhost - if dbhost == "": - dbhost = None - if dbuser == "": - dbuser = None - - # open the connection - cnx = _pg.connect(dbbase, dbhost, dbport, dbopt, - dbtty, dbuser, dbpasswd) - return pgdbCnx(cnx) - -### types handling - -# PostgreSQL is object-oriented: types are dynamic. We must thus use type names -# as internal type codes. - -class pgdbType: - - def __init__(self, *values): - self.values= values - - def __cmp__(self, other): - if other in self.values: - return 0 - if other < self.values: - return 1 - else: - return -1 - -STRING = pgdbType( - 'char', 'bpchar', 'name', 'text', 'varchar' -) - -# BLOB support is pg specific -BINARY = pgdbType() -INTEGER = pgdbType('int2', 'int4', 'serial') -LONG = pgdbType('int8') -FLOAT = pgdbType('float4', 'float8', 'numeric') -BOOL = pgdbType('bool') -MONEY = pgdbType('money') - -# this may be problematic as type are quite different ... I hope it won't hurt -DATETIME = pgdbType( - 'abstime', 'reltime', 'tinterval', 'date', 'time', 'timespan', 'timestamp', 'timestamptz', 'interval' -) - -# OIDs are used for everything (types, tables, BLOBs, rows, ...). This may cause -# confusion, but we are unable to find out what exactly is behind the OID (at -# least not easily enough). Should this be undefined as BLOBs ? -ROWID = pgdbType( - 'oid', 'oid8' -) - -# mandatory type helpers -def Date(year, month, day): - return DateTime.DateTime(year, month, day) - -def Time(hour, minute, second): - return DateTime.TimeDelta(hour, minute, second) - -def Timestamp(year, month, day, hour, minute, second): - return DateTime.DateTime(year, month, day, hour, minute, second) - -def DateFromTicks(ticks): - return apply(Date, time.localtime(ticks)[:3]) - -def TimeFromTicks(ticks): - return apply(Time, time.localtime(ticks)[3:6]) - -def TimestampFromTicks(ticks): - return apply(Timestamp, time.localtime(ticks)[:6]) - |