1
0
Fork 0
mirror of https://github.com/yweber/lodel2.git synced 2026-07-01 05:10:49 +02:00

Merge branch 'new_me'

This commit is contained in:
Yann 2015-09-10 11:48:35 +02:00
commit 661f7f24ea
21 changed files with 1106 additions and 1669 deletions

View file

@ -0,0 +1,2 @@
## @package EditorialModel
# @brief Group all objects usefull for editorial model management

View file

@ -0,0 +1,2 @@
## @package EditorialModel.backend
# @brief Backend that allows the EM to handle different data sources

View file

@ -0,0 +1,73 @@
# -*- coding: utf-8 -*-
## @package EditorialModel.backend.json_backend
# @brief Handle json files
#
# Allows an editorial model to be loaded/saved in
# json format
import json
import datetime
from Lodel.utils.mlstring import MlString
def date_cast(date):
if len(date):
return datetime.datetime(date)
else:
return None
def int_or_none(i):
if len(i):
return int(i)
else:
return None
## Manages a Json file based backend structure
class EmBackendJson(object):
cast_methods = {
'uid': int,
'rank': int,
'class_id': int,
'fieldgroup_id': int,
'rel_to_type_id': int_or_none,
'rel_field_id': int_or_none,
'optional': bool,
'internal': bool,
'string': MlString.load,
'help_text': MlString.load,
'date_create': date_cast,
'date_update': date_cast,
}
## Constructor
#
# @param json_file str: path to the json_file used as data source
def __init__(self, json_file):
with open(json_file) as json_data:
self.data = json.loads(json_data.read())
## Loads the data in the data source json file
#
# @return list
def load(self):
data = {}
for index, component in self.data.items():
attributes = {}
for attr_name, attr_value in component.items():
if attr_name in EmBackendJson.cast_methods:
attributes[attr_name] = EmBackendJson.cast_methods[attr_name](attr_value)
else:
attributes[attr_name] = attr_value
data[int(index)] = attributes
return data
## Saves the data in the data source json file
#
# @return bool
# @todo à implémenter
def save(self):
return True

View file

@ -4,96 +4,61 @@
# @see EditorialModel::classes::EmClass
from EditorialModel.components import EmComponent
from Database import sqlutils
import sqlalchemy as sql
import EditorialModel.fieldtypes as ftypes
from EditorialModel.classtypes import EmClassType
from EditorialModel.types import EmType
#from EditorialModel.exceptions import *
#import EditorialModel.fieldtypes as ftypes
import EditorialModel
## @brief Manipulate Classes of the Editorial Model
# Create classes of object.
# @see EmClass, EmType, EmFieldGroup, EmField
# @see EmClass, EmType, EditorialModel.fieldgroups.EmFieldGroup, EmField
# @todo sortcolumn handling
class EmClass(EmComponent):
table = 'em_class'
ranked_in = 'classtype'
## @brief Specific EmClass fields
# @see EditorialModel::components::EmComponent::_fields
_fields = [
('classtype', ftypes.EmField_char),
('icon', ftypes.EmField_icon),
('sortcolumn', ftypes.EmField_char)
]
## EmClass instanciation
# @todo Classtype initialisation and test is not good EmClassType should give an answer or something like that
# @todo defines types check for icon and sortcolumn
def __init__(self, model, uid, name, classtype, icon='0', sortcolumn='rank', string=None, help_text=None, date_update=None, date_create=None, rank=None):
## Create a new class
# @param name str: name of the new class
# @param class_type EmClasstype: type of the class
# @return An EmClass instance
# @throw EmComponentExistError if an EmClass with this name and a different classtype exists
@classmethod
def create(cls, name, classtype, icon=None, sortcolumn='rank', **em_component_args):
return cls._create_db(name=name, classtype=classtype['name'], icon=icon, sortcolumn=sortcolumn, **em_component_args)
try:
getattr(EmClassType, classtype)
except AttributeError:
raise AttributeError("Unknown classtype '%s'" % classtype)
@classmethod
## Isolate SQL for EmClass::create
# @return An instance of EmClass
def _create_db(cls, name, classtype, icon, sortcolumn, **em_component_args):
#Create a new entry in the em_class table
result = super(EmClass, cls).create(name=name, classtype=classtype, icon=icon, sortcolumn=sortcolumn, **em_component_args)
self.classtype = classtype
self.icon = icon
self.sortcolumn = sortcolumn # 'rank'
super(EmClass, self).__init__(model=model, uid=uid, name=name, string=string, help_text=help_text, date_update=date_update, date_create=date_create, rank=rank)
dbe = result.db_engine
conn = dbe.connect()
#Create a new table storing LodelObjects of this EmClass
meta = sql.MetaData()
emclasstable = sql.Table(result.class_table_name, meta, sql.Column('uid', sql.VARCHAR(50), primary_key=True))
emclasstable.create(conn)
conn.close()
return result
@property
## @brief Return the table name used to stores data on this class
def class_table_name(self):
return self.name
## Check if the EmComponent is valid
# @throw EmComponentCheckError if fails
def check(self):
super(EmClass, self).check()
## @brief Delete a class if it's ''empty''
# If a class has no fieldgroups delete it
# @return bool : True if deleted False if deletion aborded
def delete(self):
fieldgroups = self.fieldgroups()
if len(fieldgroups) > 0:
return False
dbe = self.db_engine
meta = sqlutils.meta(dbe)
#Here we have to give a connection
class_table = sql.Table(self.name, meta)
meta.drop_all(tables=[class_table], bind=dbe)
return super(EmClass, self).delete()
def delete_check(self):
for emtype in self.model.components(EmType):
if emtype.class_id == self.uid:
return False
for fieldgroup in self.model.components(EditorialModel.fieldgroups.EmFieldGroup):
if fieldgroup.class_id == self.uid:
return False
return True
## Retrieve list of the field_groups of this class
# @return A list of fieldgroups instance
def fieldgroups(self):
records = self._fieldgroups_db()
fieldgroups = [EditorialModel.fieldgroups.EmFieldGroup(int(record.uid)) for record in records]
return fieldgroups
## Isolate SQL for EmClass::fieldgroups
# @return An array of dict (sqlalchemy fetchall)
def _fieldgroups_db(self):
dbe = self.db_engine
emfg = sql.Table(EditorialModel.fieldgroups.EmFieldGroup.table, sqlutils.meta(dbe))
req = emfg.select().where(emfg.c.class_id == self.uid)
conn = dbe.connect()
res = conn.execute(req)
return res.fetchall()
ret = []
for fieldgroup in self.model.components(EditorialModel.fieldgroups.EmFieldGroup):
if fieldgroup.class_id == self.uid:
ret.append(fieldgroup)
return ret
## Retrieve list of fields
# @return fields [EmField]:
@ -107,53 +72,19 @@ class EmClass(EmComponent):
## Retrieve list of type of this class
# @return types [EmType]:
def types(self):
records = self._types_db()
types = [EditorialModel.types.EmType(int(record.uid)) for record in records]
return types
## Isolate SQL for EmCLass::types
# @return An array of dict (sqlalchemy fetchall)
def _types_db(self):
dbe = self.db_engine
emtype = sql.Table(EditorialModel.types.EmType.table, sqlutils.meta(dbe))
req = emtype.select().where(emtype.c.class_id == self.uid)
conn = dbe.connect()
res = conn.execute(req)
return res.fetchall()
ret = []
for emtype in self.model.components(EmType):
if emtype.class_id == self.uid:
ret.append(emtype)
return ret
## Add a new EmType that can ben linked to this class
# @param em_type EmType: type to link
# @return success bool: done or not
def link_type(self, em_type):
table_name = self.name + '_' + em_type.name
self._link_type_db(table_name)
return True
def _link_type_db(self, table_name):
# Create a new table storing additionnal fields for the relation between the linked type and this EmClass
conn = self.db_engine.connect()
meta = sql.MetaData()
emlinketable = sql.Table(table_name, meta, sql.Column('uid', sql.VARCHAR(50), primary_key=True))
emlinketable.create(conn)
conn.close()
pass
## Retrieve list of EmType that are linked to this class
# @return types [EmType]:
def linked_types(self):
return self._linked_types_db()
def _linked_types_db(self):
dbe = self.db_engine
meta = sql.MetaData()
meta.reflect(dbe)
linked_types = []
for table in meta.tables.values():
table_name_elements = table.name.split('_')
if len(table_name_elements) == 2:
linked_types.append(EditorialModel.types.EmType(table_name_elements[1]))
return linked_types
pass

View file

@ -23,7 +23,7 @@ class EmNature(object):
# - 'attach' : what type of superior a type can have
# - 'classtype' a type can have superiors of the same classtype
# - 'type' a type can only have superiors of the same type
# - automatic : possible superiors
# - automatic : possible superiors
# - False : possible superiors must be defined
# - True : possible superiors can not be defined, they will be enforced by the ME automatically
# - maxdepth : maximum depth
@ -86,10 +86,9 @@ class EmClassType(object):
@staticmethod
def natures(classtype_name):
if not isinstance(classtype_name, str):
raise TypeError("Excepted <class str> but got "+str(type(classtype_name)))
raise TypeError("Excepted <class str> but got %s" % str(type(classtype_name)))
try:
classtype = getattr(EmClassType, classtype_name)
except AttributeError:
raise AttributeError("Unknown classtype : '"+classtype_name+"'")
raise AttributeError("Unknown classtype : '%s'" % classtype_name)
return classtype['hierarchy'].keys()

View file

