# Source code for qcodes.instrument_drivers.oxford.triton

import configparser
import logging
import re
from functools import partial
from time import sleep
from traceback import format_exc
from typing import Any, Dict, List, Optional, Union

from qcodes.instrument import IPInstrument
from qcodes.validators import Enum, Ints


class OxfordTriton(IPInstrument):
    r"""
    Triton Driver

    Args:
        name: Instrument name, forwarded to ``IPInstrument``.
        address: Network address, forwarded to ``IPInstrument``.
        port: Network port, forwarded to ``IPInstrument``.
        terminator: Message terminator, forwarded to ``IPInstrument``.
        tmpfile: Optional: an exported windows registry file from the registry
            path:
            `[HKEY_CURRENT_USER\Software\Oxford Instruments\Triton System Control\Thermometry]`
            and is used to extract the available temperature channels.
        timeout: Communication timeout, forwarded to ``IPInstrument``.
        **kwargs: Forwarded unchanged to ``IPInstrument``.

    Status: beta-version.
        TODO:
        fetch registry directly from fridge-computer
    """

    # Matches signed integers, decimals and exponent notation
    # (e.g. ``-5``, ``0.25``, ``.5``, ``1e-05``).
    _NUMBER_RE = re.compile(r'[-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?')

    # Suffix appended by ``_get_control_Bcomp_param`` -> index into the
    # numbers found in the field vector response.
    _FIELD_COMPONENT_INDEX = {'Bx': 0, 'By': 1, 'Bz': 2}

    # Seconds to pause between magnet status polls while waiting for a sweep,
    # so the wait loop does not query the instrument back-to-back.
    _FIELD_POLL_INTERVAL = 0.1

    def __init__(
            self,
            name: str,
            address: Optional[str] = None,
            port: Optional[int] = None,
            terminator: str = '\r\n',
            tmpfile: Optional[str] = None,
            timeout: float = 20,
            **kwargs: Any):
        super().__init__(name, address=address, port=port,
                         terminator=terminator, timeout=timeout, **kwargs)

        self._heater_range_auto = False
        self._heater_range_temp = [0.03, 0.1, 0.3, 1, 12, 40]
        self._heater_range_curr = [0.316, 1, 3.16, 10, 31.6, 100]
        self._control_channel = 5

        self.add_parameter(name='time',
                           label='System Time',
                           get_cmd='READ:SYS:TIME',
                           get_parser=self._parse_time)

        self.add_parameter(name='action',
                           label='Current action',
                           get_cmd='READ:SYS:DR:ACTN',
                           get_parser=self._parse_action)

        self.add_parameter(name='status',
                           label='Status',
                           get_cmd='READ:SYS:DR:STATUS',
                           get_parser=self._parse_status)

        self.add_parameter(name='pid_control_channel',
                           label='PID control channel',
                           get_cmd=self._get_control_channel,
                           set_cmd=self._set_control_channel,
                           vals=Ints(1, 16))

        self.add_parameter(name='pid_mode',
                           label='PID Mode',
                           get_cmd=partial(self._get_control_param, 'MODE'),
                           set_cmd=partial(self._set_control_param, 'MODE'),
                           val_mapping={'on': 'ON', 'off': 'OFF'})

        self.add_parameter(name='pid_ramp',
                           label='PID ramp enabled',
                           get_cmd=partial(self._get_control_param,
                                           'RAMP:ENAB'),
                           set_cmd=partial(self._set_control_param,
                                           'RAMP:ENAB'),
                           val_mapping={'on': 'ON', 'off': 'OFF'})

        self.add_parameter(name='pid_setpoint',
                           label='PID temperature setpoint',
                           unit='K',
                           get_cmd=partial(self._get_control_param, 'TSET'),
                           set_cmd=partial(self._set_control_param, 'TSET'))

        self.add_parameter(name='pid_rate',
                           label='PID ramp rate',
                           unit='K/min',
                           get_cmd=partial(self._get_control_param,
                                           'RAMP:RATE'),
                           set_cmd=partial(self._set_control_param,
                                           'RAMP:RATE'))

        self.add_parameter(name='pid_range',
                           label='PID heater range',
                           # TODO: The units in the software are mA, how to
                           # do this correctly?
                           unit='mA',
                           get_cmd=partial(self._get_control_param, 'RANGE'),
                           set_cmd=partial(self._set_control_param, 'RANGE'),
                           vals=Enum(*self._heater_range_curr))

        self.add_parameter(name='magnet_status',
                           label='Magnet status',
                           unit='',
                           get_cmd=partial(self._get_control_B_param, 'ACTN'))

        self.add_parameter(name='magnet_sweeprate',
                           label='Magnet sweep rate',
                           unit='T/min',
                           get_cmd=partial(
                               self._get_control_B_param, 'RVST:RATE'),
                           set_cmd=partial(
                               self._set_control_magnet_sweeprate_param))

        self.add_parameter(name='magnet_sweeprate_insta',
                           label='Instantaneous magnet sweep rate',
                           unit='T/min',
                           get_cmd=partial(self._get_control_B_param, 'RFST'))

        self.add_parameter(name='B',
                           label='Magnetic field',
                           unit='T',
                           get_cmd=partial(self._get_control_B_param, 'VECT'))

        self.add_parameter(name='Bx',
                           label='Magnetic field x-component',
                           unit='T',
                           get_cmd=partial(
                               self._get_control_Bcomp_param, 'VECTBx'),
                           set_cmd=partial(self._set_control_Bx_param))

        self.add_parameter(name='By',
                           label='Magnetic field y-component',
                           unit='T',
                           get_cmd=partial(
                               self._get_control_Bcomp_param, 'VECTBy'),
                           set_cmd=partial(self._set_control_By_param))

        self.add_parameter(name='Bz',
                           label='Magnetic field z-component',
                           unit='T',
                           get_cmd=partial(
                               self._get_control_Bcomp_param, 'VECTBz'),
                           set_cmd=partial(self._set_control_Bz_param))

        # The value is a duration: the sweep code multiplies it by 60 to
        # obtain seconds, so the unit is minutes (it was declared as 'T/min').
        self.add_parameter(name='magnet_sweep_time',
                           label='Magnet sweep time',
                           unit='min',
                           get_cmd=partial(self._get_control_B_param,
                                           'RVST:TIME'))

        self.chan_alias: Dict[str, str] = {}
        self.chan_temp_names: Dict[str, Dict[str, Optional[str]]] = {}
        if tmpfile is not None:
            self._get_temp_channel_names(tmpfile)
        self._get_temp_channels()
        self._get_pressure_channels()

        # Named channels are best effort: a failure is logged, not raised.
        try:
            self._get_named_channels()
        except Exception:
            logging.warning('Ignored an error in _get_named_channels\n' +
                            format_exc())

        self.connect_message()

    def set_B(self, x: float, y: float, z: float, s: float) -> None:
        """
        Sweep the magnetic field to the vector ``(x, y, z)`` and block.

        The call sleeps for the sweep time reported by the instrument plus
        ten seconds of overhead. If ``s`` lies outside ``(0, 0.2]`` only a
        warning is printed and nothing is sent.

        Args:
            x: target x-component of the field
            y: target y-component of the field
            z: target z-component of the field
            s: sweep rate; must satisfy ``0 < s <= 0.2`` (T/min according to
                the warning text)
        """
        if 0 < s <= 0.2:
            self.write(self._vector_setpoint_cmd(s, x, y, z))
            self.write('SET:SYS:VRM:ACTN:RTOS\r\n')
            t_wait = self.magnet_sweep_time() * 60 + 10
            print('Please wait ' + str(t_wait) +
                  ' seconds for the field sweep...')
            sleep(t_wait)
        else:
            print('Warning: set magnet sweep rate in range (0 , 0.2] T/min')

    @staticmethod
    def _vector_setpoint_cmd(s: Any, x: Any, y: Any, z: Any) -> str:
        """Build the command that stores sweep rate ``s`` and target vector."""
        # NOTE(review): the command carries an explicit '\r\n' although the
        # instrument is constructed with terminator '\r\n'; whether the base
        # class appends the terminator again is not visible here - confirm.
        return ('SET:SYS:VRM:COO:CART:RVST:MODE:RATE:RATE:' + str(s) +
                ':VSET:[' + str(x) + ' ' + str(y) + ' ' + str(z) + ']\r\n')

    def _get_control_B_param(
            self,
            param: str
    ) -> Optional[Union[float, str, List[float]]]:
        """Query ``READ:SYS:VRM:<param>`` and return the parsed value."""
        cmd = f'READ:SYS:VRM:{param}'
        return self._get_response_value(self.ask(cmd))

    def _get_control_Bcomp_param(
            self,
            param: str
    ) -> Optional[Union[float, str, List[float]]]:
        """
        Read a single field component.

        ``param`` is the query name with a two-character component suffix
        (``'VECTBx'``, ``'VECTBy'``, ``'VECTBz'``). The suffix is removed
        before querying and appended to the reply so that
        ``_get_response_value`` can pick the matching component.
        """
        cmd = f'READ:SYS:VRM:{param}'
        return self._get_response_value(self.ask(cmd[:-2]) + cmd[-2:])

    def _get_response(self, msg: str) -> str:
        """Return the text after the last ``:`` of ``msg``."""
        return msg.split(':')[-1]

    def _get_response_value(
            self,
            msg: str
    ) -> Optional[Union[float, str, List[float]]]:
        """
        Convert an instrument reply into a Python value.

        Args:
            msg: full reply string

        Returns:
            * ``None`` if the reply ends with ``NOT_FOUND`` or a requested
              field component is not present in the reply
            * ``'IDLE'`` / ``'RTOS'`` for those status replies
            * a float for a ``Bx``/``By``/``Bz`` suffixed reply (the
              first/second/third number)
            * a list of up to three floats if the reply holds several numbers
            * a float if it holds exactly one number
            * the stripped reply string otherwise
        """
        msg = self._get_response(msg)
        if msg.endswith('NOT_FOUND'):
            return None
        if msg.endswith('IDLE'):
            return 'IDLE'
        if msg.endswith('RTOS'):
            return 'RTOS'

        # Parse once; the pattern keeps the sign of integers and treats
        # exponent notation as a single number.
        numbers = [float(tok) for tok in self._NUMBER_RE.findall(msg)]

        component = self._FIELD_COMPONENT_INDEX.get(msg[-2:])
        if component is not None:
            # e.g. 'NOT_FOUNDBx' carries no numbers at all.
            return numbers[component] if component < len(numbers) else None

        if len(numbers) > 1:
            return numbers[:3]
        if numbers:
            return numbers[0]
        return msg

    def get_idn(self) -> Dict[str, Optional[str]]:
        """ Return the Instrument Identifier Message """
        idstr = self.ask('*IDN?')
        # The first ':'-separated field is dropped; the next four are kept.
        idparts = [p.strip() for p in idstr.split(':', 4)][1:]

        return dict(zip(('vendor', 'model', 'serial', 'firmware'), idparts))

    def _get_control_channel(self, force_get: bool = False) -> int:
        """
        Return the temperature channel that owns the control loop.

        The cached channel is returned if the instrument still reports a
        loop for it; otherwise (or if ``force_get`` is set) channels 1..16
        are scanned and the first one with a loop is cached and returned.
        If none is found the cached value is returned unchanged.
        """
        # verify current channel
        if self._control_channel and not force_get:
            tempval = self.ask(
                f'READ:DEV:T{self._control_channel}:TEMP:LOOP:MODE')
            if not tempval.endswith('NOT_FOUND'):
                return self._control_channel

        # either _control_channel is not set or wrong
        for i in range(1, 17):
            tempval = self.ask(f'READ:DEV:T{i}:TEMP:LOOP:MODE')
            if not tempval.endswith('NOT_FOUND'):
                self._control_channel = i
                break
        return self._control_channel

    def _set_control_channel(self, channel: int) -> None:
        """Store ``channel`` and send the loop heater command for it."""
        self._control_channel = channel
        # NOTE(review): _get_control_channel() re-validates the channel and
        # may fall back to another one that already has a loop - confirm
        # that this is intended.
        self.write('SET:DEV:T{}:TEMP:LOOP:HTR:H1'.format(
            self._get_control_channel()))

    def _get_control_param(
            self,
            param: str
    ) -> Optional[Union[float, str, List[float]]]:
        """Read control-loop parameter ``param`` of the control channel."""
        chan = self._get_control_channel()
        cmd = f'READ:DEV:T{chan}:TEMP:LOOP:{param}'
        return self._get_response_value(self.ask(cmd))

    def _set_control_param(self, param: str, value: float) -> None:
        """Write control-loop parameter ``param`` of the control channel."""
        chan = self._get_control_channel()
        cmd = f'SET:DEV:T{chan}:TEMP:LOOP:{param}:{value}'
        self.write(cmd)

    def _set_control_magnet_sweeprate_param(self, s: float) -> None:
        """
        Set the sweep rate while keeping the current field vector as target.

        Outside ``(0, 0.2]`` only a warning is printed.
        """
        if 0 < s <= 0.2:
            x = round(self.Bx(), 4)
            y = round(self.By(), 4)
            z = round(self.Bz(), 4)
            self.write(self._vector_setpoint_cmd(s, x, y, z))
        else:
            print(
                'Warning: set sweeprate in range (0 , 0.2] T/min, not setting sweeprate')

    def _sweep_to_field(self, s: Any, x: Any, y: Any, z: Any) -> None:
        """Start a sweep to ``(x, y, z)`` at rate ``s`` and wait for IDLE."""
        self.write(self._vector_setpoint_cmd(s, x, y, z))
        self.write('SET:SYS:VRM:ACTN:RTOS\r\n')
        # just to give an time estimate, +10s for overhead
        t_wait = self.magnet_sweep_time() * 60 + 10
        print('Please wait ' + str(t_wait) +
              ' seconds for the field sweep...')
        while self.magnet_status() != 'IDLE':
            # Pause between polls instead of spinning on the connection.
            sleep(self._FIELD_POLL_INTERVAL)

    def _set_control_Bx_param(self, x: float) -> None:
        """Sweep the x-component to ``x``, keeping y and z at their values."""
        s = self.magnet_sweeprate()
        y = round(self.By(), 4)
        z = round(self.Bz(), 4)
        self._sweep_to_field(s, x, y, z)

    def _set_control_By_param(self, y: float) -> None:
        """Sweep the y-component to ``y``, keeping x and z at their values."""
        s = self.magnet_sweeprate()
        x = round(self.Bx(), 4)
        z = round(self.Bz(), 4)
        self._sweep_to_field(s, x, y, z)

    def _set_control_Bz_param(self, z: float) -> None:
        """Sweep the z-component to ``z``, keeping x and y at their values."""
        s = self.magnet_sweeprate()
        x = round(self.Bx(), 4)
        y = round(self.By(), 4)
        self._sweep_to_field(s, x, y, z)

    def _get_named_channels(self) -> None:
        """
        Register one temperature parameter per named channel.

        Every entry reported by ``READ:SYS:DR:CHAN`` is queried; replies
        containing ``INVALID`` or ``NONE`` are skipped. The alias/channel
        pair is recorded in ``chan_alias``.
        """
        allchans_str = self.ask('READ:SYS:DR:CHAN')
        allchans = allchans_str.replace('STAT:SYS:DR:CHAN:', '', 1).split(':')
        for ch in allchans:
            msg = 'READ:SYS:DR:CHAN:%s' % ch
            rep = self.ask(msg)
            if 'INVALID' not in rep and 'NONE' not in rep:
                # The last two ':'-separated fields are alias and channel.
                alias, chan = rep.split(':')[-2:]
                self.chan_alias[alias] = chan
                self.add_parameter(name=alias,
                                   unit='K',
                                   get_cmd='READ:DEV:%s:TEMP:SIG:TEMP' % chan,
                                   get_parser=self._parse_temp)

    def _get_pressure_channels(self) -> None:
        """Register pressure parameters ``P1`` .. ``P6``."""
        chan_pressure_list = []
        for i in range(1, 7):
            chan = 'P%d' % i
            chan_pressure_list.append(chan)
            self.add_parameter(name=chan,
                               unit='bar',
                               get_cmd='READ:DEV:%s:PRES:SIG:PRES' % chan,
                               get_parser=self._parse_pres)
        self.chan_pressure = set(chan_pressure_list)

    def _get_temp_channel_names(self, file: str) -> None:
        """
        Fill ``chan_temp_names`` from an exported registry file.

        Args:
            file: path of a UTF-16 encoded registry export
        """
        config = configparser.ConfigParser()
        with open(file, encoding='utf16') as f:
            # Skip the first line; it is not part of any section.
            next(f)
            config.read_file(f)

        for section in config.sections():
            options = config.options(section)
            namestr = '"m_lpszname"'
            if namestr in options:
                chan_number = int(section.split('\\')[-1].split('[')[-1]) + 1
                # the names used in the register file are base 0 but the api
                # and the gui uses base one names so add one
                chan = 'T' + str(chan_number)
                name = config.get(section, '"m_lpszname"').strip("\"")
                self.chan_temp_names[chan] = {'name': name, 'value': None}

    def _get_temp_channels(self) -> None:
        """Register temperature parameters ``T1`` .. ``T16``."""
        chan_temps_list = []
        for i in range(1, 17):
            chan = 'T%d' % i
            chan_temps_list.append(chan)
            self.add_parameter(name=chan,
                               unit='K',
                               get_cmd='READ:DEV:%s:TEMP:SIG:TEMP' % chan,
                               get_parser=self._parse_temp)
        self.chan_temps = set(chan_temps_list)

    def _parse_action(self, msg: str) -> str:
        """
        Parse message and return action as a string

        Args:
            msg: message string
        Returns
            action: string describing the action; ``'Unknown'`` for an
            unrecognised code, or when the code is ``NONE`` and no ``MC``
            temperature reading is available.
        """
        # The first 17 characters are the reply prefix.
        action = msg[17:]
        if action == 'NONE':
            # 'MC' only exists if a named channel with that alias was
            # registered, and it reads None when the channel is not found.
            mc_param = getattr(self, 'MC', None)
            mc_temp = mc_param.get() if mc_param is not None else None
            if mc_temp is None:
                return 'Unknown'
            return 'Circulating' if mc_temp < 2 else 'Idle'

        descriptions = {
            'PCL': 'Precooling',
            'EPCL': 'Empty precool loop',
            'COND': 'Condensing',
            'COLL': 'Collecting mixture',
        }
        return descriptions.get(action, 'Unknown')

    def _parse_status(self, msg: str) -> str:
        """Return ``msg`` without its first 19 characters (reply prefix)."""
        return msg[19:]

    def _parse_time(self, msg: str) -> str:
        """Return ``msg`` without its first 14 characters (reply prefix)."""
        return msg[14:]

    def _parse_temp(self, msg: str) -> Optional[float]:
        """Return the temperature in ``msg``, or ``None`` on ``NOT_FOUND``."""
        if 'NOT_FOUND' in msg:
            return None
        return float(msg.split('SIG:TEMP:')[-1].strip('K'))

    def _parse_pres(self, msg: str) -> Optional[float]:
        """Return the scaled pressure in ``msg``, ``None`` on ``NOT_FOUND``."""
        if 'NOT_FOUND' in msg:
            return None
        # NOTE(review): the 'm'/'B' characters are stripped and the value is
        # multiplied by 1e3 while the parameters are declared in 'bar';
        # confirm the scale factor against the instrument.
        return float(msg.split('SIG:PRES:')[-1].strip('mB')) * 1e3

    def _recv(self) -> str:
        """Receive a reply and strip trailing whitespace."""
        return super()._recv().rstrip()
# Keep the previous class name importable for existing user code.
Triton = OxfordTriton
"""Alias for backwards compatibility"""