123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178 |
- #!/usr/bin/env python3
- from serial import Serial
- from serial.serialutil import SerialException
- from umodbus.client.serial import rtu
- import umodbus
- from pprint import pprint
-
- from pyheatpump.config import config
-
- from pyheatpump.logger import logger_init
- logger = logger_init()
-
- serial_conn = None
-
def connect():
    """Return the shared serial connection, creating or reopening it if needed.

    The connection is stored in the module-level ``serial_conn`` so that
    every helper in this module talks over the same port. On first use it is
    built from the ``heatpump`` section of the project configuration
    (``serial_port`` and ``baudrate``) with 8 data bits, no parity, 1 stop
    bit and a timeout of 10.

    Returns:
        The module-wide ``Serial`` instance.
    """
    global serial_conn

    if serial_conn is None:
        port = config.get('heatpump', 'serial_port')
        print('Connecting to serial port *{}*'.format(port))
        serial_conn = Serial(
            port=port,
            baudrate=config.get('heatpump', 'baudrate'),
            bytesize=8,
            parity='N',
            stopbits=1,
            timeout=10)

    # `Serial.open` is a method, so comparing it with False never matched and
    # a closed port was never reopened; `is_open` is the actual state flag.
    if not serial_conn.is_open:
        print('Opening serial port')
        serial_conn.open()

    return serial_conn
-
def read_coils(start, end):
    """Read the coils in the half-open address range ``[start, end)``.

    The range is fetched from slave 1 in requests of at most 125 coils each
    and the per-request results are concatenated in address order.

    Note that ``end`` itself is NOT read: each request asks for
    ``end - address`` items at most, so ``read_coils(91, 93)`` returns two
    values (addresses 91 and 92).

    Args:
        start: first coil address to read.
        end: address one past the last coil to read.

    Returns:
        A list with the values received so far. If the device answers with
        an illegal-data-address error, the error and the failing
        ``address``/``qty`` are printed and the values collected before the
        failure are returned.
    """
    # Use the connection handed back by connect() instead of relying on a
    # module-global lookup (the former `global serial_con` named nothing).
    conn = connect()
    res = []

    # Reported by the error handler; overwritten on every chunk.
    address = -1
    qty = -1

    logger.info('Read coils [{}, {}]'.format(
        start, end
    ))
    try:
        # Stepping up to `end` (exclusive) never yields an empty chunk, so no
        # zero-quantity guard is needed.
        for address in range(start, end, 125):
            qty = min(125, end - address)

            req_adu = rtu.read_coils(
                slave_id=1,
                starting_address=address,
                quantity=qty)

            res.extend(rtu.send_message(req_adu, conn))
    except umodbus.exceptions.IllegalDataAddressError as e:
        print(e)
        print(f'{address} {qty}')

    return res
-
-
-
def read_holding_registers(start, end):
    """Read the holding registers in the half-open range ``[start, end)``.

    Requests are issued to slave 1 in slices of at most 125 registers and
    the answers are appended to a single list in address order. ``end`` is
    exclusive: the last slice asks for ``end - address`` registers.

    Args:
        start: first register address to read.
        end: address one past the last register to read.

    Returns:
        The list of values received. When the device reports an illegal
        data address, the error and the offending ``address``/``qty`` are
        printed and whatever was read before the failure is returned.
    """
    link = connect()
    values = []

    # Placeholders for the error report; replaced as soon as a slice starts.
    address = qty = -1

    logger.info('Read registers [{}, {}]'.format(start, end))

    try:
        for address in range(start, end + 1, 125):
            qty = min(125, end - address)
            if qty == 0:
                # The range landed exactly on `end`: nothing left to fetch.
                break

            request = rtu.read_holding_registers(
                slave_id=1,
                starting_address=address,
                quantity=qty)
            reply = rtu.send_message(request, link)
            values.extend(reply)
    except umodbus.exceptions.IllegalDataAddressError as err:
        print(err)
        print(f'{address} {qty}')

    return values
-
def write_coil(var_value):
    """Write a single coil on slave 1 and check the reply.

    Args:
        var_value: object exposing ``address`` and ``value`` attributes
            (presumably a ``VariableValue`` -- confirm against callers).

    Returns:
        ``True`` when the value returned by ``rtu.send_message`` equals
        ``var_value.value``, ``False`` otherwise.

    Any exception raised while building or sending the request propagates
    to the caller unchanged.
    """
    link = connect()

    logger.info('Write coil at address {} with value {}]'.format(
        var_value.address, var_value.value))

    request = rtu.write_single_coil(
        slave_id=1,
        address=var_value.address,
        value=var_value.value)
    reply = rtu.send_message(request, link)
    logger.info('Modbus response : {}'.format(reply))

    return reply == var_value.value
-
-
def write_holding_register(var_value):
    """Write a single holding register on slave 1 and check the reply.

    Args:
        var_value: object exposing ``address`` and ``value`` attributes
            (presumably a ``VariableValue`` -- confirm against callers).

    Returns:
        ``True`` when the value returned by ``rtu.send_message`` equals
        ``var_value.value``, ``False`` otherwise.

    Any exception raised while building or sending the request propagates
    to the caller unchanged.
    """
    link = connect()

    logger.info('Write register at address {} with value {}]'.format(
        var_value.address, var_value.value))

    request = rtu.write_single_register(
        slave_id=1,
        address=var_value.address,
        value=var_value.value)
    reply = rtu.send_message(request, link)
    logger.info('Modbus response : {}'.format(reply))

    return reply == var_value.value
-
if __name__ == '__main__':
    # Manual smoke test: exercises the read helpers against whatever device
    # is attached to the configured serial port and prints what comes back.
    resp = read_holding_registers(1, 10)
    # pprint(resp)
    print(len(resp))

    resp = read_coils(90, 100)
    # pprint(resp)
    print(len(resp))

    # Only needed by the disabled write examples below.
    from pyheatpump.models.variable_value import VariableValue

    # Coils 91 and 92 are read twice. The writes that toggled coil 91
    # (value 1 before the first read, value 0 before the second) are
    # disabled:
    #
    #   write_coil(VariableValue(**{
    #       'type': 'D',
    #       'address': 91,
    #       'value': 1}))      # then 'value': 0 before the second read
    for _ in range(2):
        resp = read_coils(91, 93)
        print(f'91 : {resp[0]} - 92 : {resp[1]}')

    resp = read_holding_registers(240, 242)
    print(resp)

    # Disabled register write example:
    #
    #   write_holding_register(VariableValue(**{
    #       'type': 'D',
    #       'address': 91,
    #       'value': 1}))
|