Browse Source

Cleaning and changin some methods name in sqlwrapper

Yann Weber 9 years ago
parent
commit
8fc072f6ee

+ 27
- 45
Database/sqlwrapper.py View File

@@ -62,7 +62,7 @@ class SqlWrapper(object):
62 62
         self.r_dbconf = read_db
63 63
         self.w_dbconf = write_db
64 64
 
65
-        self.checkConf() #raise if errors in configuration
65
+        self._checkConf() #raise if errors in configuration
66 66
 
67 67
         if self.name in self.__class__.wrapinstance:
68 68
             logger.warning("A SqlWrapper with the name "+self.name+" allready exist. Replacing the old one by the new one")
@@ -82,9 +82,12 @@ class SqlWrapper(object):
82 82
         pass
83 83
 
84 84
     @property
85
-    def cfg(self): return self.__class__.config;
85
+    def cfg(self):
86
+        """ Return the SqlWrapper.config dict """
87
+        return self.__class__.config;
86 88
     @property
87
-    def engines_cfg(self): return self.__class__.ENGINES;
89
+    def _engines_cfg(self):
90
+        return self.__class__.ENGINES;
88 91
 
89 92
     @property
90 93
     def meta(self):
@@ -104,18 +107,19 @@ class SqlWrapper(object):
104 107
         """ Return the read connection
105 108
             @warning Do not store the connection, call this method each time you need it
106 109
         """
107
-        return self.getConnection(True)
110
+        return self._getConnection(True)
108 111
     @property
109 112
     def wconn(self):
110 113
         """ Return the write connection
111 114
             @warning Do not store the connection, call this method each time you need it
112 115
         """
113
-        return self.getConnection(False)
116
+        return self._getConnection(False)
114 117
 
115
-    def getConnection(self, read):
118
+    def _getConnection(self, read):
116 119
         """ Return an opened connection
117 120
             @param read bool: If true return the reading connection
118 121
             @return A sqlAlchemy db connection
122
+            @private
119 123
         """
120 124
         if read:
121 125
             r = self.r_conn
@@ -125,7 +129,7 @@ class SqlWrapper(object):
125 129
         if r == None:
126 130
             #Connection not yet opened
127 131
             self.connect(read)
128
-            r = self.getConnection(read) #TODO : Un truc plus safe/propre qu'un appel reccursif ?
132
+            r = self._getConnection(read) #TODO : Un truc plus safe/propre qu'un appel reccursif ?
129 133
         return r
130 134
 
131 135
 
@@ -175,6 +179,9 @@ class SqlWrapper(object):
175 179
 
176 180
     @classmethod
177 181
     def reconnectAll(c, read = None):
182
+        """ Reconnect all the wrappers 
183
+            @static
184
+        """
178 185
         for wname in c.wrapinstance:
179 186
             c.wrapinstance[wname].reconnect(read)
180 187
             
@@ -199,7 +206,7 @@ class SqlWrapper(object):
199 206
         #Loading confs
200 207
         cfg = self.cfg['db'][self.r_dbconf if read else self.w_dbconf]
201 208
 
202
-        edata = self.engines_cfg[cfg['ENGINE']] #engine infos
209
+        edata = self._engines_cfg[cfg['ENGINE']] #engine infos
203 210
         conn_str = ""
204 211
 
205 212
         if cfg['ENGINE'] == 'sqlite':
@@ -242,7 +249,7 @@ class SqlWrapper(object):
242 249
             raise KeyError("No wrapper named '"+name+"' exists")
243 250
         return c.wrapinstance[name]
244 251
 
245
-    def checkConf(self):
252
+    def _checkConf(self):
246 253
         """ Class method that check the configuration
247 254
             
248 255
             Configuration looks like
@@ -269,7 +276,7 @@ class SqlWrapper(object):
269 276
                     if 'ENGINE' not in db:
270 277
                         err.append('Missing "ENGINE" in database "'+db+'"')
271 278
                     else:
272
-                        if db['ENGINE'] not in self.engines_cfg:
279
+                        if db['ENGINE'] not in self._engines_cfg:
273 280
                             err.append('Unknown engine "'+db['ENGINE']+'"')
274 281
                         elif db['ENGINE'] != 'sqlite' and 'USER' not in db:
275 282
                             err.append('Missing "User" in configuration of database "'+dbname+'"')
@@ -300,6 +307,8 @@ class SqlWrapper(object):
300 307
 
301 308
         logger.info("Running function createAllFromConf")
302 309
         for i,table in enumerate(schema):
310
+            if not isinstance(table, dict):
311
+                raise TypeError("Excepted a list of dict but got a "+str(type(schema))+" in the list")
303 312
             self.createTable(**table)
304 313
 
305 314
         self.meta_crea.create_all(bind = self.w_engine)
@@ -370,7 +379,6 @@ class SqlWrapper(object):
370 379
             kwargs['primary_key'] = kwargs['primarykey']
371 380
             del kwargs['primarykey']
372 381
 
373
-
374 382
         res = sqla.Column(**kwargs)
375 383
 
376 384
         if fk != None:
@@ -397,29 +405,6 @@ class SqlWrapper(object):
397 405
         column_length = int(check_length.groups()[0]) if check_length else None
398 406
         return sqla.VARCHAR(length=column_length)
399 407
      
400
-    @classmethod
401
-    def engineFamily(c, engine):
402
-        """ Given an engine return the db family
403
-            @see SqlWrapper::ENGINES
404
-            @return A str or None
405
-        """
406
-        for fam in c.ENGINES:
407
-            if engine.driver == c.ENGINES[fam]['driver']:
408
-                return fam
409
-        return None
410
-
411
-    @property
412
-    def wEngineFamily(self):
413
-        """ Return the db family of the write engine
414
-            @return a string or None
415
-        """
416
-        return self.__class__.engineFamily(self.w_engine)
417
-    @property   
418
-    def rEngineFamily(self):
419
-        """ Return the db family of the read engine
420
-            @return a string or None
421
-        """
422
-        return self.__class__.engineFamily(self.r_engine)
423 408
 
424 409
     def dropColumn(self, tname, colname):
425 410
         """ Drop a column from a table
