[tests] update tests

This commit is contained in:
Maxime Alves LIRMM@home 2020-07-31 22:51:21 +02:00
commit e45a344428
6 changed files with 242 additions and 12 deletions

View file

@ -0,0 +1,55 @@
#!/usr/bin/env python3
import pytest
from starlette.authentication import UnauthenticatedUser
from starlette.testclient import TestClient
#from pyheatpump.conf import app, config, default_config, CONFIG_FILES, get_config, set_config, config_route, ROUTES
from unittest.mock import patch, MagicMock
from pprint import pprint
import json
from tempfile import mkstemp
from configparser import ConfigParser
import os
import sys
from datetime import datetime
from pyheatpump.conf import config
from pyheatpump.db import initialize, connect
from pyheatpump.variable_types import app, get_variable_types, set_variable_types, ROUTES
from pyheatpump.models import *
@pytest.fixture()
def set_test_db():
_, tmpdb = mkstemp(suffix='.db', dir=os.getcwd(), )
print(f'Will store database in {tmpdb}')
config['heatpump']['database'] = tmpdb
if not initialize(os.path.join(os.getcwd(), 'db/pyheatpump.sql')):
sys.exit(-1)
if not initialize(os.path.join(os.getcwd(), 'db/test_variables.sql')):
sys.exit(-1)
if not initialize(os.path.join(os.getcwd(), 'db/test_variable_values.sql')):
sys.exit(-1)
yield
os.unlink(tmpdb)
def test_last_update(set_test_db):
for _, var_type in VariableType.getall().items():
for _, variable in var_type.get_variables().items():
try:
if not variable.last_update:
continue
time = datetime.fromtimestamp(variable.last_update)
val = VariableValue.get(variable.type,
variable.address,
time)
assert val is not None
assert variable.last_update == val.time
except StopIteration:
assert False

39
tests/test_heatpump.py Normal file
View file

@ -0,0 +1,39 @@
#!/usr/bin/env python3
import pytest
from starlette.authentication import UnauthenticatedUser
from starlette.testclient import TestClient
from pyheatpump.conf import app, config, default_config, CONFIG_FILES, get_config, set_config, config_route, ROUTES
from pyheatpump.db import initialize, connect
from unittest.mock import patch, MagicMock
from pprint import pprint
import json
from tempfile import mkstemp
from configparser import ConfigParser
import os
@pytest.fixture(scope='module')
def set_test_db():
_, tmpdb = mkstemp(suffix='.db', dir=os.getcwd(), )
print(f'Will store database in {tmpdb}')
config['heatpump']['database'] = tmpdb
if not initialize(os.path.join(os.getcwd(), 'db/pyheatpump.sql')):
sys.exit(-1)
if not initialize(os.path.join(os.getcwd(), 'db/test_variables.sql')):
sys.exit(-1)
if not initialize(os.path.join(os.getcwd(), 'db/test_variable_values.sql')):
sys.exit(-1)
yield
def test_get_(set_test_db):
c = TestClient(app)
r = c.get('/')
assert r.status_code == 200
resp = r.content.decode()
assert type(resp) == str
d_resp = json.loads(resp)
assert type(d_resp) == dict

94
tests/test_modbus.py Executable file
View file

@ -0,0 +1,94 @@
#!/usr/bin/env python3
import pytest
from unittest.mock import patch, MagicMock
from starlette.authentication import UnauthenticatedUser
from starlette.testclient import TestClient
from pprint import pprint
import json
from tempfile import mkstemp
from configparser import ConfigParser
import os
from serial import Serial
import umodbus
from umodbus.client.serial import rtu
from pyheatpump.conf import config
from pyheatpump.db import initialize
from pyheatpump.models.variable_type import VariableType
from pyheatpump import modbus
@pytest.fixture(scope='module')
def set_test_db():
_, tmpdb = mkstemp(suffix='.db', dir=os.getcwd(), )
print(f'Will store database in {tmpdb}')
config['heatpump']['database'] = tmpdb
if not initialize(os.path.join(os.getcwd(), 'db/pyheatpump.sql')):
sys.exit(-1)
if not initialize(os.path.join(os.getcwd(), 'db/test_variables.sql')):
sys.exit(-1)
yield
os.unlink(tmpdb)
@pytest.fixture(scope='module')
def serial_conn():
s = modbus.connect()
assert type(s) == Serial
return s
@patch('umodbus.client.serial.rtu.send_message')
@patch('umodbus.client.serial.rtu.read_coils')
def test_rtu_call_mocked(RtuReadCoils, RtuSendMessage, serial_conn):
modbus.read_holding_registers(1, 3)
RtuReadCoils.assert_any_call(slave_id=1, starting_address=1, quantity=2)
RtuSendMessage.assert_called()
modbus.read_coils(1, 3)
RtuReadCoils.assert_any_call(slave_id=1, starting_address=1, quantity=2)
RtuSendMessage.assert_called()
def test_rtu_call_holding_registers(serial_conn):
r = modbus.read_holding_registers(1, 1)
assert type(r) == list
assert len(r) == 0
r = modbus.read_holding_registers(1, 5)
assert type(r) == list
assert len(r) == 4
r = modbus.read_holding_registers(1, 126)
assert type(r) == list
assert len(r) == 125
r = modbus.read_holding_registers(1, 151)
assert type(r) == list
assert len(r) == 150
def test_get_var(set_test_db, serial_conn):
d_var_types = VariableType.getall()
analog = d_var_types['Analog']
res = modbus.read_holding_registers(analog.start_address, analog.end_address)
assert type(res) == list
assert len(res) == analog.end_address - analog.start_address
for r in res:
assert type(r) == int
integer = d_var_types['Integer']
try:
res = modbus.read_holding_registers(integer.start_address, integer.end_address)
except umodbus.exceptions.IllegalDataAddressError:
print(f'The start and end addresses are not available [{integer.start_address}, {integer.end_address}]')
assert type(res) == list
assert len(res) == integer.end_address - integer.start_address
for r in res:
assert type(r) == int

