No Description
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

datasource.py 38KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872
  1. # -*- coding: utf-8 -*-
  2. import re
  3. import warnings
  4. import copy
  5. import functools
  6. from bson.son import SON
  7. from collections import OrderedDict
  8. import pymongo
  9. from pymongo.errors import BulkWriteError
  10. from lodel.context import LodelContext
  11. LodelContext.expose_modules(globals(), {
  12. 'lodel.logger': 'logger',
  13. 'lodel.leapi.leobject': ['CLASS_ID_FIELDNAME'],
  14. 'lodel.leapi.datahandlers.base_classes': ['Reference', 'MultipleRef'],
  15. 'lodel.exceptions': ['LodelException', 'LodelFatalError'],
  16. 'lodel.plugin.datasource_plugin': ['AbstractDatasource']})
  17. from . import utils
  18. from .exceptions import *
  19. from .utils import object_collection_name, collection_name, \
  20. MONGODB_SORT_OPERATORS_MAP, connection_string, mongo_fieldname
##@brief Datasource class
#@ingroup plugin_mongodb_datasource
class MongoDbDatasource(AbstractDatasource):

    ##@brief Stores existing connections (class-level, shared by all
    #instances so that two datasources using the same connection string
    #share a single connection)
    #
    #The key of this dict is a hash of the connection string + ro parameter.
    #The value is a dict with 2 keys :
    # - conn_count : the number of instanciated datasources that use this
    #connection
    # - db : the pymongo database object instance
    _connections = dict()

    ##@brief Mapping from lodel2 operators to mongodb operators
    lodel2mongo_op_map = {
        '=': '$eq', '<=': '$lte', '>=': '$gte', '!=': '$ne', '<': '$lt',
        '>': '$gt', 'in': '$in', 'not in': '$nin'}
    ##@brief List of mongodb operators that expect a list as value
    mongo_op_re = ['$in', '$nin']
    ##@brief Matches an unescaped wildcard : a '*' not preceded by a
    #backslash
    wildcard_re = re.compile('[^\\\\]\*')
  39. ##@brief instanciates a database object given a connection name
  40. #@param host str : hostname or IP
  41. #@param port int : mongodb listening port
  42. #@param db_name str
  43. #@param username str
  44. #@param password str
  45. #@param read_only bool : If True the Datasource is for read only, else the
  46. #Datasource is write only !
  47. def __init__(self, host, port, db_name, username, password, read_only = False):
  48. ##@brief Connections infos that can be kept securly
  49. self.__db_infos = {'host': host, 'port': port, 'db_name': db_name}
  50. ##@brief Is the instance read only ? (if not it's write only)
  51. self.__read_only = bool(read_only)
  52. ##@brief Uniq ID for mongodb connection
  53. self.__conn_hash= None
  54. ##@brief Stores the database cursor
  55. self.database = self.__connect(
  56. username, password, db_name, self.__read_only)
    ##@brief Destructor that attempts to close the connection to the DB
    #
    #Decreases the conn_count of the associated
    #MongoDbDatasource::_connections item. If it reaches 0, closes the
    #connection to the db and drops the registry entry.
    #@see MongoDbDatasource::__connect()
    def __del__(self):
        # NOTE(review): assumes __connect() succeeded and registered
        # self.__conn_hash ; a failed __init__ would raise KeyError here
        self._connections[self.__conn_hash]['conn_count'] -= 1
        if self._connections[self.__conn_hash]['conn_count'] <= 0:
            # Last user of this connection : close and forget it
            self._connections[self.__conn_hash]['db'].close()
            del(self._connections[self.__conn_hash])
            logger.info("Closing connection to database")
  68. ##@brief Provide a new uniq numeric ID
  69. #@param emcomp LeObject subclass (not instance) : To know on wich things we
  70. #have to be uniq
  71. #@warning multiple UID broken by this method
  72. #@return an integer
  73. def new_numeric_id(self, emcomp):
  74. target = emcomp.uid_source()
  75. tuid = target._uid[0] # Multiple UID broken here
  76. results = self.select(
  77. target, field_list = [tuid], filters = [],
  78. order=[(tuid, 'DESC')], limit = 1)
  79. if len(results) == 0:
  80. return 1
  81. return results[0][tuid]+1
  82. ##@brief returns a selection of documents from the datasource
  83. #@param target Emclass
  84. #@param field_list list
  85. #@param filters list : List of filters
  86. #@param relational_filters list : List of relational filters
  87. #@param order list : List of column to order. ex: order =
  88. #[('title', 'ASC'),]
  89. #@param group list : List of tupple representing the column used as
  90. #"group by" fields. ex: group = [('title', 'ASC'),]
  91. #@param limit int : Number of records to be returned
  92. #@param offset int: used with limit to choose the start record
  93. #@return list
  94. #@todo Implement group for abstract LeObject childs
  95. def select(self, target, field_list, filters = None,
  96. relational_filters=None, order=None, group=None, limit=None,
  97. offset=0):
  98. if target.is_abstract():
  99. #Reccursiv calls for abstract LeObject child
  100. results = self.__act_on_abstract(target, filters,
  101. relational_filters, self.select, field_list = field_list,
  102. order = order, group = group, limit = limit)
  103. #Here we may implement the group
  104. #If sorted query we have to sort again
  105. if order is not None:
  106. results = sorted(results,
  107. key=functools.cmp_to_key(
  108. self.__generate_lambda_cmp_order(order)))
  109. #If limit given apply limit again
  110. if offset > len(results):
  111. results = list()
  112. else:
  113. if limit is not None:
  114. if limit + offset > len(results):
  115. limit = len(results)-offset-1
  116. results = results[offset:offset+limit]
  117. return results
  118. # Default behavior
  119. if filters is None:
  120. filters = list()
  121. if relational_filters is None:
  122. relational_filters = list()
  123. collection_name = object_collection_name(target)
  124. collection = self.database[collection_name]
  125. query_filters = self.__process_filters(
  126. target, filters, relational_filters)
  127. query_result_ordering = None
  128. if order is not None:
  129. query_result_ordering = utils.parse_query_order(order)
  130. if group is None:
  131. if field_list is None:
  132. field_list = dict()
  133. else:
  134. f_list=dict()
  135. for fl in field_list:
  136. f_list[fl] = 1
  137. field_list = f_list
  138. field_list['_id'] = 0
  139. cursor = collection.find(
  140. spec = query_filters,
  141. fields=field_list,
  142. skip=offset,
  143. limit=limit if limit != None else 0,
  144. sort=query_result_ordering)
  145. else:
  146. pipeline = list()
  147. unwinding_list = list()
  148. grouping_dict = OrderedDict()
  149. sorting_list = list()
  150. for group_param in group:
  151. field_name = group_param[0]
  152. field_sort_option = group_param[1]
  153. sort_option = MONGODB_SORT_OPERATORS_MAP[field_sort_option]
  154. unwinding_list.append({'$unwind': '$%s' % field_name})
  155. grouping_dict[field_name] = '$%s' % field_name
  156. sorting_list.append((field_name, sort_option))
  157. sorting_list.extends(query_result_ordering)
  158. pipeline.append({'$match': query_filters})
  159. if field_list is not None:
  160. pipeline.append({
  161. '$project': SON([{field_name: 1}
  162. for field_name in field_list])})
  163. pipeline.extend(unwinding_list)
  164. pipeline.append({'$group': grouping_dict})
  165. pipeline.extend({'$sort': SON(sorting_list)})
  166. if offset > 0:
  167. pipeline.append({'$skip': offset})
  168. if limit is not None:
  169. pipeline.append({'$limit': limit})
  170. results = list()
  171. for document in cursor:
  172. results.append(document)
  173. return results
  174. ##@brief Deletes records according to given filters
  175. #@param target Emclass : class of the record to delete
  176. #@param filters list : List of filters
  177. #@param relational_filters list : List of relational filters
  178. #@return int : number of deleted records
  179. def delete(self, target, filters, relational_filters):
  180. if target.is_abstract():
  181. logger.debug("Delete called on %s filtered by (%s,%s). Target is \
  182. abstract, preparing reccursiv calls" % (target, filters, relational_filters))
  183. #Deletion with abstract LeObject as target (reccursiv calls)
  184. return self.__act_on_abstract(target, filters,
  185. relational_filters, self.delete)
  186. logger.debug("Delete called on %s filtered by (%s,%s)." % (
  187. target, filters, relational_filters))
  188. #Non abstract beahavior
  189. mongo_filters = self.__process_filters(
  190. target, filters, relational_filters)
  191. #Updating backref before deletion
  192. self.__update_backref_filtered(target, filters, relational_filters,
  193. None)
  194. res = self.__collection(target).remove(mongo_filters)
  195. return res['n']
  196. ##@brief updates records according to given filters
  197. #@param target Emclass : class of the object to insert
  198. #@param filters list : List of filters
  199. #@param relational_filters list : List of relational filters
  200. #@param upd_datas dict : datas to update (new values)
  201. #@return int : Number of updated records
  202. def update(self, target, filters, relational_filters, upd_datas):
  203. self._data_cast(upd_datas)
  204. #fetching current datas state
  205. mongo_filters = self.__process_filters(
  206. target, filters, relational_filters)
  207. old_datas_l = self.__collection(target).find(
  208. mongo_filters)
  209. old_datas_l = list(old_datas_l)
  210. #Running update
  211. res = self.__update_no_backref(target, filters, relational_filters,
  212. upd_datas)
  213. #updating backref
  214. self.__update_backref_filtered(target, filters, relational_filters,
  215. upd_datas, old_datas_l)
  216. return res
    ##@brief Designed to be called by backref update in order to avoid
    #infinite updates between back references
    #
    #Performs the actual '$set' update on the target collection without
    #touching back references.
    #@param target Emclass : class of the object to update
    #@param filters list : List of filters
    #@param relational_filters list : List of relational filters
    #@param upd_datas dict : datas to update (new values)
    #@return int : number of updated records
    #@see update()
    def __update_no_backref(self, target, filters, relational_filters,
            upd_datas):
        logger.debug("Update called on %s filtered by (%s,%s) with datas \
%s" % (target, filters, relational_filters, upd_datas))
        if target.is_abstract():
            #Update using abstract LeObject as target (reccursiv calls)
            # NOTE(review): recurses through self.update (not this method),
            # so back references ARE updated for each concrete child —
            # confirm this is intended
            return self.__act_on_abstract(target, filters,
                relational_filters, self.update, upd_datas = upd_datas)
        #Non abstract behavior
        mongo_filters = self.__process_filters(
            target, filters, relational_filters)
        self._data_cast(upd_datas)
        mongo_arg = {'$set': upd_datas }
        res = self.__collection(target).update(mongo_filters, mongo_arg)
        # 'n' is the matched-document count of the legacy pymongo update()
        # result document
        return res['n']
  235. ## @brief Inserts a record in a given collection
  236. # @param target Emclass : class of the object to insert
  237. # @param new_datas dict : datas to insert
  238. # @return the inserted uid
  239. def insert(self, target, new_datas):
  240. self._data_cast(new_datas)
  241. logger.debug("Insert called on %s with datas : %s"% (
  242. target, new_datas))
  243. uidname = target.uid_fieldname()[0] #MULTIPLE UID BROKEN HERE
  244. if uidname not in new_datas:
  245. raise MongoDataSourceError("Missing UID data will inserting a new \
  246. %s" % target.__class__)
  247. res = self.__collection(target).insert(new_datas)
  248. self.__update_backref(target, new_datas[uidname], None, new_datas)
  249. return str(res)
  250. ## @brief Inserts a list of records in a given collection
  251. # @param target Emclass : class of the objects inserted
  252. # @param datas_list list : list of dict
  253. # @return list : list of the inserted records' ids
  254. def insert_multi(self, target, datas_list):
  255. for datas in datas_list:
  256. self._data_cast(datas)
  257. res = self.__collection(target).insert_many(datas_list)
  258. for new_datas in datas_list:
  259. self.__update_backref(target, None, new_datas)
  260. target.make_consistency(datas=new_datas)
  261. return list(res.inserted_ids)
    ##@brief Updates back references for every record matched by the given
    #filters
    #@param target LeObject child class
    #@param filters list : List of filters
    #@param relational_filters list : List of relational filters
    #@param new_datas None | dict : optional new datas, None means we are
    #deleting
    #@param old_datas_l None | list : if None fetch old datas from db
    #(usefull when modifications are made on instance before updating
    #backrefs)
    #@return nothing (for the moment)
    def __update_backref_filtered(self, target,
            filters, relational_filters, new_datas = None, old_datas_l = None):
        #Getting all the UIDs of the objects that will be deleted/updated
        #in order to update back references
        if old_datas_l is None:
            mongo_filters = self.__process_filters(
                target, filters, relational_filters)
            old_datas_l = self.__collection(target).find(
                mongo_filters)
            old_datas_l = list(old_datas_l)
        uidname = target.uid_fieldname()[0] #MULTIPLE UID BROKEN HERE
        for old_datas in old_datas_l:
            self.__update_backref(
                target, old_datas[uidname], old_datas, new_datas)
    ##@brief Update back references of an object
    #@ingroup plugin_mongodb_bref_op
    #
    #old_datas and new_datas arguments are set to None to indicate
    #insertion or deletion. Calls examples :
    #@par LeObject insert __update_backref call
    #<pre>
    #Insert(datas):
    #    self.make_insert(datas)
    #    self.__update_backref(self.__class__, None, datas)
    #</pre>
    #@par LeObject delete __update_backref call
    #<pre>
    #Delete():
    #    old_datas = self.datas()
    #    self.make_delete()
    #    self.__update_backref(self.__class__, old_datas, None)
    #</pre>
    #@par LeObject update __update_backref call
    #<pre>
    #Update(new_datas):
    #    old_datas = self.datas()
    #    self.make_udpdate(new_datas)
    #    self.__update_backref(self.__class__, old_datas, new_datas)
    #</pre>
    #
    #@param target LeObject child class
    #@param tuid mixed : The target UID (the value that will be inserted in
    #back references)
    #@param old_datas dict : datas state before update
    #@param new_datas dict : datas state after the update process
    #@return None
    def __update_backref(self, target, tuid, old_datas, new_datas):
        #upd_dict is the dict that will allow to run updates in an optimized
        #way (or try to help doing it)
        #
        #Its structure looks like :
        # { LeoCLASS : {
        #       UID1: (
        #           LeoINSTANCE,
        #           { fname1 : value, fname2: value }),
        #       UID2: (LeoINSTANCE, {fname...}),
        #   },
        #   LeoClass2: {...
        #
        upd_dict = {}
        for fname, fdh in target.reference_handlers().items():
            # oldd/newd : True when the field carries a meaningful value
            # (present, not None, not the handler's default) in the
            # old/new state
            oldd = old_datas is not None and fname in old_datas and \
                (not hasattr(fdh, 'default') or old_datas[fname] != fdh.default) \
                and not old_datas[fname] is None
            newd = new_datas is not None and fname in new_datas and \
                (not hasattr(fdh, 'default') or new_datas[fname] != fdh.default) \
                and not new_datas[fname] is None
            if (oldd and newd and old_datas[fname] == new_datas[fname])\
                    or not(oldd or newd):
                #No changes or not concerned
                continue
            bref_cls = fdh.back_reference[0]
            bref_fname = fdh.back_reference[1]
            if issubclass(fdh.__class__, MultipleRef):
                #fdh is a multiple ref. So the update preparation will be
                #divided into two loops :
                #- one loop for deleting old datas
                #- one loop for inserting updated datas
                #
                #Preparing the list of values to delete or to add
                if newd and oldd:
                    old_values = old_datas[fname]
                    new_values = new_datas[fname]
                    to_del = [ val
                        for val in old_values
                        if val not in new_values]
                    to_add = [ val
                        for val in new_values
                        if val not in old_values]
                elif oldd and not newd:
                    to_del = old_datas[fname]
                    to_add = []
                elif not oldd and newd:
                    to_del = []
                    to_add = new_datas[fname]
                #Calling __back_ref_upd_one_value() with good arguments
                for vtype, vlist in [('old',to_del), ('new', to_add)]:
                    for value in vlist:
                        #fetching backref infos
                        bref_infos = self.__bref_get_check(
                            bref_cls, value, bref_fname)
                        #preparing the upd_dict
                        upd_dict = self.__update_backref_upd_dict_prepare(
                            upd_dict, bref_infos, bref_fname, value)
                        #preparing updated bref_infos : replace the stored
                        #value with the one accumulated so far in upd_dict
                        bref_cls, bref_leo, bref_dh, bref_value = bref_infos
                        bref_infos = (bref_cls, bref_leo, bref_dh,
                            upd_dict[bref_cls][value][1][bref_fname])
                        vdict = {vtype: value}
                        #fetch and store updated value
                        new_bref_val = self.__back_ref_upd_one_value(
                            fname, fdh, tuid, bref_infos, **vdict)
                        upd_dict[bref_cls][value][1][bref_fname] = new_bref_val
            else:
                #fdh is a single ref so the process is simpler, we do not
                #have to loop and we may do an update in only one
                #__back_ref_upd_one_value() call by giving both old and new
                #value
                vdict = {}
                if oldd:
                    vdict['old'] = old_datas[fname]
                    uid_val = vdict['old']
                if newd:
                    vdict['new'] = new_datas[fname]
                    if not oldd:
                        uid_val = vdict['new']
                #Fetching back ref infos
                bref_infos = self.__bref_get_check(
                    bref_cls, uid_val, bref_fname)
                #prepare the upd_dict
                upd_dict = self.__update_backref_upd_dict_prepare(
                    upd_dict, bref_infos, bref_fname, uid_val)
                #forging update bref_infos
                bref_cls, bref_leo, bref_dh, bref_value = bref_infos
                bref_infos = (bref_cls, bref_leo, bref_dh,
                    upd_dict[bref_cls][uid_val][1][bref_fname])
                #fetch and store updated value
                new_bref_val = self.__back_ref_upd_one_value(
                    fname, fdh, tuid, bref_infos, **vdict)
                upd_dict[bref_cls][uid_val][1][bref_fname] = new_bref_val
        #Now we've got our upd_dict ready.
        #running the updates
        for bref_cls, uid_dict in upd_dict.items():
            for uidval, (leo, datas) in uid_dict.items():
                #MULTIPLE UID BROKEN 2 LINES BELOW
                self.__update_no_backref(
                    leo.__class__, [(leo.uid_fieldname()[0], '=', uidval)],
                    [], datas)
  416. ##@brief Utility function designed to handle the upd_dict of
  417. #__update_backref()
  418. #
  419. #Basically checks if a key exists at some level, if not create it with
  420. #the good default value (in most case dict())
  421. #@param upd_dict dict : in & out args modified by reference
  422. #@param bref_infos tuple : as returned by __bref_get_check()
  423. #@param bref_fname str : name of the field in referenced class
  424. #@param uid_val mixed : the UID of the referenced object
  425. #@return the updated version of upd_dict
  426. @staticmethod
  427. def __update_backref_upd_dict_prepare(upd_dict,bref_infos, bref_fname,
  428. uid_val):
  429. bref_cls, bref_leo, bref_dh, bref_value = bref_infos
  430. if bref_cls not in upd_dict:
  431. upd_dict[bref_cls] = {}
  432. if uid_val not in upd_dict[bref_cls]:
  433. upd_dict[bref_cls][uid_val] = (bref_leo, {})
  434. if bref_fname not in upd_dict[bref_cls][uid_val]:
  435. upd_dict[bref_cls][uid_val][1][bref_fname] = bref_value
  436. return upd_dict
    ##@brief Prepare a one value back reference update
    #@param fname str : the source Reference field name
    #@param fdh DataHandler : the source Reference DataHandler
    #@param tuid mixed : the uid of the Leo that makes reference to the
    #backref
    #@param bref_infos tuple : as returned by __bref_get_check() method
    #@param old mixed : (optional **values) the old value
    #@param new mixed : (optional **values) the new value
    #@return the new back reference field value
    #@throw MongoDbConsistencyError when the stored back reference value is
    #inconsistent with the requested operation
    def __back_ref_upd_one_value(self, fname, fdh, tuid, bref_infos, **values):
        bref_cls, bref_leo, bref_dh, bref_val = bref_infos
        # Which operation : 'old' only = deletion, 'new' only = insertion,
        # both = update
        oldd = 'old' in values
        newdd = 'new' in values
        if bref_val is None:
            bref_val = bref_dh.empty()
        if issubclass(bref_dh.__class__, MultipleRef):
            #Multiple value back reference (list/set/tuple of uids)
            if oldd and newdd:
                #update : tuid must already be referenced, nothing to change
                if tuid not in bref_val:
                    raise MongoDbConsistencyError("The value we want to \
delete in this back reference update was not found in the back referenced \
object : %s. Value was : '%s'" % (bref_leo, tuid))
                return bref_val
            elif oldd and not newdd:
                #deletion
                old_value = values['old']  # NOTE(review): unused local
                if tuid not in bref_val:
                    raise MongoDbConsistencyError("The value we want to \
delete in this back reference update was not found in the back referenced \
object : %s. Value was : '%s'" % (bref_leo, tuid))
                # tuples are converted to sets so they can be mutated
                if isinstance(bref_val, tuple):
                    bref_val = set(bref_val)
                if isinstance(bref_val, set):
                    bref_val -= set([tuid])
                else:
                    del(bref_val[bref_val.index(tuid)])
            elif not oldd and newdd:
                #insertion : tuid must not be referenced yet
                if tuid in bref_val:
                    raise MongoDbConsistencyError("The value we want to \
add in this back reference update was found in the back referenced \
object : %s. Value was : '%s'" % (bref_leo, tuid))
                if isinstance(bref_val, tuple):
                    bref_val = set(bref_val)
                if isinstance(bref_val, set):
                    bref_val |= set([tuid])
                else:
                    bref_val.append(tuid)
        else:
            #Single value backref
            if oldd and newdd:
                #update : the stored value must already be tuid
                if bref_val != tuid:
                    raise MongoDbConsistencyError("The backreference doesn't \
have expected value. Expected was %s but found %s in %s" % (
                        tuid, bref_val, bref_leo))
                return bref_val
            elif oldd and not newdd:
                #deletion : reset the field to its default value
                if not hasattr(bref_dh, "default"):
                    raise MongoDbConsistencyError("Unable to delete a \
value for a back reference update. The concerned field don't have a default \
value : in %s field %s" % (bref_leo,fname))
                bref_val = getattr(bref_dh, "default")
            elif not oldd and newdd:
                #insertion
                bref_val = tuid
        return bref_val
  500. ##@brief Fetch back reference informations
  501. #@warning thank's to __update_backref_act() this method is useless
  502. #@param bref_cls LeObject child class : __back_reference[0]
  503. #@param uidv mixed : UID value (the content of the reference field)
  504. #@param bref_fname str : the name of the back_reference field
  505. #@return tuple(bref_class, bref_LeObect_instance, bref_datahandler,
  506. #bref_value)
  507. #@throw MongoDbConsistencyError when LeObject instance not found given
  508. #uidv
  509. #@throw LodelFatalError if the back reference field is not a Reference
  510. #subclass (major failure)
  511. def __bref_get_check(self, bref_cls, uidv, bref_fname):
  512. bref_leo = bref_cls.get_from_uid(uidv)
  513. if bref_leo is None:
  514. raise MongoDbConsistencyError("Unable to get the object we make \
  515. reference to : %s with uid = %s" % (bref_cls, repr(uidv)))
  516. bref_dh = bref_leo.data_handler(bref_fname)
  517. if not isinstance(bref_dh, Reference):
  518. raise LodelFatalError("Found a back reference field that \
  519. is not a reference : '%s' field '%s'" % (bref_leo, bref_fname))
  520. bref_val = bref_leo.data(bref_fname)
  521. return (bref_leo.__class__, bref_leo, bref_dh, bref_val)
  522. ##@brief Act on abstract LeObject child
  523. #
  524. #This method is designed to be called by insert, select and delete method
  525. #when they encounter an abtract class
  526. #@param target LeObject child class
  527. #@param filters
  528. #@param relational_filters
  529. #@param act function : the caller method
  530. #@param **kwargs other arguments
  531. #@return sum of results (if it's an array it will result in a concat)
  532. #@todo optimization implementing a cache for __bref_get_check()
  533. def __act_on_abstract(self,
  534. target, filters, relational_filters, act, **kwargs):
  535. result = list() if act == self.select else 0
  536. if not target.is_abstract():
  537. target_childs = target
  538. else:
  539. target_childs = [tc for tc in target.child_classes()
  540. if not tc.is_abstract()]
  541. for target_child in target_childs:
  542. #Add target_child to filter
  543. new_filters = copy.copy(filters)
  544. for i in range(len(filters)):
  545. fname, op, val = filters[i]
  546. if fname == CLASS_ID_FIELDNAME:
  547. logger.warning("Dirty drop of filter : '%s %s %s'" % (
  548. fname, op, val))
  549. del(new_filters[i])
  550. new_filters.append(
  551. (CLASS_ID_FIELDNAME, '=',
  552. collection_name(target_child.__name__)))
  553. result += act(
  554. target = target_child,
  555. filters = new_filters,
  556. relational_filters = relational_filters,
  557. **kwargs)
  558. return result
  559. ##@brief Connect to database
  560. #@note this method avoid opening two times the same connection using
  561. #MongoDbDatasource::_connections static attribute
  562. #@param username str
  563. #@param password str
  564. #@param ro bool : If True the Datasource is for read only, else the
  565. def __connect(self, username, password, db_name, ro):
  566. conn_string = connection_string(
  567. username = username, password = password,
  568. host = self.__db_infos['host'],
  569. port = self.__db_infos['port'],
  570. db_name = db_name,
  571. ro = ro)
  572. self.__conn_hash = conn_h = hash(conn_string)
  573. if conn_h in self._connections:
  574. self._connections[conn_h]['conn_count'] += 1
  575. return self._connections[conn_h]['db'][self.__db_infos['db_name']]
  576. else:
  577. logger.info("Opening a new connection to database")
  578. self._connections[conn_h] = {
  579. 'conn_count': 1,
  580. 'db': utils.connect(conn_string)}
  581. return self._connections[conn_h]['db'][self.__db_infos['db_name']]
  582. ##@brief Return a pymongo collection given a LeObject child class
  583. #@param leobject LeObject child class (no instance)
  584. #return a pymongo.collection instance
  585. def __collection(self, leobject):
  586. return self.database[object_collection_name(leobject)]
    ##@brief Performs subqueries implied by relational filters and appends
    #the result to existing filters
    #
    #The processing is divided in multiple steps :
    # - determine (for each relational field of the target) every collection
    #that is involved
    # - generate subqueries for relational_filters that concern a different
    #collection than the target collection
    # - execute subqueries
    # - transform subqueries results in filters
    # - merge subqueries generated filters with existing filters
    #
    #@param target LeObject subclass (no instance) : Target class
    #@param filters list : List of tuple(FIELDNAME, OP, VALUE)
    #@param relational_filters : same composition than filters except that
    # FIELD is represented by a tuple(FIELDNAME, {CLASS1:RFIELD1,
    # CLASS2:RFIELD2})
    #@return a dict of pymongo filters ( {FIELD:{OPERATOR:VALUE}} )
    def __process_filters(self,target, filters, relational_filters):
        # Simple filters lodel2 -> pymongo converting
        res = self.__filters2mongo(filters, target)
        rfilters = self.__prepare_relational_filters(target, relational_filters)
        #Now that everything is well organized, begin to forge subquerie
        #filters
        self.__subqueries_from_relational_filters(target, rfilters)
        # Executing subqueries, creating filters from result, and injecting
        # them in original filters of the query
        if len(rfilters) > 0:
            logger.debug("Begining subquery execution")
        for fname in rfilters:
            if fname not in res:
                res[fname] = dict()
            subq_results = set()
            for leobject, sq_filters in rfilters[fname].items():
                uid_fname = mongo_fieldname(leobject._uid)
                log_msg = "Subquery running on collection {coll} with filters \
'{filters}'"
                logger.debug(log_msg.format(
                    coll=object_collection_name(leobject),
                    filters=sq_filters))
                #Only the UID field is fetched : it is what the outer
                #query will match against
                cursor = self.__collection(leobject).find(
                    filter=sq_filters,
                    projection=uid_fname)
                subq_results |= set(doc[uid_fname] for doc in cursor)
            #generating new filter from result
            if '$in' in res[fname]:
                #WARNING we allready have a IN on this field, doing dedup
                #from result
                deduped = set(res[fname]['$in']) & subq_results
                if len(deduped) == 0:
                    del(res[fname]['$in'])
                else:
                    res[fname]['$in'] = list(deduped)
            else:
                res[fname]['$in'] = list(subq_results)
        if len(rfilters) > 0:
            logger.debug("End of subquery execution")
        return res
    ##@brief Generate subqueries from rfilters tree
    #
    #Returned struct organization :
    # - 1st level keys : relational field name of target
    # - 2nd level keys : referenced leobject
    # - 3rd level values : pymongo filters (dict)
    #
    #@note The only caller of this method is __process_filters
    #@warning No return value, the rfilters argument is modified by
    #reference
    #
    #@param target LeObject subclass (no instance) : Target class
    #@param rfilters dict : A struct as returned by
    #MongoDbDatasource.__prepare_relational_filters()
    #@return None, the rfilters argument is modified by reference
    @classmethod
    def __subqueries_from_relational_filters(cls, target, rfilters):
        for fname in rfilters:
            for leobject in rfilters[fname]:
                for rfield in rfilters[fname][leobject]:
                    #This way of doing is not optimized but allows to trigger
                    #warnings in some case (2 different values for a same op
                    #on a same field on a same collection)
                    mongofilters = cls.__op_value_listconv(
                        rfilters[fname][leobject][rfield], target.field(fname))
                    rfilters[fname][leobject][rfield] = mongofilters
  672. ##@brief Generate a tree from relational_filters
  673. #
  674. #The generated struct is a dict with :
  675. # - 1st level keys : relational field name of target
  676. # - 2nd level keys : referenced leobject
  677. # - 3th level keys : referenced field in referenced class
  678. # - 4th level values : list of tuple(op, value)
  679. #
  680. #@note The only caller of this method is __process_filters
  681. #@warning An assertion is done : if two leobject are stored in the same
  682. #collection they share the same uid
  683. #
  684. #@param target LeObject subclass (no instance) : Target class
  685. #@param relational_filters : same composition thant filters except that
  686. #@return a struct as described above
  687. @classmethod
  688. def __prepare_relational_filters(cls, target, relational_filters):
  689. # We are going to regroup relationnal filters by reference field
  690. # then by collection
  691. rfilters = dict()
  692. if relational_filters is None:
  693. relational_filters = []
  694. for (fname, rfields), op, value in relational_filters:
  695. if fname not in rfilters:
  696. rfilters[fname] = dict()
  697. rfilters[fname] = dict()
  698. # Stores the representative leobject for associated to a collection
  699. # name
  700. leo_collname = dict()
  701. # WARNING ! Here we assert that all leobject that are stored
  702. # in a same collection are identified by the same field
  703. for leobject, rfield in rfields.items():
  704. #here we are filling a dict with leobject as index but
  705. #we are doing a UNIQ on collection name
  706. cur_collname = object_collection_name(leobject)
  707. if cur_collname not in leo_collname:
  708. leo_collname[cur_collname] = leobject
  709. rfilters[fname][leobject] = dict()
  710. #Fecthing the collection's representative leobject
  711. repr_leo = leo_collname[cur_collname]
  712. if rfield not in rfilters[fname][repr_leo]:
  713. rfilters[fname][repr_leo][rfield] = list()
  714. rfilters[fname][repr_leo][rfield].append((op, value))
  715. return rfilters
  716. ##@brief Convert lodel2 filters to pymongo conditions
  717. #@param filters list : list of lodel filters
  718. #@return dict representing pymongo conditions
  719. @classmethod
  720. def __filters2mongo(cls, filters, target):
  721. res = dict()
  722. eq_fieldname = [] #Stores field with equal comparison OP
  723. for fieldname, op, value in filters:
  724. oop = op
  725. ovalue = value
  726. op, value = cls.__op_value_conv(op, value, target.field(fieldname))
  727. if op == '=':
  728. eq_fieldname.append(fieldname)
  729. if fieldname in res:
  730. logger.warning("Dropping previous condition. Overwritten \
  731. by an equality filter")
  732. res[fieldname] = value
  733. continue
  734. if fieldname in eq_fieldname:
  735. logger.warning("Dropping condition : '%s %s %s'" % (
  736. fieldname, op, value))
  737. continue
  738. if fieldname not in res:
  739. res[fieldname] = dict()
  740. if op in res[fieldname]:
  741. logger.warning("Dropping condition : '%s %s %s'" % (
  742. fieldname, op, value))
  743. else:
  744. if op not in cls.lodel2mongo_op_map:
  745. raise ValueError("Invalid operator : '%s'" % op)
  746. new_op = cls.lodel2mongo_op_map[op]
  747. res[fieldname][new_op] = value
  748. return res
  749. ##@brief Convert lodel2 operator and value to pymongo struct
  750. #
  751. #Convertion is done using MongoDbDatasource::lodel2mongo_op_map
  752. #@param op str : take value in LeFilteredQuery::_query_operators
  753. #@param value mixed : the value
  754. #@return a tuple(mongo_op, mongo_value)
  755. @classmethod
  756. def __op_value_conv(cls, op, value, dhdl):
  757. if op not in cls.lodel2mongo_op_map:
  758. msg = "Invalid operator '%s' found" % op
  759. raise MongoDbDataSourceError(msg)
  760. mongop = cls.lodel2mongo_op_map[op]
  761. mongoval = value
  762. #Converting lodel2 wildcarded string into a case insensitive
  763. #mongodb re
  764. if mongop in cls.mongo_op_re:
  765. if value.startswith('(') and value.endswith(')') and ',' in value:
  766. if (dhdl.cast_type is not None):
  767. mongoval = [ dhdl.cast_type(item) for item in mongoval[1:-1].split(',') ]
  768. else:
  769. mongoval = [ item for item in mongoval[1:-1].split(',') ]
  770. elif mongop == 'like':
  771. #unescaping \
  772. mongoval = value.replace('\\\\','\\')
  773. if not mongoval.startswith('*'):
  774. mongoval = '^'+mongoval
  775. #For the end of the string it's harder to detect escaped *
  776. if not (mongoval[-1] == '*' and mongoval[-2] != '\\'):
  777. mongoval += '$'
  778. #Replacing every other unescaped wildcard char
  779. mongoval = cls.wildcard_re.sub('.*', mongoval)
  780. mongoval = {'$regex': mongoval, '$options': 'i'}
  781. return (op, mongoval)
  782. ##@brief Convert a list of tuple(OP, VALUE) into a pymongo filter dict
  783. #@return a dict with mongo op as key and value as value...
  784. @classmethod
  785. def __op_value_listconv(cls, op_value_list, dhdl):
  786. result = dict()
  787. for op, value in op_value_list:
  788. mongop, mongoval = cls.__op_value_conv(op, value, dhdl)
  789. if mongop in result:
  790. warnings.warn("Duplicated value given for a single \
  791. field/operator couple in a query. We will keep only the first one")
  792. else:
  793. result[mongop] = mongoval
  794. return result
  795. ##@brief Generate a comparison function for post reccursion sorting in
  796. #select
  797. #@return a lambda function that take 2 dict as arguement
  798. @classmethod
  799. def __generate_lambda_cmp_order(cls, order):
  800. if len(order) == 0:
  801. return lambda a,b: 0
  802. glco = cls.__generate_lambda_cmp_order
  803. fname, cmpdir = order[0]
  804. order = order[1:]
  805. return lambda a,b: glco(order) if a[fname] == b[fname] else (\
  806. 1 if (a[fname]>b[fname] if cmpdir == 'ASC' else a[fname]<b[fname])\
  807. else -1)
  808. ##@brief Correct some datas before giving them to pymongo
  809. #
  810. #For example sets has to be casted to lise
  811. #@param datas
  812. #@return datas
  813. @classmethod
  814. def _data_cast(cls, datas):
  815. for dname in datas:
  816. if isinstance(datas[dname], set):
  817. #pymongo raises :
  818. #bson.errors.InvalidDocument: Cannot encode object: {...}
  819. #with sets
  820. datas[dname] = list(datas[dname])
  821. return datas