@ -1,15 +1,18 @@
# -*- coding: utf-8 -*-
## @file components.py
# Defines the EditorialModel::components::EmComponent class and the EditorialModel::components::ComponentNotExistError exception class
## @package EditorialModel.components
# @brief Base objects for all EditorialModel components
#
# Defines the EditorialModel::components::EmComponent class
import datetime
import logging
import sqlalchemy as sql
from Database import sqlutils
import EditorialModel.fieldtypes as ftypes
from collections import OrderedDict
import hashlib
import EditorialModel.fieldtypes as ftypes
from EditorialModel.exceptions import *
from Lodel.utils.mlstring import MlString
logger = logging.getLogger('Lodel2.EditorialModel')
@ -21,346 +24,151 @@ logger = logging.getLogger('Lodel2.EditorialModel')
# @pure
class EmComponent(object):
## The name of the engine configuration
# @todo Not a good idea to store it here
dbconf = 'default'
## The table in wich we store data for this object
# None for EmComponent
table = None
## Used by EmComponent::modify_rank
ranked_in = None
## Read only properties
_ro_properties = ['date_update', 'date_create', 'uid', 'rank', 'deleted']
## @brief List fields name and fieldtype
#
# This is a list that describe database fields common for each EmComponent child classes.
# A database field is defined here by a tuple(name, type) with name a string and type an EditorialModel.fieldtypes.EmFieldType
# @warning The EmFieldType in second position in the tuples must be a class type and not a class instance !!!
# @see EditorialModel::classes::EmClass::_fields EditorialModel::fieldgroups::EmFieldGroup::_fields EditorialModel::types::EmType::_fields EditorialModel::fields::EmField::_fields
_fields = [
('uid', ftypes.EmField_integer),
('name', ftypes.EmField_char),
('rank', ftypes.EmField_integer),
('date_update', ftypes.EmField_date),
('date_create', ftypes.EmField_date),
('string', ftypes.EmField_mlstring),
('help', ftypes.EmField_mlstring)
]
## Instaciate an EmComponent
# @param id_or_name int|str: name or id of the object
# @throw TypeError if id_or_name is not an integer nor a string
# @throw NotImplementedError if called with EmComponent
def __init__(self, id_or_name, dbconf = 'default'):
self.dbconf = dbconf
if self.dbconf:
self.db_engine = sqlutils.get_engine(dbconf)
else:
self.db_engine = False
def __init__(self, model, uid, name, string = None, help_text = None, date_update = None, date_create = None, rank = None):
if type(self) == EmComponent:
raise NotImplementedError('Abstract class')
raise NotImplementedError('Abstract class')
if model.__class__.__name__ != 'Model':
raise TypeError("Excepted type for 'model' arg is <class 'Model'> but got {} instead".format(type(model)))
## @brief An OrderedDict storing fields name and values
# Values are handled by EditorialModel::fieldtypes::EmFieldType
# @warning \ref _fields instance property is not the same than EmComponent::_fields class property. In the instance property the EditorialModel::fieldtypes::EmFieldType are instanciated to be able to handle datas
# @see EmComponent::_fields EditorialModel::fieldtypes::EmFieldType
self._fields = OrderedDict([(name, ftype()) for (name, ftype) in (EmComponent._fields + self.__class__._fields)])
self.model = model
# populate
if isinstance(id_or_name, int):
self._fields['uid'].value = id_or_name # read only propertie set
elif isinstance(id_or_name, str):
self.name = id_or_name
else:
raise TypeError('Bad argument: expecting <int> or <str> but got : ' + str(type(id_or_name)))
self.table = self.__class__.table
self.populate()
self.uid = uid
self.check_type('uid', int)
self.name = name
self.check_type('name', str)
self.string = MlString() if string is None else string
self.check_type('string', MlString)
self.help_text = MlString() if help_text is None else help_text
self.check_type('help_text', MlString)
self.date_update = datetime.datetime.now() if date_update is None else date_update #WARNING timezone !
self.check_type('date_update', datetime.datetime)
self.date_create = datetime.datetime.now() if date_create is None else date_create #WARNING timezone !
self.check_type('date_create', datetime.datetime)
## @brief Access an attribute of an EmComponent
# This method is overloads the default __getattr__ to search in EmComponents::_fields . If there is an EditorialModel::EmField with a corresponding name in the component
# it returns its value.
# @param name str: The attribute name
# @throw AttributeError if attribute don't exists
# @see EditorialModel::EmField::value
def __getattr__(self, name):
if name != '_fields' and name in self._fields:
return self._fields[name].value
else:
return super(EmComponent, self).__getattribute__(name)
#Handling specials ranks for component creation
self.rank = rank
pass
## @brief Access an EmComponent attribute
# This function overload the default __getattribute__ in order to check if the EmComponent was deleted.
# @param name str: The attribute name
# @throw EmComponentNotExistError if the component was deleted
def __getattribute__(self, name):
if super(EmComponent, self).__getattribute__('deleted'):
raise EmComponentNotExistError("This " + super(EmComponent, self).__getattribute__('__class__').__name__ + " has been deleted")
res = super(EmComponent, self).__getattribute(name)
return res
@property
## @brief Return a dict with attributes name as key and attributes value as value
# @note Used at creation and deletion to call the migration handler
def attr_dump(self):
return {fname: fval for fname, fval in self.__dict__.items() if not (fname.startswith('__') or (fname == 'uid'))}
## @brief This function has to be called after the instanciation, checks, and init manipulations are done
# @note Create a new attribute _inited that allow __setattr__ to know if it has or not to call the migration handler
def init_ended(self):
self._inited = True
## Set the value of an EmComponent attribute
# @param name str: The propertie name
# @param value *: The value
def __setattr__(self, name, value):
if name in self.__class__._ro_properties:
raise TypeError("Propertie '" + name + "' is readonly")
## @brief Reimplementation for calling the migration handler to register the change
def __setattr__(self, attr_name, value):
inited = '_inited' in self.__dict__
if inited:
# if fails raise MigrationHandlerChangeError
self.model.migration_handler.register_change(self.uid, {attr_name: getattr(self, attr_name) }, {attr_name: value} )
super(EmComponent, self).__setattr__(attr_name, value)
if inited:
self.model.migration_handler.register_model_state(hash(self.model))
## Check the type of attribute named var_name
# @param var_name str : the attribute name
# @param excepted_type tuple|type : Tuple of type or a type
# @throw AttributeError if wrong type detected
def check_type(self, var_name, excepted_type):
var = getattr(self, var_name)
if not isinstance(var, excepted_type):
raise AttributeError("Excepted %s to be an %s but got %s instead" % (var_name, str(excepted_type), str(type(var))) )
pass
if name != '_fields' and hasattr(self, '_fields') and name in object.__getattribute__(self, '_fields'):
self._fields[name].from_python(value)
else:
object.__setattr__(self, name, value)
## @brief Hash function that allows to compare two EmComponent
# @return EmComponent+ClassName+uid
def __hash__(self):
return "EmComponent"+self.__class__.__name__+str(self.uid)
return int(hashlib.md5(str(self.attr_dump).encode('utf-8')).hexdigest(),16)
## @brief Test if two EmComponent are "equals"
# @return True or False
def __eq__(self, other):
return self.__class__ == other.__class__ and self.uid == other.uid
## Lookup in the database properties of the object to populate the properties
# @throw EmComponentNotExistError if the instance is not anymore stored in database
def populate(self):
records = self._populate_db() # Db query
## Check if the EmComponent is valid
# This function has to check that rank are correct and continuous other checks are made in childs classes
# @warning Hardcoded minimum rank
# @warning Rank modified by _fields['rank'].value
# @throw EmComponentCheckError if fails
def check(self):
self.model.sort_components(self.__class__)
if self.get_max_rank() > len(self.same_rank_group()) or self.rank <= 0:
#Non continuous ranks
for i, component in enumerate(self.same_rank_group()):
component.rank = i + 1
# No need to sort again here
for record in records:
for keys in self._fields.keys():
if keys in record:
self._fields[keys].from_string(record[keys])
super(EmComponent, self).__setattr__('deleted', False)
#@classmethod
## Shortcut that return the sqlAlchemy engine
#def db_engine(cls):
# return sqlutils.get_engine(cls.dbconf)
## Do the query on the database for EmComponent::populate()
# @throw EmComponentNotExistError if the instance is not anymore stored in database
def _populate_db(self):
dbe = self.db_engine
component = sql.Table(self.table, sqlutils.meta(dbe))
req = sql.sql.select([component])
if self.uid is None:
req = req.where(component.c.name == self.name)
else:
req = req.where(component.c.uid == self.uid)
conn = dbe.connect()
res = conn.execute(req)
res = res.fetchall()
conn.close()
if not res or len(res) == 0:
raise EmComponentNotExistError("No " + self.__class__.__name__ + " found with " + ('name ' + self.name if self.uid is None else 'uid ' + str(self.uid)))
return res
## Insert a new component in the database
#
# This function create and assign a new UID and handle the date_create and date_update values
# @warning There is a mandatory argument dbconf that indicate wich database configuration to use
# @param **kwargs : Names arguments representing object properties
# @return An instance of the created component
# @throw TypeError if an element of kwargs isn't a valid object propertie or if a mandatory argument is missing
# @throw RuntimeError if the creation fails at database level
# @todo Check that every mandatory _fields are given in args
# @todo Stop using datetime.datetime.utcnow() for date_update and date_create init
@classmethod
def create(cls, **kwargs):
#Checking for invalid arguments
valid_args = [ name for name,_ in (cls._fields + EmComponent._fields)]
for argname in kwargs:
if argname in ['date_update', 'date_create', 'rank', 'uid']: # Automatic properties
raise TypeError("Invalid argument : " + argname)
elif argname not in valid_args:
raise TypeError("Unexcepted keyword argument '"+argname+"' for "+cls.__name__+" creation")
#Check uniq names constraint
try:
name = kwargs['name']
exist = cls(name)
for kname in kwargs:
if not (getattr(exist, kname) == kwargs[kname]):
raise EmComponentExistError("An "+cls.__name__+" named "+name+" allready exists with a different "+kname)
logger.info("Trying to create an "+cls.__name__+" that allready exist with same attribute. Returning the existing one")
return exist
except EmComponentNotExistError:
pass
# Mandatory fields check (actual fieldtypes don't allow this check
#for name in cls._fields:
# if cls._fields[name].notNull and cls._fields[name].default == None:
# raise TypeError("Missing argument : "+name)
if 'dbconf' in kwargs:
if not kwargs['db_engine']:
raise NotImplementedError("Its a nonsense to call create with no database")
dbconf = kwargs['dbconf']
else:
dbconf = 'default'
dbe = sqlutils.get_engine(dbconf)
kwargs['uid'] = cls.new_uid(dbe)
kwargs['date_update'] = kwargs['date_create'] = datetime.datetime.utcnow()
conn = dbe.connect()
kwargs['rank'] = cls._get_max_rank( kwargs[cls.ranked_in], dbe )+1
table = sql.Table(cls.table, sqlutils.meta(dbe))
req = table.insert(kwargs)
if not conn.execute(req):
raise RuntimeError("Unable to create the "+cls.__class__.__name__+" EmComponent ")
conn.close()
return cls(kwargs['name'], dbconf)
## Write the representation of the component in the database
# @return bool
# @todo stop using datetime.datetime.utcnow() for date_update update
def save(self):
values = {}
for name, field in self._fields.items():
values[name] = field.to_sql()
# Don't allow creation date overwritting
#if 'date_create' in values:
#del values['date_create']
#logger.warning("date_create supplied for save, but overwritting of date_create not allowed, the date will not be changed")
values['date_update'] = datetime.datetime.utcnow()
self._save_db(values)
## Do the query in the datbase for EmComponent::save()
# @param values dict: A dictionnary of the values to insert
# @throw RunTimeError if it was unable to do the Db update
def _save_db(self, values):
""" Do the query on the db """
dbe = self.db_engine
component = sql.Table(self.table, sqlutils.meta(dbe))
req = sql.update(component, values=values).where(component.c.uid == self.uid)
conn = dbe.connect()
res = conn.execute(req)
conn.close()
if not res:
raise RuntimeError("Unable to save the component in the database")
## Delete this component data in the database
# @return bool : True if deleted False if deletion aborded
# @throw RunTimeError if it was unable to do the deletion
def delete(self):
#<SQL>
dbe = self.db_engine
component = sql.Table(self.table, sqlutils.meta(dbe))
req = component.delete().where(component.c.uid == self.uid)
conn = dbe.connect()
res = conn.execute(req)
conn.close()
if not res:
raise RuntimeError("Unable to delete the component in the database")
#</SQL>
super(EmComponent, self).__setattr__('deleted', True)
return True
## @brief Delete predicate. Indicates if a component can be deleted
# @return True if deletion OK else return False
def delete_check(self):
raise NotImplementedError("Virtual method")
## @brief Get the maximum rank given an EmComponent child class and a ranked_in filter
# @param ranked_in_value mixed: The rank "family"
# @return -1 if no EmComponent found else return an integer >= 0
@classmethod
def _get_max_rank(cls, ranked_in_value, dbe):
component = sql.Table(cls.table, sqlutils.meta(dbe))
req = sql.sql.select([component.c.rank]).where(getattr(component.c, cls.ranked_in) == ranked_in_value).order_by(component.c.rank.desc())
c = dbe.connect()
res = c.execute(req)
res = res.fetchone()
c.close()
if res != None:
return res['rank']
else:
return -1
## Only make a call to the class method
# @return A positive integer or -1 if no components
# @see EmComponent::_get_max_rank()
def get_max_rank(self, ranked_in_value):
return self.__class__._get_max_rank(ranked_in_value, self.db_engine)
def get_max_rank(self):
components = self.same_rank_group()
return 1 if len(components) == 0 else components[-1].rank
## Return an array of instances that are concerned by the same rank
# @return An array of instances that are concerned by the same rank
def same_rank_group(self):
components = self.model.components(self.__class__)
ranked_in = self.__class__.ranked_in
return [ c for c in components if getattr(c, ranked_in) == getattr(self, ranked_in) ]
## Set a new rank for this component
# @note This function assume that ranks are properly set from 1 to x with no gap
#
# @warning Hardcoded minimum rank
# @warning Rank modified by _fields['rank'].value
#
# @param new_rank int: The new rank
# @return True if success False if not
#
# @throw TypeError If bad argument type
# @throw ValueError if out of bound value
def set_rank(self, new_rank):
if not isinstance(new_rank, int):
raise TypeError("Excepted <class int> but got "+str(type(new_rank)))
if new_rank < 0 or new_rank > self.get_max_rank(getattr(self, self.ranked_in)):
if new_rank <= 0 or new_rank > self.get_max_rank():
raise ValueError("Invalid new rank : "+str(new_rank))
mod = new_rank - self.rank #Allow to know the "direction" of the "move"
mod = new_rank - self.rank #Indicates the "direction" of the "move"
if mod == 0: #No modifications
if mod == 0:
return True
limits = [ self.rank + ( 1 if mod > 0 else -1), new_rank ] #The range of modified ranks
limits.sort()
dbe = self.db_engine
conn = dbe.connect()
table = sqlutils.get_table(self)
for component in [ c for c in self.same_rank_group() if c.rank >= limits[0] and c.rank <= limits[1] ] :
component.rank = component.rank + ( -1 if mod > 0 else 1 )
#Selecting the components that will be modified
req = table.select().where( getattr(table.c, self.ranked_in) == getattr(self, self.ranked_in)).where(table.c.rank >= limits[0]).where(table.c.rank <= limits[1])
self.rank = new_rank
res = conn.execute(req)
if not res: #Db error... Maybe false is a bit silent for a failuer
return False
self.model.sort_components(self.__class__)
rows = res.fetchall()
pass
updated_ranks = [{'b_uid': self.uid, 'b_rank': new_rank}]
for row in rows:
updated_ranks.append({'b_uid': row['uid'], 'b_rank': row['rank'] + (-1 if mod > 0 else 1)})
req = table.update().where(table.c.uid == sql.bindparam('b_uid')).values(rank=sql.bindparam('b_rank'))
res = conn.execute(req, updated_ranks)
conn.close()
if res:
#Instance rank update
self._fields['rank'].value = new_rank
return bool(res)
## @brief Modify a rank given a sign and a new_rank
# - If sign is '=' set the rank to new_rank
# - If sign is '-' set the rank to cur_rank - new_rank
# - If sign is '+' set the rank to cur_rank + new_rank
# @param new_rank int: The new_rank or rank modifier
# @param sign str: Can be one of '=', '+', '-'
# @return True if success False if fails
# @throw TypeError If bad argument type
# @throw ValueError if out of bound value
def modify_rank(self,new_rank, sign='='):
if not isinstance(new_rank, int) or not isinstance(sign, str):
raise TypeError("Excepted <class int>, <class str>. But got "+str(type(new_rank))+", "+str(type(sign)))
if sign == '+':
return self.set_rank(self.rank + new_rank)
elif sign == '-':
return self.set_rank(self.rank - new_rank)
elif sign == '=':
return self.set_rank(new_rank)
else:
raise ValueError("Excepted one of '=', '+', '-' for sign argument, but got "+sign)
## Modify a rank given an integer modifier
# @param rank_mod int : can be a negative positive or zero integer
# @throw TypeError if rank_mod is not an integer
# @throw ValueError if rank_mod is out of bound
def modify_rank(self, rank_mod):
if not isinstance(rank_mod, int):
raise TypeError("Excepted <class int>, <class str>. But got "+str(type(rank_mod))+", "+str(type(sign)))
try:
self.set_rank(self.rank + rank_mod)
except ValueError:
raise ValueError("The rank modifier '"+str(rank_mod)+"' is out of bounds")
## @brief Return a string representation of the component
# @return A string representation of the component
@ -393,19 +201,3 @@ class EmComponent(object):
return uid
## @brief An exception class to tell that a component don't exist
class EmComponentNotExistError(Exception):
pass
## @brief Raised on uniq constraint error at creation
# This exception class is dedicated to be raised when create() method is called
# if an EmComponent with this name but different parameters allready exist
class EmComponentExistError(Exception):
pass
## @brief An exception class to tell that no ranking exist yet for the group of the object
class EmComponentRankingNotExistError(Exception):
pass

View file

@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-
## @brief An exception class to tell that a component don't exist
class EmComponentNotExistError(Exception):
pass
## @brief Raised on uniq constraint error at creation
# This exception class is dedicated to be raised when create() method is called
# if an EmComponent with this name but different parameters allready exist
class EmComponentExistError(Exception):
pass
## @brief An exception class to tell that no ranking exist yet for the group of the object
class EmComponentRankingNotExistError(Exception):
pass
## @brief An exception class to return a failure reason for EmComponent.check() method
class EmComponentCheckError(Exception):
pass
class MigrationHandlerChangeError(Exception):
pass

View file

@ -1,13 +1,9 @@
#-*- coding: utf-8 -*-
from EditorialModel.components import EmComponent
from EditorialModel.fields import EmField
from EditorialModel.classes import EmClass
import EditorialModel.fieldtypes as ftypes
from Database import sqlutils
import sqlalchemy as sql
import EditorialModel
from EditorialModel.exceptions import EmComponentCheckError
## Represents groups of EmField associated with an EmClass
@ -16,38 +12,55 @@ import EditorialModel
# @see EditorialModel::fields::EmField EditorialModel::classes::EmClass
class EmFieldGroup(EmComponent):
## The database table name
table = 'em_fieldgroup'
ranked_in = 'class_id'
## List of fields
_fields = [('class_id', ftypes.EmField_integer)]
## EmFieldGroup instanciation
def __init__(self, model, uid, name, class_id, string=None, help_text=None, date_update=None, date_create=None, rank=None):
self.class_id = class_id
self.check_type('class_id', int)
super(EmFieldGroup, self).__init__(model=model, uid=uid, name=name, string=string, help_text=help_text, date_update=date_update, date_create=date_create, rank=rank)
@classmethod
## Create a new EmFieldGroup
#
# Save it in database and return an instance*
# @param name str: The name of the new EmFieldGroup
# @param em_class EmClass : An EditorialModel::classes::EmClass instance
# @param **em_component_args : @ref EditorialModel::components::create()
# @throw EmComponentExistError If an EmFieldGroup with this name allready exists
# @throw TypeError If an argument is of an unexepted type
def create(cls, name, em_class, **em_component_args):
if not isinstance(name, str):
raise TypeError("Excepting <class str> as name. But got " + str(type(name)))
## Check if the EmFieldGroup is valid
# @throw EmComponentCheckError if fails
def check(self):
super(EmFieldGroup, self).check()
em_class = self.model.component(self.class_id)
if not em_class:
raise EmComponentCheckError("class_id contains a non existing uid '%s'" % str(self.class_id))
if not isinstance(em_class, EmClass):
raise TypeError("Excepting <class EmClass> as em_class. But got "+str(type(name)))
raise EmComponentCheckError("class_id cointains an uid from a component that is not an EmClass but an %s" % type(em_class))
return super(EmFieldGroup, cls).create(name=name, class_id=em_class.uid, **em_component_args)
## Deletes a fieldgroup
# @return True if the deletion is possible, False if not
def delete_check(self):
# all the EmField objects contained in this fieldgroup should be deleted first
fieldgroup_fields = self.fields()
if len(fieldgroup_fields) > 0:
raise NotEmptyError("This Fieldgroup still contains fields. It can't be deleted then")
return True
## Get the list of associated fields
# if type_id, the fields will be filtered to represent selected fields of this EmType
# @return A list of EmField instance
def fields(self):
meta = sqlutils.meta(self.db_engine)
field_table = sql.Table(EditorialModel.fields.EmField.table, meta)
req = field_table.select(field_table.c.uid).where(field_table.c.fieldgroup_id == self.uid)
conn = self.db_engine.connect()
res = conn.execute(req)
rows = res.fetchall()
conn.close()
return [EditorialModel.fields.EmField(row['uid']) for row in rows]
def fields(self, type_id=0):
if not type_id:
fields = [field for field in self.model.components(EmField) if field.fieldgroup_id == self.uid]
else:
# for an EmType, fields have to be filtered
em_type = self.model.component(type_id)
fields = []
for field in self.model.components(EmField):
if field.fieldgroup_id != self.uid or (field.optional and field.uid not in em_type.fields_list):
continue
# don't include relational field if parent should not be included
if field.rel_field_id:
parent = self.model.component(field.rel_field_id)
if parent.optional and parent.uid not in em_type.fields_list:
continue
fields.append(field)
return fields
class NotEmptyError(Exception):
pass

