1
0
Fork 0
mirror of https://github.com/yweber/lodel2.git synced 2026-05-02 13:10:58 +02:00

Merge branch 'sqlwrapper'

This commit is contained in:
Yann 2015-06-05 11:07:02 +02:00
commit fff0225428
9 changed files with 581 additions and 902 deletions

90
Database/sqlalter.py Normal file
View file

@ -0,0 +1,90 @@
# -*- coding: utf-8 -*-
import os
import sqlalchemy as sqla
from sqlalchemy.ext.compiler import compiles
## @file sqlalter.py
# This file defines all DDL (data definition langage) for the ALTER TABLE instructions
#
# It uses the SqlAlchemy compilation and quoting methos to generate SQL
class AddColumn(sqla.schema.DDLElement):
""" Defines the ddl for adding a column to a table """
def __init__(self,table, column):
""" Instanciate the DDL
@param table sqlalchemy.Table: A sqlalchemy table object
@param column sqlalchemy.Column: A sqlalchemy column object
"""
self.col = column
self.table = table
@compiles(AddColumn, 'mysql')
@compiles(AddColumn, 'postgresql')
@compiles(AddColumn, 'sqlite')
def visit_add_column(element, ddlcompiler, **kw):
""" Compiles the AddColumn DDL for mysql, postgresql and sqlite"""
prep = ddlcompiler.sql_compiler.preparer
tname = prep.format_table(element.table)
colname = prep.format_column(element.col)
return 'ALTER TABLE %s ADD COLUMN %s %s'%(tname, colname, element.col.type)
@compiles(AddColumn)
def visit_add_column(element, ddlcompiler, **kw):
raise NotImplementedError('Add column not yet implemented for '+str(ddlcompiler.dialect.name))
class DropColumn(sqla.schema.DDLElement):
""" Defines the DDL for droping a column from a table """
def __init__(self, table, column):
""" Instanciate the DDL
@param table sqlalchemy.Table: A sqlalchemy table object
@param column sqlalchemy.Column: A sqlalchemy column object representing the column to drop
"""
self.col = column
self.table = table
@compiles(DropColumn,'mysql')
@compiles(DropColumn, 'postgresql')
def visit_drop_column(element, ddlcompiler, **kw):
""" Compiles the DropColumn DDL for mysql & postgresql """
prep = ddlcompiler.sql_compiler.preparer
tname = prep.format_table(element.table)
colname = prep.format_column(element.col)
return 'ALTER TABLE %s DROP COLUMN %s'%(tname, colname)
@compiles(DropColumn)
def visit_drop_column(element, ddlcompiler, **kw):
raise NotImplementedError('Drop column not yet implemented for '+str(ddlcompiler.dialect.name))
class AlterColumn(sqla.schema.DDLElement):
""" Defines the DDL for changing the type of a column """
def __init__(self, table, column):
""" Instanciate the DDL
@param table sqlalchemy.Table: A sqlalchemy Table object
@param column sqlalchemy.Column: A sqlalchemy Column object representing the new column
"""
self.col = column
self.table = table
@compiles(AlterColumn, 'mysql')
def visit_alter_column(element, ddlcompiler, **kw):
""" Compiles the AlterColumn DDL for mysql """
prep = ddlcompiler.sql_compiler.preparer
tname = prep.format_table(element.table)
colname = prep.format_column(element.col)
return 'ALTER TABLE %s ALTER COLUMN %s %s'%(tname, colname, element.col.type)
@compiles(AlterColumn, 'postgresql')
def visit_alter_column(element, ddlcompiler, **kw):
""" Compiles the AlterColumn DDL for postgresql """
prep = ddlcompiler.sql_compiler.preparer
tname = prep.format_table(element.table)
colname = prep.format_column(element.col)
return 'ALTER TABLE %s ALTER COLUMN %s TYPE %s'%(tname, colname, element.col.type)
@compiles(AlterColumn)
def visit_alter_column(element, ddlcompiler, **kw):
raise NotImplementedError('Alter column not yet implemented for '+str(ddlcompiler.dialect.name))

View file

@ -1,73 +0,0 @@
# -*- coding: utf-8 -*-
import os
import logging as logger
import sqlalchemy as sql
from Database.sqlwrapper import SqlWrapper
class SqlObject(object):
""" Object that make aliases with sqlalchemy
Example usage of object that inherite from SqlObject :
class foo(SlqObject,...):
def __init__(self, ...):
self.__class__.tname = 'foo_table'
f = foo(...)
req = f.where(f.col.id == 42)
res = f.rexec(req)
e = bar(...)
req = f.join(e.col.id == f.col.id)
res = f.rexec(req)
"""
def __init__(self, tname):
if not type(tname) is str:
logger.error("Unable to instanciate, bad argument...")
raise TypeError('Excepted a string not a '+str(type(tname)))
self.tname = tname
self.table = self.Table()
pass
def Table(self):
self.table = sql.Table(self.tname, sql.MetaData(), autoload_with=SqlWrapper.rengine, autoload=True)
return self.table
@property
def col(self):
return self.table.c
@property
def sel(self):
return sql.select([self.table])
@property
def where(self):
return self.sel.where
@property
def join(self):
return self.sel.join
@property
def rconn(self):
return SqlWrapper.rc()
@property
def wconn(self):
return SqlWrapper.wc()
def sFetchAll(self, sel):
return self.rexec(sel).fetchall()
def rexec(self, o):
return self.rconn.execute(o)
def wexec(self, o):
return self.wconn.execute(o)

View file

