Projet de remplacement du "RPiPasserelle" d'Otec.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

variable_type.py 4.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. from pyheatpump.db import DB, RowClass
  2. from pyheatpump.logger import logger_init
  3. logger = logger_init()
  4. class VariableType(RowClass):
  5. slabel: str = ''
  6. label: str = ''
  7. type: str = ''
  8. start_address: int = None
  9. end_address: int = None
  10. def __init__(self, **kwargs):
  11. super().__init__(**kwargs)
  12. if self.slabel is None and self.label is not None:
  13. self.slabel = self.label[0]
  14. def __str__(self):
  15. return self.slabel or self.label[0]
  16. def cast(self):
  17. # Function to convert numbers > 2**15 to negative numbers (issue #30)
  18. #complement = lambda x: x - (1 << 16 if x & (1 << 15) else 0)
  19. complement = lambda x: x - (1 << 16) if x >> 15 else x
  20. if self.type == 'bool':
  21. # returns a boolean
  22. return lambda x: x % 2
  23. elif self.type == 'float':
  24. # returns a signed float
  25. return lambda x: round(complement(x) / 10, 2)
  26. elif self.type == 'int':
  27. # returns the signed integer
  28. return lambda x: int(complement(x))
  29. return None
  30. def select(self):
  31. try:
  32. elt = next(super().select('slabel', 'var_type'))
  33. except StopIteration:
  34. logger.error('VariableType.select(%s, %s): does not exist')
  35. return elt
  36. def save(self):
  37. q = ['UPDATE var_type SET']
  38. updates = []
  39. if self.start_address is not None:
  40. updates.append(f'start_address = {self.start_address}')
  41. if self.end_address is not None:
  42. updates.append(f'end_address = {self.end_address}')
  43. if len(updates) == 0:
  44. return False
  45. q.append(','.join(updates))
  46. q.append(f"WHERE slabel = {self.slabel}'")
  47. return DB.sql(' '.join(q), self.__dict__)
  48. def get_variables(self):
  49. from .variable import Variable
  50. return Variable.getall_of_type(self)
  51. def get_variables_values(self):
  52. from .variable import Variable
  53. values = Variable.getall_values_of_type(self)
  54. res = {}
  55. for address, variable in values.items():
  56. logger.debug('address: %s, start_addres: %s, calc: %s',
  57. address, self.start_address, address - self.start_address + 1)
  58. res[str(address - self.start_address + 1)] = variable
  59. return res
  60. def get_variables_values_since(self, since: int):
  61. from .variable import Variable
  62. res = {}
  63. values = Variable.getall_values_of_type_since(self, since)
  64. for address, variable in values.items():
  65. logger.debug('address: %s, start_addres: %s, calc: %s',
  66. address, self.start_address, address - self.start_address + 1)
  67. res[str(address - self.start_address + 1)] = variable
  68. return res
  69. def control(self, data):
  70. from .variable import Variable
  71. from .variable_value import VariableValue
  72. for s_address, s_value in data.items():
  73. address = int(s_address)
  74. if self.slabel == 'I':
  75. # For integers, we have to respect the offset to get the address
  76. address = address + self.start_address - 1
  77. # All values are stored in strings that represent floats
  78. # We have to convert them
  79. f_value = float(s_value)
  80. if self.slabel == 'A':
  81. f_value = f_value * 10
  82. # Value is stored as integers in database
  83. value = int(f_value)
  84. if address not in range(self.start_address, self.end_address + 1):
  85. continue
  86. if not Variable(**{
  87. 'type': self,
  88. 'address': address}).exists():
  89. continue
  90. new_var_value = VariableValue(**{
  91. 'type': self,
  92. 'address': address,
  93. 'value': value})
  94. old_var_value = VariableValue.get(
  95. self, address)
  96. if old_var_value.equals(new_var_value):
  97. continue
  98. new_var_value.set()
  99. @staticmethod
  100. def getall():
  101. return {
  102. row['label']: VariableType(**dict(row))
  103. for row in DB.sql('SELECT * FROM var_type')
  104. }
  105. @staticmethod
  106. def get(slabel: str):
  107. if len(slabel) > 1:
  108. slabel = slabel[0]
  109. try:
  110. return VariableType(**dict(
  111. next(DB.sql(
  112. """
  113. SELECT * FROM var_type
  114. WHERE slabel = ?
  115. """, slabel))))
  116. except StopIteration as exc:
  117. raise NameError from exc