View file

@ -1,14 +1,8 @@
#-*- coding: utf-8 -*-
from EditorialModel.components import EmComponent
from EditorialModel.fieldtypes import EmField_boolean, EmField_char, EmField_integer, EmField_icon, get_field_type
from EditorialModel.fieldgroups import EmFieldGroup
from EditorialModel.classes import EmClass
from Database import sqlutils
from Database.sqlalter import DropColumn, AddColumn
import sqlalchemy as sql
from EditorialModel.exceptions import EmComponentCheckError
import EditorialModel
## EmField (Class)
@ -16,99 +10,38 @@ import sqlalchemy as sql
# Represents one data for a lodel2 document
class EmField(EmComponent):
table = 'em_field'
ranked_in = 'fieldgroup_id'
_fields = [
('fieldtype', EmField_char),
('fieldgroup_id', EmField_integer),
('rel_to_type_id', EmField_integer),
('rel_field_id', EmField_integer),
('optional', EmField_boolean),
('internal', EmField_boolean),
('icon', EmField_icon)
]
## Create (Function)
#
# Creates a new EmField and instanciates it
#
# @static
#
# @param name str: Name of the field
# @param fieldgroup EmFieldGroup: Field group in which the field is
# @param fieldtype EmFieldType: Type of the field
# @param optional int: is the field optional ? (default=0)
# @param internal int: is the field internal ? (default=0)
# @param rel_to_type_id int: default=0
# @param rel_field_id int: default=0
# @param icon int: default=0
# @param **em_component_args : @ref EditorialModel::components::create()
#
# @throw TypeError
# @throw RuntimeError if the associated column creation fails
# @throw EmComponentExistError if an EmField with this name allready exists in this fieldgroup
# @see EmComponent::__init__()
# @staticmethod
@classmethod
def create(cls, name, fieldgroup, fieldtype, optional=0, internal=0, rel_to_type_id=0, rel_field_id=0, icon=None, **em_component_args):
created_field = super(EmField, cls).create(
name=name,
fieldgroup_id=fieldgroup.uid,
fieldtype=fieldtype.name,
optional=optional,
internal=internal,
rel_to_type_id=rel_to_type_id,
rel_field_id=rel_field_id,
icon=icon,
**em_component_args
)
if not created_field.add_field_column_to_class_table():
raise RuntimeError("Unable to create the column for the EmField " + str(created_field))
## Instanciate a new EmField
# @todo define and test type for icon and fieldtype
def __init__(self, model, uid, name, fieldgroup_id, fieldtype, optional=False, internal=False, rel_to_type_id=None, rel_field_id=None, icon='0', string=None, help_text=None, date_update=None, date_create=None, rank=None):
return created_field
self.fieldgroup_id = fieldgroup_id
self.check_type('fieldgroup_id', int)
self.fieldtype = fieldtype
self.optional = optional
self.check_type('optional', bool)
self.internal = internal
self.check_type('internal', bool)
self.rel_to_type_id = rel_to_type_id
self.check_type('rel_to_type_id', (int, type(None)))
self.rel_field_id = rel_field_id
self.check_type('rel_field_id', (int, type(None)))
self.icon = icon
super(EmField, self).__init__(model=model, uid=uid, name=name, string=string, help_text=help_text, date_update=date_update, date_create=date_create, rank=rank)
## Check if the EmField is valid
# @return True if valid False if not
def check(self):
super(EmField, self).check()
em_fieldgroup = self.model.component(self.fieldgroup_id)
if not em_fieldgroup:
raise EmComponentCheckError("fieldgroup_id contains a non existing uid : '%d'" % self.fieldgroup_id)
if not isinstance(em_fieldgroup, EditorialModel.fieldgroups.EmFieldGroup):
raise EmComponentCheckError("fieldgroup_id contains an uid from a component that is not an EmFieldGroup but a %s" % str(type(em_fieldgroup)))
## @brief Delete a field if it's not linked
# @return bool : True if deleted False if deletion aborded
# @todo Check if unconditionnal deletion is correct
def delete(self):
dbe = self.db_engine
class_table = sql.Table(self.get_class_table(), sqlutils.meta(dbe))
field_col = sql.Column(self.name)
ddl = DropColumn(class_table, field_col)
sqlutils.ddl_execute(ddl, self.db_engine)
return super(EmField, self).delete()
## add_field_column_to_class_table (Function)
#
# Adds a column representing the field in its class' table
#
# @return True in case of success, False if not
def add_field_column_to_class_table(self):
dbe = self.db_engine
fieldtype = get_field_type(self.fieldtype)
new_column = sql.Column(name=self.name, **(fieldtype.sqlalchemy_args()))
class_table = sql.Table(self.get_class_table(), sqlutils.meta(dbe))
ddl = AddColumn(class_table, new_column)
return sqlutils.ddl_execute(ddl, dbe)
## get_class_table (Function)
#
# Gets the name of the table of the class corresponding to the field
#
# @return Name of the table
def get_class_table(self):
return self.get_class().class_table_name
## @brief Get the class that contains this field
# @return An EmClass instance
def get_class(self):
#<SQL>
dbe = self.db_engine
meta = sqlutils.meta(dbe)
conn = dbe.connect()
fieldgroup_table = sql.Table(EmFieldGroup.table, meta)
req = fieldgroup_table.select().where(fieldgroup_table.c.uid == self.fieldgroup_id)
res = conn.execute(req)
row = res.fetchone()
#</SQL>
return EmClass(row['class_id'])
def delete_check(self):
return True

View file

@ -1,7 +1,6 @@
#-*- coding: utf-8 -*-
from Lodel.utils.mlstring import MlString
from sqlalchemy import Column, INTEGER, BOOLEAN, VARCHAR
import datetime
## get_field_type (Function)
@ -86,9 +85,6 @@ class EmField_integer(EmFieldType):
def sql_column(self):
return "int(11) NOT NULL"
def sqlalchemy_args(self):
return {'type_': INTEGER, 'nullable': False, 'default': 0}
## EmField_boolean (Class)
#
@ -99,7 +95,7 @@ class EmField_boolean(EmFieldType):
super(EmField_boolean, self).__init__('boolean')
def from_string(self, value):
if value:
if value and value != "0":
self.value = True
else:
self.value = False
@ -112,9 +108,6 @@ class EmField_boolean(EmFieldType):
def sql_column(self):
return "tinyint(1) DEFAULT NULL"
def sqlalchemy_args(self):
return {'type_': BOOLEAN, 'nullable': True, 'default': None}
## EmField_char (Class)
#
@ -138,9 +131,6 @@ class EmField_char(EmFieldType):
def sql_column(self):
return "varchar(250) DEFAULT NULL"
def sqlalchemy_args(self):
return {'type_': VARCHAR(250), 'nullable': True, 'default': None}
## EmField_date (Class)
#
@ -166,9 +156,6 @@ class EmField_date(EmFieldType):
def sql_column(self):
return "varchar(250) DEFAULT NULL"
def sqlalchemy_args(self):
return {'type_': VARCHAR(250), 'nullable': True, 'default': None}
## EmField_mlstring (Class)
#
@ -189,8 +176,6 @@ class EmField_mlstring(EmFieldType):
def sql_column(self):
return "varchar(250) DEFAULT NULL"
def sqlalchemy_args(self):
return {'type_': VARCHAR(250), 'nullable': True, 'default': None}
class EmField_icon(EmFieldType):
@ -209,6 +194,4 @@ class EmField_icon(EmFieldType):
def sql_column(self):
pass
def sqlalchemy_args(self):
return {'type_': INTEGER, 'nullable': True, 'default': None}

View file

@ -0,0 +1,6 @@
## @package EditorialModel.migrationhandler
# @brief Tell to an editorial model wether or not a wanted modification is possible
#
# The migration handler provide an API for the EditorialModel to ask wether or not a model modification
# is possible/allowed.
#

View file

@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
## @package EditorialModel.migrationhandler.dummy
# @brief A dummy migration handler
#
# According to it every modifications are possible
#
## Manage Model changes
class DummyMigrationHandler(object):
def __init__(self, debug=False):
self.debug = debug
## @brief Record a change in the EditorialModel and indicate wether or not it is possible to make it
# @note The states ( initial_state and new_state ) contains only fields that changes
# @param uid int : The uid of the change EmComponent
# @param initial_state dict | None : dict with field name as key and field value as value. Representing the original state. None mean creation of a new component.
# @param new_state dict | None : dict with field name as key and field value as value. Representing the new state. None mean component deletion
# @throw EditorialModel.exceptions.MigrationHandlerChangeError if the change was refused
def register_change(self, uid, initial_state, new_state):
if self.debug:
print("\n##############")
print("DummyMigrationHandler debug. Changes for component with uid %d :" % uid)
if initial_state is None:
print("Component creation (uid = %d): \n\t" % uid, new_state)
elif new_state is None:
print("Component deletion (uid = %d): \n\t" % uid, initial_state)
else:
field_list = set(initial_state.keys()).union(set(new_state.keys()))
for field_name in field_list:
str_chg = "\t%s " % field_name
if field_name in initial_state:
str_chg += "'" + str(initial_state[field_name]) + "'"
else:
str_chg += " creating "
str_chg += " => "
if field_name in new_state:
str_chg += "'" + str(new_state[field_name]) + "'"
else:
str_chg += " deletion "
print(str_chg)
print("##############\n")
def register_model_state(self, state_hash):
if self.debug:
print("New EditorialModel state registered : '%s'" % state_hash)

194
EditorialModel/model.py Normal file
View file

@ -0,0 +1,194 @@
# -*- coding: utf-8 -*-
## @file editorialmodel.py
# Manage instance of an editorial model
from EditorialModel.migrationhandler.dummy import DummyMigrationHandler
from EditorialModel.classes import EmClass
from EditorialModel.fieldgroups import EmFieldGroup
from EditorialModel.fields import EmField
from EditorialModel.types import EmType
from EditorialModel.exceptions import EmComponentCheckError, EmComponentNotExistError, MigrationHandlerChangeError
import hashlib
## Manages the Editorial Model
class Model(object):
components_class = [EmClass, EmField, EmFieldGroup, EmType]
## Constructor
#
# @param backend unknown: A backend object instanciated from one of the classes in the backend module
def __init__(self, backend, migration_handler=None):
self.migration_handler = DummyMigrationHandler() if migration_handler is None else migration_handler
self.backend = backend
self._components = {'uids': {}, 'EmClass': [], 'EmType': [], 'EmField': [], 'EmFieldGroup': []}
self.load()
def __hash__(self):
components_dump = ""
for _, comp in self._components['uids'].items():
components_dump += str(hash(comp))
hashstring = hashlib.new('sha512')
hashstring.update(components_dump.encode('utf-8'))
return int(hashstring.hexdigest(), 16)
def __eq__(self, other):
return self.__hash__() == other.__hash__()
@staticmethod
## Given a name return an EmComponent child class
# @param class_name str : The name to identify an EmComponent class
# @return A python class or False if the class_name is not a name of an EmComponent child class
def emclass_from_name(class_name):
for cls in Model.components_class:
if cls.__name__ == class_name:
return cls
return False
@staticmethod
## Given a python class return a name
# @param cls : The python class we want the name
# @return A class name as string or False if cls is not an EmComponent child class
def name_from_emclass(em_class):
if em_class not in Model.components_class:
return False
return em_class.__name__
## Loads the structure of the Editorial Model
#
# Gets all the objects contained in that structure and creates a dict indexed by their uids
# @todo Change the thrown exception when a components check fails
# @throw ValueError When a component class don't exists
def load(self):
datas = self.backend.load()
for uid, kwargs in datas.items():
#Store and delete the EmComponent class name from datas
cls_name = kwargs['component']
del kwargs['component']
cls = self.emclass_from_name(cls_name)
if cls:
kwargs['uid'] = uid
# create a dict for the component and one indexed by uids, store instanciated component in it
self._components['uids'][uid] = cls(self, **kwargs)
self._components[cls_name].append(self._components['uids'][uid])
else:
raise ValueError("Unknow EmComponent class : '" + cls_name + "'")
#Sorting by rank
for component_class in Model.components_class:
self.sort_components(component_class)
#Check integrity
for uid, component in self._components['uids'].items():
try:
component.check()
except EmComponentCheckError as exception_object:
raise EmComponentCheckError("The component with uid %d is not valid. Check returns the following error : \"%s\"" % (uid, str(exception_object)))
#Everything is done. Indicating that the component initialisation is over
component.init_ended()
## Saves data using the current backend
def save(self):
return self.backend.save()
## Given a EmComponent child class return a list of instances
# @param cls EmComponent : A python class
# @return a list of instances or False if the class is not an EmComponent child
def components(self, cls):
key_name = self.name_from_emclass(cls)
return False if key_name is False else self._components[key_name]
## Return an EmComponent given an uid
# @param uid int : An EmComponent uid
# @return The corresponding instance or False if uid don't exists
def component(self, uid):
return False if uid not in self._components['uids'] else self._components['uids'][uid]
## Sort components by rank in Model::_components
# @param emclass pythonClass : The type of components to sort
# @throw AttributeError if emclass is not valid
def sort_components(self, component_class):
if component_class not in self.components_class:
raise AttributeError("Bad argument emclass : '" + component_class + "', excpeting one of " + str(self.components_class))
self._components[self.name_from_emclass(component_class)] = sorted(self.components(component_class), key=lambda comp: comp.rank)
## Return a new uid
# @return a new uid
def new_uid(self):
used_uid = [int(uid) for uid in self._components['uids'].keys()]
return sorted(used_uid)[-1] + 1 if len(used_uid) > 0 else 1
## Create a component from a component type and datas
#
# @note if datas does not contains a rank the new component will be added last
# @note datas['rank'] can be an integer or two specials strings 'last' or 'first'
# @param component_type str : a component type ( component_class, component_fieldgroup, component_field or component_type )
# @param datas dict : the options needed by the component creation
# @throw ValueError if datas['rank'] is not valid (too big or too small, not an integer nor 'last' or 'first' )
# @todo Handle a raise from the migration handler
def create_component(self, component_type, datas):
em_obj = self.emclass_from_name(component_type)
rank = 'last'
if 'rank' in datas:
rank = datas['rank']
del datas['rank']
datas['uid'] = self.new_uid()
em_component = em_obj(self, **datas)
self._components['uids'][em_component.uid] = em_component
self._components[self.name_from_emclass(em_component.__class__)].append(em_component)
em_component.rank = em_component.get_max_rank()
if rank != 'last':
em_component.set_rank(1 if rank == 'first' else rank)
#everything done, indicating that initialisation is over
em_component.init_ended()
#register the creation in migration handler
try:
self.migration_handler.register_change(em_component.uid, None, em_component.attr_dump)
except MigrationHandlerChangeError as exception_object:
#Revert the creation
self.components(em_component.__class__).remove(em_component)
del self._components['uids'][em_component.uid]
raise exception_object
self.migration_handler.register_model_state(hash(self))
return em_component
## Delete a component
# @param uid int : Component identifier
# @throw EmComponentNotExistError
# @todo unable uid check
# @todo Handle a raise from the migration handler
def delete_component(self, uid):
#register the deletion in migration handler
self.migration_handler.register_change(uid, self.component(uid).attr_dump, None)
em_component = self.component(uid)
if not em_component:
raise EmComponentNotExistError()
if em_component.delete_check():
self._components[self.name_from_emclass(em_component.__class__)].remove(em_component)
del self._components['uids'][uid]
#Register the new EM state
self.migration_handler.register_model_state(hash(self))
return True
## Changes the current backend
#
# @param backend unknown: A backend object
def set_backend(self, backend):
self.backend = backend
## Returns a list of all the EmClass objects of the model
def classes(self):
return list(self._components[self.name_from_emclass(EmClass)])

