#!/usr/bin/env python3
"""Tests for the ``pyheatpump.modbus`` helpers.

Two tests patch the ``umodbus`` RTU client functions and only check how they
are called; the remaining tests call ``pyheatpump.modbus`` unpatched through
the connection returned by ``modbus.connect()``.

NOTE(review): the unpatched tests presumably need a reachable Modbus RTU
device (or simulator) on the configured serial port -- confirm.
"""
import json
import os
import sys
from configparser import ConfigParser
from pprint import pprint
from tempfile import mkstemp
from unittest.mock import patch, MagicMock

import pytest
from serial import Serial
from starlette.authentication import UnauthenticatedUser
from starlette.testclient import TestClient
import umodbus
from umodbus.client.serial import rtu

from pyheatpump.conf import config
from pyheatpump.db import initialize
from pyheatpump.models.variable_type import VariableType
from pyheatpump import modbus


@pytest.fixture(scope='module')
def set_test_db():
    """Create a temporary database file and load the SQL scripts into it.

    The file is created in the current working directory, registered as
    ``config['heatpump']['database']`` and then initialized with
    ``db/pyheatpump.sql`` followed by ``db/test_variables.sql``. The process
    exits with status -1 if either initialization reports failure.
    """
    fd, tmpdb = mkstemp(suffix='.db', dir=os.getcwd())
    # mkstemp returns an open OS-level descriptor; only the path is used
    # here, so close the descriptor right away instead of leaking it.
    os.close(fd)
    print(f'Will store database in {tmpdb}')
    config['heatpump']['database'] = tmpdb

    for script in ('db/pyheatpump.sql', 'db/test_variables.sql'):
        if not initialize(os.path.join(os.getcwd(), script)):
            sys.exit(-1)

    yield

    # NOTE(review): the temporary database file is not removed after the
    # tests (the original cleanup call was disabled) -- confirm whether it
    # should be unlinked here.


@pytest.fixture(scope='module')
def serial_conn():
    """Open the modbus connection and check that it is a ``Serial`` object."""
    conn = modbus.connect()
    assert isinstance(conn, Serial)
    return conn


# Decorators are applied bottom-up, so the mock for ``read_coils`` is the
# first positional argument and the mock for ``send_message`` the second.
@patch('umodbus.client.serial.rtu.send_message')
@patch('umodbus.client.serial.rtu.read_coils')
def test_rtu__coils_call_mocked(RtuReadCoils, RtuSendMessage, serial_conn):
    """Check the RTU calls issued by ``modbus.read_coils(1, 3)``."""
    modbus.read_coils(1, 3)
    RtuReadCoils.assert_any_call(slave_id=1, starting_address=1, quantity=2)
    RtuSendMessage.assert_called()

    # NOTE(review): this repeats the exact same call and assertions as above;
    # since the mocks are not reset it cannot fail on its own. A different
    # address range may have been intended -- confirm.
    modbus.read_coils(1, 3)
    RtuReadCoils.assert_any_call(slave_id=1, starting_address=1, quantity=2)
    RtuSendMessage.assert_called()


@patch('umodbus.client.serial.rtu.send_message')
@patch('umodbus.client.serial.rtu.read_holding_registers')
def test_rtu_registers_call_mocked(RtuReadHoldingRegisters, RtuSendMessage,
                                   serial_conn):
    """Check the RTU calls issued by ``modbus.read_holding_registers``."""
    modbus.read_holding_registers(1, 3)
    RtuReadHoldingRegisters.assert_any_call(
        slave_id=1, starting_address=1, quantity=2)
    RtuSendMessage.assert_called()

    # For the range (1, 500) a request with quantity=125 is expected among
    # the calls made to the RTU client.
    modbus.read_holding_registers(1, 500)
    RtuReadHoldingRegisters.assert_any_call(
        slave_id=1, starting_address=1, quantity=125)
    RtuSendMessage.assert_called()


def test_rtu_call_holding_registers(serial_conn):
    """Read several register ranges starting at 1 and check the result size.

    Each read is expected to return a list of ``end_address - 1`` values.
    """
    expected_lengths = (
        (1, 0),
        (5, 4),
        (126, 125),
        (151, 150),
    )
    for end_address, expected_length in expected_lengths:
        r = modbus.read_holding_registers(1, end_address)
        assert isinstance(r, list)
        assert len(r) == expected_length


@pytest.fixture
def var_types():
    """Return the result of ``VariableType.getall()``."""
    return VariableType.getall()


def test_get_analog(set_test_db, serial_conn, var_types):
    """Read the holding registers of the 'Analog' variable type."""
    analog = var_types['Analog']
    print(analog.__dict__)

    res = modbus.read_holding_registers(analog.start_address,
                                        analog.end_address)

    assert isinstance(res, list)
    assert len(res) == analog.end_address - analog.start_address
    for r in res:
        # Exact type check on purpose: bool is a subclass of int and must
        # not be accepted here.
        assert type(r) is int


def test_get_integer(set_test_db, serial_conn, var_types):
    """Read the holding registers of the 'Integer' variable type.

    Fails the test when the device reports the address range as illegal.
    """
    integer = var_types['Integer']
    print(integer.__dict__)

    # Only the read itself can raise IllegalDataAddressError, so the try
    # body is limited to that call.
    try:
        res = modbus.read_holding_registers(integer.start_address,
                                            integer.end_address)
    except umodbus.exceptions.IllegalDataAddressError as e:
        print(e)
        message = f'The start and end addresses are not available [{integer.start_address}, {integer.end_address}]'
        print(message)
        pytest.fail(message)

    assert isinstance(res, list)
    assert len(res) == integer.end_address - integer.start_address
    for r in res:
        assert type(r) is int


def test_get_digital(set_test_db, serial_conn, var_types):
    """Read the coils of the 'Digital' variable type.

    Fails the test when the device reports the address range as illegal.
    """
    digital = var_types['Digital']
    print(digital.__dict__)

    try:
        res = modbus.read_coils(digital.start_address, digital.end_address)
    except umodbus.exceptions.IllegalDataAddressError:
        message = f'The start and end addresses are not available [{digital.start_address}, {digital.end_address}]'
        print(message)
        # Fail explicitly: falling through would leave ``res`` unbound and
        # abort the test with a NameError instead of a clear failure.
        pytest.fail(message)

    assert isinstance(res, list)
    assert len(res) == digital.end_address - digital.start_address
    for r in res:
        assert type(r) is int
        assert r in [0, 1]