import re
import itertools
from time import sleep as sleep_s
from qcodes.instrument.parameter import DelegateParameter
from qcodes.instrument.visa import VisaInstrument
from qcodes.utils import validators
from pyvisa.errors import VisaIOError
from typing import (
Tuple, Sequence, List, Dict, Set, Union, Optional)
from packaging.version import parse
# Version 0.5.0
State = Sequence[Tuple[int, int]]
def _line_tap_split(input: str) -> Tuple[int, int]:
pair = input.split('!')
if len(pair) != 2:
raise ValueError(f'Expected channel pair, got {input}')
if not pair[0].isdecimal():
raise ValueError(f'Expected channel, got {pair[0]}')
if not pair[1].isdecimal():
raise ValueError(f'Expected channel, got {pair[1]}')
return int(pair[0]), int(pair[1])
[docs]
def channel_list_to_state(channel_list: str) -> State:
outer = re.match(r'\(@([0-9,:! ]*)\)', channel_list)
if not outer:
raise ValueError(f'Expected channel list, got {channel_list}')
result: List[Tuple[int, int]] = []
sequences = outer[1].split(',')
if sequences == ['']:
return result
for sequence in sequences:
limits = sequence.split(':')
if limits == ['']:
raise ValueError(f'Expected channel sequence, got {limits}')
line_start, tap_start = _line_tap_split(limits[0])
line_stop, tap_stop = line_start, tap_start
if len(limits) == 2:
line_stop, tap_stop = _line_tap_split(limits[1])
if len(limits) > 2:
raise ValueError(f'Expected channel sequence, got {limits}')
if tap_start != tap_stop:
raise ValueError(
f'Expected same breakout in sequence, got {limits}')
for line in range(line_start, line_stop+1):
result.append((line, tap_start))
return result
[docs]
def state_to_expanded_list(state: State) -> str:
return \
'(@' + \
','.join([f'{line}!{tap}' for (line, tap) in state]) + \
')'
[docs]
def state_to_compressed_list(state: State) -> str:
tap_to_line: Dict[int, Set[int]] = dict()
for line, tap in state:
tap_to_line.setdefault(tap, set()).add(line)
taps = list(tap_to_line.keys())
taps.sort()
intervals = []
for tap in taps:
start_line = None
end_line = None
lines = list(tap_to_line[tap])
lines.sort()
for line in lines:
if not start_line:
start_line = line
end_line = line
continue
if line == end_line + 1:
end_line = line
continue
if start_line == end_line:
intervals.append(f'{start_line}!{tap}')
else:
intervals.append(f'{start_line}!{tap}:{end_line}!{tap}')
start_line = line
end_line = line
if start_line == end_line:
intervals.append(f'{start_line}!{tap}')
else:
intervals.append(f'{start_line}!{tap}:{end_line}!{tap}')
return '(@' + ','.join(intervals) + ')'
[docs]
def expand_channel_list(channel_list: str) -> str:
return state_to_expanded_list(channel_list_to_state(channel_list))
[docs]
def compress_channel_list(channel_list: str) -> str:
return state_to_compressed_list(channel_list_to_state(channel_list))
relay_lines = 24
relays_per_line = 9
def _state_diff(before: State, after: State) -> Tuple[State, State, State]:
initial = frozenset(before)
target = frozenset(after)
return list(target - initial), list(initial - target), list(target)
[docs]
class QSwitch(VisaInstrument):
[docs]
def __init__(self, name: str, address: str, **kwargs) -> None:
"""Connect to a QSwitch
Args:
name (str): Name for instrument
address (str): Visa identification string
**kwargs: additional argument to the Visa driver
"""
self._check_instrument_name(name)
super().__init__(name, address, terminator='\n', **kwargs)
self._set_up_serial()
self._set_up_debug_settings()
self._set_up_simple_functions()
self.connect_message()
self._check_for_wrong_model()
self._check_for_incompatiable_firmware()
self._set_default_names()
self.state_force_update()
self.add_parameter(
name='state',
label='relays',
set_cmd=self._set_state,
get_cmd=self._get_state,
)
self.add_parameter(
name='closed_relays',
source=self.state,
set_parser=state_to_compressed_list,
get_parser=channel_list_to_state,
parameter_class=DelegateParameter,
snapshot_value=False,
)
self.add_parameter(
name='auto_save',
set_cmd='aut {0}'.format('{}'),
get_cmd='aut?',
get_parser=str,
vals=validators.Enum('on', 'off'),
snapshot_value=False,
)
self.add_parameter(
name='error_indicator',
set_cmd='beep:stat {0}'.format('{}'),
get_cmd='beep:stat?',
get_parser=str,
vals=validators.Enum('on', 'off'),
snapshot_value=False,
)
self._add_monitor_pseudo_parameters()
# -----------------------------------------------------------------------
# Instrument-wide functions
# -----------------------------------------------------------------------
[docs]
def reset(self) -> None:
self._write('*rst')
sleep_s(0.6)
self.state_force_update()
[docs]
def errors(self) -> str:
"""Retrieve and clear all previous errors
Returns:
str: Comma separated list of errors or '0, "No error"'
"""
return self.ask('all?')
[docs]
def error(self) -> str:
"""Retrieve next error
Returns:
str: The next error or '0, "No error"'
"""
return self.ask('next?')
[docs]
def state_force_update(self) -> None:
self._set_state_raw(self.ask('stat?'))
# -----------------------------------------------------------------------
# Direct manipulation of the relays
# -----------------------------------------------------------------------
[docs]
def close_relays(self, relays: State) -> None:
currently = channel_list_to_state(self._state)
union = list(itertools.chain(currently, relays))
self._effectuate(union)
[docs]
def close_relay(self, line: int, tap: int) -> None:
self.close_relays([(line, tap)])
[docs]
def open_relays(self, relays: State) -> None:
currently = frozenset(channel_list_to_state(self._state))
subtraction = frozenset(relays)
self._effectuate(list(currently - subtraction))
[docs]
def open_relay(self, line: int, tap: int) -> None:
self.open_relays([(line, tap)])
# -----------------------------------------------------------------------
# Manipulation by name
# -----------------------------------------------------------------------
OneOrMore = Union[str, Sequence[str]]
[docs]
def ground(self, lines: OneOrMore) -> None:
connections: List[Tuple[int, int]] = []
if isinstance(lines, str):
line = self._to_line(lines)
self.close_relay(line, 0)
taps = range(1, relays_per_line + 1)
connections = list(itertools.zip_longest([], taps, fillvalue=line))
self.open_relays(connections)
else:
numbers = map(self._to_line, lines)
grounds = list(itertools.zip_longest(numbers, [], fillvalue=0))
self.close_relays(grounds)
for tap in range(1, relays_per_line + 1):
connections += itertools.zip_longest(
map(self._to_line, lines), [], fillvalue=tap)
self.open_relays(connections)
[docs]
def connect(self, lines: OneOrMore) -> None:
if isinstance(lines, str):
self.close_relay(self._to_line(lines), 9)
self.open_relay(self._to_line(lines), 0)
else:
numbers = map(self._to_line, lines)
pairs = list(itertools.zip_longest(numbers, [], fillvalue=9))
self.close_relays(pairs)
numbers = map(self._to_line, lines)
connections = list(itertools.zip_longest(numbers, [], fillvalue=0))
self.open_relays(connections)
[docs]
def breakout(self, line: str, tap: str) -> None:
self.close_relay(self._to_line(line), self._to_tap(tap))
self.open_relay(self._to_line(line), 0)
[docs]
def arrange(self, breakouts: Optional[Dict[str, int]] = None,
lines: Optional[Dict[str, int]] = None) -> None:
"""An arrangement of names for lines and breakouts
Args:
breakouts (Dict[str, int]): Name/breakout pairs
lines (Dict[str, int]): Name/line pairs
"""
if lines:
for name, line in lines.items():
self._line_names[name] = line
if breakouts:
for name, tap in breakouts.items():
self._tap_names[name] = tap
# -----------------------------------------------------------------------
# Debugging and testing
[docs]
def start_recording_scpi(self) -> None:
"""Record all SCPI commands sent to the instrument
Any previous recordings are removed. To inspect the SCPI commands sent
to the instrument, call get_recorded_scpi_commands().
"""
self._scpi_sent: List[str] = list()
self._record_commands = True
[docs]
def get_recorded_scpi_commands(self) -> List[str]:
"""
Returns:
Sequence[str]: SCPI commands sent to the instrument
"""
commands = self._scpi_sent
self._scpi_sent = list()
return commands
[docs]
def clear_read_queue(self) -> Sequence[str]:
"""Flush the VISA message queue of the instrument
Takes at least _message_flush_timeout_ms to carry out.
Returns:
Sequence[str]: Messages lingering in queue
"""
lingering = list()
original_timeout = self.visa_handle.timeout
self.visa_handle.timeout = self._message_flush_timeout_ms
while True:
try:
message = self.visa_handle.read()
except VisaIOError:
break
else:
lingering.append(message)
self.visa_handle.timeout = original_timeout
return lingering
# -----------------------------------------------------------------------
# Override communication methods to make it possible to check for errors
# and to record the communication with the instrument.
[docs]
def write(self, cmd: str) -> None:
"""Send SCPI command to instrument
Args:
cmd (str): SCPI command
"""
try:
self._write(cmd)
self.ask('*opc?')
errors = super().ask('all?')
except Exception as error:
raise ValueError(f'Error: {repr(error)} after executing {cmd}')
if errors == '0,"No error"':
return
raise ValueError(f'Error: {errors} after executing {cmd}')
[docs]
def ask(self, cmd: str) -> str:
"""Send SCPI query to instrument
Args:
cmd (str): SCPI query
Returns:
str: SCPI answer
"""
if self._record_commands:
self._scpi_sent.append(cmd)
answer = super().ask(cmd)
return answer
# -----------------------------------------------------------------------
def _write(self, cmd: str) -> None:
if self._record_commands:
self._scpi_sent.append(cmd)
super().write(cmd)
def _channel_list_to_overview(self, channel_list: str) -> dict[str, List[str]]:
state = channel_list_to_state(channel_list)
line_names: dict[int, str] = dict()
for name, line in self._line_names.items():
line_names[line] = name
tap_names: dict[int, str] = dict()
for name, tap in self._tap_names.items():
tap_names[tap] = name
result: dict[str, List[str]] = dict()
for line, _ in state:
line_name = line_names[line]
result[line_name] = list()
for line, tap in state:
line_name = line_names[line]
if tap == 0:
result[line_name].append('grounded')
elif tap == 9:
result[line_name].append('connected')
else:
tap_name = f'breakout {tap_names[tap]}'
result[line_name].append(tap_name)
return result
def _to_line(self, name: str) -> int:
try:
return self._line_names[name]
except KeyError:
raise ValueError(f'Unknown line "{name}"')
def _to_tap(self, name: str) -> int:
try:
return self._tap_names[name]
except KeyError:
raise ValueError(f'Unknown tap "{name}"')
def _get_state(self) -> str:
self.state_force_update()
return self._state
def _set_state_raw(self, channel_list: str) -> None:
self._state = channel_list
def _set_state(self, channel_list: str) -> None:
self._effectuate(channel_list_to_state(channel_list))
def _effectuate(self, state: State) -> None:
currently = channel_list_to_state(self._state)
positive, negative, total = _state_diff(currently, state)
if positive:
self.write(f'clos {state_to_compressed_list(positive)}')
if negative:
self.write(f'open {state_to_compressed_list(negative)}')
self._set_state_raw(state_to_compressed_list(total))
def _set_up_debug_settings(self) -> None:
self._record_commands = False
self._scpi_sent = list()
self._message_flush_timeout_ms = 1
self._round_off = None
def _set_up_serial(self) -> None:
# No harm in setting the speed even if the connection is not serial.
self.visa_handle.baud_rate = 9600 # type: ignore
def _check_for_wrong_model(self) -> None:
model = self.IDN()['model']
if model != 'QSwitch':
raise ValueError(f'Unknown model {model}. Are you using the right'
' driver for your instrument?')
def _check_for_incompatiable_firmware(self) -> None:
firmware = self.IDN()['firmware']
least_compatible_fw = '0.178'
if parse(firmware) < parse(least_compatible_fw):
raise ValueError(f'Incompatible firmware {firmware}. You need at '
f'least {least_compatible_fw}')
def _set_up_simple_functions(self) -> None:
self.add_function('abort', call_cmd='abor')
def _set_default_names(self) -> None:
lines = range(1, relay_lines+1)
taps = range(1, relays_per_line)
self._line_names = dict(zip(map(str, lines), lines))
self._tap_names = dict(zip(map(str, taps), taps))
def _add_monitor_pseudo_parameters(self) -> None:
self.add_parameter(
name='overview',
source=self.state,
get_parser=self._channel_list_to_overview,
parameter_class=DelegateParameter,
snapshot_value=False,
)
def _check_instrument_name(self, name: str) -> None:
if name.isidentifier():
return
raise ValueError(
f'Instrument name "{name}" is incompatible with QCoDeS parameter '
'generation (no spaces, punctuation, prepended numbers, etc)')