No Description
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

datasource.py 22KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537
  1. # -*- coding: utf-8 -*-
  2. import re
  3. import warnings
  4. import copy
  5. import functools
  6. from bson.son import SON
  7. from collections import OrderedDict
  8. import pymongo
  9. from pymongo.errors import BulkWriteError
  10. from lodel import logger
  11. from lodel.leapi.leobject import CLASS_ID_FIELDNAME
  12. from . import utils
  13. from .utils import object_collection_name, collection_name, \
  14. MONGODB_SORT_OPERATORS_MAP, connection_string, mongo_fieldname
  15. class MongoDbDataSourceError(Exception):
  16. pass
  17. class MongoDbDatasource(object):
  18. ##@brief Stores existing connections
  19. #
  20. #The key of this dict is a hash of the connection string + ro parameter.
  21. #The value is a dict with 2 keys :
  22. # - conn_count : the number of instanciated datasource that use this
  23. #connection
  24. # - db : the pymongo database object instance
  25. _connections = dict()
  26. ##@brief Mapping from lodel2 operators to mongodb operator
  27. lodel2mongo_op_map = {
  28. '=':'$eq', '<=':'$lte', '>=':'$gte', '!=':'$ne', '<':'$lt',
  29. '>':'$gt', 'in':'$in', 'not in':'$nin' }
  30. ##@brief List of mongodb operators that expect re as value
  31. mongo_op_re = ['$in', '$nin']
  32. wildcard_re = re.compile('[^\\\\]\*')
  33. ##@brief instanciates a database object given a connection name
  34. #@param host str : hostname or IP
  35. #@param port int : mongodb listening port
  36. #@param db_name str
  37. #@param username str
  38. #@param password str
  39. #@param read_only bool : If True the Datasource is for read only, else the
  40. #Datasource is write only !
  41. def __init__(self, host, port, db_name, username, password, read_only = False):
  42. ##@brief Connections infos that can be kept securly
  43. self.__db_infos = {'host': host, 'port': port, 'db_name': db_name}
  44. ##@brief Is the instance read only ? (if not it's write only)
  45. self.__read_only = bool(read_only)
  46. ##@brief Uniq ID for mongodb connection
  47. self.__conn_hash= None
  48. ##@brief Stores the database cursor
  49. self.database = self.__connect(
  50. username, password, ro = self.__read_only)
  51. ##@brief Destructor that attempt to close connection to DB
  52. #
  53. #Decrease the conn_count of associated MongoDbDatasource::_connections
  54. #item. If it reach 0 close the connection to the db
  55. #@see MongoDbDatasource::__connect()
  56. def __del__(self):
  57. self._connections[self.__conn_hash]['conn_count'] -= 1
  58. if self._connections[self.__conn_hash]['conn_count'] <= 0:
  59. self._connections[self.__conn_hash]['db'].close()
  60. del(self._connections[self.__conn_hash])
  61. logger.info("Closing connection to database")
  62. ##@brief Provide a new uniq numeric ID
  63. #@param emcomp LeObject subclass (not instance) : To know on wich things we
  64. #have to be uniq
  65. #@warning multiple UID broken by this method
  66. #@return an integer
  67. def new_numeric_id(self, emcomp):
  68. target = emcomp.uid_source()
  69. tuid = target._uid[0] # Multiple UID broken here
  70. results = self.select(
  71. target, field_list = [tuid], filters = [],
  72. order=[(tuid, 'DESC')], limit = 1)
  73. if len(results) == 0:
  74. return 1
  75. return results[0][tuid]+1
  76. ##@brief returns a selection of documents from the datasource
  77. #@param target Emclass
  78. #@param field_list list
  79. #@param filters list : List of filters
  80. #@param relational_filters list : List of relational filters
  81. #@param order list : List of column to order. ex: order =
  82. #[('title', 'ASC'),]
  83. #@param group list : List of tupple representing the column used as
  84. #"group by" fields. ex: group = [('title', 'ASC'),]
  85. #@param limit int : Number of records to be returned
  86. #@param offset int: used with limit to choose the start record
  87. #@return list
  88. #@todo Implement group for abstract LeObject childs
  89. def select(self, target, field_list, filters = None,
  90. relational_filters=None, order=None, group=None, limit=None, offset=0):
  91. if target.is_abstract():
  92. #Reccursiv calls for abstract LeObject child
  93. results = self.__act_on_abstract(target, filters,
  94. relational_filters, self.select, field_list = field_list,
  95. order = order, group = group, limit = limit)
  96. #Here we may implement the group
  97. #If sorted query we have to sort again
  98. if order is not None:
  99. results = sorted(results,
  100. key=functools.cmp_to_key(
  101. self.__generate_lambda_cmp_order(order)))
  102. #If limit given apply limit again
  103. if offset > len(results):
  104. results = list()
  105. else:
  106. if limit is not None:
  107. if limit + offset > len(results):
  108. limit = len(results)-offset-1
  109. results = results[offset:offset+limit]
  110. return results
  111. # Default behavior
  112. if filters is None:
  113. filters = list()
  114. if relational_filters is None:
  115. relational_filters = list()
  116. collection_name = object_collection_name(target)
  117. collection = self.database[collection_name]
  118. query_filters = self.__process_filters(
  119. target, filters, relational_filters)
  120. query_result_ordering = None
  121. if order is not None:
  122. query_result_ordering = utils.parse_query_order(order)
  123. if group is None:
  124. if field_list is None:
  125. field_list = dict()
  126. else:
  127. f_list=dict()
  128. for fl in field_list:
  129. f_list[fl] = 1
  130. field_list = f_list
  131. field_list['_id'] = 0
  132. cursor = collection.find(
  133. spec = query_filters,
  134. fields=field_list,
  135. skip=offset,
  136. limit=limit if limit != None else 0,
  137. sort=query_result_ordering)
  138. else:
  139. pipeline = list()
  140. unwinding_list = list()
  141. grouping_dict = OrderedDict()
  142. sorting_list = list()
  143. for group_param in group:
  144. field_name = group_param[0]
  145. field_sort_option = group_param[1]
  146. sort_option = MONGODB_SORT_OPERATORS_MAP[field_sort_option]
  147. unwinding_list.append({'$unwind': '$%s' % field_name})
  148. grouping_dict[field_name] = '$%s' % field_name
  149. sorting_list.append((field_name, sort_option))
  150. sorting_list.extends(query_result_ordering)
  151. pipeline.append({'$match': query_filters})
  152. if field_list is not None:
  153. pipeline.append({
  154. '$project': SON([{field_name: 1}
  155. for field_name in field_list])})
  156. pipeline.extend(unwinding_list)
  157. pipeline.append({'$group': grouping_dict})
  158. pipeline.extend({'$sort': SON(sorting_list)})
  159. if offset > 0:
  160. pipeline.append({'$skip': offset})
  161. if limit is not None:
  162. pipeline.append({'$limit': limit})
  163. results = list()
  164. for document in cursor:
  165. results.append(document)
  166. return results
  167. ##@brief Deletes records according to given filters
  168. #@param target Emclass : class of the record to delete
  169. #@param filters list : List of filters
  170. #@param relational_filters list : List of relational filters
  171. #@return int : number of deleted records
  172. def delete(self, target, filters, relational_filters):
  173. if target.is_abstract():
  174. #Deletion with abstract LeObject as target (reccursiv calls)
  175. return self.__act_on_abstract(target, filters,
  176. relational_filters, self.delete)
  177. #Non abstract beahavior
  178. mongo_filters = self.__process_filters(
  179. target, filters, relational_filters)
  180. res = self.__collection(target).delete_many(mongo_filters)
  181. return res.deleted_count
  182. ## @brief updates records according to given filters
  183. #@param target Emclass : class of the object to insert
  184. #@param filters list : List of filters
  185. #@param relational_filters list : List of relational filters
  186. #@param upd_datas dict : datas to update (new values)
  187. #@return int : Number of updated records
  188. def update(self, target, filters, relational_filters, upd_datas):
  189. if target.is_abstract():
  190. #Update using abstract LeObject as target (reccursiv calls)
  191. return self.__act_on_abstract(target, filters,
  192. relational_filters, self.update, upd_datas = upd_datas)
  193. #Non abstract beahavior
  194. mongo_filters = self.__process_filters(
  195. target, filters, relational_filters)
  196. res = self.__collection(target).update(mongo_filters, upd_datas)
  197. target.make_consistency(datas=upd_datas, type_query='update')
  198. return res['n']
  199. ## @brief Inserts a record in a given collection
  200. # @param target Emclass : class of the object to insert
  201. # @param new_datas dict : datas to insert
  202. # @return the inserted uid
  203. def insert(self, target, new_datas):
  204. res = self.__collection(target).insert(new_datas)
  205. target.make_consistency(datas=new_datas)
  206. return str(res)
  207. ## @brief Inserts a list of records in a given collection
  208. # @param target Emclass : class of the objects inserted
  209. # @param datas_list list : list of dict
  210. # @return list : list of the inserted records' ids
  211. def insert_multi(self, target, datas_list):
  212. res = self.__collection(target).insert_many(datas_list)
  213. for new_datas in datas_list:
  214. target.make_consistency(datas=new_datas)
  215. return list(res.inserted_ids)
  216. ##@brief Act on abstract LeObject child
  217. #
  218. #This method is designed to be called by insert, select and delete method
  219. #when they encounter an abtract class
  220. #@param target LeObject child class
  221. #@param filters
  222. #@param relational_filters
  223. #@param act function : the caller method
  224. #@param **kwargs other arguments
  225. #@return sum of results (if it's an array it will result in a concat)
  226. def __act_on_abstract(self,
  227. target, filters, relational_filters, act, **kwargs):
  228. result = list() if act == self.select else 0
  229. if not target.is_abstract():
  230. target_childs = target
  231. else:
  232. target_childs = [tc for tc in target.child_classes()
  233. if not tc.is_abstract()]
  234. for target_child in target_childs:
  235. #Add target_child to filter
  236. new_filters = copy.copy(filters)
  237. for i in range(len(filters)):
  238. fname, op, val = filters[i]
  239. if fname == CLASS_ID_FIELDNAME:
  240. logger.warning("Dirty drop of filter : '%s %s %s'" % (
  241. fname, op, val))
  242. del(new_filters[i])
  243. new_filters.append(
  244. (CLASS_ID_FIELDNAME, '=',
  245. collection_name(target_child.__name__)))
  246. result += act(
  247. target = target_child,
  248. filters = new_filters,
  249. relational_filters = relational_filters,
  250. **kwargs)
  251. return result
  252. ##@brief Connect to database
  253. #@note this method avoid opening two times the same connection using
  254. #MongoDbDatasource::_connections static attribute
  255. #@param username str
  256. #@param password str
  257. #@param ro bool : If True the Datasource is for read only, else the
  258. def __connect(self, username, password, ro):
  259. conn_string = connection_string(
  260. username = username, password = password,
  261. host = self.__db_infos['host'],
  262. port = self.__db_infos['port'])
  263. conn_string += "__ReadOnly__:"+str(self.__read_only)
  264. self.__conn_hash = conn_h = hash(conn_string)
  265. if conn_h in self._connections:
  266. self._connections[conn_h]['conn_count'] += 1
  267. return self._connections[conn_h]['db'][self.__db_infos['db_name']]
  268. else:
  269. logger.info("Opening a new connection to database")
  270. self._connections[conn_h] = {
  271. 'conn_count': 1,
  272. 'db': utils.connection(
  273. host = self.__db_infos['host'],
  274. port = self.__db_infos['port'],
  275. username = username,
  276. password = password)}
  277. return self._connections[conn_h]['db'][self.__db_infos['db_name']]
  278. ##@brief Return a pymongo collection given a LeObject child class
  279. #@param leobject LeObject child class (no instance)
  280. #return a pymongo.collection instance
  281. def __collection(self, leobject):
  282. return self.database[object_collection_name(leobject)]
  283. ##@brief Perform subqueries implies by relational filters and append the
  284. # result to existing filters
  285. #
  286. #The processing is divided in multiple steps :
  287. # - determine (for each relational field of the target) every collection
  288. #that are involved
  289. # - generate subqueries for relational_filters that concerns a different
  290. #collection than target collection
  291. #filters
  292. # - execute subqueries
  293. # - transform subqueries results in filters
  294. # - merge subqueries generated filters with existing filters
  295. #
  296. #@param target LeObject subclass (no instance) : Target class
  297. #@param filters list : List of tuple(FIELDNAME, OP, VALUE)
  298. #@param relational_filters : same composition thant filters except that
  299. # FIELD is represented by a tuple(FIELDNAME, {CLASS1:RFIELD1,
  300. # CLASS2:RFIELD2})
  301. #@return a list of pymongo filters ( dict {FIELD:{OPERATOR:VALUE}} )
  302. def __process_filters(self,target, filters, relational_filters):
  303. # Simple filters lodel2 -> pymongo converting
  304. res = self.__filters2mongo(filters)
  305. rfilters = self.__prepare_relational_filters(target, relational_filters)
  306. #Now that everything is well organized, begin to forge subquerie
  307. #filters
  308. self.__subqueries_from_relational_filters(target, rfilters)
  309. # Executing subqueries, creating filters from result, and injecting
  310. # them in original filters of the query
  311. if len(rfilters) > 0:
  312. logger.debug("Begining subquery execution")
  313. for fname in rfilters:
  314. if fname not in res:
  315. res[fname] = dict()
  316. subq_results = set()
  317. for leobject, sq_filters in rfilters[fname].items():
  318. uid_fname = mongo_fieldname(leobject._uid)
  319. log_msg = "Subquery running on collection {coll} with filters \
  320. '{filters}'"
  321. logger.debug(log_msg.format(
  322. coll=object_collection_name(leobject),
  323. filters=sq_filters))
  324. cursor = self.__collection(leobject).find(
  325. filter=sq_filters,
  326. projection=uid_fname)
  327. subq_results |= set(doc[uid_fname] for doc in cursor)
  328. #generating new filter from result
  329. if '$in' in res[fname]:
  330. #WARNING we allready have a IN on this field, doing dedup
  331. #from result
  332. deduped = set(res[fname]['$in']) & subq_results
  333. if len(deduped) == 0:
  334. del(res[fname]['$in'])
  335. else:
  336. res[fname]['$in'] = list(deduped)
  337. else:
  338. res[fname]['$in'] = list(subq_results)
  339. if len(rfilters) > 0:
  340. logger.debug("End of subquery execution")
  341. return res
  342. ##@brief Generate subqueries from rfilters tree
  343. #
  344. #Returned struct organization :
  345. # - 1st level keys : relational field name of target
  346. # - 2nd level keys : referenced leobject
  347. # - 3th level values : pymongo filters (dict)
  348. #
  349. #@note The only caller of this method is __process_filters
  350. #@warning No return value, the rfilters arguement is modified by
  351. #reference
  352. #
  353. #@param target LeObject subclass (no instance) : Target class
  354. #@param rfilters dict : A struct as returned by
  355. #MongoDbDatasource.__prepare_relational_filters()
  356. #@return None, the rfilters argument is modified by reference
  357. @classmethod
  358. def __subqueries_from_relational_filters(cls, target, rfilters):
  359. for fname in rfilters:
  360. for leobject in rfilters[fname]:
  361. for rfield in rfilters[fname][leobject]:
  362. #This way of doing is not optimized but allows to trigger
  363. #warnings in some case (2 different values for a same op
  364. #on a same field on a same collection)
  365. mongofilters = cls.__op_value_listconv(
  366. rfilters[fname][leobject][rfield])
  367. rfilters[fname][leobject][rfield] = mongofilters
  368. ##@brief Generate a tree from relational_filters
  369. #
  370. #The generated struct is a dict with :
  371. # - 1st level keys : relational field name of target
  372. # - 2nd level keys : referenced leobject
  373. # - 3th level keys : referenced field in referenced class
  374. # - 4th level values : list of tuple(op, value)
  375. #
  376. #@note The only caller of this method is __process_filters
  377. #@warning An assertion is done : if two leobject are stored in the same
  378. #collection they share the same uid
  379. #
  380. #@param target LeObject subclass (no instance) : Target class
  381. #@param relational_filters : same composition thant filters except that
  382. #@return a struct as described above
  383. @classmethod
  384. def __prepare_relational_filters(cls, target, relational_filters):
  385. # We are going to regroup relationnal filters by reference field
  386. # then by collection
  387. rfilters = dict()
  388. for (fname, rfields), op, value in relational_filters:
  389. if fname not in rfilters:
  390. rfilters[fname] = dict()
  391. rfilters[fname] = dict()
  392. # Stores the representative leobject for associated to a collection
  393. # name
  394. leo_collname = dict()
  395. # WARNING ! Here we assert that all leobject that are stored
  396. # in a same collection are identified by the same field
  397. for leobject, rfield in rfields.items():
  398. #here we are filling a dict with leobject as index but
  399. #we are doing a UNIQ on collection name
  400. cur_collname = object_collection_name(leobject)
  401. if cur_collname not in leo_collname:
  402. leo_collname[cur_collname] = leobject
  403. rfilters[fname][leobject] = dict()
  404. #Fecthing the collection's representative leobject
  405. repr_leo = leo_collname[cur_collname]
  406. if rfield not in rfilters[fname][repr_leo]:
  407. rfilters[fname][repr_leo][rfield] = list()
  408. rfilters[fname][repr_leo][rfield].append((op, value))
  409. return rfilters
  410. ##@brief Convert lodel2 filters to pymongo conditions
  411. #@param filters list : list of lodel filters
  412. #@return dict representing pymongo conditions
  413. @classmethod
  414. def __filters2mongo(cls, filters):
  415. res = dict()
  416. eq_fieldname = [] #Stores field with equal comparison OP
  417. for fieldname, op, value in filters:
  418. oop = op
  419. ovalue = value
  420. op, value = cls.__op_value_conv(op, value)
  421. if op == '=':
  422. eq_fieldname.append(fieldname)
  423. if fieldname in res:
  424. logger.warning("Dropping previous condition. Overwritten \
  425. by an equality filter")
  426. res[fieldname] = value
  427. continue
  428. if fieldname in eq_fieldname:
  429. logger.warning("Dropping condition : '%s %s %s'" % (
  430. fieldname, op, value))
  431. continue
  432. if fieldname not in res:
  433. res[fieldname] = dict()
  434. if op in res[fieldname]:
  435. logger.warning("Dropping condition : '%s %s %s'" % (
  436. fieldname, op, value))
  437. else:
  438. if op not in cls.lodel2mongo_op_map:
  439. raise ValueError("Invalid operator : '%s'" % op)
  440. new_op = cls.lodel2mongo_op_map[op]
  441. res[fieldname][new_op] = value
  442. return res
  443. ##@brief Convert lodel2 operator and value to pymongo struct
  444. #
  445. #Convertion is done using MongoDbDatasource::lodel2mongo_op_map
  446. #@param op str : take value in LeFilteredQuery::_query_operators
  447. #@param value mixed : the value
  448. #@return a tuple(mongo_op, mongo_value)
  449. @classmethod
  450. def __op_value_conv(cls, op, value):
  451. if op not in cls.lodel2mongo_op_map:
  452. msg = "Invalid operator '%s' found" % op
  453. raise MongoDbDataSourceError(msg)
  454. mongop = cls.lodel2mongo_op_map[op]
  455. mongoval = value
  456. #Converting lodel2 wildcarded string into a case insensitive
  457. #mongodb re
  458. if mongop in cls.mongo_op_re:
  459. #unescaping \
  460. mongoval = value.replace('\\\\','\\')
  461. if not mongoval.startswith('*'):
  462. mongoval = '^'+mongoval
  463. #For the end of the string it's harder to detect escaped *
  464. if not (mongoval[-1] == '*' and mongoval[-2] != '\\'):
  465. mongoval += '$'
  466. #Replacing every other unescaped wildcard char
  467. mongoval = cls.wildcard_re.sub('.*', mongoval)
  468. mongoval = {'$regex': mongoval, '$options': 'i'}
  469. return (op, mongoval)
  470. ##@brief Convert a list of tuple(OP, VALUE) into a pymongo filter dict
  471. #@return a dict with mongo op as key and value as value...
  472. @classmethod
  473. def __op_value_listconv(cls, op_value_list):
  474. result = dict()
  475. for op, value in op_value_list:
  476. mongop, mongoval = cls.__op_value_conv(op, value)
  477. if mongop in result:
  478. warnings.warn("Duplicated value given for a single \
  479. field/operator couple in a query. We will keep only the first one")
  480. else:
  481. result[mongop] = mongoval
  482. return result
  483. ##@brief Generate a comparison function for post reccursion sorting in
  484. #select
  485. #@return a lambda function that take 2 dict as arguement
  486. @classmethod
  487. def __generate_lambda_cmp_order(cls, order):
  488. if len(order) == 0:
  489. return lambda a,b: 0
  490. glco = cls.__generate_lambda_cmp_order
  491. fname, cmpdir = order[0]
  492. order = order[1:]
  493. return lambda a,b: glco(order) if a[fname] == b[fname] else (\
  494. 1 if (a[fname]>b[fname] if cmpdir == 'ASC' else a[fname]<b[fname])\
  495. else -1)