View file

@ -0,0 +1,53 @@
{
"1" : {
"component":"EmClass", "name":"textes", "string":"{\"fre\":\"Texte\"}", "help_text":"{}", "rank":"1", "date_update":"", "date_create":"", "classtype":"entity", "icon":"0", "sortcolumn":"rank"
},
"2" : {
"component":"EmClass", "name":"personnes", "string":"{\"fre\":\"Personnes\"}", "help_text":"{}", "rank":"1", "date_update":"", "date_create":"", "classtype":"person", "icon":"0", "sortcolumn":"rank"
},
"3" : {
"component":"EmFieldGroup", "name":"info", "string":"{\"fre\":\"Info\"}", "help_text":"{}", "rank":"1", "date_update":"", "date_create":"", "class_id":"1"
},
"4" : {
"component":"EmField", "name":"titre", "string":"{\"fre\":\"Titre\"}", "help_text":"{}", "rank":"1", "date_update":"", "date_create":"", "fieldtype":"", "fieldgroup_id":"3", "rel_to_type_id":"", "rel_field_id":"", "optional":0, "internal":"", "icon":"0"
},
"5" : {
"component":"EmType", "name":"article", "string":"{\"fre\":\"Article\"}", "help_text":"{}", "rank":"1", "date_update":"", "date_create":"", "class_id":"1", "icon":"0", "sortcolumn":"rank", "fields_list":[7]
},
"6" : {
"component":"EmType", "name":"personne", "string":"{\"fre\":\"Personne\"}", "help_text":"{}", "rank":"1", "date_update":"", "date_create":"", "class_id":"2", "icon":"0", "sortcolumn":"rank", "fields_list":[10]
},
"7" : {
"component":"EmField", "name":"soustitre", "string":"{\"fre\":\"Sous-titre\"}", "help_text":"{}", "rank":"2", "date_update":"", "date_create":"", "fieldtype":"", "fieldgroup_id":"3", "rel_to_type_id":"", "rel_field_id":"", "optional":1, "internal":"", "icon":"0"
},
"8" : {
"component":"EmFieldGroup", "name":"civilité", "string":"{\"fre\":\"Civilité\"}", "help_text":"{}", "rank":"1", "date_update":"", "date_create":"", "class_id":"2"
},
"9" : {
"component":"EmField", "name":"nom", "string":"{\"fre\":\"Nom\"}", "help_text":"{}", "rank":"1", "date_update":"", "date_create":"", "fieldtype":"", "fieldgroup_id":"8", "rel_to_type_id":"", "rel_field_id":"", "optional":0, "internal":"", "icon":"0"
},
"10" : {
"component":"EmField", "name":"prenom", "string":"{\"fre\":\"Preom\"}", "help_text":"{}", "rank":"2", "date_update":"", "date_create":"", "fieldtype":"", "fieldgroup_id":"8", "rel_to_type_id":"", "rel_field_id":"", "optional":1, "internal":"", "icon":"0"
},
"11" : {
"component":"EmField", "name":"auteur", "string":"{\"fre\":\"Auteur\"}", "help_text":"{}", "rank":"3", "date_update":"", "date_create":"", "fieldtype":"", "fieldgroup_id":"17", "rel_to_type_id":"6", "rel_field_id":"", "optional":1, "internal":"", "icon":"0"
},
"12" : {
"component":"EmField", "name":"adresse", "string":"{\"fre\":\"Adresse\"}", "help_text":"{}", "rank":"4", "date_update":"", "date_create":"", "fieldtype":"", "fieldgroup_id":"17", "rel_to_type_id":"", "rel_field_id":"11", "optional":0, "internal":"", "icon":"0"
},
"13" : {
"component":"EmClass", "name":"publication", "string":"{\"fre\":\"Publication\"}", "help_text":"{}", "rank":"2", "date_update":"", "date_create":"", "classtype":"entity", "icon":"0", "sortcolumn":"rank"
},
"14" : {
"component":"EmType", "name":"rubrique", "string":"{\"fre\":\"Rubrique\"}", "help_text":"{}", "rank":"1", "date_update":"", "date_create":"", "class_id":"13", "icon":"0", "sortcolumn":"rank", "subordinates_list":{"parent":[5,14]}
},
"15" : {
"component":"EmFieldGroup", "name":"info", "string":"{\"fre\":\"Info\"}", "help_text":"{}", "rank":"1", "date_update":"", "date_create":"", "class_id":"13"
},
"16" : {
"component":"EmField", "name":"titre", "string":"{\"fre\":\"Titre\"}", "help_text":"{}", "rank":"1", "date_update":"", "date_create":"", "fieldtype":"", "fieldgroup_id":"15", "rel_to_type_id":"", "rel_field_id":"", "optional":0, "internal":"", "icon":"0"
},
"17" : {
"component":"EmFieldGroup", "name":"gens", "string":"{\"fre\":\"Gens\"}", "help_text":"{}", "rank":"2", "date_update":"", "date_create":"", "class_id":"1"
}
}

View file

