
datasource.py 21KB

# -*- coding: utf-8 -*-

import re
import warnings
import copy
import functools

from bson.son import SON
from collections import OrderedDict
import pymongo
from pymongo.errors import BulkWriteError

from lodel import logger
from lodel.leapi.leobject import CLASS_ID_FIELDNAME

from . import utils
#mongo_fieldname is used by __process_filters() below
from .utils import object_collection_name, mongo_fieldname,\
    MONGODB_SORT_OPERATORS_MAP, connection_string


class MongoDbDataSourceError(Exception):
    pass


class MongoDbDatasource(object):

    ##@brief Stores existing connections
    #
    #The key of this dict is a hash of the connection string + ro parameter.
    #The value is a dict with 2 keys :
    # - conn_count : the number of instantiated datasources that use this
    #connection
    # - db : the pymongo database object instance
    _connections = dict()

    ##@brief Mapping from lodel2 operators to mongodb operators
    lodel2mongo_op_map = {
        '=': '$eq', '<=': '$lte', '>=': '$gte', '!=': '$ne', '<': '$lt',
        '>': '$gt', 'in': '$in', 'not in': '$nin'}
    ##@brief List of mongodb operators that expect a regex as value
    mongo_op_re = ['$in', '$nin']
    wildcard_re = re.compile(r'[^\\]\*')
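
    ##@note Illustrative sketch, not part of the original file : how the two
    #class attributes above behave (values checked against the definitions
    #just above, nothing else is assumed).
    #
    #   >>> MongoDbDatasource.lodel2mongo_op_map['in']
    #   '$in'
    #   >>> MongoDbDatasource.wildcard_re.sub('.*', 'foo*bar')
    #   'fo.*bar'
    #
    #Note that wildcard_re also consumes the character preceding the
    #unescaped '*' (hence 'fo.*bar' above); __op_value_conv() relies on this
    #pattern when it rewrites wildcarded strings into regexes.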

    ##@brief Instantiates a database object given a connection name
    #@param host str : hostname or IP
    #@param port int : mongodb listening port
    #@param db_name str
    #@param username str
    #@param password str
    #@param read_only bool : If True the Datasource is for read only, else
    #the Datasource is write only !
    def __init__(self, host, port, db_name, username, password, read_only = False):
        ##@brief Connection infos that can be kept securely
        self.__db_infos = {'host': host, 'port': port, 'db_name': db_name}
        ##@brief Is the instance read only ? (if not it's write only)
        self.__read_only = bool(read_only)
        ##@brief Unique ID for mongodb connection
        self.__conn_hash = None
        ##@brief Stores the pymongo database object
        self.database = self.__connect(
            username, password, ro = self.__read_only)

    ##@brief Destructor that attempts to close the connection to the DB
    #
    #Decreases the conn_count of the associated MongoDbDatasource::_connections
    #item. If it reaches 0 the connection to the db is closed
    #@see MongoDbDatasource::__connect()
    def __del__(self):
        self._connections[self.__conn_hash]['conn_count'] -= 1
        if self._connections[self.__conn_hash]['conn_count'] <= 0:
            self._connections[self.__conn_hash]['db'].close()
            del(self._connections[self.__conn_hash])
            logger.info("Closing connection to database")

    ##@brief Provides a new unique numeric ID
    #@param emcomp LeObject subclass (not instance) : To know on which things
    #we have to be unique
    #@warning multiple UID broken by this method
    #@return an integer
    def new_numeric_id(self, emcomp):
        target = emcomp.uid_source()
        tuid = target._uid[0] # Multiple UID broken here
        results = self.select(
            target, field_list = [tuid], filters = [],
            order=[(tuid, 'DESC')], limit = 1)
        if len(results) == 0:
            return 1
        return results[0][tuid] + 1

    ##@brief Returns a selection of documents from the datasource
    #@param target Emclass
    #@param field_list list
    #@param filters list : List of filters
    #@param relational_filters list : List of relational filters
    #@param order list : List of columns to order by. ex: order =
    #[('title', 'ASC'),]
    #@param group list : List of tuples representing the columns used as
    #"group by" fields. ex: group = [('title', 'ASC'),]
    #@param limit int : Number of records to be returned
    #@param offset int : used with limit to choose the start record
    #@return list
    #@todo Implement group for abstract LeObject children
    def select(self, target, field_list, filters = None,
            relational_filters=None, order=None, group=None, limit=None,
            offset=0):
        if target.is_abstract():
            #Recursive calls for abstract LeObject children
            results = self.__act_on_abstract(target, filters,
                relational_filters, self.select, field_list = field_list,
                order = order, group = group, limit = limit)

            #Here we may implement the group
            #If sorted query we have to sort again
            if order is not None:
                results = sorted(results,
                    key=functools.cmp_to_key(
                        self.__generate_lambda_cmp_order(order)))
            #If limit given apply limit and offset again
            if offset > len(results):
                results = list()
            elif limit is not None:
                #Python slicing clamps by itself, no need to adjust limit
                #against len(results)
                results = results[offset:offset + limit]
            return results
        # Default behavior
        if filters is None:
            filters = list()
        if relational_filters is None:
            relational_filters = list()

        collection_name = object_collection_name(target)
        collection = self.database[collection_name]

        query_filters = self.__process_filters(
            target, filters, relational_filters)
        query_result_ordering = None
        if order is not None:
            query_result_ordering = utils.parse_query_order(order)
        results_field_list = None if len(field_list) == 0 else field_list
        limit = limit if limit is not None else 0

        if group is None:
            cursor = collection.find(
                filter=query_filters, projection=results_field_list,
                skip=offset, limit=limit, sort=query_result_ordering)
        else:
            pipeline = list()
            unwinding_list = list()
            grouping_dict = OrderedDict()
            sorting_list = list()

            for group_param in group:
                field_name = group_param[0]
                field_sort_option = group_param[1]
                sort_option = MONGODB_SORT_OPERATORS_MAP[field_sort_option]
                unwinding_list.append({'$unwind': '$%s' % field_name})
                grouping_dict[field_name] = '$%s' % field_name
                sorting_list.append((field_name, sort_option))
            if query_result_ordering is not None:
                sorting_list.extend(query_result_ordering)

            pipeline.append({'$match': query_filters})
            if results_field_list is not None:
                pipeline.append({
                    '$project': SON([(field_name, 1)
                        for field_name in field_list])})
            pipeline.extend(unwinding_list)
            #MongoDB expects the grouping keys under the '_id' key of the
            #$group stage
            pipeline.append({'$group': {'_id': grouping_dict}})
            pipeline.append({'$sort': SON(sorting_list)})
            if offset > 0:
                pipeline.append({'$skip': offset})
            if limit > 0:
                pipeline.append({'$limit': limit})
            cursor = collection.aggregate(pipeline)

        results = list()
        for document in cursor:
            results.append(document)
        return results
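
    ##@note Illustrative usage sketch, not part of the original file. The
    #Song class and the field names are hypothetical; only the shape of the
    #select() call comes from the method above.
    #
    #   # ds = MongoDbDatasource('localhost', 27017, 'lodel', 'user', 'pwd')
    #   # songs = ds.select(
    #   #     Song,                              # non abstract Emclass
    #   #     field_list = ['title', 'year'],    # projection
    #   #     filters = [('year', '>=', 2000)],  # lodel2 filter tuples
    #   #     order = [('title', 'ASC')],
    #   #     limit = 10, offset = 20)
    #   # songs is a list of plain dicts as returned by pymongo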

    ##@brief Deletes records according to given filters
    #@param target Emclass : class of the records to delete
    #@param filters list : List of filters
    #@param relational_filters list : List of relational filters
    #@return int : number of deleted records
    def delete(self, target, filters, relational_filters):
        if target.is_abstract():
            #Deletion with an abstract LeObject as target (recursive calls)
            return self.__act_on_abstract(target, filters,
                relational_filters, self.delete)
        #Non abstract behavior
        mongo_filters = self.__process_filters(
            target, filters, relational_filters)
        res = self.__collection(target).delete_many(mongo_filters)
        return res.deleted_count

    ##@brief Updates records according to given filters
    #@param target Emclass : class of the objects to update
    #@param filters list : List of filters
    #@param relational_filters list : List of relational filters
    #@param upd_datas dict : datas to update (new values)
    #@return int : Number of updated records
    def update(self, target, filters, relational_filters, upd_datas):
        if target.is_abstract():
            #Update using an abstract LeObject as target (recursive calls)
            return self.__act_on_abstract(target, filters,
                relational_filters, self.update, upd_datas = upd_datas)
        #Non abstract behavior
        mongo_filters = self.__process_filters(
            target, filters, relational_filters)
        #update_many() expects an update document using $ operators
        res = self.__collection(target).update_many(
            mongo_filters, {'$set': upd_datas})
        return res.modified_count

    ##@brief Inserts a record in a given collection
    #@param target Emclass : class of the object to insert
    #@param new_datas dict : datas to insert
    #@return the inserted uid
    def insert(self, target, new_datas):
        res = self.__collection(target).insert_one(new_datas)
        return str(res.inserted_id)

    ##@brief Inserts a list of records in a given collection
    #@param target Emclass : class of the objects inserted
    #@param datas_list list : list of dict
    #@return list : list of the inserted records' ids
    def insert_multi(self, target, datas_list):
        res = self.__collection(target).insert_many(datas_list)
        return list(res.inserted_ids)

    ##@brief Acts on an abstract LeObject child
    #
    #This method is designed to be called by the insert, select and delete
    #methods when they encounter an abstract class
    #@param target LeObject child class
    #@param filters
    #@param relational_filters
    #@param act function : the caller method
    #@param **kwargs other arguments
    #@return sum of results (if it's a list it will result in a concatenation)
    def __act_on_abstract(self,
        target, filters, relational_filters, act, **kwargs):
        result = list() if act == self.select else 0
        if filters is None:
            filters = list()
        if not target.is_abstract():
            target_childs = [target]
        else:
            target_childs = [tc for tc in target.child_classes()
                if not tc.is_abstract()]
        for target_child in target_childs:
            #Add target_child to filters
            new_filters = copy.copy(filters)
            #Iterate backward so deletions do not shift remaining indexes
            for i in range(len(new_filters) - 1, -1, -1):
                fname, op, val = new_filters[i]
                if fname == CLASS_ID_FIELDNAME:
                    logger.warning("Dirty drop of filter : '%s %s %s'" % (
                        fname, op, val))
                    del(new_filters[i])
            new_filters.append(
                (CLASS_ID_FIELDNAME, '=', target_child.__name__))
            result += act(
                target = target_child,
                filters = new_filters,
                relational_filters = relational_filters,
                **kwargs)
        return result

    ##@brief Connects to the database
    #@note this method avoids opening the same connection twice, using the
    #MongoDbDatasource::_connections static attribute
    #@param username str
    #@param password str
    #@param ro bool : If True the connection is opened for read only access,
    #else for write only access
    def __connect(self, username, password, ro):
        conn_string = connection_string(
            username = username, password = password,
            host = self.__db_infos['host'],
            port = self.__db_infos['port'])
        conn_string += "__ReadOnly__:" + str(self.__read_only)
        self.__conn_hash = conn_h = hash(conn_string)
        if conn_h in self._connections:
            self._connections[conn_h]['conn_count'] += 1
            return self._connections[conn_h]['db'][self.__db_infos['db_name']]
        else:
            logger.info("Opening a new connection to database")
            self._connections[conn_h] = {
                'conn_count': 1,
                'db': utils.connection(
                    host = self.__db_infos['host'],
                    port = self.__db_infos['port'],
                    username = username,
                    password = password)}
            return self._connections[conn_h]['db'][self.__db_infos['db_name']]
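
    ##@note Illustrative sketch, not part of the original file : two
    #datasources built with the same credentials and the same read_only flag
    #hash to the same connection string, so the second one only increments
    #conn_count and reuses the pymongo client already stored in _connections.
    #
    #   # ds_a = MongoDbDatasource('localhost', 27017, 'lodel', 'u', 'p')
    #   # ds_b = MongoDbDatasource('localhost', 27017, 'lodel', 'u', 'p')
    #   # len(MongoDbDatasource._connections) == 1   # single shared client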

    ##@brief Returns a pymongo collection given a LeObject child class
    #@param leobject LeObject child class (no instance)
    #@return a pymongo.collection instance
    def __collection(self, leobject):
        return self.database[object_collection_name(leobject)]

    ##@brief Performs the subqueries implied by relational filters and
    #appends the results to the existing filters
    #
    #The processing is divided in multiple steps :
    # - determine (for each relational field of the target) every collection
    #that is involved
    # - generate subqueries for relational_filters that concern a different
    #collection than the target collection
    # - execute subqueries
    # - transform subqueries results into filters
    # - merge subqueries generated filters with existing filters
    #
    #@param target LeObject subclass (no instance) : Target class
    #@param filters list : List of tuple(FIELDNAME, OP, VALUE)
    #@param relational_filters : same composition than filters except that
    # FIELD is represented by a tuple(FIELDNAME, {CLASS1:RFIELD1,
    # CLASS2:RFIELD2})
    #@return a list of pymongo filters ( dict {FIELD:{OPERATOR:VALUE}} )
    def __process_filters(self, target, filters, relational_filters):
        # Simple filters lodel2 -> pymongo conversion
        res = self.__filters2mongo(filters)
        rfilters = self.__prepare_relational_filters(target, relational_filters)
        #Now that everything is well organized, begin to forge the subquery
        #filters
        self.__subqueries_from_relational_filters(target, rfilters)
        # Executing subqueries, creating filters from the results, and
        # injecting them in the original filters of the query
        if len(rfilters) > 0:
            logger.debug("Beginning subquery execution")
        for fname in rfilters:
            if fname not in res:
                res[fname] = dict()
            subq_results = set()
            for leobject, sq_filters in rfilters[fname].items():
                uid_fname = mongo_fieldname(leobject._uid)
                log_msg = ("Subquery running on collection {coll} with "
                    "filters '{filters}'")
                logger.debug(log_msg.format(
                    coll=object_collection_name(leobject),
                    filters=sq_filters))
                cursor = self.__collection(leobject).find(
                    filter=sq_filters,
                    projection=uid_fname)
                subq_results |= set(doc[uid_fname] for doc in cursor)
            #generating a new filter from the results
            if '$in' in res[fname]:
                #WARNING we already have an IN on this field, deduplicating
                #from the results
                deduped = set(res[fname]['$in']) & subq_results
                if len(deduped) == 0:
                    del(res[fname]['$in'])
                else:
                    res[fname]['$in'] = list(deduped)
            else:
                res[fname]['$in'] = list(subq_results)
        if len(rfilters) > 0:
            logger.debug("End of subquery execution")
        return res
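
    ##@note Illustrative sketch, not part of the original file. Assume a
    #relational filter on a reference field 'author' pointing to a
    #hypothetical Person class whose uid field is 'uid' :
    #(('author', {Person: 'uid'}), '=', 42). __process_filters() then runs a
    #subquery on the Person collection with {'uid': {'$eq': 42}}, collects
    #the matching uids and merges them into the main query as
    #{'author': {'$in': [... matching uids ...]}}.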

    ##@brief Generates subqueries from the rfilters tree
    #
    #Returned struct organization :
    # - 1st level keys : relational field name of target
    # - 2nd level keys : referenced leobject
    # - 3rd level values : pymongo filters (dict)
    #
    #@note The only caller of this method is __process_filters
    #@warning No return value, the rfilters argument is modified by
    #reference
    #
    #@param target LeObject subclass (no instance) : Target class
    #@param rfilters dict : A struct as returned by
    #MongoDbDatasource.__prepare_relational_filters()
    #@return None, the rfilters argument is modified by reference
    @classmethod
    def __subqueries_from_relational_filters(cls, target, rfilters):
        for fname in rfilters:
            for leobject in rfilters[fname]:
                for rfield in rfilters[fname][leobject]:
                    #This way of doing things is not optimized but allows to
                    #trigger warnings in some cases (2 different values for
                    #the same op on the same field of the same collection)
                    mongofilters = cls.__op_value_listconv(
                        rfilters[fname][leobject][rfield])
                    rfilters[fname][leobject][rfield] = mongofilters

    ##@brief Generates a tree from relational_filters
    #
    #The generated struct is a dict with :
    # - 1st level keys : relational field name of target
    # - 2nd level keys : referenced leobject
    # - 3rd level keys : referenced field in the referenced class
    # - 4th level values : list of tuple(op, value)
    #
    #@note The only caller of this method is __process_filters
    #@warning An assertion is done : if two leobjects are stored in the same
    #collection they share the same uid
    #
    #@param target LeObject subclass (no instance) : Target class
    #@param relational_filters : same composition than filters except that
    #FIELD is represented by a tuple(FIELDNAME, {CLASS1:RFIELD1,
    #CLASS2:RFIELD2})
    #@return a struct as described above
    @classmethod
    def __prepare_relational_filters(cls, target, relational_filters):
        # We are going to regroup relational filters by reference field
        # then by collection
        rfilters = dict()
        for (fname, rfields), op, value in relational_filters:
            if fname not in rfilters:
                rfilters[fname] = dict()
            # Stores the representative leobject associated to a collection
            # name
            leo_collname = dict()
            # WARNING ! Here we assert that all leobjects that are stored
            # in a same collection are identified by the same field
            for leobject, rfield in rfields.items():
                #here we are filling a dict with leobject as index but
                #we are doing a UNIQ on collection name
                cur_collname = object_collection_name(leobject)
                if cur_collname not in leo_collname:
                    leo_collname[cur_collname] = leobject
                #Fetching the collection's representative leobject
                repr_leo = leo_collname[cur_collname]
                if repr_leo not in rfilters[fname]:
                    rfilters[fname][repr_leo] = dict()
                if rfield not in rfilters[fname][repr_leo]:
                    rfilters[fname][repr_leo][rfield] = list()
                rfilters[fname][repr_leo][rfield].append((op, value))
        return rfilters
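
    ##@note Illustrative sketch, not part of the original file. With the
    #single hypothetical relational filter
    #(('author', {Person: 'uid'}), '=', 42) the returned tree looks like :
    #
    #   # {'author':                   # 1st level : relational field name
    #   #     {Person:                 # 2nd level : representative leobject
    #   #         {'uid':              # 3rd level : referenced field
    #   #             [('=', 42)]}}}   # 4th level : list of (op, value)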

    ##@brief Converts lodel2 filters to pymongo conditions
    #@param filters list : list of lodel filters
    #@return dict representing pymongo conditions
    @classmethod
    def __filters2mongo(cls, filters):
        res = dict()
        for fieldname, op, value in filters:
            op, value = cls.__op_value_conv(op, value)
            if fieldname not in res:
                res[fieldname] = dict()
            if op in res[fieldname]:
                logger.warning("Dropping condition : '%s %s %s'" % (
                    fieldname, op, value))
            else:
                res[fieldname][op] = value
        return res
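
    ##@note Illustrative sketch, not part of the original file :
    #
    #   # __filters2mongo([('year', '>=', 2000), ('year', '<', 2010)])
    #   # == {'year': {'$gte': 2000, '$lt': 2010}}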

    ##@brief Converts a lodel2 operator and value to a pymongo struct
    #
    #Conversion is done using MongoDbDatasource::lodel2mongo_op_map
    #@param op str : takes value in LeFilteredQuery::_query_operators
    #@param value mixed : the value
    #@return a tuple(mongo_op, mongo_value)
    @classmethod
    def __op_value_conv(cls, op, value):
        if op not in cls.lodel2mongo_op_map:
            msg = "Invalid operator '%s' found" % op
            raise MongoDbDataSourceError(msg)
        mongop = cls.lodel2mongo_op_map[op]
        mongoval = value
        #Converting a lodel2 wildcarded string into a case insensitive
        #mongodb regex
        if mongop in cls.mongo_op_re:
            #unescaping \
            mongoval = value.replace('\\\\', '\\')
            if not mongoval.startswith('*'):
                mongoval = '^' + mongoval
            #For the end of the string it's harder to detect an escaped *
            if not (mongoval[-1] == '*' and mongoval[-2] != '\\'):
                mongoval += '$'
            #Replacing every other unescaped wildcard char
            mongoval = cls.wildcard_re.sub('.*', mongoval)
            mongoval = {'$regex': mongoval, '$options': 'i'}
        return (mongop, mongoval)

    ##@brief Converts a list of tuple(OP, VALUE) into a pymongo filter dict
    #@return a dict with mongo ops as keys and values as values...
    @classmethod
    def __op_value_listconv(cls, op_value_list):
        result = dict()
        for op, value in op_value_list:
            mongop, mongoval = cls.__op_value_conv(op, value)
            if mongop in result:
                warnings.warn("Duplicated value given for a single "
                    "field/operator couple in a query. We will keep only "
                    "the first one")
            else:
                result[mongop] = mongoval
        return result

    ##@brief Generates a comparison function for post recursion sorting in
    #select
    #@return a lambda function that takes 2 dicts as arguments
    @classmethod
    def __generate_lambda_cmp_order(cls, order):
        if len(order) == 0:
            return lambda a, b: 0
        glco = cls.__generate_lambda_cmp_order
        fname, cmpdir = order[0]
        order = order[1:]
        #The comparator returns an int as expected by functools.cmp_to_key ;
        #ties are delegated to the comparator built from the rest of order
        return lambda a, b: glco(order)(a, b) if a[fname] == b[fname] else (
            1 if (a[fname] > b[fname] if cmpdir == 'ASC'
                else a[fname] < b[fname])
            else -1)
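
    ##@note Illustrative sketch, not part of the original file : the
    #generated comparator is meant to be fed to functools.cmp_to_key(), as
    #done in select() for abstract targets.
    #
    #   # inside select(), once 'results' holds the merged child results :
    #   # results = sorted(results, key=functools.cmp_to_key(
    #   #     self.__generate_lambda_cmp_order(
    #   #         [('title', 'ASC'), ('year', 'DESC')])))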