@ -0,0 +1,85 @@
# -*- coding: utf-8 -*-
import os
import logging as logger
import sqlalchemy as sql
from Database.sqlwrapper import SqlWrapper
class SqlQueryBuilder():
def __init__(self, sqlwrapper, table):
if not type(sqlwrapper) is SqlWrapper:
logger.error("Unable to instanciate, bad argument...")
raise TypeError('Excepted a SqlWrapper not a '+str(type(sqlwrapper)))
self.table = table
self.sqlwrapper = sqlwrapper
self.proxy = None
def Select(self, arg):
""" Alias for select clause
@param arg iterable: arg must be a Python list or other iterable and contain eather table name/type or colum/literal_column
"""
self.proxy = sql.select(arg)
return self.proxy
def Where(self, arg):
""" Alias for where clause
@param arg SQL expression object or string
"""
self.proxy = self.proxy.where(arg)
def From(self, arg):
""" Alias for select_from clause
@param arg Table or table('tablename') or join clause
"""
self.proxy = self.proxy.select_from(arg)
def Update(self):
self.proxy = self.table.update()
def Insert(self):
self.proxy = self.table.insert()
def Delete(self):
self.proxy = self.proxy.delete()
def Value(self, arg):
"""
Allow you to specifies the VALUES or SET clause of the statement.
@param arg: VALUES or SET clause
"""
self.proxy = self.proxy.values(arg)
def Execute(self, bindedparam):
"""
Execute the sql query constructed in the proxy and return the result.
If no query then return False.
@return: query result on success else False
"""
if(self.proxy.__str__().split() == 'SELECT'):
if('bindparam' in self.proxy.__str__()):
#on test separement la présence de la clause bindparam et le type de l'argument correspondant
#car si la clause est présente mais que l'argument est defectueux on doit renvoyer False et non pas executer la requete
if(type(bindedparam) is list and type(bindedparam[0]) is dict):
return self.sqlwrapper.rconn.execute(self.proxy, bindedparam)
else:
return False
else:
return self.sqlwrapper.rconn.execute(self.proxy)
elif(self.proxy is not None):
if('bindparam' in self.proxy.__str__()):
if(type(bindedparam) is list and type(bindedparam[0]) is dict):
return self.sqlwrapper.wconn.execute(self.proxy, bindedparam)
else:
return False
else:
return self.sqlwrapper.wconn.execute(self.proxy)
else:
return False

View file

@ -4,11 +4,11 @@ from Database.sqlwrapper import SqlWrapper
class SQLSetup(object):
def initDb(self):
db = SqlWrapper()
def initDb(self, dbconfname = 'default'):
db = SqlWrapper(read_db = dbconfname, write_db = dbconfname)
tables = self.get_schema()
for table in tables:
err = db.create_table(table)
db.dropAll()
db.createAllFromConf(tables)
def get_schema(self):
tables = []

View file

