1
0
Fork 0
mirror of https://github.com/yweber/lodel2.git synced 2025-11-14 18:09:17 +01:00

Implemented DropColumn for sqlite

This commit is contained in:
Yann 2015-06-26 16:03:22 +02:00
commit 16a962fb3f
4 changed files with 51 additions and 7 deletions

View file

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
import os
import time
import sqlalchemy as sqla
from sqlalchemy.ext.compiler import compiles
@ -55,8 +56,42 @@ def visit_drop_column(element, ddlcompiler, **kw):
return 'ALTER TABLE %s DROP COLUMN %s'%(tname, colname)
@compiles(DropColumn)
##
# @warning Returns more than one query @ref Database.sqlutils.ddl_execute
def visit_drop_column(element, ddlcompiler, **kw):
raise NotImplementedError('Drop column not yet implemented for '+str(ddlcompiler.dialect.name))
prep = ddlcompiler.sql_compiler.preparer
tname = prep.format_table(element.table)
#Temporary table
tmpname = str(element.table)+'_copydropcol'
tmpTable = sqla.Table(tmpname, sqla.MetaData())
tmptname = prep.format_table(tmpTable)
query = 'ALTER TABLE %s RENAME TO %s; '%(tname, tmptname)
colname = prep.format_column(element.col)
meta = sqla.MetaData()
newtable = sqla.Table(element.table, meta)
clist = element.table.columns
cols = []
for col in clist:
if col.name != element.col.name:
newtable.append_column(col.copy())
cols.append(prep.format_column(col))
cols=', '.join(cols)
query += str(sqla.schema.CreateTable(newtable).compile(dialect = ddlcompiler.dialect))+';'
query += 'INSERT INTO %s ( %s ) SELECT %s FROM %s;' % (newtable, cols, cols, tmptname)
query += 'DROP TABLE %s'% (tmpname)
return query
#raise NotImplementedError('Drop column not yet implemented for '+str(ddlcompiler.dialect.name))
class AlterColumn(sqla.schema.DDLElement):
""" Defines the DDL for changing the type of a column """

View file

@ -91,11 +91,20 @@ def getTable(cls):
engine = cls.getDbE()
return sqla.Table(cls.table, meta(engine))
## This function is intended to execute ddl defined in sqlalter
# @warning There is a dirty workaround here, DDL should returns only one query, but DropColumn for sqlite has to return 4 queries (rename, create, insert, drop). There is a split on the compiled SQL to extract and execute one query at a time
# @param ddl DDLElement: Can be an Database.sqlalter.DropColumn Database.sqlalter.AddColumn or Database.sqlalter.AlterColumn
# @param db_engine: A database engine
# @return True if success, else False
def ddl_execute(ddl, db_engine):
conn = db_engine.connect()
req = str(ddl.compile(dialect=db_engine.dialect))
logger.debug("Executing custom raw SQL query : '"+req+"'")
ret = conn.execute(req)
queries = req.split(';')
for query in queries:
logger.debug("Executing custom raw SQL query : '"+query+"'")
ret = conn.execute(query)
if not bool(ret):
return False
conn.close()
return bool(ret)
return True

View file

@ -85,7 +85,8 @@ class EmField(EmComponent):
# @return bool : True if deleted False if deletion aborded
# @todo Check if unconditionnal deletion is correct
def delete(self):
class_table = self.get_class_table()
dbe = self.__class__.getDbE()
class_table = sql.Table(self.get_class_table(), sqlutils.meta(dbe))
field_col = sql.Column(self.name)
ddl = DropColumn(class_table, field_col)
sqlutils.ddl_execute(ddl, self.__class__.getDbE())

View file

@ -129,7 +129,6 @@ class TestField(FieldTestCase):
self.assertIn(field_column.name, field_table_columns)
pass
@unittest.skip("Delete not implemente for sqlite...")
def test_deletion(self):
field_names = ['field1', 'field2']
for name in field_names:
@ -143,7 +142,7 @@ class TestField(FieldTestCase):
for deleted_name in field_names[:i+1]:
self.assertNotIn(deleted_name, cols, "Column is not deleted")
for not_deleted_name in field_names[i+1:]:
self.assertIn(deleted_name, cols, "A bad column was deleted")
self.assertIn(not_deleted_name, cols, "A bad column was deleted")
with self.assertRaises(EmComponentNotExistError, msg="This field should be deleted"):
EmField(name)