datasource.py 16KB

# -*- coding: utf-8 -*-
import re
import warnings
import bson
from bson.son import SON
from collections import OrderedDict
import pymongo
from pymongo.errors import BulkWriteError
import urllib

from lodel import logger
# mongo_fieldname is used below ; it is assumed to be provided by .utils
from .utils import object_collection_name, connect, mongo_fieldname, \
    MONGODB_SORT_OPERATORS_MAP, connection_string


class MongoDbDataSourceError(Exception):
    pass


class MongoDbDatasource(object):

    ##@brief Stores existing connections
    #
    #The key of this dict is a hash of the connection string + ro parameter.
    #The value is a dict with 2 keys :
    # - conn_count : the number of instantiated datasources that use this
    #connection
    # - db : the pymongo database object instance
    _connections = dict()

    ##@brief Mapping from lodel2 operators to mongodb operators
    lodel2mongo_op_map = {
        '=': '$eq', '<=': '$lte', '>=': '$gte', '!=': '$ne', '<': '$lt',
        '>': '$gt', 'in': '$in', 'not in': '$nin'}
    ##@brief List of mongodb operators that expect a regex as value
    mongo_op_re = ['$in', '$nin']
    # Matches an unescaped wildcard ; the preceding character is captured so
    # that substitutions can keep it
    wildcard_re = re.compile(r'([^\\])\*')

    ##@brief Instantiates a database object given a connection name
    #@param host str : hostname or IP
    #@param port int : mongodb listening port
    #@param db_name str
    #@param username str
    #@param password str
    #@param read_only bool : If True the Datasource is for read only, else the
    #Datasource is write only !
    def __init__(self, host, port, db_name, username, password, read_only=False):
        ##@brief Connection infos that can be kept securely
        self.__db_infos = {'host': host, 'port': port, 'db_name': db_name}
        ##@brief Is the instance read only ? (if not it's write only)
        self.__read_only = bool(read_only)
        ##@brief Unique ID for the mongodb connection
        self.__conn_hash = None
        ##@brief Stores the connection to MongoDB
        self.database = self.__connect(username, password)

    ##@brief Destructor that attempts to close the connection to the DB
    #
    #Decreases the conn_count of the associated MongoDbDatasource::_connections
    #item. If it reaches 0 the connection to the db is closed
    #@see MongoDbDatasource::__connect()
    def __del__(self):
        self._connections[self.__conn_hash]['conn_count'] -= 1
        if self._connections[self.__conn_hash]['conn_count'] <= 0:
            self._connections[self.__conn_hash]['db'].close()
            del(self._connections[self.__conn_hash])
    ##@brief Returns a selection of documents from the datasource
    #@param target Emclass
    #@param field_list list
    #@param filters list : List of filters
    #@param relational_filters list : List of relational filters
    #@param order list : List of columns to order. ex: order = [('title', 'ASC'),]
    #@param group list : List of tuples representing the columns used as "group by" fields. ex: group = [('title', 'ASC'),]
    #@param limit int : Number of records to be returned
    #@param offset int : used with limit to choose the start record
    #@return list
    #@todo Implement the relations
    def select(self, target, field_list, filters, relational_filters=None,
               order=None, group=None, limit=None, offset=0):
        collection_name = object_collection_name(target)
        collection = self.database[collection_name]
        query_filters = self.__process_filters(
            target, filters, relational_filters)
        query_result_ordering = None
        if order is not None:
            # Convert the lodel2 order list into a pymongo sort list
            query_result_ordering = [
                (field_name, MONGODB_SORT_OPERATORS_MAP[direction])
                for field_name, direction in order]
        results_field_list = None if len(field_list) == 0 else field_list
        limit = limit if limit is not None else 0

        if group is None:
            cursor = collection.find(
                filter=query_filters, projection=results_field_list,
                skip=offset, limit=limit, sort=query_result_ordering)
        else:
            pipeline = list()
            unwinding_list = list()
            grouping_dict = OrderedDict()
            sorting_list = list()
            for group_param in group:
                field_name = group_param[0]
                field_sort_option = group_param[1]
                sort_option = MONGODB_SORT_OPERATORS_MAP[field_sort_option]
                unwinding_list.append({'$unwind': '$%s' % field_name})
                grouping_dict[field_name] = '$%s' % field_name
                sorting_list.append((field_name, sort_option))
            if query_result_ordering is not None:
                sorting_list.extend(query_result_ordering)

            pipeline.append({'$match': query_filters})
            if results_field_list is not None:
                pipeline.append({
                    '$project': SON([(field_name, 1)
                                     for field_name in field_list])})
            pipeline.extend(unwinding_list)
            # A $group stage expects its grouping keys under '_id'
            pipeline.append({'$group': {'_id': grouping_dict}})
            pipeline.append({'$sort': SON(sorting_list)})
            if offset > 0:
                pipeline.append({'$skip': offset})
            if limit > 0:
                pipeline.append({'$limit': limit})
            cursor = collection.aggregate(pipeline)

        results = list()
        for document in cursor:
            results.append(document)
        return results
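
    # Hedged usage sketch for select() : `Article` is a hypothetical Emclass
    # and the field names are illustrative only ; actual classes come from
    # the lodel2 editorial model.
    #
    #   ds = MongoDbDatasource('localhost', 27017, 'lodel2', 'user', 'pwd',
    #                          read_only=True)
    #   docs = ds.select(
    #       Article, ['title', 'date'],
    #       filters=[('status', '=', 'published')],
    #       relational_filters=[],
    #       order=[('title', 'ASC')], limit=10, offset=0)
    #
    # Without a `group` argument this issues a single collection.find() ;
    # with one it builds and runs an aggregation pipeline instead.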
    ##@brief Deletes records according to given filters
    #@param target Emclass : class of the records to delete
    #@param filters list : List of filters
    #@param relational_filters list : List of relational filters
    #@return int : number of deleted records
    def delete(self, target, filters, relational_filters):
        mongo_filters = self.__process_filters(
            target, filters, relational_filters)
        res = self.__collection(target).delete_many(mongo_filters)
        return res.deleted_count

    ##@brief Updates records according to given filters
    #@param target Emclass : class of the objects to update
    #@param filters list : List of filters
    #@param relational_filters list : List of relational filters
    #@param upd_datas dict : datas to update (new values)
    #@return int : Number of updated records
    def update(self, target, filters, relational_filters, upd_datas):
        mongo_filters = self.__process_filters(
            target, filters, relational_filters)
        # pymongo's update_many() expects an update operator document, so the
        # new values are wrapped in $set
        res = self.__collection(target).update_many(
            mongo_filters, {'$set': upd_datas})
        return res.modified_count

    ##@brief Inserts a record in a given collection
    #@param target Emclass : class of the object to insert
    #@param new_datas dict : datas to insert
    #@return the inserted uid
    def insert(self, target, new_datas):
        res = self.__collection(target).insert_one(new_datas)
        return res.inserted_id

    ##@brief Inserts a list of records in a given collection
    #@param target Emclass : class of the objects inserted
    #@param datas_list list : list of dict
    #@return list : list of the inserted records' ids
    def insert_multi(self, target, datas_list):
        res = self.__collection(target).insert_many(datas_list)
        return list(res.inserted_ids)
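
    # Hedged sketch of the write-side methods (class and field names below
    # are illustrative) :
    #
    #   uid = ds.insert(Article, {'title': 'Hello', 'status': 'draft'})
    #   uids = ds.insert_multi(Article, [{'title': 'A'}, {'title': 'B'}])
    #   nb_updated = ds.update(Article, [('status', '=', 'draft')], [],
    #                          {'status': 'published'})
    #   nb_deleted = ds.delete(Article, [('status', '=', 'published')], [])
    #
    # delete() and update() run the same filter conversion as select()
    # through __process_filters() before hitting the collection.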
    ##@brief Connect to database
    #@note this method avoids opening the same connection twice, using the
    #MongoDbDatasource::_connections static attribute
    #@param username str
    #@param password str
    #@return the database object for this connection
    def __connect(self, username, password):
        conn_string = connection_string(
            username=username, password=password, **self.__db_infos)
        conn_string += "__ReadOnly__:" + str(self.__read_only)
        self.__conn_hash = conn_h = hash(conn_string)
        if conn_h in self._connections:
            self._connections[conn_h]['conn_count'] += 1
            return self._connections[conn_h]['db']
        # No existing connection for these infos : open a new one.
        # Assumption : the connect() helper imported from .utils accepts the
        # same infos as connection_string() ; adjust to its real signature.
        db = connect(username=username, password=password, **self.__db_infos)
        self._connections[conn_h] = {'conn_count': 1, 'db': db}
        return db

    ##@brief Returns a pymongo collection given a LeObject child class
    #@param leobject LeObject child class (no instance)
    #@return a pymongo.collection instance
    def __collection(self, leobject):
        return self.database[object_collection_name(leobject)]
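
    # Illustration of the _connections cache described at the top of the
    # class : two datasources built with the same connection infos share one
    # entry, keyed by the hash of the connection string (values shown here
    # are made up).
    #
    #   MongoDbDatasource._connections == {
    #       8711570316203676370: {'conn_count': 2, 'db': <pymongo database>},
    #   }
    #
    # __del__() decrements conn_count and drops the entry when it reaches 0.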
    ##@brief Performs the subqueries implied by relational filters and
    # appends the results to the existing filters
    #
    #The processing is divided in multiple steps :
    # - determine (for each relational field of the target) every collection
    #that is involved
    # - generate subqueries for relational_filters that concern a different
    #collection than the target collection
    # - execute subqueries
    # - transform subqueries results into filters
    # - merge subqueries generated filters with existing filters
    #
    #@param target LeObject subclass (no instance) : Target class
    #@param filters list : List of tuple(FIELDNAME, OP, VALUE)
    #@param relational_filters : same composition as filters except that
    # FIELD is represented by a tuple(FIELDNAME, {CLASS1:RFIELD1,
    # CLASS2:RFIELD2})
    #@return a dict of pymongo filters ( {FIELD: {OPERATOR: VALUE}} )
    def __process_filters(self, target, filters, relational_filters):
        relational_filters = [] if relational_filters is None \
            else relational_filters
        # Simple filters lodel2 -> pymongo converting
        # (done with the class' own operator conversion so that the result is
        # the {FIELD: {MONGO_OP: VALUE}} dict expected below)
        res = dict()
        for fname, op, value in filters:
            mongop, mongoval = self.__op_value_conv(op, value)
            if fname not in res:
                res[fname] = dict()
            res[fname][mongop] = mongoval
        rfilters = self.__prepare_relational_filters(
            target, relational_filters)
        #Now that everything is well organized, begin to forge subquery
        #filters
        subq_filters = self.__subqueries_from_relational_filters(
            target, rfilters)
        # Executing subqueries, creating filters from results, and injecting
        # them in the original filters of the query
        if len(subq_filters) > 0:
            logger.debug("Beginning subquery execution")
            for fname in subq_filters:
                if fname not in res:
                    res[fname] = dict()
                subq_results = set()
                for leobject, sq_filters in subq_filters[fname].items():
                    uid_fname = mongo_fieldname(leobject._uid)
                    log_msg = ("Subquery running on collection {coll} with "
                               "filters '{filters}'")
                    logger.debug(log_msg.format(
                        coll=object_collection_name(leobject),
                        filters=sq_filters))
                    cursor = self.__collection(leobject).find(
                        filter=sq_filters,
                        projection=uid_fname)
                    subq_results |= set(doc[uid_fname] for doc in cursor)
                #generating a new filter from the results
                if '$in' in res[fname]:
                    #WARNING we already have an IN on this field, doing dedup
                    #from the results
                    deduped = set(res[fname]['$in']) & subq_results
                    if len(deduped) == 0:
                        del(res[fname]['$in'])
                    else:
                        res[fname]['$in'] = list(deduped)
                else:
                    res[fname]['$in'] = list(subq_results)
            logger.debug("End of subquery execution")
        return res
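
    # Hedged example of the filter conversion performed above (field names
    # are illustrative). A simple lodel2 filter list such as :
    #
    #   [('status', '=', 'published'), ('rank', '<=', 3)]
    #
    # becomes the pymongo filter dict :
    #
    #   {'status': {'$eq': 'published'}, 'rank': {'$lte': 3}}
    #
    # Relational filters additionally inject an {'$in': [...]} constraint on
    # the relational field, built from the uids returned by the subqueries.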
    ##@brief Generates subqueries from the rfilters tree
    #
    #Returned struct organization :
    # - 1st level keys : relational field name of target
    # - 2nd level keys : referenced leobject
    # - 3rd level values : pymongo filters (dict)
    #
    #@note The only caller of this method is __process_filters
    #@warning The rfilters argument is modified by reference
    #
    #@param target LeObject subclass (no instance) : Target class
    #@param rfilters dict : A struct as returned by
    #MongoDbDatasource.__prepare_relational_filters()
    #@return the rfilters struct, with its values converted to pymongo filters
    def __subqueries_from_relational_filters(self, target, rfilters):
        for fname in rfilters:
            for leobject in rfilters[fname]:
                for rfield in rfilters[fname][leobject]:
                    #This way of doing things is not optimized but allows to
                    #trigger warnings in some cases (2 different values for a
                    #same op on a same field on a same collection)
                    mongofilters = self.__op_value_listconv(
                        rfilters[fname][leobject][rfield])
                    rfilters[fname][leobject][rfield] = mongofilters
        return rfilters
    ##@brief Generates a tree from relational_filters
    #
    #The generated struct is a dict with :
    # - 1st level keys : relational field name of target
    # - 2nd level keys : referenced leobject
    # - 3rd level keys : referenced field in the referenced class
    # - 4th level values : list of tuple(op, value)
    #
    #@note The only caller of this method is __process_filters
    #@warning An assertion is done : if two leobjects are stored in the same
    #collection they share the same uid
    #
    #@param target LeObject subclass (no instance) : Target class
    #@param relational_filters : same composition as filters except that
    #FIELD is represented by a tuple(FIELDNAME, {CLASS1:RFIELD1, ...})
    #@return a struct as described above
    def __prepare_relational_filters(self, target, relational_filters):
        # We are going to regroup relational filters by referenced field
        # then by collection
        rfilters = dict()
        for (fname, rfields), op, value in relational_filters:
            if fname not in rfilters:
                rfilters[fname] = dict()
            # Stores the representative leobject associated to a collection
            # name
            leo_collname = dict()
            # WARNING ! Here we assert that all leobjects that are stored
            # in a same collection are identified by the same field
            for leobject, rfield in rfields.items():
                #here we are filling a dict with leobject as index but
                #we are doing a UNIQ on collection name
                cur_collname = object_collection_name(leobject)
                if cur_collname not in leo_collname:
                    leo_collname[cur_collname] = leobject
                    rfilters[fname][leobject] = dict()
                #Fetching the collection's representative leobject
                repr_leo = leo_collname[cur_collname]
                if rfield not in rfilters[fname][repr_leo]:
                    rfilters[fname][repr_leo][rfield] = list()
                rfilters[fname][repr_leo][rfield].append((op, value))
        return rfilters
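
    # Hedged example of the struct built above, for a relational filter on a
    # hypothetical 'author' field referencing a Person class stored in the
    # 'person' collection :
    #
    #   relational_filters = [(('author', {Person: 'name'}), '=', 'Doe')]
    #
    # gives :
    #
    #   {'author': {Person: {'name': [('=', 'Doe')]}}}
    #
    # __subqueries_from_relational_filters() then converts the innermost
    # [(op, value)] lists into pymongo filters.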
    ##@brief Converts a lodel2 operator and value to a pymongo struct
    #
    #Conversion is done using MongoDbDatasource::lodel2mongo_op_map
    #@param op str : takes value in LeFilteredQuery::_query_operators
    #@param value mixed : the value
    #@return a tuple(mongo_op, mongo_value)
    def __op_value_conv(self, op, value):
        if op not in self.lodel2mongo_op_map:
            msg = "Invalid operator '%s' found" % op
            raise MongoDbDataSourceError(msg)
        mongop = self.lodel2mongo_op_map[op]
        mongoval = value
        #Converting lodel2 wildcarded string into a case insensitive
        #mongodb re
        if mongop in self.mongo_op_re:
            #unescaping \
            mongoval = value.replace('\\\\', '\\')
            if not mongoval.startswith('*'):
                mongoval = '^' + mongoval
            #For the end of the string it's harder to detect an escaped *
            if not (mongoval[-1] == '*' and mongoval[-2] != '\\'):
                mongoval += '$'
            #Replacing every other unescaped wildcard char, keeping the
            #character preceding the wildcard (capture group \1)
            mongoval = self.wildcard_re.sub(r'\1.*', mongoval)
            mongoval = {'$regex': mongoval, '$options': 'i'}
        return (mongop, mongoval)
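
    # Example of the conversion above, as the code is written : the regex
    # branch is only triggered for the operators listed in mongo_op_re.
    #
    #   self.__op_value_conv('<=', 3)       ->  ('$lte', 3)
    #   self.__op_value_conv('in', 'foo*')  ->  ('$in', {'$regex': '^foo.*',
    #                                                    '$options': 'i'})
    #
    # The wildcarded string gets '^' prepended (no leading '*'), keeps its
    # trailing unescaped '*' as '.*', and is matched case-insensitively.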
    ##@brief Converts a list of tuple(OP, VALUE) into a pymongo filter dict
    #@param op_value_list list : list of tuple(OP, VALUE)
    #@return a dict with mongo ops as keys and values as values
    def __op_value_listconv(self, op_value_list):
        result = dict()
        for op, value in op_value_list:
            mongop, mongoval = self.__op_value_conv(op, value)
            if mongop in result:
                warnings.warn("Duplicated value given for a single "
                              "field/operator couple in a query. We will "
                              "keep only the first one")
            else:
                result[mongop] = mongoval
        return result