#!/usr/bin/env python3
"""Command-line entry points for PyHeatpump.

Sub-commands:

* ``run``       - serve the application with uvicorn
* ``fetch``     - read variables over modbus and insert their values
* ``supervise`` - exchange data with the remote supervisor over HTTP
"""
# builtins
from datetime import datetime
import importlib
import json
import os
from pprint import pprint
import re
import sys
import time
import urllib

import click
import requests
import uvicorn

from pyheatpump.logger import logger_init
from pyheatpump.models import *
from pyheatpump.lib import shift_response

CONTEXT_SETTINGS={
    'default_map':{'run': {}}
}

# Timeout (seconds) applied to every HTTP exchange with the supervisor.
SUPERVISOR_TIMEOUT = 200


@click.group(invoke_without_command=True, context_settings=CONTEXT_SETTINGS)
@click.option('--version', is_flag=True)
@click.pass_context
def cli(ctx, version):
    """PyHeatpump command-line interface."""
    if version:
        # Import under an alias: the bare name `version` is already the flag
        # parameter, and a `from` import does not bind `pyheatpump` itself.
        from pyheatpump import version as pyheatpump_version
        return click.echo(pyheatpump_version())


@click.option('--host', default=None)
@click.option('--port', default=None)
@cli.command()
def run(host, port):
    """Launch the PyHeatpump application with uvicorn."""
    # Called for its logging set-up; the returned logger is not needed here.
    logger_init()

    from .config import (API_HOST, API_PORT)

    # Command-line values win; fall back to the configured host/port.
    if not host:
        host = API_HOST
    if not port:
        port = API_PORT

    log_level = 'info'
    click.echo('Launching PyHeatpump application')
    uvicorn.run('pyheatpump.app:application',
        host=host,
        port=int(port),
        log_level=log_level,
        workers=1)


def _fetch_variables(logger, var_type, name, unit, read):
    """Read one variable type over modbus and insert every value.

    :param logger: logger used for progress messages
    :param var_type: variable type giving ``start_address``/``end_address``
    :param name: lower-case type name used in log messages
    :param unit: ``'registers'`` or ``'coils'``, used in log messages
    :param read: modbus read function called as ``read(start, end)``
    """
    logger.info('Read {} variables in {} [{}, {}]'.format(
        name, unit, var_type.start_address, var_type.end_address))
    res = read(var_type.start_address, var_type.end_address)
    logger.debug(f'{name} length : {len(res)}')

    # Values are mapped to consecutive addresses starting at start_address.
    for offset, value in enumerate(res):
        var = Variable(**{
            'type': var_type,
            'address': offset + var_type.start_address})

        if not var.exists():
            logger.info('Insert variable {}:{}'.format(
                var.type, var.address))
            var.insert()

        val = VariableValue(**{
            'type': var.type,
            'address': var.address,
            'value': value})
        val.insert()


@click.option('--type', '-t', default=None, multiple=True)
@cli.command()
def fetch(type):
    """Read variables over modbus and insert their current values.

    Use --type/-t (repeatable) to restrict the read to some variable types,
    given by label or short label. Without it every type is read.
    """
    logger = logger_init()
    from pyheatpump import modbus

    try:
        all_types = VariableType.getall()
        # click hands over an empty tuple (not None) when a `multiple`
        # option is omitted, so test for emptiness to mean "no filter".
        if not type:
            var_types = all_types
        else:
            var_types = {
                label: var_type
                for label, var_type in all_types.items()
                if label in type or var_type.slabel in type
            }

        # (type label, wording used in logs, modbus read function)
        plan = (
            ('Analog', 'registers', modbus.read_holding_registers),  # float
            ('Integer', 'registers', modbus.read_holding_registers),  # int
            ('Digital', 'coils', modbus.read_coils),  # bool
        )
        for label, unit, read in plan:
            if label in var_types:
                _fetch_variables(
                    logger, var_types[label], label.lower(), unit, read)

        logger.info('Successfully read all variables')
    except Exception as exc:
        # Top-level boundary of the command: report the failure and still
        # release the serial connection below.
        logger.error(exc)
    finally:
        if modbus.serial_conn:
            modbus.serial_conn.close()


@click.option('--since', is_flag=True)
@cli.command()
def supervise(since):
    """Send heat pump data to the supervisor and apply its control data.

    With --since, the data is built from the last recorded update time;
    otherwise from 0.
    """
    logger = logger_init()
    from .config import config, mac_address_init, get_last_update, set_last_update

    mac_address = config.get('heatpump','mac_address')
    # Fall back to mac_address_init() when the configured value does not
    # look like a MAC address.
    if not re.match('^(([0-9a-f]{2}):?){6}$', mac_address, re.I):
        mac_address = mac_address_init()

    from .models.heatpump import Heatpump

    last_update = get_last_update() if since else 0

    # One packet per variable type, sent in this order.
    post_packets = [
        Heatpump(mac_address, last_update, [label]).__dict__()
        for label in ('Analog', 'Digital', 'Integer')
    ]

    base_url = {
        'scheme':config.get('supervisor', 'scheme'),
        'hostname':config.get('supervisor', 'host'),
        'port':config.getint('supervisor', 'port')
    }

    def build_url(parts):
        """Assemble a URL from scheme, hostname, port and path."""
        return '{scheme}://{hostname}:{port}{path}'.format(**parts)

    # TODO: use a proper certificate instead of verify=False below:
    #
    #   if base_url['scheme'] == 'https':
    #       certificate = config.get('supervisor', 'certificate')
    #       if not os.path.isfile(certificate):
    #           raise Exception(f'Certificate not found :{certificate}')
    #       print(certificate)
    #   else:
    #       certificate = None

    post_url = {
        **base_url,
        **{'path': config.get('supervisor', 'post_path')}
    }
    post_endpoint = build_url(post_url)
    logger.info(post_endpoint)

    for packet in post_packets:
        # NOTE(review): shift_response is deliberately still called twice per
        # packet, as before; if it is a pure function it could be computed
        # once - confirm before merging the calls.
        logger.debug('Will send %s', shift_response(packet))
        # SECURITY: verify=False disables TLS certificate verification.
        post_resp = requests.post(
            url=post_endpoint,
            json=shift_response(packet),
            verify=False,
            timeout=SUPERVISOR_TIMEOUT
        )
        time.sleep(5)
        if post_resp.status_code == 200:
            logger.info('POST to supervisor succeeded')
            continue

        logger.info('POST to supervisor failed')

    # time.time() replaces strftime('%s'), which is not a portable directive.
    set_last_update(int(time.time()))

    get_path = '/'.join((
        config.get('supervisor', 'get_path'),
        Heatpump(mac_address).macformat
    ))
    get_url = {
        **base_url,
        **{'path': get_path}
    }

    # A timeout keeps the command from hanging forever on a silent server.
    # SECURITY: verify=False disables TLS certificate verification.
    get_resp = requests.get(
        url=build_url(get_url),
        verify=False,
        timeout=SUPERVISOR_TIMEOUT
    )

    control_data = get_resp.json()
    if Heatpump(mac_address).control(control_data):
        logger.info('GET to supervisor succeded : updated values')
        set_last_update(int(time.time()))
    else:
        logger.warning('Unable to set data from supervisor\n{}'.format(control_data))