# QCoDeS driver for the QDevil QDAC using channels
# Adapted by QDevil from the qdev QDac driver in qcodes
# Version 2.2 QDevil 2023-02-20
import logging
import time
from collections import namedtuple
from enum import Enum
from functools import partial
from typing import Any, Dict, Optional, Sequence, Tuple, Union
import pyvisa
import pyvisa.constants
from pyvisa.resources.serial import SerialInstrument
from qcodes import validators as vals
from qcodes.instrument import ChannelList, InstrumentChannel, VisaInstrument
from qcodes.parameters import MultiChannelInstrumentParameter, ParamRawDataType
LOG = logging.getLogger(__name__)
_ModeTuple = namedtuple('_ModeTuple', 'v i')
[docs]
class Mode(Enum):
"""
Enum type use as the mode parameter for channels
defining the combined voltage and current range.
get_label() returns a text representation of the mode.
"""
vhigh_ihigh = _ModeTuple(v=0, i=1)
vhigh_ilow = _ModeTuple(v=0, i=0)
vlow_ilow = _ModeTuple(v=1, i=0)
[docs]
def get_label(self) -> str:
_MODE_LABELS = {
"vhigh_ihigh": "V range high / I range high",
"vhigh_ilow": "V range high / I range low",
"vlow_ilow": "V range low / I range low"}
return _MODE_LABELS[self.name]
[docs]
class Generator:
# Class used in the internal book keeping of generators
def __init__(self, generator_number: int):
self.fg = generator_number
self.t_end = 9.9e9
[docs]
class QDacChannel(InstrumentChannel):
"""
A single output channel of the QDac.
Exposes chan.v, chan.i, chan.mode, chan.slope,
chan.sync, chan.sync_delay, chan.sync_duration.\n
NB: Set v to zero before changing mode if the
mode_force lfag is False (default).
"""
[docs]
def __init__(self, parent: "QDac", name: str, channum: int):
"""
Args:
parent: The instrument to which the channel belongs.
name: The name of the channel
channum: The number of the channel (1-24 or 1-48)
"""
super().__init__(parent, name)
# Add the parameters
self.add_parameter(name='v',
label=f'Channel {channum} voltage',
unit='V',
set_cmd=partial(self._parent._set_voltage, channum),
get_cmd=partial(self._parent._get_voltage, channum),
get_parser=float,
# Initial range. Updated on init and during
# operation:
vals=vals.Numbers(-9.99, 9.99)
)
self.add_parameter(name='mode',
label=f'Channel {channum} mode.',
set_cmd=partial(self._parent._set_mode, channum),
get_cmd=None,
vals=vals.Enum(*list(Mode))
)
self.add_parameter(name='i',
label=f'Channel {channum} current',
get_cmd=f'get {channum}',
unit='A',
get_parser=self._parent._current_parser
)
self.add_parameter(name='slope',
label=f'Channel {channum} slope',
unit='V/s',
set_cmd=partial(self._parent._setslope, channum),
get_cmd=partial(self._parent._getslope, channum),
vals=vals.MultiType(vals.Enum('Inf'),
vals.Numbers(1e-3, 10000))
)
self.add_parameter(name='sync',
label=f'Channel {channum} sync output',
set_cmd=partial(self._parent._setsync, channum),
get_cmd=partial(self._parent._getsync, channum),
vals=vals.Ints(0, 4) # Updated at qdac init
)
self.add_parameter(name='sync_delay',
label=f'Channel {channum} sync pulse delay',
unit='s',
get_cmd=None, set_cmd=None,
vals=vals.Numbers(0, 10000),
initial_value=0
)
self.add_parameter(
name='sync_duration',
label=f'Channel {channum} sync pulse duration',
unit='s',
get_cmd=None, set_cmd=None,
vals=vals.Numbers(0.001, 10000),
initial_value=0.01
)
[docs]
def snapshot_base(
self,
update: Optional[bool] = False,
params_to_skip_update: Optional[Sequence[str]] = None
) -> Dict[Any, Any]:
update_currents = self._parent._update_currents and update
if update and not self._parent._get_status_performed:
self._parent._update_cache(update_currents=update_currents)
# call update_cache rather than getting the status individually for
# each parameter. This is only done if _get_status_performed is False
# this is used to signal that the parent has already called it and
# no need to repeat.
if params_to_skip_update is None:
params_to_skip_update = ('v', 'i', 'mode')
snap = super().snapshot_base(
update=update,
params_to_skip_update=params_to_skip_update)
return snap
[docs]
class QDacMultiChannelParameter(MultiChannelInstrumentParameter):
"""
The class to be returned by __getattr__ of the ChannelList. Here customised
for fast multi-readout of voltages.
"""
def __init__(self, channels: Sequence[InstrumentChannel],
param_name: str,
*args: Any,
**kwargs: Any):
super().__init__(channels, param_name, *args, **kwargs)
[docs]
def get_raw(self) -> Tuple[ParamRawDataType, ...]:
"""
Return a tuple containing the data from each of the channels in the
list.
"""
# For voltages, we can do something slightly faster than the naive
# approach by asking the instrument for a channel overview.
if self._param_name == 'v':
qdac = self._channels[0]._parent
qdac._update_cache(update_currents=False)
output = tuple(chan.parameters[self._param_name].cache()
for chan in self._channels)
else:
output = tuple(chan.parameters[self._param_name].get()
for chan in self._channels)
return output
[docs]
class QDac(VisaInstrument):
"""
Channelised driver for the QDevil QDAC voltage source.
Exposes channels, temperature sensors and calibration output,
and 'ramp_voltages' + 'ramp_voltages_2d' for multi channel ramping.
In addition a 'mode_force' flag (default False) is exposed.
'mode_force' (=True) is used to enable voltage range switching, via
the channel 'mode' parameter, even at non-zero output voltages.
Tested with Firmware Version: 1.07
The driver assumes that the instrument is ALWAYS in verbose mode OFF
and sets this as part of the initialization, so please do not change this.
"""
# set nonzero value (seconds) to accept older status when reading settings
max_status_age = 1
[docs]
def __init__(self,
name: str,
address: str,
update_currents: bool = False,
**kwargs: Any):
"""
Instantiates the instrument.
Args:
name: The instrument name used by qcodes
address: The VISA name of the resource
update_currents: Whether to query all channels for their
current sensor value on startup, which takes about 0.5 sec
per channel. Default: False.
Returns:
QDac object
"""
super().__init__(name, address, **kwargs)
handle = self.visa_handle
self._get_status_performed = False
assert isinstance(handle, SerialInstrument)
# Communication setup + firmware check
handle.baud_rate = 460800
handle.parity = pyvisa.constants.Parity(0)
handle.data_bits = 8
self.set_terminator('\n')
handle.write_termination = '\n'
self._write_response = ''
firmware_version = self._get_firmware_version()
if firmware_version < 1.07:
LOG.warning(f"Firmware version: {firmware_version}")
raise RuntimeError('''
No QDevil QDAC detected or the firmware version is obsolete.
This driver only supports version 1.07 or newer. Please
contact info@qdevil.com for a firmware update.
''')
# Initialse basic information and internal book keeping
self.num_chans = self._get_number_of_channels()
num_boards = int(self.num_chans/8)
self._output_n_lines = self.num_chans + 2
self._chan_range = range(1, 1 + self.num_chans)
self.channel_validator = vals.Ints(1, self.num_chans)
self._reset_bookkeeping()
# Add channels (and channel parameters)
channels = ChannelList(self, "Channels", QDacChannel,
snapshotable=False,
multichan_paramclass=QDacMultiChannelParameter)
for i in self._chan_range:
channel = QDacChannel(self, f'chan{i:02}', i)
channels.append(channel)
self.add_submodule(f"ch{i:02}", channel)
self.add_submodule("channels", channels.to_channel_tuple())
# Updatechannel sync port validator according to number of boards
self._num_syns = max(num_boards-1, 1)
for chan in self._chan_range:
self.channels[chan-1].sync.vals = vals.Ints(0, self._num_syns)
# Add non-channel parameters
for board in range(num_boards):
for sensor in range(3):
label = f'Board {board}, Temperature {sensor}'
self.add_parameter(name=f'temp{board}_{sensor}',
label=label,
unit='C',
get_cmd=f'tem {board} {sensor}',
get_parser=self._num_verbose)
self.add_parameter(name='cal',
set_cmd='cal {}',
vals=vals.Ints(0, self.num_chans))
self.add_parameter(name='mode_force',
label='Mode force',
get_cmd=None, set_cmd=None,
vals=vals.Bool(),
initial_value=False)
# Due to a firmware bug in 1.07 voltage ranges are always reported
# vebosely. So for future compatibility we set verbose True
self.write('ver 1')
self._update_voltage_ranges()
# The driver require verbose mode off except for the above command
self.write('ver 0')
self._verbose = False # Just so that the code can check the state
self.connect_message()
LOG.info('[*] Querying all channels for voltages and currents...')
self._update_cache(update_currents=update_currents)
self._update_currents = update_currents
self._load_state()
LOG.info('[+] Done')
def _reset_bookkeeping(self) -> None:
"""
Resets all internal variables used for ramping and
synchronization outputs.
"""
# Assigned slopes. Entries will eventually be {chan: slope}
self._slopes: Dict[int, Union[str, float]] = {}
# Function generators and triggers (used in ramping)
self._fgs = set(range(1, 9))
self._assigned_fgs: Dict[int, Generator] = {} # {chan: fg}
self._trigs = set(range(1, 10))
self._assigned_triggers: Dict[int, int] = {} # {fg: trigger}
# Sync channels
self._syncoutputs: Dict[int, int] = {} # {chan: syncoutput}
def _load_state(self) -> None:
"""
Used as part of initiaisation. DON'T use _load_state() separately.\n
Updates internal book keeping of running function generators.
used triggers and active sync outputs.\n
Slopes can not be read/updated as it is not possible to
say if a generator is running because a slope has been assigned
or because it is being ramped direcly (by e.g. ramp_voltages_2d()).
"""
# Assumes that all variables and virtual
# parameters have been initialised (and read)
self.write('ver 0') # Just to be on the safe side
self._reset_bookkeeping()
for ch_idx in range(self.num_chans):
chan = ch_idx + 1
# Check if the channels are being ramped
# It is not possible to find out if it has a slope assigned
# as it may be ramped explicitely by the user
# We assume that generators are running, but we cannot know
self.write(f'wav {chan}')
fg_str, amplitude_str, offset_str = self._write_response.split(',')
amplitude = float(amplitude_str)
offset = float(offset_str)
fg = int(fg_str)
if fg in range(1, 9):
voltage = self.channels[ch_idx].v.get()
time_now = time.time()
self.write(f'fun {fg}')
response = self._write_response.split(',')
waveform = int(response[0])
# Probably this driver is involved if a stair case is assigned
if waveform == Waveform.staircase:
if len(response) == 6:
step_length_ms, no_steps, rep, rep_remain_str, trigger \
= response[1:6]
rep_remain = int(rep_remain_str)
else:
step_length_ms, no_steps, rep, trigger = response[1:5]
rep_remain = int(rep)
ramp_time = 0.001 * float(step_length_ms) * int(no_steps)
ramp_remain = 0
if (amplitude != 0):
ramp_remain = (amplitude+offset-voltage)/amplitude
if int(rep) == -1:
time_end = time_now + 315360000
else:
time_end = (ramp_remain + max(0, rep_remain-1)) \
* ramp_time + time_now + 0.001
else:
if waveform == Waveform.sine:
period_ms, rep, rep_remain_str, trigger = response[1:5]
else:
period_ms, _, rep, rep_remain_str, trigger = response[1:6]
if int(rep) == -1:
time_end = time_now + 315360000 # 10 years from now
else: # +1 is just a safe guard
time_end = time_now + 0.001 \
* (int(rep_remain_str)+1) * float(period_ms)
self._assigned_fgs[chan] = Generator(fg)
self._assigned_fgs[chan].t_end = time_end
if int(trigger) != 0:
self._assigned_triggers[fg] = int(trigger)
for syn in range(1, self._num_syns+1):
self.write(f'syn {syn}')
syn_fg, delay_ms, duration_ms = \
self._write_response.split(',')
if int(syn_fg) == fg:
self.channels[ch_idx].sync.cache.set(syn)
self.channels[ch_idx].sync_delay(float(delay_ms)/1000)
self.channels[ch_idx].sync_duration(
float(duration_ms)/1000)
[docs]
def reset(self, update_currents: bool = False) -> None:
"""
Resets the instrument setting all channels to zero output voltage
and all parameters to their default values, including removing any
assigned sync putputs, function generators, triggers etc.
"""
# In case the QDAC has been switched off/on
# clear the io buffer and set verbose False
self.device_clear()
self.write('ver 0')
self.cal(0)
# Resetting all slopes first will cause v.set() disconnect generators
self.channels[0:self.num_chans].slope('Inf')
self.channels[0:self.num_chans].v(0)
self.channels[0:self.num_chans].mode(Mode.vhigh_ihigh)
self.channels[0:self.num_chans].sync(0)
self.channels[0:self.num_chans].sync_delay(0)
self.channels[0:self.num_chans].sync_duration(0.01)
if update_currents:
self.channels[0:self.num_chans].i.get()
self.mode_force(False)
self._reset_bookkeeping()
[docs]
def snapshot_base(
self,
update: Optional[bool] = False,
params_to_skip_update: Optional[Sequence[str]] = None
) -> Dict[Any, Any]:
update_currents = self._update_currents and update is True
if update:
self._update_cache(update_currents=update_currents)
self._get_status_performed = True
# call _update_cache rather than getting the status individually for
# each parameter. We set _get_status_performed to True
# to indicate that each update channel does not need to call this
# function as opposed to when snapshot is called on an individual
# channel
snap = super().snapshot_base(
update=update,
params_to_skip_update=params_to_skip_update)
self._get_status_performed = False
return snap
#########################
# Channel gets/sets
#########################
def _get_voltage(self, chan: int) -> str:
"""
Clear the output from the instrument and ask for the current voltage
Args:
chan (int): The 1-indexed channel number
"""
self.clear_read_queue()
self.write(f'set {chan}')
return self._write_response
def _set_voltage(self, chan: int, v_set: float) -> None:
"""
set_cmd for the chXX_v parameter
Args:
chan: The 1-indexed channel number
v_set: The target voltage
If a finite slope has been assigned, a function generator will
ramp the voltage.
"""
slope = self._slopes.get(chan, None)
if not slope:
# Should not be necessary to wav here.
self.write('wav {ch} 0 0 0;set {ch} {voltage:.6f}'
.format(ch=chan, voltage=v_set))
return
# We need .get and not cache/get_latest in case a ramp
# was interrupted
v_start = self.channels[chan-1].v.get()
v_span = v_set - v_start
v_amplitude = abs(v_span)
s_duration = v_amplitude / slope
LOG.info(f'Slope: {slope}, time: {s_duration}')
if v_amplitude <= 10:
# SYNCing happens inside ramp_voltages
self.ramp_voltages([chan], [v_start], [v_set], s_duration)
return
# Divide sweep into two parts
v_half_span = v_span / 2
s_half_duration = s_duration / 2
v_half_way = v_start + v_half_span
self.ramp_voltages([chan], [v_start], [v_half_way], s_half_duration)
LOG.warning('Trying to ramp more than 10 volts. '
'Waiting for first ramp to finish')
time.sleep(s_half_duration)
self.ramp_voltages([chan], [v_half_way], [v_set], s_half_duration)
def _set_mode(self, chan: int, new_mode: Mode) -> None:
"""
set_cmd for the QDAC's mode (combined voltage and current sense range).
It is not possible to switch from voltage range without setting the
the voltage to zero first or set the global mode_force parameter True.
"""
def _clipto(value: float, min_: float, max_: float) -> float:
errmsg = ("Voltage is outside the bounds of the new voltage range"
" and is therefore clipped.")
if value > max_:
LOG.warning(errmsg)
return max_
elif value < min_:
LOG.warning(errmsg)
return min_
else:
return value
# It is not possible ot say if the channel is connected to
# a generator, so we need to ask.
def wav_or_set_msg(chan: int, new_voltage: float) -> str:
self.write(f'wav {chan}')
fw_str = self._write_response
gen, _, _ = fw_str.split(',')
if int(gen) > 0:
# The amplitude must be set to zero to avoid potential overflow
# Assuming that voltage range is not changed during a ramp
return 'wav {} {} {:.6f} {:.6f}'\
.format(chan, int(gen), 0, new_voltage)
else:
return f'set {chan} {new_voltage:.6f}'
old_mode = self.channels[chan-1].mode.cache()
new_vrange = new_mode.value.v
old_vrange = old_mode.value.v
new_irange = new_mode.value.i
old_irange = old_mode.value.i
message = ''
max_zero_voltage = {0: 20e-6, 1: 3e-6}
NON_ZERO_VOLTAGE_MSG = (
'Please set the voltage to zero before changing the voltage'
' range in order to avoid jumps or spikes.'
' Or set mode_force=True to allow voltage range change for'
' non-zero voltages.')
if old_mode == new_mode:
return
# If the voltage range is going to change we have to take care of
# setting the voltage after the switch, and therefore read it first
# We also need to make sure than only one of the voltage/current
# relays is on at a time (otherwise the firmware will enforce it).
if (new_irange != old_irange) and (new_vrange == old_vrange == 0):
# Only the current sensor relay has to switch:
message += f'cur {chan} {new_irange}'
# The voltage relay (also) has to switch:
else:
# Current sensor relay on->off before voltage relay off->on:
if new_irange < old_irange and new_vrange > old_vrange:
message += f'cur {chan} {new_irange};'
old_voltage = self.channels[chan-1].v.get()
# Check if voltage is non-zero and mode_force is off
if ((self.mode_force() is False) and
(abs(old_voltage) > max_zero_voltage[old_vrange])):
raise ValueError(NON_ZERO_VOLTAGE_MSG)
new_voltage = _clipto(
old_voltage, self.vranges[chan][new_vrange]['Min'],
self.vranges[chan][new_vrange]['Max'])
message += f'vol {chan} {new_vrange};'
message += wav_or_set_msg(chan, new_voltage)
# Current sensor relay off->on after voltage relay on->off:
if new_irange > old_irange and new_vrange < old_vrange:
message += f';cur {chan} {new_irange}'
self.channels[chan-1].v.vals = self._v_vals(chan, new_vrange)
self.channels[chan-1].v.cache.set(new_voltage)
self.write(message)
def _v_vals(self, chan: int, vrange_int: int) -> vals.Numbers:
"""
Returns the validator for the specified voltage range.
"""
return vals.Numbers(self.vranges[chan][vrange_int]['Min'],
self.vranges[chan][vrange_int]['Max'])
def _update_v_validators(self) -> None:
"""
Command for setting all 'v' limits ('vals') of all channels to the
actual calibrated output limits for the range each individual channel
is currently in.
"""
for chan in range(1, self.num_chans+1):
vrange = self.channels[chan-1].mode.value.v
self.channels[chan-1].v.vals = self._v_vals(chan, vrange)
def _num_verbose(self, s: str) -> float:
"""
Turns a return value from the QDac into a number.
If the QDac is in verbose mode, this involves stripping off the
value descriptor.
"""
if self._verbose:
s = s.split(': ')[-1]
return float(s)
def _current_parser(self, s: str) -> float:
"""
Parser for chXX_i parameter (converts from uA to A)
"""
return 1e-6*self._num_verbose(s)
def _update_cache(self, update_currents: bool = False) -> None:
"""
Function to query the instrument and get the status of all channels.
Takes a while to finish.
The `status` call generates 27 or 51 lines of output. Send the command
and read the first one, which is the software version line
the full output looks like:
Software Version: 1.07\r\n
Channel\tOut V\t\tVoltage range\tCurrent range\n
\n
8\t 0.000000\t\tX 1\t\tpA\n
7\t 0.000000\t\tX 1\t\tpA\n
... (all 24/48 channels like this)
(no termination afterward besides the \n ending the last channel)
"""
irange_trans = {'hi cur': 1, 'lo cur': 0}
vrange_trans = {'X 1': 0, 'X 0.1': 1}
# Status call, check the
version_line = self.ask('status')
if version_line.startswith('Software Version: '):
self.version = version_line.strip().split(': ')[1]
else:
self._wait_and_clear()
raise ValueError('unrecognized version line: ' + version_line)
# Check header line
header_line = self.read()
headers = header_line.lower().strip('\r\n').split('\t')
expected_headers = ['channel', 'out v', '', 'voltage range',
'current range']
if headers != expected_headers:
raise ValueError('unrecognized header line: ' + header_line)
chans_left = set(self._chan_range)
while chans_left:
line = self.read().strip()
if not line:
continue
chanstr, v, _, vrange, _, irange = line.split('\t')
chan = int(chanstr)
vrange_int = int(vrange_trans[vrange.strip()])
irange_int = int(irange_trans[irange.strip()])
mode = Mode((vrange_int, irange_int))
self.channels[chan-1].mode.cache.set(mode)
self.channels[chan-1].v.cache.set(float(v))
self.channels[chan-1].v.vals = self._v_vals(chan, vrange_int)
chans_left.remove(chan)
if update_currents:
for chan in self._chan_range:
self.channels[chan-1].i.get()
def _setsync(self, chan: int, sync: int) -> None:
"""
set_cmd for the chXX_sync parameter.
Args:
chan: The channel number (1-48 or 1-24)
sync: The associated sync output (1-3 on 24 ch units
or 1-5 on 48 ch units). 0 means 'unassign'
"""
if chan not in range(1, self.num_chans+1):
raise ValueError(
f'Channel number must be 1-{self.num_chans}.')
if sync == 0:
oldsync = self.channels[chan-1].sync.cache()
# try to remove the sync from internal bookkeeping
self._syncoutputs.pop(chan, None)
# free the previously assigned sync
if oldsync is not None:
self.write(f'syn {oldsync} 0 0 0')
return
# Make sure to clear hardware an _syncoutpus appropriately
if chan in self._syncoutputs:
# Changing SYNC port for a channel
oldsync = self.channels[chan-1].sync.cache()
if sync != oldsync:
self.write(f'syn {oldsync} 0 0 0')
elif sync in self._syncoutputs.values():
# Assigning an already used SYNC port to a different channel
oldchan = [ch for ch, sy in self._syncoutputs.items()
if sy == sync]
self._syncoutputs.pop(oldchan[0], None)
self.write(f'syn {sync} 0 0 0')
self._syncoutputs[chan] = sync
return
def _getsync(self, chan: int) -> int:
"""
get_cmd of the chXX_sync parameter
"""
return self._syncoutputs.get(chan, 0)
[docs]
def print_syncs(self) -> None:
"""
Print assigned SYNC ports, sorted by channel number
"""
for chan, sync in sorted(self._syncoutputs.items()):
print(f'Channel {chan}, SYNC: {sync} (V/s)')
def _setslope(self, chan: int, slope: Union[float, str]) -> None:
"""
set_cmd for the chXX_slope parameter, the maximum slope of a channel.
With a finite slope the channel will be ramped using a generator.
Args:
chan: The channel number (1-24 or 1-48)
slope: The slope in V/s.
Write 'Inf' to release the channelas slope channel and to release
the associated function generator. The output rise time will now
only depend on the analog electronics.
"""
if chan not in range(1, self.num_chans+1):
raise ValueError(
f'Channel number must be 1-{self.num_chans}.')
if slope == 'Inf':
# Set the channel in DC mode
v_set = self.channels[chan-1].v.get()
self.write('set {ch} {voltage:.6f};wav {ch} 0 0 0'
.format(ch=chan, voltage=v_set))
# Now release the function generator and fg trigger (if possible)
try:
fg = self._assigned_fgs[chan]
self._assigned_fgs[chan].t_end = 0
self._assigned_triggers.pop(fg.fg)
except KeyError:
pass
# Remove a sync output, if one was assigned
if chan in self._syncoutputs:
self.channels[chan-1].sync.set(0)
# Now clear the assigned slope
self._slopes.pop(chan, None)
else:
self._slopes[chan] = slope
def _getslope(self, chan: int) -> Union[str, float]:
"""
get_cmd of the chXX_slope parameter
"""
return self._slopes.get(chan, 'Inf')
[docs]
def print_slopes(self) -> None:
"""
Print the finite slopes assigned to channels, sorted by channel number
"""
for chan, slope in sorted(self._slopes.items()):
print(f'Channel {chan}, slope: {slope} (V/s)')
def _get_minmax_outputvoltage(self, channel: int, vrange_int: int) -> Dict[str, float]:
"""
Returns a dictionary of the calibrated Min and Max output
voltages of 'channel' for the voltage given range (0,1) given by
'vrange_int'
"""
# For firmware 1.07 verbose mode and nn verbose mode give verbose
# result, So this is designed for verbose mode
if channel not in range(1, self.num_chans+1):
raise ValueError(
f'Channel number must be 1-{self.num_chans}.')
if vrange_int not in range(0, 2):
raise ValueError('Range must be 0 or 1.')
self.write(f'rang {channel} {vrange_int}')
fw_str = self._write_response
return {'Min': float(fw_str.split('MIN:')[1].split('MAX')[0].strip()),
'Max': float(fw_str.split('MAX:')[1].strip())}
def _update_voltage_ranges(self) -> None:
# Get all calibrated min/max output values, requires verbose on
# in firmware version 1.07
self.write('ver 1')
self.vranges = {}
for chan in self._chan_range:
self.vranges.update(
{chan: {0: self._get_minmax_outputvoltage(chan, 0),
1: self._get_minmax_outputvoltage(chan, 1)}})
self.write('ver 0')
[docs]
def write(self, cmd: str) -> None:
"""
QDac always returns something even from set commands, even when
verbose mode is off, so we'll override write to take this out
if you want to use this response, we put it in self._write_response
(but only for the very last write call)
In this method we expect to read one termination char per command. As
commands are concatenated by `;` we count the number of concatenated
commands as count(';') + 1 e.g. 'wav 1 1 1 0;fun 2 1 100 1 1' is two
commands. Note that only the response of the last command will be
available in `_write_response`
"""
LOG.debug(f"Writing to instrument {self.name}: {cmd}")
self.visa_handle.write(cmd)
for _ in range(cmd.count(';')+1):
self._write_response = self.visa_handle.read()
if self._write_response.startswith('Error: '):
LOG.warning(self._write_response)
[docs]
def read(self) -> str:
return self.visa_handle.read()
def _wait_and_clear(self, delay: float = 0.5) -> None:
time.sleep(delay)
self.visa_handle.clear()
[docs]
def clear_read_queue(self) -> Sequence[str]:
"""
Flush the VISA message queue of the instrument
Waits 1 ms between each read.
Returns:
Sequence[str]: Messages lingering in queue
"""
lingering = list()
with self.timeout.set_to(0.001):
while True:
try:
message = self.visa_handle.read()
except pyvisa.VisaIOError:
break
else:
lingering.append(message)
return lingering
[docs]
def connect_message(self,
idn_param: str = 'IDN',
begin_time: Optional[float] = None) -> None:
"""
Override of the standard Instrument class connect_message.
Usually, the response to `*IDN?` is printed. Here, the
software version is printed.
"""
self.visa_handle.write('version')
LOG.info('Connected to QDAC on {}, {}'.format(
self._address, self.visa_handle.read()))
def _get_firmware_version(self) -> float:
"""
Check if the "version" command reponds. If so we probably have a QDevil
QDAC, and the version number is returned. Otherwise 0.0 is returned.
"""
self.write('version')
fw_str = self._write_response
if ((not ("Unrecognized command" in fw_str))
and ("Software Version: " in fw_str)):
fw_version = float(
self._write_response.replace("Software Version: ", ""))
else:
fw_version = 0.0
return fw_version
def _get_number_of_channels(self) -> int:
"""
Returns the number of channels for the instrument
"""
self.write('boardNum')
fw_str = self._write_response
return 8*int(fw_str.strip("numberOfBoards:"))
[docs]
def print_overview(self, update_currents: bool = False) -> None:
"""
Pretty-prints the status of the QDac
"""
self._update_cache(update_currents=update_currents)
for ii in range(self.num_chans):
line = f"Channel {ii+1} \n"
line += " Voltage: {} ({}).\n".format(
self.channels[ii].v.cache(), self.channels[ii].v.unit
)
line += " Current: {} ({}).\n".format(
self.channels[ii].i.cache.get(get_if_invalid=False),
self.channels[ii].i.unit,
)
line += f" Mode: {self.channels[ii].mode.cache().get_label()}.\n"
line += " Slope: {} ({}).\n".format(
self.channels[ii].slope.cache(), self.channels[ii].slope.unit
)
if self.channels[ii].sync.cache() > 0:
line += ' Sync Out: {}, Delay: {} ({}), '\
'Duration: {} ({}).\n'.format(
self.channels[ii].sync.cache(),
self.channels[ii].sync_delay.cache(),
self.channels[ii].sync_delay.unit,
self.channels[ii].sync_duration.cache(),
self.channels[ii].sync_duration.unit,
)
print(line)
def _get_functiongenerator(self, chan: int) -> int:
"""
Function for getting a free generator (of 8 available) for a channel.
Used as helper function for ramp_voltages, but may also be used if the
user wants to use a function generator for something else.
If there are no free generators this function will wait for up to
fgs_timeout for one to be ready.
To mark a function generator as available for others set
self._assigned_fgs[chan].t_end = 0
Args:
chan: (1..24/48) the channel for which a function generator is
requested.
"""
fgs_timeout = 2 # Max time to wait for next available generator
if len(self._assigned_fgs) < 8:
fg = min(self._fgs.difference(
{g.fg for g in self._assigned_fgs.values()}))
self._assigned_fgs[chan] = Generator(fg)
else:
# If no available fgs, see if one is soon to be ready
# Nte, this does not handle if teh user has assigned the
# same fg to multiple channels cheating the driver
time_now = time.time()
available_fgs_chans = []
fgs_t_end_ok = [g.t_end for chan, g
in self._assigned_fgs.items()
if g.t_end < time_now+fgs_timeout]
if len(fgs_t_end_ok) > 0:
first_ready_t = min(fgs_t_end_ok)
available_fgs_chans = [chan for chan, g
in self._assigned_fgs.items()
if g.t_end == first_ready_t]
if first_ready_t > time_now:
LOG.warning('''
Trying to ramp more channels than there are generators.\n
Waiting for ramp generator to be released''')
time.sleep(first_ready_t - time_now)
if len(available_fgs_chans) > 0:
oldchan = available_fgs_chans[0]
fg = self._assigned_fgs[oldchan].fg
self._assigned_fgs.pop(oldchan)
self._assigned_fgs[chan] = Generator(fg)
# Set the old channel in DC mode
v_set = self.channels[oldchan-1].v.cache()
self.write('set {ch} {voltage:.6f};wav {ch} 0 0 0'
.format(ch=oldchan, voltage=v_set))
else:
raise RuntimeError('''
Trying to ramp more channels than there are generators
available. Please insert delays allowing channels to finish
ramping before trying to ramp other channels, or reduce the
number of ramped channels. Or increase fgs_timeout.''')
return fg
[docs]
def ramp_voltages(
self,
channellist: Sequence[int],
v_startlist: Sequence[float],
v_endlist: Sequence[float],
ramptime: float) -> float:
"""
Function for smoothly ramping one channel or more channels
simultaneously (max. 8). This is a shallow interface to
ramp_voltages_2d. Function generators and triggers are
are assigned automatically.
Args:
channellist: List (int) of channels to be ramped (1 indexed)\n
v_startlist: List (int) of voltages to ramp from.
MAY BE EMPTY. But if provided, time is saved by
NOT reading the present values from the instrument.
v_endlist: List (int) of voltages to ramp to.\n
ramptime: Total ramp time in seconds (min. 0.002). Has
to be an integer number of 0.001 secs).\n
Returns:
Estimated time of the excecution of the 2D scan.
NOTE: This function returns as the ramps are started. So you need
to wait for 'ramptime' until measuring....
"""
if ramptime < 0.002:
LOG.warning('Ramp time too short: {:.3f} s. Ramp time set to 2 ms.'
.format(ramptime))
ramptime = 0.002
steps = int(ramptime*1000)
return self.ramp_voltages_2d(
slow_chans=[], slow_vstart=[], slow_vend=[],
fast_chans=channellist, fast_vstart=v_startlist,
fast_vend=v_endlist, step_length=0.001,
slow_steps=1, fast_steps=steps)
[docs]
def ramp_voltages_2d(
self,
slow_chans: Sequence[int],
slow_vstart: Sequence[float],
slow_vend: Sequence[float],
fast_chans: Sequence[int],
fast_vstart: Sequence[float],
fast_vend: Sequence[float],
step_length: float,
slow_steps: int,
fast_steps: int) -> float:
"""
Function for smoothly ramping two channel groups simultaneously with
one slow (x) and one fast (y) group. used by 'ramp_voltages' where x is
empty. Function generators and triggers are assigned automatically.
Args:
slow_chans: List of channels to be ramped (1 indexed) in
the slow-group\n
slow_vstart: List of voltages to ramp from in the
slow-group.
MAY BE EMPTY. But if provided, time is saved by NOT
reading the present values from the instrument.\n
slow_vend: list of voltages to ramp to in the slow-group.
fast_chans: List of channels to be ramped (1 indexed) in
the fast-group.\n
fast_vstart: List of voltages to ramp from in the
fast-group.
MAY BE EMPTY. But if provided, time is saved by NOT
reading the present values from the instrument.\n
fast_vend: list of voltages to ramp to in the fast-group.
step_length: Time spent at each step in seconds
(min. 0.001) multiple of 1 ms.\n
slow_steps: number of steps in the slow direction.\n
fast_steps: number of steps in the fast direction.\n
Returns:
Estimated time of the excecution of the 2D scan.\n
NOTE: This function returns as the ramps are started.
"""
channellist = [*slow_chans, *fast_chans]
v_endlist = [*slow_vend, *fast_vend]
v_startlist = [*slow_vstart, *fast_vstart]
step_length_ms = int(step_length*1000)
if step_length < 0.001:
LOG.warning('step_length too short: {:.3f} s. \nstep_length set to'
.format(step_length_ms) + ' minimum (1ms).')
step_length_ms = 1
if any([ch in fast_chans for ch in slow_chans]):
raise ValueError(
'Channel cannot be in both slow_chans and fast_chans!')
no_channels = len(channellist)
if no_channels != len(v_endlist):
raise ValueError(
'Number of channels and number of voltages inconsistent!')
for chan in channellist:
if chan not in range(1, self.num_chans+1):
raise ValueError(
f'Channel number must be 1-{self.num_chans}.')
if not (chan in self._assigned_fgs):
self._get_functiongenerator(chan)
# Voltage validation
for i in range(no_channels):
self.channels[channellist[i]-1].v.validate(v_endlist[i])
if v_startlist:
for i in range(no_channels):
self.channels[channellist[i]-1].v.validate(v_startlist[i])
# Get start voltages if not provided
if not slow_vstart:
slow_vstart = [self.channels[ch-1].v.get() for ch in slow_chans]
if not fast_vstart:
fast_vstart = [self.channels[ch-1].v.get() for ch in fast_chans]
v_startlist = [*slow_vstart, *fast_vstart]
if no_channels != len(v_startlist):
raise ValueError(
'Number of start voltages do not match number of channels!')
# Find trigger not aleady uses (avoid starting other
# channels/function generators)
if no_channels == 1:
trigger = 0
else:
trigger = int(min(self._trigs.difference(
set(self._assigned_triggers.values()))))
# Make sure any sync outputs are configured
for chan in channellist:
if chan in self._syncoutputs:
sync = self._syncoutputs[chan]
sync_duration = int(
1000*self.channels[chan-1].sync_duration.get())
sync_delay = int(1000*self.channels[chan-1].sync_delay.get())
self.write('syn {} {} {} {}'.format(
sync, self._assigned_fgs[chan].fg,
sync_delay, sync_duration))
# Now program the channel amplitudes and function generators
msg = ''
for i in range(no_channels):
amplitude = v_endlist[i]-v_startlist[i]
# TODO: if amplitute is too large, then split into two parts.
# if abs(amplitude) > 10: ...
ch = channellist[i]
fg = self._assigned_fgs[ch].fg
if trigger > 0: # Trigger 0 is not a trigger
self._assigned_triggers[fg] = trigger
msg += f"wav {ch} {fg} {amplitude} {v_startlist[i]}"
# using staircase = function 4
nsteps = slow_steps if ch in slow_chans else fast_steps
repetitions = slow_steps if ch in fast_chans else 1
delay = step_length_ms \
if ch in fast_chans else fast_steps*step_length_ms
msg += ';fun {} {} {} {} {} {};'.format(
fg, Waveform.staircase, delay, int(nsteps),
repetitions, trigger)
# Update latest values to ramp end values
# (actually not necessary when called from _set_voltage)
self.channels[ch-1].v.cache.set(v_endlist[i])
self.write(msg[:-1]) # last semicolon is stripped
# Fire trigger to start generators simultaneously, saving communication
# time by not using triggers for single channel ramping
if trigger > 0:
self.write(f'trig {trigger}')
# Update fgs dict so that we know when the ramp is supposed to end
time_ramp = slow_steps * fast_steps * step_length_ms / 1000
time_end = time_ramp + time.time()
for chan in channellist:
self._assigned_fgs[chan].t_end = time_end
return time_ramp