Bez popisu
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

datasource.py 22KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532
  1. # -*- coding: utf-8 -*-
  2. import re
  3. import warnings
  4. import copy
  5. import functools
  6. from bson.son import SON
  7. from collections import OrderedDict
  8. import pymongo
  9. from pymongo.errors import BulkWriteError
  10. from lodel import logger
  11. from lodel.leapi.leobject import CLASS_ID_FIELDNAME
  12. from . import utils
  13. from .utils import object_collection_name, collection_name, \
  14. MONGODB_SORT_OPERATORS_MAP, connection_string, mongo_fieldname
  ##@brief Error raised by the MongoDB datasource
  #
  #Raised for instance when a query uses an operator that has no MongoDB
  #equivalent.
  class MongoDbDataSourceError(Exception):
      """Datasource level error of the MongoDB plugin."""
  ##@brief MongoDB datasource for lodel2 LeObject classes
  #
  #Translates lodel2 queries (filters, relational filters, order, group,
  #limit, offset) into pymongo calls. Connections are shared between
  #instances through the MongoDbDatasource::_connections class attribute.
  class MongoDbDatasource(object):

      ##@brief Stores existing connections
      #
      #The key of this dict is a hash of the connection string + ro parameter.
      #The value is a dict with 2 keys :
      # - conn_count : the number of instantiated datasources that use this
      #connection
      # - db : the connection object returned by utils.connection() (it is
      #indexed by database name and closed with close())
      _connections = dict()

      ##@brief Mapping from lodel2 operators to mongodb operators
      lodel2mongo_op_map = {
          '=': '$eq',
          '<=': '$lte',
          '>=': '$gte',
          '!=': '$ne',
          '<': '$lt',
          '>': '$gt',
          'in': '$in',
          'not in': '$nin',
      }

      ##@brief List of mongodb operators whose string value is converted to a
      #regular expression
      mongo_op_re = ['$in', '$nin']

      ##@brief Matches every wildcard char that is not escaped by a backslash
      #@note a lookbehind is used so that only the star itself is matched
      #(and replaced), never the character preceding it
      wildcard_re = re.compile(r'(?<!\\)\*')

      ##@brief Matches an unescaped wildcard char ending a string
      _trailing_wildcard_re = re.compile(r'(?<!\\)\*$')

      ##@brief Instantiates a datasource and connects it to the database
      #@param host str : hostname or IP
      #@param port int : mongodb listening port
      #@param db_name str
      #@param username str
      #@param password str
      #@param read_only bool : If True the Datasource is for read only, else the
      #Datasource is write only !
      def __init__(self, host, port, db_name, username, password, read_only = False):
          ##@brief Connections infos that can be kept securely (no credentials)
          self.__db_infos = {'host': host, 'port': port, 'db_name': db_name}
          ##@brief Is the instance read only ? (if not it's write only)
          self.__read_only = bool(read_only)
          ##@brief Uniq ID for mongodb connection (set by __connect())
          self.__conn_hash = None
          ##@brief Stores the database object
          self.database = self.__connect(
              username, password, ro = self.__read_only)

      ##@brief Destructor that attempts to close the connection to the DB
      #
      #Decrease the conn_count of associated MongoDbDatasource::_connections
      #item. If it reaches 0 close the connection to the db.
      #@note Does nothing when the instance never registered a connection
      #(failed constructor) or when it has already been released, so that
      #the destructor never raises nor decrements twice.
      #@see MongoDbDatasource::__connect()
      def __del__(self):
          # The attribute may not exist at all if __init__ was never run
          conn_hash = getattr(self, '_MongoDbDatasource__conn_hash', None)
          if conn_hash is None:
              return
          conn_infos = self._connections.get(conn_hash)
          if conn_infos is None:
              return
          # Forget the hash first : a second call becomes a no-op
          self.__conn_hash = None
          conn_infos['conn_count'] -= 1
          if conn_infos['conn_count'] <= 0:
              conn_infos['db'].close()
              del self._connections[conn_hash]
              logger.info("Closing connection to database")

      ##@brief Provide a new uniq numeric ID
      #@param emcomp LeObject subclass (not instance) : To know on which
      #things we have to be uniq
      #@warning multiple UID broken by this method (only the first uid field
      #is used)
      #@return an integer : 1 if no record exists, else the highest existing
      #id + 1
      def new_numeric_id(self, emcomp):
          target = emcomp #.uid_source()
          tuid = target._uid[0] # Multiple UID broken here
          results = self.select(
              target, field_list = [tuid], filters = [],
              order=[(tuid, 'DESC')], limit = 1)
          if len(results) == 0:
              return 1
          return results[0][tuid]+1

      ##@brief returns a selection of documents from the datasource
      #@param target Emclass
      #@param field_list list : names of the fields to fetch (None for all)
      #@param filters list : List of filters
      #@param relational_filters list : List of relational filters
      #@param order list : List of column to order. ex: order =
      #[('title', 'ASC'),]
      #@param group list : List of tuple representing the column used as
      #"group by" fields. ex: group = [('title', 'ASC'),]
      #@param limit int : Number of records to be returned
      #@param offset int: used with limit to choose the start record
      #@return list of dict
      #@note When group is given each returned dict only contains the
      #grouping fields
      #@todo Implement group for abstract LeObject childs
      def select(self, target, field_list, filters = None,
              relational_filters=None, order=None, group=None, limit=None,
              offset=0):
          # Normalized first : the abstract branch iterates on filters too
          if filters is None:
              filters = list()
          if relational_filters is None:
              relational_filters = list()

          if target.is_abstract():
              #Recursive calls for abstract LeObject child. Each child has to
              #return enough records to fill the whole [offset:offset+limit]
              #window, the offset itself is applied once on the merged result
              child_limit = None if limit is None else limit + offset
              results = self.__act_on_abstract(target, filters,
                  relational_filters, self.select, field_list = field_list,
                  order = order, group = group, limit = child_limit)
              #Here we may implement the group
              #If sorted query we have to sort again
              if order is not None:
                  results = sorted(results,
                      key=functools.cmp_to_key(
                          self.__generate_lambda_cmp_order(order)))
              #Applying offset & limit on merged results (slicing handles
              #out of range bounds by itself)
              if limit is None:
                  return results[offset:]
              return results[offset:offset+limit]

          # Default behavior
          collection = self.__collection(target)
          query_filters = self.__process_filters(
              target, filters, relational_filters)
          query_result_ordering = None
          if order is not None:
              # presumably a list of (fieldname, direction) as expected by
              # pymongo sort - TODO confirm against utils.parse_query_order
              query_result_ordering = utils.parse_query_order(order)

          if group is None:
              projection = dict()
              if field_list is not None:
                  for fname in field_list:
                      projection[fname] = 1
              projection['_id'] = 0
              cursor = collection.find(
                  filter = query_filters,
                  projection = projection,
                  skip = offset,
                  limit = limit if limit is not None else 0,
                  sort = query_result_ordering)
          else:
              unwinding_list = list()
              grouping_dict = OrderedDict()
              sorting_list = list()
              for group_param in group:
                  field_name = group_param[0]
                  field_sort_option = group_param[1]
                  sort_option = MONGODB_SORT_OPERATORS_MAP[field_sort_option]
                  unwinding_list.append({'$unwind': '$%s' % field_name})
                  grouping_dict[field_name] = '$%s' % field_name
                  sorting_list.append((field_name, sort_option))
              if query_result_ordering is not None:
                  sorting_list.extend(query_result_ordering)

              pipeline = [{'$match': query_filters}]
              if field_list:
                  pipeline.append({
                      '$project': SON([(field_name, 1)
                          for field_name in field_list])})
              pipeline.extend(unwinding_list)
              # $group needs its grouping expression under the _id key
              pipeline.append({'$group': {'_id': grouping_dict}})
              # Flatten the composite _id so that returned documents (and
              # the sort keys below) use the plain field names
              flatten = [('_id', 0)]
              flatten.extend(
                  (field_name, '$_id.%s' % field_name)
                  for field_name in grouping_dict)
              pipeline.append({'$project': SON(flatten)})
              if len(sorting_list) > 0:
                  pipeline.append({'$sort': SON(sorting_list)})
              if offset > 0:
                  pipeline.append({'$skip': offset})
              if limit is not None:
                  pipeline.append({'$limit': limit})
              cursor = collection.aggregate(pipeline)

          return list(cursor)

      ##@brief Deletes records according to given filters
      #@param target Emclass : class of the record to delete
      #@param filters list : List of filters
      #@param relational_filters list : List of relational filters
      #@return int : number of deleted records
      def delete(self, target, filters, relational_filters):
          if target.is_abstract():
              #Deletion with abstract LeObject as target (recursive calls)
              return self.__act_on_abstract(target, filters,
                  relational_filters, self.delete)
          #Non abstract behavior
          mongo_filters = self.__process_filters(
              target, filters, relational_filters)
          res = self.__collection(target).delete_many(mongo_filters)
          return res.deleted_count

      ##@brief updates records according to given filters
      #@param target Emclass : class of the object to update
      #@param filters list : List of filters
      #@param relational_filters list : List of relational filters
      #@param upd_datas dict : datas to update (new values). If it already
      #contains mongodb update operators ($set, $inc...) it is sent as is
      #@return int : Number of records matched by the update
      def update(self, target, filters, relational_filters, upd_datas):
          if target.is_abstract():
              #Update using abstract LeObject as target (recursive calls)
              return self.__act_on_abstract(target, filters,
                  relational_filters, self.update, upd_datas = upd_datas)
          #Non abstract behavior
          if not upd_datas:
              # Nothing to write ( an empty $set is refused by some servers )
              return 0
          mongo_filters = self.__process_filters(
              target, filters, relational_filters)
          has_operator = any(
              str(key).startswith('$') for key in upd_datas)
          # Without operator the values have to be wrapped in $set, else
          # matched documents would be replaced instead of updated
          update_doc = upd_datas if has_operator else {'$set': upd_datas}
          res = self.__collection(target).update_many(
              mongo_filters, update_doc)
          return res.matched_count

      ##@brief Inserts a record in a given collection
      #@param target Emclass : class of the object to insert
      #@param new_datas dict : datas to insert
      #@return the inserted uid (as a str)
      def insert(self, target, new_datas):
          res = self.__collection(target).insert_one(new_datas)
          return str(res.inserted_id)

      ##@brief Inserts a list of records in a given collection
      #@param target Emclass : class of the objects inserted
      #@param datas_list list : list of dict
      #@return list : list of the inserted records' ids
      def insert_multi(self, target, datas_list):
          res = self.__collection(target).insert_many(datas_list)
          return list(res.inserted_ids)

      ##@brief Act on abstract LeObject child
      #
      #This method is designed to be called by select, update and delete
      #methods when they encounter an abstract class. The action is run on
      #every non abstract child class, with a filter on the class id added.
      #@param target LeObject child class
      #@param filters list : existing filters on CLASS_ID_FIELDNAME are
      #dropped (with a warning)
      #@param relational_filters
      #@param act function : the caller method
      #@param **kwargs other arguments forwarded to act
      #@return sum of results (if it's an array it will result in a concat)
      def __act_on_abstract(self,
              target, filters, relational_filters, act, **kwargs):
          result = list() if act == self.select else 0
          if not target.is_abstract():
              target_childs = [target]
          else:
              target_childs = [tc for tc in target.child_classes()
                  if not tc.is_abstract()]
          # Filters shared by every child : everything but class id filters
          kept_filters = list()
          for cur_filter in (filters if filters is not None else list()):
              fname, op, val = cur_filter
              if fname == CLASS_ID_FIELDNAME:
                  logger.warning("Dirty drop of filter : '%s %s %s'" % (
                      fname, op, val))
              else:
                  kept_filters.append(cur_filter)
          for target_child in target_childs:
              #Add target_child to filter
              new_filters = copy.copy(kept_filters)
              new_filters.append(
                  (CLASS_ID_FIELDNAME, '=',
                  collection_name(target_child.__name__)))
              result += act(
                  target = target_child,
                  filters = new_filters,
                  relational_filters = relational_filters,
                  **kwargs)
          return result

      ##@brief Connect to database
      #@note this method avoids opening the same connection twice using
      #MongoDbDatasource::_connections static attribute
      #@param username str
      #@param password str
      #@param ro bool : If True the Datasource is for read only, else the
      #Datasource is write only
      #@return the database object (connection indexed by the database name)
      def __connect(self, username, password, ro):
          conn_string = connection_string(
              username = username, password = password,
              host = self.__db_infos['host'],
              port = self.__db_infos['port'])
          conn_string += "__ReadOnly__:"+str(bool(ro))
          self.__conn_hash = conn_h = hash(conn_string)
          if conn_h in self._connections:
              self._connections[conn_h]['conn_count'] += 1
          else:
              logger.info("Opening a new connection to database")
              self._connections[conn_h] = {
                  'conn_count': 1,
                  'db': utils.connection(
                      host = self.__db_infos['host'],
                      port = self.__db_infos['port'],
                      username = username,
                      password = password)}
          return self._connections[conn_h]['db'][self.__db_infos['db_name']]

      ##@brief Return a pymongo collection given a LeObject child class
      #@param leobject LeObject child class (no instance)
      #@return a pymongo.collection instance
      def __collection(self, leobject):
          return self.database[object_collection_name(leobject)]

      ##@brief Perform subqueries implied by relational filters and append the
      # result to existing filters
      #
      #The processing is divided in multiple steps :
      # - determine (for each relational field of the target) every collection
      #that are involved
      # - generate subqueries for relational_filters that concerns a different
      #collection than target collection
      # - execute subqueries
      # - transform subqueries results in filters
      # - merge subqueries generated filters with existing filters
      #
      #@param target LeObject subclass (no instance) : Target class
      #@param filters list : List of tuple(FIELDNAME, OP, VALUE)
      #@param relational_filters : same composition than filters except that
      # FIELD is represented by a tuple(FIELDNAME, {CLASS1:RFIELD1,
      # CLASS2:RFIELD2})
      #@return a dict of pymongo filters ( {FIELD:{OPERATOR:VALUE}} )
      def __process_filters(self,target, filters, relational_filters):
          # Simple filters lodel2 -> pymongo converting
          res = self.__filters2mongo(filters)
          rfilters = self.__prepare_relational_filters(target, relational_filters)
          #Now that everything is well organized, begin to forge subquery
          #filters
          self.__subqueries_from_relational_filters(target, rfilters)
          # Executing subqueries, creating filters from result, and injecting
          # them in original filters of the query
          if len(rfilters) > 0:
              logger.debug("Begining subquery execution")
          for fname in rfilters:
              if fname not in res:
                  res[fname] = dict()
              elif not isinstance(res[fname], dict):
                  # An equality filter stores the bare value : switch to the
                  # operator form so that the $in can be added next to it
                  res[fname] = {'$eq': res[fname]}
              subq_results = set()
              for leobject, sq_filters in rfilters[fname].items():
                  # assumes mongo_fieldname() returns a single field name
                  # (it is used as a document key below) - TODO confirm
                  uid_fname = mongo_fieldname(leobject._uid)
                  log_msg = "Subquery running on collection {coll} with " \
                      "filters '{filters}'"
                  logger.debug(log_msg.format(
                      coll=object_collection_name(leobject),
                      filters=sq_filters))
                  # projection has to be a list : a bare str would be read
                  # as a sequence of one-character field names
                  cursor = self.__collection(leobject).find(
                      filter=sq_filters,
                      projection=[uid_fname])
                  subq_results |= set(doc[uid_fname] for doc in cursor)
              #generating new filter from result
              if '$in' in res[fname]:
                  #WARNING we already have a IN on this field, keeping only
                  #values present on both sides. An empty intersection is
                  #kept as an empty $in : the query has to match nothing
                  subq_results &= set(res[fname]['$in'])
              res[fname]['$in'] = list(subq_results)
          if len(rfilters) > 0:
              logger.debug("End of subquery execution")
          return res

      ##@brief Generate subqueries from rfilters tree
      #
      #Resulting struct organization :
      # - 1st level keys : relational field name of target
      # - 2nd level keys : referenced leobject
      # - 3rd level keys : referenced field in referenced class
      # - 4th level values : pymongo filters (dict)
      #
      #@note The only caller of this method is __process_filters
      #@warning No return value, the rfilters argument is modified by
      #reference
      #
      #@param target LeObject subclass (no instance) : Target class
      #@param rfilters dict : A struct as returned by
      #MongoDbDatasource.__prepare_relational_filters()
      #@return None, the rfilters argument is modified by reference
      @classmethod
      def __subqueries_from_relational_filters(cls, target, rfilters):
          for leo_filters in rfilters.values():
              for field_filters in leo_filters.values():
                  for rfield in field_filters:
                      #This way of doing is not optimized but allows to trigger
                      #warnings in some case (2 different values for a same op
                      #on a same field on a same collection)
                      field_filters[rfield] = cls.__op_value_listconv(
                          field_filters[rfield])

      ##@brief Generate a tree from relational_filters
      #
      #The generated struct is a dict with :
      # - 1st level keys : relational field name of target
      # - 2nd level keys : referenced leobject
      # - 3rd level keys : referenced field in referenced class
      # - 4th level values : list of tuple(op, value)
      #
      #@note The only caller of this method is __process_filters
      #@warning An assertion is done : if two leobject are stored in the same
      #collection they share the same uid
      #
      #@param target LeObject subclass (no instance) : Target class
      #@param relational_filters : same composition than filters except that
      # FIELD is represented by a tuple(FIELDNAME, {CLASS1:RFIELD1,
      # CLASS2:RFIELD2})
      #@return a struct as described above
      @classmethod
      def __prepare_relational_filters(cls, target, relational_filters):
          # We are going to regroup relational filters by reference field
          # then by collection
          rfilters = dict()
          # For each relational field, stores the representative leobject
          # associated to a collection name. Kept across filters so that
          # several conditions on a same field are accumulated, not replaced
          representatives = dict()
          for (fname, rfields), op, value in relational_filters:
              field_tree = rfilters.setdefault(fname, dict())
              leo_collname = representatives.setdefault(fname, dict())
              # WARNING ! Here we assert that all leobject that are stored
              # in a same collection are identified by the same field
              for leobject, rfield in rfields.items():
                  #here we are filling a dict with leobject as index but
                  #we are doing a UNIQ on collection name
                  cur_collname = object_collection_name(leobject)
                  #Fetching the collection's representative leobject
                  repr_leo = leo_collname.setdefault(cur_collname, leobject)
                  leo_tree = field_tree.setdefault(repr_leo, dict())
                  leo_tree.setdefault(rfield, list()).append((op, value))
          return rfilters

      ##@brief Convert lodel2 filters to pymongo conditions
      #
      #An equality filter wins over every other condition on the same field.
      #When an operator is given twice for a field only the first one is kept.
      #@param filters list : list of lodel filters tuple(FIELDNAME, OP, VALUE)
      #@return dict representing pymongo conditions
      #@throw MongoDbDataSourceError if an operator is invalid
      @classmethod
      def __filters2mongo(cls, filters):
          res = dict()
          eq_fieldname = set() #Stores field with equal comparison OP
          for fieldname, op, value in filters:
              op, value = cls.__op_value_conv(op, value)
              if op == '=':
                  eq_fieldname.add(fieldname)
                  if fieldname in res:
                      logger.warning("Dropping previous condition. "
                          "Overwritten by an equality filter")
                  res[fieldname] = value
                  continue
              if fieldname in eq_fieldname:
                  logger.warning("Dropping condition : '%s %s %s'" % (
                      fieldname, op, value))
                  continue
              # op has been validated by __op_value_conv()
              new_op = cls.lodel2mongo_op_map[op]
              field_conds = res.setdefault(fieldname, dict())
              # Conditions are stored by mongodb operator : the duplicate
              # check has to be done on the converted operator
              if new_op in field_conds:
                  logger.warning("Dropping condition : '%s %s %s'" % (
                      fieldname, op, value))
              else:
                  field_conds[new_op] = value
          return res

      ##@brief Convert lodel2 value to pymongo struct
      #
      #The operator is validated using MongoDbDatasource::lodel2mongo_op_map.
      #For operators listed in MongoDbDatasource::mongo_op_re a wildcarded
      #string value is converted into a case insensitive mongodb regex (other
      #value types, lists for example, are left untouched).
      #@param op str : take value in LeFilteredQuery::_query_operators
      #@param value mixed : the value
      #@return a tuple(op, mongo_value) where op is the lodel2 operator given
      #as argument (callers convert it themselves)
      #@throw MongoDbDataSourceError if the operator is invalid
      @classmethod
      def __op_value_conv(cls, op, value):
          if op not in cls.lodel2mongo_op_map:
              msg = "Invalid operator '%s' found" % op
              raise MongoDbDataSourceError(msg)
          mongop = cls.lodel2mongo_op_map[op]
          mongoval = value
          #Converting lodel2 wildcarded string into a case insensitive
          #mongodb re
          if mongop in cls.mongo_op_re and isinstance(value, str):
              #unescaping \
              mongoval = value.replace('\\\\','\\')
              if not mongoval.startswith('*'):
                  mongoval = '^'+mongoval
              #Anchoring the end unless the string ends with an unescaped *
              if cls._trailing_wildcard_re.search(mongoval) is None:
                  mongoval += '$'
              #Replacing every unescaped wildcard char
              mongoval = cls.wildcard_re.sub('.*', mongoval)
              # NOTE(review): a regex document is produced for $in/$nin
              # operators - confirm this is what the server side expects
              mongoval = {'$regex': mongoval, '$options': 'i'}
          return (op, mongoval)

      ##@brief Convert a list of tuple(OP, VALUE) into a pymongo filter dict
      #@param op_value_list list : list of tuple(lodel2 operator, value)
      #@return a dict with mongo op as key and value as value. If an operator
      #is given more than once only its first value is kept (with a warning)
      #@throw MongoDbDataSourceError if an operator is invalid
      @classmethod
      def __op_value_listconv(cls, op_value_list):
          result = dict()
          for op, value in op_value_list:
              # __op_value_conv() validates op but hands back the lodel2
              # operator : the mongodb one is fetched from the map
              _, mongoval = cls.__op_value_conv(op, value)
              mongop = cls.lodel2mongo_op_map[op]
              if mongop in result:
                  warnings.warn("Duplicated value given for a single "
                      "field/operator couple in a query. We will keep only "
                      "the first one")
              else:
                  result[mongop] = mongoval
          return result

      ##@brief Generate a comparison function for post recursion sorting in
      #select
      #@param order list : list of tuple(FIELDNAME, 'ASC'|'DESC'), any
      #direction other than 'ASC' is handled as descending
      #@return a function that takes 2 dict as arguments and returns -1, 0
      #or 1 (suitable for functools.cmp_to_key)
      @classmethod
      def __generate_lambda_cmp_order(cls, order):
          criteria = list(order)

          def compare(a, b):
              # The first field on which the records differ decides
              for fname, cmpdir in criteria:
                  if a[fname] == b[fname]:
                      continue
                  if cmpdir == 'ASC':
                      return 1 if a[fname] > b[fname] else -1
                  return 1 if a[fname] < b[fname] else -1
              return 0

          return compare