import serial
import time
import sys
import datetime

# Command codes sent to the inverter in the third byte of a request.
CMD_ERROR = 1
CMD_ONLINE_VALUES = 2
CMD_DATETIME = 6
CMD_SERIAL_NUMBER = 8
CMD_FIRMWARE = 9

# Number of bytes requested from the port per answer, and the number of
# leading bytes covered by the checksum of a serial-number answer.
DEFAULT_ANSWER_LENGTH = 13
SERIAL_NUMBER_ANSWER_LENGTH = 24

# Passing "debug" as the first command-line argument enables verbose output.
debug = False
if len(sys.argv) > 1:
    debug_flag = sys.argv[1]
    if debug_flag == "debug":
        debug = True

# Serial port shared by all requests; opened as soon as the module is loaded.
s = serial.Serial(
    port='/dev/ttyUSB0',
    baudrate=9600,
    parity=serial.PARITY_NONE,
    bytesize=serial.EIGHTBITS,
    stopbits=serial.STOPBITS_ONE,
    timeout=5
)


def get_data(address, command, block):
    """Send one request over the serial port and return the raw answer.

    The request frame is ``[0, address, command, block, checksum]`` where the
    checksum is the sum of address, command and block modulo 256.

    Args:
        address: Address byte placed in the request.
        command: One of the ``CMD_*`` codes.
        block: Block byte placed in the request.

    Returns:
        The bytes read from the port: up to 24 for ``CMD_SERIAL_NUMBER`` and
        up to 13 for every other command. Fewer bytes (possibly none) are
        returned when the read times out.
    """
    # Keep the checksum within one byte; bytearray() raises ValueError for
    # any element above 255.
    checksum = (address + command + block) % 256
    request = [0, address, command, block, checksum]
    s.write(bytearray(request))
    s.flush()
    if debug:
        print("Sending request: {0}".format(request))

    if command == CMD_SERIAL_NUMBER:
        needed_bytes = SERIAL_NUMBER_ANSWER_LENGTH
    else:
        needed_bytes = DEFAULT_ANSWER_LENGTH

    # The answer is fixed-length binary data, so read an exact byte count.
    # read_until() would stop early at the first 0x0A byte in the payload.
    answer = s.read(needed_bytes)
    if debug:
        print("Answer received: {0}".format(answer))
        pretty_print(answer)
    return answer


def pretty_print(answer):
    """Print the answer as a list of integer byte values plus its length."""
    output = [int(x) for x in answer]
    print("Pretty: {0} len={1}".format(output, len(output)))


def validate_checksum(answer, command):
    """Check the trailing checksum byte of an answer.

    The checksum is the sum of all preceding bytes modulo 256. For
    ``CMD_SERIAL_NUMBER`` only the first 13 bytes are considered.
    NOTE(review): 24 bytes are read for that command but only 13 are
    checksummed -- confirm against the protocol specification.

    Args:
        answer: Raw bytes (or a sequence of ints) returned by ``get_data``.
        command: The ``CMD_*`` code the answer belongs to.

    Returns:
        True when the checksum matches; False when it does not or when the
        answer is empty or shorter than the checksummed span (e.g. after a
        read timeout).
    """
    length = len(answer)
    if command == CMD_SERIAL_NUMBER:
        length = DEFAULT_ANSWER_LENGTH

    # A timed-out read yields a short or empty answer; report it as invalid
    # instead of failing with IndexError.
    if length < 1 or len(answer) < length:
        if debug:
            print("Answer too short for checksum validation: {0} bytes".format(len(answer)))
        return False

    given_checksum = answer[length - 1]
    calculated_checksum = sum(answer[:length - 1]) % 256
    if debug:
        print("Given checksum: {0}, Calculcated checksum: {1}".format(given_checksum, calculated_checksum))
    return given_checksum == calculated_checksum


