Projet de remplacement du "RPiPasserelle" d'Otec.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

variable_type.py 4.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. from pyheatpump.db import DB, RowClass
  2. from pyheatpump.logger import logger_init
  3. logger = logger_init()
  4. class VariableType(RowClass):
  5. slabel: str = ''
  6. label: str = ''
  7. type: str = ''
  8. start_address: int = None
  9. end_address: int = None
  10. def __init__(self, **kwargs):
  11. super().__init__(**kwargs)
  12. if self.slabel is None and self.label is not None:
  13. self.slabel = self.label[0]
  14. def __str__(self):
  15. return self.slabel or self.label[0]
  16. def cast(self):
  17. # Function to convert numbers > 2**15 to negative numbers (issue #30)
  18. #complement = lambda x: x - (1 << 16 if x & (1 << 15) else 0)
  19. complement = lambda x: x - (1 << 16) if x >> 15 else x
  20. #complement = lambda x: int.from_bytes(bytes(x), 'big', signed=True)
  21. if self.type == 'bool':
  22. # returns a boolean
  23. return lambda x: x % 2
  24. elif self.type == 'float':
  25. # returns a signed float
  26. return lambda x: round(complement(x) / 10, 2)
  27. elif self.type == 'int':
  28. # returns the signed integer
  29. return lambda x: int(complement(x))
  30. return None
  31. def select(self):
  32. try:
  33. elt = next(super().select('slabel', 'var_type'))
  34. except StopIteration:
  35. logger.error('VariableType.select(%s, %s): does not exist')
  36. return elt
  37. def save(self):
  38. q = ['UPDATE var_type SET']
  39. updates = []
  40. if self.start_address is not None:
  41. updates.append(f'start_address = {self.start_address}')
  42. if self.end_address is not None:
  43. updates.append(f'end_address = {self.end_address}')
  44. if len(updates) == 0:
  45. return False
  46. q.append(','.join(updates))
  47. q.append(f"WHERE slabel = {self.slabel}'")
  48. return DB.sql(' '.join(q), self.__dict__)
  49. def get_variables(self):
  50. from .variable import Variable
  51. return Variable.getall_of_type(self)
  52. def get_variables_values(self):
  53. from .variable import Variable
  54. values = Variable.getall_values_of_type(self)
  55. res = {}
  56. for address, variable in values.items():
  57. logger.debug('address: %s, start_addres: %s, calc: %s',
  58. address, self.start_address, address - self.start_address + 1)
  59. res[str(address - self.start_address + 1)] = variable
  60. return res
  61. def get_variables_values_since(self, since: int):
  62. from .variable import Variable
  63. res = {}
  64. values = Variable.getall_values_of_type_since(self, since)
  65. for address, variable in values.items():
  66. logger.debug('address: %s, start_addres: %s, calc: %s',
  67. address, self.start_address, address - self.start_address + 1)
  68. res[str(address - self.start_address + 1)] = variable
  69. return res
  70. def control(self, data):
  71. from .variable import Variable
  72. from .variable_value import VariableValue
  73. for s_address, s_value in data.items():
  74. address = int(s_address)
  75. if self.slabel == 'I':
  76. # For integers, we have to respect the offset to get the address
  77. address = address + self.start_address - 1
  78. # All values are stored in strings that represent floats
  79. # We have to convert them
  80. f_value = float(s_value)
  81. if self.slabel == 'A':
  82. f_value = f_value * 10
  83. # Value is stored as integers in database
  84. value = int(f_value)
  85. if not Variable(**{
  86. 'type': self,
  87. 'address': address}).exists():
  88. continue
  89. new_var_value = VariableValue(**{
  90. 'type': self,
  91. 'address': address,
  92. 'value': value})
  93. old_var_value = VariableValue.get(
  94. self, address)
  95. if old_var_value.equals(new_var_value):
  96. continue
  97. new_var_value.set()
  98. @staticmethod
  99. def getall():
  100. return {
  101. row['label']: VariableType(**dict(row))
  102. for row in DB.sql('SELECT * FROM var_type')
  103. }
  104. @staticmethod
  105. def get(slabel: str):
  106. if len(slabel) > 1:
  107. slabel = slabel[0]
  108. try:
  109. return VariableType(**dict(
  110. next(DB.sql(
  111. """
  112. SELECT * FROM var_type
  113. WHERE slabel = ?
  114. """, slabel))))
  115. except StopIteration as exc:
  116. raise NameError from exc