123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- #!/usr/bin/env python3
- import os
- from serial import Serial
- from umodbus.client.serial import rtu
- import umodbus
-
- from pyheatpump.config import config
-
- from pyheatpump.logger import logger_init
- logger = logger_init()
-
- serial_conn = None
-
- def connect():
- global serial_conn
-
- if serial_conn is None:
- real_serial_port = os.path.realpath(
- config.get('heatpump', 'serial_port'))
-
- print('Connecting to serial port *{}* ({})'.format(
- config.get('heatpump', 'serial_port'),
- real_serial_port))
-
- serial_conn = Serial(
- port=real_serial_port,
- baudrate=config.get('heatpump', 'baudrate'),
- bytesize=8,
- parity='N',
- stopbits=1,
- timeout=20)
-
- if serial_conn.open is False:
- print('Opening serial port')
- serial_conn.open()
-
- return serial_conn
-
- def read_coils(start, end):
- global serial_con
- connect()
- res = []
-
- address = -1
- qty = -1
-
- logger.info('read_coils: [%s, %s]', start, end)
- try:
- for address in range(start, end + 1, 125):
- qty = 125 if (end - address) >= 125 else (end - address)
- if not qty:
- break
-
- req_adu = rtu.read_coils(
- slave_id=1,
- starting_address=address,
- quantity=qty)
-
- response = rtu.send_message(req_adu, serial_conn)
- res.extend(response)
- except umodbus.exceptions.IllegalDataAddressError as e:
- print(e)
- print(f'{address} {qty}')
-
- logger.debug('read_coils [%s, %s] result: %s', start, end, res)
- return res
-
-
-
- def read_holding_registers(start, end):
- global serial_conn
- connect()
- res = []
-
- address = -1
- qty = -1
-
- logger.debug('read_holding_registers [%s, %s]',
- start, end
- )
-
- try:
- for address in range(start, end + 1, 125):
- qty = 125 if (end - address) >= 125 else (end - address)
- if not qty:
- break
-
- req_adu = rtu.read_holding_registers(
- slave_id=1,
- starting_address=address,
- quantity=qty)
-
- response = rtu.send_message(req_adu, serial_conn)
- res.extend(response)
- except umodbus.exceptions.IllegalDataAddressError as e:
- print(e)
- print(f'{address} {qty}')
-
- logger.debug('read_holding_registers [%s, %s] result: %s', start, end, res)
- return res
-
- def write_coil(var_value):
- global serial_conn
- connect()
-
- logger.debug('write_coil address: %s, value: %s',
- var_value.address, var_value.value
- )
-
- try:
- req_adu = rtu.write_single_coil(
- slave_id=1,
- address=var_value.address,
- value=var_value.value)
- response = rtu.send_message(req_adu, serial_conn)
-
- logger.debug('write_coil address: %s, response: %s',
- var_value.address, response)
-
- if response != var_value.value:
- return False
- return True
- except Exception as e:
- raise e
-
-
- def write_holding_register(var_value):
- global serial_conn
- connect()
-
- logger.debug('write_holding_register address: %s, value: %s',
- var_value.address, var_value.value
- )
-
- try:
- req_adu = rtu.write_single_register(
- slave_id=1,
- address=var_value.address,
- value=var_value.value)
- response = rtu.send_message(req_adu, serial_conn)
- logger.debug('write_holding_register, addres: %s, response: %s',
- var_value.address, response)
- if response != var_value.value:
- return False
- return True
- except Exception as e:
- raise e
-
- if __name__ == '__main__':
- response = read_holding_registers(1, 10)
- print(len(response))
- response = read_coils(90, 100)
- print(len(response))
-
- """
- Example codeo
- write_coil(VariableValue(**{
- 'type':'D',
- 'address':91,
- 'value':1}))
-
- resp = read_coils(91, 93)
- print(f'91 : {response[0]} - 92 : {response[1]}')
-
- write_coil(VariableValue(**{
- 'type':'D',
- 'address':91,
- 'value':0}))
-
- response = read_coils(91, 93)
- print(f'91 : {response[0]} - 92 : {response[1]}')
-
- response = read_holding_registers(240, 242)
- print(response)
-
- write_holding_register(VariableValue(**{
- 'type':'D',
- 'address':91,
- 'value':1})
- """
|