#!/usr/bin/env python
# scripts/examples/simple_tcp_client.py
"""Read coil and register values from a Modbus RTU slave over a serial port.

The address range of each variable type is looked up in the ``var_type``
table through the project's ``sql`` helper; the values themselves are
fetched with uModbus RTU requests sent over the module-level serial port
``s``.
"""
import fcntl
import json
import socket
import struct
import sys

from serial import Serial
from serial.serialutil import SerialException
from umodbus.client.serial import rtu

from db.db import sql

s = Serial('/dev/ttyUSB0', 19200)
try:
    # NOTE(review): pyserial is documented to open the port in the
    # constructor when a port name is given, so this call is expected to
    # raise and print the message below on every run -- confirm.
    s.open()
except SerialException:
    print('already open')

BOOLEAN = 1000
FLOAT = 500
INT = 1250

# Slave addressed by every request in this script.
_SLAVE_ID = 1
# Number of items requested per Modbus message by the chunked readers.
_CHUNK_SIZE = 125


def read_digital():
    """Read the digital (boolean) values from the slave.

    Returns:
        dict: maps a 1-based position (response index + 1) to ``True`` when
        the corresponding coil value is greater than zero, else ``False``.
        Only positions actually present in the response are filled in.
    """
    # NOTE(review): the range is queried but the request below still uses a
    # hard-coded address and quantity -- confirm which one is intended.
    start_address, end_address = sql(
        "SELECT start_address, end_address FROM var_type WHERE slabel LIKE 'D';")

    # A dict (not a list): values are stored by position, and assigning to
    # an index of an empty list raises IndexError instead of growing it.
    res = {}

    # digital - boolean
    req_adu = rtu.read_coils(
        slave_id=_SLAVE_ID, starting_address=1, quantity=1)
    resp = rtu.send_message(req_adu, s)

    address = -1
    try:
        for address in range(BOOLEAN):
            res[address + 1] = (resp[address] > 0)
    except KeyError as e:
        print(e)
        # Store the fallback in the slot that failed; writing to
        # res[address] would overwrite the previous, valid reading.
        res[address + 1] = False
    except IndexError:
        # The response holds fewer than BOOLEAN values; report where it
        # ended and return what was read.
        print(address)
    return res


def read_analog():
    """Read the analog (float) values from the slave.

    Holding registers are requested in chunks of ``_CHUNK_SIZE`` and every
    raw register value is divided by 10.

    Returns:
        dict: maps the offset from ``start_address`` to the scaled value;
        an offset whose value could not be converted is stored as ``0.``.
    """
    start_address, end_address = sql(
        "SELECT start_address, end_address FROM var_type WHERE slabel LIKE 'A';")

    # Previously never initialised, so every call raised NameError.
    res = {}

    # analog - float
    # NOTE(review): the last chunk may request registers beyond
    # end_address -- confirm the slave accepts that.
    for address in range(1, end_address - start_address, _CHUNK_SIZE):
        req_adu = rtu.read_holding_registers(
            slave_id=_SLAVE_ID,
            starting_address=start_address + address,
            quantity=_CHUNK_SIZE)
        resp = rtu.send_message(req_adu, s)
        for n in range(_CHUNK_SIZE):
            try:
                res[address + n] = int(resp[n]) / 10
            except Exception as e:
                # Deliberate best effort: report the problem and fall back
                # to zero rather than abort the whole read.
                print(e)
                res[address + n] = 0.
    return res


def read_int():
    """Read the integer values from the slave.

    Values are requested in chunks of ``_CHUNK_SIZE``.

    Returns:
        dict: maps the offset from ``start_address`` to the integer value;
        an offset missing from the response is stored as ``0``.
    """
    start_address, end_address = sql(
        "SELECT start_address, end_address FROM var_type WHERE slabel LIKE 'I';")

    # A dict (not a list): assigning to an index of an empty list raises
    # IndexError instead of growing it.
    res = {}

    # integer - int
    # NOTE(review): this issues read_coils although integers are expected;
    # read_holding_registers may have been intended -- confirm.
    for address in range(1, end_address - start_address, _CHUNK_SIZE):
        req_adu = rtu.read_coils(
            slave_id=_SLAVE_ID,
            starting_address=start_address + address,
            quantity=_CHUNK_SIZE)
        resp = rtu.send_message(req_adu, s)
        for n in range(_CHUNK_SIZE):
            try:
                res[address + n] = int(resp[n])
            except (KeyError, IndexError) as e:
                # IndexError covers a response shorter than the requested
                # quantity when the response is a sequence.
                print(e)
                res[address + n] = 0
    return res


# NOTE(review): none of the readers above is called before the script exits.
s.close()
sys.exit(0)