123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- #!/usr/bin/env python3
- import os
- from serial import Serial
- from umodbus.client.serial import rtu
- import umodbus
-
- from pyheatpump.config import config
-
- from pyheatpump.logger import logger_init
- logger = logger_init()
-
- serial_conn = None
-
- def connect():
- global serial_conn
-
- if serial_conn is None:
- real_serial_port = os.path.realpath(
- config.get('heatpump', 'serial_port'))
-
- logger.info('Connecting to serial port *%s* (%s)',
- config.get('heatpump', 'serial_port'),
- real_serial_port)
-
- serial_conn = Serial(
- port=real_serial_port,
- baudrate=config.get('heatpump', 'baudrate'),
- bytesize=8,
- parity='N',
- stopbits=1,
- timeout=20)
-
- if serial_conn.open is False:
- logger.debug('Opening serial port *%s*',
- config.get('heatpump', 'serial_port'))
- serial_conn.open()
-
- return serial_conn
-
- def read_coils(start, end):
- global serial_con
- connect()
- res = []
-
- address = -1
- qty = -1
-
- logger.info('read_coils: [%s, %s]', start, end)
- try:
- for address in range(start, end + 1, 125):
- qty = 125 if (end - address) >= 125 else (end - address)
- if not qty:
- break
-
- req_adu = rtu.read_coils(
- slave_id=1,
- starting_address=address,
- quantity=qty)
-
- response = rtu.send_message(req_adu, serial_conn)
- res.extend(response)
- except umodbus.exceptions.IllegalDataAddressError as e:
- logger.error(e)
- logger.debug('address:%s qty:%s', address, qty)
-
- logger.debug('read_coils [%s, %s] result: %s', start, end, res)
- return res
-
-
-
- def read_holding_registers(start, end):
- global serial_conn
- connect()
- res = []
-
- address = -1
- qty = -1
-
- logger.debug('read_holding_registers [%s, %s]',
- start, end
- )
-
- try:
- for address in range(start, end + 1, 125):
- qty = 125 if (end - address) >= 125 else (end - address)
- if not qty:
- break
-
- req_adu = rtu.read_holding_registers(
- slave_id=1,
- starting_address=address,
- quantity=qty)
-
- response = rtu.send_message(req_adu, serial_conn)
- res.extend(response)
- except umodbus.exceptions.IllegalDataAddressError as e:
- logger.error('read_holding_registers(%s, %s): address:%s, qty:%s\n%s',
- start, end, address, qty, e)
-
- logger.debug('read_holding_registers [%s, %s] result: %s', start, end, res)
- return res
-
- def write_coil(var_value):
- global serial_conn
- connect()
-
- logger.debug('write_coil address: %s, value: %s',
- var_value.address, var_value.value
- )
-
- try:
- req_adu = rtu.write_single_coil(
- slave_id=1,
- address=var_value.address,
- value=var_value.value)
- response = rtu.send_message(req_adu, serial_conn)
-
- logger.debug('write_coil address: %s, response: %s',
- var_value.address, response)
-
- if response != var_value.value:
- return False
- return True
- except Exception as e:
- raise e
-
-
- def write_holding_register(var_value):
- global serial_conn
- connect()
-
- logger.debug('write_holding_register address: %s, value: %s',
- var_value.address, var_value.value
- )
-
- try:
- req_adu = rtu.write_single_register(
- slave_id=1,
- address=var_value.address,
- value=var_value.value)
- response = rtu.send_message(req_adu, serial_conn)
- logger.debug('write_holding_register, addres: %s, response: %s',
- var_value.address, response)
- if response != var_value.value:
- return False
- return True
- except Exception as e:
- raise e
|