No Description
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

datasource.py 36KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832
  1. # -*- coding: utf-8 -*-
  2. import re
  3. import warnings
  4. import copy
  5. import functools
  6. from bson.son import SON
  7. from collections import OrderedDict
  8. import pymongo
  9. from pymongo.errors import BulkWriteError
  10. from lodel import logger
  11. from lodel.leapi.leobject import CLASS_ID_FIELDNAME
  12. from lodel.leapi.datahandlers.base_classes import Reference, MultipleRef
  13. from lodel.exceptions import LodelException, LodelFatalError
  14. from lodel.plugin.datasource_plugin import AbstractDatasource
  15. from . import utils
  16. from .exceptions import *
  17. from .utils import object_collection_name, collection_name, \
  18. MONGODB_SORT_OPERATORS_MAP, connection_string, mongo_fieldname
##@brief Datasource class for MongoDB
#@ingroup plugin_mongodb_datasource
class MongoDbDatasource(AbstractDatasource):

    ##@brief Stores existing connections
    #
    #The key of this dict is a hash of the connection string + ro parameter.
    #The value is a dict with 2 keys :
    # - conn_count : the number of instanciated datasource that use this
    #connection (connection is closed when it drops back to 0, see __del__())
    # - db : the pymongo database object instance
    _connections = dict()

    ##@brief Mapping from lodel2 comparison operators to their mongodb
    #equivalent (used when converting lodel2 filters to pymongo conditions)
    lodel2mongo_op_map = {
        '=':'$eq', '<=':'$lte', '>=':'$gte', '!=':'$ne', '<':'$lt',
        '>':'$gt', 'in':'$in', 'not in':'$nin' }
    ##@brief List of mongodb operators that expect re as value
    #NOTE(review): despite the original comment, '$in'/'$nin' take list
    #values in mongodb — confirm intended meaning against the op-conversion
    #helpers further down the file
    mongo_op_re = ['$in', '$nin']
    ##@brief Matches an unescaped '*' in a filter value (wildcard detection)
    wildcard_re = re.compile('[^\\\\]\*')
  37. ##@brief instanciates a database object given a connection name
  38. #@param host str : hostname or IP
  39. #@param port int : mongodb listening port
  40. #@param db_name str
  41. #@param username str
  42. #@param password str
  43. #@param read_only bool : If True the Datasource is for read only, else the
  44. #Datasource is write only !
  45. def __init__(self, host, port, db_name, username, password, read_only = False):
  46. ##@brief Connections infos that can be kept securly
  47. self.__db_infos = {'host': host, 'port': port, 'db_name': db_name}
  48. ##@brief Is the instance read only ? (if not it's write only)
  49. self.__read_only = bool(read_only)
  50. ##@brief Uniq ID for mongodb connection
  51. self.__conn_hash= None
  52. ##@brief Stores the database cursor
  53. self.database = self.__connect(
  54. username, password, db_name, self.__read_only)
  55. ##@brief Destructor that attempt to close connection to DB
  56. #
  57. #Decrease the conn_count of associated MongoDbDatasource::_connections
  58. #item. If it reach 0 close the connection to the db
  59. #@see MongoDbDatasource::__connect()
  60. def __del__(self):
  61. self._connections[self.__conn_hash]['conn_count'] -= 1
  62. if self._connections[self.__conn_hash]['conn_count'] <= 0:
  63. self._connections[self.__conn_hash]['db'].close()
  64. del(self._connections[self.__conn_hash])
  65. logger.info("Closing connection to database")
  66. ##@brief Provide a new uniq numeric ID
  67. #@param emcomp LeObject subclass (not instance) : To know on wich things we
  68. #have to be uniq
  69. #@warning multiple UID broken by this method
  70. #@return an integer
  71. def new_numeric_id(self, emcomp):
  72. target = emcomp.uid_source()
  73. tuid = target._uid[0] # Multiple UID broken here
  74. results = self.select(
  75. target, field_list = [tuid], filters = [],
  76. order=[(tuid, 'DESC')], limit = 1)
  77. if len(results) == 0:
  78. return 1
  79. return results[0][tuid]+1
  80. ##@brief returns a selection of documents from the datasource
  81. #@param target Emclass
  82. #@param field_list list
  83. #@param filters list : List of filters
  84. #@param relational_filters list : List of relational filters
  85. #@param order list : List of column to order. ex: order =
  86. #[('title', 'ASC'),]
  87. #@param group list : List of tupple representing the column used as
  88. #"group by" fields. ex: group = [('title', 'ASC'),]
  89. #@param limit int : Number of records to be returned
  90. #@param offset int: used with limit to choose the start record
  91. #@return list
  92. #@todo Implement group for abstract LeObject childs
  93. def select(self, target, field_list, filters = None,
  94. relational_filters=None, order=None, group=None, limit=None,
  95. offset=0):
  96. logger.debug("Select %s on %s filtered by %s and %s " % (
  97. field_list, target, filters, relational_filters))
  98. if target.is_abstract():
  99. #Reccursiv calls for abstract LeObject child
  100. results = self.__act_on_abstract(target, filters,
  101. relational_filters, self.select, field_list = field_list,
  102. order = order, group = group, limit = limit)
  103. #Here we may implement the group
  104. #If sorted query we have to sort again
  105. if order is not None:
  106. results = sorted(results,
  107. key=functools.cmp_to_key(
  108. self.__generate_lambda_cmp_order(order)))
  109. #If limit given apply limit again
  110. if offset > len(results):
  111. results = list()
  112. else:
  113. if limit is not None:
  114. if limit + offset > len(results):
  115. limit = len(results)-offset-1
  116. results = results[offset:offset+limit]
  117. return results
  118. # Default behavior
  119. if filters is None:
  120. filters = list()
  121. if relational_filters is None:
  122. relational_filters = list()
  123. collection_name = object_collection_name(target)
  124. collection = self.database[collection_name]
  125. query_filters = self.__process_filters(
  126. target, filters, relational_filters)
  127. query_result_ordering = None
  128. if order is not None:
  129. query_result_ordering = utils.parse_query_order(order)
  130. if group is None:
  131. if field_list is None:
  132. field_list = dict()
  133. else:
  134. f_list=dict()
  135. for fl in field_list:
  136. f_list[fl] = 1
  137. field_list = f_list
  138. field_list['_id'] = 0
  139. cursor = collection.find(
  140. spec = query_filters,
  141. fields=field_list,
  142. skip=offset,
  143. limit=limit if limit != None else 0,
  144. sort=query_result_ordering)
  145. else:
  146. pipeline = list()
  147. unwinding_list = list()
  148. grouping_dict = OrderedDict()
  149. sorting_list = list()
  150. for group_param in group:
  151. field_name = group_param[0]
  152. field_sort_option = group_param[1]
  153. sort_option = MONGODB_SORT_OPERATORS_MAP[field_sort_option]
  154. unwinding_list.append({'$unwind': '$%s' % field_name})
  155. grouping_dict[field_name] = '$%s' % field_name
  156. sorting_list.append((field_name, sort_option))
  157. sorting_list.extends(query_result_ordering)
  158. pipeline.append({'$match': query_filters})
  159. if field_list is not None:
  160. pipeline.append({
  161. '$project': SON([{field_name: 1}
  162. for field_name in field_list])})
  163. pipeline.extend(unwinding_list)
  164. pipeline.append({'$group': grouping_dict})
  165. pipeline.extend({'$sort': SON(sorting_list)})
  166. if offset > 0:
  167. pipeline.append({'$skip': offset})
  168. if limit is not None:
  169. pipeline.append({'$limit': limit})
  170. results = list()
  171. for document in cursor:
  172. results.append(document)
  173. return results
  174. ##@brief Deletes records according to given filters
  175. #@param target Emclass : class of the record to delete
  176. #@param filters list : List of filters
  177. #@param relational_filters list : List of relational filters
  178. #@return int : number of deleted records
  179. def delete(self, target, filters, relational_filters):
  180. if target.is_abstract():
  181. logger.debug("Delete called on %s filtered by (%s,%s). Target is \
  182. abstract, preparing reccursiv calls" % (target, filters, relational_filters))
  183. #Deletion with abstract LeObject as target (reccursiv calls)
  184. return self.__act_on_abstract(target, filters,
  185. relational_filters, self.delete)
  186. logger.debug("Delete called on %s filtered by (%s,%s)." % (
  187. target, filters, relational_filters))
  188. #Non abstract beahavior
  189. mongo_filters = self.__process_filters(
  190. target, filters, relational_filters)
  191. #Updating backref before deletion
  192. self.__update_backref_filtered(target, filters, relational_filters,
  193. None)
  194. res = self.__collection(target).remove(mongo_filters)
  195. return res['n']
  196. ##@brief updates records according to given filters
  197. #@param target Emclass : class of the object to insert
  198. #@param filters list : List of filters
  199. #@param relational_filters list : List of relational filters
  200. #@param upd_datas dict : datas to update (new values)
  201. #@return int : Number of updated records
  202. def update(self, target, filters, relational_filters, upd_datas):
  203. res = self.__update_no_backref(target, filters, relational_filters,
  204. upd_datas)
  205. self.__update_backref_filtered(target, filters, relational_filters,
  206. upd_datas)
  207. return res
  208. ##@brief Designed to be called by backref update in order to avoid
  209. #infinite updates between back references
  210. #@see update()
  211. def __update_no_backref(self, target, filters, relational_filters,
  212. upd_datas):
  213. logger.debug("Update called on %s filtered by (%s,%s) with datas \
  214. %s" % (target, filters, relational_filters, upd_datas))
  215. if target.is_abstract():
  216. #Update using abstract LeObject as target (reccursiv calls)
  217. return self.__act_on_abstract(target, filters,
  218. relational_filters, self.update, upd_datas = upd_datas)
  219. #Non abstract beahavior
  220. mongo_filters = self.__process_filters(
  221. target, filters, relational_filters)
  222. mongo_arg = {'$set': upd_datas }
  223. res = self.__collection(target).update(mongo_filters, mongo_arg)
  224. return res['n']
  225. ## @brief Inserts a record in a given collection
  226. # @param target Emclass : class of the object to insert
  227. # @param new_datas dict : datas to insert
  228. # @return the inserted uid
  229. def insert(self, target, new_datas):
  230. logger.debug("Insert called on %s with datas : %s"% (
  231. target, new_datas))
  232. uidname = target.uid_fieldname()[0] #MULTIPLE UID BROKEN HERE
  233. if uidname not in new_datas:
  234. raise MongoDataSourceError("Missing UID data will inserting a new \
  235. %s" % target.__class__)
  236. res = self.__collection(target).insert(new_datas)
  237. self.__update_backref(target, new_datas[uidname], None, new_datas)
  238. return str(res)
  239. ## @brief Inserts a list of records in a given collection
  240. # @param target Emclass : class of the objects inserted
  241. # @param datas_list list : list of dict
  242. # @return list : list of the inserted records' ids
  243. def insert_multi(self, target, datas_list):
  244. res = self.__collection(target).insert_many(datas_list)
  245. for new_datas in datas_list:
  246. self.__update_backref(target, None, new_datas)
  247. target.make_consistency(datas=new_datas)
  248. return list(res.inserted_ids)
  249. ##@brief Update backref giving an action
  250. #@param target leObject child class
  251. #@param filters
  252. #@param relational_filters,
  253. #@param new_datas None | dict : optional new datas if None mean we are deleting
  254. #@return nothing (for the moment
  255. def __update_backref_filtered(self, target,
  256. filters, relational_filters, new_datas = None):
  257. #Getting all the UID of the object that will be deleted in order
  258. #to update back_references
  259. mongo_filters = self.__process_filters(
  260. target, filters, relational_filters)
  261. old_datas_l = self.__collection(target).find(
  262. mongo_filters)
  263. old_datas_l = list(old_datas_l)
  264. uidname = target.uid_fieldname()[0] #MULTIPLE UID BROKEN HERE
  265. for old_datas in old_datas_l:
  266. self.__update_backref(
  267. target, old_datas[uidname], old_datas, new_datas)
  268. ##@brief Update back references of an object
  269. #@ingroup plugin_mongodb_bref_op
  270. #
  271. #old_datas and new_datas arguments are set to None to indicate
  272. #insertion or deletion. Calls examples :
  273. #@par LeObject insert __update backref call
  274. #<pre>
  275. #Insert(datas):
  276. # self.make_insert(datas)
  277. # self.__update_backref(self.__class__, None, datas)
  278. #</pre>
  279. #@par LeObject delete __update backref call
  280. #Delete()
  281. # old_datas = self.datas()
  282. # self.make_delete()
  283. #  self.__update_backref(self.__class__, old_datas, None)
  284. #@par LeObject update __update_backref call
  285. #<pre>
  286. #Update(new_datas):
  287. # old_datas = self.datas()
  288. # self.make_udpdate(new_datas)
  289. #  self.__update_backref(self.__class__, old_datas, new_datas)
  290. #</pre>
  291. #
  292. #@param target LeObject child classa
  293. #@param tuid mixed : The target UID (the value that will be inserted in
  294. #back references)
  295. #@param old_datas dict : datas state before update
  296. #@param new_datas dict : datas state after the update process
  297. #retun None
  298. def __update_backref(self, target, tuid, old_datas, new_datas):
  299. #upd_dict is the dict that will allow to run updates in an optimized
  300. #way (or try to help doing it)
  301. #
  302. #It's struct looks like :
  303. # { LeoCLASS : {
  304. # UID1: (
  305. # LeoINSTANCE,
  306. # { fname1 : value, fname2: value }),
  307. # UID2 (LeoINSTANCE, {fname...}),
  308. # },
  309. # LeoClass2: {...
  310. #
  311. upd_dict = {}
  312. for fname, fdh in target.reference_handlers().items():
  313. oldd = old_datas is not None and fname in old_datas and \
  314. hasattr(fdh, 'default') and old_datas[fname] != fdh.default
  315. newd = new_datas is not None and fname in new_datas and \
  316. hasattr(fdh, 'default') and new_datas[fname] != fdh.default
  317. if (oldd and newd and old_datas[fname] == new_datas[fname])\
  318. or not(oldd or newd):
  319. #No changes or not concerned
  320. continue
  321. bref_cls = fdh.back_reference[0]
  322. bref_fname = fdh.back_reference[1]
  323. if issubclass(fdh.__class__, MultipleRef):
  324. #fdh is a multiple ref. So the update preparation will be
  325. #divided into two loops :
  326. #- one loop for deleting old datas
  327. #- one loop for inserting updated datas
  328. #
  329. #Preparing the list of values to delete or to add
  330. if newd and oldd:
  331. old_values = old_datas[fname]
  332. new_values = new_datas[fname]
  333. to_del = [ val
  334. for val in old_values
  335. if val not in new_values]
  336. to_add = [ val
  337. for val in new_values
  338. if val not in old_values]
  339. elif oldd and not newd:
  340. to_del = old_datas[fname]
  341. to_add = []
  342. elif not oldd and newd:
  343. to_del = []
  344. to_add = new_datas[fname]
  345. #Calling __back_ref_upd_one_value() with good arguments
  346. for vtype, vlist in [('old',to_del), ('new', to_add)]:
  347. for value in vlist:
  348. #fetching backref infos
  349. bref_infos = self.__bref_get_check(
  350. bref_cls, value, bref_fname)
  351. #preparing the upd_dict
  352. upd_dict = self.__update_backref_upd_dict_prepare(
  353. upd_dict, bref_infos, bref_fname, value)
  354. #preparing updated bref_infos
  355. bref_cls, bref_leo, bref_dh, bref_value = bref_infos
  356. bref_infos = (bref_cls, bref_leo, bref_dh,
  357. upd_dict[bref_cls][value][1][bref_fname])
  358. vdict = {vtype: value}
  359. #fetch and store updated value
  360. new_bref_val = self.__back_ref_upd_one_value(
  361. fname, fdh, tuid, bref_infos, **vdict)
  362. upd_dict[bref_cls][value][1][bref_fname] = new_bref_val
  363. else:
  364. #fdh is a single ref so the process is simpler, we do not have
  365. #to loop and we may do an update in only one
  366. #__back_ref_upd_one_value() call by giving both old and new
  367. #value
  368. vdict = {}
  369. if oldd:
  370. vdict['old'] = new_datas[fname]
  371. uid_val = vdict['old']
  372. if newd:
  373. vdict['new'] = new_datas[fname]
  374. if not oldd:
  375. uid_val = vdict['new']
  376. #Fetching back ref infos
  377. bref_infos = self.__bref_get_check(
  378. bref_cls, uid_val, bref_fname)
  379. #prepare the upd_dict
  380. upd_dict = self.__update_backref_upd_dict_prepare(
  381. upd_dict, bref_infos, bref_fname, uid_val)
  382. #forging update bref_infos
  383. bref_cls, bref_leo, bref_dh, bref_value = bref_infos
  384. bref_infos = tuple(bref_cls, bref_leo, bref_dh,
  385. upd_dict[bref_cls][uid_val][1][bref_fname])
  386. #fetche and store updated value
  387. new_bref_val = self.__back_ref_upd_one_value(
  388. fname, fdh, tuid, **vdict)
  389. upd_dict[bref_cls][uid_val][1][bref_fname] = new_bref_val
  390. #Now we've got our upd_dict ready.
  391. #running the updates
  392. for bref_cls, uid_dict in upd_dict.items():
  393. for uidval, (leo, datas) in uid_dict.items():
  394. #MULTIPLE UID BROKEN 2 LINES BELOW
  395. self.__update_no_backref(
  396. leo.__class__, [(leo.uid_fieldname()[0], '=', uidval)],
  397. [], datas)
  398. ##@brief Utility function designed to handle the upd_dict of
  399. #__update_backref()
  400. #
  401. #Basically checks if a key exists at some level, if not create it with
  402. #the good default value (in most case dict())
  403. #@param upd_dict dict : in & out args modified by reference
  404. #@param bref_infos tuple : as returned by __bref_get_check()
  405. #@param bref_fname str : name of the field in referenced class
  406. #@param uid_val mixed : the UID of the referenced object
  407. #@return the updated version of upd_dict
  408. @staticmethod
  409. def __update_backref_upd_dict_prepare(upd_dict,bref_infos, bref_fname,
  410. uid_val):
  411. bref_cls, bref_leo, bref_dh, bref_value = bref_infos
  412. if bref_cls not in upd_dict:
  413. upd_dict[bref_cls] = {}
  414. if uid_val not in upd_dict[bref_cls]:
  415. upd_dict[bref_cls][uid_val] = (bref_leo, {})
  416. if bref_fname not in upd_dict[bref_cls][uid_val]:
  417. upd_dict[bref_cls][uid_val][1][bref_fname] = bref_value
  418. return upd_dict
  419. ##@brief Prepare a one value back reference update
  420. #@param fname str : the source Reference field name
  421. #@param fdh DataHandler : the source Reference DataHandler
  422. #@param tuid mixed : the uid of the Leo that make reference to the backref
  423. #@param bref_infos tuple : as returned by __bref_get_check() method
  424. #@param old mixed : (optional **values) the old value
  425. #@param new mixed : (optional **values) the new value
  426. #@return the new back reference field value
  427. def __back_ref_upd_one_value(self, fname, fdh, tuid, bref_infos, **values):
  428. bref_cls, bref_leo, bref_dh, bref_val = bref_infos
  429. oldd = 'old' in values
  430. newdd = 'new' in values
  431. if bref_val is None:
  432. bref_val = bref_dh.empty()
  433. if issubclass(bref_dh.__class__, MultipleRef):
  434. if oldd and newdd:
  435. if tuid not in bref_val:
  436. raise MongoDbConsistencyError("The value we want to \
  437. delete in this back reference update was not found in the back referenced \
  438. object : %s. Value was : '%s'" % (bref_leo, tuid))
  439. return bref_val
  440. elif oldd and not newdd:
  441. #deletion
  442. old_value = values['old']
  443. if tuid not in bref_val:
  444. raise MongoDbConsistencyError("The value we want to \
  445. delete in this back reference update was not found in the back referenced \
  446. object : %s. Value was : '%s'" % (bref_leo, tuid))
  447. if isinstance(bref_val, set):
  448. bref_val -= set([tuid])
  449. else:
  450. del(bref_val[bref_val.index(tuid)])
  451. elif not oldd and newdd:
  452. if tuid in bref_val:
  453. raise MongoDbConsistencyError("The value we want to \
  454. add in this back reference update was found in the back referenced \
  455. object : %s. Value was : '%s'" % (bref_leo, tuid))
  456. if isinstance(bref_val, set):
  457. bref_val |= set([tuid])
  458. else:
  459. bref_val.append(tuid)
  460. else:
  461. #Single value backref
  462. if oldd and newdd:
  463. if bref_val != tuid:
  464. raise MongoDbConsistencyError("The backreference doesn't \
  465. have expected value. Expected was %s but found %s in %s" % (
  466. tuid, bref_val, bref_leo))
  467. return bref_val
  468. elif oldd and not newdd:
  469. #deletion
  470. if not hasattr(bref_dh, "default"):
  471. raise MongoDbConsistencyError("Unable to delete a \
  472. value for a back reference update. The concerned field don't have a default \
  473. value : in %s field %s" % (bref_leo, bref_fname))
  474. bref_val = getattr(bref_dh, "default")
  475. elif not oldd and newdd:
  476. bref_val = tuid
  477. return bref_val
  478. ##@brief Fetch back reference informations
  479. #@warning thank's to __update_backref_act() this method is useless
  480. #@param bref_cls LeObject child class : __back_reference[0]
  481. #@param uidv mixed : UID value (the content of the reference field)
  482. #@param bref_fname str : the name of the back_reference field
  483. #@return tuple(bref_class, bref_LeObect_instance, bref_datahandler,
  484. #bref_value)
  485. #@throw MongoDbConsistencyError when LeObject instance not found given
  486. #uidv
  487. #@throw LodelFatalError if the back reference field is not a Reference
  488. #subclass (major failure)
  489. def __bref_get_check(self, bref_cls, uidv, bref_fname):
  490. bref_leo = bref_cls.get_from_uid(uidv)
  491. if len(bref_leo) == 0:
  492. raise MongoDbConsistencyError("Unable to get the object we make \
  493. reference to : %s with uid = %s" % (bref_cls, repr(uidv)))
  494. if len(bref_leo) > 1:
  495. raise MongoDbConsistencyFatalError("Will retrieving data with UID \
  496. as filter we got more than one result !!! Bailout !!!")
  497. bref_leo = bref_leo[0]
  498. bref_dh = bref_leo.data_handler(bref_fname)
  499. if not isinstance(bref_dh, Reference):
  500. raise LodelFatalError("Found a back reference field that \
  501. is not a reference : '%s' field '%s'" % (bref_leo, bref_fname))
  502. bref_val = bref_leo.data(bref_fname)
  503. return (bref_leo.__class__, bref_leo, bref_dh, bref_val)
  504. ##@brief Act on abstract LeObject child
  505. #
  506. #This method is designed to be called by insert, select and delete method
  507. #when they encounter an abtract class
  508. #@param target LeObject child class
  509. #@param filters
  510. #@param relational_filters
  511. #@param act function : the caller method
  512. #@param **kwargs other arguments
  513. #@return sum of results (if it's an array it will result in a concat)
  514. #@todo optimization implementing a cache for __bref_get_check()
  515. def __act_on_abstract(self,
  516. target, filters, relational_filters, act, **kwargs):
  517. result = list() if act == self.select else 0
  518. if not target.is_abstract():
  519. target_childs = target
  520. else:
  521. target_childs = [tc for tc in target.child_classes()
  522. if not tc.is_abstract()]
  523. for target_child in target_childs:
  524. #Add target_child to filter
  525. new_filters = copy.copy(filters)
  526. for i in range(len(filters)):
  527. fname, op, val = filters[i]
  528. if fname == CLASS_ID_FIELDNAME:
  529. logger.warning("Dirty drop of filter : '%s %s %s'" % (
  530. fname, op, val))
  531. del(new_filters[i])
  532. new_filters.append(
  533. (CLASS_ID_FIELDNAME, '=',
  534. collection_name(target_child.__name__)))
  535. result += act(
  536. target = target_child,
  537. filters = new_filters,
  538. relational_filters = relational_filters,
  539. **kwargs)
  540. return result
  541. ##@brief Connect to database
  542. #@note this method avoid opening two times the same connection using
  543. #MongoDbDatasource::_connections static attribute
  544. #@param username str
  545. #@param password str
  546. #@param ro bool : If True the Datasource is for read only, else the
  547. def __connect(self, username, password, db_name, ro):
  548. conn_string = connection_string(
  549. username = username, password = password,
  550. host = self.__db_infos['host'],
  551. port = self.__db_infos['port'],
  552. db_name = db_name,
  553. ro = ro)
  554. self.__conn_hash = conn_h = hash(conn_string)
  555. if conn_h in self._connections:
  556. self._connections[conn_h]['conn_count'] += 1
  557. return self._connections[conn_h]['db'][self.__db_infos['db_name']]
  558. else:
  559. logger.info("Opening a new connection to database")
  560. self._connections[conn_h] = {
  561. 'conn_count': 1,
  562. 'db': utils.connect(conn_string)}
  563. return self._connections[conn_h]['db'][self.__db_infos['db_name']]
  564. ##@brief Return a pymongo collection given a LeObject child class
  565. #@param leobject LeObject child class (no instance)
  566. #return a pymongo.collection instance
  567. def __collection(self, leobject):
  568. return self.database[object_collection_name(leobject)]
    ##@brief Perform subqueries implies by relational filters and append the
    # result to existing filters
    #
    #The processing is divided in multiple steps :
    # - determine (for each relational field of the target) every collection
    #that are involved
    # - generate subqueries for relational_filters that concerns a different
    #collection than target collection
    # - execute subqueries
    # - transform subqueries results in filters ($in on the relational field)
    # - merge subqueries generated filters with existing filters
    #
    #@param target LeObject subclass (no instance) : Target class
    #@param filters list : List of tuple(FIELDNAME, OP, VALUE)
    #@param relational_filters : same composition thant filters except that
    # FIELD is represented by a tuple(FIELDNAME, {CLASS1:RFIELD1,
    # CLASS2:RFIELD2})
    #@return a dict of pymongo filters ( {FIELD:{OPERATOR:VALUE}} )
    def __process_filters(self,target, filters, relational_filters):
        # Simple filters lodel2 -> pymongo converting
        res = self.__filters2mongo(filters)
        # Regroup relational filters by field name then by collection
        rfilters = self.__prepare_relational_filters(target, relational_filters)
        #Now that everything is well organized, begin to forge subquerie
        #filters (rfilters is modified in place)
        self.__subqueries_from_relational_filters(target, rfilters)
        # Executing subqueries, creating filters from result, and injecting
        # them in original filters of the query
        if len(rfilters) > 0:
            logger.debug("Begining subquery execution")
            for fname in rfilters:
                if fname not in res:
                    res[fname] = dict()
                # Union of the UIDs matched by every subquery on this field
                subq_results = set()
                for leobject, sq_filters in rfilters[fname].items():
                    uid_fname = mongo_fieldname(leobject._uid)
                    log_msg = "Subquery running on collection {coll} with filters \
'{filters}'"
                    logger.debug(log_msg.format(
                        coll=object_collection_name(leobject),
                        filters=sq_filters))
                    # Only the UID field is projected : we just need the
                    # matching ids to build the parent query's $in filter
                    cursor = self.__collection(leobject).find(
                        filter=sq_filters,
                        projection=uid_fname)
                    subq_results |= set(doc[uid_fname] for doc in cursor)
                #generating new filter from result
                if '$in' in res[fname]:
                    #WARNING we allready have a IN on this field, doing dedup
                    #from result (intersection of both candidate sets)
                    deduped = set(res[fname]['$in']) & subq_results
                    if len(deduped) == 0:
                        del(res[fname]['$in'])
                    else:
                        res[fname]['$in'] = list(deduped)
                else:
                    res[fname]['$in'] = list(subq_results)
        if len(rfilters) > 0:
            logger.debug("End of subquery execution")
        return res
  628. ##@brief Generate subqueries from rfilters tree
  629. #
  630. #Returned struct organization :
  631. # - 1st level keys : relational field name of target
  632. # - 2nd level keys : referenced leobject
  633. # - 3th level values : pymongo filters (dict)
  634. #
  635. #@note The only caller of this method is __process_filters
  636. #@warning No return value, the rfilters arguement is modified by
  637. #reference
  638. #
  639. #@param target LeObject subclass (no instance) : Target class
  640. #@param rfilters dict : A struct as returned by
  641. #MongoDbDatasource.__prepare_relational_filters()
  642. #@return None, the rfilters argument is modified by reference
  643. @classmethod
  644. def __subqueries_from_relational_filters(cls, target, rfilters):
  645. for fname in rfilters:
  646. for leobject in rfilters[fname]:
  647. for rfield in rfilters[fname][leobject]:
  648. #This way of doing is not optimized but allows to trigger
  649. #warnings in some case (2 different values for a same op
  650. #on a same field on a same collection)
  651. mongofilters = cls.__op_value_listconv(
  652. rfilters[fname][leobject][rfield])
  653. rfilters[fname][leobject][rfield] = mongofilters
  654. ##@brief Generate a tree from relational_filters
  655. #
  656. #The generated struct is a dict with :
  657. # - 1st level keys : relational field name of target
  658. # - 2nd level keys : referenced leobject
  659. # - 3th level keys : referenced field in referenced class
  660. # - 4th level values : list of tuple(op, value)
  661. #
  662. #@note The only caller of this method is __process_filters
  663. #@warning An assertion is done : if two leobject are stored in the same
  664. #collection they share the same uid
  665. #
  666. #@param target LeObject subclass (no instance) : Target class
  667. #@param relational_filters : same composition thant filters except that
  668. #@return a struct as described above
  669. @classmethod
  670. def __prepare_relational_filters(cls, target, relational_filters):
  671. # We are going to regroup relationnal filters by reference field
  672. # then by collection
  673. rfilters = dict()
  674. if relational_filters is None:
  675. relational_filters = []
  676. for (fname, rfields), op, value in relational_filters:
  677. if fname not in rfilters:
  678. rfilters[fname] = dict()
  679. rfilters[fname] = dict()
  680. # Stores the representative leobject for associated to a collection
  681. # name
  682. leo_collname = dict()
  683. # WARNING ! Here we assert that all leobject that are stored
  684. # in a same collection are identified by the same field
  685. for leobject, rfield in rfields.items():
  686. #here we are filling a dict with leobject as index but
  687. #we are doing a UNIQ on collection name
  688. cur_collname = object_collection_name(leobject)
  689. if cur_collname not in leo_collname:
  690. leo_collname[cur_collname] = leobject
  691. rfilters[fname][leobject] = dict()
  692. #Fecthing the collection's representative leobject
  693. repr_leo = leo_collname[cur_collname]
  694. if rfield not in rfilters[fname][repr_leo]:
  695. rfilters[fname][repr_leo][rfield] = list()
  696. rfilters[fname][repr_leo][rfield].append((op, value))
  697. return rfilters
  698. ##@brief Convert lodel2 filters to pymongo conditions
  699. #@param filters list : list of lodel filters
  700. #@return dict representing pymongo conditions
  701. @classmethod
  702. def __filters2mongo(cls, filters):
  703. res = dict()
  704. eq_fieldname = [] #Stores field with equal comparison OP
  705. for fieldname, op, value in filters:
  706. oop = op
  707. ovalue = value
  708. op, value = cls.__op_value_conv(op, value)
  709. if op == '=':
  710. eq_fieldname.append(fieldname)
  711. if fieldname in res:
  712. logger.warning("Dropping previous condition. Overwritten \
  713. by an equality filter")
  714. res[fieldname] = value
  715. continue
  716. if fieldname in eq_fieldname:
  717. logger.warning("Dropping condition : '%s %s %s'" % (
  718. fieldname, op, value))
  719. continue
  720. if fieldname not in res:
  721. res[fieldname] = dict()
  722. if op in res[fieldname]:
  723. logger.warning("Dropping condition : '%s %s %s'" % (
  724. fieldname, op, value))
  725. else:
  726. if op not in cls.lodel2mongo_op_map:
  727. raise ValueError("Invalid operator : '%s'" % op)
  728. new_op = cls.lodel2mongo_op_map[op]
  729. res[fieldname][new_op] = value
  730. return res
  731. ##@brief Convert lodel2 operator and value to pymongo struct
  732. #
  733. #Convertion is done using MongoDbDatasource::lodel2mongo_op_map
  734. #@param op str : take value in LeFilteredQuery::_query_operators
  735. #@param value mixed : the value
  736. #@return a tuple(mongo_op, mongo_value)
  737. @classmethod
  738. def __op_value_conv(cls, op, value):
  739. if op not in cls.lodel2mongo_op_map:
  740. msg = "Invalid operator '%s' found" % op
  741. raise MongoDbDataSourceError(msg)
  742. mongop = cls.lodel2mongo_op_map[op]
  743. mongoval = value
  744. #Converting lodel2 wildcarded string into a case insensitive
  745. #mongodb re
  746. if mongop in cls.mongo_op_re:
  747. #unescaping \
  748. mongoval = value.replace('\\\\','\\')
  749. if not mongoval.startswith('*'):
  750. mongoval = '^'+mongoval
  751. #For the end of the string it's harder to detect escaped *
  752. if not (mongoval[-1] == '*' and mongoval[-2] != '\\'):
  753. mongoval += '$'
  754. #Replacing every other unescaped wildcard char
  755. mongoval = cls.wildcard_re.sub('.*', mongoval)
  756. mongoval = {'$regex': mongoval, '$options': 'i'}
  757. return (op, mongoval)
  758. ##@brief Convert a list of tuple(OP, VALUE) into a pymongo filter dict
  759. #@return a dict with mongo op as key and value as value...
  760. @classmethod
  761. def __op_value_listconv(cls, op_value_list):
  762. result = dict()
  763. for op, value in op_value_list:
  764. mongop, mongoval = cls.__op_value_conv(op, value)
  765. if mongop in result:
  766. warnings.warn("Duplicated value given for a single \
  767. field/operator couple in a query. We will keep only the first one")
  768. else:
  769. result[mongop] = mongoval
  770. return result
  771. ##@brief Generate a comparison function for post reccursion sorting in
  772. #select
  773. #@return a lambda function that take 2 dict as arguement
  774. @classmethod
  775. def __generate_lambda_cmp_order(cls, order):
  776. if len(order) == 0:
  777. return lambda a,b: 0
  778. glco = cls.__generate_lambda_cmp_order
  779. fname, cmpdir = order[0]
  780. order = order[1:]
  781. return lambda a,b: glco(order) if a[fname] == b[fname] else (\
  782. 1 if (a[fname]>b[fname] if cmpdir == 'ASC' else a[fname]<b[fname])\
  783. else -1)