No Description
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

datasource.py 20KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480
  1. # -*- coding: utf-8 -*-
  2. import re
  3. import warnings
  4. import copy
  5. import operator
  6. from bson.son import SON
  7. from collections import OrderedDict
  8. import pymongo
  9. from pymongo.errors import BulkWriteError
  10. from lodel import logger
  11. from lodel.leapi.leobject import CLASS_ID_FIELDNAME
  12. from . import utils
  13. from .utils import object_collection_name,\
  14. MONGODB_SORT_OPERATORS_MAP, connection_string
  15. class MongoDbDataSourceError(Exception):
  16. pass
  17. class MongoDbDatasource(object):
  18. ##@brief Stores existing connections
  19. #
  20. #The key of this dict is a hash of the connection string + ro parameter.
  21. #The value is a dict with 2 keys :
  22. # - conn_count : the number of instanciated datasource that use this
  23. #connection
  24. # - db : the pymongo database object instance
  25. _connections = dict()
  26. ##@brief Mapping from lodel2 operators to mongodb operator
  27. lodel2mongo_op_map = {
  28. '=':'$eq', '<=':'$lte', '>=':'$gte', '!=':'$ne', '<':'$lt',
  29. '>':'$gt', 'in':'$in', 'not in':'$nin' }
  30. ##@brief List of mongodb operators that expect re as value
  31. mongo_op_re = ['$in', '$nin']
  32. wildcard_re = re.compile('[^\\\\]\*')
  33. ##@brief instanciates a database object given a connection name
  34. #@param host str : hostname or IP
  35. #@param port int : mongodb listening port
  36. #@param db_name str
  37. #@param username str
  38. #@param password str
  39. #@param ro bool : If True the Datasource is for read only, else the
  40. #Datasource is write only !
  41. def __init__(self, host, port, db_name, username, password, read_only = False):
  42. ##@brief Connections infos that can be kept securly
  43. self.__db_infos = {'host': host, 'port': port, 'db_name': db_name}
  44. ##@brief Is the instance read only ? (if not it's write only)
  45. self.__read_only = bool(read_only)
  46. ##@brief Uniq ID for mongodb connection
  47. self.__conn_hash= None
  48. ##@brief Stores the database cursor
  49. self.database = self.__connect(
  50. username, password, ro = self.__read_only)
  51. ##@brief Destructor that attempt to close connection to DB
  52. #
  53. #Decrease the conn_count of associated MongoDbDatasource::_connections
  54. #item. If it reach 0 close the connection to the db
  55. #@see MongoDbDatasource::__connect()
  56. def __del__(self):
  57. self._connections[self.__conn_hash]['conn_count'] -= 1
  58. if self._connections[self.__conn_hash]['conn_count'] <= 0:
  59. self._connections[self.__conn_hash]['db'].close()
  60. del(self._connections[self.__conn_hash])
  61. logger.info("Closing connection to database")
  62. ##@brief Provide a new uniq numeric ID
  63. #@param emcomp LeObject subclass (not instance) : To know on wich things we
  64. #have to be uniq
  65. #@warning multiple UID broken by this method
  66. #@return an integer
  67. def new_numeric_id(self, emcomp):
  68. target = emcomp.uid_source()
  69. tuid = target._uid[0] # Multiple UID broken here
  70. results = self.select(
  71. target, field_list = [tuid], filters = [],
  72. order=[(tuid, 'DESC')], limit = 1)
  73. if len(results) == 0:
  74. return 1
  75. return results[0][tuid]+1
  76. ##@brief returns a selection of documents from the datasource
  77. #@param target Emclass
  78. #@param field_list list
  79. #@param filters list : List of filters
  80. #@param rel_filters list : List of relational filters
  81. #@param order list : List of column to order. ex: order = [('title', 'ASC'),]
  82. #@param group list : List of tupple representing the column used as
  83. #"group by" fields. ex: group = [('title', 'ASC'),]
  84. #@param limit int : Number of records to be returned
  85. #@param offset int: used with limit to choose the start record
  86. #@return list
  87. #@todo Implement group for abstract LeObject childs
  88. def select(self, target, field_list, filters = None,
  89. relational_filters=None, order=None, group=None, limit=None, offset=0):
  90. if target.is_abstract():
  91. #Reccursiv calls for abstract LeObject child
  92. results = self.__act_on_abstract(target, filters,
  93. relational_filters, self.select, field_list = field_list,
  94. order = order, group = group, limit = limit)
  95. sort_itemgetter_args = list()
  96. sort_dir = None
  97. for fname, csort_dir in order:
  98. sort_itemgetter_args.append(fname)
  99. if sort_dir is None:
  100. sort_dir = csort_dir
  101. elif sort_dir != csort_dir:
  102. raise NotImplementedError("Multiple direction for ordering\
  103. is not implemented yet")
  104. results = sorted(
  105. results, key=operator.itemgetter(*sort_itemgetter_args),
  106. reverse=False if sort_dir == 'ASC' else True)
  107. if limit is not None:
  108. results = results[offset:offset+limit]
  109. return results
  110. # Default behavior
  111. filters = [] if filters is None else filters
  112. relational_filters = [] if relational_filters is None else relational_filters
  113. collection_name = object_collection_name(target)
  114. collection = self.database[collection_name]
  115. query_filters = self.__process_filters(target, filters, relational_filters)
  116. query_result_ordering = None
  117. if order is not None:
  118. query_result_ordering = utils.parse_query_order(order)
  119. results_field_list = None if len(field_list) == 0 else field_list
  120. limit = limit if limit is not None else 0
  121. if group is None:
  122. cursor = collection.find(
  123. filter=query_filters, projection=results_field_list,
  124. skip=offset, limit=limit, sort=query_result_ordering)
  125. else:
  126. pipeline = list()
  127. unwinding_list = list()
  128. grouping_dict = OrderedDict()
  129. sorting_list = list()
  130. for group_param in group:
  131. field_name = group_param[0]
  132. field_sort_option = group_param[1]
  133. sort_option = MONGODB_SORT_OPERATORS_MAP[field_sort_option]
  134. unwinding_list.append({'$unwind': '$%s' % field_name})
  135. grouping_dict[field_name] = '$%s' % field_name
  136. sorting_list.append((field_name, sort_option))
  137. sorting_list.extends(query_result_ordering)
  138. pipeline.append({'$match': query_filters})
  139. if results_field_list is not None:
  140. pipeline.append({
  141. '$project': SON([{field_name: 1}
  142. for field_name in field_list])})
  143. pipeline.extend(unwinding_list)
  144. pipeline.append({'$group': grouping_dict})
  145. pipeline.extend({'$sort': SON(sorting_list)})
  146. if offset > 0:
  147. pipeline.append({'$skip': offset})
  148. if limit is not None:
  149. pipeline.append({'$limit': limit})
  150. results = list()
  151. for document in cursor:
  152. results.append(document)
  153. return results
  154. ##@brief Deletes records according to given filters
  155. #@param target Emclass : class of the record to delete
  156. #@param filters list : List of filters
  157. #@param relational_filters list : List of relational filters
  158. #@return int : number of deleted records
  159. def delete(self, target, filters, relational_filters):
  160. mongo_filters = self.__process_filters(
  161. target, filters, relational_filters)
  162. res = self.__collection(target).delete_many(mongo_filters)
  163. return res.deleted_count
  164. ## @brief updates records according to given filters
  165. #@param target Emclass : class of the object to insert
  166. #@param filters list : List of filters
  167. #@param rel_filters list : List of relational filters
  168. #@param upd_datas dict : datas to update (new values)
  169. #@return int : Number of updated records
  170. def update(self, target, filters, relational_filters, upd_datas):
  171. mongo_filters = self.__process_filters(
  172. target, filters, relational_filters)
  173. res = self.__collection(target).update_many(mongo_filters, upd_datas)
  174. return res.modified_count()
  175. ## @brief Inserts a record in a given collection
  176. # @param target Emclass : class of the object to insert
  177. # @param new_datas dict : datas to insert
  178. # @return the inserted uid
  179. def insert(self, target, new_datas):
  180. res = self.__collection(target).insert(new_datas)
  181. return str(res)
  182. ## @brief Inserts a list of records in a given collection
  183. # @param target Emclass : class of the objects inserted
  184. # @param datas_list list : list of dict
  185. # @return list : list of the inserted records' ids
  186. def insert_multi(self, target, datas_list):
  187. res = self.__collection(target).insert_many(datas_list)
  188. return list(res.inserted_ids)
  189. ##@brief Act on abstract LeObject child
  190. #
  191. #This method is designed to be called by insert, select and delete method
  192. #when they encounter an abtract class
  193. #@param target LeObject child class
  194. #@param filters
  195. #@param relational_filters
  196. #@param act function : the caller method
  197. #@param **kwargs other arguments
  198. #@return sum of results (if it's an array it will result in a concat)
  199. def __act_on_abstract(self, target, filters, relational_filters, act, **kwargs):
  200. result = list() if act == self.select else 0
  201. if not target.is_abstract():
  202. target_childs = target
  203. else:
  204. target_childs = [tc for tc in target.child_classes()
  205. if not tc.is_abstract()]
  206. for target_child in target_childs:
  207. #Add target_child to filter
  208. new_filters = copy.copy(filters)
  209. for i in range(len(filters)):
  210. fname, op, val = filters[i]
  211. if fname == CLASS_ID_FIELDNAME:
  212. logger.warning("Dirty drop of filter : '%s %s %s'" % (
  213. fname, op, val))
  214. del(new_filters[i])
  215. new_filters.append(
  216. (CLASS_ID_FIELDNAME, '=', target_child.__name__))
  217. result += act(
  218. target = target_child,
  219. filters = new_filters,
  220. relational_filters = relational_filters,
  221. **kwargs)
  222. return result
  223. ##@brief Connect to database
  224. #@not this method avoid opening two times the same connection using
  225. #MongoDbDatasource::_connections static attribute
  226. #@param host str : hostname or IP
  227. #@param port int : mongodb listening port
  228. #@param db_name str
  229. #@param username str
  230. #@param password str
  231. #@param ro bool : If True the Datasource is for read only, else the
  232. def __connect(self, username, password, ro):
  233. conn_string = connection_string(
  234. username = username, password = password,
  235. host = self.__db_infos['host'],
  236. port = self.__db_infos['port'])
  237. conn_string += "__ReadOnly__:"+str(self.__read_only)
  238. self.__conn_hash = conn_h = hash(conn_string)
  239. if conn_h in self._connections:
  240. self._connections[conn_h]['conn_count'] += 1
  241. return self._connections[conn_h]['db'][self.__db_infos['db_name']]
  242. else:
  243. logger.info("Opening a new connection to database")
  244. self._connections[conn_h] = {
  245. 'conn_count': 1,
  246. 'db': utils.connection(
  247. host = self.__db_infos['host'],
  248. port = self.__db_infos['port'],
  249. username = username,
  250. password = password)}
  251. return self._connections[conn_h]['db'][self.__db_infos['db_name']]
  252. ##@brief Return a pymongo collection given a LeObject child class
  253. #@param leobject LeObject child class (no instance)
  254. #return a pymongo.collection instance
  255. def __collection(self, leobject):
  256. return self.database[object_collection_name(leobject)]
  257. ##@brief Perform subqueries implies by relational filters and append the
  258. # result to existing filters
  259. #
  260. #The processing is divided in multiple steps :
  261. # - determine (for each relational field of the target) every collection
  262. #that are involved
  263. # - generate subqueries for relational_filters that concerns a different
  264. #collection than target collection
  265. #filters
  266. # - execute subqueries
  267. # - transform subqueries results in filters
  268. # - merge subqueries generated filters with existing filters
  269. #
  270. #@param target LeObject subclass (no instance) : Target class
  271. #@param filters list : List of tuple(FIELDNAME, OP, VALUE)
  272. #@param relational_filters : same composition thant filters except that
  273. # FIELD is represented by a tuple(FIELDNAME, {CLASS1:RFIELD1,
  274. # CLASS2:RFIELD2})
  275. #@return a list of pymongo filters ( dict {FIELD:{OPERATOR:VALUE}} )
  276. def __process_filters(self,target, filters, relational_filters):
  277. # Simple filters lodel2 -> pymongo converting
  278. res = self.__filters2mongo(filters)
  279. rfilters = self.__prepare_relational_filters(target, relational_filters)
  280. #Now that everything is well organized, begin to forge subquerie
  281. #filters
  282. self.__subqueries_from_relational_filters(target, rfilters)
  283. # Executing subqueries, creating filters from result, and injecting
  284. # them in original filters of the query
  285. if len(rfilters) > 0:
  286. logger.debug("Begining subquery execution")
  287. for fname in rfilters:
  288. if fname not in res:
  289. res[fname] = dict()
  290. subq_results = set()
  291. for leobject, sq_filters in rfilters[fname].items():
  292. uid_fname = mongo_fieldname(leobject._uid)
  293. log_msg = "Subquery running on collection {coll} with filters \
  294. '{filters}'"
  295. logger.debug(log_msg.format(
  296. coll=object_collection_name(leobject),
  297. filters=sq_filters))
  298. cursor = self.__collection(leobject).find(
  299. filter=sq_filters,
  300. projection=uid_fname)
  301. subq_results |= set(doc[uid_fname] for doc in cursor)
  302. #generating new filter from result
  303. if '$in' in res[fname]:
  304. #WARNING we allready have a IN on this field, doing dedup
  305. #from result
  306. deduped = set(res[fname]['$in']) & subq
  307. if len(deduped) == 0:
  308. del(res[fname]['$in'])
  309. else:
  310. res[fname]['$in'] = list(deduped)
  311. else:
  312. res[fname]['$in'] = list(subq_results)
  313. if len(rfilters) > 0:
  314. logger.debug("End of subquery execution")
  315. return res
  316. ##@brief Generate subqueries from rfilters tree
  317. #
  318. #Returned struct organization :
  319. # - 1st level keys : relational field name of target
  320. # - 2nd level keys : referenced leobject
  321. # - 3th level values : pymongo filters (dict)
  322. #
  323. #@note The only caller of this method is __process_filters
  324. #@warning No return value, the rfilters arguement is modified by
  325. #reference
  326. #
  327. #@param target LeObject subclass (no instance) : Target class
  328. #@param rfilters dict : A struct as returned by
  329. #MongoDbDatasource.__prepare_relational_filters()
  330. #@return None, the rfilters argument is modified by reference
  331. @classmethod
  332. def __subqueries_from_relational_filters(cls, target, rfilters):
  333. for fname in rfilters:
  334. for leobject in rfilters[fname]:
  335. for rfield in rfilters[fname][leobject]:
  336. #This way of doing is not optimized but allows to trigger
  337. #warnings in some case (2 different values for a same op
  338. #on a same field on a same collection)
  339. mongofilters = cls.__op_value_listconv(
  340. rfilters[fname][leobject][rfield])
  341. rfilters[fname][leobject][rfield] = mongofilters
  342. ##@brief Generate a tree from relational_filters
  343. #
  344. #The generated struct is a dict with :
  345. # - 1st level keys : relational field name of target
  346. # - 2nd level keys : referenced leobject
  347. # - 3th level keys : referenced field in referenced class
  348. # - 4th level values : list of tuple(op, value)
  349. #
  350. #@note The only caller of this method is __process_filters
  351. #@warning An assertion is done : if two leobject are stored in the same
  352. #collection they share the same uid
  353. #
  354. #@param target LeObject subclass (no instance) : Target class
  355. #@param relational_filters : same composition thant filters except that
  356. #@return a struct as described above
  357. @classmethod
  358. def __prepare_relational_filters(cls, target, relational_filters):
  359. # We are going to regroup relationnal filters by reference field
  360. # then by collection
  361. rfilters = dict()
  362. for (fname, rfields), op, value in relational_filters:
  363. if fname not in rfilters:
  364. rfilters[fname] = dict()
  365. rfilters[fname] = dict()
  366. # Stores the representative leobject for associated to a collection
  367. # name
  368. leo_collname = dict()
  369. # WARNING ! Here we assert that all leobject that are stored
  370. # in a same collection are identified by the same field
  371. for leobject, rfield in rfields.items():
  372. #here we are filling a dict with leobject as index but
  373. #we are doing a UNIQ on collection name
  374. cur_collname = object_collection_name(leobject)
  375. if cur_collname not in collnames:
  376. leo_collname[cur_collame] = leobject
  377. rfilters[fname][leobject] = dict()
  378. #Fecthing the collection's representative leobject
  379. repr_leo = leo_collname[cur_collname]
  380. if rfield not in rfilters[fname][repr_leo]:
  381. rfilters[fname][repr_leo][rfield] = list()
  382. rfilters[fname][repr_leo][rfield].append((op, value))
  383. return rfilters
  384. ##@brief Convert lodel2 filters to pymongo conditions
  385. #@param filters list : list of lodel filters
  386. #@return dict representing pymongo conditions
  387. @classmethod
  388. def __filters2mongo(cls, filters):
  389. res = dict()
  390. for fieldname, op, value in filters:
  391. oop = op
  392. ovalue = value
  393. op, value = cls.__op_value_conv(op, value)
  394. if fieldname not in res:
  395. res[fieldname] = dict()
  396. if op in res[fieldname]:
  397. logger.warning("Dropping condition : '%s %s %s'" % (
  398. fieldname, op, value))
  399. else:
  400. res[fieldname][op] = value
  401. return res
  402. ##@brief Convert lodel2 operator and value to pymongo struct
  403. #
  404. #Convertion is done using MongoDbDatasource::lodel2mongo_op_map
  405. #@param op str : take value in LeFilteredQuery::_query_operators
  406. #@param value mixed : the value
  407. #@return a tuple(mongo_op, mongo_value)
  408. @classmethod
  409. def __op_value_conv(cls, op, value):
  410. if op not in cls.lodel2mongo_op_map:
  411. msg = "Invalid operator '%s' found" % op
  412. raise MongoDbDataSourceError(msg)
  413. mongop = cls.lodel2mongo_op_map[op]
  414. mongoval = value
  415. #Converting lodel2 wildcarded string into a case insensitive
  416. #mongodb re
  417. if mongop in cls.mongo_op_re:
  418. #unescaping \
  419. mongoval = value.replace('\\\\','\\')
  420. if not mongoval.startswith('*'):
  421. mongoval = '^'+mongoval
  422. #For the end of the string it's harder to detect escaped *
  423. if not (mongoval[-1] == '*' and mongoval[-2] != '\\'):
  424. mongoval += '$'
  425. #Replacing every other unescaped wildcard char
  426. mongoval = cls.wildcard_re.sub('.*', mongoval)
  427. mongoval = {'$regex': mongoval, '$options': 'i'}
  428. return (op, mongoval)
  429. ##@brief Convert a list of tuple(OP, VALUE) into a pymongo filter dict
  430. #@return a dict with mongo op as key and value as value...
  431. @classmethod
  432. def __op_value_listconv(cls, op_value_list):
  433. result = dict()
  434. for op, value in op_value_list:
  435. mongop, mongoval = cls.__op_value_conv(op, value)
  436. if mongop in result:
  437. warnings.warn("Duplicated value given for a single \
  438. field/operator couple in a query. We will keep only the first one")
  439. else:
  440. result[mongop] = mongoval
  441. return result