@@ -450,10 +435,10 @@ class SqlWrapper(object):
450 435
 
451 436
             @return True if query success False if it fails
452 437
         """
453
-        newcol = self.createColumn(name=colname, type_ = coltype)
454 438
         if tname not in self.meta.tables: #Useless ?
455 439
             raise NameError("The table '"+tname+"' dont exist")
456 440
         table = self.Table(tname)
441
+        newcol = self.createColumn(name=colname, type_ = coltype)
457 442
 
458 443
         ddl = AddColumn(table, newcol)
459 444
         sql = ddl.compile(dialect=self.w_engine.dialect)
@@ -473,20 +458,17 @@ class SqlWrapper(object):
473 458
             @return True if query successs False if it fails
474 459
         """
475 460
         
476
-        if self.wEngineFamily == 'sqlite':
477
-            raise NotImplementedError('AlterColumn not yet implemented for sqlite engines')
478
-            
461
+        if tname not in self.meta.tables: #Useless ?
462
+            raise NameError("The table '"+tname+"' dont exist")
479 463
 
480 464
         col = self.createColumn(name=colname, type_=col_newtype)
481 465
         table = self.Table(tname)
482 466
 
483
-        typepref = 'TYPE ' if self.wEngineFamily == 'postgresql' else ''
484
-
485
-        query = 'ALTER TABLE %s ALTER COLUMN %s %s'%(table.name, col.name, typepref+col.type)
486
-
487
-        logger.debug("Executing SQL : '"+query+"'")
488
-
489
-        ret = bool(self.wconn.execute(query))
467
+        ddl = AlterColumn(table, newcol)
468
+        sql = ddl.compile(dialect=self.w_engine.dialect)
469
+        sql = str(sql)
470
+        logger.debug("Executing SQL  : '"+sql+"'")
471
+        ret = bool(self.wconn.execute(sql))
490 472
 
491 473
         self.renewMetaData()
492 474
         return ret

+ 0
- 0
Database/test/__init__.py View File


+ 0
- 519
Database/test/test_sqlwrapper.py View File

