Projet de remplacement du "RPiPasserelle" d'Otec.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

modbus.py 4.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. #!/usr/bin/env python3
  2. import os
  3. from serial import Serial
  4. from umodbus.client.serial import rtu
  5. import umodbus
  6. from pyheatpump.config import config
  7. from pyheatpump.logger import logger_init
  8. logger = logger_init()
  9. serial_conn = None
  10. def connect():
  11. global serial_conn
  12. if serial_conn is None:
  13. real_serial_port = os.path.realpath(
  14. config.get('heatpump', 'serial_port'))
  15. logger.info('Connecting to serial port *%s* (%s)',
  16. config.get('heatpump', 'serial_port'),
  17. real_serial_port)
  18. serial_conn = Serial(
  19. port=real_serial_port,
  20. baudrate=config.get('heatpump', 'baudrate'),
  21. bytesize=8,
  22. parity='N',
  23. stopbits=1,
  24. timeout=20)
  25. if serial_conn.open is False:
  26. logger.debug('Opening serial port *%s*',
  27. config.get('heatpump', 'serial_port'))
  28. serial_conn.open()
  29. return serial_conn
  30. def read_coils(start, end):
  31. global serial_con
  32. connect()
  33. res = []
  34. address = -1
  35. qty = -1
  36. logger.info('read_coils: [%s, %s]', start, end)
  37. try:
  38. for address in range(start, end + 1, 125):
  39. qty = 125 if (end - address) >= 125 else (end - address)
  40. if not qty:
  41. break
  42. req_adu = rtu.read_coils(
  43. slave_id=1,
  44. starting_address=address,
  45. quantity=qty)
  46. response = rtu.send_message(req_adu, serial_conn)
  47. res.extend(response)
  48. except umodbus.exceptions.IllegalDataAddressError as e:
  49. logger.error(e)
  50. logger.debug('address:%s qty:%s', address, qty)
  51. logger.debug('read_coils [%s, %s] result: %s', start, end, res)
  52. return res
  53. def read_holding_registers(start, end):
  54. global serial_conn
  55. connect()
  56. res = []
  57. address = -1
  58. qty = -1
  59. logger.debug('read_holding_registers [%s, %s]',
  60. start, end
  61. )
  62. try:
  63. for address in range(start, end + 1, 125):
  64. qty = 125 if (end - address) >= 125 else (end - address)
  65. if not qty:
  66. break
  67. req_adu = rtu.read_holding_registers(
  68. slave_id=1,
  69. starting_address=address,
  70. quantity=qty)
  71. response = rtu.send_message(req_adu, serial_conn)
  72. res.extend(response)
  73. except umodbus.exceptions.IllegalDataAddressError as e:
  74. logger.error('read_holding_registers(%s, %s): address:%s, qty:%s\n%s',
  75. start, end, address, qty, e)
  76. logger.debug('read_holding_registers [%s, %s] result: %s', start, end, res)
  77. return res
  78. def write_coil(var_value):
  79. global serial_conn
  80. connect()
  81. casted_value = int(var_value.value) * 0xFF00
  82. logger.debug('write_coil address: %s, value: %s',
  83. var_value.address, var_value.value
  84. )
  85. try:
  86. req_adu = rtu.write_single_coil(
  87. slave_id=1,
  88. address=var_value.address,
  89. value=casted_value)
  90. response = rtu.send_message(req_adu, serial_conn)
  91. logger.debug('write_coil address: %s, response: %s',
  92. var_value.address, response)
  93. if response != var_value.value:
  94. return False
  95. return True
  96. except Exception as e:
  97. raise e
  98. def write_holding_register(var_value):
  99. global serial_conn
  100. connect()
  101. logger.debug('write_holding_register address: %s, value: %s',
  102. var_value.address, var_value.value
  103. )
  104. try:
  105. casted_value = (int(var_value.value) + (1 << 16)
  106. if (var_value.value < 0)
  107. else int(var_value.value))
  108. req_adu = rtu.write_single_register(
  109. slave_id=1,
  110. address=var_value.address,
  111. value=casted_value)
  112. response = rtu.send_message(req_adu, serial_conn)
  113. logger.debug('write_holding_register, addres: %s, response: %s',
  114. var_value.address, response)
  115. if response != var_value.value:
  116. return False
  117. return True
  118. except Exception as e:
  119. raise e