No Description
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

datasource.py 38KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879
  1. # -*- coding: utf-8 -*-
  2. import re
  3. import warnings
  4. import copy
  5. import functools
  6. from bson.son import SON
  7. from collections import OrderedDict
  8. import pymongo
  9. from pymongo.errors import BulkWriteError
  10. from lodel.context import LodelContext
  11. LodelContext.expose_modules(globals(), {
  12. 'lodel.logger': 'logger',
  13. 'lodel.leapi.leobject': ['CLASS_ID_FIELDNAME'],
  14. 'lodel.leapi.datahandlers.base_classes': ['Reference', 'MultipleRef'],
  15. 'lodel.exceptions': ['LodelException', 'LodelFatalError'],
  16. 'lodel.plugin.datasource_plugin': ['AbstractDatasource']})
  17. from . import utils
  18. from .exceptions import *
  19. from .utils import object_collection_name, collection_name, \
  20. MONGODB_SORT_OPERATORS_MAP, connection_string, mongo_fieldname
  21. ##@brief Datasource class
  22. #@ingroup plugin_mongodb_datasource
class MongoDbDatasource(AbstractDatasource):
    ##@brief Stores existing connections
    #
    #The key of this dict is a hash of the connection string + ro parameter.
    #The value is a dict with 2 keys :
    # - conn_count : the number of instanciated datasource that use this
    #connection (incremented by __connect(), decremented by __del__())
    # - db : the pymongo database object instance
    _connections = dict()

    ##@brief Mapping from lodel2 operators to mongodb operator
    lodel2mongo_op_map = {
        '=':'$eq', '<=':'$lte', '>=':'$gte', '!=':'$ne', '<':'$lt',
        '>':'$gt', 'in':'$in', 'not in':'$nin' }
    ##@brief List of mongodb operators that expect re as value
    #NOTE(review): $in/$nin take list values in MongoDB ; confirm the "re"
    #wording against the operators' actual use in this file
    mongo_op_re = ['$in', '$nin']
    ##@brief Matches an unescaped '*' wildcard in a filter value
    #NOTE(review): '\*' is not a raw string and is an invalid escape on
    #recent Python versions -- confirm before switching to r'[^\\]\*'
    wildcard_re = re.compile('[^\\\\]\*')
  39. ##@brief instanciates a database object given a connection name
  40. #@param host str : hostname or IP
  41. #@param port int : mongodb listening port
  42. #@param db_name str
  43. #@param username str
  44. #@param password str
  45. #@param read_only bool : If True the Datasource is for read only, else the
  46. #Datasource is write only !
  47. def __init__(self, host, port, db_name, username, password, read_only = False):
  48. ##@brief Connections infos that can be kept securly
  49. self.__db_infos = {'host': host, 'port': port, 'db_name': db_name}
  50. ##@brief Is the instance read only ? (if not it's write only)
  51. self.__read_only = bool(read_only)
  52. ##@brief Uniq ID for mongodb connection
  53. self.__conn_hash= None
  54. ##@brief Stores the database cursor
  55. self.database = self.__connect(
  56. username, password, db_name, self.__read_only)
  57. ##@brief Destructor that attempt to close connection to DB
  58. #
  59. #Decrease the conn_count of associated MongoDbDatasource::_connections
  60. #item. If it reach 0 close the connection to the db
  61. #@see MongoDbDatasource::__connect()
  62. def __del__(self):
  63. self._connections[self.__conn_hash]['conn_count'] -= 1
  64. if self._connections[self.__conn_hash]['conn_count'] <= 0:
  65. self._connections[self.__conn_hash]['db'].close()
  66. del(self._connections[self.__conn_hash])
  67. logger.info("Closing connection to database")
  68. ##@brief Provide a new uniq numeric ID
  69. #@param emcomp LeObject subclass (not instance) : To know on wich things we
  70. #have to be uniq
  71. #@warning multiple UID broken by this method
  72. #@return an integer
  73. def new_numeric_id(self, emcomp):
  74. target = emcomp.uid_source()
  75. tuid = target._uid[0] # Multiple UID broken here
  76. results = self.select(
  77. target, field_list = [tuid], filters = [],
  78. order=[(tuid, 'DESC')], limit = 1)
  79. if len(results) == 0:
  80. return 1
  81. return results[0][tuid]+1
  82. ##@brief returns a selection of documents from the datasource
  83. #@param target Emclass
  84. #@param field_list list
  85. #@param filters list : List of filters
  86. #@param relational_filters list : List of relational filters
  87. #@param order list : List of column to order. ex: order =
  88. #[('title', 'ASC'),]
  89. #@param group list : List of tupple representing the column used as
  90. #"group by" fields. ex: group = [('title', 'ASC'),]
  91. #@param limit int : Number of records to be returned
  92. #@param offset int: used with limit to choose the start record
  93. #@return list
  94. #@todo Implement group for abstract LeObject childs
  95. def select(self, target, field_list, filters = None,
  96. relational_filters=None, order=None, group=None, limit=None,
  97. offset=0):
  98. if target.is_abstract():
  99. #Reccursiv calls for abstract LeObject child
  100. results = self.__act_on_abstract(target, filters,
  101. relational_filters, self.select, field_list = field_list,
  102. order = order, group = group, limit = limit)
  103. #Here we may implement the group
  104. #If sorted query we have to sort again
  105. if order is not None:
  106. key_fun = functools.cmp_to_key(
  107. self.__generate_lambda_cmp_order(order))
  108. results = sorted(results, key=key_fun)
  109. #If limit given apply limit again
  110. if offset > len(results):
  111. results = list()
  112. else:
  113. if limit is not None:
  114. if limit + offset > len(results):
  115. limit = len(results)-offset-1
  116. results = results[offset:offset+limit]
  117. return results
  118. # Default behavior
  119. if filters is None:
  120. filters = list()
  121. if relational_filters is None:
  122. relational_filters = list()
  123. collection_name = object_collection_name(target)
  124. collection = self.database[collection_name]
  125. query_filters = self.__process_filters(
  126. target, filters, relational_filters)
  127. query_result_ordering = None
  128. if order is not None:
  129. query_result_ordering = utils.parse_query_order(order)
  130. if group is None:
  131. if field_list is None:
  132. field_list = dict()
  133. else:
  134. f_list=dict()
  135. for fl in field_list:
  136. f_list[fl] = 1
  137. field_list = f_list
  138. field_list['_id'] = 0
  139. cursor = collection.find(
  140. spec = query_filters,
  141. fields=field_list,
  142. skip=offset,
  143. limit=limit if limit != None else 0,
  144. sort=query_result_ordering)
  145. else:
  146. pipeline = list()
  147. unwinding_list = list()
  148. grouping_dict = OrderedDict()
  149. sorting_list = list()
  150. for group_param in group:
  151. field_name = group_param[0]
  152. field_sort_option = group_param[1]
  153. sort_option = MONGODB_SORT_OPERATORS_MAP[field_sort_option]
  154. unwinding_list.append({'$unwind': '$%s' % field_name})
  155. grouping_dict[field_name] = '$%s' % field_name
  156. sorting_list.append((field_name, sort_option))
  157. sorting_list.extends(query_result_ordering)
  158. pipeline.append({'$match': query_filters})
  159. if field_list is not None:
  160. pipeline.append({
  161. '$project': SON([{field_name: 1}
  162. for field_name in field_list])})
  163. pipeline.extend(unwinding_list)
  164. pipeline.append({'$group': grouping_dict})
  165. pipeline.extend({'$sort': SON(sorting_list)})
  166. if offset > 0:
  167. pipeline.append({'$skip': offset})
  168. if limit is not None:
  169. pipeline.append({'$limit': limit})
  170. results = list()
  171. for document in cursor:
  172. results.append(document)
  173. return results
  174. ##@brief Deletes records according to given filters
  175. #@param target Emclass : class of the record to delete
  176. #@param filters list : List of filters
  177. #@param relational_filters list : List of relational filters
  178. #@return int : number of deleted records
    def delete(self, target, filters, relational_filters):
        """Delete records according to given filters.

        @param target Emclass : class of the records to delete
        @param filters list : list of filters
        @param relational_filters list : list of relational filters
        @return int : number of deleted records
        """
        if target.is_abstract():
            logger.debug("Delete called on %s filtered by (%s,%s). Target is \
abstract, preparing reccursiv calls" % (target, filters, relational_filters))
            #Deletion with abstract LeObject as target (reccursiv calls)
            return self.__act_on_abstract(target, filters,
                relational_filters, self.delete)
        logger.debug("Delete called on %s filtered by (%s,%s)." % (
            target, filters, relational_filters))
        #Non abstract behavior
        mongo_filters = self.__process_filters(
            target, filters, relational_filters)
        #Updating backrefs before deletion (new_datas = None means
        #deletion for __update_backref_filtered())
        self.__update_backref_filtered(target, filters, relational_filters,
            None)
        #NOTE(review): remove() is the pymongo 2.x API ; its result dict
        #exposes the removed document count under the 'n' key
        res = self.__collection(target).remove(mongo_filters)
        return res['n']
  196. ##@brief updates records according to given filters
  197. #@param target Emclass : class of the object to insert
  198. #@param filters list : List of filters
  199. #@param relational_filters list : List of relational filters
  200. #@param upd_datas dict : datas to update (new values)
  201. #@return int : Number of updated records
  202. def update(self, target, filters, relational_filters, upd_datas):
  203. self._data_cast(upd_datas)
  204. #fetching current datas state
  205. mongo_filters = self.__process_filters(
  206. target, filters, relational_filters)
  207. old_datas_l = self.__collection(target).find(
  208. mongo_filters)
  209. old_datas_l = list(old_datas_l)
  210. #Running update
  211. res = self.__update_no_backref(target, filters, relational_filters,
  212. upd_datas)
  213. #updating backref
  214. self.__update_backref_filtered(target, filters, relational_filters,
  215. upd_datas, old_datas_l)
  216. return res
  217. ##@brief Designed to be called by backref update in order to avoid
  218. #infinite updates between back references
  219. #@see update()
    def __update_no_backref(self, target, filters, relational_filters,
        upd_datas):
        """Run an update without touching back references.

        Designed to be called by the backref update machinery in order to
        avoid infinite update loops between back references.

        @see update()
        @return int : number of updated records
        """
        logger.debug("Update called on %s filtered by (%s,%s) with datas \
%s" % (target, filters, relational_filters, upd_datas))
        if target.is_abstract():
            #Update using abstract LeObject as target (reccursiv calls)
            #NOTE(review): this recurses through self.update(), which DOES
            #update backrefs -- presumably intended for abstract targets,
            #but verify against this method's "no backref" contract
            return self.__act_on_abstract(target, filters,
                relational_filters, self.update, upd_datas = upd_datas)
        #Non abstract behavior : convert the filters and run a $set update
        mongo_filters = self.__process_filters(
            target, filters, relational_filters)
        self._data_cast(upd_datas)
        mongo_arg = {'$set': upd_datas }
        #NOTE(review): update() is the pymongo 2.x API ; 'n' holds the
        #matched/updated document count
        res = self.__collection(target).update(mongo_filters, mongo_arg)
        return res['n']
  235. ## @brief Inserts a record in a given collection
  236. # @param target Emclass : class of the object to insert
  237. # @param new_datas dict : datas to insert
  238. # @return the inserted uid
  239. def insert(self, target, new_datas):
  240. self._data_cast(new_datas)
  241. logger.debug("Insert called on %s with datas : %s"% (
  242. target, new_datas))
  243. uidname = target.uid_fieldname()[0] #MULTIPLE UID BROKEN HERE
  244. if uidname not in new_datas:
  245. raise MongoDataSourceError("Missing UID data will inserting a new \
  246. %s" % target.__class__)
  247. res = self.__collection(target).insert(new_datas)
  248. self.__update_backref(target, new_datas[uidname], None, new_datas)
  249. return str(res)
  250. ## @brief Inserts a list of records in a given collection
  251. # @param target Emclass : class of the objects inserted
  252. # @param datas_list list : list of dict
  253. # @return list : list of the inserted records' ids
  254. def insert_multi(self, target, datas_list):
  255. for datas in datas_list:
  256. self._data_cast(datas)
  257. res = self.__collection(target).insert_many(datas_list)
  258. for new_datas in datas_list:
  259. self.__update_backref(target, None, new_datas)
  260. target.make_consistency(datas=new_datas)
  261. return list(res.inserted_ids)
  262. ##@brief Update backref giving an action
  263. # @param target leObject child class
  264. # @param filters
  265. # @param relational_filters
  266. # @param new_datas None | dict : optional new datas if None mean we are deleting
  267. # @param old_datas_l None | list : if None fetch old datas from db (usefull
  268. #when modifications are made on instance before updating backrefs)
  269. # @return nothing (for the moment
  270. def __update_backref_filtered(self, target,
  271. filters, relational_filters, new_datas = None, old_datas_l = None):
  272. #Getting all the UID of the object that will be deleted in order
  273. #to update back_references
  274. if old_datas_l is None:
  275. mongo_filters = self.__process_filters(
  276. target, filters, relational_filters)
  277. old_datas_l = self.__collection(target).find(
  278. mongo_filters)
  279. old_datas_l = list(old_datas_l)
  280. uidname = target.uid_fieldname()[0] #MULTIPLE UID BROKEN HERE
  281. for old_datas in old_datas_l:
  282. self.__update_backref(
  283. target, old_datas[uidname], old_datas, new_datas)
  284. ## @brief Update back references of an object
  285. # @ingroup plugin_mongodb_bref_op
  286. #
  287. #old_datas and new_datas arguments are set to None to indicate
  288. #insertion or deletion. Calls examples :
  289. # @par LeObject insert __update backref call
  290. # <pre>
  291. #Insert(datas):
  292. # self.make_insert(datas)
  293. # self.__update_backref(self.__class__, None, datas)
  294. # </pre>
  295. # @par LeObject delete __update backref call
  296. #Delete()
  297. # old_datas = self.datas()
  298. # self.make_delete()
  299. #  self.__update_backref(self.__class__, old_datas, None)
  300. # @par LeObject update __update_backref call
  301. # <pre>
  302. #Update(new_datas):
  303. # old_datas = self.datas()
  304. # self.make_udpdate(new_datas)
  305. #  self.__update_backref(self.__class__, old_datas, new_datas)
  306. # </pre>
  307. #
  308. # @param target LeObject child classa
  309. # @param tuid mixed : The target UID (the value that will be inserted in
  310. #back references)
  311. # @param old_datas dict : datas state before update
  312. # @param new_datas dict : datas state after the update process
  313. # retun None
    def __update_backref(self, target, tuid, old_datas, new_datas):
        """Update the back references of the object identified by tuid.

        old_datas / new_datas set to None indicate respectively an
        insertion or a deletion (see the call examples documented above
        this method).

        @param target LeObject child class
        @param tuid mixed : the target UID (the value that will be stored
               in back references)
        @param old_datas dict : datas state before update
        @param new_datas dict : datas state after the update process
        @return None
        """
        #upd_dict is the dict that will allow to run updates in an optimized
        #way (or try to help doing it)
        #
        #Its structure looks like :
        # { LeoCLASS : {
        #     UID1: (
        #         LeoINSTANCE,
        #         { fname1 : value, fname2: value }),
        #     UID2 (LeoINSTANCE, {fname...}),
        #   },
        #   LeoClass2: {...
        upd_dict = {}
        for fname, fdh in target.reference_handlers().items():
            #oldd / newd : True when the field carries a significant value
            #(present, not None and different from the datahandler default)
            #in the old / new state
            oldd = old_datas is not None and fname in old_datas and \
                (not hasattr(fdh, 'default') or old_datas[fname] != fdh.default) \
                and not old_datas[fname] is None
            newd = new_datas is not None and fname in new_datas and \
                (not hasattr(fdh, 'default') or new_datas[fname] != fdh.default) \
                and not new_datas[fname] is None
            if (oldd and newd and old_datas[fname] == new_datas[fname])\
                    or not(oldd or newd):
                #No changes or not concerned
                continue
            bref_cls = fdh.back_reference[0]
            bref_fname = fdh.back_reference[1]
            if not fdh.is_singlereference():
                #fdh is a multiple ref. So the update preparation will be
                #divided into two loops :
                #- one loop for deleting old datas
                #- one loop for inserting updated datas
                #
                #Preparing the list of values to delete or to add
                if newd and oldd:
                    old_values = old_datas[fname]
                    new_values = new_datas[fname]
                    to_del = [ val
                        for val in old_values
                        if val not in new_values]
                    to_add = [ val
                        for val in new_values
                        if val not in old_values]
                elif oldd and not newd:
                    to_del = old_datas[fname]
                    to_add = []
                elif not oldd and newd:
                    to_del = []
                    to_add = new_datas[fname]
                #Calling __back_ref_upd_one_value() with good arguments
                for vtype, vlist in [('old',to_del), ('new', to_add)]:
                    for value in vlist:
                        #fetching backref infos
                        bref_infos = self.__bref_get_check(
                            bref_cls, value, bref_fname)
                        #preparing the upd_dict
                        upd_dict = self.__update_backref_upd_dict_prepare(
                            upd_dict, bref_infos, bref_fname, value)
                        #preparing updated bref_infos
                        #NOTE(review): rebinding bref_cls here clobbers the
                        #value read from fdh.back_reference for subsequent
                        #iterations -- confirm __bref_get_check() always
                        #returns the same class
                        bref_cls, bref_leo, bref_dh, bref_value = bref_infos
                        bref_infos = (bref_cls, bref_leo, bref_dh,
                            upd_dict[bref_cls][value][1][bref_fname])
                        vdict = {vtype: value}
                        #fetch and store updated value
                        new_bref_val = self.__back_ref_upd_one_value(
                            fname, fdh, tuid, bref_infos, **vdict)
                        upd_dict[bref_cls][value][1][bref_fname] = new_bref_val
            else:
                #fdh is a single ref so the process is simpler, we do not have
                #to loop and we may do an update in only one
                #__back_ref_upd_one_value() call by giving both old and new
                #value
                vdict = {}
                if oldd:
                    vdict['old'] = old_datas[fname]
                    uid_val = vdict['old']
                if newd:
                    vdict['new'] = new_datas[fname]
                    if not oldd:
                        uid_val = vdict['new']
                #Fetching back ref infos
                bref_infos = self.__bref_get_check(
                    bref_cls, uid_val, bref_fname)
                #prepare the upd_dict
                upd_dict = self.__update_backref_upd_dict_prepare(
                    upd_dict, bref_infos, bref_fname, uid_val)
                #forging update bref_infos
                bref_cls, bref_leo, bref_dh, bref_value = bref_infos
                bref_infos = (bref_cls, bref_leo, bref_dh,
                    upd_dict[bref_cls][uid_val][1][bref_fname])
                #fetch and store updated value
                new_bref_val = self.__back_ref_upd_one_value(
                    fname, fdh, tuid, bref_infos, **vdict)
                upd_dict[bref_cls][uid_val][1][bref_fname] = new_bref_val
        #Now we've got our upd_dict ready.
        #running the updates
        for bref_cls, uid_dict in upd_dict.items():
            for uidval, (leo, datas) in uid_dict.items():
                #MULTIPLE UID BROKEN 2 LINES BELOW
                self.__update_no_backref(
                    leo.__class__, [(leo.uid_fieldname()[0], '=', uidval)],
                    [], datas)
  416. ##@brief Utility function designed to handle the upd_dict of
  417. #__update_backref()
  418. #
  419. #Basically checks if a key exists at some level, if not create it with
  420. #the good default value (in most case dict())
  421. #@param upd_dict dict : in & out args modified by reference
  422. #@param bref_infos tuple : as returned by __bref_get_check()
  423. #@param bref_fname str : name of the field in referenced class
  424. #@param uid_val mixed : the UID of the referenced object
  425. #@return the updated version of upd_dict
  426. @staticmethod
  427. def __update_backref_upd_dict_prepare(upd_dict,bref_infos, bref_fname,
  428. uid_val):
  429. bref_cls, bref_leo, bref_dh, bref_value = bref_infos
  430. if bref_cls not in upd_dict:
  431. upd_dict[bref_cls] = {}
  432. if uid_val not in upd_dict[bref_cls]:
  433. upd_dict[bref_cls][uid_val] = (bref_leo, {})
  434. if bref_fname not in upd_dict[bref_cls][uid_val]:
  435. upd_dict[bref_cls][uid_val][1][bref_fname] = bref_value
  436. return upd_dict
  437. ##@brief Prepare a one value back reference update
  438. #@param fname str : the source Reference field name
  439. #@param fdh DataHandler : the source Reference DataHandler
  440. #@param tuid mixed : the uid of the Leo that make reference to the backref
  441. #@param bref_infos tuple : as returned by __bref_get_check() method
  442. #@param values dict : contains the old and new values (optional) with the "old" and "new" keys
  443. #@return the new back reference field value
    def __back_ref_upd_one_value(self, fname, fdh, tuid, bref_infos, **values):
        """Prepare a one value back reference update.

        @param fname str : the source Reference field name
        @param fdh DataHandler : the source Reference DataHandler
        @param tuid mixed : uid of the Leo that makes reference to the
               backref
        @param bref_infos tuple : as returned by __bref_get_check()
        @param values dict : old and/or new values under the "old" and
               "new" keys (both optional)
        @return the new back reference field value
        @throw MongoDbConsistencyError when the stored backref value does
               not match the expected state
        """
        bref_cls, bref_leo, bref_dh, bref_val = bref_infos
        oldd = 'old' in values
        newdd = 'new' in values
        if bref_val is None:
            #Start from the datahandler's empty value when nothing is stored
            bref_val = bref_dh.empty()
        if not bref_dh.is_singlereference():
            #Multiple values backref : bref_val is a collection of uids
            if oldd and newdd:
                #Value replacement : tuid must already be referenced,
                #nothing to change
                #NOTE(review): the error text mentions deletion but this is
                #a presence check for a replacement -- wording only
                if tuid not in bref_val:
                    raise MongoDbConsistencyError("The value we want to \
delete in this back reference update was not found in the back referenced \
object : %s. Value was : '%s'" % (bref_leo, tuid))
                return bref_val
            elif oldd and not newdd:
                #deletion
                #NOTE(review): old_value is never used below -- dead store
                old_value = values['old']
                if tuid not in bref_val:
                    raise MongoDbConsistencyError("The value we want to \
delete in this back reference update was not found in the back referenced \
object : %s. Value was : '%s'" % (bref_leo, tuid))
                #Tuples are converted to sets so they can be mutated
                if isinstance(bref_val, tuple):
                    bref_val = set(bref_val)
                if isinstance(bref_val, set):
                    bref_val -= set([tuid])
                else:
                    del(bref_val[bref_val.index(tuid)])
            elif not oldd and newdd:
                #insertion : tuid must not be referenced yet
                if tuid in bref_val:
                    raise MongoDbConsistencyError("The value we want to \
add in this back reference update was found in the back referenced \
object : %s. Value was : '%s'" % (bref_leo, tuid))
                if isinstance(bref_val, tuple):
                    bref_val = set(bref_val)
                if isinstance(bref_val, set):
                    bref_val |= set([tuid])
                else:
                    bref_val.append(tuid)
        else:
            #Single value backref
            if oldd and newdd:
                #Value replacement : the stored value must be tuid already
                if bref_val != tuid:
                    raise MongoDbConsistencyError("The backreference doesn't \
have expected value. Expected was %s but found %s in %s" % (
                        tuid, bref_val, bref_leo))
                return bref_val
            elif oldd and not newdd:
                #deletion : fall back to the datahandler default value
                if not hasattr(bref_dh, "default"):
                    raise MongoDbConsistencyError("Unable to delete a \
value for a back reference update. The concerned field don't have a default \
value : in %s field %s" % (bref_leo,fname))
                bref_val = getattr(bref_dh, "default")
            elif not oldd and newdd:
                #insertion
                bref_val = tuid
        return bref_val
  499. ##@brief Fetch back reference informations
  500. #@warning thank's to __update_backref_act() this method is useless
  501. #@param bref_cls LeObject child class : __back_reference[0]
  502. #@param uidv mixed : UID value (the content of the reference field)
  503. #@param bref_fname str : the name of the back_reference field
  504. #@return tuple(bref_class, bref_LeObect_instance, bref_datahandler,
  505. #bref_value)
  506. #@throw MongoDbConsistencyError when LeObject instance not found given
  507. #uidv
  508. #@throw LodelFatalError if the back reference field is not a Reference
  509. #subclass (major failure)
    def __bref_get_check(self, bref_cls, uidv, bref_fname):
        """Fetch and sanity-check back reference informations.

        @param bref_cls LeObject child class : __back_reference[0]
        @param uidv mixed : UID value (the content of the reference field)
        @param bref_fname str : the name of the back_reference field
        @return tuple(bref_class, bref_LeObect_instance, bref_datahandler,
                bref_value)
        @throw MongoDbConsistencyError when no LeObject instance is found
               for uidv
        @throw LodelFatalError if the back reference field is not a
               Reference subclass (major failure)
        """
        bref_leo = bref_cls.get_from_uid(uidv)
        if bref_leo is None:
            raise MongoDbConsistencyError("Unable to get the object we make \
reference to : %s with uid = %s" % (bref_cls, repr(uidv)))
        bref_dh = bref_leo.data_handler(bref_fname)
        if not bref_dh.is_reference():
            raise LodelFatalError("Found a back reference field that \
is not a reference : '%s' field '%s'" % (bref_leo, bref_fname))
        bref_val = bref_leo.data(bref_fname)
        return (bref_leo.__class__, bref_leo, bref_dh, bref_val)
  521. ##@brief Act on abstract LeObject child
  522. #
  523. #This method is designed to be called by insert, select and delete method
  524. #when they encounter an abtract class
  525. #@param target LeObject child class
  526. #@param filters
  527. #@param relational_filters
  528. #@param act function : the caller method
  529. #@param **kwargs other arguments
  530. #@return sum of results (if it's an array it will result in a concat)
  531. #@todo optimization implementing a cache for __bref_get_check()
  532. def __act_on_abstract(self,
  533. target, filters, relational_filters, act, **kwargs):
  534. logger.debug("Abstract %s, running reccursiv select \
  535. on non abstract childs" % act.__name__)
  536. result = list() if act == self.select else 0
  537. if not target.is_abstract():
  538. target_childs = [target]
  539. else:
  540. target_childs = [tc for tc in target.child_classes()
  541. if not tc.is_abstract()]
  542. for target_child in target_childs:
  543. logger.debug(
  544. "Abstract %s on %s" % (act.__name__, target_child.__name__))
  545. #Add target_child to filter
  546. new_filters = copy.copy(filters)
  547. for i in range(len(filters)):
  548. fname, op, val = filters[i]
  549. if fname == CLASS_ID_FIELDNAME:
  550. logger.warning("Dirty drop of filter : '%s %s %s'" % (
  551. fname, op, val))
  552. del(new_filters[i])
  553. new_filters.append(
  554. (CLASS_ID_FIELDNAME, '=',
  555. collection_name(target_child.__name__)))
  556. result += act(
  557. target = target_child,
  558. filters = new_filters,
  559. relational_filters = relational_filters,
  560. **kwargs)
  561. return result
  562. ##@brief Connect to database
  563. #@note this method avoid opening two times the same connection using
  564. #MongoDbDatasource::_connections static attribute
  565. #@param username str
  566. #@param password str
  567. #@param db_name str : database name
  568. #@param ro bool : If True the Datasource is for read only, else the
  569. def __connect(self, username, password, db_name, ro):
  570. conn_string = connection_string(
  571. username = username, password = password,
  572. host = self.__db_infos['host'],
  573. port = self.__db_infos['port'],
  574. db_name = db_name,
  575. ro = ro)
  576. self.__conn_hash = conn_h = hash(conn_string)
  577. if conn_h in self._connections:
  578. self._connections[conn_h]['conn_count'] += 1
  579. return self._connections[conn_h]['db'][self.__db_infos['db_name']]
  580. else:
  581. logger.info("Opening a new connection to database")
  582. self._connections[conn_h] = {
  583. 'conn_count': 1,
  584. 'db': utils.connect(conn_string)}
  585. return self._connections[conn_h]['db'][self.__db_infos['db_name']]
  586. ##@brief Return a pymongo collection given a LeObject child class
  587. #@param leobject LeObject child class (no instance)
  588. #return a pymongo.collection instance
  589. def __collection(self, leobject):
  590. return self.database[object_collection_name(leobject)]
  591. ##@brief Perform subqueries implies by relational filters and append the
  592. # result to existing filters
  593. #
  594. #The processing is divided in multiple steps :
  595. # - determine (for each relational field of the target) every collection
  596. #that are involved
  597. # - generate subqueries for relational_filters that concerns a different
  598. #collection than target collection
  599. #filters
  600. # - execute subqueries
  601. # - transform subqueries results in filters
  602. # - merge subqueries generated filters with existing filters
  603. #
  604. #@param target LeObject subclass (no instance) : Target class
  605. #@param filters list : List of tuple(FIELDNAME, OP, VALUE)
  606. #@param relational_filters : same composition thant filters except that
  607. # FIELD is represented by a tuple(FIELDNAME, {CLASS1:RFIELD1,
  608. # CLASS2:RFIELD2})
  609. #@return a list of pymongo filters ( dict {FIELD:{OPERATOR:VALUE}} )
    def __process_filters(self,target, filters, relational_filters):
        """Perform the subqueries implied by relational filters and merge
        their results into the plain filters.

        Steps : convert simple filters, organize relational filters,
        generate + execute subqueries, then inject the resulting uid sets
        as $in constraints.

        @param target LeObject subclass (no instance) : target class
        @param filters list : list of tuple(FIELDNAME, OP, VALUE)
        @param relational_filters : same composition as filters except that
               FIELD is a tuple(FIELDNAME, {CLASS1:RFIELD1, ...})
        @return a dict of pymongo filters ( {FIELD:{OPERATOR:VALUE}} )
        """
        # Simple filters lodel2 -> pymongo converting
        res = self.__filters2mongo(filters, target)
        rfilters = self.__prepare_relational_filters(target, relational_filters)
        #Now that everything is well organized, begin to forge subquerie
        #filters
        self.__subqueries_from_relational_filters(target, rfilters)
        # Executing subqueries, creating filters from result, and injecting
        # them in original filters of the query
        if len(rfilters) > 0:
            logger.debug("Begining subquery execution")
        for fname in rfilters:
            if fname not in res:
                res[fname] = dict()
            #Union of the uids returned by each referenced class' subquery
            subq_results = set()
            for leobject, sq_filters in rfilters[fname].items():
                uid_fname = mongo_fieldname(leobject._uid)
                log_msg = "Subquery running on collection {coll} with filters \
'{filters}'"
                logger.debug(log_msg.format(
                    coll=object_collection_name(leobject),
                    filters=sq_filters))
                #Only the uid field is projected : we just need the ids
                cursor = self.__collection(leobject).find(
                    filter=sq_filters,
                    projection=uid_fname)
                subq_results |= set(doc[uid_fname] for doc in cursor)
            #generating new filter from result
            if '$in' in res[fname]:
                #WARNING we allready have a IN on this field, doing dedup
                #from result
                deduped = set(res[fname]['$in']) & subq_results
                if len(deduped) == 0:
                    #NOTE(review): dropping the $in entirely removes the
                    #constraint instead of matching nothing -- confirm this
                    #is the intended behavior
                    del(res[fname]['$in'])
                else:
                    res[fname]['$in'] = list(deduped)
            else:
                res[fname]['$in'] = list(subq_results)
        if len(rfilters) > 0:
            logger.debug("End of subquery execution")
        return res
  650. ##@brief Generate subqueries from rfilters tree
  651. #
  652. #Returned struct organization :
  653. # - 1st level keys : relational field name of target
  654. # - 2nd level keys : referenced leobject
  655. # - 3th level values : pymongo filters (dict)
  656. #
  657. #@note The only caller of this method is __process_filters
  658. #@warning No return value, the rfilters arguement is modified by
  659. #reference
  660. #
  661. #@param target LeObject subclass (no instance) : Target class
  662. #@param rfilters dict : A struct as returned by
  663. #MongoDbDatasource.__prepare_relational_filters()
  664. #@return None, the rfilters argument is modified by reference
  665. @classmethod
  666. def __subqueries_from_relational_filters(cls, target, rfilters):
  667. for fname in rfilters:
  668. for leobject in rfilters[fname]:
  669. for rfield in rfilters[fname][leobject]:
  670. #This way of doing is not optimized but allows to trigger
  671. #warnings in some case (2 different values for a same op
  672. #on a same field on a same collection)
  673. mongofilters = cls.__op_value_listconv(
  674. rfilters[fname][leobject][rfield], target.field(fname))
  675. rfilters[fname][leobject][rfield] = mongofilters
  676. ##@brief Generate a tree from relational_filters
  677. #
  678. #The generated struct is a dict with :
  679. # - 1st level keys : relational field name of target
  680. # - 2nd level keys : referenced leobject
  681. # - 3th level keys : referenced field in referenced class
  682. # - 4th level values : list of tuple(op, value)
  683. #
  684. #@note The only caller of this method is __process_filters
  685. #@warning An assertion is done : if two leobject are stored in the same
  686. #collection they share the same uid
  687. #
  688. #@param target LeObject subclass (no instance) : Target class
  689. #@param relational_filters : same composition thant filters except that
  690. #@return a struct as described above
  691. @classmethod
  692. def __prepare_relational_filters(cls, target, relational_filters):
  693. # We are going to regroup relationnal filters by reference field
  694. # then by collection
  695. rfilters = dict()
  696. if relational_filters is None:
  697. relational_filters = []
  698. for (fname, rfields), op, value in relational_filters:
  699. if fname not in rfilters:
  700. rfilters[fname] = dict()
  701. rfilters[fname] = dict()
  702. # Stores the representative leobject for associated to a collection
  703. # name
  704. leo_collname = dict()
  705. # WARNING ! Here we assert that all leobject that are stored
  706. # in a same collection are identified by the same field
  707. for leobject, rfield in rfields.items():
  708. #here we are filling a dict with leobject as index but
  709. #we are doing a UNIQ on collection name
  710. cur_collname = object_collection_name(leobject)
  711. if cur_collname not in leo_collname:
  712. leo_collname[cur_collname] = leobject
  713. rfilters[fname][leobject] = dict()
  714. #Fecthing the collection's representative leobject
  715. repr_leo = leo_collname[cur_collname]
  716. if rfield not in rfilters[fname][repr_leo]:
  717. rfilters[fname][repr_leo][rfield] = list()
  718. rfilters[fname][repr_leo][rfield].append((op, value))
  719. return rfilters
  720. ##@brief Convert lodel2 filters to pymongo conditions
  721. #@param filters list : list of lodel filters
  722. #@param target Datahandler : The datahandler to use to cast the value in the correct type
  723. #@return dict representing pymongo conditions
  724. @classmethod
  725. def __filters2mongo(cls, filters, target):
  726. res = dict()
  727. eq_fieldname = [] #Stores field with equal comparison OP
  728. for fieldname, op, value in filters:
  729. oop = op
  730. ovalue = value
  731. op, value = cls.__op_value_conv(op, value, target.field(fieldname))
  732. if op == '=':
  733. eq_fieldname.append(fieldname)
  734. if fieldname in res:
  735. logger.warning("Dropping previous condition. Overwritten \
  736. by an equality filter")
  737. res[fieldname] = value
  738. continue
  739. if fieldname in eq_fieldname:
  740. logger.warning("Dropping condition : '%s %s %s'" % (
  741. fieldname, op, value))
  742. continue
  743. if fieldname not in res:
  744. res[fieldname] = dict()
  745. if op in res[fieldname]:
  746. logger.warning("Dropping condition : '%s %s %s'" % (
  747. fieldname, op, value))
  748. else:
  749. if op not in cls.lodel2mongo_op_map:
  750. raise ValueError("Invalid operator : '%s'" % op)
  751. new_op = cls.lodel2mongo_op_map[op]
  752. res[fieldname][new_op] = value
  753. return res
  754. ##@brief Convert lodel2 operator and value to pymongo struct
  755. #
  756. #Convertion is done using MongoDbDatasource::lodel2mongo_op_map
  757. #@param op str : take value in LeFilteredQuery::_query_operators
  758. #@param value mixed : the value
  759. #@param dhdl DataHandler: datahandler used to cast the values before sending them to the database
  760. #@return a tuple(mongo_op, mongo_value)
  761. @classmethod
  762. def __op_value_conv(cls, op, value, dhdl):
  763. if op not in cls.lodel2mongo_op_map:
  764. msg = "Invalid operator '%s' found" % op
  765. raise MongoDbDataSourceError(msg)
  766. mongop = cls.lodel2mongo_op_map[op]
  767. mongoval = value
  768. #Converting lodel2 wildcarded string into a case insensitive
  769. #mongodb re
  770. if mongop in cls.mongo_op_re:
  771. if value.startswith('(') and value.endswith(')'):
  772. if (dhdl.cast_type is not None):
  773. mongoval = [ dhdl.cast_type(item) for item in mongoval[1:-1].split(',') ]
  774. else:
  775. mongoval = [ item for item in mongoval[1:-1].split(',') ]
  776. elif mongop == 'like':
  777. #unescaping \
  778. mongoval = value.replace('\\\\','\\')
  779. if not mongoval.startswith('*'):
  780. mongoval = '^'+mongoval
  781. #For the end of the string it's harder to detect escaped *
  782. if not (mongoval[-1] == '*' and mongoval[-2] != '\\'):
  783. mongoval += '$'
  784. #Replacing every other unescaped wildcard char
  785. mongoval = cls.wildcard_re.sub('.*', mongoval)
  786. mongoval = {'$regex': mongoval, '$options': 'i'}
  787. return (op, mongoval)
  788. ##@brief Convert a list of tuple(OP, VALUE) into a pymongo filter dict
  789. #@return a dict with mongo op as key and value as value...
  790. @classmethod
  791. def __op_value_listconv(cls, op_value_list, dhdl):
  792. result = dict()
  793. for op, value in op_value_list:
  794. mongop, mongoval = cls.__op_value_conv(op, value, dhdl)
  795. if mongop in result:
  796. warnings.warn("Duplicated value given for a single \
  797. field/operator couple in a query. We will keep only the first one")
  798. else:
  799. result[mongop] = mongoval
  800. return result
  801. ##@brief Generate a comparison function for post reccursion sorting in
  802. #select
  803. #@return a lambda function that take 2 dict as arguement
  804. @classmethod
  805. def __generate_lambda_cmp_order(cls, order):
  806. if len(order) == 0:
  807. return lambda a,b: 0
  808. glco = cls.__generate_lambda_cmp_order
  809. fname, cmpdir = order[0]
  810. order = order[1:]
  811. return lambda a,b: glco(order)(a,b) if a[fname] == b[fname] else (\
  812. 1 if (a[fname]>b[fname] if cmpdir == 'ASC' else a[fname]<b[fname])\
  813. else -1)
  814. ##@brief Correct some datas before giving them to pymongo
  815. #
  816. #For example sets has to be casted to lise
  817. #@param cls
  818. #@param datas
  819. #@return datas
  820. @classmethod
  821. def _data_cast(cls, datas):
  822. for dname in datas:
  823. if isinstance(datas[dname], set):
  824. #pymongo raises :
  825. #bson.errors.InvalidDocument: Cannot encode object: {...}
  826. #with sets
  827. datas[dname] = list(datas[dname])
  828. return datas