Ingen beskrivning
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

datasource.py 16KB


  1. # -*- coding: utf-8 -*-
  2. import re
  3. import warnings
  4. from bson.son import SON
  5. from collections import OrderedDict
  6. import pymongo
  7. from pymongo.errors import BulkWriteError
  8. from lodel import logger
  9. from . import utils
  10. from .utils import object_collection_name,\
  11. MONGODB_SORT_OPERATORS_MAP, connection_string
  12. class MongoDbDataSourceError(Exception):
  13. pass
  14. class MongoDbDatasource(object):
  15. ##@brief Stores existing connections
  16. #
  17. #The key of this dict is a hash of the connection string + ro parameter.
  18. #The value is a dict with 2 keys :
  19. # - conn_count : the number of instanciated datasource that use this
  20. #connection
  21. # - db : the pymongo database object instance
  22. _connections = dict()
  23. ##@brief Mapping from lodel2 operators to mongodb operator
  24. lodel2mongo_op_map = {
  25. '=':'$eq', '<=':'$lte', '>=':'$gte', '!=':'$ne', '<':'$lt',
  26. '>':'$gt', 'in':'$in', 'not in':'$nin' }
  27. ##@brief List of mongodb operators that expect re as value
  28. mongo_op_re = ['$in', '$nin']
  29. wildcard_re = re.compile('[^\\\\]\*')
  30. ##@brief instanciates a database object given a connection name
  31. #@param host str : hostname or IP
  32. #@param port int : mongodb listening port
  33. #@param db_name str
  34. #@param username str
  35. #@param password str
  36. #@param ro bool : If True the Datasource is for read only, else the
  37. #Datasource is write only !
  38. def __init__(self, host, port, db_name, username, password, read_only = False):
  39. ##@brief Connections infos that can be kept securly
  40. self.__db_infos = {'host': host, 'port': port, 'db_name': db_name}
  41. ##@brief Is the instance read only ? (if not it's write only)
  42. self.__read_only = bool(read_only)
  43. ##@brief Uniq ID for mongodb connection
  44. self.__conn_hash= None
  45. ##@brief Stores the database cursor
  46. self.database = self.__connect(
  47. username, password, ro = self.__read_only)
  48. ##@brief Destructor that attempt to close connection to DB
  49. #
  50. #Decrease the conn_count of associated MongoDbDatasource::_connections
  51. #item. If it reach 0 close the connection to the db
  52. #@see MongoDbDatasource::__connect()
  53. def __del__(self):
  54. self._connections[self.__conn_hash]['conn_count'] -= 1
  55. if self._connections[self.__conn_hash]['conn_count'] <= 0:
  56. self._connections[self.__conn_hash]['db'].close()
  57. del(self._connections[self.__conn_hash])
  58. logger.info("Closing connection to database")
  59. ##@brief returns a selection of documents from the datasource
  60. #@param target Emclass
  61. #@param field_list list
  62. #@param filters list : List of filters
  63. #@param rel_filters list : List of relational filters
  64. #@param order list : List of column to order. ex: order = [('title', 'ASC'),]
  65. #@param group list : List of tupple representing the column used as "group by" fields. ex: group = [('title', 'ASC'),]
  66. #@param limit int : Number of records to be returned
  67. #@param offset int: used with limit to choose the start record
  68. #@param instanciate bool : If true, the records are returned as instances, else they are returned as dict
  69. #@return list
  70. #@todo Implement the relations
  71. def select(self, target, field_list, filters, rel_filters=None, order=None, group=None, limit=None, offset=0):
  72. collection_name = object_collection_name(target)
  73. collection = self.database[collection_name]
  74. query_filters = self.__process_filters(
  75. target, filters, relational_filters)
  76. query_result_ordering = None
  77. if order is not None:
  78. query_result_ordering = parse_query_order(order)
  79. results_field_list = None if len(field_list) == 0 else field_list
  80. limit = limit if limit is not None else 0
  81. if group is None:
  82. cursor = collection.find(
  83. filter=query_filters, projection=results_field_list,
  84. skip=offset, limit=limit, sort=query_result_ordering)
  85. else:
  86. pipeline = list()
  87. unwinding_list = list()
  88. grouping_dict = OrderedDict()
  89. sorting_list = list()
  90. for group_param in group:
  91. field_name = group_param[0]
  92. field_sort_option = group_param[1]
  93. sort_option = MONGODB_SORT_OPERATORS_MAP[field_sort_option]
  94. unwinding_list.append({'$unwind': '$%s' % field_name})
  95. grouping_dict[field_name] = '$%s' % field_name
  96. sorting_list.append((field_name, sort_option))
  97. sorting_list.extends(query_result_ordering)
  98. pipeline.append({'$match': query_filters})
  99. if results_field_list is not None:
  100. pipeline.append({
  101. '$project': SON([{field_name: 1}
  102. for field_name in field_list])})
  103. pipeline.extend(unwinding_list)
  104. pipeline.append({'$group': grouping_dict})
  105. pipeline.extend({'$sort': SON(sorting_list)})
  106. if offset > 0:
  107. pipeline.append({'$skip': offset})
  108. if limit is not None:
  109. pipeline.append({'$limit': limit})
  110. results = list()
  111. for document in cursor:
  112. results.append(document)
  113. return results
  114. ##@brief Deletes records according to given filters
  115. #@param target Emclass : class of the record to delete
  116. #@param filters list : List of filters
  117. #@param relational_filters list : List of relational filters
  118. #@return int : number of deleted records
  119. def delete(self, target, filters, relational_filters):
  120. mongo_filters = self.__process_filters(
  121. target, filters, relational_filters)
  122. res = self.__collection(target).delete_many(mongo_filters)
  123. return res.deleted_count
  124. ## @brief updates records according to given filters
  125. #@param target Emclass : class of the object to insert
  126. #@param filters list : List of filters
  127. #@param rel_filters list : List of relational filters
  128. #@param upd_datas dict : datas to update (new values)
  129. #@return int : Number of updated records
  130. def update(self, target, filters, relational_filters, upd_datas):
  131. mongo_filters = self.__process_filters(
  132. target, filters, relational_filters)
  133. res = self.__collection(target).update_many(mongo_filters, upd_datas)
  134. return res.modified_count()
  135. ## @brief Inserts a record in a given collection
  136. # @param target Emclass : class of the object to insert
  137. # @param new_datas dict : datas to insert
  138. # @return the inserted uid
  139. def insert(self, target, new_datas):
  140. res = self.__collection(target).insert_one(new_datas)
  141. return res.inserted_id
  142. ## @brief Inserts a list of records in a given collection
  143. # @param target Emclass : class of the objects inserted
  144. # @param datas_list list : list of dict
  145. # @return list : list of the inserted records' ids
  146. def insert_multi(self, target, datas_list):
  147. res = self.__collection.insert_many(datas_list)
  148. return list(result.inserted_ids)
  149. ##@brief Connect to database
  150. #@not this method avoid opening two times the same connection using
  151. #MongoDbDatasource::_connections static attribute
  152. #@param host str : hostname or IP
  153. #@param port int : mongodb listening port
  154. #@param db_name str
  155. #@param username str
  156. #@param password str
  157. #@param ro bool : If True the Datasource is for read only, else the
  158. def __connect(self, username, password, ro):
  159. conn_string = connection_string(
  160. username = username, password = password,
  161. host = self.__db_infos['host'],
  162. port = self.__db_infos['port'])
  163. conn_string += "__ReadOnly__:"+str(self.__read_only)
  164. self.__conn_hash = conn_h = hash(conn_string)
  165. if conn_h in self._connections:
  166. self._connections[conn_h]['conn_count'] += 1
  167. return self._connections[conn_h]['db'][self.__db_infos['db_name']]
  168. else:
  169. logger.info("Opening a new connection to database")
  170. self._connections[conn_h] = {
  171. 'conn_count': 1,
  172. 'db': utils.connection(
  173. host = self.__db_infos['host'],
  174. port = self.__db_infos['port'],
  175. username = username,
  176. password = password)}
  177. return self._connections[conn_h]['db'][self.__db_infos['db_name']]
  178. ##@brief Return a pymongo collection given a LeObject child class
  179. #@param leobject LeObject child class (no instance)
  180. #return a pymongo.collection instance
  181. def __collection(self, leobject):
  182. return self.database[object_collection_name(leobject)]
  183. ##@brief Perform subqueries implies by relational filters and append the
  184. # result to existing filters
  185. #
  186. #The processing is divided in multiple steps :
  187. # - determine (for each relational field of the target) every collection
  188. #that are involved
  189. # - generate subqueries for relational_filters that concerns a different
  190. #collection than target collection
  191. #filters
  192. # - execute subqueries
  193. # - transform subqueries results in filters
  194. # - merge subqueries generated filters with existing filters
  195. #
  196. #@param target LeObject subclass (no instance) : Target class
  197. #@param filters list : List of tuple(FIELDNAME, OP, VALUE)
  198. #@param relational_filters : same composition thant filters except that
  199. # FIELD is represented by a tuple(FIELDNAME, {CLASS1:RFIELD1,
  200. # CLASS2:RFIELD2})
  201. #@return a list of pymongo filters ( dict {FIELD:{OPERATOR:VALUE}} )
  202. def __process_filters(self,target, filters, relational_filters):
  203. # Simple filters lodel2 -> pymongo converting
  204. res = [convert_filter(filt) for filt in filters]
  205. rfilters = self.__prepare_relational_filters(relational_filters)
  206. #Now that everything is well organized, begin to forge subquerie
  207. #filters
  208. subq_filters = self.__subqueries_from_relational_filters(
  209. target, rfilters)
  210. # Executing subqueries, creating filters from result, and injecting
  211. # them in original filters of the query
  212. if len(subq_filters) > 0:
  213. logger.debug("Begining subquery execution")
  214. for fname in subq_filters:
  215. if fname not in res:
  216. res[fname] = dict()
  217. subq_results = set()
  218. for leobject, sq_filters in subq_filters[fname].items():
  219. uid_fname = mongo_fieldname(leobject._uid)
  220. log_msg = "Subquery running on collection {coll} with filters \
  221. '{filters}'"
  222. logger.debug(log_msg.format(
  223. coll=object_collection_name(leobject),
  224. filters=sq_filters))
  225. cursor = self.__collection(leobject).find(
  226. filter=sq_filters,
  227. projection=uid_fname)
  228. subq_results |= set(doc[uid_fname] for doc in cursor)
  229. #generating new filter from result
  230. if '$in' in res[fname]:
  231. #WARNING we allready have a IN on this field, doing dedup
  232. #from result
  233. deduped = set(res[fname]['$in']) & subq
  234. if len(deduped) == 0:
  235. del(res[fname]['$in'])
  236. else:
  237. res[fname]['$in'] = list(deduped)
  238. else:
  239. res[fname]['$in'] = list(subq_results)
  240. if len(subq_filters) > 0:
  241. logger.debug("End of subquery execution")
  242. return res
  243. ##@brief Generate subqueries from rfilters tree
  244. #
  245. #Returned struct organization :
  246. # - 1st level keys : relational field name of target
  247. # - 2nd level keys : referenced leobject
  248. # - 3th level values : pymongo filters (dict)
  249. #
  250. #@note The only caller of this method is __process_filters
  251. #@warning No return value, the rfilters arguement is modified by
  252. #reference
  253. #
  254. #@param target LeObject subclass (no instance) : Target class
  255. #@param rfilters dict : A struct as returned by
  256. #MongoDbDatasource.__prepare_relational_filters()
  257. #@return None, the rfilters argument is modified by reference
  258. def __subqueries_from_relational_filters(self, target, rfilters):
  259. for fname in rfilters:
  260. for leobject in rfilters[fname]:
  261. for rfield in rfilters[fname][leobject]:
  262. #This way of doing is not optimized but allows to trigger
  263. #warnings in some case (2 different values for a same op
  264. #on a same field on a same collection)
  265. mongofilters = self.__op_value_listconv(
  266. rfilters[fname][leobject][rfield])
  267. rfilters[fname][leobject][rfield] = mongofilters
  268. ##@brief Generate a tree from relational_filters
  269. #
  270. #The generated struct is a dict with :
  271. # - 1st level keys : relational field name of target
  272. # - 2nd level keys : referenced leobject
  273. # - 3th level keys : referenced field in referenced class
  274. # - 4th level values : list of tuple(op, value)
  275. #
  276. #@note The only caller of this method is __process_filters
  277. #@warning An assertion is done : if two leobject are stored in the same
  278. #collection they share the same uid
  279. #
  280. #@param target LeObject subclass (no instance) : Target class
  281. #@param relational_filters : same composition thant filters except that
  282. #@return a struct as described above
  283. def __prepare_relational_filters(self, target, relational_filters):
  284. # We are going to regroup relationnal filters by reference field
  285. # then by collection
  286. rfilters = dict()
  287. for (fname, rfields), op, value in relational_filters:
  288. if fname not in rfilters:
  289. rfilters[fname] = dict()
  290. rfilters[fname] = dict()
  291. # Stores the representative leobject for associated to a collection
  292. # name
  293. leo_collname = dict()
  294. # WARNING ! Here we assert that all leobject that are stored
  295. # in a same collection are identified by the same field
  296. for leobject, rfield in rfields.items():
  297. #here we are filling a dict with leobject as index but
  298. #we are doing a UNIQ on collection name
  299. cur_collname = object_collection_name(leobject)
  300. if cur_collname not in collnames:
  301. leo_collname[cur_collame] = leobject
  302. rfilters[fname][leobject] = dict()
  303. #Fecthing the collection's representative leobject
  304. repr_leo = leo_collname[cur_collname]
  305. if rfield not in rfilters[fname][repr_leo]:
  306. rfilters[fname][repr_leo][rfield] = list()
  307. rfilters[fname][repr_leo][rfield].append((op, value))
  308. return rfilters
  309. ##@brief Convert lodel2 operator and value to pymongo struct
  310. #
  311. #Convertion is done using MongoDbDatasource::lodel2mongo_op_map
  312. #@param op str : take value in LeFilteredQuery::_query_operators
  313. #@param value mixed : the value
  314. #@return a tuple(mongo_op, mongo_value)
  315. def __op_value_conv(self, op, value):
  316. if op not in self.lodel2mongo_op_map:
  317. msg = "Invalid operator '%s' found" % op
  318. raise MongoDbDataSourceError(msg)
  319. mongop = self.lodel2mongo_op_map[op]
  320. mongoval = value
  321. #Converting lodel2 wildcarded string into a case insensitive
  322. #mongodb re
  323. if mongop in self.mon_op_re:
  324. #unescaping \
  325. mongoval = value.replace('\\\\','\\')
  326. if not mongoval.startswith('*'):
  327. mongoval = '^'+mongoval
  328. #For the end of the string it's harder to detect escaped *
  329. if not (mongoval[-1] == '*' and mongoval[-2] != '\\'):
  330. mongoval += '$'
  331. #Replacing every other unescaped wildcard char
  332. mongoval = self.wildcard_re.sub('.*', mongoval)
  333. mongoval = {'$regex': mongoval, '$options': 'i'}
  334. return (op, mongoval)
  335. ##@brief Convert a list of tuple(OP, VALUE) into a pymongo filter dict
  336. #@return a dict with mongo op as key and value as value...
  337. def __op_value_listconv(self, op_value_list):
  338. result = dict()
  339. for op, value in op_value_list:
  340. mongop, mongoval = self.__op_value_conv(op, value)
  341. if mongop in result:
  342. warnings.warn("Duplicated value given for a single \
  343. field/operator couple in a query. We will keep only the first one")
  344. else:
  345. result[mongop] = mongoval
  346. return result