diff --git a/Database/sqlwrapper.py b/Database/sqlwrapper.py index b03e61a..9d8f37a 100644 --- a/Database/sqlwrapper.py +++ b/Database/sqlwrapper.py @@ -219,6 +219,13 @@ class SqlWrapper(object): def get_querystring(self, action, dialect): + """ Renvoit un modèle de requete selon un dialect et une action + + @param action str: Peut être 'add_column' 'alter_column' 'drop_column' + @param dialect str: Pour le moment seul 'default', 'mysql' et 'postgresql' sont supportés + + @return str: Un modèle de requète + """ string_dialect = dialect if dialect in sqlsettings.querystrings[action] else 'default' querystring = sqlsettings.querystrings[action][string_dialect] return querystring diff --git a/Database/test/__init__.py b/Database/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/Database/test/test_sqlwrapper.py b/Database/test/test_sqlwrapper.py new file mode 100644 index 0000000..16914c0 --- /dev/null +++ b/Database/test/test_sqlwrapper.py @@ -0,0 +1,519 @@ +import os +import logging +import random + +from unittest import TestCase +from unittest.mock import MagicMock, Mock, patch, call +import unittest + +import sqlalchemy +from Database.sqlwrapper import SqlWrapper + +from django.conf import settings +from Database.sqlsettings import SQLSettings + +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "Lodel.settings") + +BADNAMES = ['', " \t ", "bad--", "%", "*", "`", '"', "'", "\0", "hello`World", 'Hello"world', 'foo%', '*bar.*', '%%', "hello\0world", print, 42 ] + +for c in ('*','%','`', "'", '"', "\\"): + for _ in range(16): + c = "\\"+c + BADNAMES.append(c) +BADNAMES+=[chr(i) for i in list(range(0,0x09))+[0x0b, 0x0c]+list(range(0x0e, 0x01f))] + +class SqlWrapperTests(TestCase): + + def setUp(self): + #Overwriting database conf + SQLSettings.DB_READ_CONNECTION_NAME = 'testdb' + SQLSettings.DB_WRITE_CONNECTION_NAME = 'testdb' + + + self.testdb = os.path.join('/tmp/', 'lodel2_testdb.sqlite3') + + settings.DATABASES['testdb'] = { + 'ENGINE': 'sqlite', + 'NAME': self.testdb, + } + #Disable logging but CRITICAL + logging.basicConfig(level=logging.CRITICAL) + pass + + def tearDown(self): + try: + os.unlink(self.testdb) #Removing the test database + except FileNotFoundError: pass + + """ Testing standart instanciation of sqlwrapper """ + def test_init_sqlwrapper(self): + sw = SqlWrapper() + self.assertIsInstance(sw, SqlWrapper) + sw2 = SqlWrapper() + self.assertIsInstance(sw2, SqlWrapper) + + def test_get_engine(self): + sw = SqlWrapper() + engine = sw.get_engine(SQLSettings.DB_READ_CONNECTION_NAME) + self.assertIsInstance(engine, sqlalchemy.engine.base.Engine) + + def test_get_engine_badargs(self): + sw = SqlWrapper() + with self.assertRaises(Exception): + sw.get_engine(0) + with self.assertRaises(Exception): + sw.get_engine('') + with self.assertRaises(Exception): + sw.get_engine(' ') + + + @patch.object(SqlWrapper, 'get_read_engine') + def test_execute_queries(self, mock_read): + queries = [ 'SELECT * FROM foo', 'SELECT * FROM bar', 'SELECT foo.id, bar.id FROM foo, bar WHERE foo.id = 42 AND bar.name = \'hello world !\'' ] + + mock_read.return_value = MagicMock() + mock_engine = mock_read.return_value + mock_engine.connect.return_value = MagicMock() + mock_conn = mock_engine.connect.return_value + sw = SqlWrapper() + + #One query execution + sw.execute(queries[0], 'read') + mock_conn.execute.assert_called_with(queries[0]) + mock_conn.reset_mock() + + #multiple queries execution + sw.execute(queries, 'read') + + except_calls = [ call(q) for q in queries ] + self.assertEqual(except_calls, mock_conn.execute.mock_calls) + + @patch.object(sqlalchemy.ForeignKeyConstraint, '__init__') + def test_create_fk_constraint_mock(self, mock_fk): + mock_fk.return_value = None + + sw = SqlWrapper() + sw.create_foreign_key_constraint_object('foo','bar', 'foobar') + mock_fk.asser_called_with(['foo'], ['bar'], 'foobar') + + def test_create_fk_constraint(self): + sw = SqlWrapper() + fk = sw.create_foreign_key_constraint_object('foo', 'bar', 'foobar') + self.assertIsInstance(fk, sqlalchemy.ForeignKeyConstraint) + + @unittest.skip('Dev') #TODO remove it + def test_create_fk_constraint_badargs(self): + sw = SqlWrapper() + + + with self.assertRaises(Exception): + sw.create_foreign_key_constraint_object(['foo', 'bar'], 'foofoo', 'babar') + with self.assertRaises(Exception): + sw.create_foreign_key_constraint_object('bar', 2, 'babar') + with self.assertRaises(Exception): + sw.create_foreign_key_constraint_object(None, 'foo', 'bar') + with self.assertRaises(Exception): + sw.create_foreign_key_constraint_object('bar', None, 'babar') + with self.assertRaises(Exception): + sw.create_foreign_key_constraint_object(None, None, 'babar') + with self.assertRaises(Exception): + sw.create_foreign_key_constraint_object(print, 'foo', 'babar') + with self.assertRaises(Exception): + sw.create_foreign_key_constraint_object('foo', print, 'babar') + with self.assertRaises(Exception): + sw.create_foreign_key_constraint_object('foo', 'bar', print) + with self.assertRaises(Exception): + sw.create_foreign_key_constraint_object('bar', 'foo', 42) + with self.assertRaises(Exception): + sw.create_foreign_key_constraint_object('bar', 'foo', ' ') + with self.assertRaises(Exception): + sw.create_foreign_key_constraint_object(" \t ", 'foo') + with self.assertRaises(Exception): + sw.create_foreign_key_constraint_object('bar', 'foo', " \t ") + + foocol = sqlalchemy.Column('foo',sqlalchemy.INTEGER) + + with self.assertRaises(Exception): + sw.create_foreign_key_constraint_object(foocol, foocol) + with self.assertRaises(Exception): + sw.create_foreign_key_constraint_object(foocol, 'bar') + with self.assertRaises(Exception): + sw.create_foreign_key_constraint_object('foo', foocol) + with self.assertRaises(Exception): + sw.create_foreign_key_constraint_object('foo', 'bar', foocol) + + @patch.object(sqlalchemy.Column, '__init__') + def test_create_column(self, mockcol): + mockcol.return_value = None + sw = SqlWrapper() + foo = sw.create_column_object('foo', 'INTEGER') + mockcol.assert_called_once_with('foo', sqlalchemy.INTEGER) + mockcol.reset_mock() + + foo = sw.create_column_object('foo', 'VARCHAR(50)') + self.assertEqual(mockcol.call_args[0][0], 'foo') + self.assertIsInstance(mockcol.call_args[0][1], sqlalchemy.VARCHAR) + self.assertEqual(mockcol.call_args[0][1].length, 50) + mockcol.reset_mock() + + foo = sw.create_column_object('foo', 'TEXT') + mockcol.assert_called_once_with('foo', sqlalchemy.TEXT) + mockcol.reset_mock() + + foo = sw.create_column_object('foo', 'DATE') + mockcol.assert_called_once_with('foo', sqlalchemy.DATE) + mockcol.reset_mock() + + foo = sw.create_column_object('foo', 'BOOLEAN') + mockcol.assert_called_once_with('foo', sqlalchemy.BOOLEAN) + mockcol.reset_mock() + + xtrmlongname = '' + for _ in range(200): + xtrmlongname += 'veryvery' + xtrmlongname += 'longname' + + foo = sw.create_column_object(xtrmlongname, 'TEXT') + mockcol.assert_called_once_with(xtrmlongname, sqlalchemy.TEXT) + mockcol.reset_mock() + + def test_create_column_extra_fk(self): + sw = SqlWrapper() + + extra = { 'foreignkey': 'bar' } + rescol = sw.create_column_object('foo', 'INTEGER', extra) + self.assertIsInstance(rescol, sqlalchemy.Column) + fk = rescol.foreign_keys.pop() + self.assertIsInstance(fk, sqlalchemy.ForeignKey) + self.assertEqual(fk._colspec, 'bar') + + def test_create_column_extra_default(self): + sw = SqlWrapper() + + + extra = { 'default': None, 'nullable': True } + rescol = sw.create_column_object('foo', 'INTEGER', extra) + self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault) + self.assertEqual(rescol.default.arg, None) + + extra = { 'default': "NULL", 'nullable': True } + rescol = sw.create_column_object('foo', 'INTEGER', extra) + self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault) + self.assertEqual(rescol.default.arg, "NULL") + + extra = { 'default': 'null', 'nullable': True } + rescol = sw.create_column_object('foo', 'INTEGER', extra) + self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault) + self.assertEqual(rescol.default.arg, 'null') + + extra = { 'default': 42 } + rescol = sw.create_column_object('foo', 'INTEGER', extra) + self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault) + self.assertEqual(rescol.default.arg, 42) + + extra = { 'default': 'foobardefault' } + rescol = sw.create_column_object('foo', 'VARCHAR(50)', extra) + self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault) + self.assertEqual(rescol.default.arg, 'foobardefault') + + extra = { 'default': 'foodefault' } + rescol = sw.create_column_object('foo', 'TEXT', extra) + self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault) + self.assertEqual(rescol.default.arg, 'foodefault') + + extra = { 'default': True } + rescol = sw.create_column_object('foo', 'BOOLEAN', extra) + self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault) + self.assertEqual(rescol.default.arg, True) + + extra = { 'default': False } + rescol = sw.create_column_object('foo', 'BOOLEAN', extra) + self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault) + self.assertEqual(rescol.default.arg, False) + + extra = { 'default': "true" } + rescol = sw.create_column_object('foo', 'BOOLEAN', extra) + self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault) + self.assertEqual(rescol.default.arg, "true") + + extra = { 'default': 0 } + rescol = sw.create_column_object('foo', 'BOOLEAN', extra) + self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault) + self.assertEqual(rescol.default.arg, 0) + + extra = { 'default': 1 } + rescol = sw.create_column_object('foo', 'BOOLEAN', extra) + self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault) + self.assertEqual(rescol.default.arg, 1) + + + def test_create_column_extra_pknull(self): + sw = SqlWrapper() + + for b in (True,False): + extra = { 'primarykey': b } + rescol = sw.create_column_object('foo', 'INTEGER', extra) + self.assertIsInstance(rescol, sqlalchemy.Column) + self.assertEqual(rescol.primary_key, b) + + extra = { 'nullable': b } + rescol = sw.create_column_object('foo', 'INTEGER', extra) + self.assertIsInstance(rescol, sqlalchemy.Column) + self.assertEqual(rescol.nullable, b) + + extra = { 'primarykey' : b, 'nullable': not b } + rescol = sw.create_column_object('foo', 'INTEGER', extra) + self.assertIsInstance(rescol, sqlalchemy.Column) + self.assertEqual(rescol.primary_key, b) + self.assertEqual(rescol.nullable, not b) + + extra = { 'primarykey' : b, 'nullable': b } + rescol = sw.create_column_object('foo', 'INTEGER', extra) + self.assertIsInstance(rescol, sqlalchemy.Column) + self.assertEqual(rescol.primary_key, b) + self.assertEqual(rescol.nullable, b) + + + + def test_create_column_extra_all(self): + sw = SqlWrapper() + + extra = { 'primarykey': False, 'nullable': False, 'default':42, 'foreignkey': 'foobar'} + rescol = sw.create_column_object('foo', 'INTEGER', extra) + self.assertIsInstance(rescol, sqlalchemy.Column) + self.assertEqual(rescol.primary_key, False) + self.assertEqual(rescol.nullable, False) + self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault) + self.assertEqual(rescol.default.arg, 42) + fk = rescol.foreign_keys.pop() + self.assertIsInstance(fk, sqlalchemy.ForeignKey) + self.assertEqual(fk._colspec, 'foobar') + + @unittest.skip('Dev') #TODO remove it + def test_create_column_badargs(self): + sw = SqlWrapper() + + cc = sw.create_column_object + ain = self.assertIsNone + + for bname in BADNAMES: + ain(cc(bname, 'INTEGER')) + + ain(cc('c', 'INNNTEGER')) + ain(cc(" \t\t ", 'TEXT')) + ain(cc('c', ' ')) + ain(cc('c', 'VARCHAR(foo)')) + ain(cc('supercol', 'SUPERNOTTYPEDVARCHARSTR')) + + ain(cc('c', None)) + ain(cc(None, None)) + + @unittest.skip('Dev') #TODO remove it + def test_create_column_badextra(self): + sw = SqlWrapper() + + cc = sw.create_column_object + ain = self.assertIsNone + + #Put junk in extra datas + for xtra_name in [ 'primarykey', 'nullable', 'foreignkey' ]: + for junk in [ print, sqlalchemy, SqlWrapper, ' ' ]+BADNAMES: + ain(cc('c', 'TEXT', { xtra_name: junk })) + + for junk in [True, False, 52, ' ']+BADNAMES: + ain(cc('c', 'TEXT', { 'foreignkey': junk })) + + for xtra_name in ('primarykey', 'nullalble'): + for junk in [4096, 'HELLO WORLD !', ' ']+BADNAMES: + ain(cc('c', 'TEXT', { xtra_name, junk } )) + + @patch.object(sqlalchemy.Table, '__init__') + def test_create_table(self, mock_table): + #TODO check constraints + """ Create tables and check that the names are ok """ + mock_table.return_value = None + sw = SqlWrapper() + + cols = [ + { 'name': 'supercol', 'type': 'INTEGER', 'extra': { 'nullable': True } }, + { 'name': 'superpk', 'type': 'INTEGER', 'extra': { 'primarykey': True} }, + { 'name': 'col2', 'type': 'TEXT' } + ] + + for table_name in ['supertable', 'besttableever', 'foo-table']: + + params = { 'name': table_name, 'columns': cols, 'constraints':dict() } + + self.assertTrue(sw.create_table(params)) + self.assertEqual(mock_table.call_args[0][0], table_name) + pass + + @patch.object(sqlalchemy.Table, 'append_column') + def test_create_table_col(self, mock_append): + #TODO check constraints + """ Create a table and check that the columns are OK """ + sw = SqlWrapper() + + table_name = 'supertablecol' + + cols = [ + { 'name': 'supercol', 'type': 'INTEGER', 'extra': { 'nullable': True } }, + { 'name': 'superpk', 'type': 'INTEGER', 'extra': { 'primarykey': True} }, + { 'name': 'col2', 'type': 'TEXT' } + ] + + except_coltype = [ sqlalchemy.INTEGER, sqlalchemy.INTEGER, sqlalchemy.TEXT ] + + params = { 'name': table_name, 'columns': cols, 'constraints':dict() } + + sw.create_table(params) #This call return false because of the mock on append_column + + self.assertEqual(len(mock_append.mock_calls), len(cols)) + + for n,c in enumerate(mock_append.mock_calls): + self.assertEqual(c[1][0].name, cols[n]['name']) + self.assertIsInstance(c[1][0].type, except_coltype[n]) + pass + + def test_create_table_recreate(self): + #TODO check constraints + """ Try to create the same table 2 times (except a False return the second time) """ + sw = SqlWrapper() + + params = { 'name': 'redondant', 'columns': [{'name':'foocol', 'type': 'INTEGER'}]} + + self.assertTrue(sw.create_table(params)) + + params['columns'] = [{'name':'barcol', 'type': 'INTEGER'}] + + self.assertFalse(sw.create_table(params)) + + @unittest.skip('dev') #TODO remove it + def test_create_table_badargs(self): + sw = SqlWrapper() + + af = self.assertFalse + ct = sw.create_table + + foocol = {'name': 'foocol', 'type': 'INTEGER'} + + p = {'name': 'foo'} + af(ct(p)) #no columns + for bname in BADNAMES: #bad column name + p['columns'] = bname + af(ct(p)) + p['columns']=[]; af(ct(p)) #empty columns TODO Or return True is normal ??? + + + p = {'columns':[foocol]} + af(ct(p)) #no name + for bname in BADNAMES: + p['name'] = bname + af(ct(p)) + pass + + + def create_test_table(self, sw): + """ Create a table for test purpose """ + table_name = 'testtable' + cols = [ + { 'name': 'pk', 'type': 'INTEGER', 'extra': {'primarykey': True} }, + { 'name': 'testtxt', 'type': 'TEXT', 'extra': {'nullable': True, 'default': 'hello'} }, + { 'name': 'testchr', 'type': 'VARCHAR(50)', 'extra': {'nullable': True, 'default': 'hello world'} }, + ] + + sw.create_table( { 'name': table_name, 'columns': cols} ) + pass + + def test_get_table(self): + """ Try to get the testtable (check the name and type of return) """ + sw = SqlWrapper() + + self.create_test_table(sw) + + rt = sw.get_table('testtable') + self.assertIsInstance(rt, sqlalchemy.Table) + self.assertEqual(rt.name, 'testtable') + + rt = sw.get_table('testtable', 'write') + self.assertIsInstance(rt, sqlalchemy.Table) + self.assertEqual(rt.name, 'testtable') + + rt = sw.get_table('testtable', 'read') + self.assertIsInstance(rt, sqlalchemy.Table) + self.assertEqual(rt.name, 'testtable') + pass + + @unittest.skip('dev') #TODO remove skip + def test_get_table_badargs(self): + sw = SqlWrapper() + + self.create_test_table(sw) + + for badname in BADNAMES: + with self.assertRaises((sqlalchemy.exc.NoSuchTableError, Exception)): + sw.get_table(badname) + + with self.assertRaises(sqlalchemy.exc.NoSuchTableError): + sw.get_table('FOOBAR') + with self.assertRaises(Exception): + sw.get_table(print) + with self.assertRaises(Exception): + sw.get_table(1) + + #bad action types + with self.assertRaises(Exception): + sw.get_table('testtable', print) + with self.assertRaises(Exception): + sw.get_table('testtable', 'foobar') + with self.assertRaises(Exception): + sw.get_table('testtable', 42) + pass + + def test_drop_table(self): + sw = SqlWrapper() + + self.create_test_table(sw) + + self.assertTrue(sw.drop_table('testtable')) + self.assertFalse(sw.drop_table('testtable')) + self.assertFalse(sw.drop_table('nonexisting')) + pass + + def test_drop_table_badargs(self): + sw = SqlWrapper() + + self.create_test_table(sw) + + af = self.assertFalse + + for bname in BADNAMES: + self.assertFalse(sw.drop_table(bname)) + + def test_create_get_drop_table(self): + sw = SqlWrapper() + + funkynames = ('standart', "wow-nice", "-really-", "test_test-test", "_test", "test_", "test42", "foobar_123", "foobar-123", "foofoo--babar") + types = ['INTEGER', 'VARCHAR(5)', 'VARCHAR(128)', 'TEXT', 'BOOLEAN'] + params = dict() + + cols = [] + for i,name in enumerate(funkynames): + cols.append({'name':name, 'type':types[i%len(types)]}) + params['columns'] = cols + + + for name in random.sample(funkynames, len(funkynames)): + params['name'] = name + self.assertTrue(sw.create_table(params)) + + for name in random.sample(funkynames, len(funkynames)): + rt = sw.get_table(name) + self.assertIsInstance(rt, sqlalchemy.Table)#do we get a table ? + self.assertEqual(rt.name, name) + + for name in random.sample(funkynames, len(funkynames)): + self.assertTrue(sw.drop_table(name)) + pass + + diff --git a/EditorialModel/lib/component.py b/EditorialModel/lib/component.py index 6a46b6b..7efe869 100644 --- a/EditorialModel/lib/component.py +++ b/EditorialModel/lib/component.py @@ -17,7 +17,6 @@ class EmComponent(object): @exception TypeError """ def __init__(self, id_or_name): - logger.debug('Instanciation : '+str(id_or_name)) if self is EmComponent: raise EnvironmentError('Abstract class') if type(id_or_name) is int: diff --git a/runtest b/runtest index a53191b..b229768 100755 --- a/runtest +++ b/runtest @@ -1,3 +1,5 @@ #!/bin/bash +export DJANGO_SETTINGS_MODULE="Lodel.settings" + python -m unittest