from __future__ import annotations
import time
from functools import wraps
from typing import Any, Callable, TypeVar
from qcodes import Parameter, VisaInstrument
from qcodes.parameters import Group, GroupParameter, create_on_off_val_mapping
_SOC = '.'
"""Start Of Command."""
_SOR = '#'
"""Start Of Response."""
_COMMAND_TIMEOUT = 0.5
"""The required timeout between commands."""
_ERROR_CODES = {'\x30': 'No Error',
'\x31': 'Checksum Error',
'\x32': 'Bad Command',
'\x33': 'Out of Bound Qualifier'}
_MODE_STATUS = {'\x30': 'Auto Start',
'\x31': 'Stand By',
'\x32': 'Chiller Run',
'\x33': 'Safety Default'}
_ALARM_STATUS = {'\x30': 'No Alarms',
'\x31': 'Alarm'}
_CHILLER_STATUS = {'\x30': 'OFF',
'\x31': 'ON'}
_DRYER_STATUS = {'\x30': 'OFF',
'\x31': 'ON'}
_T = TypeVar('_T')
def _ascii_checksum(s: str) -> str:
return f"{sum(s.encode('ascii')) & 0xFF:X}"
def _encode_cmd(fun: Callable[[Any, Any], _T]) -> Callable[[Any, Any], _T]:
"""Encodes a command into valid format by appending the checksum."""
@wraps(fun)
def wrapped(self, cmd: str) -> _T:
cmd = cmd.upper()
if not cmd.startswith(_SOC):
cmd = _SOC + cmd
return fun(self, cmd + _ascii_checksum(cmd))
return wrapped
def _command_timeout(fun: Callable[[Any, Any], _T]) -> Callable[[Any, Any], _T]:
"""Waits for the timeout before another command can be sent."""
@wraps(fun)
def wrapped(self, cmd: str) -> _T:
target = self._response_timestamp + _COMMAND_TIMEOUT
if (delay := target - (now := time.time())) > 0:
time.sleep(delay)
self._command_timestamp = now
result = fun(self, cmd)
self._response_timestamp = time.time()
return result
return wrapped
[docs]
class ThermotekT255p(VisaInstrument):
"""Driver for the Thermotek T255p laser chiller."""
def __init__(self, name: str, address: str, timeout: float = 5,
device_clear: bool = True, visalib: str | None = None,
pyvisa_sim_file: str | None = None, **kwargs: Any):
super().__init__(name, address, timeout, terminator='\r',
device_clear=device_clear, visalib=visalib,
pyvisa_sim_file=pyvisa_sim_file, **kwargs)
self._command_timestamp: float = 0
self._response_timestamp: float = 0
self.enabled = Parameter(
'enabled',
label='Chiller running',
set_cmd='G{}',
get_cmd=lambda: self._watchdog()[2],
val_mapping=create_on_off_val_mapping('1', '0'),
snapshot_get=False,
instrument=self
)
"""Mode Select (0: Stand By, 1: Run Mode)."""
self.temperature_setpoint = GroupParameter(
'temperature_setpoint',
label='Set point Temperature',
unit='°C',
# Drop first byte (command echo in this case). Need to check for
# type since for some reason group parameters pass the set value
# through the get_parser...
get_parser=lambda v: int(v[1:] if isinstance(v, str) else v) / 10,
set_parser=lambda v: int(v * 10),
snapshot_get=False,
instrument=self
)
self.max_power_setpoint = GroupParameter(
'max_power_setpoint',
label='Max Power Setting',
unit='W',
get_parser=float,
snapshot_get=False,
instrument=self
)
self.setpoints = Group(
[self.temperature_setpoint, self.max_power_setpoint],
set_cmd='M{temperature_setpoint:+d}',
get_cmd='H0'
)
self.manifold_temperature = Parameter(
'manifold_temperature',
label='Manifold temperature',
unit='°C',
get_cmd='I',
get_parser=lambda v: int(v) / 100,
snapshot_get=False,
instrument=self
)
self.temperature_sense_mode = Parameter(
'temperature_sense_mode',
label='External Temp Sense Mode',
get_cmd=False,
set_cmd='O{}',
val_mapping={'Internal': '0', 'External': '1'},
snapshot_get=False,
instrument=self
)
self.connect_message()
[docs]
@_encode_cmd
@_command_timeout
def write(self, cmd: str) -> None:
super().write(cmd)
# Flush read buffer since otherwise the write response will be returned
# the next time ask() is called
self.visa_handle.read()
[docs]
@_encode_cmd
@_command_timeout
def ask(self, cmd: str) -> str:
# A command has the following structure:
# 1B soc
# 1B command code
# nB n optional qualifier bytes (values passed to the device)
# 2B checksum
# The response has the following structure:
# 1B sor
# 1B command echo
# 1B comm error status
# mB m optional response bytes
# 2B checksum
response = super().ask(cmd)
if not response.startswith(_SOR + cmd[1]):
self.log.error(f'Communication failed. Response was {response}')
raise RuntimeError('Communication failed. Try again.')
if response[-2:] != _ascii_checksum(response[:-2]):
self.log.error(f'Checksum does not match. Response was {response}')
raise RuntimeError('Checksum does not match.')
if (status := _ERROR_CODES.get(response[2])) != 'No Error':
self.log.error(f'Error code {status}. Response was {response}')
raise RuntimeError(status)
return response[3:-2]
[docs]
def get_idn(self) -> dict[str, str | None]:
return {'vendor': 'Thermotek',
'model': 'T255p',
'serial': None,
'firmware': None}
def _watchdog(self) -> str:
return self.ask('U')
[docs]
def status(self) -> dict[str, str]:
response = self._watchdog()
return {'Mode Status': _MODE_STATUS[response[0]],
'Alarm Status': _ALARM_STATUS[response[1]],
'Chiller Status': _CHILLER_STATUS[response[2]],
'Dryer Status': _DRYER_STATUS[response[3]]}
[docs]
def alarm_state(self) -> dict[str, bool]:
flags = self.ask('J')
alarms = ['Float Switch', 'Hi Alarm', 'Lo Alarm', 'Sensor Alarm',
'EEPROM Fail', 'Watch dog']
return {alarm: bool(int(flag)) for alarm, flag in zip(alarms, flags)}