#!/usr/bin/env python3
"""Tests for the read helpers exposed by ``pyheatpump.modbus``.

The ``serial_conn``, ``set_test_db`` and ``var_types`` arguments are pytest
fixtures defined outside this file (presumably in a ``conftest.py`` -- confirm).
"""
import pytest
from unittest.mock import patch, MagicMock

from serial import Serial
import umodbus
from umodbus.client.serial import rtu

from pyheatpump import modbus


@patch('umodbus.client.serial.rtu.send_message')
@patch('umodbus.client.serial.rtu.read_coils')
def test_rtu__coils_call_mocked(RtuReadCoils, RtuSendMessage, serial_conn):
    """Check that ``modbus.read_coils`` delegates to the mocked uModbus RTU calls.

    ``patch`` decorators are applied bottom-up, so the first mock argument is
    ``rtu.read_coils`` and the second is ``rtu.send_message``.
    """
    modbus.read_coils(1, 3)
    RtuReadCoils.assert_any_call(slave_id=1, starting_address=1, quantity=2)
    RtuSendMessage.assert_called()

    # NOTE(review): this second stanza is identical to the first one; it may
    # have been meant to exercise a different address range -- confirm.
    modbus.read_coils(1, 3)
    RtuReadCoils.assert_any_call(slave_id=1, starting_address=1, quantity=2)
    RtuSendMessage.assert_called()


@patch('umodbus.client.serial.rtu.send_message')
@patch('umodbus.client.serial.rtu.read_holding_registers')
def test_rtu_registers_call_mocked(RtuReadHoldingRegisters, RtuSendMessage, serial_conn):
    """Check that ``modbus.read_holding_registers`` delegates to the mocked RTU calls.

    ``patch`` decorators are applied bottom-up, so the first mock argument is
    ``rtu.read_holding_registers`` and the second is ``rtu.send_message``.
    """
    modbus.read_holding_registers(1, 3)
    RtuReadHoldingRegisters.assert_any_call(slave_id=1, starting_address=1, quantity=2)
    RtuSendMessage.assert_called()

    # A wide range is expected to produce at least one request of 125
    # registers starting at address 1 (presumably because the helper splits
    # large reads into chunks of at most 125 -- confirm against the helper).
    modbus.read_holding_registers(1, 500)
    RtuReadHoldingRegisters.assert_any_call(slave_id=1, starting_address=1, quantity=125)
    RtuSendMessage.assert_called()


def test_rtu_call_holding_registers(serial_conn):
    """Check the type and length of the result for several end addresses.

    Every read starts at address 1; the expected length is always
    ``end_address - 1``.
    """
    # (end_address, expected number of returned values)
    cases = ((1, 0), (5, 4), (126, 125), (151, 150))
    for end_address, expected_length in cases:
        r = modbus.read_holding_registers(1, end_address)
        # Exact type check on purpose: subclasses of list are not accepted.
        assert type(r) is list
        assert len(r) == expected_length


def test_get_analog(set_test_db, serial_conn, var_types):
    """Read the whole 'Analog' address range and check it yields a list of ints."""
    analog = var_types['Analog']
    print(analog.__dict__)

    res = modbus.read_holding_registers(analog.start_address, analog.end_address)

    assert type(res) is list
    assert len(res) == analog.end_address - analog.start_address
    for r in res:
        assert type(r) is int


def test_get_integer(set_test_db, serial_conn, var_types):
    """Read the whole 'Integer' address range and check it yields a list of ints.

    The test fails explicitly if the read raises ``IllegalDataAddressError``.
    """
    integer = var_types['Integer']
    print(integer.__dict__)

    # Only the read itself is guarded, so assertion failures below are never
    # confused with an address error.
    try:
        res = modbus.read_holding_registers(integer.start_address, integer.end_address)
    except umodbus.exceptions.IllegalDataAddressError as e:
        print(e)
        print(f'The start and end addresses are not available [{integer.start_address}, {integer.end_address}]')
        pytest.fail('modbus.read_holding_registers raised IllegalDataAddressError')

    assert type(res) is list
    assert len(res) == integer.end_address - integer.start_address
    for r in res:
        assert type(r) is int


def test_get_digital(set_test_db, serial_conn, var_types):
    """Read the whole 'Digital' address range and check it yields a list of 0/1 ints.

    The test fails explicitly if the read raises ``IllegalDataAddressError``.
    """
    digital = var_types['Digital']
    print(digital.__dict__)

    try:
        res = modbus.read_coils(digital.start_address, digital.end_address)
    except umodbus.exceptions.IllegalDataAddressError:
        print(f'The start and end addresses are not available [{digital.start_address}, {digital.end_address}]')
        # Fail here: falling through would hit the assertions below with
        # ``res`` unbound and surface as an unrelated UnboundLocalError.
        pytest.fail('modbus.read_coils raised IllegalDataAddressError')

    assert type(res) is list
    assert len(res) == digital.end_address - digital.start_address
    for r in res:
        # Exact int check on purpose: bool values are not accepted.
        assert type(r) is int
        assert r in [0, 1]