Nenhuma descrição
Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

datasource.py 38KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872
  1. # -*- coding: utf-8 -*-
  2. ## @package plugins.mongodb_datasource.datasource Main datasource module
  3. #
  4. # In this class, there is the MongoDbDatasource class, that handles the basic
  5. # operations that can be done (CRUD ones).
  6. import re
  7. import warnings
  8. import copy
  9. import functools
  10. from bson.son import SON
  11. from collections import OrderedDict
  12. import pymongo
  13. from pymongo.errors import BulkWriteError
  14. from lodel.context import LodelContext
  15. LodelContext.expose_modules(globals(), {
  16. 'lodel.logger': 'logger',
  17. 'lodel.leapi.leobject': ['CLASS_ID_FIELDNAME'],
  18. 'lodel.leapi.datahandlers.base_classes': ['Reference', 'MultipleRef'],
  19. 'lodel.exceptions': ['LodelException', 'LodelFatalError'],
  20. 'lodel.plugin.datasource_plugin': ['AbstractDatasource']})
  21. from . import utils
  22. from .exceptions import *
  23. from .utils import object_collection_name, collection_name, \
  24. MONGODB_SORT_OPERATORS_MAP, connection_string, mongo_fieldname
## @brief Datasource class
# @ingroup plugin_mongodb_datasource
class MongoDbDatasource(AbstractDatasource):

    ## @brief Stores existing connections
    #
    # The key of this dict is a hash built upon the connection string and the
    # ro (read-only) parameter.
    #
    # The value is a dict with 2 keys :
    # - conn_count : the number of instanciated datasource that use this
    # connection
    # - db : the pymongo database object instance
    _connections = dict()

    ## @brief Mapping from lodel2 operators to mongodb operators
    lodel2mongo_op_map = {
        '=':'$eq', '<=':'$lte', '>=':'$gte', '!=':'$ne', '<':'$lt',
        '>':'$gt', 'in':'$in', 'not in':'$nin' }
    ## @brief List of mongodb operators that expect re as value
    # (presumably: operators whose values may carry wildcard/regex patterns
    # — TODO confirm against __op_value_listconv())
    mongo_op_re = ['$in', '$nin']
    ## @brief Matches a '*' wildcard that is not escaped by a backslash
    wildcard_re = re.compile('[^\\\\]\*')
  45. ## @brief instanciates a database object given a connection name
  46. # @param host str : hostname or IP
  47. # @param port int : mongodb listening port
  48. # @param db_name str
  49. # @param username str
  50. # @param password str
  51. # @param read_only bool : If True the Datasource is for read only, else the
  52. # Datasource is write only !
  53. def __init__(self, host, port, db_name, username, password, read_only = False):
  54. ## @brief Connections infos that can be kept securly
  55. self.__db_infos = {'host': host, 'port': port, 'db_name': db_name}
  56. ## @brief Is the instance read only ? (if not it's write only)
  57. self.__read_only = bool(read_only)
  58. ## @brief Uniq ID for mongodb connection
  59. self.__conn_hash= None
  60. ## @brief Stores the database cursor
  61. self.database = self.__connect(
  62. username, password, db_name, self.__read_only)
  63. ## @brief Destructor that attempt to close connection to DB
  64. #
  65. # Decrease the conn_count of associated MongoDbDatasource::_connections
  66. # item. If it reach 0 close the connection to the db
  67. # @see MongoDbDatasource::__connect()
  68. def __del__(self):
  69. self._connections[self.__conn_hash]['conn_count'] -= 1
  70. if self._connections[self.__conn_hash]['conn_count'] <= 0:
  71. self._connections[self.__conn_hash]['db'].close()
  72. del(self._connections[self.__conn_hash])
  73. logger.info("Closing connection to database")
  74. ## @brief Provides a new uniq numeric ID
  75. # @param emcomp LeObject subclass (not instance) : To know on wich things we
  76. # have to be uniq
  77. # @warning multiple UID broken by this method
  78. # @return an integer
  79. def new_numeric_id(self, emcomp):
  80. target = emcomp.uid_source()
  81. tuid = target._uid[0] # Multiple UID broken here
  82. results = self.select(
  83. target, field_list = [tuid], filters = [],
  84. order=[(tuid, 'DESC')], limit = 1)
  85. if len(results) == 0:
  86. return 1
  87. return results[0][tuid]+1
  88. ## @brief returns a selection of documents from the datasource
  89. # @param target Emclass
  90. # @param field_list list
  91. # @param filters list : List of filters
  92. # @param relational_filters list : List of relational filters
  93. # @param order list : List of column to order. ex: order = [('title', 'ASC'),]
  94. # @param group list : List of tupple representing the column used as
  95. # "group by" fields. ex: group = [('title', 'ASC'),]
  96. # @param limit int : Number of records to be returned
  97. # @param offset int: used with limit to choose the start record
  98. # @return list
  99. # @todo Implement group for abstract LeObject childs
  100. def select(self, target, field_list, filters = None,
  101. relational_filters=None, order=None, group=None, limit=None,
  102. offset=0):
  103. if target.is_abstract():
  104. # Reccursive calls for abstract LeObject child
  105. results = self.__act_on_abstract(target, filters,
  106. relational_filters, self.select, field_list = field_list,
  107. order = order, group = group, limit = limit)
  108. # Here we may implement the group
  109. # If sorted query we have to sort again
  110. if order is not None:
  111. key_fun = functools.cmp_to_key(
  112. self.__generate_lambda_cmp_order(order))
  113. results = sorted(results, key=key_fun)
  114. # If limit given apply limit again
  115. if offset > len(results):
  116. results = list()
  117. else:
  118. if limit is not None:
  119. if limit + offset > len(results):
  120. limit = len(results)-offset-1
  121. results = results[offset:offset+limit]
  122. return results
  123. # Default behavior
  124. if filters is None:
  125. filters = list()
  126. if relational_filters is None:
  127. relational_filters = list()
  128. collection_name = object_collection_name(target)
  129. collection = self.database[collection_name]
  130. query_filters = self.__process_filters(
  131. target, filters, relational_filters)
  132. query_result_ordering = None
  133. if order is not None:
  134. query_result_ordering = utils.parse_query_order(order)
  135. if group is None:
  136. if field_list is None:
  137. field_list = dict()
  138. else:
  139. f_list=dict()
  140. for fl in field_list:
  141. f_list[fl] = 1
  142. field_list = f_list
  143. field_list['_id'] = 0
  144. cursor = collection.find(
  145. spec = query_filters,
  146. fields=field_list,
  147. skip=offset,
  148. limit=limit if limit != None else 0,
  149. sort=query_result_ordering)
  150. else:
  151. pipeline = list()
  152. unwinding_list = list()
  153. grouping_dict = OrderedDict()
  154. sorting_list = list()
  155. for group_param in group:
  156. field_name = group_param[0]
  157. field_sort_option = group_param[1]
  158. sort_option = MONGODB_SORT_OPERATORS_MAP[field_sort_option]
  159. unwinding_list.append({'$unwind': '$%s' % field_name})
  160. grouping_dict[field_name] = '$%s' % field_name
  161. sorting_list.append((field_name, sort_option))
  162. sorting_list.extends(query_result_ordering)
  163. pipeline.append({'$match': query_filters})
  164. if field_list is not None:
  165. pipeline.append({
  166. '$project': SON([{field_name: 1}
  167. for field_name in field_list])})
  168. pipeline.extend(unwinding_list)
  169. pipeline.append({'$group': grouping_dict})
  170. pipeline.extend({'$sort': SON(sorting_list)})
  171. if offset > 0:
  172. pipeline.append({'$skip': offset})
  173. if limit is not None:
  174. pipeline.append({'$limit': limit})
  175. results = list()
  176. for document in cursor:
  177. results.append(document)
  178. return results
  179. ## @brief Deletes records according to given filters
  180. # @param target Emclass : class of the record to delete
  181. # @param filters list : List of filters
  182. # @param relational_filters list : List of relational filters
  183. # @return int : number of deleted records
  184. def delete(self, target, filters, relational_filters):
  185. if target.is_abstract():
  186. logger.debug("Delete called on %s filtered by (%s,%s). Target is \
  187. abstract, preparing reccursiv calls" % (target, filters, relational_filters))
  188. # Deletion with abstract LeObject as target (reccursiv calls)
  189. return self.__act_on_abstract(target, filters,
  190. relational_filters, self.delete)
  191. logger.debug("Delete called on %s filtered by (%s,%s)." % (
  192. target, filters, relational_filters))
  193. # Non abstract beahavior
  194. mongo_filters = self.__process_filters(
  195. target, filters, relational_filters)
  196. # Updating backref before deletion
  197. self.__update_backref_filtered(target, filters, relational_filters,
  198. None)
  199. res = self.__collection(target).remove(mongo_filters)
  200. return res['n']
  201. ## @brief updates records according to given filters
  202. # @param target Emclass : class of the object to insert
  203. # @param filters list : List of filters
  204. # @param relational_filters list : List of relational filters
  205. # @param upd_datas dict : datas to update (new values)
  206. # @return int : Number of updated records
  207. def update(self, target, filters, relational_filters, upd_datas):
  208. self._data_cast(upd_datas)
  209. #fetching current datas state
  210. mongo_filters = self.__process_filters(
  211. target, filters, relational_filters)
  212. old_datas_l = self.__collection(target).find(
  213. mongo_filters)
  214. old_datas_l = list(old_datas_l)
  215. #Running update
  216. res = self.__update_no_backref(target, filters, relational_filters,
  217. upd_datas)
  218. #updating backref
  219. self.__update_backref_filtered(target, filters, relational_filters,
  220. upd_datas, old_datas_l)
  221. return res
  222. ## @brief Designed to be called by backref update in order to avoid
  223. # infinite updates between back references
  224. # @see update()
  225. def __update_no_backref(self, target, filters, relational_filters,
  226. upd_datas):
  227. logger.debug("Update called on %s filtered by (%s,%s) with datas \
  228. %s" % (target, filters, relational_filters, upd_datas))
  229. if target.is_abstract():
  230. #Update using abstract LeObject as target (reccursiv calls)
  231. return self.__act_on_abstract(target, filters,
  232. relational_filters, self.update, upd_datas = upd_datas)
  233. #Non abstract beahavior
  234. mongo_filters = self.__process_filters(
  235. target, filters, relational_filters)
  236. self._data_cast(upd_datas)
  237. mongo_arg = {'$set': upd_datas }
  238. res = self.__collection(target).update(mongo_filters, mongo_arg)
  239. return res['n']
  240. ## @brief Inserts a record in a given collection
  241. # @param target Emclass : class of the object to insert
  242. # @param new_datas dict : datas to insert
  243. # @return the inserted uid
  244. def insert(self, target, new_datas):
  245. self._data_cast(new_datas)
  246. logger.debug("Insert called on %s with datas : %s"% (
  247. target, new_datas))
  248. uidname = target.uid_fieldname()[0] #MULTIPLE UID BROKEN HERE
  249. if uidname not in new_datas:
  250. raise MongoDataSourceError("Missing UID data will inserting a new \
  251. %s" % target.__class__)
  252. res = self.__collection(target).insert(new_datas)
  253. self.__update_backref(target, new_datas[uidname], None, new_datas)
  254. return str(res)
  255. ## @brief Inserts a list of records in a given collection
  256. # @param target Emclass : class of the objects inserted
  257. # @param datas_list list : list of dict
  258. # @return list : list of the inserted records' ids
  259. def insert_multi(self, target, datas_list):
  260. for datas in datas_list:
  261. self._data_cast(datas)
  262. res = self.__collection(target).insert_many(datas_list)
  263. for new_datas in datas_list:
  264. self.__update_backref(target, None, new_datas)
  265. target.make_consistency(datas=new_datas)
  266. return list(res.inserted_ids)
  267. ## @brief Update backref giving an action
  268. # @param target leObject child class
  269. # @param filters
  270. # @param relational_filters,
  271. # @param new_datas None | dict : optional new datas if None mean we are deleting
  272. # @param old_datas_l None | list : if None fetch old datas from db (usefull
  273. # when modifications are made on instance before updating backrefs)
  274. # @return nothing (for the moment
  275. def __update_backref_filtered(self, target,
  276. filters, relational_filters, new_datas = None, old_datas_l = None):
  277. # Getting all the UID of the object that will be deleted in order
  278. # to update back_references
  279. if old_datas_l is None:
  280. mongo_filters = self.__process_filters(
  281. target, filters, relational_filters)
  282. old_datas_l = self.__collection(target).find(
  283. mongo_filters)
  284. old_datas_l = list(old_datas_l)
  285. uidname = target.uid_fieldname()[0] # MULTIPLE UID BROKEN HERE
  286. for old_datas in old_datas_l:
  287. self.__update_backref(
  288. target, old_datas[uidname], old_datas, new_datas)
    ## @brief Update back references of an object
    # @ingroup plugin_mongodb_bref_op
    #
    # old_datas and new_datas arguments are set to None to indicate
    # insertion or deletion. Calls examples :
    # @par LeObject insert __update backref call
    # <pre>
    # Insert(datas):
    #   self.make_insert(datas)
    #   self.__update_backref(self.__class__, None, datas)
    # </pre>
    # @par LeObject delete __update backref call
    # <pre>
    # Delete():
    #   old_datas = self.datas()
    #   self.make_delete()
    #   self.__update_backref(self.__class__, old_datas, None)
    # </pre>
    # @par LeObject update __update_backref call
    # <pre>
    # Update(new_datas):
    #   old_datas = self.datas()
    #   self.make_udpdate(new_datas)
    #   self.__update_backref(self.__class__, old_datas, new_datas)
    # </pre>
    #
    # @param target LeObject child class
    # @param tuid mixed : The target UID (the value that will be inserted in
    # back references)
    # @param old_datas dict : datas state before update
    # @param new_datas dict : datas state after the update process
    def __update_backref(self, target, tuid, old_datas, new_datas):
        # upd_dict is the dict that will allow to run updates in an optimized
        # way (or try to help doing it)
        #
        # Its structure looks like :
        # { LeoCLASS : {
        #       UID1: (
        #           LeoINSTANCE,
        #           { fname1 : value, fname2: value }),
        #       UID2: (LeoINSTANCE, {fname...}),
        #   },
        #   LeoClass2: {...
        #
        upd_dict = {}
        for fname, fdh in target.reference_handlers().items():
            # A field "counts" on a side (old/new) only if it is present,
            # non-None, and differs from the handler's default value
            oldd = old_datas is not None and fname in old_datas and \
                (not hasattr(fdh, 'default') or old_datas[fname] != fdh.default) \
                and not old_datas[fname] is None
            newd = new_datas is not None and fname in new_datas and \
                (not hasattr(fdh, 'default') or new_datas[fname] != fdh.default) \
                and not new_datas[fname] is None
            if (oldd and newd and old_datas[fname] == new_datas[fname])\
                    or not(oldd or newd):
                # No changes or not concerned
                continue
            bref_cls = fdh.back_reference[0]
            bref_fname = fdh.back_reference[1]
            if not fdh.is_singlereference():
                # fdh is a multiple reference. So the update preparation will be
                # divided into two loops :
                # - one loop for deleting old datas
                # - one loop for inserting updated datas
                #
                # Preparing the list of values to delete or to add
                if newd and oldd:
                    old_values = old_datas[fname]
                    new_values = new_datas[fname]
                    to_del = [ val
                        for val in old_values
                        if val not in new_values]
                    to_add = [ val
                        for val in new_values
                        if val not in old_values]
                elif oldd and not newd:
                    to_del = old_datas[fname]
                    to_add = []
                elif not oldd and newd:
                    to_del = []
                    to_add = new_datas[fname]
                # Calling __back_ref_upd_one_value() with good arguments
                for vtype, vlist in [('old',to_del), ('new', to_add)]:
                    for value in vlist:
                        # fetching backref infos
                        bref_infos = self.__bref_get_check(
                            bref_cls, value, bref_fname)
                        # preparing the upd_dict
                        upd_dict = self.__update_backref_upd_dict_prepare(
                            upd_dict, bref_infos, bref_fname, value)
                        # preparing updated bref_infos : replace the db value
                        # with the value accumulated so far in upd_dict
                        # NOTE(review): bref_cls is rebound here from
                        # bref_infos — intentional-looking but worth a check
                        bref_cls, bref_leo, bref_dh, bref_value = bref_infos
                        bref_infos = (bref_cls, bref_leo, bref_dh,
                            upd_dict[bref_cls][value][1][bref_fname])
                        vdict = {vtype: value}
                        # fetch and store updated value
                        new_bref_val = self.__back_ref_upd_one_value(
                            fname, fdh, tuid, bref_infos, **vdict)
                        upd_dict[bref_cls][value][1][bref_fname] = new_bref_val
            else:
                # fdh is a single ref so the process is simpler, we do not have
                # to loop and we may do an update in only one
                # __back_ref_upd_one_value() call by giving both old and new
                # value
                vdict = {}
                if oldd:
                    vdict['old'] = old_datas[fname]
                    uid_val = vdict['old']
                if newd:
                    vdict['new'] = new_datas[fname]
                    if not oldd:
                        uid_val = vdict['new']
                # Fetching back ref infos
                bref_infos = self.__bref_get_check(
                    bref_cls, uid_val, bref_fname)
                # prepare the upd_dict
                upd_dict = self.__update_backref_upd_dict_prepare(
                    upd_dict, bref_infos, bref_fname, uid_val)
                # forging update bref_infos
                bref_cls, bref_leo, bref_dh, bref_value = bref_infos
                bref_infos = (bref_cls, bref_leo, bref_dh,
                    upd_dict[bref_cls][uid_val][1][bref_fname])
                # fetch and store updated value
                new_bref_val = self.__back_ref_upd_one_value(
                    fname, fdh, tuid, bref_infos, **vdict)
                upd_dict[bref_cls][uid_val][1][bref_fname] = new_bref_val
        # Now we've got our upd_dict ready.
        # running the updates
        for bref_cls, uid_dict in upd_dict.items():
            for uidval, (leo, datas) in uid_dict.items():
                # MULTIPLE UID BROKEN 2 LINES BELOW
                self.__update_no_backref(
                    leo.__class__, [(leo.uid_fieldname()[0], '=', uidval)],
                    [], datas)
  420. ## @brief Utility function designed to handle the upd_dict of __update_backref()
  421. #
  422. # Basically checks if a key exists at some level, if not create it with
  423. # the good default value (in most case dict())
  424. # @param upd_dict dict : in & out args modified by reference
  425. # @param bref_infos tuple : as returned by __bref_get_check()
  426. # @param bref_fname str : name of the field in referenced class
  427. # @param uid_val mixed : the UID of the referenced object
  428. # @return the updated version of upd_dict
  429. @staticmethod
  430. def __update_backref_upd_dict_prepare(upd_dict,bref_infos, bref_fname,
  431. uid_val):
  432. bref_cls, bref_leo, bref_dh, bref_value = bref_infos
  433. if bref_cls not in upd_dict:
  434. upd_dict[bref_cls] = {}
  435. if uid_val not in upd_dict[bref_cls]:
  436. upd_dict[bref_cls][uid_val] = (bref_leo, {})
  437. if bref_fname not in upd_dict[bref_cls][uid_val]:
  438. upd_dict[bref_cls][uid_val][1][bref_fname] = bref_value
  439. return upd_dict
    ## @brief Prepare a one value back reference update
    # @param fname str : the source Reference field name
    # @param fdh DataHandler : the source Reference DataHandler
    # @param tuid mixed : the uid of the Leo that make reference to the backref
    # @param bref_infos tuple : as returned by __bref_get_check() method
    # @param old mixed : (optional **values) the old value
    # @param new mixed : (optional **values) the new value
    # @return the new back reference field value
    # @throw MongoDbConsistencyError when the expected value is missing from
    # (or already present in) the back referenced object
    def __back_ref_upd_one_value(self, fname, fdh, tuid, bref_infos, **values):
        bref_cls, bref_leo, bref_dh, bref_val = bref_infos
        oldd = 'old' in values
        newdd = 'new' in values
        if bref_val is None:
            # Start from the handler's notion of an empty value
            bref_val = bref_dh.empty()
        if not bref_dh.is_singlereference():
            # Multiple value backref : bref_val is a collection of UIDs
            if oldd and newdd:
                # Replacement : tuid must already be present, nothing to change
                if tuid not in bref_val:
                    raise MongoDbConsistencyError("The value we want to \
delete in this back reference update was not found in the back referenced \
object : %s. Value was : '%s'" % (bref_leo, tuid))
                return bref_val
            elif oldd and not newdd:
                # deletion
                old_value = values['old']
                if tuid not in bref_val:
                    raise MongoDbConsistencyError("The value we want to \
delete in this back reference update was not found in the back referenced \
object : %s. Value was : '%s'" % (bref_leo, tuid))
                # Tuples are immutable : go through a set to remove the uid
                if isinstance(bref_val, tuple):
                    bref_val = set(bref_val)
                if isinstance(bref_val, set):
                    bref_val -= set([tuid])
                else:
                    del(bref_val[bref_val.index(tuid)])
            elif not oldd and newdd:
                # addition : tuid must not already be referenced
                if tuid in bref_val:
                    raise MongoDbConsistencyError("The value we want to \
add in this back reference update was found in the back referenced \
object : %s. Value was : '%s'" % (bref_leo, tuid))
                if isinstance(bref_val, tuple):
                    bref_val = set(bref_val)
                if isinstance(bref_val, set):
                    bref_val |= set([tuid])
                else:
                    bref_val.append(tuid)
        else:
            # Single value backref
            if oldd and newdd:
                # Replacement : the backref must already point to tuid
                if bref_val != tuid:
                    raise MongoDbConsistencyError("The backreference doesn't \
have expected value. Expected was %s but found %s in %s" % (
                        tuid, bref_val, bref_leo))
                return bref_val
            elif oldd and not newdd:
                # deletion : fall back to the handler's default value
                if not hasattr(bref_dh, "default"):
                    raise MongoDbConsistencyError("Unable to delete a \
value for a back reference update. The concerned field don't have a default \
value : in %s field %s" % (bref_leo,fname))
                bref_val = getattr(bref_dh, "default")
            elif not oldd and newdd:
                bref_val = tuid
        return bref_val
  503. ## @brief Fetch back reference informations
  504. # @warning thank's to __update_backref_act() this method is useless
  505. # @param bref_cls LeObject child class : __back_reference[0]
  506. # @param uidv mixed : UID value (the content of the reference field)
  507. # @param bref_fname str : the name of the back_reference field
  508. # @return tuple(bref_class, bref_LeObect_instance, bref_datahandler, bref_value)
  509. # @throw MongoDbConsistencyError when LeObject instance not found given uidv
  510. # @throw LodelFatalError if the back reference field is not a Reference subclass (major failure)
  511. def __bref_get_check(self, bref_cls, uidv, bref_fname):
  512. bref_leo = bref_cls.get_from_uid(uidv)
  513. if bref_leo is None:
  514. raise MongoDbConsistencyError("Unable to get the object we make \
  515. reference to : %s with uid = %s" % (bref_cls, repr(uidv)))
  516. bref_dh = bref_leo.data_handler(bref_fname)
  517. if not bref_dh.is_reference():
  518. raise LodelFatalError("Found a back reference field that \
  519. is not a reference : '%s' field '%s'" % (bref_leo, bref_fname))
  520. bref_val = bref_leo.data(bref_fname)
  521. return (bref_leo.__class__, bref_leo, bref_dh, bref_val)
  522. ## @brief Act on abstract LeObject child
  523. #
  524. # This method is designed to be called by insert, select and delete method
  525. # when they encounter an abtract class
  526. # @param target LeObject child class
  527. # @param filters
  528. # @param relational_filters
  529. # @param act function : the caller method
  530. # @param **kwargs other arguments
  531. # @return sum of results (if it's an array it will result in a concat)
  532. # @todo optimization implementing a cache for __bref_get_check()
  533. def __act_on_abstract(self,
  534. target, filters, relational_filters, act, **kwargs):
  535. logger.debug("Abstract %s, running reccursiv select \
  536. on non abstract childs" % act.__name__)
  537. result = list() if act == self.select else 0
  538. if not target.is_abstract():
  539. target_childs = [target]
  540. else:
  541. target_childs = [tc for tc in target.child_classes()
  542. if not tc.is_abstract()]
  543. for target_child in target_childs:
  544. logger.debug(
  545. "Abstract %s on %s" % (act.__name__, target_child.__name__))
  546. # Add target_child to filter
  547. new_filters = copy.copy(filters)
  548. for i in range(len(filters)):
  549. fname, op, val = filters[i]
  550. if fname == CLASS_ID_FIELDNAME:
  551. logger.warning("Dirty drop of filter : '%s %s %s'" % (
  552. fname, op, val))
  553. del(new_filters[i])
  554. new_filters.append(
  555. (CLASS_ID_FIELDNAME, '=',
  556. collection_name(target_child.__name__)))
  557. result += act(
  558. target = target_child,
  559. filters = new_filters,
  560. relational_filters = relational_filters,
  561. **kwargs)
  562. return result
  563. ## @brief Connect to database
  564. # @note this method avoid opening two times the same connection using MongoDbDatasource::_connections static attribute
  565. # @param username str
  566. # @param password str
  567. # @param ro bool : If True the Datasource is for read only, else it will be write only
  568. def __connect(self, username, password, db_name, ro):
  569. conn_string = connection_string(
  570. username = username, password = password,
  571. host = self.__db_infos['host'],
  572. port = self.__db_infos['port'],
  573. db_name = db_name,
  574. ro = ro)
  575. self.__conn_hash = conn_h = hash(conn_string)
  576. if conn_h in self._connections:
  577. self._connections[conn_h]['conn_count'] += 1
  578. return self._connections[conn_h]['db'][self.__db_infos['db_name']]
  579. else:
  580. logger.info("Opening a new connection to database")
  581. self._connections[conn_h] = {
  582. 'conn_count': 1,
  583. 'db': utils.connect(conn_string)}
  584. return self._connections[conn_h]['db'][self.__db_infos['db_name']]
  585. ## @brief Return a pymongo collection given a LeObject child class
  586. # @param leobject LeObject child class (no instance)
  587. # @return a pymongo.collection instance
  588. def __collection(self, leobject):
  589. return self.database[object_collection_name(leobject)]
    ## @brief Perform subqueries implies by relational filters and append the
    # result to existing filters
    #
    # The processing is divided in multiple steps :
    # - determine (for each relational field of the target) every collection that are involved
    # - generate subqueries for relational_filters that concerns a different collection than target collection filters
    # - execute subqueries
    # - transform subqueries results in filters
    # - merge subqueries generated filters with existing filters
    #
    # @param target LeObject subclass (no instance) : Target class
    # @param filters list : List of tuple(FIELDNAME, OP, VALUE)
    # @param relational_filters : same composition thant filters except that FIELD is represented by a tuple(FIELDNAME, {CLASS1:RFIELD1, CLASS2:RFIELD2})
    # @return a list of pymongo filters ( dict {FIELD:{OPERATOR:VALUE}} )
    def __process_filters(self,target, filters, relational_filters):
        # Simple filters lodel2 -> pymongo converting
        res = self.__filters2mongo(filters, target)
        rfilters = self.__prepare_relational_filters(target, relational_filters)
        # Now that everything is well organized, begin to forge subquerie
        # filters
        self.__subqueries_from_relational_filters(target, rfilters)
        # Executing subqueries, creating filters from result, and injecting
        # them in original filters of the query
        if len(rfilters) > 0:
            logger.debug("Begining subquery execution")
            for fname in rfilters:
                if fname not in res:
                    res[fname] = dict()
                subq_results = set()
                for leobject, sq_filters in rfilters[fname].items():
                    uid_fname = mongo_fieldname(leobject._uid)
                    log_msg = "Subquery running on collection {coll} with filters \
'{filters}'"
                    logger.debug(log_msg.format(
                        coll=object_collection_name(leobject),
                        filters=sq_filters))
                    # Only the UID field is projected : the subquery result is
                    # merely a set of matching UIDs
                    cursor = self.__collection(leobject).find(
                        filter=sq_filters,
                        projection=uid_fname)
                    subq_results |= set(doc[uid_fname] for doc in cursor)
                # generating new filter from result
                if '$in' in res[fname]:
                    # WARNING we allready have a IN on this field, doing dedup
                    # from result
                    deduped = set(res[fname]['$in']) & subq_results
                    if len(deduped) == 0:
                        # NOTE(review): dropping the $in makes the query
                        # unconstrained on this field instead of matching
                        # nothing — confirm this is the intended semantic
                        del(res[fname]['$in'])
                    else:
                        res[fname]['$in'] = list(deduped)
                else:
                    res[fname]['$in'] = list(subq_results)
        if len(rfilters) > 0:
            logger.debug("End of subquery execution")
        return res
  644. ## @brief Generate subqueries from rfilters tree
  645. #
  646. # Returned struct organization :
  647. # - 1st level keys : relational field name of target
  648. # - 2nd level keys : referenced leobject
  649. # - 3th level values : pymongo filters (dict)
  650. #
  651. # @note The only caller of this method is __process_filters
  652. # @warning No return value, the rfilters arguement is modified by reference
  653. #
  654. # @param target LeObject subclass (no instance) : Target class
  655. # @param rfilters dict : A struct as returned by MongoDbDatasource.__prepare_relational_filters()
  656. # @return None, the rfilters argument is modified by reference
  657. @classmethod
  658. def __subqueries_from_relational_filters(cls, target, rfilters):
  659. for fname in rfilters:
  660. for leobject in rfilters[fname]:
  661. for rfield in rfilters[fname][leobject]:
  662. #This way of doing is not optimized but allows to trigger
  663. #warnings in some case (2 different values for a same op
  664. #on a same field on a same collection)
  665. mongofilters = cls.__op_value_listconv(
  666. rfilters[fname][leobject][rfield], target.field(fname))
  667. rfilters[fname][leobject][rfield] = mongofilters
  668. ## @brief Generate a tree from relational_filters
  669. #
  670. # The generated struct is a dict with :
  671. # - 1st level keys : relational field name of target
  672. # - 2nd level keys : referenced leobject
  673. # - 3th level keys : referenced field in referenced class
  674. # - 4th level values : list of tuple(op, value)
  675. #
  676. # @note The only caller of this method is __process_filters
  677. # @warning An assertion is done : if two leobject are stored in the same collection they share the same uid
  678. #
  679. # @param target LeObject subclass (no instance) : Target class
  680. # @param relational_filters : same composition thant filters except that
  681. # @return a struct as described above
  682. @classmethod
  683. def __prepare_relational_filters(cls, target, relational_filters):
  684. # We are going to regroup relationnal filters by reference field
  685. # then by collection
  686. rfilters = dict()
  687. if relational_filters is None:
  688. relational_filters = []
  689. for (fname, rfields), op, value in relational_filters:
  690. if fname not in rfilters:
  691. rfilters[fname] = dict()
  692. rfilters[fname] = dict()
  693. # Stores the representative leobject for associated to a collection
  694. # name
  695. leo_collname = dict()
  696. # WARNING ! Here we assert that all leobject that are stored
  697. # in a same collection are identified by the same field
  698. for leobject, rfield in rfields.items():
  699. #here we are filling a dict with leobject as index but
  700. #we are doing a UNIQ on collection name
  701. cur_collname = object_collection_name(leobject)
  702. if cur_collname not in leo_collname:
  703. leo_collname[cur_collname] = leobject
  704. rfilters[fname][leobject] = dict()
  705. #Fecthing the collection's representative leobject
  706. repr_leo = leo_collname[cur_collname]
  707. if rfield not in rfilters[fname][repr_leo]:
  708. rfilters[fname][repr_leo][rfield] = list()
  709. rfilters[fname][repr_leo][rfield].append((op, value))
  710. return rfilters
  711. ## @brief Convert lodel2 filters to pymongo conditions
  712. # @param filters list : list of lodel filters
  713. # @return dict representing pymongo conditions
  714. @classmethod
  715. def __filters2mongo(cls, filters, target):
  716. res = dict()
  717. eq_fieldname = [] #Stores field with equal comparison OP
  718. for fieldname, op, value in filters:
  719. oop = op
  720. ovalue = value
  721. op, value = cls.__op_value_conv(op, value, target.field(fieldname))
  722. if op == '=':
  723. eq_fieldname.append(fieldname)
  724. if fieldname in res:
  725. logger.warning("Dropping previous condition. Overwritten \
  726. by an equality filter")
  727. res[fieldname] = value
  728. continue
  729. if fieldname in eq_fieldname:
  730. logger.warning("Dropping condition : '%s %s %s'" % (
  731. fieldname, op, value))
  732. continue
  733. if fieldname not in res:
  734. res[fieldname] = dict()
  735. if op in res[fieldname]:
  736. logger.warning("Dropping condition : '%s %s %s'" % (
  737. fieldname, op, value))
  738. else:
  739. if op not in cls.lodel2mongo_op_map:
  740. raise ValueError("Invalid operator : '%s'" % op)
  741. new_op = cls.lodel2mongo_op_map[op]
  742. res[fieldname][new_op] = value
  743. return res
  744. ## @brief Convert lodel2 operator and value to pymongo struct
  745. #
  746. # Convertion is done using MongoDbDatasource::lodel2mongo_op_map
  747. # @param op str : take value in LeFilteredQuery::_query_operators
  748. # @param value mixed : the value
  749. # @param dhdl
  750. # @return a tuple(mongo_op, mongo_value)
  751. @classmethod
  752. def __op_value_conv(cls, op, value, dhdl):
  753. if op not in cls.lodel2mongo_op_map:
  754. msg = "Invalid operator '%s' found" % op
  755. raise MongoDbDataSourceError(msg)
  756. mongop = cls.lodel2mongo_op_map[op]
  757. mongoval = value
  758. # Converting lodel2 wildcarded string into a case insensitive
  759. # mongodb re
  760. if mongop in cls.mongo_op_re:
  761. if value.startswith('(') and value.endswith(')'):
  762. if (dhdl.cast_type is not None):
  763. mongoval = [ dhdl.cast_type(item) for item in mongoval[1:-1].split(',') ]
  764. else:
  765. mongoval = [ item for item in mongoval[1:-1].split(',') ]
  766. elif mongop == 'like':
  767. #unescaping \
  768. mongoval = value.replace('\\\\','\\')
  769. if not mongoval.startswith('*'):
  770. mongoval = '^'+mongoval
  771. #For the end of the string it's harder to detect escaped *
  772. if not (mongoval[-1] == '*' and mongoval[-2] != '\\'):
  773. mongoval += '$'
  774. #Replacing every other unescaped wildcard char
  775. mongoval = cls.wildcard_re.sub('.*', mongoval)
  776. mongoval = {'$regex': mongoval, '$options': 'i'}
  777. return (op, mongoval)
  778. ## @brief Convert a list of tuple(OP, VALUE) into a pymongo filter dict
  779. # @param op_value_list list
  780. # @param dhdl
  781. # @return a dict with mongo op as key and value as value...
  782. @classmethod
  783. def __op_value_listconv(cls, op_value_list, dhdl):
  784. result = dict()
  785. for op, value in op_value_list:
  786. mongop, mongoval = cls.__op_value_conv(op, value, dhdl)
  787. if mongop in result:
  788. warnings.warn("Duplicated value given for a single \
  789. field/operator couple in a query. We will keep only the first one")
  790. else:
  791. result[mongop] = mongoval
  792. return result
  793. ##@brief Generate a comparison function for post reccursion sorting in
  794. #select
  795. #@return a lambda function that take 2 dict as arguement
  796. @classmethod
  797. def __generate_lambda_cmp_order(cls, order):
  798. if len(order) == 0:
  799. return lambda a,b: 0
  800. glco = cls.__generate_lambda_cmp_order
  801. fname, cmpdir = order[0]
  802. order = order[1:]
  803. return lambda a,b: glco(order)(a,b) if a[fname] == b[fname] else (\
  804. 1 if (a[fname]>b[fname] if cmpdir == 'ASC' else a[fname]<b[fname])\
  805. else -1)
  806. ##@brief Correct some datas before giving them to pymongo
  807. #
  808. #For example sets has to be casted to lise
  809. #@param datas
  810. #@return datas
  811. @classmethod
  812. def _data_cast(cls, datas):
  813. for dname in datas:
  814. if isinstance(datas[dname], set):
  815. #pymongo raises :
  816. #bson.errors.InvalidDocument: Cannot encode object: {...}
  817. #with sets
  818. datas[dname] = list(datas[dname])
  819. return datas