@ -3,6 +3,7 @@
"""
import os
import logging
from unittest import TestCase
import unittest
@ -15,24 +16,32 @@ from EditorialModel.fieldgroups import EmFieldGroup
from EditorialModel.types import EmType
from EditorialModel.fields import EmField
import EditorialModel.fieldtypes as fieldTypes
from EditorialModel.model import Model
from EditorialModel.backend.json_backend import EmBackendJson
from Database import sqlutils, sqlsetup
import sqlalchemy as sqla
#from Database import sqlutils, sqlsetup
#import sqlalchemy as sqla
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "Lodel.settings")
EM_TEST = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'me.json')
EM_TEST_OBJECT = None
## run once for this module
# define the Database for this module (an sqlite database)
def setUpModule():
settings.LODEL2SQLWRAPPER['db']['default'] = {'ENGINE':'sqlite', 'NAME':'/tmp/testdb.sqlite'}
global EM_TEST_OBJECT
EM_TEST_OBJECT = Model(EmBackendJson(EM_TEST))
logging.basicConfig(level=logging.CRITICAL)
#settings.LODEL2SQLWRAPPER['db']['default'] = {'ENGINE':'sqlite', 'NAME':'/tmp/testdb.sqlite'}
class ClassesTestCase(TestCase):
# run before every instanciation of the class
@classmethod
def setUpClass(cls):
sqlsetup.init_db()
pass
#sqlsetup.init_db()
# run before every function of the class
def setUp(self):
@ -45,44 +54,25 @@ class ClassesTestCase(TestCase):
class TestEmClassCreation(ClassesTestCase):
# create a new EmClass, then test on it
@classmethod
def setUpClass(cls):
def test_create(self):
ClassesTestCase.setUpClass()
EmClass.create('testClass', EmClassType.entity)
testClass = EM_TEST_OBJECT.create_component(EmClass.__name__, {'name': 'testclass1', 'classtype': EmClassType.entity['name']})
# test if a table 'testClass' was created
# should be able to select on the created table
def test_table_em_classes(self):
""" Testing ability of EmClass to crate its associated table """
conn = sqlutils.get_engine().connect()
a = sqlutils.meta(conn)
try:
newtable = sqla.Table('testClass', sqlutils.meta(conn))
req = sqla.sql.select([newtable])
res = conn.execute(req)
res = res.fetchall()
conn.close()
except:
self.fail("unable to select table testClass")
self.assertEqual(res, [])
#We check the uid
self.assertEqual(testClass.uid, 18)
# the uid should be 1
def test_uid(self):
""" testing uid """
cl = EmClass('testClass')
self.assertEqual(cl.uid, 1)
# We check that the class has been added in the right list in the model object
class_components_records = EM_TEST_OBJECT.components(EmClass)
self.assertIn(testClass, class_components_records)
# the name should be the one given
def test_classname(self):
""" Testing name consistency on instanciation """
cl = EmClass('testClass')
self.assertEqual(cl.name, 'testClass')
# the name should be the one given
testClass = EM_TEST_OBJECT.component(testClass.uid)
self.assertEqual(testClass.name, 'testclass1')
# the classtype should have the name of the EmClassType
testClass = EM_TEST_OBJECT.component(testClass.uid)
self.assertEqual(testClass.classtype, EmClassType.entity['name'])
# the classtype should have the name of the EmClassType
def test_classtype(self):
""" Testing classtype consistency """
cl = EmClass('testClass')
self.assertEqual(cl.classtype, EmClassType.entity['name'])
# Testing class deletion (and associated table drop)
class TestEmClassDeletion(ClassesTestCase):

View file

@ -1,657 +1,94 @@
import os
import datetime
import time
import logging
import json
import shutil
#from django.test import TestCase
from django.conf import settings
from unittest import TestCase
import unittest
from EditorialModel.model import Model
from EditorialModel.components import EmComponent
from EditorialModel.classes import EmClass
from EditorialModel.classtypes import EmClassType
from EditorialModel.types import EmType
from EditorialModel.fieldgroups import EmFieldGroup
from EditorialModel.fields import EmField
from EditorialModel.components import EmComponent, EmComponentNotExistError, EmComponentExistError
import EditorialModel.fieldtypes as ftypes
from EditorialModel.test.utils import *
from Lodel.utils.mlstring import MlString
from Database import sqlutils
from Database import sqlsetup
import sqlalchemy as sqla
from EditorialModel.backend.json_backend import EmBackendJson
from EditorialModel.migrationhandler.dummy import DummyMigrationHandler
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "Lodel.settings")
TEST_COMPONENT_DBNAME = 'test_em_component_db.sqlite'
#=#############=#
# TESTS SETUP #
#=#############=#
def setUpModule():
""" This function is run once for this module.
The goal are to overwrtie Db configs, and prepare objects for test_case initialisation
"""
cleanDb(TEST_COMPONENT_DBNAME)
setDbConf(TEST_COMPONENT_DBNAME)
#Disable logging but CRITICAL
logging.basicConfig(level=logging.CRITICAL)
#testDB setup
tables = sqlsetup.get_schema()
ttest = { 'name':'ttest',
'columns': [
{"name":"uid", "type":"INTEGER", "extra":{"foreignkey":"uids.uid", "nullable":False, "primarykey":True}},
{"name":"name", "type":"VARCHAR(50)", "extra":{"nullable":False, "unique":True}},
{"name":"string", "type":"TEXT"},
{"name":"help", "type":"TEXT"},
{"name":"rank", "type":"INTEGER"},
{"name":"rank_fam", "type":"VARCHAR(1)"},
{"name":"date_update", "type":"DATETIME"},
{"name":"date_create", "type":"DATETIME"}
]
}
tables.append(ttest)
globals()['tables'] = tables
#Creating db structure
initTestDb(TEST_COMPONENT_DBNAME)
setDbConf(TEST_COMPONENT_DBNAME)
sqlsetup.init_db('default', False, tables)
dbe = sqlutils.get_engine('default')
# Insertion of testings datas
conn = dbe.connect()
test_table = sqla.Table(EmTestComp.table, sqlutils.meta(dbe))
uids_table = sqla.Table('uids', sqlutils.meta(dbe))
#Creating uid for the EmTestComp
for v in ComponentTestCase.test_values:
uid = v['uid']
req = uids_table.insert(values={'uid':uid, 'table': EmTestComp.table })
conn.execute(req)
# WARNING !!! Rank has to be ordened and incremented by one for the modify_rank tests
for i in range(len(ComponentTestCase.test_values)):
ComponentTestCase.test_values[i]['date_create'] = datetime.datetime.utcnow()
ComponentTestCase.test_values[i]['date_update'] = datetime.datetime.utcnow()
ComponentTestCase.test_values[i]['rank_fam'] = '1'
req = test_table.insert(values=ComponentTestCase.test_values)
conn.execute(req)
conn.close()
saveDbState(TEST_COMPONENT_DBNAME)
logging.getLogger().setLevel(logging.CRITICAL)
pass
def tearDownModule():
cleanDb(TEST_COMPONENT_DBNAME)
"""
try:
os.unlink(TEST_COMPONENT_DBNAME)
except:pass
try:
os.unlink(TEST_COMPONENT_DBNAME+'_bck')
except:pass
"""
#A dummy EmComponent child class use to make tests
class EmTestComp(EmComponent):
table = 'ttest'
ranked_in = 'rank_fam'
_fields = [('rank_fam', ftypes.EmField_char)]
# The parent class of all other test cases for component
# It defines a SetUp function and some utility functions for EmComponent tests
class ComponentTestCase(TestCase):
test_values = [
{ 'uid': 1, 'name': 'test', 'string': '{"fr":"testcomp"}', 'help': '{"en":"help test", "fr":"test help"}', 'rank': 0},
{ 'uid': 2, 'name': 'test-em_comp', 'string': '{"fr":"Super test comp"}', 'help': '{}', 'rank': 1},
{ 'uid': 3, 'name': 'test2', 'string': '{}', 'help': '{}', 'rank': 2},
{ 'uid': 42, 'name': 'foo', 'string': '{"foo":"bar"}', 'help': '{"foo":"foobar"}', 'rank': 3},
{ 'uid': 84, 'name': '123', 'string': '{"num":"456"}', 'help': '{"num":"4242"}', 'rank': 4},
{ 'uid': 1025, 'name': 'name', 'string': '{}', 'help': '{}', 'rank': 5},
]
@property
def tables(self):
return globals()['tables']
class TestEmComponent(unittest.TestCase):
def setUp(self):
self.dber = sqlutils.get_engine('default')
self.test_values = self.__class__.test_values
#Db RAZ
#shutil.copyfile(TEST_COMPONENT_DBNAME+'_bck', globals()['component_test_dbfilename'])
restoreDbState(TEST_COMPONENT_DBNAME)
pass
self.me = Model(EmBackendJson('EditorialModel/test/me.json'))
def check_equals(self, excepted_val, test_comp, check_date=True, msg=''):
""" This function check that a EmTestComp has excepted_val for values """
val = excepted_val
self.assertIsInstance(test_comp, EmTestComp, msg)
for vname in val:
if vname in ['string', 'help']: #Special test for mlStrings
#MlString comparison
vml = json.loads(val[vname])
for vn in vml:
self.assertEqual(vml[vn], getattr(test_comp, vname).get(vn), msg)
elif vname in ['date_create', 'date_update']:
# Datetime comparison
if check_date:
self.assertEqualDatetime(val[vname], getattr(test_comp, vname), vname+" assertion error : "+msg)
else:
prop = vname
self.assertEqual(getattr(test_comp, prop), val[vname], msg+"Inconsistency for "+prop+" property")
pass
def assertEqualDatetime(self, d1,d2, msg=""):
""" Compare a date from the database with a datetime (that have microsecs, in db we dont have microsecs) """
self.assertTrue( d1.year == d2.year
and d1.month == d2.month
and d1.day == d2.day
and d1.hour == d2.hour
and d1.minute == d2.minute
and d1.second == d2.second, msg+" Error the two dates differs : '"+str(d1)+"' '"+str(d2)+"'")
def assertEqualMlString(self, ms1, ms2, msg=""):
""" Compare two MlStrings """
ms1t = ms1.translations
ms2t = ms2.translations
self.assertEqual(set(name for name in ms1t), set(name for name in ms2t), msg+" The two MlString hasn't the same lang list")
for n in ms1t:
self.assertEqual(ms1t[n], ms2t[n])
def run(self, result=None):
super(ComponentTestCase, self).run(result)
#=#############=#
# TESTS BEGIN #
#=#############=#
#===========================#
# EmComponent.__init__ #
#===========================#
class TestInit(ComponentTestCase):
def test_component_abstract_init(self):
""" Test not valid call (from EmComponent) of __init__ """
def test_init(self):
""" Testing that __init__ is abstract for an EmComponent """
with self.assertRaises(NotImplementedError):
test_comp = EmComponent(2)
with self.assertRaises(NotImplementedError):
test_comp = EmComponent('name')
pass
foo = EmComponent(self.me, self.me.new_uid(), 'invalid instanciation')
def test_component_init_not_exist(self):
""" Test __init__ with non existing objects """
with self.assertRaises(EmComponentNotExistError):
test_comp = EmTestComp('not_exist')
# TODO this assertion depends of the EmComponent behavior when instanciate with an ID
#with self.assertRaises(EmComponentNotExistError):
# test_comp = EmTestComp(4096)
pass
def test_component_init_uid(self):
""" Test __init__ with numerical ID """
for val in self.test_values:
test_comp = EmTestComp(val['uid'])
self.assertIsInstance(test_comp, EmTestComp)
self.assertEqual(test_comp.uid, val['uid'])
pass
def test_component_init_name(self):
""" Test __init__ with names """
for val in self.test_values:
test_comp = EmTestComp(val['name'])
self.check_equals(val, test_comp)
pass
def test_component_init_badargs(self):
for badarg in [ print, json, [], [1,2,3,4,5,6], {'hello': 'world'} ]:
with self.assertRaises(TypeError):
EmTestComp(badarg)
pass
#=======================#
# EmComponent.new_uid #
#=======================#
class TestUid(ComponentTestCase):
def test_newuid(self):
""" Test valid calls for new_uid method """
for _ in range(10):
nuid = EmTestComp.new_uid(self.dber)
def test_hashes(self):
""" Testing __hash__ and __eq__ methos """
me1 = Model(EmBackendJson('EditorialModel/test/me.json'))
me2 = Model(EmBackendJson('EditorialModel/test/me.json'), migration_handler = DummyMigrationHandler(True))
conn = self.dber.connect()
tuid = sqla.Table('uids', sqlutils.meta(self.dber))
req = sqla.select([tuid]).where(tuid.c.uid == nuid)
rep = conn.execute(req)
res = rep.fetchall()
for comp_class in [EmClass, EmType, EmField, EmFieldGroup]:
comp_l1 = me1.components(comp_class)
comp_l2 = me2.components(comp_class)
for i, comp1 in enumerate(comp_l1):
comp2 = comp_l2[i]
self.assertEqual(hash(comp1), hash(comp2), "hashes differs for two EmComponent({}) instanciated from the same backend and files".format(comp_class.__name__))
self.assertTrue(comp1 == comp2)
comp1.modify_rank(1)
self.assertNotEqual(hash(comp1), hash(comp2), "hashes are the same after a modification of rank on one of the two components")
self.assertFalse(comp1 == comp2)
comp2.modify_rank(2)
self.assertEqual(hash(comp1), hash(comp2), "hashes differs for two EmComponent({}) after applying the same modifications on both".format(comp_class.__name__))
self.assertTrue(comp1 == comp2)
def test_modify_rank(self):
""" Testing modify_rank and set_rank method """
cls = self.me.classes()[0]
orig_rank = cls.rank
cls.modify_rank(1)
self.assertEqual(orig_rank, cls.rank - 1)
cls.modify_rank(-1)
self.assertEqual(orig_rank, cls.rank)
cls.set_rank(1)
self.assertEqual(cls.rank, 1)
cls.set_rank(2)
self.assertEqual(cls.rank, 2)
max_rank = cls.get_max_rank()
cls.set_rank(max_rank)
self.assertEqual(cls.rank, max_rank)
with self.assertRaises(ValueError):
cls.modify_rank(1)
with self.assertRaises(ValueError):
cls.modify_rank(-10)
with self.assertRaises(ValueError):
cls.set_rank(0)
with self.assertRaises(ValueError):
cls.set_rank(10)
with self.assertRaises(ValueError):
cls.set_rank(-10)
def test_check(self):
""" Testing check method """
cls = self.me.classes()[0]
cls.rank = 10000
cls.check()
self.assertEqual(cls.rank, cls.get_max_rank())
cls.rank = -1000
cls.check()
self.assertEqual(cls.rank, 1)
self.assertEqual(len(res), 1, "Error when selecting : mutliple rows returned for 1 UID")
res = res[0]
self.assertEqual(res.uid, nuid, "Selected UID didn't match created uid")
self.assertEqual(res.table, EmTestComp.table, "Table not match with class table : expected '"+res.table+"' but got '"+EmTestComp.table+"'")
pass
def test_newuid_abstract(self):
""" Test not valit call for new_uid method """
with self.assertRaises(NotImplementedError):
EmComponent.new_uid(self.dber)
pass
#=======================#
# EmComponent.save #
#=======================#
class TestSave(ComponentTestCase):
def _savecheck(self, test_comp, newval):
""" Utility function for test_component_save_namechange """
test_comp2 = EmTestComp(newval['name'])
#Check if properties other than date are equals in the instance fetched from Db
self.check_equals(newval, test_comp2, check_date=False)
#Check if the date_update has been updated
self.assertTrue(newval['date_update'] < test_comp2.date_update, "The updated date_update is more in past than its previous value : old date : '"+str(newval['date_update'])+"' new date '"+str(test_comp2.date_update)+"'")
#Check if the date_create didn't change
self.assertEqualDatetime(newval['date_create'], test_comp2.date_create)
#Check if the instance fecthed from Db and the one used to call save have the same properties
for prop in ['name', 'help', 'string', 'date_update', 'date_create', 'rank' ]:
if prop in ['string', 'help']:
assertion = self.assertEqualMlString
elif prop == 'date_create':
assertion = self.assertEqualDatetime
elif prop == 'date_update':
assertion = self.assertLess
else:
assertion = self.assertEqual
assertion(getattr(test_comp, prop), getattr(test_comp2, prop), "Save don't propagate modification properly. The '"+prop+"' property hasn't the exepted value in instance fetched from Db : ")
pass
def test_component_save_setattr(self):
""" Checking save method after different changes using setattr """
val = self.test_values[0] #The row we will modify
test_comp = EmTestComp(val['name'])
self.check_equals(val, test_comp)
newval = val.copy()
time.sleep(2) # We have to sleep 2 secs here, so the update_date will be at least 2 secs more than newval['date_update']
#name change
newval['name'] = test_comp.name = 'newname'
test_comp.save()
self._savecheck(test_comp, newval)
self.assertTrue(True)
#help change
newval['help'] = '{"fr": "help fr", "en":"help en", "es":"help es"}'
test_comp.help = MlString.load(newval['help'])
test_comp.save()
self._savecheck(test_comp, newval)
self.assertTrue(True)
#string change
newval['string'] = '{"fr": "string fr", "en":"string en", "es":"string es"}'
test_comp.string = MlString.load(newval['string'])
test_comp.save()
self._savecheck(test_comp, newval)
self.assertTrue(True)
#no change
test_comp.save()
self._savecheck(test_comp, newval)
self.assertTrue(True)
#change all
test_comp.name = newval['name'] = test_comp.name = 'newnewname'
newval['help'] = '{"fr": "help fra", "en":"help eng", "es":"help esp"}'
test_comp.help = MlString.load(newval['help'])
newval['string'] = '{"fr": "string FR", "en":"string EN", "es":"string ES", "foolang":"foofoobar"}'
test_comp.string = MlString.load(newval['string'])
test_comp.save()
self._savecheck(test_comp, newval)
self.assertTrue(True)
pass
def test_component_save_illegalchanges(self):
""" checking that the save method forbids some changes """
val = self.test_values[1]
changes = { 'date_create': datetime.datetime(1982,4,2,13,37), 'date_update': datetime.datetime(1982,4,2,22,43), 'rank': 42 }
for prop in changes:
test_comp = EmTestComp(val['name'])
self.check_equals(val, test_comp, False)
with self.assertRaises(TypeError):
setattr(test_comp, prop, changes[prop])
test_comp.save()
test_comp2 = EmTestComp(val['name'])
if prop == 'date_create':
assertion = self.assertEqualDatetime
elif prop == 'date_update':
continue
else: #rank
assertion = self.assertEqual
assertion(getattr(test_comp,prop), val[prop], "When using setattr the "+prop+" of a component is set : ")
assertion(getattr(test_comp2, prop), val[prop], "When using setattr and save the "+prop+" of a loaded component is set : ")
pass
#====================#
# EmComponent.create #
#====================#
class TestCreate(ComponentTestCase):
def test_create(self):
"""Testing EmComponent.create()"""
vals = {'name': 'created1', 'rank_fam': 'f', 'string': '{"fr":"testcomp"}', 'help': '{"en":"help test", "fr":"test help"}'}
tc = EmTestComp.create(**vals)
self.check_equals(vals, tc, "The created EmTestComp hasn't the good properties values")
tcdb = EmTestComp('created1')
self.check_equals(vals, tc, "When fetched from Db the created EmTestComp hasn't the good properties values")
# This test assume that string and help has default values
vals = { 'name': 'created2', 'rank_fam': 'f' }
tc = EmTestComp.create(**vals)
self.check_equals(vals, tc, "The created EmTestComp hasn't the good properties values")
tcdb = EmTestComp('created1')
self.check_equals(vals, tc, "When fetched from Db the created EmTestComp hasn't the good properties values")
pass
def test_create_badargs(self):
"""Testing EmComponent.create() with bad arguments"""
with self.assertRaises(TypeError, msg="But given a function as argument"):
tc = EmTestComp.create(print)
with self.assertRaises(TypeError, msg="But values contains date_create and date_update"):
vals = { 'name': 'created1', 'rank_fam': 'f', 'string': '{"fr":"testcomp"}', 'help': '{"en" :"help test", "fr":"test help"}', 'rank': 6, 'date_create': 0 , 'date_update': 0 }
tc = EmTestComp.create(**vals)
with self.assertRaises(TypeError, msg="But no name was given"):
vals = { 'rank_fam': 'f', 'string': '{"fr":"testcomp"}', 'help': '{"en" :"help test", "fr":"test help"}', 'rank': 6, 'date_create': 0 , 'date_update': 0 }
tc = EmTestComp.create(**vals)
with self.assertRaises(TypeError, msg="But no rank_fam was given"):
vals = { 'name': 'created1', 'string': '{"fr":"testcomp"}', 'help': '{"en" :"help test", "fr":"test help"}', 'rank': 6, 'date_create': 0 , 'date_update': 0 }
tc = EmTestComp.create(**vals)
with self.assertRaises(TypeError, msg="But invalid keyword argument given"):
vals = {'invalid': 42, 'name': 'created1', 'rank_fam': 'f', 'string': '{"fr":"testcomp"}', 'help': '{"en":"help test", "fr":"test help"}'}
tc = EmTestComp.create(**vals)
pass
def test_create_existing_failure(self):
""" Testing that create fails when trying to create an EmComponent with an existing name but different properties """
vals = {'name': 'created1', 'rank_fam': 'f', 'string': '{"fr":"testcomp"}', 'help': '{"en":"help test", "fr":"test help"}'}
tc = EmTestComp.create(**vals)
with self.assertRaises(EmComponentExistError, msg="Should raise because attribute differs for a same name"):
vals['rank_fam'] = 'e'
EmTestComp.create(**vals)
pass
def test_create_existing(self):
""" Testing that create dont fails when trying to create twice the same EmComponent """
vals = {'name': 'created1', 'rank_fam': 'f', 'string': '{"fr":"testcomp"}', 'help': '{"en":"help test", "fr":"test help"}'}
tc = EmTestComp.create(**vals)
try:
tc2 = EmTestComp.create(**vals)
except EmComponentExistError as e:
self.fail("create raises but should return the existing EmComponent instance instead")
self.assertEqual(tc.uid, tc2.uid, "Created twice the same EmComponent")
pass
def testGetMaxRank(self):
old = EmTestComp._get_max_rank('f', self.dber)
EmTestComp.create(name="foobartest", rank_fam = 'f')
n = EmTestComp._get_max_rank('f', self.dber)
self.assertEqual(old+1, n, "Excepted value was "+str(old+1)+" but got "+str(n))
self.assertEqual(EmTestComp._get_max_rank('z', self.dber), -1)
pass
#====================#
# EmComponent.delete #
#====================#
class TestDelete(ComponentTestCase):
def test_delete(self):
""" Create and delete TestComponent """
vals = [
{'name': 'created1', 'rank_fam': 'f', 'string': '{"fr":"testcomp"}', 'help': '{"en":"help test", "fr":"test help"}'},
{'name': 'created2', 'rank_fam': 'f', 'string': '{"fr":"testcomp"}', 'help': '{"en":"help test", "fr":"test help"}'},
{'name': 'created3', 'rank_fam': 'f', 'string': '{"fr":"testcomp"}', 'help': '{"en":"help test", "fr":"test help"}'},
]
tcomps = []
for val in vals:
tcomps.append(EmTestComp.create(**val))
failmsg = "This component should be deleted"
for i,tcv in enumerate(vals):
tc = EmTestComp(tcv['name'])
tc.delete()
with self.assertRaises(EmComponentNotExistError, msg = failmsg):
tc2 = EmTestComp(tcv['name'])
with self.assertRaises(EmComponentNotExistError, msg = failmsg):
tmp = tc.uid
with self.assertRaises(EmComponentNotExistError, msg = failmsg):
tmp = tc.__str__()
with self.assertRaises(EmComponentNotExistError, msg = failmsg):
tmp = tc.name
with self.assertRaises(EmComponentNotExistError, msg = failmsg):
print(tc)
for j in range(i+1,len(vals)):
try:
tc = EmTestComp(vals[j]['name'])
except EmComponentNotExistError:
self.fail('EmComponent should not be deleted')
self.assertTrue(True)
pass
#===========================#
# EmComponent.modify_rank #
#===========================#
class TestModifyRank(ComponentTestCase):
def dump_ranks(self):
names = [ v['name'] for v in self.test_values ]
ranks=""
for i in range(len(names)):
tc = EmTestComp(names[i])
ranks += " "+str(tc.rank)
return ranks
def test_modify_rank_absolute(self):
""" Testing modify_rank with absolute rank """
names = [ v['name'] for v in self.test_values ]
nmax = len(names)-1
#moving first to 3
#-----------------
test_comp = EmTestComp(names[0])
test_comp.modify_rank(3, '=')
self.assertEqual(test_comp.rank, 3, "Called modify_rank(3, '=') but rank is '"+str(test_comp.rank)+"'. Ranks dump : "+self.dump_ranks())
tc2 = EmTestComp(names[0])
self.assertEqual(tc2.rank, 3, "Called modify_rank(3, '=') but rank is '"+str(tc2.rank)+"'. Ranks dump : "+self.dump_ranks())
for i in range(1,4):
test_comp = EmTestComp(names[i])
self.assertEqual(test_comp.rank, i-1, "Excepted rank was '"+str(i-1)+"' but found '"+str(test_comp.rank)+"'. Ranks dump : "+self.dump_ranks())
for i in [4,nmax]:
test_comp = EmTestComp(names[i])
self.assertEqual(test_comp.rank, i, "Rank wasn't excepted to change, but : previous value was '"+str(i)+"' current value is '"+str(test_comp.rank)+"'. Ranks dump : "+self.dump_ranks())
#undoing last rank change
test_comp = EmTestComp(names[0])
test_comp.modify_rank(0,'=')
self.assertEqual(test_comp.rank, 0)
tc2 = EmTestComp(names[0])
self.assertEqual(tc2.rank, 0)
#moving last to 2
#----------------
test_comp = EmTestComp(names[nmax])
test_comp.modify_rank(2, '=')
for i in [0,1]:
test_comp = EmTestComp(names[i])
self.assertEqual(test_comp.rank, i)
for i in range(3,nmax-1):
test_comp = EmTestComp(names[i])
self.assertEqual(test_comp.rank, i+1, "Excepted rank was '"+str(i+1)+"' but found '"+str(test_comp.rank)+"'. Ranks dump : "+self.dump_ranks())
#undoing last rank change
test_comp = EmTestComp(names[nmax])
test_comp.modify_rank(nmax,'=')
self.assertEqual(test_comp.rank, nmax)
#Checking that we are in original state again
for i,name in enumerate(names):
test_comp = EmTestComp(name)
self.assertEqual(test_comp.rank, i, "Excepted rank was '"+str(i-1)+"' but found '"+str(test_comp.rank)+"'. Ranks dump : "+self.dump_ranks())
#Inverting the list
#------------------
for i,name in enumerate(names):
test_comp = EmTestComp(name)
test_comp.modify_rank(0,'=')
self.assertEqual(test_comp.rank, 0)
for j in range(0,i+1):
test_comp = EmTestComp(names[j])
self.assertEqual(test_comp.rank, i-j)
for j in range(i+1,nmax+1):
test_comp = EmTestComp(names[j])
self.assertEqual(test_comp.rank, j)
pass
def test_modify_rank_relative(self):
""" Testing modify_rank with relative rank modifier """
names = [ v['name'] for v in self.test_values ]
nmax = len(names)-1
test_comp = EmTestComp(names[0])
#Running modify_rank(i,'+') and the modify_rank(i,'-') for i in range(1,nmax)
for i in range(1,nmax):
test_comp.modify_rank(i,'+')
self.assertEqual(test_comp.rank, i, "The instance (name="+names[0]+") on wich we applied the modify_rank doesn't have expected rank : expected '"+str(i)+"' but got '"+str(test_comp.rank)+"'")
test_comp2 = EmTestComp(names[0])
self.assertEqual(test_comp.rank, i, "The instance fetched in Db does'n't have expected rank : expected '"+str(i)+"' but got '"+str(test_comp.rank)+"'")
for j in range(1,i+1):
test_comp2 = EmTestComp(names[j])
self.assertEqual(test_comp2.rank, j-1, self.dump_ranks())
for j in range(i+1,nmax+1):
test_comp2 = EmTestComp(names[j])
self.assertEqual(test_comp2.rank, j, self.dump_ranks())
test_comp.modify_rank(i,'-')
self.assertEqual(test_comp.rank, 0, "The instance on wich we applied the modify_rank -"+str(i)+" doesn't have excepted rank : excepted '0' but got '"+str(test_comp.rank)+"'")
test_comp2 = EmTestComp(names[0])
self.assertEqual(test_comp.rank, 0, "The instance fetched in Db does'n't have expected rank : expected '0' but got '"+str(test_comp.rank)+"'"+self.dump_ranks())
for j in range(1,nmax+1):
test_comp2 = EmTestComp(names[j])
self.assertEqual(test_comp2.rank, j, self.dump_ranks())
test_comp = EmTestComp(names[3])
test_comp.modify_rank(2,'+')
self.assertEqual(test_comp.rank, 5)
tc2 = EmTestComp(names[3])
self.assertEqual(tc2.rank,5)
for i in [4,5]:
tc2 = EmTestComp(names[i])
self.assertEqual(tc2.rank, i-1)
for i in range(0,3):
tc2 = EmTestComp(names[i])
self.assertEqual(tc2.rank, i)
test_comp.modify_rank(2, '-')
self.assertEqual(test_comp.rank, 3)
for i in range(0,6):
tc2 = EmTestComp(names[i])
self.assertEqual(tc2.rank, i)
pass
def test_modify_rank_badargs(self):
""" Testing modify_rank with bad arguments """
names = [ v['name'] for v in self.test_values ]
tc = EmTestComp(names[3])
badargs = [
#Bad types
(('0','+'), TypeError),
((0, 43), TypeError),
((print, '='), TypeError),
((3, print), TypeError),
((0.0, '='), TypeError),
#Bad new_rank
((-1, '='), ValueError),
((-1,), ValueError),
#Bad sign
((2, 'a'), ValueError),
((1, '=='), ValueError),
((1, '+-'), ValueError),
((1, 'Hello world !'), ValueError),
#Out of bounds
((42*10**9, '+'), ValueError),
((-42*10**9, '+'), ValueError),
((len(names), '+'), ValueError),
((len(names), '-'), ValueError),
((len(names), '='), ValueError),
((4, '-'), ValueError),
((3, '+'), ValueError),
]
for (args, err) in badargs:
with self.assertRaises(err, msg="Bad arguments supplied : "+str(args)+" for a component at rank 3 but no error raised"):
tc.modify_rank(*args)
self.assertEqual(tc.rank, 3, "The function raises an error but modify the rank")
pass

View file

@ -1,40 +1,28 @@
import os
import logging
import datetime
from django.conf import settings
from unittest import TestCase
import unittest
from EditorialModel.components import EmComponent, EmComponentNotExistError
from EditorialModel.fields import EmField
from EditorialModel.classes import EmClass
from EditorialModel.classtypes import EmClassType
from EditorialModel.types import EmType
from EditorialModel.fieldgroups import EmFieldGroup
from EditorialModel.test.utils import *
from EditorialModel.fieldtypes import *
from EditorialModel.model import Model
from EditorialModel.backend.json_backend import EmBackendJson
from Database import sqlutils
EM_TEST = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'me.json')
EM_TEST_OBJECT = None
import sqlalchemy as sqla
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "Lodel.settings")
TEST_FIELD_DBNAME = 'lodel2_test_field_db.sqlite'
## SetUpModule
#
# This function is called once for this module.
# It is designed to overwrite the database configurations, and prepare objects for test_case initialization
def setUpModule():
initTestDb(TEST_FIELD_DBNAME)
setDbConf(TEST_FIELD_DBNAME)
logging.basicConfig(level=logging.CRITICAL)
pass
global EM_TEST_OBJECT
EM_TEST_OBJECT = Model(EmBackendJson(EM_TEST))
#initTestDb(TEST_FIELD_DBNAME)
#setDbConf(TEST_FIELD_DBNAME)
#logging.basicConfig(level=logging.CRITICAL)
def tearDownModule():
cleanDb(TEST_FIELD_DBNAME)
#cleanDb(TEST_FIELD_DBNAME)
pass
@ -46,66 +34,12 @@ class FieldTestCase(TestCase):
@classmethod
def setUpClass(cls):
sqlsetup.init_db()
# Generation of the test data
testclass = EmClass.create("testclass1",EmClassType.entity)
EmField_integer()
EmFieldGroup.create('fieldgrp1',testclass)
EmType.create('testtype1',testclass)
saveDbState(TEST_FIELD_DBNAME)
pass
def setUp(self):
restoreDbState(TEST_FIELD_DBNAME)
self.testClass = EmClass("testclass1")
self.testFieldType = EmField_integer()
self.testFieldgroup = EmFieldGroup('fieldgrp1')
self.testType = EmType('testtype1')
self.test_fieldtype = 'integer'
self.test_fieldgroup = EM_TEST_OBJECT.component(3)
## Get_Field_Records (Function)
#
# Returns the list of fields corresponding to a given uid
#
# @param field EmField: EmField object
# @return Number of found records
def get_field_records(self,field):
return self._get_field_records_Db(field)
## _Get_Field_Records_Db (Function)
#
# Queries the database to get the list of fields for a given uid
#
# @param field EmField: EmField object
# @return Number of found records
def _get_field_records_Db(self,field):
dbe = field.db_engine
fieldtable = sqla.Table(EmField.table, sqlutils.meta(dbe))
conn = dbe.connect()
req = fieldtable.select().where(fieldtable.c.uid==field.uid).where(fieldtable.c.name==field.name)
res = conn.execute(req).fetchall()
return len(res)
## Get_table_columns (Function)
#
# Returns the columns list of a table
#
# @param table_name str: Name of the table
# @return list of columns
def get_table_columns(self,table_name):
return self._get_table_columns_Db(table_name)
## _Get_table_columns_Db (Function)
#
# Queries the database to get the list of columns of a table
#
# @param table_name str: Name of the table
# @return list of columns
def _get_table_columns_Db(self, table_name):
dbe = self.testClass.db_engine
table = sqla.Table(table_name, sqlutils.meta(dbe))
return table.c
## TestField (Class)
#
@ -115,46 +49,36 @@ class TestField(FieldTestCase):
## Test_create (Function)
#
# tests the creation process of a field
def testCreate(self):
""" Testing fields creation process """
'''
field_values = {
'name':'testfield1',
'fieldgroup_id' : self.testFieldgroup.uid,
'fieldtype' : self.testFieldType,
'rel_to_type_id': self.testType.uid
}
'''
field = EmField.create(name='testfield1', fieldgroup=self.testFieldgroup, fieldtype=self.testFieldType)
def test_create(self):
# We check that the field has been added in the em_field table
field_records = self.get_field_records(field)
self.assertGreater(field_records,0)
field = EM_TEST_OBJECT.create_component(EmField.__name__, {'name': 'testfield1', 'fieldgroup_id': self.test_fieldgroup.uid, 'fieldtype': self.test_fieldtype})
# We check that the field has been added as a column in the corresponding table
field_table_columns = self.get_table_columns(field.get_class_table())
field_column_args = self.testFieldType.sqlalchemy_args()
field_column_args['name']='testfield1'
field_column = sqla.Column(**field_column_args)
self.assertIn(field_column.name, field_table_columns)
pass
# We check that the field has been added
field_records = EM_TEST_OBJECT.component(field.uid)
self.assertIsNot(field_records, False)
# We check that the field has been added in the right list in the model object
field_components_records = EM_TEST_OBJECT.components(EmField)
self.assertIn(field, field_components_records)
## Test_Deletion
#
# tests the deletion process of a field
def test_deletion(self):
""" Testing fields deletion process """
fields = []
field_names = ['field1', 'field2']
# We create the two fields
for name in field_names:
EmField.create(name=name, fieldgroup=self.testFieldgroup, fieldtype = self.testFieldType)
fields.append(EM_TEST_OBJECT.create_component(EmField.__name__, {'name': name, 'fieldgroup_id': self.test_fieldgroup.uid, 'fieldtype': self.test_fieldtype}))
for i,name in enumerate(field_names):
test_field = EmField(name)
self.assertTrue(test_field.delete())
for field in fields:
# We check if the delete process was performed to the end
self.assertTrue(EM_TEST_OBJECT.delete_component(field.uid))
cols = self.get_table_columns(self.testClass.name)
for deleted_name in field_names[:i+1]:
self.assertNotIn(deleted_name, cols, "Column is not deleted")
for not_deleted_name in field_names[i+1:]:
self.assertIn(not_deleted_name, cols, "A bad column was deleted")
with self.assertRaises(EmComponentNotExistError, msg="This field should be deleted"):
EmField(name)
# We check that the field object is not in the editorial model anymore
self.assertFalse(EM_TEST_OBJECT.component(field.uid))
# We check that the field object is not in the EmField components list
field_components_records = EM_TEST_OBJECT.components(EmField)
self.assertNotIn(field, field_components_records)

View file

@ -1,28 +1,15 @@
import os
import logging
import datetime
import shutil
from django.conf import settings
from unittest import TestCase
import unittest
from EditorialModel.components import EmComponent, EmComponentNotExistError
from EditorialModel.fieldgroups import EmFieldGroup
from EditorialModel.classes import EmClass
from EditorialModel.types import EmType
from EditorialModel.fields import EmField
from EditorialModel.classtypes import EmClassType
from EditorialModel.fieldtypes import *
from EditorialModel.test.utils import *
from Database import sqlutils
import sqlalchemy as sqla
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "Lodel.settings")
from Lodel.utils.mlstring import MlString
from EditorialModel.model import Model
from EditorialModel.backend.json_backend import EmBackendJson
#=###########=#
# TESTS SETUP #
@ -30,33 +17,24 @@ os.environ.setdefault("DJANGO_SETTINGS_MODULE", "Lodel.settings")
TEST_FIELDGROUP_DBNAME = 'test_em_fieldgroup_db.sqlite'
EM_TEST = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'me.json')
EM_TEST_OBJECT = None
def setUpModule():
global EM_TEST_OBJECT
EM_TEST_OBJECT = Model(EmBackendJson(EM_TEST))
logging.basicConfig(level=logging.CRITICAL)
initTestDb(TEST_FIELDGROUP_DBNAME)
setDbConf(TEST_FIELDGROUP_DBNAME)
#Classes creation
EmClass.create("entity1", EmClassType.entity)
EmClass.create("entity2", EmClassType.entity)
EmClass.create("entry1", EmClassType.entry)
EmClass.create("entry2", EmClassType.entry)
EmClass.create("person1", EmClassType.person)
EmClass.create("person2", EmClassType.person)
saveDbState(TEST_FIELDGROUP_DBNAME)
#shutil.copyfile(TEST_FIELDGROUP_DBNAME, globals()['fieldgroup_test_dbfilename']+'_bck')
def tearDownModule():
cleanDb(TEST_FIELDGROUP_DBNAME)
pass
class FieldGroupTestCase(TestCase):
def setUp(self):
restoreDbState(TEST_FIELDGROUP_DBNAME)
pass
@ -67,62 +45,35 @@ class TestInit(FieldGroupTestCase):
def setUp(self):
super(TestInit, self).setUp()
dbe = sqlutils.get_engine()
conn = sqlutils.get_engine().connect()
ent1 = EmClass('entity1')
idx1 = EmClass('entry1')
self.creadate = datetime.datetime.utcnow()
#Test fieldgroup
self.tfg = [
{ 'uid': EmFieldGroup.new_uid(dbe), 'name': 'fg1', 'string': '{"fr":"Super Fieldgroup"}', 'help': '{"en":"help"}', 'rank': 0 , 'class_id': ent1.uid, 'date_create' : self.creadate, 'date_update': self.creadate},
{ 'uid': EmFieldGroup.new_uid(dbe), 'name': 'fg2', 'string': '{"fr":"Super Fieldgroup"}', 'help': '{"en":"help"}', 'rank': 1 , 'class_id': ent1.uid, 'date_create': self.creadate, 'date_update': self.creadate},
{ 'uid': EmFieldGroup.new_uid(dbe), 'name': 'fg3', 'string': '{"fr":"Super Fieldgroup"}', 'help': '{"en":"help"}', 'rank': 2 , 'class_id': idx1.uid, 'date_create': self.creadate, 'date_update': self.creadate},
self.tfgs = [
{"name": "testfg1", "string": MlString({"fre": "Gens"}), "help_text": MlString({}), "class_id": 1},
{"name": "testfg2", "string": MlString({"fre": "Gens"}), "help_text": MlString({}), "class_id": 1},
{"name": "testfg3", "string": MlString({"fre": "Civilité"}), "help_text": MlString({}), "class_id": 2}
]
req = sqla.Table('em_fieldgroup', sqlutils.meta(sqlutils.get_engine())).insert(self.tfg)
conn.execute(req)
conn.close()
pass
for tfg in self.tfgs:
fieldgroup = EM_TEST_OBJECT.create_component(EmFieldGroup.__name__, tfg)
def test_init(self):
""" Test that EmFieldgroup are correctly instanciated compare to self.tfg """
for tfg in self.tfg:
fg = EmFieldGroup(tfg['name'])
for tfg in self.tfgs:
fieldgroup = EM_TEST_OBJECT.component(tfg['uid'])
for attr in tfg:
if attr in ['string', 'help']:
v = MlString.load(tfg[attr])
else:
if attr != 'uid':
v = tfg[attr]
self.assertEqual(getattr(fg, attr), v, "The propertie '"+attr+"' fetched from Db don't match excepted value")
for tfg in self.tfg:
fg = EmFieldGroup(tfg['uid'])
for attr in tfg:
if attr in ['string', 'help']:
v = MlString.load(tfg[attr])
else:
v = tfg[attr]
self.assertEqual(getattr(fg, attr), v, "The propertie '"+attr+"' fetched from Db don't match excepted value")
pass
self.assertEqual(getattr(fieldgroup, attr), v, "The '" + attr + "' property fetched from backend doesn't match the excepted value")
def test_init_badargs(self):
""" Test that EmFieldgroup fail when bad arguments are given """
baduid = self.tfg[2]['uid'] + 4096
""" Tests that EmFieldGroup init fails when bad arguments are given"""
baduid = self.tfgs[2]['uid'] + 4096
badname = 'NonExistingName'
with self.assertRaises(EmComponentNotExistError, msg="Should raise because fieldgroup with id "+str(baduid)+" should not exist"):
fg = EmFieldGroup(baduid)
with self.assertRaises(EmComponentNotExistError, msg="Should raise because fieldgroup named "+badname+" should not exist"):
fg = EmFieldGroup(badname)
with self.assertRaises(TypeError, msg="Should raise a TypeError when crazy arguments are given"):
fg = EmFieldGroup(print)
with self.assertRaises(TypeError, msg="Should raise a TypeError when crazy arguments are given"):
fg = EmFieldGroup(['hello', 'world'])
pass
# TODO Voir si on garde le return False de Model.component() ou si on utilise plutôt une exception EmComponentNotExistError en modifiant le reste du code source pour gérer ce cas
self.assertFalse(EM_TEST_OBJECT.component(baduid), msg="Should be False because fieldgroup with id " + str(baduid) + " should not exist")
self.assertFalse(EM_TEST_OBJECT.component(badname), msg="Should be False because fieldgroup with id " + str(badname) + " should not exist")
self.assertFalse(EM_TEST_OBJECT.component(print), msg="Should be False when a function name is given as argument")
with self.assertRaises(TypeError, msg="Should raise when crazy arguments are given"):
fieldgroup = EM_TEST_OBJECT.component(['hello', 'world'])
#=====================#
# EmFieldgroup.create #
@ -130,56 +81,55 @@ class TestInit(FieldGroupTestCase):
class TestCreate(FieldGroupTestCase):
def test_create(self):
""" Does create actually create a fieldgroup ? """
"""Does create actually create a fieldgroup ?"""
params = {
'EmClass entity instance': EM_TEST_OBJECT.component(1),
'EmClass entry instance': EM_TEST_OBJECT.component(2)
}
params = { 'EmClass entity instance': EmClass('entity1'),
'EmClass entry instance': EmClass('entry1'),
'EmClass person instance': EmClass('person1'),
}
for i,param_name in enumerate(params):
for i, param_name in enumerate(params):
arg = params[param_name]
if isinstance(arg, EmClass):
cl = arg
else:
cl = EmClass(arg)
cl = EM_TEST_OBJECT.component(arg)
fgname = 'new_fg'+str(i)
fg = EmFieldGroup.create(fgname, arg)
self.assertEqual(fg.name, fgname, "EmFieldGroup.create() dont instanciate name correctly")
self.assertEqual(fg.class_id, cl.uid, "EmFieldGroup.create() dont instanciate class_id correctly")
fieldgroup_name = 'new_fg' + str(i)
fieldgroup = EM_TEST_OBJECT.create_component(EmFieldGroup.__name__, {'name': fieldgroup_name, 'class_id': arg.uid})
self.assertEqual(fieldgroup.name, fieldgroup_name, "Model.create_component() doesn't instanciate name correctly")
self.assertEqual(fieldgroup.class_id, cl.uid, "Model.create_component() doesn't instanciate class_id correctly")
nfg = EmFieldGroup(fgname)
nfg = EM_TEST_OBJECT.component(fieldgroup.uid)
#Checking object property
for fname in fg._fields:
self.assertEqual(getattr(nfg,fname), getattr(fg,fname), "Msg inconsistency when a created fieldgroup is fecthed from Db (in "+fname+" property)")
pass
# Checking object property
for fname in fieldgroup.__dict__:
self.assertEqual(getattr(nfg, fname), getattr(fieldgroup, fname), "Msg inconsistency when a created fieldgroup is fetched from the backend (in " + fname + " property)")
def test_create_badargs(self):
""" Does create fails when badargs given ? """
badargs = {
'EmClass type (not an instance)': EmClass,
'Non Existing id': 9000,
'Another component instance': EM_TEST_OBJECT.create_component(EmType.__name__, {'name': 'fooType', 'class_id': EM_TEST_OBJECT.component(1).uid}),
'A function': print
}
badargs = { 'EmClass type (not an instance)': EmClass,
'Non Existing name': 'fooClassThatDontExist',
'Non Existing Id': 4042, #Hope that it didnt exist ;)
'Another component instance': EmType.create('fooType', EmClass('entity1')),
'A function': print
}
for i,badarg_name in enumerate(badargs):
with self.assertRaises(TypeError, msg="Should raise because trying to give "+badarg_name+" as em_class"):
fg = EmFieldGroup.create('new_fg'+i, badargs[badarg_name])
for i, badarg_name in enumerate(badargs):
with self.assertRaises(TypeError, msg="Should raise because trying to give " + badarg_name + " an em_class object as value"):
fieldgroup = EM_TEST_OBJECT.create_component(EmFieldGroup.__name__, {'name': 'new_fg' + i, 'class_id': badargs[badarg_name].uid})
#Creating a fieldgroup to test duplicate name
exfg = EmFieldGroup.create('existingfg', EmClass('entity1'))
badargs = { 'an integer': (42, TypeError),
'a function': (print, TypeError),
'an EmClass': (EmClass('entry1'), TypeError),
}
# Creating a fieldgroup to test duplicate name
exfg = EM_TEST_OBJECT.create_component(EmFieldGroup.__name__, {'name': 'existingfg', 'class_id': EM_TEST_OBJECT.component(1).uid})
badargs = {
'an integer': (42, AttributeError),
'a function': (print, AttributeError),
'an EmClass': (EM_TEST_OBJECT.component(2), AttributeError)
}
for badarg_name in badargs:
(badarg,expt) = badargs[badarg_name]
with self.assertRaises(expt, msg="Should raise because trying to give "+badarg_name+" as first argument"):
fg = EmFieldGroup.create(badarg, EmClass('entity1'))
(badarg, expt) = badargs[badarg_name]
with self.assertRaises(expt, msg="Should raise because trying to give " + badarg_name + " as first argument"):
fieldgroup = EM_TEST_OBJECT.create_component(EmFieldGroup.__name__, {'name': badarg, 'class_id': EM_TEST_OBJECT.component(1).uid})
#=====================#
# EmFieldgroup.fields #
@ -188,53 +138,49 @@ class TestFields(FieldGroupTestCase):
def setUp(self):
super(TestFields, self).setUp()
self.fg1 = EmFieldGroup.create('testfg', EmClass('entity1'))
self.fg2 = EmFieldGroup.create('testfg2', EmClass('entry1'))
self.fg3 = EmFieldGroup.create('testfg3', EmClass('entry1'))
self.fg1 = EM_TEST_OBJECT.create_component(EmFieldGroup.__name__, {'name': 'testfg1', 'class_id': EM_TEST_OBJECT.component(1).uid})
self.fg2 = EM_TEST_OBJECT.create_component(EmFieldGroup.__name__, {'name': 'testfg2', 'class_id': EM_TEST_OBJECT.component(2).uid})
self.fg3 = EM_TEST_OBJECT.create_component(EmFieldGroup.__name__, {'name': 'testfg3', 'class_id': EM_TEST_OBJECT.component(1).uid})
def test_fields(self):
""" Does it returns actually associated fields ? """
#Creating fields
# Creating fields
test_fields1 = [
{ 'name': 'field1', 'fieldgroup': self.fg1, 'fieldtype': EmField_integer() },
{ 'name': 'field2', 'fieldgroup': self.fg1, 'fieldtype': EmField_integer() },
{ 'name': 'field4', 'fieldgroup': self.fg1, 'fieldtype': EmField_integer() },
{'name': 'field1', 'fieldgroup_id': self.fg1.uid, 'fieldtype': 'integer'},
{'name': 'field2', 'fieldgroup_id': self.fg1.uid, 'fieldtype': 'integer'},
{'name': 'field4', 'fieldgroup_id': self.fg1.uid, 'fieldtype': 'integer'}
]
test_fields2 = [
{ 'name': 'field3', 'fieldgroup': self.fg2, 'fieldtype': EmField_integer() },
test_fields2 = [
{'name': 'field3', 'fieldgroup_id': self.fg2.uid, 'fieldtype': 'integer'}
]
excepted1 = []
expected1 = []
for finfo in test_fields1:
f = EmField.create(**finfo)
excepted1.append(f.uid)
field = EM_TEST_OBJECT.create_component(EmField.__name__, finfo)
expected1.append(field.uid)
for finfo in test_fields2:
f = EmField.create(**finfo)
field = EM_TEST_OBJECT.create_component(EmField.__name__, finfo)
excepted1 = set(excepted1)
expected1 = set(expected1)
tests = {
'newly': EmFieldGroup('testfg'),
'old' : self.fg1
'newly': EM_TEST_OBJECT.component(self.fg1.uid),
'old': self.fg1
}
for name in tests:
fg = tests[name]
flist = fg.fields()
fieldgroup = tests[name]
flist = fieldgroup.fields()
res = []
for f in flist:
res.append(f.uid)
self.assertEqual(set(res), set(excepted1))
for field in flist:
res.append(field.uid)
self.assertEqual(set(res), set(expected1))
def test_empty_fields(self):
""" Testing fields method on an empty fieldgroup """
fg = self.fg3
flist = fg.fields()
self.assertEqual(len(flist), 0)
pass
fieldgroup = self.fg3
fields_list = fieldgroup.fields()
self.assertEqual(len(fields_list), 0)

View file

@ -0,0 +1,67 @@
import unittest
from EditorialModel.model import Model
from EditorialModel.classes import EmClass
from EditorialModel.types import EmType
from EditorialModel.fieldgroups import EmFieldGroup
from EditorialModel.fields import EmField
from EditorialModel.backend.json_backend import EmBackendJson
from EditorialModel.migrationhandler.dummy import DummyMigrationHandler
class TestModel(unittest.TestCase):
def setUp(self):
self.me = Model(EmBackendJson('EditorialModel/test/me.json'))
def test_init(self):
""" Instanciation test """
model = Model(EmBackendJson('EditorialModel/test/me.json'))
self.assertTrue(isinstance(model, Model))
model = Model(EmBackendJson('EditorialModel/test/me.json'), migration_handler = DummyMigrationHandler(True))
self.assertTrue(isinstance(model, Model))
def test_components(self):
""" Test components fetching """
for comp_class in [EmClass, EmType, EmField, EmFieldGroup]:
comp_l = self.me.components(comp_class)
for component in comp_l:
self.assertTrue(isinstance(component, comp_class), "Model.components method don't return EmComponent of the right type. Asked for {} but got {}".format(type(comp_class), type(component)))
def test_sort_components(self):
""" Test that Model.sort_components method actually sort components """
#disordering an EmClass
cl_l = self.me.components(EmClass)
last_class = cl_l[0]
last_class.rank = 10000
self.me.sort_components(EmClass)
self.assertEqual(self.me._components['EmClass'][-1].uid, last_class.uid, "The methode sort_components don't really sort by rank")
def test_new_uid(self):
""" Test that Model.new_uid return a new uniq uid """
new_uid = self.me.new_uid()
self.assertNotIn(new_uid, self.me._components['uids'].keys())
def test_hash(self):
""" Test that __hash__ and __eq__ works properly on models """
me = Model(EmBackendJson('EditorialModel/test/me.json'))
me2 = Model(EmBackendJson('EditorialModel/test/me.json'), migration_handler = DummyMigrationHandler(True))
self.assertEqual(hash(me), hash(me2), "When instanciate from the same backend & file but with another migration handler the hashes differs")
self.assertTrue(me, me2)
cl_l = me.classes()
cl_l[0].modify_rank(1)
self.assertNotEqual(hash(me), hash(me2), "After a class rank modification the hashes are the same")
self.assertFalse(me, me2)
cl_l = me2.classes()
cl_l[0].modify_rank(1)
self.assertEqual(hash(me), hash(me2), "After doing sames modifications in the two models the hashes differs")
self.assertTrue(me, me2)

View file

@ -1,13 +1,13 @@
#-*- coding: utf-8 -*-
from Database import sqlutils
import sqlalchemy as sql
# from Database import sqlutils
# import sqlalchemy as sql
import EditorialModel
from EditorialModel.components import EmComponent
from EditorialModel.fieldgroups import EmFieldGroup
from EditorialModel.fields import EmField
from EditorialModel.classtypes import EmClassType
from EditorialModel.classtypes import EmClassType, EmNature
from EditorialModel.exceptions import *
import EditorialModel.fieldtypes as ftypes
import EditorialModel.classes
@ -20,17 +20,32 @@ import EditorialModel.classes
# @see EditorialModel::components::EmComponent
# @todo sortcolumn handling
class EmType(EmComponent):
table = 'em_type'
table_hierarchy = 'em_type_hierarchy'
ranked_in = 'class_id'
## @brief Specific EmClass fields
# @see EditorialModel::components::EmComponent::_fields
_fields = [
('class_id', ftypes.EmField_integer),
('icon', ftypes.EmField_icon),
('sortcolumn', ftypes.EmField_char)
]
## Instanciate a new EmType
# @todo define and check types for icon and sortcolumn
# @todo better check self.subordinates
def __init__(self, model, uid, name, class_id, fields_list = [], subordinates_list = {}, icon = '0', sortcolumn = 'rank', string = None, help_text = None, date_update = None, date_create = None, rank = None):
self.class_id = class_id
self.check_type('class_id', int)
self.fields_list = fields_list
self.check_type('fields_list', list)
for l_uid in self.fields_list:
if not isinstance(l_uid, int):
raise AttributeError("Excepted fields_list to be a list of integers, but found a +"+str(type(l_uid))+" in it")
self.subordinates_list = subordinates_list
self.check_type('subordinates_list', dict)
for nature, uids in self.subordinates_list.items():
if nature not in [EmNature.PARENT, EmNature.TRANSLATION, EmNature.IDENTITY]:
raise AttributeError("Nature '%s' of subordinates is not allowed !" % nature)
for uid in uids:
if not isinstance(uid, int):
raise AttributeError("Excepted subordinates of nature '%s' to be a list int !" % nature)
self.icon = icon
self.sortcolumn = sortcolumn
super(EmType, self).__init__(model=model, uid=uid, name=name, string=string, help_text=help_text, date_update=date_update, date_create=date_create, rank=rank)
@classmethod
## Create a new EmType and instanciate it
@ -45,18 +60,17 @@ class EmType(EmComponent):
def create(cls, name, em_class, sortcolumn='rank', **em_component_args):
return super(EmType, cls).create(name=name, class_id=em_class.uid, sortcolumn=sortcolumn, **em_component_args)
@property
## Return an sqlalchemy table for type hierarchy
# @return sqlalchemy em_type_hierarchy table object
# @todo Don't hardcode table name
def _table_hierarchy(self):
return sql.Table(self.__class__.table_hierarchy, sqlutils.meta(self.db_engine))
@property
## Return the EmClassType of the type
# @return EditorialModel.classtypes.*
def classtype(self):
return getattr(EmClassType, EditorialModel.classes.EmClass(self.class_id).classtype)
return getattr(EmClassType, self.em_class.classtype)
@property
## Return an instance of the class this type belongs to
# @return EditorialModel.EmClass
def em_class(self):
return self.model.component(self.class_id)
## @brief Delete an EmType
# The deletion is only possible if a type is not linked by any EmClass
@ -64,57 +78,32 @@ class EmType(EmComponent):
# @return True if delete False if not deleted
# @todo Check if the type is not linked by any EmClass
# @todo Check if there is no other ''non-deletion'' conditions
def delete(self):
subs = self.subordinates()
if sum([len(subs[subnat]) for subnat in subs]) > 0:
def delete_check(self):
if sum(self.subordinates_list) > 0:
return False
#Delete all relation with superiors
for nature, sups in self.superiors().items():
for sup in sups:
self.del_superior(sup, nature)
return super(EmType, self).delete()
return True
## Get the list of associated fieldgroups
## Get the list of non empty associated fieldgroups
# @return A list of EmFieldGroup instance
def field_groups(self):
meta = sqlutils.meta(self.db_engine)
fg_table = sql.Table(EmFieldGroup.table, meta)
req = fg_table.select(fg_table.c.uid).where(fg_table.c.class_id == self.class_id)
conn = self.db_engine.connect()
res = conn.execute(req)
rows = res.fetchall()
conn.close()
return [EmFieldGroup(row['uid']) for row in rows]
## Get the list of all Emfield possibly associated with this type
# @return A list of EmField instance
def all_fields(self):
res = []
for fieldgroup in self.field_groups():
res += fieldgroup.fields()
return res
def fieldgroups(self):
fieldgroups = [fieldgroup for fieldgroup in self.em_class.fieldgroups() if len(fieldgroup.fields(self.uid))]
return fieldgroups
## Return selected optional field
# @return A list of EmField instance
def selected_fields(self):
dbe = self.db_engine
meta = sqlutils.meta(dbe)
conn = dbe.connect()
table = sql.Table('em_field_type', meta)
res = conn.execute(table.select().where(table.c.type_id == self.uid))
return [EditorialModel.fields.EmField(row['field_id']) for row in res.fetchall()]
selected = [self.model.component(field_id) for field_id in self.fields_list]
return selected
## Return the list of associated fields
# @return A list of EmField instance
def fields(self):
result = list()
for field in self.all_fields():
if not field.optional:
result.append(field)
return result + self.selected_fields()
fields = [field for fieldgroup in self.fieldgroups() for field in fieldgroup.fields(self.uid)]
return fields
## Select_field (Function)
#
@ -127,7 +116,9 @@ class EmType(EmComponent):
# @throw ValueError if field is not optional or is not associated with this type
# @see EmType::_opt_field_act()
def select_field(self, field):
return self._opt_field_act(field, True)
if field.uid in self.fields_list:
return True
return self._change_field_list(field, True)
## Unselect_field (Function)
#
@ -140,7 +131,9 @@ class EmType(EmComponent):
# @throw ValueError if field is not optional or is not associated with this type
# @see EmType::_opt_field_act()
def unselect_field(self, field):
return self._opt_field_act(field, False)
if field.uid not in self.fields_list:
return True
return self._change_field_list(field, False)
## @brief Select or unselect an optional field
# @param field EmField: The EmField to select or unselect
@ -149,27 +142,20 @@ class EmType(EmComponent):
#
# @throw TypeError if field is not an EmField instance
# @throw ValueError if field is not optional or is not associated with this type
def _opt_field_act(self, field, select=True):
def _change_field_list(self, field, add=True): # TODO voir si on conserve l'argument "select"
if not isinstance(field, EmField):
raise TypeError("Excepted <class EmField> as field argument. But got " + str(type(field)))
if not field in self.all_fields():
if not field in self.em_class.fields():
raise ValueError("This field is not part of this type")
if not field.optional:
raise ValueError("This field is not optional")
dbe = self.db_engine
meta = sqlutils.meta(dbe)
conn = dbe.connect()
table = sql.Table('em_field_type', meta)
if select:
req = table.insert({'type_id': self.uid, 'field_id': field.uid})
if add:
self.fields_list.append(field.uid)
else:
req = table.delete().where(table.c.type_id == self.uid and table.c.field_id == field.uid)
self.fields_list.remove(field.uid)
res = conn.execute(req)
conn.close()
return bool(res)
return True
## Get the list of associated hooks
# @note Not conceptualized yet
@ -200,7 +186,7 @@ class EmType(EmComponent):
# EmType instance
# @throw RuntimeError if a nature fetched from db is not valid
def subordinates(self):
return self._sub_or_sup(False)
return { nature: [ self.model.component(tuid) for tuid in self.subordinates_list[nature] ] for nature in self.subordinates_list }
## @brief Get the list of subordinates EmType
# Get a list of EmType instance that have this EmType for superior
@ -208,43 +194,17 @@ class EmType(EmComponent):
# EmType instance
# @throw RuntimeError if a nature fetched from db is not valid
# @see EmType::_sub_or_sup()
# @todo reimplementation needed
def superiors(self):
return self._sub_or_sup(True)
## @brief Return the list of subordinates or superiors for an EmType
# This is the logic function that implements EmType::subordinates() and EmType::superiors()
# @param sup bool: If True returns superiors, if False returns..... subordinates
# @return A dict with relation nature as keys and list of subordinates/superiors as values
# @throw RunTimeError if a nature fetched from db is not valid
# @see EmType::subordinates(), EmType::superiors()
def _sub_or_sup(self, sup=True):
conn = self.db_engine.connect()
htable = self._table_hierarchy
type_table = sqlutils.get_table(self)
req = htable.select()
if sup:
col = htable.c.subordinate_id
else:
col = htable.c.superior_id
req = req.where(col == self.uid)
res = conn.execute(req)
rows = res.fetchall()
conn.close()
result = dict()
for nature in EmClassType.natures(self.classtype['name']):
result[nature] = []
for row in rows:
if row['nature'] not in result:
#Maybe security issue ?
raise RuntimeError("Unreconized nature from database : "+row['nature'])
to_fetch = 'superior_id' if sup else 'subordinate_id'
result[row['nature']].append(EmType(row[to_fetch]))
return result
superiors = {}
for em_type in self.model.components(EmType):
for nature, field_uids in em_type.subordinates_list.items():
if self.uid in field_uids:
if nature in superiors:
superiors[nature].append(em_type)
else:
superiors[nature] = [em_type]
return superiors
## Add a superior in the type hierarchy
# @param em_type EmType: An EmType instance
@ -268,21 +228,22 @@ class EmType(EmComponent):
elif self.name != em_type.name:
raise ValueError("Not allowed to put a different em_type as superior in a relation of nature '" + relation_nature + "'")
conn = self.db_engine.connect()
htable = self._table_hierarchy
values = {'subordinate_id': self.uid, 'superior_id': em_type.uid, 'nature': relation_nature}
req = htable.insert(values=values)
try:
conn.execute(req)
except sql.exc.IntegrityError:
ret = False
else:
ret = True
finally:
conn.close()
return ret
# TODO Réimplémenter
# conn = self.db_engine.connect()
# htable = self._table_hierarchy
# values = {'subordinate_id': self.uid, 'superior_id': em_type.uid, 'nature': relation_nature}
# req = htable.insert(values=values)
#
# try:
# conn.execute(req)
# except sql.exc.IntegrityError:
# ret = False
# else:
# ret = True
# finally:
# conn.close()
#
# return ret
## Delete a superior in the type hierarchy
# @param em_type EmType: An EmType instance
@ -293,34 +254,94 @@ class EmType(EmComponent):
if relation_nature not in EmClassType.natures(self.classtype['name']):
raise ValueError("Invalid nature for add_superior : '" + relation_nature + "'. Allowed relations for this type are " + str(EmClassType.natures(self.classtype['name'])))
conn = self.db_engine.connect()
htable = self._table_hierarchy
req = htable.delete(htable.c.superior_id == em_type.uid and htable.c.nature == relation_nature)
conn.execute(req)
conn.close()
# TODO Réimplémenter
# conn = self.db_engine.connect()
# htable = self._table_hierarchy
# req = htable.delete(htable.c.superior_id == em_type.uid and htable.c.nature == relation_nature)
# conn.execute(req)
# conn.close()
## @brief Get the list of linked type
# Types are linked with special fields called relation_to_type fields
# @return a list of EmType
# @see EmFields
def linked_types(self):
return self._linked_types_db()
return self._linked_types_db() # TODO changer l'appel
## @brief Return the list of all the types linked to this type, should they be superiors or subordinates
# @return A list of EmType objects
def _linked_types_db(self):
conn = self.db_engine.connect()
htable = self._table_hierarchy
req = htable.select(htable.c.superior_id, htable.c.subordinate_id)
req = req.where(sql.or_(htable.c.subordinate_id == self.uid, htable.c.superior_id == self.uid))
# def _linked_types_db(self):
# conn = self.db_engine.connect()
# htable = self._table_hierarchy
# req = htable.select(htable.c.superior_id, htable.c.subordinate_id)
# req = req.where(sql.or_(htable.c.subordinate_id == self.uid, htable.c.superior_id == self.uid))
#
# res = conn.execute(req)
# rows = res.fetchall()
# conn.close()
#
# rows = dict(zip(rows.keys(), rows))
# result = []
# for row in rows:
# result.append(EmType(row['subordinate_id'] if row['superior_id'] == self.uid else row['superior_id']))
#
# return result
## Checks if the EmType is valid
# @throw EmComponentCheckError if check fails
def check(self):
super(EmType, self).check()
em_class = self.model.component(self.class_id)
if not em_class:
raise EmComponentCheckError("class_id contains an uid that does not exists '%d'" % self.class_id)
if not isinstance(em_class, EditorialModel.classes.EmClass):
raise EmComponentCheckError("class_id contains an uid from a component that is not an EmClass but a %s" % str(type(emc_class)))
for i,fuid in enumerate(self.fields_list):
field = self.model.component(fuid)
if not field:
raise EmComponentCheckError("The element %d of selected_field is a non existing uid '%d'"%(i, fuid))
if not isinstance(field, EmField):
raise EmComponentCheckError("The element %d of selected_field is not an EmField but a %s" % (i, str(type(field)) ))
if not field.optional:
raise EmComponentCheckError("The element %d of selected_field is an EmField not optional" % i )
if field.fieldgroup_id not in [ fg.uid for fg in self.fieldgroups() ]:
raise EmComponentCheckErrro("The element %d of selected_field is an EmField that is part of an EmFieldGroup that is not associated with this EmType" % i)
for nature in self.subordinates_list:
for i, tuid in enumerate(self.subordinates_list[nature]):
em_type = self.model.component(tuid)
if not em_type:
raise EmComponentCheckError("The element %d of subordinates contains a non existing uid '%d'" % (i, tuid))
if not isinstance(em_type, EmType):
raise EmComponentCheckError("The element %d of subordinates contains a component that is not an EmType but a %s" % (i, str(type(em_type))))
if nature not in EmClassType.natures(self.em_class.classtype):
raise EmComponentCheckError("The relation nature '%s' of the element %d of subordinates is not valid for this EmType classtype '%s'", (nature, i, self.classtype) )
nat_spec = getattr(EmClassType, self.em_class.classtype)['hierarchy'][nature]
if nat_spec['attach'] == 'classtype':
if self.classtype != em_type.classtype:
raise EmComponentCheckError("The element %d of subordinates is of '%s' classtype. But the current type is of '%s' classtype, and relation nature '%s' require two EmType of same classtype" % (i, em_type.classtype, self.classtype, nature) )
elif nat_spec['attach'] == 'type':
if self.uid != em_type.uid:
raise EmComponentCheckError("The element %d of subordinates is a different EmType. But the relation nature '%s' require the same EmType" % (i, nature))
else:
raise NotImplementedError("The nature['attach'] '%s' is not implemented in this check !" % nat_spec['attach'])
if 'max_depth' in nat_spec and nat_spec['max_depth'] > 0:
depth = 1
cur_type = em_type
while depth >= nat_spec['max_depth']:
depth +=1
if len(cur_type.subordinates()[nature]) == 0:
break
else:
raise EmComponentCheckError("The relation with the element %d of subordinates has a depth superior than the maximum depth ( %d ) allowed by the relation's nature ( '%s' )" %( i, nat_spec['max_depth'], nature) )
for nature in self.subordinates():
nat_spec = getattr(EmClassType, self.em_class.classtype)['hierarchy'][nature]
if 'max_child' in nat_spec and nat_spec['max_child'] > 0:
if len(self.subordinates()[nature]) > nat_spec['max_child']:
raise EmComponentCheckError("The EmType has more child than allowed in the relation's nature : %d > %d" (len(self.subordinates()[nature], nat_spec['max_child'])))
#pass
res = conn.execute(req)
rows = res.fetchall()
conn.close()
rows = dict(zip(rows.keys(), rows))
result = []
for row in rows:
result.append(EmType(row['subordinate_id'] if row['superior_id'] == self.uid else row['superior_id']))
return result

View file

@ -1,4 +1 @@
Django==1.7.8
psycopg2==2.6
SQLAlchemy==1.0.4
PyMySQL==0.6.6