diff --git a/db/pyheatpump.sql b/db/pyheatpump.sql index d9ef711..d9e3654 100644 --- a/db/pyheatpump.sql +++ b/db/pyheatpump.sql @@ -32,11 +32,11 @@ CREATE TABLE IF NOT EXISTS var_value ( ); INSERT INTO var_type (slabel, label, type, start_address, end_address) VALUES ( - 'A', 'Analog', 'float', 1, (1 + 500)); + 'A', 'Analog', 'float', 1, 5000); INSERT INTO var_type (slabel, label, type, start_address, end_address) VALUES ( - 'I', 'Integer', 'int', 5002, (5002 + 500)); + 'I', 'Integer', 'int', 5002, 10002); INSERT INTO var_type (slabel, label, type, start_address, end_address) VALUES ( - 'D', 'Digital', 'boolean', 1, (1 + 1000)); + 'D', 'Digital', 'bool', 1, 2048); CREATE TRIGGER variable_insert BEFORE INSERT ON var_value FOR EACH ROW diff --git a/poetry.lock b/poetry.lock index dc3b4f3..12c1f61 100644 --- a/poetry.lock +++ b/poetry.lock @@ -273,7 +273,7 @@ description = "The lightning-fast ASGI server." name = "uvicorn" optional = false python-versions = "*" -version = "0.11.6" +version = "0.11.8" [package.dependencies] click = ">=7.0.0,<8.0.0" @@ -436,8 +436,8 @@ urllib3 = [ {file = "urllib3-1.25.10.tar.gz", hash = "sha256:91056c15fa70756691db97756772bb1eb9678fa585d9184f24534b100dc60f4a"}, ] uvicorn = [ - {file = "uvicorn-0.11.6-py3-none-any.whl", hash = "sha256:d19a20b17445708fd222e5a7cfc3eacfb31ac269bc8fefa4920833334e199782"}, - {file = "uvicorn-0.11.6.tar.gz", hash = "sha256:467c333c743ec6a3eb545517a0e3cac603becfa40c73c65ed46c3a4bf1c8e2d0"}, + {file = "uvicorn-0.11.8-py3-none-any.whl", hash = "sha256:4b70ddb4c1946e39db9f3082d53e323dfd50634b95fd83625d778729ef1730ef"}, + {file = "uvicorn-0.11.8.tar.gz", hash = "sha256:46a83e371f37ea7ff29577d00015f02c942410288fb57def6440f2653fff1d26"}, ] uvloop = [ {file = "uvloop-0.14.0-cp35-cp35m-macosx_10_11_x86_64.whl", hash = "sha256:08b109f0213af392150e2fe6f81d33261bb5ce968a288eb698aad4f46eb711bd"}, diff --git a/pyheatpump/modbus.py b/pyheatpump/modbus.py index cc10792..40b3241 100755 --- a/pyheatpump/modbus.py +++ b/pyheatpump/modbus.py @@ -2,6 +2,7 @@ from serial import Serial from serial.serialutil import SerialException from umodbus.client.serial import rtu +import umodbus from pprint import pprint from pyheatpump.conf import config @@ -14,8 +15,12 @@ def connect(): if serial_conn is None: print('Connecting to serial port *{}*'.format( config.get('heatpump', 'serial_port'))) - serial_conn = Serial(config.get('heatpump', 'serial_port'), - config.get('heatpump', 'baudrate')) + serial_conn = Serial( + port=config.get('heatpump', 'serial_port'), + baudrate=config.get('heatpump', 'baudrate'), + bytesize=8, + parity='N', + stopbits=1) if serial_conn.open is False: print('Opening serial port') @@ -25,38 +30,54 @@ def connect(): def read_coils(start, end): global serial_con + connect() res = [] - # digital - boolean - req_adu = rtu.read_coils( - slave_id=1, - starting_address=start, - quantity=end - start) - - resp = rtu.send_message(req_adu, serial_conn) + try: + for address in range(start, end + 1, 125): + qty = 125 if (end - address) >= 125 else (end - address) + if not qty: + break + print(start, end, address, qty) + req_adu = rtu.read_coils( + slave_id=1, + starting_address=address, + quantity=qty) + resp = rtu.send_message(req_adu, serial_conn) + res.extend(resp) + except umodbus.exceptions.IllegalDataAddressError as e: + print(e) return res + def read_holding_registers(start, end): global serial_conn + connect() res = [] + try: + for address in range(start, end + 1, 125): + qty = 125 if (end - address) >= 125 else (end - address) + if not qty: + break + print(start, end, address, qty) + req_adu = rtu.read_holding_registers( + slave_id=1, + starting_address=address, + quantity=qty) - for address in range(start, end + 1, 125): - qty = 125 if (end - address) >= 125 else (end - address) - if not qty: - break - print(start, end, address, qty) - req_adu = rtu.read_coils( - slave_id=1, - starting_address=address, - quantity=qty) - - resp = rtu.send_message(req_adu, serial_conn) - res.extend(resp) + resp = rtu.send_message(req_adu, serial_conn) + res.extend(resp) + except umodbus.exceptions.IllegalDataAddressError as e: + print(e) return res if __name__ == '__main__': - connect() - read_holding_registers(1, 10) + resp = read_holding_registers(1, 5000) + #pprint(resp) + print(len(resp)) + resp = read_coils(1, 2000) + #pprint(resp) + print(len(resp)) diff --git a/pyproject.toml b/pyproject.toml index baf829b..d22444d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,8 +8,8 @@ license = "GPL-3.0-or-later" [tool.poetry.dependencies] python = "^3.7" starlette = "^0.13.6" -umodbus = "^1.0.3" uvicorn = "^0.11.6" +umodbus = "^1.0.3" [tool.poetry.dev-dependencies] pytest = "^5.4.3" diff --git a/pytest.ini b/pytest.ini index 9b66c86..5eda076 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,3 +1,3 @@ [pytest] testpaths = tests -addopts = -rP +#addopts = -r diff --git a/tests/test_modbus.py b/tests/test_modbus.py index 675616e..77db9ee 100755 --- a/tests/test_modbus.py +++ b/tests/test_modbus.py @@ -33,7 +33,7 @@ def set_test_db(): yield - os.unlink(tmpdb) + #os.unlink(tmpdb) @pytest.fixture(scope='module') def serial_conn(): @@ -44,8 +44,8 @@ def serial_conn(): @patch('umodbus.client.serial.rtu.send_message') @patch('umodbus.client.serial.rtu.read_coils') -def test_rtu_call_mocked(RtuReadCoils, RtuSendMessage, serial_conn): - modbus.read_holding_registers(1, 3) +def test_rtu__coils_call_mocked(RtuReadCoils, RtuSendMessage, serial_conn): + modbus.read_coils(1, 3) RtuReadCoils.assert_any_call(slave_id=1, starting_address=1, quantity=2) RtuSendMessage.assert_called() @@ -53,6 +53,18 @@ def test_rtu_call_mocked(RtuReadCoils, RtuSendMessage, serial_conn): RtuReadCoils.assert_any_call(slave_id=1, starting_address=1, quantity=2) RtuSendMessage.assert_called() + +@patch('umodbus.client.serial.rtu.send_message') +@patch('umodbus.client.serial.rtu.read_holding_registers') +def test_rtu_registers_call_mocked(RtuReadHoldingRegisters, RtuSendMessage, serial_conn): + modbus.read_holding_registers(1, 3) + RtuReadHoldingRegisters.assert_any_call(slave_id=1, starting_address=1, quantity=2) + RtuSendMessage.assert_called() + + modbus.read_holding_registers(1, 500) + RtuReadHoldingRegisters.assert_any_call(slave_id=1, starting_address=1, quantity=125) + RtuSendMessage.assert_called() + def test_rtu_call_holding_registers(serial_conn): r = modbus.read_holding_registers(1, 1) assert type(r) == list @@ -70,25 +82,45 @@ def test_rtu_call_holding_registers(serial_conn): assert type(r) == list assert len(r) == 150 +@pytest.fixture +def var_types(): + return VariableType.getall() - - -def test_get_var(set_test_db, serial_conn): - d_var_types = VariableType.getall() - - analog = d_var_types['Analog'] +def test_get_analog(set_test_db, serial_conn, var_types): + analog = var_types['Analog'] + print(analog.__dict__) res = modbus.read_holding_registers(analog.start_address, analog.end_address) assert type(res) == list assert len(res) == analog.end_address - analog.start_address for r in res: assert type(r) == int - integer = d_var_types['Integer'] +def test_get_integer(set_test_db, serial_conn, var_types): + integer = var_types['Integer'] + print(integer.__dict__) try: res = modbus.read_holding_registers(integer.start_address, integer.end_address) - except umodbus.exceptions.IllegalDataAddressError: + assert type(res) == list + assert len(res) == integer.end_address - integer.start_address + for r in res: + assert type(r) == int + + except umodbus.exceptions.IllegalDataAddressError as e: + print(e) print(f'The start and end addresses are not available [{integer.start_address}, {integer.end_address}]') + assert False + + +def test_get_digital(set_test_db, serial_conn, var_types): + digital = var_types['Digital'] + print(digital.__dict__) + try: + res = modbus.read_coils(digital.start_address, digital.end_address) + except umodbus.exceptions.IllegalDataAddressError: + print(f'The start and end addresses are not available [{digital.start_address}, {digital.end_address}]') assert type(res) == list - assert len(res) == integer.end_address - integer.start_address + assert len(res) == digital.end_address - digital.start_address for r in res: assert type(r) == int + assert r in [0, 1] +