Brak opisu
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

test_sqlwrapper.py 19KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519
  1. import os
  2. import logging
  3. import random
  4. from unittest import TestCase
  5. from unittest.mock import MagicMock, Mock, patch, call
  6. import unittest
  7. import sqlalchemy
  8. from Database.sqlwrapper import SqlWrapper
  9. from django.conf import settings
  10. from Database.sqlsettings import SQLSettings
  11. os.environ.setdefault("DJANGO_SETTINGS_MODULE", "Lodel.settings")
  12. BADNAMES = ['', " \t ", "bad--", "%", "*", "`", '"', "'", "\0", "hello`World", 'Hello"world', 'foo%', '*bar.*', '%%', "hello\0world", print, 42 ]
  13. for c in ('*','%','`', "'", '"', "\\"):
  14. for _ in range(16):
  15. c = "\\"+c
  16. BADNAMES.append(c)
  17. BADNAMES+=[chr(i) for i in list(range(0,0x09))+[0x0b, 0x0c]+list(range(0x0e, 0x01f))]
  18. class SqlWrapperTests(TestCase):
  19. def setUp(self):
  20. #Overwriting database conf
  21. SQLSettings.DB_READ_CONNECTION_NAME = 'testdb'
  22. SQLSettings.DB_WRITE_CONNECTION_NAME = 'testdb'
  23. self.testdb = os.path.join('/tmp/', 'lodel2_testdb.sqlite3')
  24. settings.DATABASES['testdb'] = {
  25. 'ENGINE': 'sqlite',
  26. 'NAME': self.testdb,
  27. }
  28. #Disable logging but CRITICAL
  29. logging.basicConfig(level=logging.CRITICAL)
  30. pass
  31. def tearDown(self):
  32. try:
  33. os.unlink(self.testdb) #Removing the test database
  34. except FileNotFoundError: pass
  35. """ Testing standart instanciation of sqlwrapper """
  36. def test_init_sqlwrapper(self):
  37. sw = SqlWrapper()
  38. self.assertIsInstance(sw, SqlWrapper)
  39. sw2 = SqlWrapper()
  40. self.assertIsInstance(sw2, SqlWrapper)
  41. def test_get_engine(self):
  42. sw = SqlWrapper()
  43. engine = sw.get_engine(SQLSettings.DB_READ_CONNECTION_NAME)
  44. self.assertIsInstance(engine, sqlalchemy.engine.base.Engine)
  45. def test_get_engine_badargs(self):
  46. sw = SqlWrapper()
  47. with self.assertRaises(Exception):
  48. sw.get_engine(0)
  49. with self.assertRaises(Exception):
  50. sw.get_engine('')
  51. with self.assertRaises(Exception):
  52. sw.get_engine(' ')
  53. @patch.object(SqlWrapper, 'get_read_engine')
  54. def test_execute_queries(self, mock_read):
  55. queries = [ 'SELECT * FROM foo', 'SELECT * FROM bar', 'SELECT foo.id, bar.id FROM foo, bar WHERE foo.id = 42 AND bar.name = \'hello world !\'' ]
  56. mock_read.return_value = MagicMock()
  57. mock_engine = mock_read.return_value
  58. mock_engine.connect.return_value = MagicMock()
  59. mock_conn = mock_engine.connect.return_value
  60. sw = SqlWrapper()
  61. #One query execution
  62. sw.execute(queries[0], 'read')
  63. mock_conn.execute.assert_called_with(queries[0])
  64. mock_conn.reset_mock()
  65. #multiple queries execution
  66. sw.execute(queries, 'read')
  67. except_calls = [ call(q) for q in queries ]
  68. self.assertEqual(except_calls, mock_conn.execute.mock_calls)
  69. @patch.object(sqlalchemy.ForeignKeyConstraint, '__init__')
  70. def test_create_fk_constraint_mock(self, mock_fk):
  71. mock_fk.return_value = None
  72. sw = SqlWrapper()
  73. sw.create_foreign_key_constraint_object('foo','bar', 'foobar')
  74. mock_fk.asser_called_with(['foo'], ['bar'], 'foobar')
  75. def test_create_fk_constraint(self):
  76. sw = SqlWrapper()
  77. fk = sw.create_foreign_key_constraint_object('foo', 'bar', 'foobar')
  78. self.assertIsInstance(fk, sqlalchemy.ForeignKeyConstraint)
  79. @unittest.skip('Dev') #TODO remove it
  80. def test_create_fk_constraint_badargs(self):
  81. sw = SqlWrapper()
  82. with self.assertRaises(Exception):
  83. sw.create_foreign_key_constraint_object(['foo', 'bar'], 'foofoo', 'babar')
  84. with self.assertRaises(Exception):
  85. sw.create_foreign_key_constraint_object('bar', 2, 'babar')
  86. with self.assertRaises(Exception):
  87. sw.create_foreign_key_constraint_object(None, 'foo', 'bar')
  88. with self.assertRaises(Exception):
  89. sw.create_foreign_key_constraint_object('bar', None, 'babar')
  90. with self.assertRaises(Exception):
  91. sw.create_foreign_key_constraint_object(None, None, 'babar')
  92. with self.assertRaises(Exception):
  93. sw.create_foreign_key_constraint_object(print, 'foo', 'babar')
  94. with self.assertRaises(Exception):
  95. sw.create_foreign_key_constraint_object('foo', print, 'babar')
  96. with self.assertRaises(Exception):
  97. sw.create_foreign_key_constraint_object('foo', 'bar', print)
  98. with self.assertRaises(Exception):
  99. sw.create_foreign_key_constraint_object('bar', 'foo', 42)
  100. with self.assertRaises(Exception):
  101. sw.create_foreign_key_constraint_object('bar', 'foo', ' ')
  102. with self.assertRaises(Exception):
  103. sw.create_foreign_key_constraint_object(" \t ", 'foo')
  104. with self.assertRaises(Exception):
  105. sw.create_foreign_key_constraint_object('bar', 'foo', " \t ")
  106. foocol = sqlalchemy.Column('foo',sqlalchemy.INTEGER)
  107. with self.assertRaises(Exception):
  108. sw.create_foreign_key_constraint_object(foocol, foocol)
  109. with self.assertRaises(Exception):
  110. sw.create_foreign_key_constraint_object(foocol, 'bar')
  111. with self.assertRaises(Exception):
  112. sw.create_foreign_key_constraint_object('foo', foocol)
  113. with self.assertRaises(Exception):
  114. sw.create_foreign_key_constraint_object('foo', 'bar', foocol)
  115. @patch.object(sqlalchemy.Column, '__init__')
  116. def test_create_column(self, mockcol):
  117. mockcol.return_value = None
  118. sw = SqlWrapper()
  119. foo = sw.create_column_object('foo', 'INTEGER')
  120. mockcol.assert_called_once_with('foo', sqlalchemy.INTEGER)
  121. mockcol.reset_mock()
  122. foo = sw.create_column_object('foo', 'VARCHAR(50)')
  123. self.assertEqual(mockcol.call_args[0][0], 'foo')
  124. self.assertIsInstance(mockcol.call_args[0][1], sqlalchemy.VARCHAR)
  125. self.assertEqual(mockcol.call_args[0][1].length, 50)
  126. mockcol.reset_mock()
  127. foo = sw.create_column_object('foo', 'TEXT')
  128. mockcol.assert_called_once_with('foo', sqlalchemy.TEXT)
  129. mockcol.reset_mock()
  130. foo = sw.create_column_object('foo', 'DATE')
  131. mockcol.assert_called_once_with('foo', sqlalchemy.DATE)
  132. mockcol.reset_mock()
  133. foo = sw.create_column_object('foo', 'BOOLEAN')
  134. mockcol.assert_called_once_with('foo', sqlalchemy.BOOLEAN)
  135. mockcol.reset_mock()
  136. xtrmlongname = ''
  137. for _ in range(200):
  138. xtrmlongname += 'veryvery'
  139. xtrmlongname += 'longname'
  140. foo = sw.create_column_object(xtrmlongname, 'TEXT')
  141. mockcol.assert_called_once_with(xtrmlongname, sqlalchemy.TEXT)
  142. mockcol.reset_mock()
  143. def test_create_column_extra_fk(self):
  144. sw = SqlWrapper()
  145. extra = { 'foreignkey': 'bar' }
  146. rescol = sw.create_column_object('foo', 'INTEGER', extra)
  147. self.assertIsInstance(rescol, sqlalchemy.Column)
  148. fk = rescol.foreign_keys.pop()
  149. self.assertIsInstance(fk, sqlalchemy.ForeignKey)
  150. self.assertEqual(fk._colspec, 'bar')
  151. def test_create_column_extra_default(self):
  152. sw = SqlWrapper()
  153. extra = { 'default': None, 'nullable': True }
  154. rescol = sw.create_column_object('foo', 'INTEGER', extra)
  155. self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
  156. self.assertEqual(rescol.default.arg, None)
  157. extra = { 'default': "NULL", 'nullable': True }
  158. rescol = sw.create_column_object('foo', 'INTEGER', extra)
  159. self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
  160. self.assertEqual(rescol.default.arg, "NULL")
  161. extra = { 'default': 'null', 'nullable': True }
  162. rescol = sw.create_column_object('foo', 'INTEGER', extra)
  163. self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
  164. self.assertEqual(rescol.default.arg, 'null')
  165. extra = { 'default': 42 }
  166. rescol = sw.create_column_object('foo', 'INTEGER', extra)
  167. self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
  168. self.assertEqual(rescol.default.arg, 42)
  169. extra = { 'default': 'foobardefault' }
  170. rescol = sw.create_column_object('foo', 'VARCHAR(50)', extra)
  171. self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
  172. self.assertEqual(rescol.default.arg, 'foobardefault')
  173. extra = { 'default': 'foodefault' }
  174. rescol = sw.create_column_object('foo', 'TEXT', extra)
  175. self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
  176. self.assertEqual(rescol.default.arg, 'foodefault')
  177. extra = { 'default': True }
  178. rescol = sw.create_column_object('foo', 'BOOLEAN', extra)
  179. self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
  180. self.assertEqual(rescol.default.arg, True)
  181. extra = { 'default': False }
  182. rescol = sw.create_column_object('foo', 'BOOLEAN', extra)
  183. self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
  184. self.assertEqual(rescol.default.arg, False)
  185. extra = { 'default': "true" }
  186. rescol = sw.create_column_object('foo', 'BOOLEAN', extra)
  187. self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
  188. self.assertEqual(rescol.default.arg, "true")
  189. extra = { 'default': 0 }
  190. rescol = sw.create_column_object('foo', 'BOOLEAN', extra)
  191. self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
  192. self.assertEqual(rescol.default.arg, 0)
  193. extra = { 'default': 1 }
  194. rescol = sw.create_column_object('foo', 'BOOLEAN', extra)
  195. self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
  196. self.assertEqual(rescol.default.arg, 1)
  197. def test_create_column_extra_pknull(self):
  198. sw = SqlWrapper()
  199. for b in (True,False):
  200. extra = { 'primarykey': b }
  201. rescol = sw.create_column_object('foo', 'INTEGER', extra)
  202. self.assertIsInstance(rescol, sqlalchemy.Column)
  203. self.assertEqual(rescol.primary_key, b)
  204. extra = { 'nullable': b }
  205. rescol = sw.create_column_object('foo', 'INTEGER', extra)
  206. self.assertIsInstance(rescol, sqlalchemy.Column)
  207. self.assertEqual(rescol.nullable, b)
  208. extra = { 'primarykey' : b, 'nullable': not b }
  209. rescol = sw.create_column_object('foo', 'INTEGER', extra)
  210. self.assertIsInstance(rescol, sqlalchemy.Column)
  211. self.assertEqual(rescol.primary_key, b)
  212. self.assertEqual(rescol.nullable, not b)
  213. extra = { 'primarykey' : b, 'nullable': b }
  214. rescol = sw.create_column_object('foo', 'INTEGER', extra)
  215. self.assertIsInstance(rescol, sqlalchemy.Column)
  216. self.assertEqual(rescol.primary_key, b)
  217. self.assertEqual(rescol.nullable, b)
  218. def test_create_column_extra_all(self):
  219. sw = SqlWrapper()
  220. extra = { 'primarykey': False, 'nullable': False, 'default':42, 'foreignkey': 'foobar'}
  221. rescol = sw.create_column_object('foo', 'INTEGER', extra)
  222. self.assertIsInstance(rescol, sqlalchemy.Column)
  223. self.assertEqual(rescol.primary_key, False)
  224. self.assertEqual(rescol.nullable, False)
  225. self.assertIsInstance(rescol.default, sqlalchemy.ColumnDefault)
  226. self.assertEqual(rescol.default.arg, 42)
  227. fk = rescol.foreign_keys.pop()
  228. self.assertIsInstance(fk, sqlalchemy.ForeignKey)
  229. self.assertEqual(fk._colspec, 'foobar')
  230. @unittest.skip('Dev') #TODO remove it
  231. def test_create_column_badargs(self):
  232. sw = SqlWrapper()
  233. cc = sw.create_column_object
  234. ain = self.assertIsNone
  235. for bname in BADNAMES:
  236. ain(cc(bname, 'INTEGER'))
  237. ain(cc('c', 'INNNTEGER'))
  238. ain(cc(" \t\t ", 'TEXT'))
  239. ain(cc('c', ' '))
  240. ain(cc('c', 'VARCHAR(foo)'))
  241. ain(cc('supercol', 'SUPERNOTTYPEDVARCHARSTR'))
  242. ain(cc('c', None))
  243. ain(cc(None, None))
  244. @unittest.skip('Dev') #TODO remove it
  245. def test_create_column_badextra(self):
  246. sw = SqlWrapper()
  247. cc = sw.create_column_object
  248. ain = self.assertIsNone
  249. #Put junk in extra datas
  250. for xtra_name in [ 'primarykey', 'nullable', 'foreignkey' ]:
  251. for junk in [ print, sqlalchemy, SqlWrapper, ' ' ]+BADNAMES:
  252. ain(cc('c', 'TEXT', { xtra_name: junk }))
  253. for junk in [True, False, 52, ' ']+BADNAMES:
  254. ain(cc('c', 'TEXT', { 'foreignkey': junk }))
  255. for xtra_name in ('primarykey', 'nullalble'):
  256. for junk in [4096, 'HELLO WORLD !', ' ']+BADNAMES:
  257. ain(cc('c', 'TEXT', { xtra_name, junk } ))
  258. @patch.object(sqlalchemy.Table, '__init__')
  259. def test_create_table(self, mock_table):
  260. #TODO check constraints
  261. """ Create tables and check that the names are ok """
  262. mock_table.return_value = None
  263. sw = SqlWrapper()
  264. cols = [
  265. { 'name': 'supercol', 'type': 'INTEGER', 'extra': { 'nullable': True } },
  266. { 'name': 'superpk', 'type': 'INTEGER', 'extra': { 'primarykey': True} },
  267. { 'name': 'col2', 'type': 'TEXT' }
  268. ]
  269. for table_name in ['supertable', 'besttableever', 'foo-table']:
  270. params = { 'name': table_name, 'columns': cols, 'constraints':dict() }
  271. self.assertTrue(sw.create_table(params))
  272. self.assertEqual(mock_table.call_args[0][0], table_name)
  273. pass
  274. @patch.object(sqlalchemy.Table, 'append_column')
  275. def test_create_table_col(self, mock_append):
  276. #TODO check constraints
  277. """ Create a table and check that the columns are OK """
  278. sw = SqlWrapper()
  279. table_name = 'supertablecol'
  280. cols = [
  281. { 'name': 'supercol', 'type': 'INTEGER', 'extra': { 'nullable': True } },
  282. { 'name': 'superpk', 'type': 'INTEGER', 'extra': { 'primarykey': True} },
  283. { 'name': 'col2', 'type': 'TEXT' }
  284. ]
  285. except_coltype = [ sqlalchemy.INTEGER, sqlalchemy.INTEGER, sqlalchemy.TEXT ]
  286. params = { 'name': table_name, 'columns': cols, 'constraints':dict() }
  287. sw.create_table(params) #This call return false because of the mock on append_column
  288. self.assertEqual(len(mock_append.mock_calls), len(cols))
  289. for n,c in enumerate(mock_append.mock_calls):
  290. self.assertEqual(c[1][0].name, cols[n]['name'])
  291. self.assertIsInstance(c[1][0].type, except_coltype[n])
  292. pass
  293. def test_create_table_recreate(self):
  294. #TODO check constraints
  295. """ Try to create the same table 2 times (except a False return the second time) """
  296. sw = SqlWrapper()
  297. params = { 'name': 'redondant', 'columns': [{'name':'foocol', 'type': 'INTEGER'}]}
  298. self.assertTrue(sw.create_table(params))
  299. params['columns'] = [{'name':'barcol', 'type': 'INTEGER'}]
  300. self.assertFalse(sw.create_table(params))
  301. @unittest.skip('dev') #TODO remove it
  302. def test_create_table_badargs(self):
  303. sw = SqlWrapper()
  304. af = self.assertFalse
  305. ct = sw.create_table
  306. foocol = {'name': 'foocol', 'type': 'INTEGER'}
  307. p = {'name': 'foo'}
  308. af(ct(p)) #no columns
  309. for bname in BADNAMES: #bad column name
  310. p['columns'] = bname
  311. af(ct(p))
  312. p['columns']=[]; af(ct(p)) #empty columns TODO Or return True is normal ???
  313. p = {'columns':[foocol]}
  314. af(ct(p)) #no name
  315. for bname in BADNAMES:
  316. p['name'] = bname
  317. af(ct(p))
  318. pass
  319. def create_test_table(self, sw):
  320. """ Create a table for test purpose """
  321. table_name = 'testtable'
  322. cols = [
  323. { 'name': 'pk', 'type': 'INTEGER', 'extra': {'primarykey': True} },
  324. { 'name': 'testtxt', 'type': 'TEXT', 'extra': {'nullable': True, 'default': 'hello'} },
  325. { 'name': 'testchr', 'type': 'VARCHAR(50)', 'extra': {'nullable': True, 'default': 'hello world'} },
  326. ]
  327. sw.create_table( { 'name': table_name, 'columns': cols} )
  328. pass
  329. def test_get_table(self):
  330. """ Try to get the testtable (check the name and type of return) """
  331. sw = SqlWrapper()
  332. self.create_test_table(sw)
  333. rt = sw.get_table('testtable')
  334. self.assertIsInstance(rt, sqlalchemy.Table)
  335. self.assertEqual(rt.name, 'testtable')
  336. rt = sw.get_table('testtable', 'write')
  337. self.assertIsInstance(rt, sqlalchemy.Table)
  338. self.assertEqual(rt.name, 'testtable')
  339. rt = sw.get_table('testtable', 'read')
  340. self.assertIsInstance(rt, sqlalchemy.Table)
  341. self.assertEqual(rt.name, 'testtable')
  342. pass
  343. @unittest.skip('dev') #TODO remove skip
  344. def test_get_table_badargs(self):
  345. sw = SqlWrapper()
  346. self.create_test_table(sw)
  347. for badname in BADNAMES:
  348. with self.assertRaises((sqlalchemy.exc.NoSuchTableError, Exception)):
  349. sw.get_table(badname)
  350. with self.assertRaises(sqlalchemy.exc.NoSuchTableError):
  351. sw.get_table('FOOBAR')
  352. with self.assertRaises(Exception):
  353. sw.get_table(print)
  354. with self.assertRaises(Exception):
  355. sw.get_table(1)
  356. #bad action types
  357. with self.assertRaises(Exception):
  358. sw.get_table('testtable', print)
  359. with self.assertRaises(Exception):
  360. sw.get_table('testtable', 'foobar')
  361. with self.assertRaises(Exception):
  362. sw.get_table('testtable', 42)
  363. pass
  364. def test_drop_table(self):
  365. sw = SqlWrapper()
  366. self.create_test_table(sw)
  367. self.assertTrue(sw.drop_table('testtable'))
  368. self.assertFalse(sw.drop_table('testtable'))
  369. self.assertFalse(sw.drop_table('nonexisting'))
  370. pass
  371. def test_drop_table_badargs(self):
  372. sw = SqlWrapper()
  373. self.create_test_table(sw)
  374. af = self.assertFalse
  375. for bname in BADNAMES:
  376. self.assertFalse(sw.drop_table(bname))
  377. def test_create_get_drop_table(self):
  378. sw = SqlWrapper()
  379. funkynames = ('standart', "wow-nice", "-really-", "test_test-test", "_test", "test_", "test42", "foobar_123", "foobar-123", "foofoo--babar")
  380. types = ['INTEGER', 'VARCHAR(5)', 'VARCHAR(128)', 'TEXT', 'BOOLEAN']
  381. params = dict()
  382. cols = []
  383. for i,name in enumerate(funkynames):
  384. cols.append({'name':name, 'type':types[i%len(types)]})
  385. params['columns'] = cols
  386. for name in random.sample(funkynames, len(funkynames)):
  387. params['name'] = name
  388. self.assertTrue(sw.create_table(params))
  389. for name in random.sample(funkynames, len(funkynames)):
  390. rt = sw.get_table(name)
  391. self.assertIsInstance(rt, sqlalchemy.Table)#do we get a table ?
  392. self.assertEqual(rt.name, name)
  393. for name in random.sample(funkynames, len(funkynames)):
  394. self.assertTrue(sw.drop_table(name))
  395. pass