[var_type] added the models, the database base methods, and the tests
This commit is contained in:
parent
da9552a5d2
commit
4c0576377b
8 changed files with 244 additions and 3 deletions
BIN
db/pyheatpump.db
BIN
db/pyheatpump.db
Binary file not shown.
|
|
@ -1,9 +1,51 @@
|
|||
#!/usr/bin/env python3
|
||||
import sqlite3
|
||||
from ..conf import config
|
||||
from subprocess import Popen
|
||||
from .conf import config
|
||||
import sys
|
||||
|
||||
conn = None
|
||||
|
||||
def connect():
|
||||
global conn
|
||||
if conn is None:
|
||||
print('Will connect to database {}'.format(
|
||||
config['heatpump']['database']))
|
||||
conn = sqlite3.connect(config['heatpump']['database'])
|
||||
conn.row_factory = sqlite3.Row
|
||||
return conn
|
||||
|
||||
def initialize(filename):
|
||||
p = Popen(
|
||||
'/usr/bin/env sqlite3 -init {} {}'.format(filename, config['heatpump']['database']),
|
||||
shell=True
|
||||
)
|
||||
return True if p.wait() == 0 else False
|
||||
|
||||
|
||||
def sql(query):
|
||||
global conn
|
||||
return conn.execute(query)
|
||||
if conn is None:
|
||||
connect()
|
||||
|
||||
cursor = conn.cursor()
|
||||
|
||||
print(f'Will execute query : \n{query}\n')
|
||||
cursor.execute(query)
|
||||
|
||||
return cursor
|
||||
|
||||
class RowClass(object):
|
||||
def __init__(self, **kwargs):
|
||||
for key in kwargs.keys():
|
||||
if hasattr(self, key):
|
||||
setattr(self, key, kwargs[key])
|
||||
|
||||
def select(self, key, tablename):
|
||||
attr = getattr(self, key)
|
||||
if type(attr) == str:
|
||||
q = f"SELECT * FROM {tablename} WHERE {key} LIKE '{attr}'"
|
||||
elif type(attr) == int:
|
||||
q = f"SELECT * FROM {tablename} WHERE {key} = {attr}"
|
||||
|
||||
return sql(q)
|
||||
|
|
|
|||
0
pyheatpump/models/__init__.py
Normal file
0
pyheatpump/models/__init__.py
Normal file
18
pyheatpump/models/variable.py
Normal file
18
pyheatpump/models/variable.py
Normal file
|
|
@ -0,0 +1,18 @@
|
|||
from pyheatpump.db import RowClass
|
||||
from pyheatpump.db import sql
|
||||
|
||||
class VariableType(RowClass):
|
||||
slabel: str = None
|
||||
label: str = None
|
||||
type: str = None
|
||||
start_address: int = None
|
||||
end_address: int = None
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
@staticmethod
|
||||
def getall():
|
||||
return dict([
|
||||
(row['label'], VariableType(**dict(row)))
|
||||
for row in sql('SELECT * FROM var_type') ])
|
||||
42
pyheatpump/models/variable_type.py
Normal file
42
pyheatpump/models/variable_type.py
Normal file
|
|
@ -0,0 +1,42 @@
|
|||
from pyheatpump.db import RowClass
|
||||
from pyheatpump.db import sql
|
||||
|
||||
class VariableType(RowClass):
|
||||
slabel: str = None
|
||||
label: str = None
|
||||
type: str = None
|
||||
start_address: int = None
|
||||
end_address: int = None
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
if self.slabel is None and self.label is not None:
|
||||
self.slabel = self.label[0]
|
||||
|
||||
|
||||
def select():
|
||||
try:
|
||||
elt = next(super().select('slabel', 'var_type'))
|
||||
except StopIteration:
|
||||
print('No element exists')
|
||||
|
||||
def save(self):
|
||||
q = ['UPDATE var_type SET']
|
||||
updates = []
|
||||
if self.start_address is not None:
|
||||
updates.append(f'start_address = {self.start_address}')
|
||||
if self.end_address is not None:
|
||||
updates.append(f'end_address = {self.end_address}')
|
||||
if len(updates) == 0:
|
||||
return
|
||||
q.append(','.join(updates))
|
||||
q.append(f"WHERE slabel LIKE '{self.slabel}'")
|
||||
|
||||
return sql(' '.join(q))
|
||||
|
||||
@staticmethod
|
||||
def getall():
|
||||
return dict([
|
||||
(row['label'], VariableType(**dict(row)))
|
||||
for row in sql('SELECT * FROM var_type') ])
|
||||
18
pyheatpump/models/variable_value.py
Normal file
18
pyheatpump/models/variable_value.py
Normal file
|
|
@ -0,0 +1,18 @@
|
|||
from pyheatpump.db import RowClass
|
||||
from pyheatpump.db import sql
|
||||
|
||||
class VariableType(RowClass):
|
||||
slabel: str = None
|
||||
label: str = None
|
||||
type: str = None
|
||||
start_address: int = None
|
||||
end_address: int = None
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
@staticmethod
|
||||
def getall():
|
||||
return dict([
|
||||
(row['label'], VariableType(**dict(row)))
|
||||
for row in sql('SELECT * FROM var_type') ])
|
||||
54
pyheatpump/variable_types.py
Normal file
54
pyheatpump/variable_types.py
Normal file
|
|
@ -0,0 +1,54 @@
|
|||
#!/usr/bin/env python3
|
||||
import os
|
||||
from datetime import datetime
|
||||
from configparser import ConfigParser
|
||||
from starlette.routing import Route, Router
|
||||
from starlette.responses import PlainTextResponse, JSONResponse
|
||||
from pprint import pprint
|
||||
import uvicorn
|
||||
import json
|
||||
|
||||
# pyHeatpump modules
|
||||
from pyheatpump.db import sql
|
||||
from pyheatpump.models.variable_type import VariableType
|
||||
|
||||
def variable_types():
|
||||
assert type(VariableType.getall()) == list
|
||||
|
||||
async def get_variable_types(request):
|
||||
return JSONResponse(dict([
|
||||
(key, val.__dict__)
|
||||
for key, val in VariableType.getall().items()
|
||||
]))
|
||||
|
||||
async def set_variable_types(request):
|
||||
|
||||
body = json.loads(await request.json())
|
||||
|
||||
for var_type_label, var_type_values in body.items():
|
||||
vt = VariableType(label=var_type_label)
|
||||
for key, val in var_type_values.items():
|
||||
if key in [ 'start_address', 'end_address' ]:
|
||||
setattr(vt, key, val)
|
||||
|
||||
vt.save()
|
||||
|
||||
return PlainTextResponse('OK')
|
||||
|
||||
async def variable_types_routes(request, *args, **kwargs):
|
||||
if request['method'] == 'GET':
|
||||
return await get_variable_types(request)
|
||||
elif request['method'] == 'POST':
|
||||
return await set_variable_types(request)
|
||||
|
||||
|
||||
ROUTES=[
|
||||
Route('/', variable_types_routes, methods=['GET', 'POST'])
|
||||
]
|
||||
|
||||
app = Router(routes=ROUTES)
|
||||
|
||||
if __name__ == '__main__':
|
||||
uvicorn.run('pyHeatpump:conf.app',
|
||||
host='127.0.0.1',
|
||||
port=8000)
|
||||
67
tests/test_variable_types.py
Normal file
67
tests/test_variable_types.py
Normal file
|
|
@ -0,0 +1,67 @@
|
|||
#!/usr/bin/env python3
|
||||
import pytest
|
||||
from starlette.authentication import UnauthenticatedUser
|
||||
from starlette.testclient import TestClient
|
||||
#from pyheatpump.conf import app, config, default_config, CONFIG_FILES, get_config, set_config, config_route, ROUTES
|
||||
from unittest.mock import patch, MagicMock
|
||||
from pprint import pprint
|
||||
import json
|
||||
from tempfile import mkstemp
|
||||
from configparser import ConfigParser
|
||||
import os
|
||||
import sys
|
||||
|
||||
from pyheatpump.conf import config
|
||||
from pyheatpump.db import initialize, connect
|
||||
from pyheatpump.variable_types import app, get_variable_types, set_variable_types, ROUTES
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
def set_test_db():
|
||||
_, tmpdb = mkstemp(suffix='.db', dir=os.getcwd(), )
|
||||
print(f'Will store database in {tmpdb}')
|
||||
config['heatpump']['database'] = tmpdb
|
||||
if not initialize(os.path.join(os.getcwd(), 'db/pyheatpump.sql')):
|
||||
sys.exit(-1)
|
||||
|
||||
yield
|
||||
|
||||
os.unlink(tmpdb)
|
||||
|
||||
|
||||
def test_get_(set_test_db):
|
||||
c = TestClient(app)
|
||||
r = c.get('/')
|
||||
assert r.status_code == 200
|
||||
|
||||
class RequestMock(MagicMock):
|
||||
def __get__(self, key):
|
||||
if key == 'method':
|
||||
return 'GET'
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_variable_types(set_test_db):
|
||||
resp = await get_variable_types(RequestMock())
|
||||
assert resp.status_code == 200
|
||||
d_resp = json.loads(resp.body.decode())
|
||||
assert 'Analog' in d_resp.keys()
|
||||
assert type(d_resp['Analog']) == dict
|
||||
assert 'Integer' in d_resp.keys()
|
||||
assert type(d_resp['Integer']) == dict
|
||||
assert 'Digital' in d_resp.keys()
|
||||
assert type(d_resp['Digital']) == dict
|
||||
|
||||
def test_set_variable_types(set_test_db):
|
||||
c = TestClient(app)
|
||||
r = c.post('/', json=json.dumps({
|
||||
'Analog': {
|
||||
'start_address': 42,
|
||||
'end_address': 420,
|
||||
}
|
||||
}))
|
||||
|
||||
assert r.status_code == 200
|
||||
|
||||
r = c.get('/')
|
||||
d_resp = json.loads(r.content.decode())
|
||||
assert d_resp['Analog']['start_address'] == 42
|
||||
assert d_resp['Analog']['end_address'] == 420
|
||||
Loading…
Add table
Add a link
Reference in a new issue