Projet de remplacement du "RPiPasserelle" d'Otec.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

test_modbus.py 3.0KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. #!/usr/bin/env python3
  2. import pytest
  3. from unittest.mock import patch, MagicMock
  4. from serial import Serial
  5. import umodbus
  6. from umodbus.client.serial import rtu
  7. from pyheatpump import modbus
  8. @patch('umodbus.client.serial.rtu.send_message')
  9. @patch('umodbus.client.serial.rtu.read_coils')
  10. def test_rtu__coils_call_mocked(RtuReadCoils, RtuSendMessage, serial_conn):
  11. modbus.read_coils(1, 3)
  12. RtuReadCoils.assert_any_call(slave_id=1, starting_address=1, quantity=2)
  13. RtuSendMessage.assert_called()
  14. modbus.read_coils(1, 3)
  15. RtuReadCoils.assert_any_call(slave_id=1, starting_address=1, quantity=2)
  16. RtuSendMessage.assert_called()
  17. @patch('umodbus.client.serial.rtu.send_message')
  18. @patch('umodbus.client.serial.rtu.read_holding_registers')
  19. def test_rtu_registers_call_mocked(RtuReadHoldingRegisters, RtuSendMessage, serial_conn):
  20. modbus.read_holding_registers(1, 3)
  21. RtuReadHoldingRegisters.assert_any_call(slave_id=1, starting_address=1, quantity=2)
  22. RtuSendMessage.assert_called()
  23. modbus.read_holding_registers(1, 500)
  24. RtuReadHoldingRegisters.assert_any_call(slave_id=1, starting_address=1, quantity=125)
  25. RtuSendMessage.assert_called()
  26. def test_rtu_call_holding_registers(serial_conn):
  27. r = modbus.read_holding_registers(1, 1)
  28. assert type(r) == list
  29. assert len(r) == 0
  30. r = modbus.read_holding_registers(1, 5)
  31. assert type(r) == list
  32. assert len(r) == 4
  33. r = modbus.read_holding_registers(1, 126)
  34. assert type(r) == list
  35. assert len(r) == 125
  36. r = modbus.read_holding_registers(1, 151)
  37. assert type(r) == list
  38. assert len(r) == 150
  39. def test_get_analog(set_test_db, serial_conn, var_types):
  40. analog = var_types['Analog']
  41. print(analog.__dict__)
  42. res = modbus.read_holding_registers(analog.start_address, analog.end_address)
  43. assert type(res) == list
  44. assert len(res) == analog.end_address - analog.start_address
  45. for r in res:
  46. assert type(r) == int
  47. def test_get_integer(set_test_db, serial_conn, var_types):
  48. integer = var_types['Integer']
  49. print(integer.__dict__)
  50. try:
  51. res = modbus.read_holding_registers(integer.start_address, integer.end_address)
  52. assert type(res) == list
  53. assert len(res) == integer.end_address - integer.start_address
  54. for r in res:
  55. assert type(r) == int
  56. except umodbus.exceptions.IllegalDataAddressError as e:
  57. print(e)
  58. print(f'The start and end addresses are not available [{integer.start_address}, {integer.end_address}]')
  59. assert False
  60. def test_get_digital(set_test_db, serial_conn, var_types):
  61. digital = var_types['Digital']
  62. print(digital.__dict__)
  63. try:
  64. res = modbus.read_coils(digital.start_address, digital.end_address)
  65. except umodbus.exceptions.IllegalDataAddressError:
  66. print(f'The start and end addresses are not available [{digital.start_address}, {digital.end_address}]')
  67. assert type(res) == list
  68. assert len(res) == digital.end_address - digital.start_address
  69. for r in res:
  70. assert type(r) == int
  71. assert r in [0, 1]