Projet de remplacement du "RPiPasserelle" d'Otec.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

modbus.py 2.9KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. #!/usr/bin/env python3
  2. from serial import Serial
  3. from serial.serialutil import SerialException
  4. from umodbus.client.serial import rtu
  5. import umodbus
  6. from pprint import pprint
  7. from .config import config
  8. from .models import *
  9. serial_conn = None
  10. def connect():
  11. global serial_conn
  12. if serial_conn is None:
  13. print('Connecting to serial port *{}*'.format(
  14. config.get('heatpump', 'serial_port')))
  15. serial_conn = Serial(
  16. port=config.get('heatpump', 'serial_port'),
  17. baudrate=config.get('heatpump', 'baudrate'),
  18. bytesize=8,
  19. parity='N',
  20. stopbits=1,
  21. timeout=10)
  22. if serial_conn.open is False:
  23. print('Opening serial port')
  24. serial_conn.open()
  25. return serial_conn
  26. def read_coils(start, end):
  27. global serial_con
  28. connect()
  29. res = []
  30. address = -1
  31. qty = -1
  32. try:
  33. for address in range(start, end + 1, 125):
  34. qty = 125 if (end - address) >= 125 else (end - address)
  35. if not qty:
  36. break
  37. req_adu = rtu.read_coils(
  38. slave_id=1,
  39. starting_address=address,
  40. quantity=qty)
  41. resp = rtu.send_message(req_adu, serial_conn)
  42. res.extend(resp)
  43. except umodbus.exceptions.IllegalDataAddressError as e:
  44. print(e)
  45. print(f'{address} {qty}')
  46. return res
  47. def read_holding_registers(start, end):
  48. global serial_conn
  49. connect()
  50. res = []
  51. address = -1
  52. qty = -1
  53. try:
  54. for address in range(start, end + 1, 125):
  55. qty = 125 if (end - address) >= 125 else (end - address)
  56. if not qty:
  57. break
  58. req_adu = rtu.read_holding_registers(
  59. slave_id=1,
  60. starting_address=address,
  61. quantity=qty)
  62. resp = rtu.send_message(req_adu, serial_conn)
  63. res.extend(resp)
  64. except umodbus.exceptions.IllegalDataAddressError as e:
  65. print(e)
  66. print(f'{address} {qty}')
  67. return res
  68. def write_coil(var_value: VariableValue):
  69. global serial_conn
  70. connect()
  71. try:
  72. req_adu = rtu.write_single_coil(
  73. slave_id=1,
  74. address=var_value.address,
  75. value=var_value.value)
  76. return resp.send_message(req_adu, serial_conn)
  77. except Exception as e:
  78. raise e
  79. def write_holding_register(var_value: VariableValue):
  80. global serial_conn
  81. connect()
  82. try:
  83. req_adu = rtu.write_single_register(
  84. slave_id=1,
  85. address=var_value.address,
  86. value=var_value.value)
  87. return resp.send_message(req_adu, serial_conn)
  88. except Exception as e:
  89. raise e
  90. if __name__ == '__main__':
  91. resp = read_holding_registers(1, 5000)
  92. #pprint(resp)
  93. print(len(resp))
  94. resp = read_coils(1, 2000)
  95. #pprint(resp)
  96. print(len(resp))