#!/usr/bin/env python3 import os from serial import Serial from umodbus.client.serial import rtu import umodbus from pyheatpump.config import config from pyheatpump.logger import logger_init logger = logger_init() serial_conn = None def connect(): global serial_conn if serial_conn is None: real_serial_port = os.path.realpath( config.get('heatpump', 'serial_port')) logger.info('Connecting to serial port *%s* (%s)', config.get('heatpump', 'serial_port'), real_serial_port) serial_conn = Serial( port=real_serial_port, baudrate=config.get('heatpump', 'baudrate'), bytesize=8, parity='N', stopbits=1, timeout=20) if serial_conn.open is False: logger.debug('Opening serial port *%s*', config.get('heatpump', 'serial_port') serial_conn.open() return serial_conn def read_coils(start, end): global serial_con connect() res = [] address = -1 qty = -1 logger.info('read_coils: [%s, %s]', start, end) try: for address in range(start, end + 1, 125): qty = 125 if (end - address) >= 125 else (end - address) if not qty: break req_adu = rtu.read_coils( slave_id=1, starting_address=address, quantity=qty) response = rtu.send_message(req_adu, serial_conn) res.extend(response) except umodbus.exceptions.IllegalDataAddressError as e: logger.error(e) logger.debug('address:%s qty:%s', address, qty) logger.debug('read_coils [%s, %s] result: %s', start, end, res) return res def read_holding_registers(start, end): global serial_conn connect() res = [] address = -1 qty = -1 logger.debug('read_holding_registers [%s, %s]', start, end ) try: for address in range(start, end + 1, 125): qty = 125 if (end - address) >= 125 else (end - address) if not qty: break req_adu = rtu.read_holding_registers( slave_id=1, starting_address=address, quantity=qty) response = rtu.send_message(req_adu, serial_conn) res.extend(response) except umodbus.exceptions.IllegalDataAddressError as e: logger.error('read_holding_registers(%s, %s): address:%s, qty:%s\n%s', start, end, address, qty, e) logger.debug('read_holding_registers [%s, %s] result: %s', start, end, res) return res def write_coil(var_value): global serial_conn connect() logger.debug('write_coil address: %s, value: %s', var_value.address, var_value.value ) try: req_adu = rtu.write_single_coil( slave_id=1, address=var_value.address, value=var_value.value) response = rtu.send_message(req_adu, serial_conn) logger.debug('write_coil address: %s, response: %s', var_value.address, response) if response != var_value.value: return False return True except Exception as e: raise e def write_holding_register(var_value): global serial_conn connect() logger.debug('write_holding_register address: %s, value: %s', var_value.address, var_value.value ) try: req_adu = rtu.write_single_register( slave_id=1, address=var_value.address, value=var_value.value) response = rtu.send_message(req_adu, serial_conn) logger.debug('write_holding_register, addres: %s, response: %s', var_value.address, response) if response != var_value.value: return False return True except Exception as e: raise e