@@ -1,519 +0,0 @@
1
-import os
2
-import logging
3
-import random
4
-
5
-from unittest import TestCase
6
-from unittest.mock import MagicMock, Mock, patch, call
7
-import unittest
8
-
9
-import sqlalchemy
10
-from Database.sqlwrapper import SqlWrapper
11
-
12
-from django.conf import settings
13
-from Database.sqlsettings import SQLSettings
14
-
15
-os.environ.setdefault("DJANGO_SETTINGS_MODULE", "Lodel.settings")
16
-
17
-BADNAMES = ['',  "  \t  ", "bad--", "%", "*", "`", '"', "'", "\0", "hello`World", 'Hello"world', 'foo%', '*bar.*', '%%', "hello\0world", print, 42 ]
18
-
19
-for c in ('*','%','`', "'", '"', "\\"):
20
-    for _ in range(16):
21
-        c = "\\"+c
22
-        BADNAMES.append(c)
23
-BADNAMES+=[chr(i) for i in list(range(0,0x09))+[0x0b, 0x0c]+list(range(0x0e, 0x01f))]
24
-
25
-class SqlWrapperTests(TestCase):
26
-
27
-    def setUp(self):
28
-        #Overwriting database conf
29
-        SQLSettings.DB_READ_CONNECTION_NAME = 'testdb'
30
-        SQLSettings.DB_WRITE_CONNECTION_NAME = 'testdb'
31
-        
32
-
33
-        self.testdb = os.path.join('/tmp/', 'lodel2_testdb.sqlite3')
34
-
35
-        settings.DATABASES['testdb'] = {
36
-            'ENGINE': 'sqlite',
37
-            'NAME': self.testdb,
38
-        }
39
-        #Disable logging but CRITICAL
40
-        logging.basicConfig(level=logging.CRITICAL)
41
-        pass
42
-
43
-    def tearDown(self):
44
-        try:
45
-            os.unlink(self.testdb) #Removing the test database
46
-        except FileNotFoundError: pass
47
-    
48
-    """ Testing standart instanciation of sqlwrapper """
49
-    def test_init_sqlwrapper(self):
50
-        sw = SqlWrapper()
51
-        self.assertIsInstance(sw, SqlWrapper)
52
-        sw2 = SqlWrapper()
53
-        self.assertIsInstance(sw2, SqlWrapper)
54
-
55
-    def test_get_engine(self):
56
-        sw = SqlWrapper()
57
-        engine = sw.get_engine(SQLSettings.DB_READ_CONNECTION_NAME)
58
-        self.assertIsInstance(engine, sqlalchemy.engine.base.Engine)
59
-    
60
-    def test_get_engine_badargs(self):
61
-        sw = SqlWrapper()
62
-        with self.assertRaises(Exception):
63
-            sw.get_engine(0)
64
-        with self.assertRaises(Exception):
65
-            sw.get_engine('')
66
-        with self.assertRaises(Exception):
67
-            sw.get_engine('           ')
68
-
69
-    
70
-    @patch.object(SqlWrapper, 'get_read_engine')
71
-    def test_execute_queries(self, mock_read):
72
-        queries = [ 'SELECT * FROM foo', 'SELECT * FROM bar', 'SELECT foo.id, bar.id FROM foo, bar WHERE foo.id = 42 AND bar.name = \'hello world !\'' ]
73
-
74
-        mock_read.return_value = MagicMock()
75
-        mock_engine = mock_read.return_value
76
-        mock_engine.connect.return_value = MagicMock()
77
-        mock_conn = mock_engine.connect.return_value
78
-        sw = SqlWrapper()
79
-
80
-        #One query execution
81
-        sw.execute(queries[0], 'read')
82
-        mock_conn.execute.assert_called_with(queries[0])
83
-        mock_conn.reset_mock()
84
-
85
-        #multiple queries execution
86
-        sw.execute(queries, 'read')
87
-
88
-        except_calls = [ call(q) for q in queries ]
89
-        self.assertEqual(except_calls, mock_conn.execute.mock_calls)
90
-
91
-    @patch.object(sqlalchemy.ForeignKeyConstraint, '__init__')
92
-    def test_create_fk_constraint_mock(self, mock_fk):
93
-        mock_fk.return_value = None
94
-
95
-        sw = SqlWrapper()
96
-        sw.create_foreign_key_constraint_object('foo','bar', 'foobar')
97
-        mock_fk.asser_called_with(['foo'], ['bar'], 'foobar')
98
-
99
-    def test_create_fk_constraint(self):
100
-        sw = SqlWrapper()
101
-        fk = sw.create_foreign_key_constraint_object('foo', 'bar', 'foobar')
102
-        self.assertIsInstance(fk, sqlalchemy.ForeignKeyConstraint)
103
-        
104
-    @unittest.skip('Dev') #TODO remove it
105
-    def test_create_fk_constraint_badargs(self):
106
-        sw = SqlWrapper()
107
-
108
-        
109
-        with self.assertRaises(Exception):
110
-            sw.create_foreign_key_constraint_object(['foo', 'bar'], 'foofoo', 'babar')
111
-        with self.assertRaises(Exception):
112
-            sw.create_foreign_key_constraint_object('bar', 2, 'babar')
113
-        with self.assertRaises(Exception):
114
-            sw.create_foreign_key_constraint_object(None, 'foo', 'bar')
115
-        with self.assertRaises(Exception):
116
-            sw.create_foreign_key_constraint_object('bar', None, 'babar')
117
-        with self.assertRaises(Exception):
118
-            sw.create_foreign_key_constraint_object(None, None, 'babar')
119
-        with self.assertRaises(Exception):
120
-            sw.create_foreign_key_constraint_object(print, 'foo', 'babar')
121
-        with self.assertRaises(Exception):
122
-            sw.create_foreign_key_constraint_object('foo', print, 'babar')
123
-        with self.assertRaises(Exception):
124
-            sw.create_foreign_key_constraint_object('foo', 'bar', print)
125
-        with self.assertRaises(Exception):
126
-            sw.create_foreign_key_constraint_object('bar', 'foo', 42)
127
-        with self.assertRaises(Exception):
128
-            sw.create_foreign_key_constraint_object('bar', 'foo', '   ')
129
-        with self.assertRaises(Exception):
130
-            sw.create_foreign_key_constraint_object(" \t  ", 'foo')
131
-        with self.assertRaises(Exception):
132
-            sw.create_foreign_key_constraint_object('bar', 'foo', "  \t ")
133
-
134
-        foocol = sqlalchemy.Column('foo',sqlalchemy.INTEGER)
135
-
136
-        with self.assertRaises(Exception):
137
-            sw.create_foreign_key_constraint_object(foocol, foocol)
138
-        with self.assertRaises(Exception):
139
-            sw.create_foreign_key_constraint_object(foocol, 'bar')
140
-        with self.assertRaises(Exception):
141
-            sw.create_foreign_key_constraint_object('foo', foocol)
142
-        with self.assertRaises(Exception):
143
-            sw.create_foreign_key_constraint_object('foo', 'bar', foocol)
144
-    
145
-    @patch.object(sqlalchemy.Column, '__init__')
146
-    def test_create_column(self, mockcol):
147
-        mockcol.return_value = None
148
-        sw = SqlWrapper()
149
-        foo = sw.create_column_object('foo', 'INTEGER')
150
-        mockcol.assert_called_once_with('foo', sqlalchemy.INTEGER)
151
-        mockcol.reset_mock()
152
-
153
-        foo = sw.create_column_object('foo', 'VARCHAR(50)')
154
-        self.assertEqual(mockcol.call_args[0][0], 'foo')
155
-        self.assertIsInstance(mockcol.call_args[0][1], sqlalchemy.VARCHAR)
156
-        self.assertEqual(mockcol.call_args[0][1].length, 50)
157
-        mockcol.reset_mock()
158
-
159
-        foo = sw.create_column_object('foo', 'TEXT')
160
-        mockcol.assert_called_once_with('foo', sqlalchemy.TEXT)
161
-        mockcol.reset_mock()
162
-
163
-        foo = sw.create_column_object('foo', 'DATE')
164
-        mockcol.assert_called_once_with('foo', sqlalchemy.DATE)
165
-        mockcol.reset_mock()
166
-
167
-        foo = sw.create_column_object('foo', 'BOOLEAN')
168
-        mockcol.assert_called_once_with('foo', sqlalchemy.BOOLEAN)
169
-        mockcol.reset_mock()
170
- 
171
-        xtrmlongname = ''
172
-        for _ in range(200):
173
-            xtrmlongname += 'veryvery'
174
-        xtrmlongname += 'longname'
175
-
176
-        foo = sw.create_column_object(xtrmlongname, 'TEXT')
177
-        mockcol.assert_called_once_with(xtrmlongname, sqlalchemy.TEXT)
178
-        mockcol.reset_mock()
179
-
180
-    def test_create_column_extra_fk(self):
181
-        sw = SqlWrapper()
182
-
183
-        extra = { 'foreignkey': 'bar' }
184
-        rescol = sw.create_column_object('foo', 'INTEGER', extra)
185
-        self.assertIsInstance(rescol, sqlalchemy.Column)
186
-        fk = rescol.foreign_keys.pop()
187
-        self.assertIsInstance(fk, sqlalchemy.ForeignKey)
188
-        self.assertEqual(fk._colspec, 'bar')
189
-
190
-    def test_create_column_extra_default(self):
191
-        sw = SqlWrapper()
192
-
193
-
194
-        extra = { 'default': None, 'nullable': True }
195
-        rescol = sw.create_column_object('foo', 'INTEGER', extra)
196
-        self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
197
-        self.assertEqual(rescol.default.arg, None)
198
-
199
-        extra = { 'default': "NULL", 'nullable': True }
200
-        rescol = sw.create_column_object('foo', 'INTEGER', extra)
201
-        self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
202
-        self.assertEqual(rescol.default.arg, "NULL")
203
-
204
-        extra = { 'default': 'null', 'nullable': True }
205
-        rescol = sw.create_column_object('foo', 'INTEGER', extra)
206
-        self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
207
-        self.assertEqual(rescol.default.arg, 'null')
208
-
209
-        extra = { 'default': 42 }
210
-        rescol = sw.create_column_object('foo', 'INTEGER', extra)
211
-        self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
212
-        self.assertEqual(rescol.default.arg, 42)
213
-
214
-        extra = { 'default': 'foobardefault' }
215
-        rescol = sw.create_column_object('foo', 'VARCHAR(50)', extra)
216
-        self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
217
-        self.assertEqual(rescol.default.arg, 'foobardefault')
218
-
219
-        extra = { 'default': 'foodefault' }
220
-        rescol = sw.create_column_object('foo', 'TEXT', extra)
221
-        self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
222
-        self.assertEqual(rescol.default.arg, 'foodefault')
223
-
224
-        extra = { 'default': True }
225
-        rescol = sw.create_column_object('foo', 'BOOLEAN', extra)
226
-        self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
227
-        self.assertEqual(rescol.default.arg, True)
228
-
229
-        extra = { 'default': False }
230
-        rescol = sw.create_column_object('foo', 'BOOLEAN', extra)
231
-        self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
232
-        self.assertEqual(rescol.default.arg, False)
233
-
234
-        extra = { 'default': "true" }
235
-        rescol = sw.create_column_object('foo', 'BOOLEAN', extra)
236
-        self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
237
-        self.assertEqual(rescol.default.arg, "true")
238
-
239
-        extra = { 'default': 0 }
240
-        rescol = sw.create_column_object('foo', 'BOOLEAN', extra)
241
-        self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
242
-        self.assertEqual(rescol.default.arg, 0)
243
-
244
-        extra = { 'default': 1 }
245
-        rescol = sw.create_column_object('foo', 'BOOLEAN', extra)
246
-        self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
247
-        self.assertEqual(rescol.default.arg, 1)
248
-
249
-
250
-    def test_create_column_extra_pknull(self):
251
-        sw = SqlWrapper()
252
-
253
-        for b in (True,False):
254
-            extra = { 'primarykey': b }
255
-            rescol = sw.create_column_object('foo', 'INTEGER', extra)
256
-            self.assertIsInstance(rescol, sqlalchemy.Column)
257
-            self.assertEqual(rescol.primary_key, b)
258
-
259
-            extra = { 'nullable': b }
260
-            rescol = sw.create_column_object('foo', 'INTEGER', extra)
261
-            self.assertIsInstance(rescol, sqlalchemy.Column)
262
-            self.assertEqual(rescol.nullable, b)
263
-
264
-            extra = { 'primarykey' : b, 'nullable': not b }
265
-            rescol = sw.create_column_object('foo', 'INTEGER', extra)
266
-            self.assertIsInstance(rescol, sqlalchemy.Column)
267
-            self.assertEqual(rescol.primary_key, b)
268
-            self.assertEqual(rescol.nullable, not b)
269
-
270
-            extra = { 'primarykey' : b, 'nullable': b }
271
-            rescol = sw.create_column_object('foo', 'INTEGER', extra)
272
-            self.assertIsInstance(rescol, sqlalchemy.Column)
273
-            self.assertEqual(rescol.primary_key, b)
274
-            self.assertEqual(rescol.nullable, b)
275
-
276
-
277
-
278
-    def test_create_column_extra_all(self):
279
-        sw = SqlWrapper()
280
-
281
-        extra = { 'primarykey': False, 'nullable': False, 'default':42, 'foreignkey': 'foobar'}
282
-        rescol = sw.create_column_object('foo', 'INTEGER', extra)
283
-        self.assertIsInstance(rescol, sqlalchemy.Column)
284
-        self.assertEqual(rescol.primary_key, False)
285
-        self.assertEqual(rescol.nullable, False)
286
-        self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
287
-        self.assertEqual(rescol.default.arg, 42)
288
-        fk = rescol.foreign_keys.pop()
289
-        self.assertIsInstance(fk, sqlalchemy.ForeignKey)
290
-        self.assertEqual(fk._colspec, 'foobar')
291
-
292
-    @unittest.skip('Dev') #TODO remove it
293
-    def test_create_column_badargs(self):
294
-        sw = SqlWrapper()
295
-        
296
-        cc = sw.create_column_object
297
-        ain = self.assertIsNone
298
-        
299
-        for bname in BADNAMES:
300
-            ain(cc(bname, 'INTEGER'))
301
-
302
-        ain(cc('c', 'INNNTEGER'))
303
-        ain(cc(" \t\t  ", 'TEXT'))
304
-        ain(cc('c', '             '))
305
-        ain(cc('c', 'VARCHAR(foo)'))
306
-        ain(cc('supercol', 'SUPERNOTTYPEDVARCHARSTR'))
307
-
308
-        ain(cc('c', None))
309
-        ain(cc(None, None))
310
-
311
-    @unittest.skip('Dev') #TODO remove it
312
-    def test_create_column_badextra(self):
313
-        sw = SqlWrapper()
314
-
315
-        cc = sw.create_column_object
316
-        ain = self.assertIsNone
317
-        
318
-        #Put junk in extra datas
319
-        for xtra_name in [ 'primarykey', 'nullable', 'foreignkey' ]:
320
-            for junk in [ print, sqlalchemy, SqlWrapper, '   ' ]+BADNAMES:
321
-                ain(cc('c', 'TEXT', { xtra_name: junk }))
322
-
323
-        for junk in [True, False, 52, '   ']+BADNAMES:
324
-            ain(cc('c', 'TEXT', { 'foreignkey': junk }))
325
-
326
-        for xtra_name in ('primarykey', 'nullalble'):
327
-            for junk in [4096, 'HELLO WORLD !', '   ']+BADNAMES:
328
-                ain(cc('c', 'TEXT', { xtra_name, junk } ))
329
-
330
-    @patch.object(sqlalchemy.Table, '__init__')
331
-    def test_create_table(self, mock_table):
332
-        #TODO check constraints
333
-        """ Create tables and check that the names are ok """
334
-        mock_table.return_value = None
335
-        sw = SqlWrapper()
336
-        
337
-        cols = [
338
-            { 'name': 'supercol', 'type': 'INTEGER', 'extra': { 'nullable': True } },
339
-            { 'name': 'superpk', 'type': 'INTEGER', 'extra': { 'primarykey': True} },
340
-            { 'name': 'col2', 'type': 'TEXT' }
341
-        ]
342
-        
343
-        for table_name in ['supertable', 'besttableever', 'foo-table']:
344
-
345
-            params = { 'name': table_name, 'columns': cols, 'constraints':dict() }
346
-
347
-            self.assertTrue(sw.create_table(params))
348
-            self.assertEqual(mock_table.call_args[0][0], table_name)
349
-        pass
350
-    
351
-    @patch.object(sqlalchemy.Table, 'append_column')
352
-    def test_create_table_col(self, mock_append):
353
-        #TODO check constraints
354
-        """ Create a table and check that the columns are OK """
355
-        sw = SqlWrapper()
356
-        
357
-        table_name = 'supertablecol'
358
-
359
-        cols = [
360
-            { 'name': 'supercol', 'type': 'INTEGER', 'extra': { 'nullable': True } },
361
-            { 'name': 'superpk', 'type': 'INTEGER', 'extra': { 'primarykey': True} },
362
-            { 'name': 'col2', 'type': 'TEXT' }
363
-        ]
364
-
365
-        except_coltype = [ sqlalchemy.INTEGER, sqlalchemy.INTEGER, sqlalchemy.TEXT ]
366
-
367
-        params = { 'name': table_name, 'columns': cols, 'constraints':dict() }
368
-    
369
-        sw.create_table(params) #This call return false because of the mock on append_column
370
-        
371
-        self.assertEqual(len(mock_append.mock_calls), len(cols))
372
-
373
-        for n,c in enumerate(mock_append.mock_calls):
374
-            self.assertEqual(c[1][0].name, cols[n]['name'])
375
-            self.assertIsInstance(c[1][0].type, except_coltype[n])
376
-        pass
377
-    
378
-    def test_create_table_recreate(self):
379
-        #TODO check constraints
380
-        """ Try to create the same table 2 times (except a False return the second time) """
381
-        sw = SqlWrapper()
382
-
383
-        params = { 'name': 'redondant', 'columns': [{'name':'foocol', 'type': 'INTEGER'}]}
384
-
385
-        self.assertTrue(sw.create_table(params))
386
-
387
-        params['columns'] = [{'name':'barcol', 'type': 'INTEGER'}]
388
-
389
-        self.assertFalse(sw.create_table(params))
390
-
391
-    @unittest.skip('dev') #TODO remove it
392
-    def test_create_table_badargs(self):
393
-        sw = SqlWrapper()
394
-
395
-        af = self.assertFalse
396
-        ct = sw.create_table
397
-
398
-        foocol = {'name': 'foocol', 'type': 'INTEGER'}
399
-
400
-        p = {'name': 'foo'}
401
-        af(ct(p)) #no columns
402
-        for bname in BADNAMES: #bad column name
403
-            p['columns'] = bname
404
-            af(ct(p))
405
-        p['columns']=[]; af(ct(p)) #empty columns TODO Or return True is normal ???
406
-
407
-
408
-        p = {'columns':[foocol]}
409
-        af(ct(p)) #no name
410
-        for bname in BADNAMES:
411
-            p['name'] = bname
412
-            af(ct(p))
413
-        pass
414
-
415
-    
416
-    def create_test_table(self, sw):
417
-        """ Create a table for test purpose """
418
-        table_name = 'testtable'
419
-        cols = [
420
-            { 'name': 'pk', 'type': 'INTEGER', 'extra': {'primarykey': True} },
421
-            { 'name': 'testtxt', 'type': 'TEXT', 'extra': {'nullable': True, 'default': 'hello'} },
422
-            { 'name': 'testchr', 'type': 'VARCHAR(50)', 'extra': {'nullable': True, 'default': 'hello world'} },
423
-        ]
424
-        
425
-        sw.create_table( { 'name': table_name, 'columns': cols} )
426
-        pass
427
-    
428
-    def test_get_table(self):
429
-        """ Try to get the testtable (check the name and type of return) """
430
-        sw = SqlWrapper()
431
-
432
-        self.create_test_table(sw)
433
-
434
-        rt = sw.get_table('testtable')
435
-        self.assertIsInstance(rt, sqlalchemy.Table)
436
-        self.assertEqual(rt.name, 'testtable')
437
-
438
-        rt = sw.get_table('testtable', 'write')
439
-        self.assertIsInstance(rt, sqlalchemy.Table)
440
-        self.assertEqual(rt.name, 'testtable')
441
-
442
-        rt = sw.get_table('testtable', 'read')
443
-        self.assertIsInstance(rt, sqlalchemy.Table)
444
-        self.assertEqual(rt.name, 'testtable')
445
-        pass
446
-
447
-    @unittest.skip('dev') #TODO remove skip
448
-    def test_get_table_badargs(self):
449
-        sw = SqlWrapper()
450
-
451
-        self.create_test_table(sw)
452
-
453
-        for badname in BADNAMES:
454
-            with self.assertRaises((sqlalchemy.exc.NoSuchTableError, Exception)):
455
-                sw.get_table(badname)
456
-
457
-        with self.assertRaises(sqlalchemy.exc.NoSuchTableError):
458
-            sw.get_table('FOOBAR')
459
-        with self.assertRaises(Exception):
460
-            sw.get_table(print)
461
-        with self.assertRaises(Exception):
462
-            sw.get_table(1)
463
-
464
-        #bad action types
465
-        with self.assertRaises(Exception):
466
-            sw.get_table('testtable', print)
467
-        with self.assertRaises(Exception):
468
-            sw.get_table('testtable', 'foobar')
469
-        with self.assertRaises(Exception):
470
-            sw.get_table('testtable', 42)
471
-        pass
472
-
473
-    def test_drop_table(self):
474
-        sw = SqlWrapper()
475
-        
476
-        self.create_test_table(sw)
477
-
478
-        self.assertTrue(sw.drop_table('testtable'))
479
-        self.assertFalse(sw.drop_table('testtable'))
480
-        self.assertFalse(sw.drop_table('nonexisting'))
481
-        pass
482
-
483
-    def test_drop_table_badargs(self):
484
-        sw = SqlWrapper()
485
-
486
-        self.create_test_table(sw)
487
-
488
-        af = self.assertFalse
489
-
490
-        for bname in BADNAMES:
491
-            self.assertFalse(sw.drop_table(bname))
492
-
493
-    def test_create_get_drop_table(self):
494
-        sw = SqlWrapper()
495
-        
496
-        funkynames = ('standart', "wow-nice", "-really-", "test_test-test", "_test", "test_", "test42", "foobar_123", "foobar-123", "foofoo--babar")
497
-        types = ['INTEGER', 'VARCHAR(5)', 'VARCHAR(128)', 'TEXT', 'BOOLEAN']
498
-        params = dict()
499
-
500
-        cols = []
501
-        for i,name in enumerate(funkynames):
502
-            cols.append({'name':name, 'type':types[i%len(types)]})
503
-        params['columns'] = cols
504
-
505
-
506
-        for name in random.sample(funkynames, len(funkynames)):
507
-            params['name'] = name
508
-            self.assertTrue(sw.create_table(params))
509
-
510
-        for name in random.sample(funkynames, len(funkynames)):
511
-            rt = sw.get_table(name)
512
-            self.assertIsInstance(rt, sqlalchemy.Table)#do we get a table ?
513
-            self.assertEqual(rt.name, name)
514
-
515
-        for name in random.sample(funkynames, len(funkynames)):
516
-            self.assertTrue(sw.drop_table(name))
517
-        pass
518
-        
519
-

