|
@@ -0,0 +1,124 @@
|
|
1
|
+# -*- coding: utf-8 -*-
|
|
2
|
+
|
|
3
|
+import bson
|
|
4
|
+from bson.son import SON
|
|
5
|
+from collections import OrderedDict
|
|
6
|
+import pymongo
|
|
7
|
+from pymongo.errors import BulkWriteError
|
|
8
|
+import urllib
|
|
9
|
+
|
|
10
|
+from .utils import mongodbconnect, object_collection_name, parse_query_filters, parse_query_order, MONGODB_SORT_OPERATORS_MAP
|
|
11
|
+
|
|
12
|
+class MongoDbDataSourceError(Exception):
|
|
13
|
+ pass
|
|
14
|
+
|
|
15
|
+
|
|
16
|
+class MongoDbDatasource(object):
|
|
17
|
+
|
|
18
|
+ ## @brief instanciates a database object given a connection name
|
|
19
|
+ # @param connection_name str
|
|
20
|
+ def __init__(self, connection_name):
|
|
21
|
+ self.database = mongodbconnect(connection_name)
|
|
22
|
+
|
|
23
|
+ ## @brief returns a selection of documents from the datasource
|
|
24
|
+ # @param target Emclass
|
|
25
|
+ # @param field_list list
|
|
26
|
+ # @param filters list : List of filters
|
|
27
|
+ # @param rel_filters list : List of relational filters
|
|
28
|
+ # @param order list : List of column to order. ex: order = [('title', 'ASC'),]
|
|
29
|
+ # @param group list : List of tupple representing the column used as "group by" fields. ex: group = [('title', 'ASC'),]
|
|
30
|
+ # @param limit int : Number of records to be returned
|
|
31
|
+ # @param offset int: used with limit to choose the start record
|
|
32
|
+ # @param instanciate bool : If true, the records are returned as instances, else they are returned as dict
|
|
33
|
+ # @return list
|
|
34
|
+ # @todo Implement the relations
|
|
35
|
+ def select(self, target, field_list, filters, rel_filters=None, order=None, group=None, limit=None, offset=0, instanciate=True):
|
|
36
|
+ collection_name = object_collection_name(target.__class__)
|
|
37
|
+ collection = self.database[collection_name]
|
|
38
|
+ query_filters = parse_query_filters(filters)
|
|
39
|
+ query_result_ordering = parse_query_order(order) if order is not None else None
|
|
40
|
+ results_field_list = None if len(field_list) == 0 else field_list
|
|
41
|
+ limit = limit if limit is not None else 0
|
|
42
|
+
|
|
43
|
+ if group is None:
|
|
44
|
+ cursor = collection.find(filter=query_filters, projection=results_field_list, skip=offset, limit=limit, sort=query_result_ordering)
|
|
45
|
+ else:
|
|
46
|
+ pipeline = list()
|
|
47
|
+ unwinding_list = list()
|
|
48
|
+ grouping_dict = OrderedDict()
|
|
49
|
+ sorting_list = list()
|
|
50
|
+ for group_param in group:
|
|
51
|
+ field_name = group_param[0]
|
|
52
|
+ field_sort_option = group_param[1]
|
|
53
|
+ sort_option = MONGODB_SORT_OPERATORS_MAP[field_sort_option]
|
|
54
|
+ unwinding_list.append({'$unwind': '$%s' % field_name})
|
|
55
|
+ grouping_dict[field_name] = '$%s' % field_name
|
|
56
|
+ sorting_list.append((field_name, sort_option))
|
|
57
|
+
|
|
58
|
+ sorting_list.extends(query_result_ordering)
|
|
59
|
+
|
|
60
|
+ pipeline.append({'$match': query_filters})
|
|
61
|
+ if results_field_list is not None:
|
|
62
|
+ pipeline.append({'$project': SON([{field_name: 1} for field_name in field_list])})
|
|
63
|
+ pipeline.extend(unwinding_list)
|
|
64
|
+ pipeline.append({'$group': grouping_dict})
|
|
65
|
+ pipeline.extend({'$sort': SON(sorting_list)})
|
|
66
|
+ if offset > 0:
|
|
67
|
+ pipeline.append({'$skip': offset})
|
|
68
|
+ if limit is not None:
|
|
69
|
+ pipeline.append({'$limit': limit})
|
|
70
|
+
|
|
71
|
+ results = list()
|
|
72
|
+ for document in cursor:
|
|
73
|
+ results.append(document)
|
|
74
|
+
|
|
75
|
+ return results
|
|
76
|
+
|
|
77
|
+ ## @brief Deletes one record defined by its uid
|
|
78
|
+ # @param target Emclass : class of the record to delete
|
|
79
|
+ # @param uid dict|list : a dictionary of fields and values composing the unique identifier of the record or a list of several dictionaries
|
|
80
|
+ # @return int : number of deleted records
|
|
81
|
+ # @TODO Implement the error management
|
|
82
|
+ def delete(self, target, uid):
|
|
83
|
+ if isinstance(uid, dict):
|
|
84
|
+ uid = [uid]
|
|
85
|
+ collection_name = object_collection_name(target.__class__)
|
|
86
|
+ collection = self.database[collection_name]
|
|
87
|
+ result = collection.delete_many(uid)
|
|
88
|
+ return result.deleted_count
|
|
89
|
+
|
|
90
|
+ ## @brief updates one or a list of records
|
|
91
|
+ # @param target Emclass : class of the object to insert
|
|
92
|
+ # @param uids list : list of uids to update
|
|
93
|
+ # @param datas dict : datas to update (new values)
|
|
94
|
+ # @return int : Number of updated records
|
|
95
|
+ # @todo check if the values need to be parsed
|
|
96
|
+ def update(self, target, uids, **datas):
|
|
97
|
+ if not isinstance(uids, list):
|
|
98
|
+ uids = [uids]
|
|
99
|
+ collection_name = object_collection_name(target.__class__)
|
|
100
|
+ collection = self.database[collection_name]
|
|
101
|
+ results = collection.update_many({'uid': {'$in': uids}}, datas)
|
|
102
|
+ return results.modified_count()
|
|
103
|
+
|
|
104
|
+ ## @brief Inserts a record in a given collection
|
|
105
|
+ # @param target Emclass : class of the object to insert
|
|
106
|
+ # @param datas dict : datas to insert
|
|
107
|
+ # @return bool
|
|
108
|
+ # @TODO Implement the error management
|
|
109
|
+ def insert(self, target, **datas):
|
|
110
|
+ collection_name = object_collection_name(target.__class__)
|
|
111
|
+ collection = self.database[collection_name]
|
|
112
|
+ result = collection.insert_one(datas)
|
|
113
|
+ return len(result.inserted_id)
|
|
114
|
+
|
|
115
|
+ ## @brief Inserts a list of records in a given collection
|
|
116
|
+ # @param target Emclass : class of the objects inserted
|
|
117
|
+ # @param datas_list
|
|
118
|
+ # @return list : list of the inserted records' ids
|
|
119
|
+ # @TODO Implement the error management
|
|
120
|
+ def insert_multi(self, target, datas_list):
|
|
121
|
+ collection_name = object_collection_name(target.__class__)
|
|
122
|
+ collection = self.database[collection_name]
|
|
123
|
+ result = collection.insert_many(datas_list)
|
|
124
|
+ return len(result.inserted_ids)
|