123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202 |
- #!/usr/bin/env python3
- # builtins
- import click
- import uvicorn
- import os
- import sys
- import re
- import importlib
- from pprint import pprint
- import urllib
- import requests
- import json
- from datetime import datetime
-
- from pyheatpump.logger import logger_init
- from pyheatpump.models import *
- from pyheatpump.db import commit
-
# Settings handed to the click group below through ``context_settings``.
# ``default_map`` is keyed by sub-command name; ``run`` has no overrides.
CONTEXT_SETTINGS = {
    'default_map': {
        'run': {},
    },
}
-
@click.group(invoke_without_command=True, context_settings=CONTEXT_SETTINGS)
@click.option('--version', is_flag=True)
@click.pass_context
def cli(ctx, version):
    """Root command group of the PyHeatpump command-line interface.

    Args:
        ctx: click context injected by ``click.pass_context`` (not used here).
        version: ``True`` when the ``--version`` flag was given.

    Returns:
        The result of ``click.echo`` when ``--version`` is set, else ``None``.
    """
    if version:
        # Import under an alias: the bare name ``pyheatpump`` is never bound in
        # this module, so ``pyheatpump.version()`` raised NameError, and
        # importing ``version`` directly would shadow the flag parameter.
        # Assumes ``pyheatpump.version`` is callable, as the original call did.
        from pyheatpump import version as package_version
        return click.echo(package_version())
-
-
@click.option('--host', default=None)
@click.option('--port', default=None)
@cli.command()
def run(host, port):
    """Start the PyHeatpump web application under uvicorn.

    Args:
        host: address to bind to; falls back to ``API_HOST`` from the package
            config when empty or not given.
        port: port to listen on; falls back to ``API_PORT`` from the package
            config when empty or not given. Converted with ``int``.
    """
    # Called for its side effect of initialising logging; the returned logger
    # is not needed in this command.
    logger_init()

    from .config import API_HOST, API_PORT

    bind_host = host or API_HOST
    bind_port = port or API_PORT

    click.echo('Launching PyHeatpump application')

    uvicorn.run(
        'pyheatpump.app:application',
        host=bind_host,
        port=int(bind_port),
        log_level='info',
        reload=True,
    )
-
-
def _store_readings(logger, var_type, values):
    """Insert one value row per reading, creating missing variables first.

    Args:
        logger: logger used to report newly inserted variables.
        var_type: variable type whose ``start_address`` is the address of the
            first element of ``values``.
        values: readings returned by the Modbus read, in address order.
    """
    for offset, value in enumerate(values):
        var = Variable(**{
            'type': var_type,
            'address': offset + var_type.start_address})

        if not var.exists():
            logger.info('Insert variable {}:{}'.format(
                var.type, var.address))
            var.insert()

        VariableValue(**{
            'type': var.type,
            'address': var.address,
            'value': value}).insert()


@cli.command()
def fetch():
    """Read every Analog, Integer and Digital variable and store the values.

    Analog and Integer variables are read with
    ``modbus.read_holding_registers``, Digital ones with ``modbus.read_coils``,
    each over the ``[start_address, end_address]`` range of its variable type.
    Everything is committed once, after all three types have been processed.
    """
    logger = logger_init()

    from pyheatpump import modbus
    var_types = VariableType.getall()

    # (variable type name, log message template, Modbus read function),
    # processed in this order.
    sources = (
        ('Analog', 'Read analog variables in registers [{}, {}]',
         modbus.read_holding_registers),
        ('Integer', 'Read integer variables in registers [{}, {}]',
         modbus.read_holding_registers),
        ('Digital', 'Read digital variables in coils [{}, {}]',
         modbus.read_coils),
    )

    for type_name, message, read in sources:
        var_type = var_types[type_name]
        logger.info(message.format(
            var_type.start_address, var_type.end_address
        ))
        res = read(var_type.start_address, var_type.end_address)
        _store_readings(logger, var_type, res)

    commit()
    logger.info('Successfully read all variables')
-
-
@click.option('--since', is_flag=True)
@cli.command()
def supervise(since):
    """Push the heat pump state to the supervisor, then query it back.

    The state is POSTed as JSON to the supervisor's ``post_path``; afterwards a
    GET is issued on ``<get_path>/<h.macformat>``. Scheme, host, port and paths
    all come from the ``supervisor`` config section.

    Args:
        since: when set, the ``Heatpump`` is built with a ``last_update`` of
            now minus the configured ``supervisor.interval`` (in seconds);
            otherwise ``last_update`` is ``None``.

    Raises:
        FileNotFoundError: the scheme is ``https`` and the configured
            certificate file does not exist.
        SystemExit: building or JSON-encoding the payload failed.
    """
    logger = logger_init()

    from .config import config
    from .models.heatpump import Heatpump

    last_update = None
    if since:
        # ``strftime('%s')`` is an undocumented, platform-specific directive;
        # ``timestamp()`` yields the same epoch seconds portably.
        now = int(datetime.now().timestamp())
        last_update = now - config.getint('supervisor', 'interval')

    h = Heatpump(config.get('heatpump', 'mac_address'), last_update)

    base_url = {
        'scheme': config.get('supervisor', 'scheme'),
        'hostname': config.get('supervisor', 'host'),
        'port': config.getint('supervisor', 'port'),
    }

    def build_url(parts):
        """Assemble a URL from scheme, hostname, port and path entries."""
        return '{scheme}://{hostname}:{port}{path}'.format(**parts)

    if base_url['scheme'] == 'https':
        certificate = config.get('supervisor', 'certificate')
        if not os.path.isfile(certificate):
            raise FileNotFoundError(f'Certificate not found :{certificate}')
        print(certificate)
    else:
        certificate = None
    # NOTE(review): ``certificate`` is checked but never given to requests, and
    # both calls below use ``verify=False``, so TLS verification is disabled.
    # Confirm whether this file is a CA bundle (``verify=``) or a client
    # certificate (``cert=``) and wire it in.

    post_url = build_url({
        **base_url,
        'path': config.get('supervisor', 'post_path'),
    })
    logger.info(post_url)

    # Build the payload once so that what is validated here is exactly what
    # gets logged and posted below.
    try:
        payload = h.__dict__()
        json.dumps(payload)
    except Exception as e:
        print(e)
        sys.exit(1)

    logger.debug(payload)

    post_req = requests.post(
        url=post_url,
        json=payload,
        verify=False
    )
    if post_req.status_code == 200:
        logger.info('POST to supervisor succeeded')

    get_path = '/'.join((
        config.get('supervisor', 'get_path'),
        h.macformat
    ))
    get_url = build_url({**base_url, 'path': get_path})

    # The response is kept in a local but not used any further yet.
    get_req = requests.get(
        url=get_url,
        verify=False
    )
|