#!/usr/bin/env python3
"""Command-line entry points for PyHeatpump.

Defines the ``cli`` click group and its two sub-commands:

* ``run``   - serve ``pyheatpump.app:application`` with uvicorn.
* ``fetch`` - read every variable over modbus and store the values.
"""
# standard library
import importlib
import os
import re
import sys
from pprint import pprint

# third-party
import click
import uvicorn

# project
from pyheatpump.logger import logger_init
from pyheatpump.models import *
from pyheatpump.db import commit

CONTEXT_SETTINGS = {
    'default_map': {'run': {}}
}


@click.group(invoke_without_command=True, context_settings=CONTEXT_SETTINGS)
@click.option('--version', is_flag=True)
@click.pass_context
def cli(ctx, version):
    """PyHeatpump command-line interface."""
    if version:
        # Import under an alias: the plain name `version` is this function's
        # flag parameter, and the bare name `pyheatpump` is never bound in
        # this module, so `pyheatpump.version()` raised NameError.
        # NOTE(review): assumes `pyheatpump.version` is a callable, as the
        # original call site implied - confirm.
        from pyheatpump import version as package_version
        return click.echo(package_version())


# NOTE(review): the options are applied on top of the already-built command;
# click accepts this, and the order is kept as-is so `--help` is unchanged.
@click.option('--host', default=None)
@click.option('--port', default=None)
@cli.command()
def run(host, port):
    """Launch the PyHeatpump web application with uvicorn."""
    # Called for its side effect only; the returned logger is not used here.
    logger_init()

    from .config import (API_HOST, API_PORT)

    # Fall back to the configured values when an option is missing or empty.
    host = host or API_HOST
    port = port or API_PORT

    click.echo('Launching PyHeatpump application')
    uvicorn.run('pyheatpump.app:application',
                host=host,
                port=int(port),
                log_level='info',
                reload=True)


def _store_readings(logger, var_type, values):
    """Insert one VariableValue per element of *values*.

    The element at position ``i`` is stored at address
    ``var_type.start_address + i``. A ``Variable`` row is inserted first
    whenever ``exists()`` reports that it is missing.
    """
    for offset, value in enumerate(values):
        var = Variable(type=var_type,
                       address=offset + var_type.start_address)

        if not var.exists():
            logger.info('Insert variable {}:{}'.format(
                var.type, var.address))
            var.insert()

        # NOTE(review): assumes a value is recorded on every iteration, not
        # only for newly inserted variables - confirm against the original
        # indentation.
        VariableValue(type=var.type,
                      address=var.address,
                      value=value).insert()


@cli.command()
def fetch():
    """Read all variables over modbus and store their current values."""
    logger = logger_init()
    from pyheatpump import modbus

    var_types = VariableType.getall()

    # (variable type key, log message template, modbus read function)
    read_plan = (
        # Analog - float
        ('Analog',
         'Read analog variables in registers [{}, {}]',
         modbus.read_holding_registers),
        # Integer - int
        ('Integer',
         'Read integer variables in registers [{}, {}]',
         modbus.read_holding_registers),
        # Digital - bool
        ('Digital',
         'Read digital variables in coils [{}, {}]',
         modbus.read_coils),
    )

    for type_name, message, read in read_plan:
        var_type = var_types[type_name]
        logger.info(message.format(
            var_type.start_address, var_type.end_address))
        values = read(var_type.start_address, var_type.end_address)
        _store_readings(logger, var_type, values)

    # One commit after all three variable types have been processed.
    commit()
    logger.info('Successfully read all variables')