@ -1,118 +1,201 @@
# -*- coding: utf-8 -*-
import os
import re
import logging as logger
import sqlalchemy as sqla
from sqlalchemy.ext.compiler import compiles
from django.conf import settings
from Database.sqlalter import *
#Logger config
logger.getLogger().setLevel('DEBUG')
#To be able to use dango confs
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "Lodel.settings")
class SqlWrapper(object):
""" A wrapper class to sqlalchemy
""" A wrapper class to sqlalchemy
Usefull to provide a standart API
__Note__ : This class is not thread safe (sqlAlchemy connections are not). Create a new instance of the class to use in different threads or use SqlWrapper::copy
"""
##Read Engine
rengine = None
##Write Engine
wengine = None
##Read connection
rconn = None
##Write connection
wconn = None
ENGINES = {'mysql': {
'driver': 'pymysql',
'encoding': 'utf8'
},
'postgresql': {
'driver': 'psycopg2',
'encoding': 'utf8',
},
'sqlite': {
'driver': 'pysqlite',
'encoding': 'utf8',
},
}
##Configuration dict alias for class access
config=settings.LODEL2SQLWRAPPER
##SqlAlchemy logging
sqla_logging = False
##Wrapper instance list
wrapinstance = dict()
def __init__(self, alchemy_logs=None):
def __init__(self, name=None, alchemy_logs=None, read_db = "default", write_db = "default"):
""" Instanciate a new SqlWrapper
@param name str: The wrapper name
@param alchemy_logs bool: If true activate sqlalchemy logger
@param read_db str: The name of the db conf
@param write_db str: The name of the db conf
@todo Better use of name (should use self.cfg['wrapper'][name] to get engines configs
@todo Is it a really good idea to store instance in class scope ? Maybe not !!
"""
if (alchemy_logs != None and bool(alchemy_logs) != self.__class__.sqla_logging):
#logging config changed for sqlalchemy
self.__class__.restart()
self.sqlalogging = False if alchemy_logs == None else bool(alchemy_logs)
if not self.__class__.started():
self.__class__.start()
if name == None:
self.name = read_db+'+'+write_db
else:
self.name = name
self.r_dbconf = read_db
self.w_dbconf = write_db
self._checkConf() #raise if errors in configuration
if self.name in self.__class__.wrapinstance:
logger.warning("A SqlWrapper with the name "+self.name+" allready exist. Replacing the old one by the new one")
SqlWrapper.wrapinstance[self.name] = self
#Engine and wrapper initialisation
self.r_engine = self._getEngine(True, self.sqlalogging)
self.w_engine = self._getEngine(False, self.sqlalogging)
self.r_conn = None
self.w_conn = None
self.metadata = None #TODO : use it to load all db schema in 1 request and don't load it each table instanciation
self.meta_crea = None
logger.debug("New wrapper instance : <"+self.name+" read:"+str(self.r_engine)+" write:"+str(self.w_engine))
pass
@property
def cfg(self):
""" Return the SqlWrapper.config dict """
return self.__class__.config;
@property
def _engines_cfg(self):
return self.__class__.ENGINES;
@property
def meta(self):
if self.metadata == None:
self.renewMetaData()
return self.metadata
def renewMetaData(self):
""" (Re)load the database schema """
if self.metadata == None:
self.metadata = sqla.MetaData(bind=self.r_engine, reflect=True)
else:
self.metadata = sqla.MetaData(bind=self.r_engine, reflect=True)
@property
def rconn(self):
""" Return the read connection
@warning Do not store the connection, call this method each time you need it
"""
return self._getConnection(True)
@property
def wconn(self):
""" Return the write connection
@warning Do not store the connection, call this method each time you need it
"""
return self._getConnection(False)
def _getConnection(self, read):
""" Return an opened connection
@param read bool: If true return the reading connection
@return A sqlAlchemy db connection
@private
"""
if read:
r = self.r_conn
else:
r = self.w_conn
if r == None:
#Connection not yet opened
self.connect(read)
r = self._getConnection(read) #TODO : Un truc plus safe/propre qu'un appel reccursif ?
return r
def connect(self, read = None):
""" Open a connection to a database
@param read bool|None: If None connect both, if True only connect the read side (False the write side)
@return None
"""
if read or read == None:
if self.r_conn != None:
logger.debug(' SqlWrapper("'+self.name+'") Unable to connect, already connected')
else:
self.r_conn = self.r_engine.connect()
if not read or read == None:
if self.w_conn != None:
logger.debug(' SqlWrapper("'+self.name+'") Unable to connect, already connected')
else:
self.w_conn = self.w_engine.connect()
def disconnect(self, read = None):
""" Close a connection to a database
@param read bool|None: If None disconnect both, if True only connect the read side (False the write side)
@return None
"""
if read or read == None:
if self.r_conn == None:
logger.info('Unable to close read connection : connection not opened')
else:
self.r_conn.close()
self.r_conn = None
if not read or read == None:
if self.r_conn == None:
logger.info('Unable to close write connection : connection not opened')
else:
self.w_conn.close()
self.w_conn = None
def reconnect(self, read = None):
""" Close and reopen a connection to a database
@param read bool|None: If None disconnect both, if True only connect the read side (False the write side)
@return None
"""
self.disconnect(read)
self.connect(read)
@classmethod
def table(c, tname):
""" Return a SqlAlchemy Table object
@param o str: Table name
@return a SqlAlchemy Table instance
def reconnectAll(c, read = None):
""" Reconnect all the wrappers
@static
"""
for wname in c.wrapinstance:
c.wrapinstance[wname].reconnect(read)
def Table(self, tname):
""" Instanciate a new SqlAlchemy Table
@param tname str: The table name
@return A new instance of SqlAlchemy::Table
"""
if not isinstance(tname, str):
raise TypeError("Excepting a str but got a "+str(type(name)))
return sqla.Table(o, sqla.MetaData())
return TypeError('Excepting a <class str> but got a '+str(type(tname)))
#return sqla.Table(tname, self.meta, autoload_with=self.r_engine, autoload=True)
return sqla.Table(tname, self.meta)
@classmethod
def connect(c,read = None):
if read == None:
return c.connect(True) and c.coonect(False)
elif read:
c.rconn = c.rengine.connect()
else:
c.wconn = c.wengine.connect()
return True #TODO attention c'est pas checké...
@classmethod
def conn(c, read=True):
if read:
res = c.rconn
else:
res = c.wconn
if res == None:
if not c.connect(read):
raise RuntimeError('Unable to connect to Db')
return c.conn(read)
return c.rconn
@classmethod
def rc(c): return c.conn(True)
@classmethod
def wc(c): return c.conn(False)
@classmethod
def start(c, sqlalogging = None):
""" Load engines
Open connections to databases
@param sqlalogging bool: overwrite class parameter about sqlalchemy logging
@return False if already started
"""
c.checkConf()
if c.started():
logger.warning('Starting SqlWrapper but it is allready started')
return False
c.rengine = c._getEngine(read=True, sqlalogging=None)
c.wengine = c._getEngine(read=False, sqlalogging=None)
return True
@classmethod
def stop(c): c.rengine = c.wengine = None; pass
@classmethod
def restart(c): c.stop(); c.start(); pass
@classmethod
def started(c): return (c.rengine != None and c.rengine != None)
@classmethod
def _sqllog(c,sqlalogging = None):
return bool(sqlalogging) if sqlalogging != None else c.sqla_logging
@classmethod
def _getEngine(c, read=True, sqlalogging = None):
def _getEngine(self, read=True, sqlalogging = None):
""" Return a sqlalchemy engine
@param read bool: If True return the read engine, else
return the write one
@ -121,16 +204,10 @@ class SqlWrapper(object):
@todo Put the check on db config in SqlWrapper.checkConf()
"""
#Loading confs
connconf = 'dbread' if read else 'dbwrite'
dbconf = connconf if connconf in c.config['db'] else 'default'
cfg = self.cfg['db'][self.r_dbconf if read else self.w_dbconf]
if dbconf not in c.config['db']: #This check should not be here
raise NameError('Configuration error no db "'+dbconf+'" in configuration files')
cfg = c.config['db'][dbconf] #Database config
edata = c.config['engines'][cfg['ENGINE']] #engine infos
conn_str = cfg['ENGINE']+'+'+edata['driver']+'://'
edata = self._engines_cfg[cfg['ENGINE']] #engine infos
conn_str = ""
if cfg['ENGINE'] == 'sqlite':
#Sqlite connection string
@ -153,10 +230,26 @@ class SqlWrapper(object):
conn_str = '%s+%s://'%(cfg['ENGINE'], edata['driver'])
conn_str += '%s@%s/%s'%(user,host,cfg['NAME'])
return sqla.create_engine(conn_str, encoding=edata['encoding'], echo=c._sqllog(sqlalogging))
ret = sqla.create_engine(conn_str, encoding=edata['encoding'], echo=self.sqlalogging)
logger.debug("Getting engine :"+str(ret))
return ret
@classmethod
def checkConf(c):
def getWrapper(c, name):
""" Return a wrapper instance from a wrapper name
@param name str: The wrapper name
@return a SqlWrapper instance
@throw KeyError
"""
if name not in c.wrapinstance:
raise KeyError("No wrapper named '"+name+"' exists")
return c.wrapinstance[name]
def _checkConf(self):
""" Class method that check the configuration
Configuration looks like
@ -172,44 +265,225 @@ class SqlWrapper(object):
- dbwrite (mandatory if no default db)
"""
err = []
if 'db' not in c.config:
if 'db' not in self.cfg:
err.append('Missing "db" in configuration')
else:
if 'default' not in c.config['db']:
if 'dbread' not in c.config:
err.append('Missing "dbread" in configuration and no "default" db declared')
if 'dbwrite' not in c.config:
err.append('Missing "dbwrite" in configuration and no "default" db declared')
for dbname in c.config['db']:
db = c.config['db'][dbname]
if 'ENGINE' not in db:
err.append('Missing "ENGINE" in database "'+db+'"')
for dbname in [self.r_dbconf, self.w_dbconf]:
if dbname not in self.cfg['db']:
err.append('Missing "'+dbname+'" db configuration')
else:
if db['ENGINE'] != 'sqlite' and 'USER' not in db:
err.append('Missing "USER" in database "'+db+'"')
if 'NAME' not in db:
err.append('Missing "NAME" in database "'+db+'"')
if len(c.config['engines']) == 0:
err.append('Missing "engines" in configuration')
for ename in c.config['engines']:
engine = c.config['engines'][ename]
if 'driver' not in engine:
err.append('Missing "driver" in database engine "'+ename+'"')
if 'encoding' not in engine:
err.append('Missing "encoding" in database engine "'+ename+'"')
db = self.cfg['db'][dbname]
if 'ENGINE' not in db:
err.append('Missing "ENGINE" in database "'+db+'"')
else:
if db['ENGINE'] not in self._engines_cfg:
err.append('Unknown engine "'+db['ENGINE']+'"')
elif db['ENGINE'] != 'sqlite' and 'USER' not in db:
err.append('Missing "User" in configuration of database "'+dbname+'"')
if 'NAME' not in db:
err.append('Missing "NAME" in database "'+dbname+'"')
if len(err)>0:
err_str = "\n"
for e in err:
err_str += "\t\t"+e+"\n"
raise NameError('Configuration errors in LODEL2SQLWRAPPER:'+err_str)
def dropAll(self):
""" Drop ALL tables from the database """
if not settings.DEBUG:
logger.critical("Trying to drop all tables but we are not in DEBUG !!!")
raise RuntimeError("Trying to drop all tables but we are not in DEBUG !!!")
meta = sqla.MetaData(bind=self.w_engine, reflect = True)
meta.drop_all()
pass
@property
def cfg(self):
""" Get the dict of options for the wrapper
Its an alias to the classes property SqlWrapper.config
@return a dict containing the Db settings"""
return self.__class__.config
def createAllFromConf(self, schema):
""" Create a bunch of tables from a schema
@param schema list: A list of table schema
@see SqlWrapper::createTable()
"""
self.meta_crea = sqla.MetaData()
logger.info("Running function createAllFromConf")
for i,table in enumerate(schema):
if not isinstance(table, dict):
raise TypeError("Excepted a list of dict but got a "+str(type(schema))+" in the list")
self.createTable(**table)
self.meta_crea.create_all(bind = self.w_engine)
logger.info("All tables created")
self.meta_crea = None
self.renewMetaData()
pass
def createTable(self, name, columns, **kw):
""" Create a table
@param name str: The table name
@param columns list: A list of columns description dict
@param extra dict: Extra arguments for table creation
@see SqlWrapper::createColumn()
"""
if self.meta_crea == None:
self.meta_crea = sqla.MetaData()
crea_now = True
else:
crea_now = False
if not isinstance(name, str):
raise TypeError("<class str> excepted for table name, but got "+type(name))
res = sqla.Table(name, self.meta_crea, **kw)
for i,col in enumerate(columns):
res.append_column(self.createColumn(**col))
if crea_now:
self.meta_crea.create_all(self.w_engine)
#logger.debug("Table '"+name+"' created")
pass
def createColumn(self, **kwargs):
""" Create a Column
Accepte named parameters :
- name : The column name
- type : see SqlWrapper::_strToSqlAType()
- extra : a dict like { "primarykey":True, "nullable":False, "default":"test"...}
@param **kwargs
"""
if not 'name' in kwargs or ('type' not in kwargs and 'type_' not in kwargs):
pass#ERROR
#Converting parameters
if 'type_' not in kwargs and 'type' in kwargs:
kwargs['type_'] = self._strToSqlAType(kwargs['type'])
del kwargs['type']
if 'extra' in kwargs:
#put the extra keys in kwargs
for exname in kwargs['extra']:
kwargs[exname] = kwargs['extra'][exname]
del kwargs['extra']
if 'foreignkey' in kwargs:
#Instanciate a fk
fk = sqla.ForeignKey(kwargs['foreignkey'])
del kwargs['foreignkey']
else:
fk = None
if 'primarykey' in kwargs:
#renaming primary_key in primarykey in kwargs
kwargs['primary_key'] = kwargs['primarykey']
del kwargs['primarykey']
res = sqla.Column(**kwargs)
if fk != None:
res.append_foreign_key(fk)
#logger.debug("Column '"+kwargs['name']+"' created")
return res
def _strToSqlAType(self, strtype):
""" Convert a string to an sqlAlchemy column type """
if 'VARCHAR' in strtype:
return self._strToVarchar(strtype)
else:
try:
return getattr(sqla, strtype)
except AttributeError:
raise NameError("Unknown type '"+strtype+"'")
pass
def _strToVarchar(self, vstr):
""" Convert a string like 'VARCHAR(XX)' (with XX an integer) to a SqlAlchemy varchar type"""
check_length = re.search(re.compile('VARCHAR\(([\d]+)\)', re.IGNORECASE), vstr)
column_length = int(check_length.groups()[0]) if check_length else None
return sqla.VARCHAR(length=column_length)
def dropColumn(self, tname, colname):
""" Drop a column from a table
@param tname str|sqlalchemy.Table: The table name or a Table object
@param colname str|sqlalchemy.Column: The column name or a column object
@return None
"""
if tname not in self.meta.tables: #Useless ?
raise NameError("The table '"+tname+"' dont exist")
table = self.Table(tname)
col = sqla.Column(colname)
ddl = DropColumn(table, col)
sql = ddl.compile(dialect=self.w_engine.dialect)
sql = str(sql)
logger.debug("Executing SQL : '"+sql+"'")
ret = bool(self.w_engine.execute(sql))
self.renewMetaData()
return ret
def addColumn(self, tname, colname, coltype):
""" Add a column to a table
@param tname str: The table name
@param colname str: The column name
@param coltype str: The new column type
@return True if query success False if it fails
"""
if tname not in self.meta.tables: #Useless ?
raise NameError("The table '"+tname+"' dont exist")
table = self.Table(tname)
newcol = self.createColumn(name=colname, type_ = coltype)
ddl = AddColumn(table, newcol)
sql = ddl.compile(dialect=self.w_engine.dialect)
sql = str(sql)
logger.debug("Executing SQL : '"+sql+"'")
ret = bool(self.wconn.execute(sql))
self.renewMetaData()
return ret
def alterColumn(self, tname, colname, col_newtype):
""" Change the type of a column
@param tname str: The table name
@param colname str: The column name
@param col_newtype str: The column new type
@return True if query successs False if it fails
"""
if tname not in self.meta.tables: #Useless ?
raise NameError("The table '"+tname+"' dont exist")
col = self.createColumn(name=colname, type_=col_newtype)
table = self.Table(tname)
ddl = AlterColumn(table, newcol)
sql = ddl.compile(dialect=self.w_engine.dialect)
sql = str(sql)
logger.debug("Executing SQL : '"+sql+"'")
ret = bool(self.wconn.execute(sql))
self.renewMetaData()
return ret
def _debug__printSchema(self):
""" Debug function to print the db schema """
print(self.meta)
for tname in self.meta.tables:
self._debug__printTable(tname)
def _debug__printTable(self, tname):
t = self.meta.tables[tname]
tstr = 'Table : "'+tname+'" :\n'
for c in t.c:
tstr += '\t\t"'+c.name+'"('+str(c.type)+') \n'
print(tstr)

View file

@ -1,519 +0,0 @@
import os
import logging
import random
from unittest import TestCase
from unittest.mock import MagicMock, Mock, patch, call
import unittest
import sqlalchemy
from Database.sqlwrapper import SqlWrapper
from django.conf import settings
from Database.sqlsettings import SQLSettings
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "Lodel.settings")
BADNAMES = ['', " \t ", "bad--", "%", "*", "`", '"', "'", "\0", "hello`World", 'Hello"world', 'foo%', '*bar.*', '%%', "hello\0world", print, 42 ]
for c in ('*','%','`', "'", '"', "\\"):
for _ in range(16):
c = "\\"+c
BADNAMES.append(c)
BADNAMES+=[chr(i) for i in list(range(0,0x09))+[0x0b, 0x0c]+list(range(0x0e, 0x01f))]
class SqlWrapperTests(TestCase):
def setUp(self):
#Overwriting database conf
SQLSettings.DB_READ_CONNECTION_NAME = 'testdb'
SQLSettings.DB_WRITE_CONNECTION_NAME = 'testdb'
self.testdb = os.path.join('/tmp/', 'lodel2_testdb.sqlite3')
settings.DATABASES['testdb'] = {
'ENGINE': 'sqlite',
'NAME': self.testdb,
}
#Disable logging but CRITICAL
logging.basicConfig(level=logging.CRITICAL)
pass
def tearDown(self):
try:
os.unlink(self.testdb) #Removing the test database
except FileNotFoundError: pass
""" Testing standart instanciation of sqlwrapper """
def test_init_sqlwrapper(self):
sw = SqlWrapper()
self.assertIsInstance(sw, SqlWrapper)
sw2 = SqlWrapper()
self.assertIsInstance(sw2, SqlWrapper)
def test_get_engine(self):
sw = SqlWrapper()
engine = sw.get_engine(SQLSettings.DB_READ_CONNECTION_NAME)
self.assertIsInstance(engine, sqlalchemy.engine.base.Engine)
def test_get_engine_badargs(self):
sw = SqlWrapper()
with self.assertRaises(Exception):
sw.get_engine(0)
with self.assertRaises(Exception):
sw.get_engine('')
with self.assertRaises(Exception):
sw.get_engine(' ')
@patch.object(SqlWrapper, 'get_read_engine')
def test_execute_queries(self, mock_read):
queries = [ 'SELECT * FROM foo', 'SELECT * FROM bar', 'SELECT foo.id, bar.id FROM foo, bar WHERE foo.id = 42 AND bar.name = \'hello world !\'' ]
mock_read.return_value = MagicMock()
mock_engine = mock_read.return_value
mock_engine.connect.return_value = MagicMock()
mock_conn = mock_engine.connect.return_value
sw = SqlWrapper()
#One query execution
sw.execute(queries[0], 'read')
mock_conn.execute.assert_called_with(queries[0])
mock_conn.reset_mock()
#multiple queries execution
sw.execute(queries, 'read')
except_calls = [ call(q) for q in queries ]
self.assertEqual(except_calls, mock_conn.execute.mock_calls)
@patch.object(sqlalchemy.ForeignKeyConstraint, '__init__')
def test_create_fk_constraint_mock(self, mock_fk):
mock_fk.return_value = None
sw = SqlWrapper()
sw.create_foreign_key_constraint_object('foo','bar', 'foobar')
mock_fk.asser_called_with(['foo'], ['bar'], 'foobar')
def test_create_fk_constraint(self):
sw = SqlWrapper()
fk = sw.create_foreign_key_constraint_object('foo', 'bar', 'foobar')
self.assertIsInstance(fk, sqlalchemy.ForeignKeyConstraint)
@unittest.skip('Dev') #TODO remove it
def test_create_fk_constraint_badargs(self):
sw = SqlWrapper()
with self.assertRaises(Exception):
sw.create_foreign_key_constraint_object(['foo', 'bar'], 'foofoo', 'babar')
with self.assertRaises(Exception):
sw.create_foreign_key_constraint_object('bar', 2, 'babar')
with self.assertRaises(Exception):
sw.create_foreign_key_constraint_object(None, 'foo', 'bar')
with self.assertRaises(Exception):
sw.create_foreign_key_constraint_object('bar', None, 'babar')
with self.assertRaises(Exception):
sw.create_foreign_key_constraint_object(None, None, 'babar')
with self.assertRaises(Exception):
sw.create_foreign_key_constraint_object(print, 'foo', 'babar')
with self.assertRaises(Exception):
sw.create_foreign_key_constraint_object('foo', print, 'babar')
with self.assertRaises(Exception):
sw.create_foreign_key_constraint_object('foo', 'bar', print)
with self.assertRaises(Exception):
sw.create_foreign_key_constraint_object('bar', 'foo', 42)
with self.assertRaises(Exception):
sw.create_foreign_key_constraint_object('bar', 'foo', ' ')
with self.assertRaises(Exception):
sw.create_foreign_key_constraint_object(" \t ", 'foo')
with self.assertRaises(Exception):
sw.create_foreign_key_constraint_object('bar', 'foo', " \t ")
foocol = sqlalchemy.Column('foo',sqlalchemy.INTEGER)
with self.assertRaises(Exception):
sw.create_foreign_key_constraint_object(foocol, foocol)
with self.assertRaises(Exception):
sw.create_foreign_key_constraint_object(foocol, 'bar')
with self.assertRaises(Exception):
sw.create_foreign_key_constraint_object('foo', foocol)
with self.assertRaises(Exception):
sw.create_foreign_key_constraint_object('foo', 'bar', foocol)
@patch.object(sqlalchemy.Column, '__init__')
def test_create_column(self, mockcol):
mockcol.return_value = None
sw = SqlWrapper()
foo = sw.create_column_object('foo', 'INTEGER')
mockcol.assert_called_once_with('foo', sqlalchemy.INTEGER)
mockcol.reset_mock()
foo = sw.create_column_object('foo', 'VARCHAR(50)')
self.assertEqual(mockcol.call_args[0][0], 'foo')
self.assertIsInstance(mockcol.call_args[0][1], sqlalchemy.VARCHAR)
self.assertEqual(mockcol.call_args[0][1].length, 50)
mockcol.reset_mock()
foo = sw.create_column_object('foo', 'TEXT')
mockcol.assert_called_once_with('foo', sqlalchemy.TEXT)
mockcol.reset_mock()
foo = sw.create_column_object('foo', 'DATE')
mockcol.assert_called_once_with('foo', sqlalchemy.DATE)
mockcol.reset_mock()
foo = sw.create_column_object('foo', 'BOOLEAN')
mockcol.assert_called_once_with('foo', sqlalchemy.BOOLEAN)
mockcol.reset_mock()
xtrmlongname = ''
for _ in range(200):
xtrmlongname += 'veryvery'
xtrmlongname += 'longname'
foo = sw.create_column_object(xtrmlongname, 'TEXT')
mockcol.assert_called_once_with(xtrmlongname, sqlalchemy.TEXT)
mockcol.reset_mock()
def test_create_column_extra_fk(self):
sw = SqlWrapper()
extra = { 'foreignkey': 'bar' }
rescol = sw.create_column_object('foo', 'INTEGER', extra)
self.assertIsInstance(rescol, sqlalchemy.Column)
fk = rescol.foreign_keys.pop()
self.assertIsInstance(fk, sqlalchemy.ForeignKey)
self.assertEqual(fk._colspec, 'bar')
def test_create_column_extra_default(self):
sw = SqlWrapper()
extra = { 'default': None, 'nullable': True }
rescol = sw.create_column_object('foo', 'INTEGER', extra)
self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
self.assertEqual(rescol.default.arg, None)
extra = { 'default': "NULL", 'nullable': True }
rescol = sw.create_column_object('foo', 'INTEGER', extra)
self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
self.assertEqual(rescol.default.arg, "NULL")
extra = { 'default': 'null', 'nullable': True }
rescol = sw.create_column_object('foo', 'INTEGER', extra)
self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
self.assertEqual(rescol.default.arg, 'null')
extra = { 'default': 42 }
rescol = sw.create_column_object('foo', 'INTEGER', extra)
self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
self.assertEqual(rescol.default.arg, 42)
extra = { 'default': 'foobardefault' }
rescol = sw.create_column_object('foo', 'VARCHAR(50)', extra)
self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
self.assertEqual(rescol.default.arg, 'foobardefault')
extra = { 'default': 'foodefault' }
rescol = sw.create_column_object('foo', 'TEXT', extra)
self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
self.assertEqual(rescol.default.arg, 'foodefault')
extra = { 'default': True }
rescol = sw.create_column_object('foo', 'BOOLEAN', extra)
self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
self.assertEqual(rescol.default.arg, True)
extra = { 'default': False }
rescol = sw.create_column_object('foo', 'BOOLEAN', extra)
self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
self.assertEqual(rescol.default.arg, False)
extra = { 'default': "true" }
rescol = sw.create_column_object('foo', 'BOOLEAN', extra)
self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
self.assertEqual(rescol.default.arg, "true")
extra = { 'default': 0 }
rescol = sw.create_column_object('foo', 'BOOLEAN', extra)
self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
self.assertEqual(rescol.default.arg, 0)
extra = { 'default': 1 }
rescol = sw.create_column_object('foo', 'BOOLEAN', extra)
self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
self.assertEqual(rescol.default.arg, 1)
def test_create_column_extra_pknull(self):
sw = SqlWrapper()
for b in (True,False):
extra = { 'primarykey': b }
rescol = sw.create_column_object('foo', 'INTEGER', extra)
self.assertIsInstance(rescol, sqlalchemy.Column)
self.assertEqual(rescol.primary_key, b)
extra = { 'nullable': b }
rescol = sw.create_column_object('foo', 'INTEGER', extra)
self.assertIsInstance(rescol, sqlalchemy.Column)
self.assertEqual(rescol.nullable, b)
extra = { 'primarykey' : b, 'nullable': not b }
rescol = sw.create_column_object('foo', 'INTEGER', extra)
self.assertIsInstance(rescol, sqlalchemy.Column)
self.assertEqual(rescol.primary_key, b)
self.assertEqual(rescol.nullable, not b)
extra = { 'primarykey' : b, 'nullable': b }
rescol = sw.create_column_object('foo', 'INTEGER', extra)
self.assertIsInstance(rescol, sqlalchemy.Column)
self.assertEqual(rescol.primary_key, b)
self.assertEqual(rescol.nullable, b)
def test_create_column_extra_all(self):
sw = SqlWrapper()
extra = { 'primarykey': False, 'nullable': False, 'default':42, 'foreignkey': 'foobar'}
rescol = sw.create_column_object('foo', 'INTEGER', extra)
self.assertIsInstance(rescol, sqlalchemy.Column)
self.assertEqual(rescol.primary_key, False)
self.assertEqual(rescol.nullable, False)
self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
self.assertEqual(rescol.default.arg, 42)
fk = rescol.foreign_keys.pop()
self.assertIsInstance(fk, sqlalchemy.ForeignKey)
self.assertEqual(fk._colspec, 'foobar')
@unittest.skip('Dev') #TODO remove it
def test_create_column_badargs(self):
sw = SqlWrapper()
cc = sw.create_column_object
ain = self.assertIsNone
for bname in BADNAMES:
ain(cc(bname, 'INTEGER'))
ain(cc('c', 'INNNTEGER'))
ain(cc(" \t\t ", 'TEXT'))
ain(cc('c', ' '))
ain(cc('c', 'VARCHAR(foo)'))
ain(cc('supercol', 'SUPERNOTTYPEDVARCHARSTR'))
ain(cc('c', None))
ain(cc(None, None))
@unittest.skip('Dev') #TODO remove it
def test_create_column_badextra(self):
sw = SqlWrapper()
cc = sw.create_column_object
ain = self.assertIsNone
#Put junk in extra datas
for xtra_name in [ 'primarykey', 'nullable', 'foreignkey' ]:
for junk in [ print, sqlalchemy, SqlWrapper, ' ' ]+BADNAMES:
ain(cc('c', 'TEXT', { xtra_name: junk }))
for junk in [True, False, 52, ' ']+BADNAMES:
ain(cc('c', 'TEXT', { 'foreignkey': junk }))
for xtra_name in ('primarykey', 'nullalble'):
for junk in [4096, 'HELLO WORLD !', ' ']+BADNAMES:
ain(cc('c', 'TEXT', { xtra_name, junk } ))
@patch.object(sqlalchemy.Table, '__init__')
def test_create_table(self, mock_table):
#TODO check constraints
""" Create tables and check that the names are ok """
mock_table.return_value = None
sw = SqlWrapper()
cols = [
{ 'name': 'supercol', 'type': 'INTEGER', 'extra': { 'nullable': True } },
{ 'name': 'superpk', 'type': 'INTEGER', 'extra': { 'primarykey': True} },
{ 'name': 'col2', 'type': 'TEXT' }
]
for table_name in ['supertable', 'besttableever', 'foo-table']:
params = { 'name': table_name, 'columns': cols, 'constraints':dict() }
self.assertTrue(sw.create_table(params))
self.assertEqual(mock_table.call_args[0][0], table_name)
pass
@patch.object(sqlalchemy.Table, 'append_column')
def test_create_table_col(self, mock_append):
#TODO check constraints
""" Create a table and check that the columns are OK """
sw = SqlWrapper()
table_name = 'supertablecol'
cols = [
{ 'name': 'supercol', 'type': 'INTEGER', 'extra': { 'nullable': True } },
{ 'name': 'superpk', 'type': 'INTEGER', 'extra': { 'primarykey': True} },
{ 'name': 'col2', 'type': 'TEXT' }
]
except_coltype = [ sqlalchemy.INTEGER, sqlalchemy.INTEGER, sqlalchemy.TEXT ]
params = { 'name': table_name, 'columns': cols, 'constraints':dict() }
sw.create_table(params) #This call return false because of the mock on append_column
self.assertEqual(len(mock_append.mock_calls), len(cols))
for n,c in enumerate(mock_append.mock_calls):
self.assertEqual(c[1][0].name, cols[n]['name'])
self.assertIsInstance(c[1][0].type, except_coltype[n])
pass
def test_create_table_recreate(self):
#TODO check constraints
""" Try to create the same table 2 times (except a False return the second time) """
sw = SqlWrapper()
params = { 'name': 'redondant', 'columns': [{'name':'foocol', 'type': 'INTEGER'}]}
self.assertTrue(sw.create_table(params))
params['columns'] = [{'name':'barcol', 'type': 'INTEGER'}]
self.assertFalse(sw.create_table(params))
@unittest.skip('dev') #TODO remove it
def test_create_table_badargs(self):
sw = SqlWrapper()
af = self.assertFalse
ct = sw.create_table
foocol = {'name': 'foocol', 'type': 'INTEGER'}
p = {'name': 'foo'}
af(ct(p)) #no columns
for bname in BADNAMES: #bad column name
p['columns'] = bname
af(ct(p))
p['columns']=[]; af(ct(p)) #empty columns TODO Or return True is normal ???
p = {'columns':[foocol]}
af(ct(p)) #no name
for bname in BADNAMES:
p['name'] = bname
af(ct(p))
pass
def create_test_table(self, sw):
""" Create a table for test purpose """
table_name = 'testtable'
cols = [
{ 'name': 'pk', 'type': 'INTEGER', 'extra': {'primarykey': True} },
{ 'name': 'testtxt', 'type': 'TEXT', 'extra': {'nullable': True, 'default': 'hello'} },
{ 'name': 'testchr', 'type': 'VARCHAR(50)', 'extra': {'nullable': True, 'default': 'hello world'} },
]
sw.create_table( { 'name': table_name, 'columns': cols} )
pass
def test_get_table(self):
""" Try to get the testtable (check the name and type of return) """
sw = SqlWrapper()
self.create_test_table(sw)
rt = sw.get_table('testtable')
self.assertIsInstance(rt, sqlalchemy.Table)
self.assertEqual(rt.name, 'testtable')
rt = sw.get_table('testtable', 'write')
self.assertIsInstance(rt, sqlalchemy.Table)
self.assertEqual(rt.name, 'testtable')
rt = sw.get_table('testtable', 'read')
self.assertIsInstance(rt, sqlalchemy.Table)
self.assertEqual(rt.name, 'testtable')
pass
@unittest.skip('dev') #TODO remove skip
def test_get_table_badargs(self):
sw = SqlWrapper()
self.create_test_table(sw)
for badname in BADNAMES:
with self.assertRaises((sqlalchemy.exc.NoSuchTableError, Exception)):
sw.get_table(badname)
with self.assertRaises(sqlalchemy.exc.NoSuchTableError):
sw.get_table('FOOBAR')
with self.assertRaises(Exception):
sw.get_table(print)
with self.assertRaises(Exception):
sw.get_table(1)
#bad action types
with self.assertRaises(Exception):
sw.get_table('testtable', print)
with self.assertRaises(Exception):
sw.get_table('testtable', 'foobar')
with self.assertRaises(Exception):
sw.get_table('testtable', 42)
pass
def test_drop_table(self):
sw = SqlWrapper()
self.create_test_table(sw)
self.assertTrue(sw.drop_table('testtable'))
self.assertFalse(sw.drop_table('testtable'))
self.assertFalse(sw.drop_table('nonexisting'))
pass
def test_drop_table_badargs(self):
sw = SqlWrapper()
self.create_test_table(sw)
af = self.assertFalse
for bname in BADNAMES:
self.assertFalse(sw.drop_table(bname))
def test_create_get_drop_table(self):
sw = SqlWrapper()
funkynames = ('standart', "wow-nice", "-really-", "test_test-test", "_test", "test_", "test42", "foobar_123", "foobar-123", "foofoo--babar")
types = ['INTEGER', 'VARCHAR(5)', 'VARCHAR(128)', 'TEXT', 'BOOLEAN']
params = dict()
cols = []
for i,name in enumerate(funkynames):
cols.append({'name':name, 'type':types[i%len(types)]})
params['columns'] = cols
for name in random.sample(funkynames, len(funkynames)):
params['name'] = name
self.assertTrue(sw.create_table(params))
for name in random.sample(funkynames, len(funkynames)):
rt = sw.get_table(name)
self.assertIsInstance(rt, sqlalchemy.Table)#do we get a table ?
self.assertEqual(rt.name, name)
for name in random.sample(funkynames, len(funkynames)):
self.assertTrue(sw.drop_table(name))
pass

