[variables][db] fixed variable addresses and modbus functions

This commit is contained in:
Maxime Alves LIRMM@home 2020-08-01 19:23:02 +02:00
commit 0aa2af892a
6 changed files with 96 additions and 43 deletions

View file

@ -32,11 +32,11 @@ CREATE TABLE IF NOT EXISTS var_value (
);
INSERT INTO var_type (slabel, label, type, start_address, end_address) VALUES (
'A', 'Analog', 'float', 1, (1 + 500));
'A', 'Analog', 'float', 1, 5000);
INSERT INTO var_type (slabel, label, type, start_address, end_address) VALUES (
'I', 'Integer', 'int', 5002, (5002 + 500));
'I', 'Integer', 'int', 5002, 10002);
INSERT INTO var_type (slabel, label, type, start_address, end_address) VALUES (
'D', 'Digital', 'boolean', 1, (1 + 1000));
'D', 'Digital', 'bool', 1, 2048);
CREATE TRIGGER variable_insert BEFORE INSERT ON var_value
FOR EACH ROW

6
poetry.lock generated
View file

@ -273,7 +273,7 @@ description = "The lightning-fast ASGI server."
name = "uvicorn"
optional = false
python-versions = "*"
version = "0.11.6"
version = "0.11.8"
[package.dependencies]
click = ">=7.0.0,<8.0.0"
@ -436,8 +436,8 @@ urllib3 = [
{file = "urllib3-1.25.10.tar.gz", hash = "sha256:91056c15fa70756691db97756772bb1eb9678fa585d9184f24534b100dc60f4a"},
]
uvicorn = [
{file = "uvicorn-0.11.6-py3-none-any.whl", hash = "sha256:d19a20b17445708fd222e5a7cfc3eacfb31ac269bc8fefa4920833334e199782"},
{file = "uvicorn-0.11.6.tar.gz", hash = "sha256:467c333c743ec6a3eb545517a0e3cac603becfa40c73c65ed46c3a4bf1c8e2d0"},
{file = "uvicorn-0.11.8-py3-none-any.whl", hash = "sha256:4b70ddb4c1946e39db9f3082d53e323dfd50634b95fd83625d778729ef1730ef"},
{file = "uvicorn-0.11.8.tar.gz", hash = "sha256:46a83e371f37ea7ff29577d00015f02c942410288fb57def6440f2653fff1d26"},
]
uvloop = [
{file = "uvloop-0.14.0-cp35-cp35m-macosx_10_11_x86_64.whl", hash = "sha256:08b109f0213af392150e2fe6f81d33261bb5ce968a288eb698aad4f46eb711bd"},

View file

@ -2,6 +2,7 @@
from serial import Serial
from serial.serialutil import SerialException
from umodbus.client.serial import rtu
import umodbus
from pprint import pprint
from pyheatpump.conf import config
@ -14,8 +15,12 @@ def connect():
if serial_conn is None:
print('Connecting to serial port *{}*'.format(
config.get('heatpump', 'serial_port')))
serial_conn = Serial(config.get('heatpump', 'serial_port'),
config.get('heatpump', 'baudrate'))
serial_conn = Serial(
port=config.get('heatpump', 'serial_port'),
baudrate=config.get('heatpump', 'baudrate'),
bytesize=8,
parity='N',
stopbits=1)
if serial_conn.open is False:
print('Opening serial port')
@ -25,24 +30,10 @@ def connect():
def read_coils(start, end):
global serial_con
connect()
res = []
# digital - boolean
req_adu = rtu.read_coils(
slave_id=1,
starting_address=start,
quantity=end - start)
resp = rtu.send_message(req_adu, serial_conn)
return res
def read_holding_registers(start, end):
global serial_conn
res = []
try:
for address in range(start, end + 1, 125):
qty = 125 if (end - address) >= 125 else (end - address)
if not qty:
@ -55,8 +46,38 @@ def read_holding_registers(start, end):
resp = rtu.send_message(req_adu, serial_conn)
res.extend(resp)
except umodbus.exceptions.IllegalDataAddressError as e:
print(e)
return res
def read_holding_registers(start, end):
global serial_conn
connect()
res = []
try:
for address in range(start, end + 1, 125):
qty = 125 if (end - address) >= 125 else (end - address)
if not qty:
break
print(start, end, address, qty)
req_adu = rtu.read_holding_registers(
slave_id=1,
starting_address=address,
quantity=qty)
resp = rtu.send_message(req_adu, serial_conn)
res.extend(resp)
except umodbus.exceptions.IllegalDataAddressError as e:
print(e)
return res
if __name__ == '__main__':
connect()
read_holding_registers(1, 10)
resp = read_holding_registers(1, 5000)
#pprint(resp)
print(len(resp))
resp = read_coils(1, 2000)
#pprint(resp)
print(len(resp))

View file

@ -8,8 +8,8 @@ license = "GPL-3.0-or-later"
[tool.poetry.dependencies]
python = "^3.7"
starlette = "^0.13.6"
umodbus = "^1.0.3"
uvicorn = "^0.11.6"
umodbus = "^1.0.3"
[tool.poetry.dev-dependencies]
pytest = "^5.4.3"

View file

@ -1,3 +1,3 @@
[pytest]
testpaths = tests
addopts = -rP
#addopts = -r

View file

@ -33,7 +33,7 @@ def set_test_db():
yield
os.unlink(tmpdb)
#os.unlink(tmpdb)
@pytest.fixture(scope='module')
def serial_conn():
@ -44,8 +44,8 @@ def serial_conn():
@patch('umodbus.client.serial.rtu.send_message')
@patch('umodbus.client.serial.rtu.read_coils')
def test_rtu_call_mocked(RtuReadCoils, RtuSendMessage, serial_conn):
modbus.read_holding_registers(1, 3)
def test_rtu__coils_call_mocked(RtuReadCoils, RtuSendMessage, serial_conn):
modbus.read_coils(1, 3)
RtuReadCoils.assert_any_call(slave_id=1, starting_address=1, quantity=2)
RtuSendMessage.assert_called()
@ -53,6 +53,18 @@ def test_rtu_call_mocked(RtuReadCoils, RtuSendMessage, serial_conn):
RtuReadCoils.assert_any_call(slave_id=1, starting_address=1, quantity=2)
RtuSendMessage.assert_called()
@patch('umodbus.client.serial.rtu.send_message')
@patch('umodbus.client.serial.rtu.read_holding_registers')
def test_rtu_registers_call_mocked(RtuReadHoldingRegisters, RtuSendMessage, serial_conn):
modbus.read_holding_registers(1, 3)
RtuReadHoldingRegisters.assert_any_call(slave_id=1, starting_address=1, quantity=2)
RtuSendMessage.assert_called()
modbus.read_holding_registers(1, 500)
RtuReadHoldingRegisters.assert_any_call(slave_id=1, starting_address=1, quantity=125)
RtuSendMessage.assert_called()
def test_rtu_call_holding_registers(serial_conn):
r = modbus.read_holding_registers(1, 1)
assert type(r) == list
@ -70,25 +82,45 @@ def test_rtu_call_holding_registers(serial_conn):
assert type(r) == list
assert len(r) == 150
@pytest.fixture
def var_types():
return VariableType.getall()
def test_get_var(set_test_db, serial_conn):
d_var_types = VariableType.getall()
analog = d_var_types['Analog']
def test_get_analog(set_test_db, serial_conn, var_types):
analog = var_types['Analog']
print(analog.__dict__)
res = modbus.read_holding_registers(analog.start_address, analog.end_address)
assert type(res) == list
assert len(res) == analog.end_address - analog.start_address
for r in res:
assert type(r) == int
integer = d_var_types['Integer']
def test_get_integer(set_test_db, serial_conn, var_types):
integer = var_types['Integer']
print(integer.__dict__)
try:
res = modbus.read_holding_registers(integer.start_address, integer.end_address)
except umodbus.exceptions.IllegalDataAddressError:
print(f'The start and end addresses are not available [{integer.start_address}, {integer.end_address}]')
assert type(res) == list
assert len(res) == integer.end_address - integer.start_address
for r in res:
assert type(r) == int
except umodbus.exceptions.IllegalDataAddressError as e:
print(e)
print(f'The start and end addresses are not available [{integer.start_address}, {integer.end_address}]')
assert False
def test_get_digital(set_test_db, serial_conn, var_types):
digital = var_types['Digital']
print(digital.__dict__)
try:
res = modbus.read_coils(digital.start_address, digital.end_address)
except umodbus.exceptions.IllegalDataAddressError:
print(f'The start and end addresses are not available [{digital.start_address}, {digital.end_address}]')
assert type(res) == list
assert len(res) == digital.end_address - digital.start_address
for r in res:
assert type(r) == int
assert r in [0, 1]