1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- #!/usr/bin/env python3
- import pytest
- from unittest.mock import patch, MagicMock
-
- from serial import Serial
- import umodbus
- from umodbus.client.serial import rtu
-
- from pyheatpump import modbus
-
-
@patch('umodbus.client.serial.rtu.send_message')
@patch('umodbus.client.serial.rtu.read_coils')
def test_rtu__coils_call_mocked(RtuReadCoils, RtuSendMessage, serial_conn):
    """Check that ``modbus.read_coils`` goes through the patched umodbus RTU calls.

    ``patch`` decorators are applied bottom-up, so the ``read_coils`` mock is
    the first positional argument and the ``send_message`` mock the second.
    ``serial_conn`` is a fixture defined outside this file; it is requested
    but not used directly here.

    The same read is issued twice. The mocks are reset after each round so
    the second round of assertions is checked against the second call only,
    instead of being satisfied by the calls recorded during the first one.
    """
    for _ in range(2):
        modbus.read_coils(1, 3)
        RtuReadCoils.assert_any_call(slave_id=1, starting_address=1, quantity=2)
        RtuSendMessage.assert_called()

        # Forget recorded calls so the next round cannot pass vacuously.
        RtuReadCoils.reset_mock()
        RtuSendMessage.reset_mock()
-
-
@patch('umodbus.client.serial.rtu.send_message')
@patch('umodbus.client.serial.rtu.read_holding_registers')
def test_rtu_registers_call_mocked(RtuReadHoldingRegisters, RtuSendMessage, serial_conn):
    """Check that ``modbus.read_holding_registers`` uses the patched RTU calls.

    ``patch`` decorators are applied bottom-up, so the
    ``read_holding_registers`` mock is the first positional argument and the
    ``send_message`` mock the second. ``serial_conn`` is a fixture defined
    outside this file; it is requested but not used directly here.

    Two ranges are exercised:

    * ``(1, 3)`` must produce a request for 2 registers starting at address 1.
    * ``(1, 500)`` must produce (at least) a request for 125 registers
      starting at address 1 -- presumably the range is split into chunks of
      at most 125 registers; confirm against ``pyheatpump.modbus``.
    """
    modbus.read_holding_registers(1, 3)
    RtuReadHoldingRegisters.assert_any_call(slave_id=1, starting_address=1, quantity=2)
    RtuSendMessage.assert_called()

    # Forget the calls recorded so far; otherwise the assertions below could
    # be satisfied by the first read instead of the second one.
    RtuReadHoldingRegisters.reset_mock()
    RtuSendMessage.reset_mock()

    modbus.read_holding_registers(1, 500)
    RtuReadHoldingRegisters.assert_any_call(slave_id=1, starting_address=1, quantity=125)
    RtuSendMessage.assert_called()
-
def test_rtu_call_holding_registers(serial_conn):
    """Read several register ranges and check the size of each result.

    Every read starts at address 1. For each end address the call must
    return a ``list`` whose length is ``end address - 1``.
    """
    # (end address passed to the call, expected number of returned values)
    expectations = (
        (1, 0),
        (5, 4),
        (126, 125),
        (151, 150),
    )
    for end_address, expected_count in expectations:
        values = modbus.read_holding_registers(1, end_address)
        assert type(values) is list
        assert len(values) == expected_count
-
-
def test_get_analog(set_test_db, serial_conn, var_types):
    """Read the whole 'Analog' address range as holding registers.

    The range bounds come from the ``var_types['Analog']`` entry supplied by
    the ``var_types`` fixture. The result must be a ``list`` of plain ``int``
    values with one element per address between start and end.
    """
    analog = var_types['Analog']
    print(analog.__dict__)

    first, last = analog.start_address, analog.end_address
    values = modbus.read_holding_registers(first, last)

    assert type(values) is list
    assert len(values) == last - first
    for value in values:
        assert type(value) is int
-
def test_get_integer(set_test_db, serial_conn, var_types):
    """Read the whole 'Integer' address range as holding registers.

    The range bounds come from the ``var_types['Integer']`` entry supplied by
    the ``var_types`` fixture. The result must be a ``list`` of ``int`` values
    with one element per address between start and end.

    If the device rejects the range with ``IllegalDataAddressError`` the test
    fails explicitly with a message naming the offending range.
    """
    integer = var_types['Integer']
    print(integer.__dict__)

    # Only the Modbus read can raise the address error, so keep the ``try``
    # limited to it and run the assertions outside.
    try:
        res = modbus.read_holding_registers(integer.start_address, integer.end_address)
    except umodbus.exceptions.IllegalDataAddressError as e:
        print(e)
        pytest.fail(
            f'The start and end addresses are not available [{integer.start_address}, {integer.end_address}]'
        )

    assert isinstance(res, list)
    assert len(res) == integer.end_address - integer.start_address
    for r in res:
        # Exact type comparison: a ``bool`` would not be accepted here.
        assert type(r) is int
-
-
def test_get_digital(set_test_db, serial_conn, var_types):
    """Read the whole 'Digital' address range as coils.

    The range bounds come from the ``var_types['Digital']`` entry supplied by
    the ``var_types`` fixture. The result must be a ``list`` with one element
    per address between start and end, each element being the ``int`` 0 or 1.

    If the device rejects the range with ``IllegalDataAddressError`` the test
    fails explicitly. Merely printing and carrying on would leave ``res``
    unbound and hide the real cause behind a ``NameError``.
    """
    digital = var_types['Digital']
    print(digital.__dict__)

    try:
        res = modbus.read_coils(digital.start_address, digital.end_address)
    except umodbus.exceptions.IllegalDataAddressError:
        pytest.fail(
            f'The start and end addresses are not available [{digital.start_address}, {digital.end_address}]'
        )

    assert isinstance(res, list)
    assert len(res) == digital.end_address - digital.start_address
    for r in res:
        # Exact type comparison: ``True``/``False`` would not be accepted here.
        assert type(r) is int
        assert r in [0, 1]
|