#!/usr/bin/env python3
"""Modbus RTU helpers for reading and writing heat pump coils and registers.

A single serial connection is created lazily from the ``heatpump`` section
of the project configuration (``serial_port`` and ``baudrate``) and is
shared by every helper in this module.
"""
import os

from serial import Serial
from serial.serialutil import SerialException
from umodbus.client.serial import rtu
import umodbus

from pprint import pprint

from pyheatpump.config import config
from pyheatpump.logger import logger_init

logger = logger_init()

# Shared serial connection, created on the first call to connect().
serial_conn = None

# Slave id used for every request sent by this module.
_SLAVE_ID = 1

# Largest number of items requested in a single read message.
_MAX_QUANTITY = 125


def connect():
    """Return the shared serial connection, creating or reopening it if needed.

    The port path is read from the ``heatpump`` / ``serial_port`` config
    entry and resolved with ``os.path.realpath`` before it is opened.

    Returns:
        The module-level ``Serial`` instance.
    """
    global serial_conn

    if serial_conn is None:
        configured_port = config.get('heatpump', 'serial_port')
        real_serial_port = os.path.realpath(configured_port)
        print('Connecting to serial port *{}* ({})'.format(
            configured_port, real_serial_port))

        serial_conn = Serial(
            port=real_serial_port,
            baudrate=config.get('heatpump', 'baudrate'),
            bytesize=8,
            parity='N',
            stopbits=1,
            timeout=10)

    # ``open`` is a method, so comparing it with False never matched and a
    # closed port was never reopened; ``is_open`` is the actual state flag.
    if not serial_conn.is_open:
        print('Opening serial port')
        serial_conn.open()

    return serial_conn


def _read_chunked(build_request, connection, start, end):
    """Read addresses ``start`` .. ``end - 1`` in requests of at most 125 items.

    Args:
        build_request: uModbus request factory accepting ``slave_id``,
            ``starting_address`` and ``quantity`` keyword arguments.
        connection: Serial connection the requests are sent on.
        start: First address to read.
        end: Address at which reading stops; ``end`` itself is not read.

    Returns:
        The values collected so far. When the device reports an illegal data
        address the error is printed and the partial result is returned.
    """
    values = []
    # Kept outside the loop so the error report below always has something
    # to print, even if the failure happens on the first request.
    address = -1
    qty = -1

    try:
        for address in range(start, end, _MAX_QUANTITY):
            qty = min(_MAX_QUANTITY, end - address)
            req_adu = build_request(
                slave_id=_SLAVE_ID,
                starting_address=address,
                quantity=qty)
            values.extend(rtu.send_message(req_adu, connection))
    except umodbus.exceptions.IllegalDataAddressError as e:
        print(e)
        print(f'{address} {qty}')

    return values


def read_coils(start, end):
    """Read the coils at addresses ``start`` .. ``end - 1``.

    Note that ``end`` is exclusive even though the log message prints the
    range with inclusive-looking brackets.

    Returns:
        A list with one value per coil read (possibly partial, see
        ``_read_chunked``).
    """
    connection = connect()
    logger.info('Read coils [{}, {}]'.format(start, end))
    return _read_chunked(rtu.read_coils, connection, start, end)


def read_holding_registers(start, end):
    """Read the holding registers at addresses ``start`` .. ``end - 1``.

    Note that ``end`` is exclusive even though the log message prints the
    range with inclusive-looking brackets.

    Returns:
        A list with one value per register read (possibly partial, see
        ``_read_chunked``).
    """
    connection = connect()
    logger.info('Read registers [{}, {}]'.format(start, end))
    return _read_chunked(rtu.read_holding_registers, connection, start, end)


def write_coil(var_value):
    """Write ``var_value.value`` to the coil at ``var_value.address``.

    Args:
        var_value: Object exposing ``address`` and ``value`` attributes.

    Returns:
        True when the response equals the written value, False otherwise.
        Any exception raised while sending propagates to the caller.
    """
    connection = connect()
    logger.info('Write coil at address {} with value {}]'.format(
        var_value.address, var_value.value))

    req_adu = rtu.write_single_coil(
        slave_id=_SLAVE_ID,
        address=var_value.address,
        value=var_value.value)
    resp = rtu.send_message(req_adu, connection)
    logger.info('Modbus response : {}'.format(resp))

    return resp == var_value.value


def write_holding_register(var_value):
    """Write ``var_value.value`` to the register at ``var_value.address``.

    Args:
        var_value: Object exposing ``address`` and ``value`` attributes.

    Returns:
        True when the response equals the written value, False otherwise.
        Any exception raised while sending propagates to the caller.
    """
    connection = connect()
    logger.info('Write register at address {} with value {}]'.format(
        var_value.address, var_value.value))

    req_adu = rtu.write_single_register(
        slave_id=_SLAVE_ID,
        address=var_value.address,
        value=var_value.value)
    resp = rtu.send_message(req_adu, connection)
    logger.info('Modbus response : {}'.format(resp))

    return resp == var_value.value


if __name__ == '__main__':
    # Manual smoke test against the configured serial port.
    resp = read_holding_registers(1, 10)
    # pprint(resp)
    print(len(resp))

    resp = read_coils(90, 100)
    # pprint(resp)
    print(len(resp))

    from pyheatpump.models.variable_value import VariableValue

    # Disabled write check:
    # write_coil(VariableValue(**{
    #     'type': 'D',
    #     'address': 91,
    #     'value': 1}))
    resp = read_coils(91, 93)
    print(f'91 : {resp[0]} - 92 : {resp[1]}')

    # Disabled write check:
    # write_coil(VariableValue(**{
    #     'type': 'D',
    #     'address': 91,
    #     'value': 0}))
    resp = read_coils(91, 93)
    print(f'91 : {resp[0]} - 92 : {resp[1]}')

    resp = read_holding_registers(240, 242)
    print(resp)

    # Disabled write check:
    # write_holding_register(VariableValue(**{
    #     'type': 'D',
    #     'address': 91,
    #     'value': 1}))