123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- #!/usr/bin/env python3
- # scripts/examples/simple_tcp_client.py
- import fcntl
- import sys
- import socket
- import struct
- from serial import Serial
- from serial.serialutil import SerialException
- from umodbus.client.serial import rtu
- import json
-
- from pyheatpump.db import sql
- from pyheatpump.conf import config
- config.set('heatpump', 'database', '/home/emixam/src/otec/pyHeatpump/db/pyheatpump.db')
-
- s = Serial('/dev/ttyUSB0',19200)
- try:
- s.open()
- except SerialException:
- print('already open')
-
- BOOLEAN=1000
- FLOAT=500
- INT=1250
-
- def read_digital():
- global req_adu
- global sql
- global s
-
- start_address, end_address = sql(
- "SELECT start_address, end_address FROM var_type WHERE slabel LIKE 'D'")
-
- res = []
-
- # digital - boolean
- req_adu = rtu.read_coils(
- slave_id=1,
- starting_address=1,
- quantity=1)
-
- resp = rtu.send_message(req_adu, s)
-
- address = -1
- try:
- for address in range(0, BOOLEAN, 1):
- res[address+1] = (resp[address] > 0)
- except KeyError as e:
- print(e)
- res[address] = False
- except IndexError:
- print(address)
-
- return res
-
- def read_analog():
- global req_adu
- global sql
- global s
-
- start_address, end_address = sql(
- """
- SELECT start_address, end_address FROM var_type
- WHERE slabel LIKE 'A';
- """
- )
-
- # analog - float
- for address in range(1, end_address - start_address, 125):
- req_adu = rtu.read_holding_registers(
- slave_id=1,
- starting_address=start_address + address,
- quantity=125)
-
- resp = rtu.send_message(req_adu, s)
-
- for n in range(125):
- try:
- res[address + n] = int(resp[n]) / 10
- except Exception as e:
- print(e)
- res[address + n] = 0.
-
- return res
-
- def read_int():
- global req_adu
- global sql
- global s
-
- res = []
-
- start_address, end_address = 1, 1000
- #next(sql(
- """
- SELECT start_address, end_address FROM var_type
- WHERE slabel LIKE 'I'
- """
- #))
-
- # integer - int
- req_adu = rtu.read_holding_registers(
- slave_id=1,
- starting_address=start_address,
- quantity=125)
- resp = rtu.send_message(req_adu, s)
- for r in resp:
- print(r)
-
- return res
-
-
- read_int()
-
- s.close()
-
- sys.exit(0)
|