123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181 |
- #!/usr/bin/env python3
- import os
- import re
- from datetime import datetime
- from configparser import ConfigParser
- from starlette.requests import Request
- from starlette.routing import Route, Router
- from starlette.responses import PlainTextResponse, JSONResponse
- from pprint import pprint
- import uvicorn
- import json
-
- from pyheatpump.logger import logger_init
- logger = logger_init()
-
# Built-in fallback settings, one mapping per configuration section.
default_config = dict(
    heatpump=dict(
        serial_port='/dev/ttyUSB0',
        baudrate=19200,
        mac_address='None',
        ip_address='dhcp',
        read_only='False',
        database='/var/run/pyheatpump/pyheatpump.sqlite3',
    ),
    supervisor=dict(
        scheme='http',
        host='127.0.0.1',
        port=8008,
        certificate='None',
        get_path='/',
        post_path='/',
        interval=10000,
        auth='None',
        # Evaluated once, when this module is imported.
        initialization=str(datetime.now()),
        heatpump_id='None',
        last_update=0,
    ),
    api=dict(
        host='127.0.0.1',
        port=80,
    ),
)
-
# Configuration files, read in this order (later files override earlier
# ones); the last entry is also the file save_config() writes to.
CONFIG_FILES = [
    '/etc/pyheatpump.ini',
    './pyheatpump.ini',
]

# Seed the parser with the built-in defaults, then overlay the files.
config = ConfigParser()
config.read_dict(default_config)
config.read(filenames=CONFIG_FILES)

# ConfigParser.get() returns strings, so API_PORT is a str here.
API_HOST = config.get('api', 'host')
API_PORT = config.get('api', 'port')
-
def save_config():
    """Write the in-memory configuration to the last file of CONFIG_FILES."""
    target = CONFIG_FILES[-1]
    with open(target, 'w') as handle:
        config.write(handle)
-
-
def mac_address_init():
    """Detect the MAC address of the default-route interface and store it.

    The interface is the one carrying the default IPv4 gateway as reported
    by ``netifaces``. Its first link-layer address is written to the
    ``heatpump.mac_address`` option and the configuration is saved.

    Returns:
        The detected MAC address string.

    Exits the process with status 1, after logging an error, when no
    default IPv4 gateway or no link-layer address can be found.
    """
    # Function-scope imports: `sys` is not imported at module level, and
    # netifaces is only needed here.
    import sys
    from netifaces import gateways, ifaddresses, AF_INET, AF_LINK

    # Without a default route the 'default' mapping has no AF_INET entry.
    default_gateway = gateways().get('default', {}).get(AF_INET)
    if not default_gateway:
        logger.error("Can't find interface")
        sys.exit(1)

    # The interface name is the second element of the gateway entry.
    interface = default_gateway[1]

    # A missing AF_LINK key and an empty address list both mean there is
    # no usable hardware address.
    link_addresses = ifaddresses(interface).get(AF_LINK)
    if not link_addresses:
        logger.error("Can't find interface")
        sys.exit(1)

    addr = link_addresses[0]['addr']
    config.set('heatpump', 'mac_address', addr)
    save_config()
    return addr
-
-
def get_config_dict():
    """Return the configuration as a ``{section: {option: value}}`` dict."""
    return {
        section: dict(config.items(section))
        for section in config.sections()
    }
-
-
async def get_config(request):
    """Answer with the full configuration serialized as JSON."""
    payload = get_config_dict()
    return JSONResponse(payload)
-
-
async def set_config(request):
    """Update existing configuration options from a JSON request body.

    The body is expected to be a JSON object shaped like
    ``{section: {option: value}}``. Top-level entries whose value is not
    an object are ignored. Every option is checked before anything is
    changed, so a rejected request leaves the configuration untouched.
    Values are stored as strings, then the configuration is saved.

    Returns:
        A plain-text ``OK`` response.

    Raises:
        HTTPException: 400 when the body is not a JSON object, 404 when a
            requested option does not already exist.
    """
    # HTTPException is not imported at module level.
    from starlette.exceptions import HTTPException

    d_body = await request.json()
    if not isinstance(d_body, dict):
        raise HTTPException(400, 'The request body must be a JSON object')

    # First pass: validate, collecting the updates to apply.
    updates = []
    for sect, options in d_body.items():
        if not isinstance(options, dict):
            continue
        for item, value in options.items():
            if not config.has_option(sect, item):
                raise HTTPException(404, f'The option {sect}.{item} does not exists')
            updates.append((sect, item, value))

    # Second pass: apply. ConfigParser.set() only accepts string values,
    # while JSON may carry numbers, booleans or null.
    for sect, item, value in updates:
        config.set(sect, item, value=str(value))

    save_config()
    return PlainTextResponse('OK')
