@@ -1,5 +1,7 @@
 # -*- coding: utf-8 -*-

+import re
+import warnings
import bson
from bson.son import SON
from collections import OrderedDict
@@ -7,41 +9,56 @@ import pymongo
from pymongo.errors import BulkWriteError
import urllib

-from .utils import mongodbconnect, object_collection_name, parse_query_filters, parse_query_order, MONGODB_SORT_OPERATORS_MAP
+from lodel import logger
+
+from .utils import mongodbconnect, object_collection_name, parse_query_order, MONGODB_SORT_OPERATORS_MAP

class MongoDbDataSourceError(Exception):
    pass


class MongoDbDatasource(object):
+
+    ##@brief Mapping from lodel2 operators to mongodb operators
+    lodel2mongo_op_map = {
+        '=':'$eq', '<=':'$lte', '>=':'$gte', '!=':'$ne', '<':'$lt',
+        '>':'$gt', 'in':'$in', 'not in':'$nin' }
+    ##@brief List of mongodb operators that expect a regex as value
+    mongo_op_re = ['$in', '$nin']
+    ##@brief Regexp used to spot unescaped wildcards (*) in filter values
+    wildcard_re = re.compile('[^\\\\]\*')

    ## @brief instanciates a database object given a connection name
    # @param connection_name str
    def __init__(self, connection_name):
-        self.database = mongodbconnect(connection_name)
-
-    ## @brief returns a selection of documents from the datasource
-    # @param target Emclass
-    # @param field_list list
-    # @param filters list : List of filters
-    # @param rel_filters list : List of relational filters
-    # @param order list : List of column to order. ex: order = [('title', 'ASC'),]
-    # @param group list : List of tupple representing the column used as "group by" fields. ex: group = [('title', 'ASC'),]
-    # @param limit int : Number of records to be returned
-    # @param offset int: used with limit to choose the start record
-    # @param instanciate bool : If true, the records are returned as instances, else they are returned as dict
-    # @return list
-    # @todo Implement the relations
+        self.r_database = mongodbconnect(connection_name)
+
+    ##@brief returns a selection of documents from the datasource
+    #@param target Emclass
+    #@param field_list list
+    #@param filters list : List of filters
+    #@param rel_filters list : List of relational filters
+    #@param order list : List of columns to order by. ex: order = [('title', 'ASC'),]
+    #@param group list : List of tuples representing the columns used as "group by" fields. ex: group = [('title', 'ASC'),]
+    #@param limit int : Number of records to be returned
+    #@param offset int : used with limit to choose the start record
+    #@param instanciate bool : If True, the records are returned as instances, else they are returned as dicts
+    #@return list
+    #@todo Implement the relations
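+    #
+    #Illustrative usage sketch (hypothetical Emclass "MyClass" with 'title'
+    # and 'date' fields) :
+    # datasource.select(MyClass, ['title'],
+    #     filters=[('title', '=', 'foo')], order=[('date', 'ASC')], limit=10)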
    def select(self, target, field_list, filters, rel_filters=None, order=None, group=None, limit=None, offset=0, instanciate=True):
        collection_name = object_collection_name(target)
        collection = self.database[collection_name]
-        query_filters = parse_query_filters(filters)
-        query_result_ordering = parse_query_order(order) if order is not None else None
+        query_filters = self.__process_filters(
+            target, filters, rel_filters)
+        query_result_ordering = None
+        if order is not None:
+            query_result_ordering = parse_query_order(order)
        results_field_list = None if len(field_list) == 0 else field_list
        limit = limit if limit is not None else 0

        if group is None:
-            cursor = collection.find(filter=query_filters, projection=results_field_list, skip=offset, limit=limit, sort=query_result_ordering)
+            cursor = collection.find(
+                filter=query_filters, projection=results_field_list,
+                skip=offset, limit=limit, sort=query_result_ordering)
        else:
            pipeline = list()
            unwinding_list = list()

@@ -59,7 +76,9 @@ class MongoDbDatasource(object):

            pipeline.append({'$match': query_filters})
            if results_field_list is not None:
-                pipeline.append({'$project': SON([{field_name: 1} for field_name in field_list])})
+                pipeline.append({
+                    '$project': SON([{field_name: 1}
+                        for field_name in field_list])})
            pipeline.extend(unwinding_list)
            pipeline.append({'$group': grouping_dict})
            pipeline.extend({'$sort': SON(sorting_list)})

@@ -74,11 +93,12 @@ class MongoDbDatasource(object):

        return results

-    ## @brief Deletes one record defined by its uid
-    # @param target Emclass : class of the record to delete
-    # @param uid dict|list : a dictionary of fields and values composing the unique identifier of the record or a list of several dictionaries
-    # @return int : number of deleted records
-    # @TODO Implement the error management
+    ##@brief Deletes one record defined by its uid
+    #@param target Emclass : class of the record to delete
+    #@param uid dict|list : a dictionary of fields and values composing the
+    # unique identifier of the record or a list of several dictionaries
+    #@return int : number of deleted records
+    #@TODO Implement the error management
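+    #
+    #Illustrative call (hypothetical uid field 'lodel_id') :
+    # datasource.delete(MyClass, {'lodel_id': 42})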
    def delete(self, target, uid):
        if isinstance(uid, dict):
            uid = [uid]

@@ -122,3 +142,181 @@ class MongoDbDatasource(object):
        collection = self.database[collection_name]
        result = collection.insert_many(datas_list)
        return len(result.inserted_ids)
+
+    ##@brief Return a pymongo collection given a LeObject child class
+    #@param leobject LeObject child class (no instance)
+    #@return a pymongo.collection instance
+    def __collection(self, leobject):
+        return self.database[object_collection_name(leobject)]
+
+    ##@brief Perform the subqueries implied by relational filters and append
+    # the results to existing filters
+    #
+    #The processing is divided in multiple steps :
+    # - determine (for each relational field of the target) every collection
+    #that is involved
+    # - generate subqueries for the relational_filters that concern a
+    #collection different from the target's collection
+    # - execute subqueries
+    # - transform subqueries results into filters
+    # - merge subqueries generated filters with existing filters
+    #
+    #@param target LeObject subclass (no instance) : Target class
+    #@param filters list : List of tuple(FIELDNAME, OP, VALUE)
+    #@param relational_filters : same composition as filters except that
+    # FIELD is represented by a tuple(FIELDNAME, {CLASS1:RFIELD1,
+    # CLASS2:RFIELD2})
+    #@return a pymongo filters dict ( {FIELD: {OPERATOR: VALUE}} )
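+    #
+    #Illustrative conversion (hypothetical field names) : filters such as
+    # [('title', '=', 'foo'), ('rank', '>=', 2)] become the pymongo filters
+    # dict {'title': {'$eq': 'foo'}, 'rank': {'$gte': 2}}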
+    def __process_filters(self, target, filters, relational_filters):
+        # Simple filters lodel2 -> pymongo converting
+        res = dict()
+        for fname, op, value in filters:
+            mongop, mongoval = self.__op_value_conv(op, value)
+            res.setdefault(fname, dict())[mongop] = mongoval
+        rfilters = self.__prepare_relational_filters(
+            target, relational_filters)
+        #Now that everything is well organized, begin to forge the subquery
+        #filters
+        subq_filters = self.__subqueries_from_relational_filters(
+            target, rfilters)
+        # Executing subqueries, creating filters from results, and injecting
+        # them in the original filters of the query
+        if len(subq_filters) > 0:
+            logger.debug("Beginning subquery execution")
+            for fname in subq_filters:
+                if fname not in res:
+                    res[fname] = dict()
+                subq_results = set()
+                for leobject, sq_filters in subq_filters[fname].items():
+                    uid_fname = mongo_fieldname(leobject._uid)
+                    log_msg = "Subquery running on collection {coll} with filters \
+'{filters}'"
+                    logger.debug(log_msg.format(
+                        coll=object_collection_name(leobject),
+                        filters=sq_filters))
+
+                    cursor = self.__collection(leobject).find(
+                        filter=sq_filters,
+                        projection=uid_fname)
+                    subq_results |= set(doc[uid_fname] for doc in cursor)
+                #generating a new filter from the results
+                if '$in' in res[fname]:
+                    #WARNING we already have an IN on this field, deduplicating
+                    #it against the subquery results
+                    deduped = set(res[fname]['$in']) & subq_results
+                    if len(deduped) == 0:
+                        del(res[fname]['$in'])
+                    else:
+                        res[fname]['$in'] = list(deduped)
+                else:
+                    res[fname]['$in'] = list(subq_results)
+        if len(subq_filters) > 0:
+            logger.debug("End of subquery execution")
+        return res
+
+    ##@brief Generate subqueries from the rfilters tree
+    #
+    #Returned struct organization :
+    # - 1st level keys : relational field name of target
+    # - 2nd level keys : referenced leobject
+    # - 3rd level values : pymongo filters (dict)
+    #
+    #@note The only caller of this method is __process_filters
+    #@warning The rfilters argument is modified by reference
+    #
+    #@param target LeObject subclass (no instance) : Target class
+    #@param rfilters dict : A struct as returned by
+    #MongoDbDatasource.__prepare_relational_filters()
+    #@return the modified rfilters struct (also updated by reference)
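+    #
+    #Illustrative shape of the result (hypothetical class Person and field
+    # names) : {'author': {Person: {'lodel_id': {'$in': [1, 2]}}}}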
+    def __subqueries_from_relational_filters(self, target, rfilters):
+        for fname in rfilters:
+            for leobject in rfilters[fname]:
+                for rfield in rfilters[fname][leobject]:
+                    #This way of doing it is not optimized but allows to
+                    #trigger warnings in some cases (2 different values for
+                    #the same op on the same field of the same collection)
+                    mongofilters = self.__op_value_listconv(
+                        rfilters[fname][leobject][rfield])
+                    rfilters[fname][leobject][rfield] = mongofilters
+        return rfilters
+
+    ##@brief Generate a tree from relational_filters
+    #
+    #The generated struct is a dict with :
+    # - 1st level keys : relational field name of target
+    # - 2nd level keys : referenced leobject
+    # - 3rd level keys : referenced field in referenced class
+    # - 4th level values : list of tuple(op, value)
+    #
+    #@note The only caller of this method is __process_filters
+    #@warning An assertion is done : if two leobject are stored in the same
+    #collection they share the same uid
+    #
+    #@param target LeObject subclass (no instance) : Target class
+    #@param relational_filters : same composition as filters except that
+    # FIELD is represented by a tuple(FIELDNAME, {CLASS1:RFIELD1,
+    # CLASS2:RFIELD2})
+    #@return a struct as described above
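+    #
+    #Illustrative shape of the result (hypothetical class Person and field
+    # names) : {'author': {Person: {'lodel_id': [('=', 1), ('!=', 2)]}}}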
+    def __prepare_relational_filters(self, target, relational_filters):
+        # We are going to regroup relational filters by reference field
+        # then by collection
+        rfilters = dict()
+        for (fname, rfields), op, value in relational_filters:
+            if fname not in rfilters:
+                rfilters[fname] = dict()
+            # Stores the representative leobject associated to a collection
+            # name
+            leo_collname = dict()
+            # WARNING ! Here we assert that all leobject that are stored
+            # in a same collection are identified by the same field
+            for leobject, rfield in rfields.items():
+                #here we are filling a dict with leobject as index but
+                #we are doing a UNIQ on collection name
+                cur_collname = object_collection_name(leobject)
+                if cur_collname not in leo_collname:
+                    leo_collname[cur_collname] = leobject
+                    rfilters[fname][leobject] = dict()
+                #Fetching the collection's representative leobject
+                repr_leo = leo_collname[cur_collname]
+
+                if rfield not in rfilters[fname][repr_leo]:
+                    rfilters[fname][repr_leo][rfield] = list()
+                rfilters[fname][repr_leo][rfield].append((op, value))
+        return rfilters
+
+    ##@brief Convert a lodel2 operator and value to a pymongo struct
+    #
+    #Conversion is done using MongoDbDatasource::lodel2mongo_op_map
+    #@param op str : takes its value in LeFilteredQuery::_query_operators
+    #@param value mixed : the value
+    #@return a tuple(mongo_op, mongo_value)
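+    #
+    #Illustrative examples : ('<=', 10) gives ('$lte', 10) ; for operators
+    # listed in mongo_op_re, a wildcarded value such as '*foo*' is turned
+    # into a case insensitive '$regex' mongo value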
+    def __op_value_conv(self, op, value):
+        if op not in self.lodel2mongo_op_map:
+            msg = "Invalid operator '%s' found" % op
+            raise MongoDbDataSourceError(msg)
+        mongop = self.lodel2mongo_op_map[op]
+        mongoval = value
+        #Converting a lodel2 wildcarded string into a case insensitive
+        #mongodb regex
+        if mongop in self.mongo_op_re:
+            #unescaping \
+            mongoval = value.replace('\\\\','\\')
+            if not mongoval.startswith('*'):
+                mongoval = '^'+mongoval
+            #For the end of the string it's harder to detect an escaped *
+            if not (mongoval[-1] == '*' and mongoval[-2] != '\\'):
+                mongoval += '$'
+            #Replacing every other unescaped wildcard char
+            mongoval = self.wildcard_re.sub('.*', mongoval)
+            mongoval = {'$regex': mongoval, '$options': 'i'}
+        return (mongop, mongoval)
+
+    ##@brief Convert a list of tuple(OP, VALUE) into a pymongo filter dict
+    #@param op_value_list list : list of tuple(OP, VALUE)
+    #@return a dict with mongo operators as keys and mongo values as values
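+    #
+    #Illustrative example : [('>=', 1), ('<', 10)] gives {'$gte': 1, '$lt': 10}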
+    def __op_value_listconv(self, op_value_list):
+        result = dict()
+        for op, value in op_value_list:
+            mongop, mongoval = self.__op_value_conv(op, value)
+            if mongop in result:
+                warnings.warn("Duplicated value given for a single \
+field/operator couple in a query. We will keep only the first one")
+            else:
+                result[mongop] = mongoval
+        return result
+