import serial
import serial.tools.list_ports
import time
import json
import socket
import numpy as np
import subprocess
import os
import logging
[docs]
class Labphox:
_logger = logging.getLogger("libphox")
def __init__(self, port=None, debug=False, IP=None, cmd_logging=False, SN=None, HW_val=False):
self.debug = debug
self.time_out = 5
if self.debug or cmd_logging:
self.log = True
self.logging_dir = r'\logging'
self.logger_init(self._logger)
else:
self.log = False
self.SW_version = 3
self.board_SN = SN
self.board_FW = None
self.adc_ref = 3.3
self.N_channel = 0
self.COM_port = None
self.ETH_HOST = None # The server's IP address
self.ETH_PORT = 7 # The port used by the server
self.ETH_buff_size = 1024
self.communication_handler_sleep_time = 0
self.packet_handler_sleep_time = 0
if IP:
self.USB_or_ETH = 2 # 1 for USB, 2 for ETH
self.ETH_HOST = IP # The server's IP address
self.ETH_PORT = 7 # The port used by the server
self.ETH_buff_size = 1024
else:
self.USB_or_ETH = 1 # 1 for USB, 2 for ETH
self.COM_port = port
self.connect(HW_val=HW_val)
[docs]
def connect(self, HW_val=True):
if self.USB_or_ETH == 1:
if self.COM_port:
# TODO
pass
elif self.board_SN:
for device in serial.tools.list_ports.comports():
if device.pid == 1812:
try:
self.serial_com = serial.Serial(device.device)
if self.board_SN == self.utility_cmd('sn'):
self.COM_port = device.device
self.PID = device.pid
self.serial_com.close()
break
self.serial_com.close()
except Exception as error:
pass
# print('Port' + str(device.device) + ' is already in use:', error)
else:
for device in serial.tools.list_ports.comports():
if device.pid == 1812:
self.PID = device.pid
self.COM_port = device.device
if self.debug:
for i in device:
print(i)
try:
self.serial_com = serial.Serial(self.COM_port)
self.board_info = ''
self.name = ''
self.board_SN = None
self.utility_cmd('info')
print('Connected to ' + self.name + ' on COM port ' + self.COM_port + ', PID:',
str(self.PID) + ', SerialN: ' + str(self.board_SN) + ', Channels:' + str(self.N_channel))
print(self.HW, ', FW_Ver.', self.board_FW)
except:
print('ERROR: Couldn\'t connect via serial')
elif self.USB_or_ETH == 2:
socket.setdefaulttimeout(self.time_out)
self.board_info = ''
self.name = ''
self.board_SN = None
self.utility_cmd('info')
print('Connected to ' + self.name + ', IP:',
str(self.ETH_HOST) + ', SerialN: ' + str(self.board_SN) + ', Channels:' + str(self.N_channel))
print(self.HW, ', FW_Ver.', self.board_FW)
if not self.board_SN:
raise Exception(
"Couldn\'t connect, please check that the device is properly connected or try providing a valid SN, COM port or IP number")
elif self.board_FW != self.SW_version and HW_val:
print("Board Firmware version and Software version are not up to date, Board FW=" + str(
self.board_FW) + " while SW=" + str(self.SW_version))
[docs]
def disconnect(self):
if self.USB_or_ETH == 1:
self.serial_com.close()
elif self.USB_or_ETH == 2:
pass
[docs]
def write(self, cmd):
if self.log:
pass
#self.logging('actions', cmd)
if self.USB_or_ETH == 1:
self.serial_com.write(cmd)
else:
pass
[docs]
def read(self, size):
if self.USB_or_ETH == 1:
data_back = self.serial_com.read(size)
else:
data_back = ''
return data_back
[docs]
def read_buffer(self):
return self.read(self.input_buffer())
[docs]
def decode_buffer(self):
return list(self.read_buffer())
[docs]
def debug_func(self, cmd, reply):
print('Command', cmd)
print('Reply', reply)
print('')
# self._logger.debug(f'Debug: {cmd}')
self._logger.info(f'Debug: {cmd}')
self._logger.debug(f'Command: {cmd}')
self._logger.debug(f'Reply: {reply}')
[docs]
def logger_init(self, logger_instance, outfolder=None):
logger_instance.setLevel(logging.DEBUG)
if outfolder is None:
outfolder = os.path.realpath('.') + self.logging_dir
os.makedirs(name=outfolder, exist_ok=True)
date_fmt = "%d/%m/%Y %H:%M:%S"
# remove all old handlers
for hdlr in logger_instance.handlers[:]:
logger_instance.removeHandler(hdlr)
# INFO level logger
# file logger
fmt = "[%(asctime)s] [%(levelname)s] %(message)s"
log_format = logging.Formatter(fmt=fmt, datefmt=date_fmt)
info_handler = logging.FileHandler(os.path.join(outfolder, 'info.log'))
info_handler.setFormatter(log_format)
info_handler.setLevel(logging.INFO)
logger_instance.addHandler(info_handler)
# DEBUG level logger
fmt = "[%(asctime)s] [%(levelname)s] [%(funcName)s(): line %(lineno)s] %(message)s"
log_format = logging.Formatter(fmt=fmt, datefmt=date_fmt)
debug_handler = logging.FileHandler(os.path.join(outfolder, 'debug.log'))
debug_handler.setFormatter(log_format)
debug_handler.setLevel(logging.DEBUG)
logger_instance.addHandler(debug_handler)
# _logger = logging.getLogger("libphox")
return logger_instance
[docs]
def read_line(self):
if self.USB_or_ETH == 1:
return self.serial_com.readline()
else:
return ''
[docs]
def query_line(self, cmd):
self.write(cmd)
if self.USB_or_ETH == 1:
return self.serial_com.readline()
else:
return ''
[docs]
def compare_cmd(self, cmd1, cmd2):
if cmd1.upper() == cmd2.upper():
return True
else:
return False
[docs]
def encode(self, value):
return str(value).encode()
[docs]
def decode_simple_response(self, response):
return response.decode('UTF-8').strip()
[docs]
def parse_response(self):
##time.sleep(1)
reply = ''
initial_time = time.time()
end = False
while not end:
time.sleep(self.communication_handler_sleep_time)
if self.input_buffer():
reply += self.read_buffer().decode()
if ';' in reply:
end = True
elif (time.time() - initial_time) > self.time_out:
raise Exception("LABPHOX time out exceeded", self.time_out, 's')
reply = reply.split(';')[0]
response = {'reply': reply, 'command': reply.split(':')[:-2], 'value': reply.split(':')[-1]}
if self.log:
self.logging('received', reply)
return response
[docs]
def TCP_communication_handler(self, encoded_cmd=None):
reply = ''
with socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM) as TCP_connection:
TCP_connection.connect((self.ETH_HOST, self.ETH_PORT))
TCP_connection.sendall(encoded_cmd)
packet = TCP_connection.recv(self.ETH_buff_size)
try:
reply += packet.decode()
except:
print('Invalid packet character', packet)
reply = reply.split(';')[0]
return reply
[docs]
def UDP_communication_handler(self, encoded_cmd=None):
reply = ''
with socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) as UDP_connection:
UDP_connection.sendto(encoded_cmd, (self.ETH_HOST, self.ETH_PORT))
end = False
while not end:
# time.sleep(self.communication_handler_sleep_time)
packet = UDP_connection.recvfrom(self.ETH_buff_size)[0]
if b';' in packet:
reply += packet.split(b';')[0].decode()
end = True
else:
try:
reply += packet.decode()
except:
print('Invalid packet character', packet)
break
return reply
[docs]
def USB_communication_handler(self, encoded_cmd=None):
reply = ''
self.flush_input_buffer()
self.write(encoded_cmd)
initial_time = time.time()
end = False
while not end:
time.sleep(self.communication_handler_sleep_time)
if self.input_buffer():
reply += self.read_buffer().decode()
if ';' in reply:
end = True
elif (time.time() - initial_time) > self.time_out:
raise Exception("LABPHOX time out exceeded", self.time_out, 's')
reply = reply.split(';')[0]
return reply
[docs]
def standard_reply_parser(self, cmd, reply):
response = {'reply': reply, 'command': reply.split(':')[:-1], 'value': reply.split(':')[-1]}
if not self.validate_reply(cmd, response):
self.raise_value_mismatch(cmd, response)
return response
[docs]
def communication_handler(self, cmd, standard=True, is_encoded=False):
response = ''
if is_encoded:
encoded_cmd = cmd
else:
encoded_cmd = cmd.encode()
if self.USB_or_ETH == 1:
reply = self.USB_communication_handler(encoded_cmd)
elif self.USB_or_ETH == 2:
reply = self.UDP_communication_handler(encoded_cmd)
elif self.USB_or_ETH == 3:
reply = self.TCP_communication_handler(encoded_cmd)
else:
raise Exception("Invalid communication options USB_or_ETH=", self.USB_or_ETH)
try:
if standard:
if is_encoded:
response = self.standard_reply_parser(cmd=cmd.decode(), reply=reply)
else:
response = self.standard_reply_parser(cmd=cmd, reply=reply)
else:
response = reply
except:
print('Reply Error', reply)
if self.debug:
self.debug_func(cmd, response)
return response
[docs]
def validate_reply(self, cmd, response):
stripped = cmd.strip(';').split(':')
command = stripped[:-1]
value = stripped[-1]
match = True
if command != response['command']:
match = False
return match
[docs]
def USB_packet_handler(self, encoded_cmd, end_sequence):
reply = b''
self.flush_input_buffer()
self.write(encoded_cmd)
initial_time = time.time()
end = False
while not end:
time.sleep(self.packet_handler_sleep_time)
if self.input_buffer():
reply += self.read_buffer()
if end_sequence in reply[-5:]:
end = True
elif (time.time() - initial_time) > self.time_out:
raise Exception("LABPHOX time out exceeded", self.time_out, 's')
reply = reply.replace(end_sequence, b'').replace(encoded_cmd, b'')
return reply
[docs]
def UDP_packet_handler(self, encoded_cmd, end_sequence):
reply = b''
with socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) as s:
s.sendto(encoded_cmd, (self.ETH_HOST, self.ETH_PORT))
end = False
while not end:
time.sleep(self.packet_handler_sleep_time)
packet = s.recvfrom(self.ETH_buff_size)[0]
reply += packet
if end_sequence in reply[-5:]:
end = True
reply = reply.replace(end_sequence, b'').replace(encoded_cmd, b'')
return reply[7:]
[docs]
def packet_handler(self, cmd, end_sequence=b'\x00\xff\x00\xff'):
encoded_cmd = cmd.encode()
if self.USB_or_ETH == 1:
reply = self.USB_packet_handler(encoded_cmd, end_sequence)
return reply
elif self.USB_or_ETH == 2:
reply = self.UDP_packet_handler(encoded_cmd, end_sequence)
return reply
[docs]
def raise_value_mismatch(self, cmd, response):
print('Command mismatch!')
print('Command:', cmd)
print('Reply:', response['command'])
[docs]
def utility_cmd(self, cmd, value=0):
response = False
if self.compare_cmd(cmd, 'info'):
self.name = self.utility_cmd('name').upper()
if 'LabP'.upper() in self.name:
self.HW = self.utility_cmd('hw')
self.board_SN = self.utility_cmd('sn')
self.board_FW = int(self.utility_cmd('fw').split('.')[-1])
self.N_channel = int(self.utility_cmd('channels').split()[1])
elif self.compare_cmd(cmd, 'name'):
response = self.communication_handler('W:2:A:;', standard=False)
elif self.compare_cmd(cmd, 'fw'):
response = self.communication_handler('W:2:B:;', standard=False)
elif self.compare_cmd(cmd, 'hw'):
response = self.communication_handler('W:2:D:;', standard=False)
elif self.compare_cmd(cmd, 'sn'):
response = self.communication_handler('W:2:E:;', standard=False)
elif self.compare_cmd(cmd, 'channels'):
response = self.communication_handler('W:2:F:;', standard=False)
elif self.compare_cmd(cmd, 'connected'):
response = self.communication_handler('W:2:C:;')
return response['value']
elif self.compare_cmd(cmd, 'UID'):
response = self.communication_handler('W:2:G:' + str(value) + ';')
return response['value']
elif self.compare_cmd(cmd, 'sleep'):
response = self.communication_handler('W:2:S:' + str(value) + ';')
return response
[docs]
def DAC_cmd(self, cmd, DAC=1, value=0):
response = None
if DAC == 1:
sel_DAC = 5
elif DAC == 2:
sel_DAC = 8
else:
return None
if self.compare_cmd(cmd, 'on'):
response = self.communication_handler('W:' + str(sel_DAC) + ':T:1;')
elif self.compare_cmd(cmd, 'off'):
response = self.communication_handler('W:' + str(sel_DAC) + ':T:0;')
elif self.compare_cmd(cmd, 'set'):
response = self.communication_handler('W:' + str(sel_DAC) + ':S:' + str(value) + ';')
elif self.compare_cmd(cmd, 'buffer'):
response = self.communication_handler('W:' + str(sel_DAC) + ':B:' + str(value) + ';')
return response
[docs]
def application_cmd(self, cmd, value=0):
response = False
if self.compare_cmd(cmd, 'pulse'):
##self.serial_com.flushInput()
##response = self.communication_handler('W:3:T:' + str(value) + ';', standard=False)
response = self.packet_handler('W:3:T:' + str(value) + ';')
return np.fromstring(response, dtype=np.uint8)
elif self.compare_cmd(cmd, 'acquire'):
response = self.communication_handler('W:3:Q:' + str(value) + ';')
elif self.compare_cmd(cmd, 'voltage'):
response = self.communication_handler('W:3:V:' + str(value) + ';')
elif self.compare_cmd(cmd, 'test_circuit'):
response = self.communication_handler('W:3:P:' + str(value) + ';')
return response
[docs]
def timer_cmd(self, cmd, value=0):
response = False
if self.compare_cmd(cmd, 'duration'):
response = self.communication_handler('W:0:A:' + str(value) + ';')
if int(response['value']) != int(value):
self.raise_value_mismatch(cmd, response)
if self.compare_cmd(cmd, 'sampling'):
response = self.communication_handler('W:0:S:' + str(value) + ';')
return response
[docs]
def ADC_cmd(self, cmd, value=0):
response = None
if self.compare_cmd(cmd, 'channel'):
response = self.communication_handler('W:4:C:' + str(value) + ';')
elif self.compare_cmd(cmd, 'start'):
response = self.communication_handler('W:4:T:1;')
elif self.compare_cmd(cmd, 'stop'):
response = self.communication_handler('W:4:T:0;')
elif self.compare_cmd(cmd, 'select'): ##Select and sample
response = self.communication_handler('W:4:S:' + str(value) + ';')
elif self.compare_cmd(cmd, 'get'):
response = self.communication_handler('W:4:G:;')
return int(response['value'])
elif self.compare_cmd(cmd, 'interrupt'): ##Enable interrupt mode
response = self.communication_handler('W:4:I:' + str(value) + ';')
elif self.compare_cmd(cmd, 'buffer'): ##Enable interrupt mode
response = self.communication_handler('W:4:B:' + str(value) + ';')
return int(response['value'])
return response
[docs]
def ADC3_cmd(self, cmd, value=0):
response = None
if self.compare_cmd(cmd, 'channel'):
response = self.communication_handler('W:W:C:' + str(value) + ';')
elif self.compare_cmd(cmd, 'start'):
response = self.communication_handler('W:W:T:1;')
elif self.compare_cmd(cmd, 'stop'):
response = self.communication_handler('W:W:T:0;')
elif self.compare_cmd(cmd, 'select'): ##Select and sample
response = self.communication_handler('W:W:S:' + str(value) + ';')
elif self.compare_cmd(cmd, 'get'):
response = self.communication_handler('W:W:G:;')
return int(response['value'])
return response
[docs]
def gpio_cmd(self, cmd, value=0):
response = None
if self.compare_cmd(cmd, 'EN_3V3'):
response = self.communication_handler('W:1:A:' + str(value) + ';')
elif self.compare_cmd(cmd, 'EN_5V'):
response = self.communication_handler('W:1:B:' + str(value) + ';')
elif self.compare_cmd(cmd, 'EN_CHGP'):
response = self.communication_handler('W:1:C:' + str(value) + ';')
elif self.compare_cmd(cmd, 'FORCE_PWR_EN'):
response = self.communication_handler('W:1:D:' + str(value) + ';')
elif self.compare_cmd(cmd, 'PWR_EN'):
response = self.communication_handler('W:1:E:' + str(value) + ';')
elif self.compare_cmd(cmd, 'DCDC_EN'):
response = self.communication_handler('W:1:F:' + str(value) + ';')
elif self.compare_cmd(cmd, 'CHOPPING_EN'):
response = self.communication_handler('W:1:G:' + str(value) + ';')
elif self.compare_cmd(cmd, 'PWR_STATUS'):
response = self.communication_handler('W:1:H:0;')
return int(response['value'])
elif self.compare_cmd(cmd, 'OCP_OUT_STATUS'):
response = self.communication_handler('W:1:I:0;')
return int(response['value'])
return response
[docs]
def IO_expander_cmd(self, cmd, port='A', value=0):
response = None
if self.compare_cmd(cmd, 'connect'):
response = self.communication_handler('W:' + str(port) + ':C:' + str(value) + ';')
elif self.compare_cmd(cmd, 'disconnect'):
response = self.communication_handler('W:' + str(port) + ':D:' + str(value) + ';')
elif self.compare_cmd(cmd, 'on'):
response = self.communication_handler('W:6:O:' + str(value) + ';')
elif self.compare_cmd(cmd, 'off'):
response = self.communication_handler('W:6:U:' + str(value) + ';')
elif self.compare_cmd(cmd, 'type'):
response = self.communication_handler('W:6:S:' + str(value) + ';')
return response
[docs]
def reset_cmd(self, cmd):
response = None
if self.compare_cmd(cmd, 'reset'):
response = self.communication_handler('W:7:R:;')
elif self.compare_cmd(cmd, 'boot'):
response = self.communication_handler('W:7:B:;')
elif self.compare_cmd(cmd, 'soft_reset'):
response = self.communication_handler('W:7:S:;')
return response
[docs]
def logging(self, list_name, cmd):
with open('history.json', "r") as history_file:
data = json.load(history_file)
if type(cmd) == str:
data_to_append = cmd
elif type(cmd) == bytes:
data_to_append = cmd.decode()
if list_name in data.keys():
data[list_name].append({'data': data_to_append, 'date': time.time()})
else:
data[list_name] = [{'data': data_to_append, 'date': time.time()}]
with open('history.json', "w") as file:
json.dump(data, file)
[docs]
def ETHERNET_cmd(self, cmd, value=0):
response = None
if self.compare_cmd(cmd, 'read'):
response = self.communication_handler('W:Q:R:' + str(value) + ';')
elif self.compare_cmd(cmd, 'set_ip'):
response = self.communication_handler('W:Q:I:' + str(value) + ';')
elif self.compare_cmd(cmd, 'get_ip'):
response = self.communication_handler('W:Q:G:' + str(value) + ';')
elif self.compare_cmd(cmd, 'set_ip_str'):
int_IP = int.from_bytes(socket.inet_aton(value), "little")
response = self.communication_handler('W:Q:I:' + str(int_IP) + ';')
elif self.compare_cmd(cmd, 'get_ip_str'):
response = self.communication_handler('W:Q:G:' + str(value) + ';')
IP = socket.inet_ntoa(int(response['value']).to_bytes(4, 'little'))
print('IP:', IP)
return IP
elif self.compare_cmd(cmd, 'set_mask_str'):
int_mask = int.from_bytes(socket.inet_aton(value), "little")
response = self.communication_handler('W:Q:K:' + str(int_mask) + ';')
elif self.compare_cmd(cmd, 'get_mask_str'):
response = self.communication_handler('W:Q:L:' + str(value) + ';')
print(response)
mask = socket.inet_ntoa(int(response['value']).to_bytes(4, 'little'))
print('Subnet mask:', mask)
return mask
elif self.compare_cmd(cmd, 'get_detection'):
response = self.communication_handler('W:Q:D:;')
return response
[docs]
def UPGRADE_cmd(self, cmd, value):
response = None
if self.compare_cmd(cmd, 'upgrade'):
response = self.communication_handler('U:A:0' + ':' + str(value) + ';')
if int(response['value']) == value:
print('Update successful,', value, 'channels enabled')
else:
print('Couldn\'t update channel number')
elif self.compare_cmd(cmd, 'stream_key'):
for idx, element in enumerate(value):
response = self.communication_handler('U:B:' + str(chr(65 + idx)) + ':' + str(element) + ';')
if int(response['value']) != element:
print('Error while sending key!')
return response
[docs]
def FLASH_utils(self, path=None):
DFU_name = '0483:df11'
found = False
process = subprocess.Popen(['.\Firmware\dfu-util', '-l'], shell=True,
stdout=subprocess.PIPE,
universal_newlines=True)
start_time = time.time()
exc_time = 0
output = ''
while exc_time < self.time_out:
output = process.stdout.readline()
if 'Internal Flash' in output and 'Found DFU: [' + DFU_name + ']' in output:
found = True
break
exc_time = time.time() - start_time
if not found:
print('Couldn\'t find the the device')
else:
print('Device found in', output.strip().strip('Found DFU: '))
if not path:
path = '.'
process = subprocess.Popen('.\Firmware\dfu-util -d ' + DFU_name + ' -a 0 -s 0x08000000:leave -D ' + path + '\Firmware\Labphox.bin', shell=True,
stdout=subprocess.PIPE,
universal_newlines=True)
start_time = time.time()
exc_time = 0
poll = process.poll()
while poll is None:
output = process.stdout.readline()
if 'Download' in output or 'device'.upper() in output.upper() or 'dfu'.upper() in output.upper() and 'bugs' not in output:
print(output.strip())
exc_time = time.time() - start_time
if exc_time > self.time_out:
break
else:
poll = process.poll()
print('Flashing time', round(exc_time, 2), 's')
print('Flash ended! Please disconnect the device.')
if __name__ == "__main__":
labphox = Labphox(debug=True, IP='192.168.1.101')