-
async def config_route(request, *args, **kwargs):
    """Dispatch requests on the configuration endpoint.

    POST updates the configuration. Every other method returns it: GET,
    and the HEAD that Starlette adds to routes accepting GET.
    """
    if request['method'] == 'POST':
        return await set_config(request)
    # Fall through for GET/HEAD so the handler never returns None.
    return await get_config(request)
-
-
def get_mac():
    """Return the ``mac_address`` option of the ``heatpump`` section."""
    mac_address = config.get('heatpump', 'mac_address')
    return mac_address
-
-
async def mac_route(request, *args, **kwargs):
    """Read or change the configured MAC address.

    Any method other than POST returns the stored MAC address as plain
    text. That covers GET, and the HEAD that Starlette adds to routes
    accepting GET.

    On POST the request body is the new address:
        * empty body: the address is detected with mac_address_init();
        * six hexadecimal pairs, optionally colon-separated: stored and
          saved;
        * anything else: reported on stdout, configuration unchanged.

    The POST response shows the stored address, read after any update,
    followed by the address that was submitted or detected.
    """
    if request['method'] != 'POST':
        return PlainTextResponse(get_mac())

    body = await request.body()
    mac_address = body.decode()
    if not mac_address:
        mac_address = mac_address_init()
    elif re.match('^([0-9a-f]{2}:?){6}$', mac_address, re.I) is not None:
        config.set('heatpump', 'mac_address', mac_address)
        save_config()
    else:
        print(f'Wrong mac format : {mac_address}')

    return PlainTextResponse(' '.join((
        'Current ->',
        get_mac(),
        ', New -> ', mac_address)))
-
-
def get_last_update():
    """Return the ``last_update`` option of the ``supervisor`` section."""
    last_update = config.get('supervisor', 'last_update')
    return last_update
-
-
def set_last_update(last_update: int=0):
    """Store *last_update*, save the configuration, return the stored value."""
    stored = str(last_update)
    config.set('supervisor', 'last_update', stored)
    save_config()
    return get_last_update()
-
-
async def last_update_route(request, *args, **kwargs):
    """Read or move the supervisor ``last_update`` timestamp.

    Anything that is not a POST ``Request`` returns the stored value as
    plain text: GET, HEAD, or a direct call with a non-request object.

    A POST body holds an integer offset in seconds:
        empty or 0 : last_update is set to the current time
        > 0        : last_update is set to current time - offset (rewind)
        < 0        : last_update is set to current time - offset, which
                     lies after the current time (forward)

    A body that is not an integer is answered with a 400 response. On
    success the response is the value read back after saving.
    """
    if not isinstance(request, Request) or request['method'] != 'POST':
        return PlainTextResponse(get_last_update())

    body = await request.body()
    try:
        # UnicodeDecodeError is a ValueError, so undecodable bodies are
        # rejected here as well.
        offset = int(body.decode() or 0)
    except ValueError:
        return PlainTextResponse(
            'last_update must be an integer number of seconds',
            status_code=400)

    # timestamp() is portable; strftime('%s') is a platform extension.
    now = int(datetime.now().timestamp())
    if offset == 0:
        print(f'Reset last update to current time')
        last_update = now
    else:
        print(f'{"Forward" if offset < 0 else "Rewind"} of {offset} seconds')
        last_update = now - offset

    return PlainTextResponse(str(set_last_update(last_update)))
-
-
# One route per endpoint; each accepts GET and POST.
ROUTES = [
    Route(path, endpoint, methods=['GET', 'POST'])
    for path, endpoint in (
        ('/', config_route),
        ('/mac', mac_route),
        ('/last_update', last_update_route),
    )
]

app = Router(routes=ROUTES)
-
if __name__ == '__main__':
    # Hand uvicorn the application object itself: when this file runs as
    # a script `app` is already defined above, so startup does not depend
    # on an import string resolving to the right module.
    uvicorn.run(app, host='127.0.0.1', port=8000)
|