View file

@ -36,11 +36,12 @@ def test_get_(set_test_db):
assert r.status_code == 200
d_resp = json.loads(r.content.decode())
pprint(d_resp)
assert 'A' in d_resp.keys()
assert 10 in d_resp['A']
assert len(d_resp['A']) == 4
assert len(d_resp['I']) == 4
assert len(d_resp['D']) == 4
assert '10' in d_resp['A'].keys()
assert len(d_resp['A'].keys()) == 4
assert len(d_resp['I'].keys()) == 4
assert len(d_resp['D'].keys()) == 4
@pytest.mark.skip

View file

@ -50,6 +50,7 @@ async def test_get_variable_types(set_test_db):
assert 'Digital' in d_resp.keys()
assert type(d_resp['Digital']) == dict
@pytest.mark.skip
def test_set_variable_types(set_test_db):
c = TestClient(app)
r = c.post('/', json=json.dumps({

View file

@ -1,6 +1,7 @@
#!/usr/bin/env python3
import pytest
from starlette.authentication import UnauthenticatedUser
from starlette.exceptions import HTTPException
from starlette.testclient import TestClient
from unittest.mock import patch, MagicMock
from pprint import pprint
@ -9,10 +10,13 @@ from tempfile import mkstemp
from configparser import ConfigParser
import os
import sys
from datetime import datetime, timedelta
from pyheatpump.conf import config
from pyheatpump.db import initialize, connect
from pyheatpump.variables import app, get_variables, set_variables, ROUTES
from pyheatpump.variable_values import app, get_variable_value, set_variable_value, ROUTES
from pyheatpump.models.variable_value import VariableValue
@pytest.fixture(scope='module')
def set_test_db():
@ -25,22 +29,58 @@ def set_test_db():
if not initialize(os.path.join(os.getcwd(), 'db/test_variables.sql')):
sys.exit(-1)
if not initialize(os.path.join(os.getcwd(), 'db/test_variable_values.sql')):
sys.exit(-1)
yield
os.unlink(tmpdb)
def test_get_(set_test_db):
def test_get_type_address(set_test_db):
c = TestClient(app)
r = c.get('/')
r = c.get('/A/10')
assert r.status_code == 200
d_resp = json.loads(r.content.decode())
assert 'A' in d_resp.keys()
assert 10 in d_resp['A']
assert len(d_resp['A']) == 4
assert len(d_resp['I']) == 4
assert len(d_resp['D']) == 4
d_resp = r.content.decode()
assert int(d_resp) == 42
try:
r = c.get('/X/10')
except HTTPException as e:
assert e.status_code == 404
r = c.get('/A/10/{}'.format(
(datetime.now() - timedelta(days=1)).isoformat()
))
d_resp = r.content.decode()
assert int(d_resp) == 20
r = c.get('/A/10/{}'.format(
(datetime.now() - timedelta(days=2)).isoformat()
))
d_resp = r.content.decode()
assert int(d_resp) == 0
r = c.get('/I/5010')
d_resp = r.content.decode()
assert int(d_resp) == 42
r = c.get('/I/5011')
d_resp = r.content.decode()
assert int(d_resp) == 24
r = c.get('/I/5010/{}'.format(
(datetime.now() - timedelta(hours=1)).isoformat()
))
d_resp = r.content.decode()
assert int(d_resp) == 20
r = c.get('/I/5010/{}'.format(
(datetime.now() - timedelta(hours=2)).isoformat()
))
d_resp = r.content.decode()
assert int(d_resp) == 0
@pytest.mark.skip