+ 0
- 175
Database/test/test_sqlwrapper_querystring.py View File

@@ -1,175 +0,0 @@
1
-import os
2
-import logging
3
-import random
4
-
5
-from unittest import TestCase
6
-import unittest
7
-
8
-from Database.sqlwrapper import SqlWrapper
9
-
10
-from django.conf import settings
11
-from Database.sqlsettings import SQLSettings
12
-
13
-os.environ.setdefault("DJANGO_SETTINGS_MODULE", "Lodel.settings")
14
-
15
-#Bad strings for injection tests
16
-INJECTIONS = [ 'foo UNION SELECT 1,2,3,4--', "foo' UNION SELECT 1,2,3--", 'foo" UNION SELECT 1,2,3--', 'foo` UNION SELECT 1,2,3,4--', '--', 'foo`; SELECT 1,2,3', 'foo"; SELECT 1,2,3', "foo'; SELECT 1,2,3", "; SELECT 1,2,3" ]
17
-NAMEINJECT = INJECTIONS + [ '%', '*', "\0", "\b\b\b\b\b\b\b\b\b" ]
18
-
19
-#Valid SQL types
20
-VTYPES = [ 'integer', 'varchar(1)', 'varchar(50)', 'text', 'boolean' ]
21
-
22
-class SqlWrapperQueryStrTests(TestCase):
23
-    
24
-    def setUp(self):
25
-        #creating a test table
26
-        sw = SqlWrapper()
27
-        self.ttn = 'testtable'
28
-        self.cols = [
29
-            { 'name': 'pk', 'type': 'INTEGER', 'extra': {'primarykey': True} },
30
-            { 'name': 'testtxt', 'type': 'TEXT', 'extra': {'nullable': True, 'default': 'hello'} },
31
-            { 'name': 'testchr', 'type': 'VARCHAR(50)', 'extra': {'nullable': True, 'default': 'hello world'} },
32
-            { 'name': 'testbool', 'type': 'BOOLEAN', 'extra': {'nullable':False, 'default': False}},
33
-        ]
34
-        
35
-        sw.create_table( { 'name': self.ttn, 'columns': self.cols} )
36
-
37
-
38
-        #Disable logging but CRITICAL
39
-        logging.basicConfig(level=logging.CRITICAL)
40
-        pass
41
-
42
-    def tearDown(self):
43
-        sw = SqlWrapper()
44
-        sw.drop_table(self.ttn)
45
-
46
-    @unittest.skip('dev') #TODO remove skip
47
-    def test_get_querystring(self):
48
-        sw = SqlWrapper()
49
-
50
-        actions = [ 'add_column', 'alter_column', 'drop_column' ]
51
-        dialects = [ 'default', 'mysql', 'postgresql' ]
52
-
53
-        for action in actions:
54
-            for dialect in dialects:
55
-                r = sw.get_querystring(action, dialect)
56
-                self.assertIsInstance(r, str)
57
-
58
-    @unittest.skip('dev') #TODO remove skip
59
-    def test_get_querystring_badargs(self):
60
-        sw = SqlWrapper()
61
-        
62
-        actions = [ 1, -1, print, [], 'foo']
63
-        dialects = actions
64
-        for action in actions:
65
-            for dialect in dialects:
66
-                with self.assertRaises(ValueError):
67
-                    r = sw.get_querystring(action, dialect)
68
-    
69
-    @unittest.skip('dev') #TODO remove skip
70
-    def test_add_column(self):
71
-        sw = SqlWrapper()
72
-
73
-        colnames = [ 'taddcol1', 'test-add-col', 'test_add_col', '-test', '_add', '__col__' ]
74
-
75
-        for i, name in enumerate(colnames):
76
-            col = { 'name': name, 'type': VTYPES[i%len(VTYPES)] }
77
-            self.assertTrue(sw.add_column(self.ttn, col))
78
-        pass
79
-
80
-    @unittest.skip('dev') #TODO remove skip
81
-    def test_add_column_badargs(self):
82
-        sw = SqlWrapper()
83
-
84
-        coolname = 'cool'
85
-        i=0
86
-        self.assertFalse(sw.add_column(self.ttn, {'type': 'INTEGER'}))
87
-        self.assertFalse(sw.add_column(self.ttn, {'name': 'foo'}))
88
-        self.assertFalse(sw.add_column(self.ttn, dict()))
89
-        self.assertFalse(sw.add_column(self.ttn, print))
90
-        self.assertFalse(sw.add_column(self.ttn, ['foo', 'integer']))
91
-        self.assertFalse(sw.add_column(self.ttn, None))
92
-        self.assertFalse(sw.add_column(self.ttn, 42))
93
-        self.assertFalse(sw.add_column(1, {'name':'foo', 'type':'integer'}))
94
-        self.assertFalse(sw.add_column(print, {'name':'foo', 'type':'integer'}))
95
-        self.assertFalse(sw.add_column([], {'name':'foo', 'type':'integer'}))
96
-        self.assertFalse(sw.add_column(dict(), {'name':'foo', 'type':'integer'}))
97
-        for badname in NAMEINJECT:
98
-            self.assertFalse(sw.add_column(self.ttn, {'name':badname, 'type':'INTEGER'}))
99
-            self.assertFalse(sw.add_column(self.ttn, {'name':coolname+str(i), 'type':badname}))
100
-            self.assertFalse(sw.add_column(badname, {'name':coolname+str(i), 'type':'INTEGER'}))
101
-            i+=1
102
-        
103
-    @unittest.skip('dev') #TODO remove skip
104
-    def test_alter_column(self):
105
-        sw = SqlWrapper()
106
-
107
-        colnames = ['talter', 'talter1', 'test_alter', 'test-alter-col', '-test_alter', '__test_alter__']
108
-        for i,name in enumerate(random.sample(colnames, len(colnames))):
109
-            col = { 'name': name, 'type': VTYPES[i%len(VTYPES)] }
110
-            self.assertTrue(sw.add_column( self.ttn, col))
111
-
112
-        for i,name in enumerate(random.sample(colnames, len(colnames))):
113
-            col = {'name': name, 'type': VTYPES[i%len(VTYPES)]}
114
-            self.assertTrue(self.ttn, col)
115
-        pass
116
-
117
-    @unittest.skip('dev') #TODO remove skip
118
-    def test_alter_column_badargs(self):
119
-        sw = SqlWrapper()
120
-
121
-        colnames = ['tabad', 'tabad1']
122
-
123
-        for i,name in enumerate(colnames):
124
-            col = { 'name': name, 'type': VTYPES[i%len(VTYPES)] }
125
-            self.assertTrue(sw.add_column(self.ttn, col))
126
-
127
-        for i,badname in enumerate(NAMEINJECT):
128
-            col = { 'name': badname, 'type': VTYPES[i%len(VTYPES)] }
129
-            self.assertFalse(sw.alter_column(self.ttn, col))
130
-
131
-            col = { 'name': colnames[i%len(colnames)], 'type': badname}
132
-            self.assertFalse(sw.alter_column(self.ttn, col))
133
-
134
-            col = { 'name': badname, 'type': NAMEINJECT[random.randint(0,len(NAMEINJECT))]}
135
-            self.assertFalse(sw.alter_column(self.ttn, col))
136
-
137
-            col = { 'name': colnames[i%len(colnames)], 'type': VTYPES[i%len(VTYPES)] }
138
-            self.assertFalse(sw.alter_column(badname, col))
139
-
140
-    def test_insert(self):
141
-        sw = SqlWrapper()
142
-
143
-        records = [
144
-            {   'pk': 0,
145
-                'testchr': 'Hello world !',
146
-                'testtext': 'Wow ! Super text... I\'m amazed',
147
-                'testbool': False
148
-            },
149
-            {   'pk': 1,
150
-                'testchr': 'Hello"world...--',
151
-                'testtext': 'Another wonderfull text. But this time with spécials chars@;,:--*/+\'{}]{[|~&ù^$*µ$£ê;<ç>\/*-+',
152
-                'testbool': True
153
-            },
154
-            {   'pk': 2 }, #default values for others
155
-            {   'pk': '3',
156
-                'testchr': None,
157
-                'testtext': None,
158
-                'testbool': 'true'
159
-            },
160
-            {   'pk': 4,
161
-                'testchr': '',
162
-                'testtext': '',
163
-                'testbool': 'false'
164
-            },
165
-            {   'pk': 5,
166
-                'testbool': 0
167
-            },
168
-            {   'pk': 6,
169
-                'testbool': 1
170
-            },
171
-            {   'pk':1024,
172
-                'testbool': False
173
-            },
174
-        ]
175
-                

+ 0
- 3
Database/tests.py View File

@@ -1,3 +0,0 @@
1
-from django.test import TestCase
2
-
3
-# Create your tests here.

Loading…
Cancel
Save