"""Model for the rows of the ``var_type`` table."""
from pyheatpump.db import DB, RowClass
from pyheatpump.logger import logger_init

logger = logger_init()


class VariableType(RowClass):
    """A variable type: a label, a value kind and an address range.

    Instances are identified by ``slabel``, which all queries in this class
    use as the lookup key in the ``var_type`` table.
    """

    # Short label; `get()` truncates its argument to one character before
    # looking it up, so this is presumably a single letter.
    slabel: str = ''
    # Long label; its first character is the fallback for `slabel`.
    label: str = ''
    # Value kind; `cast()` recognises 'bool', 'float' and 'int'.
    type: str = ''
    # Bounds of the address range; `start_address` is the base used to turn
    # absolute addresses into 1-based offsets and back.
    start_address: int = None
    end_address: int = None

    def __init__(self, **kwargs):
        """Build the instance from keyword arguments handled by ``RowClass``.

        When no short label is available (missing, ``None`` or empty) it is
        derived from the first character of ``label``.
        """
        super().__init__(**kwargs)
        # The class default for `slabel` is '' rather than None, so test for
        # emptiness instead of identity with None.
        if not self.slabel and self.label:
            self.slabel = self.label[0]

    def __str__(self):
        """Return the short label, or the first character of ``label``."""
        # Slicing keeps this safe when both labels are empty.
        return self.slabel or (self.label or '')[:1]

    def cast(self):
        """Return a converter for raw stored values of this type.

        Returns:
            A one-argument callable, or ``None`` when ``self.type`` is not
            one of ``'bool'``, ``'float'`` or ``'int'``.
        """
        def complement(raw):
            # Read an unsigned 16-bit value as signed: anything with bit 15
            # (or higher) set is shifted down by 2**16 (issue #30).
            return raw - (1 << 16) if raw >> 15 else raw

        if self.type == 'bool':
            # Lowest bit of the raw value, as 0 or 1.
            return lambda x: x % 2
        if self.type == 'float':
            # Signed value stored in tenths, rounded to two decimals.
            return lambda x: round(complement(x) / 10, 2)
        if self.type == 'int':
            # Signed integer.
            return lambda x: int(complement(x))
        return None

    def select(self):
        """Return the first row yielded by ``RowClass.select`` for this type.

        Returns:
            The first item produced by ``super().select('slabel',
            'var_type')``, or ``None`` (after logging an error) when the
            iterator is empty.
        """
        try:
            return next(super().select('slabel', 'var_type'))
        except StopIteration:
            logger.error('VariableType.select(%s, %s): does not exist',
                         self.slabel, 'var_type')
            return None

    def save(self):
        """Write the address bounds that are set to the ``var_type`` row.

        Only the bounds that are not ``None`` are updated.

        Returns:
            ``False`` when neither bound is set (no query is issued),
            otherwise whatever ``DB.sql`` returns.
        """
        params = {'slabel': self.slabel}
        assignments = []
        for column in ('start_address', 'end_address'):
            value = getattr(self, column)
            if value is not None:
                assignments.append(f'{column} = :{column}')
                params[column] = value

        if not assignments:
            return False

        # Values travel as named parameters instead of being pasted into the
        # SQL text. NOTE(review): assumes DB.sql hands a dict to the driver
        # as named parameters, as the previous dict argument implied.
        query = ' '.join([
            'UPDATE var_type SET',
            ','.join(assignments),
            'WHERE slabel = :slabel',
        ])
        return DB.sql(query, params)

    def get_variables(self):
        """Return ``Variable.getall_of_type(self)``."""
        # Imported here rather than at module level, presumably to avoid a
        # circular import between the model modules.
        from .variable import Variable
        return Variable.getall_of_type(self)

    def _index_by_offset(self, values):
        """Re-key an address-indexed mapping by 1-based offset strings."""
        by_offset = {}
        for address, variable in values.items():
            offset = address - self.start_address + 1
            logger.debug('address: %s, start_addres: %s, calc: %s',
                         address, self.start_address, offset)
            by_offset[str(offset)] = variable
        return by_offset

    def get_variables_values(self):
        """Return the values of this type keyed by offset.

        Returns:
            A dict built from ``Variable.getall_values_of_type(self)`` whose
            keys are ``str(address - start_address + 1)``.
        """
        from .variable import Variable
        return self._index_by_offset(Variable.getall_values_of_type(self))

    def get_variables_values_since(self, since: int):
        """Return the values of this type since ``since``, keyed by offset.

        Args:
            since: Passed through to
                ``Variable.getall_values_of_type_since``.

        Returns:
            A dict whose keys are ``str(address - start_address + 1)``.
        """
        from .variable import Variable
        return self._index_by_offset(
            Variable.getall_values_of_type_since(self, since))

    def control(self, data):
        """Store the values in ``data`` that differ from the current ones.

        Args:
            data: Mapping of address strings to value strings; each value
                must be parseable by ``float``.

        Entries whose variable does not exist, or whose value equals the one
        returned by ``VariableValue.get``, are skipped.
        """
        from .variable import Variable
        from .variable_value import VariableValue

        for s_address, s_value in data.items():
            address = int(s_address)
            if self.slabel == 'I':
                # For this type the incoming key is a 1-based offset:
                # translate it back to an absolute address.
                address = address + self.start_address - 1

            # Values arrive as strings that represent floats.
            f_value = float(s_value)
            if self.slabel == 'A':
                # This type is stored in tenths (see the 'float' cast).
                f_value = f_value * 10
            # Values are stored as integers in the database.
            # NOTE(review): int() truncates toward zero; confirm that is
            # intended for products that land just below an integer.
            value = int(f_value)

            if not Variable(**{'type': self, 'address': address}).exists():
                continue

            new_var_value = VariableValue(**{
                'type': self,
                'address': address,
                'value': value})
            old_var_value = VariableValue.get(self, address)
            # NOTE(review): guards against `get` returning None when no
            # value was recorded yet - confirm against VariableValue.get.
            if (old_var_value is not None
                    and old_var_value.equals(new_var_value)):
                continue

            new_var_value.set()

    @staticmethod
    def getall():
        """Return every row of ``var_type`` as a dict keyed by ``label``."""
        return {
            row['label']: VariableType(**dict(row))
            for row in DB.sql('SELECT * FROM var_type')
        }

    @staticmethod
    def get(slabel: str):
        """Return the variable type whose short label matches ``slabel``.

        Args:
            slabel: Label to look up; only its first character is used.

        Raises:
            NameError: When the query yields no row.
        """
        slabel = slabel[:1]
        try:
            row = next(DB.sql(
                """
                SELECT * FROM var_type
                WHERE slabel = ?
                """, slabel))
        except StopIteration as exc:
            raise NameError(
                f"no variable type with slabel '{slabel}'") from exc
        return VariableType(**dict(row))