def parseOnlineValues(answer1):
    """Decode an online-values answer into a dict of scaled readings.

    The first ten bytes are scaled with the factors below. The meaning and
    units of the individual keys are not documented in this file; the names
    suggest DC/AC voltage, current and power, temperature and energy counters
    -- TODO confirm against the device documentation.

    Args:
        answer1: Raw answer bytes (iterating must yield ints).

    Returns:
        Dict with keys ``udc``, ``idc``, ``uac``, ``iac``, ``t``, ``s``,
        ``pdc``, ``pac``, ``wd`` and ``wtot``.
    """
    data = list(answer1)
    udc = data[0] * 2.8 + 100
    idc = data[1] * 0.08
    uac = data[2] + 100.0
    iac = data[3] * 0.120
    t = data[4] - 40.0
    # Named s_value so it does not shadow the module-level serial port ``s``.
    s_value = data[5] * 6.0
    pdc = int(udc * idc) / 1000
    pac = int(uac * iac) / 1000
    # Two-byte big-endian counters.
    wd = (data[6] * 256 + data[7]) / 1000
    wtot = data[8] * 256 + data[9]
    values = {"udc": udc, "idc": idc, "uac": uac, "iac": iac, "t": t,
              "s": s_value, "pdc": pdc, "pac": pac, "wd": wd, "wtot": wtot}
    return values


def getOnlineValues(address):
    """Query online values; return the parsed dict, or [] on a bad checksum."""
    answer = get_data(address, CMD_ONLINE_VALUES, 1)
    if validate_checksum(answer, CMD_ONLINE_VALUES):
        return parseOnlineValues(answer)
    return []


def parseSerialNumber(answer1):
    """Return all bytes except the last two, joined as a character string."""
    return "".join(chr(x) for x in answer1[:-2])


def getSerialNumber(address):
    """Query the serial number; return it as a string, or [] on a bad checksum."""
    answer = get_data(address, CMD_SERIAL_NUMBER, 1)
    if validate_checksum(answer, CMD_SERIAL_NUMBER):
        return parseSerialNumber(answer)
    return []


def parseFirmware(answer1):
    """Return all bytes except the last two, joined as a character string."""
    return "".join(chr(x) for x in answer1[:-2])


def getFirmware(address):
    """Query the firmware info; return it as a string, or [] on a bad checksum."""
    answer = get_data(address, CMD_FIRMWARE, 1)
    if validate_checksum(answer, CMD_FIRMWARE):
        return parseFirmware(answer)
    return []


def parseDateTime(answer1):
    """Build a datetime from the first five answer bytes.

    The bytes are used as year offset from 2000, month, day, hour and minute;
    seconds and microseconds are set to zero.

    Raises:
        ValueError: If the bytes do not form a valid date/time.
    """
    answer = list(answer1)
    date = datetime.datetime(2000 + answer[0], answer[1], answer[2], answer[3], answer[4], 0, 0)
    return date


def getDateTime(address):
    """Query the device clock; return a datetime, or [] on a bad checksum."""
    answer = get_data(address, CMD_DATETIME, 1)
    if validate_checksum(answer, CMD_DATETIME):
        return parseDateTime(answer)
    return []


def parseError(answer1):
    """Return the answer as a list of integer byte values (no decoding yet)."""
    answer = [int(x) for x in answer1]
    return answer


def getError(address):
    """Query the error state; return the raw byte list, or [] on a bad checksum."""
    answer = get_data(address, CMD_ERROR, 1)
    if validate_checksum(answer, CMD_ERROR):
        return parseError(answer)
    return []


for address in range(1, 2):
    print("Data for inverter at address {0}.".format(address))
    print("Serialnumber: {0}".format(getSerialNumber(address)))
    print("Firmware: {0}".format(getFirmware(address)))
    # Further queries, left disabled as in the original script:
    # print("Error: {0}".format(getError(address)))
    # print("DateTime: {0}".format(getDateTime(address)))
    # print("Online Values: {0}".format(getOnlineValues(address)))