123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235 |
- #!/usr/bin/env python3
- # builtins
- import click
- from datetime import datetime
- import importlib
- import json
- import os
- import re
- import sys
- import time
- import urllib
- import uvicorn
- import requests
-
- from pyheatpump.logger import logger_init
- from pyheatpump.models import *
-
# Context settings handed to the root click group. ``default_map`` carries
# per-subcommand default overrides; the entry for ``run`` is empty, so no
# defaults are overridden here.
CONTEXT_SETTINGS = {
    'default_map': {
        'run': {},
    },
}
-
@click.group(invoke_without_command=True, context_settings=CONTEXT_SETTINGS)
@click.option('--version', is_flag=True)
@click.pass_context
def cli(ctx, version):
    """PyHeatpump command line interface.

    Root command group: it performs no work itself, the subcommands
    registered on it do.
    """
    # NOTE(review): ``--version`` is accepted but currently ignored. A disabled
    # implementation (import ``version`` from ``pyheatpump`` and echo it) used
    # to be stored in this docstring, where click picked it up as the help
    # text of the group. Re-enable it as real code once the package is
    # confirmed to expose a version helper.
-
-
@click.option('--host', default=None)
@click.option('--port', default=None)
@cli.command()
def run(host, port):
    """Serve the PyHeatpump application with uvicorn.

    The --host and --port options select the bind address. When one of them
    is missing or empty, the matching API_HOST / API_PORT value from the
    package configuration is used instead.
    """
    # The logger object itself is not needed in this command.
    logger_init()

    from .config import API_HOST, API_PORT

    bind_host = host or API_HOST
    bind_port = port or API_PORT

    click.echo('Launching PyHeatpump application')

    uvicorn.run(
        'pyheatpump.app:application',
        host=bind_host,
        port=int(bind_port),
        log_level='info',
        workers=1,
    )
-
-
def _store_readings(logger, var_type, values):
    """Insert one batch of values read for ``var_type``.

    ``values[i]`` is recorded for address ``var_type.start_address + i``.
    A ``Variable`` that does not exist yet is inserted first; a
    ``VariableValue`` is inserted for every value.
    """
    for offset, value in enumerate(values):
        var = Variable(type=var_type, address=offset + var_type.start_address)

        if not var.exists():
            logger.info('Insert variable {}:{}'.format(var.type, var.address))
            var.insert()

        VariableValue(type=var.type, address=var.address, value=value).insert()


@click.option('--type', '-t', default=None, multiple=True)
@cli.command()
def fetch(type):
    """Read variables over Modbus and insert their current values.

    Without --type every variable type is read. --type may be repeated and
    matches either the label or the short label of a variable type. Analog
    and Integer variables are read from holding registers, Digital variables
    from coils. Errors are logged, not raised.
    """
    logger = logger_init()

    from pyheatpump import modbus

    try:
        known_types = VariableType.getall()
        # With ``multiple=True`` click passes an empty tuple, never ``None``,
        # when the option is omitted. Testing ``is None`` therefore selected
        # nothing at all; test for emptiness instead.
        if not type:
            var_types = known_types
        else:
            var_types = {
                label: var_type
                for label, var_type in known_types.items()
                if label in type or var_type.slabel in type
            }

        # (type label, name used in log messages, storage name, reader),
        # in the order the types are processed.
        sources = (
            ('Analog', 'analog', 'registers', modbus.read_holding_registers),
            ('Integer', 'integer', 'registers', modbus.read_holding_registers),
            ('Digital', 'digital', 'coils', modbus.read_coils),
        )

        for label, kind, storage, read in sources:
            if label not in var_types:
                continue

            var_type = var_types[label]
            logger.info('Read {} variables in {} [{}, {}]'.format(
                kind, storage, var_type.start_address, var_type.end_address
            ))
            values = read(var_type.start_address, var_type.end_address)

            logger.debug(f'{kind} length : {len(values)}')
            _store_readings(logger, var_type, values)

        logger.info('Successfully read all variables')
    except Exception as exc:
        # Best-effort command: report the failure without a traceback exit.
        logger.error(exc)
    finally:
        # Release the serial link even when the run is interrupted.
        if modbus.serial_conn:
            modbus.serial_conn.close()
-
-
-
@click.option('--since', is_flag=True)
@cli.command()
def supervise(since):
    """Exchange data with the supervisor.

    Posts one packet per variable type (Analog, Digital, Integer) to the
    supervisor, then fetches control data for this heat pump and applies it.
    With --since the stored last-update timestamp is used to build the
    packets; otherwise 0 is used.
    """
    logger = logger_init()

    from .config import config, mac_address_init, get_last_update, set_last_update
    mac_address = config.get('heatpump', 'mac_address')
    if not re.match('^(([0-9a-f]{2}):?){6}$', mac_address, re.I):
        # Configured value is not a MAC address: ask the config module for one.
        mac_address = mac_address_init()

    from .models.heatpump import Heatpump

    last_update = get_last_update() if since else 0

    post_packets = [
        Heatpump(mac_address, last_update, [label]).packet
        for label in ('Analog', 'Digital', 'Integer')
    ]

    base_url = '{}://{}:{}'.format(
        config.get('supervisor', 'scheme'),
        config.get('supervisor', 'host'),
        config.getint('supervisor', 'port'),
    )
    post_url = base_url + config.get('supervisor', 'post_path')

    # Same limit for every request, so that no call can block forever.
    request_timeout = 200

    logger.info(post_url)

    for packet in post_packets:
        logger.debug('Will send %s', packet)
        # NOTE(review): verify=False disables TLS certificate verification.
        # Kept as in the original; confirm this is intended.
        post_resp = requests.post(
            url=post_url,
            json=packet,
            verify=False,
            timeout=request_timeout,
        )
        time.sleep(5)
        if post_resp.status_code == 200:
            logger.info('POST to supervisor succeeded')
        else:
            logger.info('POST to supervisor failed')

    # NOTE(review): the timestamp is stored even when a POST failed; confirm
    # that this is the intended behaviour.
    # time.time() replaces strftime('%s'), which is a platform-specific
    # extension and not available everywhere.
    set_last_update(int(time.time()))

    get_path = '/'.join((
        config.get('supervisor', 'get_path'),
        Heatpump(mac_address).macformat,
    ))

    get_resp = requests.get(
        url=base_url + get_path,
        verify=False,
        timeout=request_timeout,
    )

    control_data = get_resp.json()
    if Heatpump(mac_address).control(control_data):
        logger.info('GET to supervisor succeded : updated values')
        set_last_update(int(time.time()))
    else:
        logger.warning('Unable to set data from supervisor\n{}'.format(control_data))
|