#!/usr/bin/env python3 import os from serial import Serial from umodbus.client.serial import rtu import umodbus from pyheatpump.config import config from pyheatpump.logger import logger_init logger = logger_init() serial_conn = None def connect(): global serial_conn if serial_conn is None: real_serial_port = os.path.realpath( config.get('heatpump', 'serial_port')) print('Connecting to serial port *{}* ({})'.format( config.get('heatpump', 'serial_port'), real_serial_port)) serial_conn = Serial( port=real_serial_port, baudrate=config.get('heatpump', 'baudrate'), bytesize=8, parity='N', stopbits=1, timeout=20) if serial_conn.open is False: print('Opening serial port') serial_conn.open() return serial_conn def read_coils(start, end): global serial_con connect() res = [] address = -1 qty = -1 logger.info('read_coils: [%s, %s]', start, end) try: for address in range(start, end + 1, 125): qty = 125 if (end - address) >= 125 else (end - address) if not qty: break req_adu = rtu.read_coils( slave_id=1, starting_address=address, quantity=qty) response = rtu.send_message(req_adu, serial_conn) res.extend(response) except umodbus.exceptions.IllegalDataAddressError as e: print(e) print(f'{address} {qty}') logger.debug('read_coils [%s, %s] result: %s', start, end, res) return res def read_holding_registers(start, end): global serial_conn connect() res = [] address = -1 qty = -1 logger.debug('read_holding_registers [%s, %s]', start, end ) try: for address in range(start, end + 1, 125): qty = 125 if (end - address) >= 125 else (end - address) if not qty: break req_adu = rtu.read_holding_registers( slave_id=1, starting_address=address, quantity=qty) response = rtu.send_message(req_adu, serial_conn) res.extend(response) except umodbus.exceptions.IllegalDataAddressError as e: print(e) print(f'{address} {qty}') logger.debug('read_holding_registers [%s, %s] result: %s', start, end, res) return res def write_coil(var_value): global serial_conn connect() logger.debug('write_coil address: %s, value: %s', var_value.address, var_value.value ) try: req_adu = rtu.write_single_coil( slave_id=1, address=var_value.address, value=var_value.value) response = rtu.send_message(req_adu, serial_conn) logger.debug('write_coil address: %s, response: %s', var_value.address, response) if response != var_value.value: return False return True except Exception as e: raise e def write_holding_register(var_value): global serial_conn connect() logger.debug('write_holding_register address: %s, value: %s', var_value.address, var_value.value ) try: req_adu = rtu.write_single_register( slave_id=1, address=var_value.address, value=var_value.value) response = rtu.send_message(req_adu, serial_conn) logger.debug('write_holding_register, addres: %s, response: %s', var_value.address, response) if response != var_value.value: return False return True except Exception as e: raise e if __name__ == '__main__': response = read_holding_registers(1, 10) print(len(response)) response = read_coils(90, 100) print(len(response)) """ Example codeo write_coil(VariableValue(**{ 'type':'D', 'address':91, 'value':1})) resp = read_coils(91, 93) print(f'91 : {response[0]} - 92 : {response[1]}') write_coil(VariableValue(**{ 'type':'D', 'address':91, 'value':0})) response = read_coils(91, 93) print(f'91 : {response[0]} - 92 : {response[1]}') response = read_holding_registers(240, 242) print(response) write_holding_register(VariableValue(**{ 'type':'D', 'address':91, 'value':1}) """