#!/usr/bin/env python3 # builtins import click import uvicorn import os import sys import re import importlib from pprint import pprint import urllib import requests import json from datetime import datetime from pyheatpump.logger import logger_init from pyheatpump.models import * CONTEXT_SETTINGS={ 'default_map':{'run': {}} } @click.group(invoke_without_command=True, context_settings=CONTEXT_SETTINGS) @click.option('--version', is_flag=True) @click.pass_context def cli(ctx, version): if version: from pyheatpump import version return click.echo(pyheatpump.version()) @click.option('--host', default=None) @click.option('--port', default=None) @cli.command() def run(host, port): logger = logger_init() from .config import (API_HOST, API_PORT) if not host: host = API_HOST if not port: port = API_PORT log_level = 'info' click.echo('Launching PyHeatpump application') uvicorn.run('pyheatpump.app:application', host=host, port=int(port), log_level=log_level, workers=1) @click.option('--type', '-t', default=None, multiple=True) @cli.command() def fetch(type): logger = logger_init() from pyheatpump import modbus try: if type is None: var_types = VariableType.getall() else: var_types = {} for label, var_type in VariableType.getall().items(): if label in type or var_type.slabel in type: var_types[label] = var_type # Analog - float if 'Analog' in var_types.keys(): analog = var_types['Analog'] logger.info('Read analog variables in registers [{}, {}]'.format( analog.start_address, analog.end_address )) res = modbus.read_holding_registers(analog.start_address, analog.end_address) logger.debug(f'analog length : {len(res)}') for r in range(len(res)): var = Variable(**{ 'type': analog, 'address': r + analog.start_address}) if not var.exists(): logger.info('Insert variable {}:{}'.format( var.type, var.address)) var.insert() val = VariableValue(**{ 'type': var.type, 'address': var.address, 'value': res[r]}) val.insert() # Integer - int if 'Integer' in var_types.keys(): integer = var_types['Integer'] logger.info('Read integer variables in registers [{}, {}]'.format( integer.start_address, integer.end_address )) res = modbus.read_holding_registers(integer.start_address, integer.end_address) logger.debug(f'integer length : {len(res)}') for r in range(len(res)): var = Variable(**{ 'type': integer, 'address': r + integer.start_address}) if not var.exists(): logger.info('Insert variable {}:{}'.format( var.type, var.address)) var.insert() val = VariableValue(**{ 'type': var.type, 'address': var.address, 'value': res[r]}) val.insert() # Digital - bool if 'Digital' in var_types.keys(): digital = var_types['Digital'] logger.info('Read digital variables in coils [{}, {}]'.format( digital.start_address, digital.end_address )) res = modbus.read_coils(digital.start_address, digital.end_address) logger.debug(f'digital length : {len(res)}') for r in range(len(res)): var = Variable(**{ 'type': digital, 'address': r + digital.start_address}) if not var.exists(): logger.info('Insert variable {}:{}'.format( var.type, var.address)) var.insert() val = VariableValue(**{ 'type': var.type, 'address': var.address, 'value': res[r]}) val.insert() logger.info('Successfully read all variables') except Exception as exc: logger.error(exc) if modbus.serial_conn: modbus.serial_conn.close() @click.option('--since', is_flag=True) @cli.command() def supervise(since): logger = logger_init() from .config import config, mac_address_init, get_last_update, set_last_update mac_address = config.get('heatpump','mac_address') if not re.match('^(([0-9a-f]{2}):?){6}$', mac_address, re.I): mac_address = mac_address_init() from .models.heatpump import Heatpump if not since: last_update = 0 else: last_update = get_last_update() post_packets = [ Heatpump(mac_address, last_update, ['Analog']).__dict__(), Heatpump(mac_address, last_update, ['Digital']).__dict__(), Heatpump(mac_address, last_update, ['Integer']).__dict__() ] base_url = { 'scheme':config.get('supervisor', 'scheme'), 'hostname':config.get('supervisor', 'host'), 'port':config.getint('supervisor', 'port') } build_url = lambda d: '{scheme}://{hostname}:{port}{path}'.format(**d) """ @TODO : Use a proper certificate if base_url['scheme'] == 'https': certificate = config.get('supervisor', 'certificate') if not os.path.isfile(certificate): raise Exception(f'Certificate not found :{certificate}') print(certificate) else: certificate = None """ post_url = { **base_url, **{'path': config.get('supervisor', 'post_path')} } logger.info(build_url(post_url)) for packet in post_packets: try: logger.debug(json.dumps(packet)) except Exception as e: print(e) sys.exit(1) for packet in post_packets: logger.debug('Will send %s', packet) post_resp = requests.post( url=build_url(post_url), json=packet, verify=False ) if post_resp.status_code == 200: logger.info('POST to supervisor succeeded') set_last_update(int(datetime.now().strftime('%s'))) get_path = '/'.join(( config.get('supervisor', 'get_path'), h.macformat )) get_url = { **base_url, **{'path': get_path} } get_resp = requests.get( url=build_url(get_url), verify=False ) control_data = get_resp.json() if h.control(control_data): logger.info('GET to supervisor succeded : updated values') set_last_update(int(datetime.now().strftime('%s'))) else: logger.warn('Unable to set data from supervisor\n{}'.format(control_data))