No Description
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

datasource.py 5.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. # -*- coding: utf-8 -*-
  2. import bson
  3. from bson.son import SON
  4. from collections import OrderedDict
  5. import pymongo
  6. # from pymongo import MongoClient
  7. from pymongo.errors import BulkWriteError
  8. import urllib
  9. import lodel.datasource.mongodb.utils as utils
  10. from lodel.datasource.generic.datasource import GenericDataSource
  class MongoDbDataSourceError(Exception):
      """Exception type dedicated to the MongoDB datasource module.

      It adds nothing to the built-in Exception; it only provides a distinct
      type that callers can catch.
      """
  class MongoDbDataSource(GenericDataSource):

      ## @brief Instantiates a datasource bound to a named MongoDB connection
      # @param connection_name str : name handed to utils.mongodbconnect()
      def __init__(self, connection_name='default'):
          self.database = utils.mongodbconnect(connection_name)

      ## @brief Returns the collection holding the records of a given class
      # @param target_cls Emclass
      # @return the entry of self.database named by utils.object_collection_name()
      # @note NOTE(review): the name is computed from target_cls.__class__ (the
      #       type of the argument), exactly as the original code did. If callers
      #       pass a class rather than an instance this yields its metaclass -
      #       confirm against utils.object_collection_name() and the callers.
      def _get_collection(self, target_cls):
          collection_name = utils.object_collection_name(target_cls.__class__)
          return self.database[collection_name]

      ## @brief Builds the aggregation pipeline used by select() when grouping
      # @param query_filters dict : filter document for the $match stage
      # @param results_field_list list|None : fields to project, None for no projection
      # @param group list : List of tuples (field name, sort option key)
      # @param query_result_ordering list|None : extra sort keys; presumably a list
      #        of (field, direction) pairs as accepted by find(sort=...) - confirm
      #        against utils.parse_query_order()
      # @param limit int : maximum number of documents, 0 for no limit
      # @param offset int : number of documents to skip
      # @return list : the pipeline stages, in execution order
      @staticmethod
      def _build_group_pipeline(query_filters, results_field_list, group, query_result_ordering, limit, offset):
          unwinding_list = list()
          grouping_dict = OrderedDict()
          sorting_list = list()
          for group_param in group:
              field_name = group_param[0]
              sort_option = utils.MONGODB_SORT_OPERATORS_MAP[group_param[1]]
              unwinding_list.append({'$unwind': '$%s' % field_name})
              grouping_dict[field_name] = '$%s' % field_name
              # $group stores the grouping key under "_id", so the grouped
              # values must be sorted through that prefix.
              sorting_list.append(('_id.%s' % field_name, sort_option))

          # "order" is optional: only append its keys when there are any.
          # NOTE(review): after $group only "_id" exists in the documents, so
          # these extra keys only matter if they name grouped fields - confirm
          # the intended ordering semantics.
          if query_result_ordering:
              sorting_list.extend(query_result_ordering)

          pipeline = [{'$match': query_filters}]
          if results_field_list is not None:
              # SON is built from (key, value) pairs, not from a list of dicts
              pipeline.append({'$project': SON([(name, 1) for name in results_field_list])})
          pipeline.extend(unwinding_list)
          # MongoDB requires the grouping expression to be given as "_id"
          pipeline.append({'$group': {'_id': grouping_dict}})
          # A $sort stage needs at least one key
          if sorting_list:
              pipeline.append({'$sort': SON(sorting_list)})
          if offset > 0:
              pipeline.append({'$skip': offset})
          # 0 means "no limit"; $limit only accepts positive values
          if limit > 0:
              pipeline.append({'$limit': limit})
          return pipeline

      ## @brief Returns a selection of documents from the datasource
      # @param target_cls Emclass
      # @param field_list list : names of the fields to return; an empty list disables the projection
      # @param filters list : List of filters (translated by utils.parse_query_filters)
      # @param rel_filters list : List of relational filters (currently unused, see todo)
      # @param order list : List of column to order. ex: order = [('title', 'ASC'),]
      # @param group list : List of tuples representing the column used as "group by" fields. ex: group = [('title', 'ASC'),]
      # @param limit int : Number of records to be returned (None or 0 : no limit)
      # @param offset int: used with limit to choose the start record
      # @param instanciate bool : If true, the records are returned as instances, else they are returned as dict
      #        (currently unused: documents are returned as yielded by the cursor)
      # @return list
      # @note When group is given, the documents come from an aggregation and
      #       carry the grouped values under their "_id" key.
      # @todo Implement the relations
      def select(self, target_cls, field_list, filters, rel_filters=None, order=None, group=None, limit=None,
                 offset=0, instanciate=True):
          collection = self._get_collection(target_cls)
          query_filters = utils.parse_query_filters(filters)
          query_result_ordering = utils.parse_query_order(order) if order is not None else None
          results_field_list = field_list if field_list else None
          # find() treats a limit of 0 as "no limit"
          limit = limit if limit is not None else 0

          if group is None:
              cursor = collection.find(
                  filter=query_filters,
                  projection=results_field_list,
                  skip=offset,
                  limit=limit,
                  sort=query_result_ordering
              )
          else:
              pipeline = self._build_group_pipeline(
                  query_filters, results_field_list, group, query_result_ordering, limit, offset
              )
              cursor = collection.aggregate(pipeline)

          return list(cursor)

      ## @brief Deletes one or several records defined by their uid
      # @param target_cls Emclass : class of the record to delete
      # @param uid dict|list : a dictionary of fields and values composing the unique identifier of the record or a list of several dictionaries
      # @return int : number of deleted records
      # @throw MongoDbDataSourceError if one of the given uids is empty
      # @TODO Implement the error management
      def delete(self, target_cls, uid):
          uid_list = [uid] if isinstance(uid, dict) else list(uid)
          if not uid_list:
              return 0
          # An empty filter matches every document: refuse it rather than
          # wiping the whole collection.
          if not all(uid_list):
              raise MongoDbDataSourceError('Cannot delete records from an empty uid')
          # delete_many() expects a single filter document, so several uids
          # are combined with $or.
          query_filter = uid_list[0] if len(uid_list) == 1 else {'$or': uid_list}
          result = self._get_collection(target_cls).delete_many(query_filter)
          return result.deleted_count

      ## @brief Updates one or a list of records
      # @param target_cls Emclass : class of the object to update
      # @param uids list : list of uids to update (a single uid is accepted too)
      # @param datas dict : datas to update (new values)
      # @return int : Number of updated records (0 when no data is given)
      # @todo check if the values need to be parsed
      def update(self, target_cls, uids, **datas):
          if not isinstance(uids, list):
              uids = [uids]
          if not datas:
              return 0
          # update_many() only accepts update operators: plain field/value
          # pairs are wrapped in $set, while a document that already uses
          # operators is passed through unchanged.
          if any(key.startswith('$') for key in datas):
              update_document = datas
          else:
              update_document = {'$set': datas}
          results = self._get_collection(target_cls).update_many({'uid': {'$in': uids}}, update_document)
          # modified_count is a property, not a method
          return results.modified_count

      ## @brief Inserts a record in a given collection
      # @param target_cls Emclass : class of the object to insert
      # @param datas dict : datas to insert
      # @return bool : True when the inserted document got an identifier
      # @TODO Implement the error management
      def insert(self, target_cls, **datas):
          result = self._get_collection(target_cls).insert_one(datas)
          # inserted_id is usually an ObjectId, which has no length
          return result.inserted_id is not None

      ## @brief Inserts a list of records in a given collection
      # @param target_cls Emclass : class of the objects inserted
      # @param datas_list list : list of the documents (dict) to insert
      # @return int : number of inserted records
      # @TODO Implement the error management
      def insert_multi(self, target_cls, datas_list):
          # insert_many() rejects an empty list
          if not datas_list:
              return 0
          result = self._get_collection(target_cls).insert_many(datas_list)
          return len(result.inserted_ids)