View file

@ -1,175 +0,0 @@
import os
import logging
import random
from unittest import TestCase
import unittest
from Database.sqlwrapper import SqlWrapper
from django.conf import settings
from Database.sqlsettings import SQLSettings
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "Lodel.settings")
#Bad strings for injection tests
INJECTIONS = [ 'foo UNION SELECT 1,2,3,4--', "foo' UNION SELECT 1,2,3--", 'foo" UNION SELECT 1,2,3--', 'foo` UNION SELECT 1,2,3,4--', '--', 'foo`; SELECT 1,2,3', 'foo"; SELECT 1,2,3', "foo'; SELECT 1,2,3", "; SELECT 1,2,3" ]
NAMEINJECT = INJECTIONS + [ '%', '*', "\0", "\b\b\b\b\b\b\b\b\b" ]
#Valid SQL types
VTYPES = [ 'integer', 'varchar(1)', 'varchar(50)', 'text', 'boolean' ]
class SqlWrapperQueryStrTests(TestCase):
def setUp(self):
#creating a test table
sw = SqlWrapper()
self.ttn = 'testtable'
self.cols = [
{ 'name': 'pk', 'type': 'INTEGER', 'extra': {'primarykey': True} },
{ 'name': 'testtxt', 'type': 'TEXT', 'extra': {'nullable': True, 'default': 'hello'} },
{ 'name': 'testchr', 'type': 'VARCHAR(50)', 'extra': {'nullable': True, 'default': 'hello world'} },
{ 'name': 'testbool', 'type': 'BOOLEAN', 'extra': {'nullable':False, 'default': False}},
]
sw.create_table( { 'name': self.ttn, 'columns': self.cols} )
#Disable logging but CRITICAL
logging.basicConfig(level=logging.CRITICAL)
pass
def tearDown(self):
sw = SqlWrapper()
sw.drop_table(self.ttn)
@unittest.skip('dev') #TODO remove skip
def test_get_querystring(self):
sw = SqlWrapper()
actions = [ 'add_column', 'alter_column', 'drop_column' ]
dialects = [ 'default', 'mysql', 'postgresql' ]
for action in actions:
for dialect in dialects:
r = sw.get_querystring(action, dialect)
self.assertIsInstance(r, str)
@unittest.skip('dev') #TODO remove skip
def test_get_querystring_badargs(self):
sw = SqlWrapper()
actions = [ 1, -1, print, [], 'foo']
dialects = actions
for action in actions:
for dialect in dialects:
with self.assertRaises(ValueError):
r = sw.get_querystring(action, dialect)
@unittest.skip('dev') #TODO remove skip
def test_add_column(self):
sw = SqlWrapper()
colnames = [ 'taddcol1', 'test-add-col', 'test_add_col', '-test', '_add', '__col__' ]
for i, name in enumerate(colnames):
col = { 'name': name, 'type': VTYPES[i%len(VTYPES)] }
self.assertTrue(sw.add_column(self.ttn, col))
pass
@unittest.skip('dev') #TODO remove skip
def test_add_column_badargs(self):
sw = SqlWrapper()
coolname = 'cool'
i=0
self.assertFalse(sw.add_column(self.ttn, {'type': 'INTEGER'}))
self.assertFalse(sw.add_column(self.ttn, {'name': 'foo'}))
self.assertFalse(sw.add_column(self.ttn, dict()))
self.assertFalse(sw.add_column(self.ttn, print))
self.assertFalse(sw.add_column(self.ttn, ['foo', 'integer']))
self.assertFalse(sw.add_column(self.ttn, None))
self.assertFalse(sw.add_column(self.ttn, 42))
self.assertFalse(sw.add_column(1, {'name':'foo', 'type':'integer'}))
self.assertFalse(sw.add_column(print, {'name':'foo', 'type':'integer'}))
self.assertFalse(sw.add_column([], {'name':'foo', 'type':'integer'}))
self.assertFalse(sw.add_column(dict(), {'name':'foo', 'type':'integer'}))
for badname in NAMEINJECT:
self.assertFalse(sw.add_column(self.ttn, {'name':badname, 'type':'INTEGER'}))
self.assertFalse(sw.add_column(self.ttn, {'name':coolname+str(i), 'type':badname}))
self.assertFalse(sw.add_column(badname, {'name':coolname+str(i), 'type':'INTEGER'}))
i+=1
@unittest.skip('dev') #TODO remove skip
def test_alter_column(self):
sw = SqlWrapper()
colnames = ['talter', 'talter1', 'test_alter', 'test-alter-col', '-test_alter', '__test_alter__']
for i,name in enumerate(random.sample(colnames, len(colnames))):
col = { 'name': name, 'type': VTYPES[i%len(VTYPES)] }
self.assertTrue(sw.add_column( self.ttn, col))
for i,name in enumerate(random.sample(colnames, len(colnames))):
col = {'name': name, 'type': VTYPES[i%len(VTYPES)]}
self.assertTrue(self.ttn, col)
pass
@unittest.skip('dev') #TODO remove skip
def test_alter_column_badargs(self):
sw = SqlWrapper()
colnames = ['tabad', 'tabad1']
for i,name in enumerate(colnames):
col = { 'name': name, 'type': VTYPES[i%len(VTYPES)] }
self.assertTrue(sw.add_column(self.ttn, col))
for i,badname in enumerate(NAMEINJECT):
col = { 'name': badname, 'type': VTYPES[i%len(VTYPES)] }
self.assertFalse(sw.alter_column(self.ttn, col))
col = { 'name': colnames[i%len(colnames)], 'type': badname}
self.assertFalse(sw.alter_column(self.ttn, col))
col = { 'name': badname, 'type': NAMEINJECT[random.randint(0,len(NAMEINJECT))]}
self.assertFalse(sw.alter_column(self.ttn, col))
col = { 'name': colnames[i%len(colnames)], 'type': VTYPES[i%len(VTYPES)] }
self.assertFalse(sw.alter_column(badname, col))
def test_insert(self):
sw = SqlWrapper()
records = [
{ 'pk': 0,
'testchr': 'Hello world !',
'testtext': 'Wow ! Super text... I\'m amazed',
'testbool': False
},
{ 'pk': 1,
'testchr': 'Hello"world...--',
'testtext': 'Another wonderfull text. But this time with spécials chars@;,:--*/+\'{}]{[|~&ù^$*µ$£ê;<ç>\/*-+',
'testbool': True
},
{ 'pk': 2 }, #default values for others
{ 'pk': '3',
'testchr': None,
'testtext': None,
'testbool': 'true'
},
{ 'pk': 4,
'testchr': '',
'testtext': '',
'testbool': 'false'
},
{ 'pk': 5,
'testbool': 0
},
{ 'pk': 6,
'testbool': 1
},
{ 'pk':1024,
'testbool': False
},
]

View file

@ -1,3 +0,0 @@
from django.test import TestCase
# Create your tests here.