Projet de remplacement du "RPiPasserelle" d'Otec.
Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.

variable_type.py 4.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. from pyheatpump.db import DB, RowClass
  2. from pyheatpump.logger import logger_init
  3. logger = logger_init()
  4. class VariableType(RowClass):
  5. slabel: str = ''
  6. label: str = ''
  7. type: str = ''
  8. start_address: int = None
  9. end_address: int = None
  10. def __init__(self, **kwargs):
  11. super().__init__(**kwargs)
  12. if self.slabel is None and self.label is not None:
  13. self.slabel = self.label[0]
  14. def __str__(self):
  15. return self.slabel or self.label[0]
  16. def cast(self):
  17. # Function to convert numbers > 2**15 to negative numbers (issue #30)
  18. #complement = lambda x: x - (1 << 16 if x & (1 << 15) else 0)
  19. #complement = lambda x: x - (1 << 16) if x >> 15 else x
  20. complement = lambda x: int.from_bytes(bytes(x), 'big', signed=True)
  21. if self.type == 'bool':
  22. # returns a boolean
  23. return lambda x: x % 2
  24. elif self.type == 'float':
  25. # returns a signed float
  26. return lambda x: round(complement(x) / 10, 2)
  27. elif self.type == 'int':
  28. # returns the signed integer
  29. return lambda x: int(complement(x))
  30. return None
  31. def select(self):
  32. try:
  33. elt = next(super().select('slabel', 'var_type'))
  34. except StopIteration:
  35. logger.error('VariableType.select(%s, %s): does not exist')
  36. return elt
  37. def save(self):
  38. q = ['UPDATE var_type SET']
  39. updates = []
  40. if self.start_address is not None:
  41. updates.append(f'start_address = {self.start_address}')
  42. if self.end_address is not None:
  43. updates.append(f'end_address = {self.end_address}')
  44. if len(updates) == 0:
  45. return False
  46. q.append(','.join(updates))
  47. q.append(f"WHERE slabel = {self.slabel}'")
  48. return DB.sql(' '.join(q), self.__dict__)
  49. def get_variables(self):
  50. from .variable import Variable
  51. return Variable.getall_of_type(self)
  52. def get_variables_values(self):
  53. from .variable import Variable
  54. values = Variable.getall_values_of_type(self)
  55. res = {}
  56. for address, variable in values.items():
  57. logger.debug('address: %s, start_addres: %s, calc: %s',
  58. address, self.start_address, address - self.start_address + 1)
  59. res[str(address - self.start_address + 1)] = variable
  60. return res
  61. def get_variables_values_since(self, since: int):
  62. from .variable import Variable
  63. res = {}
  64. values = Variable.getall_values_of_type_since(self, since)
  65. for address, variable in values.items():
  66. logger.debug('address: %s, start_addres: %s, calc: %s',
  67. address, self.start_address, address - self.start_address + 1)
  68. res[str(address - self.start_address + 1)] = variable
  69. return res
  70. def control(self, data):
  71. from .variable import Variable
  72. from .variable_value import VariableValue
  73. for s_address, s_value in data.items():
  74. address = int(s_address)
  75. if self.slabel == 'I':
  76. # For integers, we have to respect the offset to get the address
  77. address = address + self.start_address - 1
  78. # All values are stored in strings that represent floats
  79. # We have to convert them
  80. f_value = float(s_value)
  81. if self.slabel == 'A':
  82. f_value = f_value * 10
  83. # Value is stored as integers in database
  84. value = int(f_value)
  85. """
  86. if address not in range(self.start_address, self.end_address + 1):
  87. continue
  88. """
  89. if not Variable(**{
  90. 'type': self,
  91. 'address': address}).exists():
  92. continue
  93. new_var_value = VariableValue(**{
  94. 'type': self,
  95. 'address': address,
  96. 'value': value})
  97. old_var_value = VariableValue.get(
  98. self, address)
  99. if old_var_value.equals(new_var_value):
  100. continue
  101. new_var_value.set()
  102. @staticmethod
  103. def getall():
  104. return {
  105. row['label']: VariableType(**dict(row))
  106. for row in DB.sql('SELECT * FROM var_type')
  107. }
  108. @staticmethod
  109. def get(slabel: str):
  110. if len(slabel) > 1:
  111. slabel = slabel[0]
  112. try:
  113. return VariableType(**dict(
  114. next(DB.sql(
  115. """
  116. SELECT * FROM var_type
  117. WHERE slabel = ?
  118. """, slabel))))
  119. except StopIteration as exc:
  120. raise NameError from exc