import time
import logging
import numpy as np
import pyvisa # used for the parity constant
import pyvisa.constants
import traceback
import threading
import math
from qcodes import validators as vals
from qcodes.validators import Bool, Numbers
from qcodes.instrument import VisaInstrument
[docs]
class IVVI(VisaInstrument):
'''
Status: Alpha version, tested for basic get-set commands
TODO:
- Add adjustable range and rate protection per channel
- Add error handling for the specific error messages in the
protocol
- Remove/fine-tune manual sleep statements
This is the python driver for the D5 module of the IVVI-rack
see: http://qtwork.tudelft.nl/~schouten/ivvi/doc-d5/index-d5.htm
A descriptor for the data protocol can be found at
http://qtwork.tudelft.nl/~schouten/ivvi/doc-d5/rs232linkformat.txt
A copy of this file can be found at the bottom of this file.
'''
full_range = 4000.0
half_range = full_range / 2
resolution = 16
dac_quata = full_range / 2**resolution
[docs]
def __init__(self, name, address, reset=False, numdacs=16, dac_step=10,
dac_delay=.1, safe_version=True,
polarity=['BIP', 'BIP', 'BIP', 'BIP'],
use_locks=False, **kwargs):
'''
Initialzes the IVVI, and communicates with the wrapper
Args:
name (str) : name of the instrument
address (str) : ASRL address
reset (bool) : resets to default values, default=false
numdacs (int) : number of dacs, multiple of 4, default=16
polarity (List[str]) : list of polarities of each set of 4 dacs
choose from 'BIP', 'POS', 'NEG',
default=['BIP', 'BIP', 'BIP', 'BIP']
dac_step (float) : max step size for dac parameter
dac_delay (float) : delay (in seconds) for dac
safe_version (bool) : if True then do not send version commands
to the IVVI controller
use_locks (bool) : if True then locks are used in the `ask`
function of the driver. The IVVI driver is not
thread safe, this locking mechanism makes it
thread safe at the cost of making the call to ask
blocking.
'''
t0 = time.time()
super().__init__(name, address, **kwargs)
if use_locks:
self.lock = threading.Lock()
else:
self.lock = None
self.safe_version = safe_version
if numdacs % 4 == 0 and numdacs > 0:
self._numdacs = int(numdacs)
else:
raise ValueError('numdacs must be a positive multiple of 4, '
'not {}'.format(numdacs))
# values based on descriptor
self.visa_handle.baud_rate = 115200
self.visa_handle.parity = pyvisa.constants.Parity(1) # odd parity
self.visa_handle.write_termination = ''
self.visa_handle.read_termination = ''
self.add_parameter('version',
get_cmd=self._get_version)
self.add_parameter('check_setpoints',
get_cmd=None, set_cmd=None,
initial_value=False,
label='Check setpoints',
vals=Bool(),
docstring=('Whether to check if the setpoint is the'
' same as the current DAC value to '
'prevent an unnecessary set command.'))
# Time to wait before sending a set DAC command to the IVVI
self.add_parameter('dac_set_sleep',
get_cmd=None, set_cmd=None,
initial_value=0.05,
label='DAC set sleep',
unit='s',
vals=Numbers(0),
docstring=('When check_setpoints is set to True, '
'this is the waiting time between the'
'command that checks the current DAC '
'values and the final set DAC command'))
# Minimum time to wait before the read buffer contains data
self.add_parameter('dac_read_buffer_sleep',
get_cmd=None, set_cmd=None,
initial_value=0.025,
label='DAC read buffer sleep',
unit='s',
vals=Numbers(0),
docstring=('While receiving bytes from the IVVI, '
'sleeping is done in multiples of this '
'value. Change to a lower value for '
'a shorter minimum time to wait.'))
self.add_parameter('dac_voltages',
label='Dac voltages',
get_cmd=self._get_dacs)
self.add_function(
'trigger',
call_cmd=self._send_trigger
)
# initialize pol_num, the voltage offset due to the polarity
self.pol_num = np.zeros(self._numdacs)
for i in range(1, numdacs + 1):
self.add_parameter(
'dac{}'.format(i),
label='Dac {}'.format(i),
unit='mV',
get_cmd=self._gen_ch_get_func(self._get_dac, i),
set_cmd=self._gen_ch_set_func(self._set_dac, i),
vals=vals.Numbers(self.pol_num[i - 1],
self.pol_num[i - 1] + self.full_range),
step=dac_step,
inter_delay=dac_delay,
max_val_age=10)
for i in range(int(self._numdacs / 4)):
self.set_pol_dacrack(polarity[i], np.arange(1 + i * 4, 1 + (i + 1) * 4),
get_all=False)
self._update_time = 5 # seconds
self._time_last_update = 0 # ensures first call will always update
t1 = time.time()
# make sure we ignore termination characters
# See http://www.ni.com/tutorial/4256/en/#toc2 on Termination Character
# Enabled
v = self.visa_handle
v.set_visa_attribute(pyvisa.constants.VI_ATTR_TERMCHAR_EN, 0)
v.set_visa_attribute(pyvisa.constants.VI_ATTR_ASRL_END_IN, 0)
v.set_visa_attribute(pyvisa.constants.VI_ATTR_ASRL_END_OUT, 0)
v.set_visa_attribute(pyvisa.constants.VI_ATTR_SEND_END_EN, 0)
# basic test to confirm we are properly connected
try:
self.get_all()
except Exception as ex:
print('IVVI: get_all() failed, maybe connected to wrong port?')
print(traceback.format_exc())
print('Initialized IVVI-rack in %.2fs' % (t1 - t0))
[docs]
def get_idn(self):
r"""
Overwrites the get_idn function using constants as the hardware
does not have a proper \*IDN function.
"""
# not all IVVI racks support the version command, so return a dummy
return -1
idparts = ['QuTech', 'IVVI', 'None', self.version()]
return dict(zip(('vendor', 'model', 'serial', 'firmware'), idparts))
def _get_version(self):
if self.safe_version:
return -1
else:
# ask for the version of more recent modules
# some of the older modules cannot handle this command
mes = self.ask(bytes([3, 4]))
ver = mes[2]
return ver
[docs]
def get_all(self):
return self.snapshot(update=True)
[docs]
def set_dacs_zero(self):
for i in range(self._numdacs):
self.parameters['dac{}'.format(i + 1)](0)
[docs]
def linspace(self, start: float, end: float, samples: int, flexible: bool = False, bip: bool = True):
"""
Creates array of voltages, with correct alignment to the DAC
quantisation, in a similar manner to numpy.linspace.
This guarantees an even spacing and no double sampling inside
the requested range.
Args:
start: the start of the voltage range, in millivolts
end: the end of the voltage range, in millivolts
samples: number of sample voltages
flexible: occasionally get a different number of samples if
they can still fit inside the range.
bip: if the dac set to bi-polar (-2V to +2V) or
not (-4 to -0, or 0 to +4),
Returns:
list of voltages in millivolts suitable for the ivvi DAC.
Voltages are inside [start:end]
Voltages are evenly spaced
Voltages align with the DAC quantisation.
Examples:
normal usage::
linspace(-100,100,8) -> [-99.88555733577478, .. 6 more ..
, 99.64141298542764]
linspace(-1000, 1000, 2000) ->
[-976.4858472571908, .. 1998 more .., 975.6923781185626 ]
A flexable number of points::
linspace(-1000, 1000, 2000, True) ->
[-999.9237048905165, .. 2046 more .., 999.1302357518883]
4 bits is the optimal spacing, so this gives 2048 (= 2^11)
points in a 2 V range
Insufficient resolution::
linspace(500, 502, 100) -> ValueError: Insufficient resolution
for 100 samples in the range 500 to 502. Maximum :16
This prevents oversampling. Use flexable = True to adapt the number
of points.
Resolution limited sweep using the flexable option::
linspace(500, 502, 100, True) -> [500.0991836423285, .. 14 more ..
, 501.9302662699321]
A too narrow range::
linspace(0, 0.01, 100, True) # -> ValueError: No DAC values exist
in the range 0 : 0.01
Even using the flexable option can not help if there are no
valid values in the requested range.
"""
if not isinstance(samples, (int)):
raise ValueError('samples: must be an integer')
if not isinstance(start, (int, float)):
raise ValueError('start: must be a number')
if not isinstance(end, (int, float)):
raise ValueError('end: must be a number')
if samples < 2:
raise ValueError('points: needs to be 2 or more')
use_reversed = end < start
if use_reversed:
start,end = end,start
half = 0.5 if bip else 0.0 # half bit difference between bip and neg,pos
byte_start = int(math.ceil(half + start/self.dac_quata))
byte_end = int(math.floor(half + end/self.dac_quata))
delta_bytes = abs(byte_end - byte_start)-1
spacing = max(int(math.floor(delta_bytes / (samples-1))),2)
l = [(el+half)*self.dac_quata
for el in range(byte_start, byte_end,spacing)]
# Adjust the points until the length is correct
if not flexible:
if len(l) > samples:
if (len(l) - samples)%2==1:
l = l[1:]
s = int((len(l) - samples) / 2)
if s > 0:
l = l[s:-s]
if len(l) < samples:
msg = ( 'Insufficient resolution for '+ str(samples)
+ ' samples in the range '
+ str(start)+' to ' + str(end) )
msg += '. Maximum :' + str(len(l))
raise ValueError(msg)
if len(l) == 0:
msg = ('No DAC values exist in the range ' +
str(start) + ' : ' + str(end)
)
raise ValueError(msg)
if use_reversed:
l = list(reversed(l))
return l
# Conversion of data
def _mvoltage_to_bytes(self, mvoltage):
'''
Converts a mvoltage on a 0mV-4000mV scale to a 16-bit integer
equivalent
output is a list of two bytes
Input:
mvoltage (float) : a mvoltage in the 0mV-4000mV range
Output:
(dataH, dataL) (int, int) : The high and low value byte equivalent
'''
bytevalue = int(round(mvoltage / self.full_range * 65535))
return bytevalue.to_bytes(length=2, byteorder='big')
def _bytes_to_mvoltages(self, byte_mess):
'''
Converts a list of bytes to a list containing
the corresponding mvoltages
'''
values = list(range(self._numdacs))
for i in range(self._numdacs):
# takes two bytes, converts it to a 16 bit int and then divides by
# the range and adds the offset due to the polarity
values[i] = ((byte_mess[2 + 2 * i] * 256 + byte_mess[3 + 2 * i]) /
65535.0 * self.full_range) + self.pol_num[i]
return values
# Communication with device
def _get_dac(self, channel):
"""
Returns dac channel in mV
channels range from 1-numdacs
this version is a wrapper around the IVVI get function.
it only updates
"""
return self._get_dacs()[channel - 1]
def _set_dac(self, channel, mvoltage):
"""
Sets the specified dac to the specified voltage.
A check to prevent setting the same value is performed if
the check_setpoints flag was set.
Input:
mvoltage (float) : output voltage in mV
channel (int) : 1 based index of the dac
Output:
reply (string) : errormessage
Private version of function
"""
proceed = True
if self.check_setpoints():
cur_val = self.parameters['dac{}'.format(channel)]()
# dac range in mV / 16 bits FIXME make range depend on polarity
byte_res = self.full_range / 2**16
# eps is a magic number to correct for an offset in the values
# the IVVI returns (i.e. setting 0 returns byte_res/2 = 0.030518
# with rounding
eps = 0.0001
proceed = False
if (mvoltage > (cur_val + byte_res / 2 + eps) or
mvoltage < (cur_val - byte_res / 2 - eps)):
proceed = True
if self.dac_set_sleep() > 0.0:
time.sleep(self.dac_set_sleep())
# only update the value if it is different from the previous one
# this saves time in setting values, set cmd takes ~650ms
if proceed:
polarity_corrected = mvoltage - self.pol_num[channel - 1]
byte_val = self._mvoltage_to_bytes(polarity_corrected)
message = bytes([2, 1, channel]) + byte_val
reply = self.ask(message)
self._time_last_update = 0 # ensures get command will update
return reply
def _get_dacs(self):
'''
Reads from device and returns all dacvoltages in a list
Input:
None
Output:
voltages (float[]) : list containing all dacvoltages (in mV)
get dacs command takes ~450ms according to ipython timeit
'''
if (time.time() - self._time_last_update) > self._update_time:
message = bytes([self._numdacs * 2 + 2, 2])
# workaround for an error in the readout that occurs sometimes
max_tries = 10
for i in range(max_tries):
try:
reply = self.ask(message)
self._mvoltages = self._bytes_to_mvoltages(reply)
self._time_last_update = time.time()
break
except Exception as ex:
logging.warning('IVVI communication error trying again')
if i + 1 == max_tries: # +1 because range goes stops before end
raise ex
return self._mvoltages
[docs]
def write(self, message, raw=False):
'''
Protocol specifies that a write consists of
descriptor size, error_code, message
returns message_len
'''
# This is used when write is used in the ask command
expected_answer_length = None
if not raw:
expected_answer_length = message[0]
message_len = len(message) + 2
error_code = bytes([0])
message = bytes([message_len]) + error_code + message
self.visa_handle.write_raw(message)
return expected_answer_length
[docs]
def ask(self, message, raw=False):
'''
Send <message> to the device and read answer.
Raises an error if one occurred
Returns a list of bytes
'''
if self.lock:
max_tries = 10
for i in range(max_tries):
if self.lock.acquire(timeout=.05):
break
else:
logging.warning('IVVI: cannot acquire the lock')
if i + 1 == max_tries:
raise Exception('IVVI: lock is stuck')
# Protocol knows about the expected length of the answer
message_len = self.write(message, raw=raw)
reply = self.read(message_len=message_len)
if self.lock:
self.lock.release()
return reply
def _read_raw_bytes_direct(self, size):
""" Read raw data using the visa lib """
with(self.visa_handle.ignore_warning(pyvisa.constants.VI_SUCCESS_MAX_CNT)):
data, statuscode = self.visa_handle.visalib.read(
self.visa_handle.session, size)
return data
def _read_raw_bytes_multiple(self, size, maxread=512, verbose=0):
""" Read raw data in blocks using the visa lib
Arguments:
size (int) : number of bytes to read
maxread (int) : maximum size of block to read
verbose (int): verbosity level
Returns:
ret (bytes): bytes read from the device
The pyvisa visalib.read does not always terminate at a newline, this
is a workaround.
Also see: https://github.com/qdev-dk/Qcodes/issues/276
https://github.com/hgrecco/pyvisa/issues/225
Setting both VI_ATTR_TERMCHAR_EN and VI_ATTR_ASRL_END_IN to zero
should allow the driver to ignore termination characters, this
function is an additional safety mechanism.
"""
ret = []
instr = self.visa_handle
with self.visa_handle.ignore_warning(pyvisa.constants.VI_SUCCESS_MAX_CNT):
nread = 0
while nread < size:
nn = min(maxread, size - nread)
chunk, status = instr.visalib.read(instr.session, nn)
ret += [chunk]
nread += len(chunk)
if verbose:
print('_read_raw: %d/%d bytes' % (len(chunk), nread))
ret = b''.join(ret)
return ret
[docs]
def read(self, message_len=None):
# because protocol has no termination chars the read reads the number
# of bytes in the buffer
bytes_in_buffer = 0
timeout = 1
t0 = time.time()
t1 = t0
bytes_in_buffer = 0
if message_len is None:
message_len = 1 # ensures at least 1 byte in buffer
while bytes_in_buffer < message_len:
t1 = time.time()
if self.dac_read_buffer_sleep() > 0.0:
time.sleep(self.dac_read_buffer_sleep())
bytes_in_buffer = self.visa_handle.bytes_in_buffer
if t1 - t0 > timeout:
raise TimeoutError()
# a workaround for a timeout error in the pyvsia read_raw() function
mes = self._read_raw_bytes_multiple(bytes_in_buffer)
# if mes[1] != 0:
# see protocol descriptor for error codes
# raise Exception('IVVI rack exception "%s"' % mes[1])
return mes
[docs]
def set_pol_dacrack(self, flag, channels, get_all=True):
'''
Changes the polarity of the specified set of dacs
Input:
flag (str) : 'BIP', 'POS' or 'NEG'
channel (int) : 0 based index of the rack
get_all (bool): if True (default) perform a get_all
Output:
None
'''
flagmap = {'NEG': -self.full_range, 'BIP': -self.half_range, 'POS': 0}
if flag.upper() not in flagmap:
raise KeyError('Tried to set invalid dac polarity %s', flag)
val = flagmap[flag.upper()]
for ch in channels:
self.pol_num[ch - 1] = val
name = "dac" + str(ch)
self.set_parameter_bounds(name, val,
val + self.full_range)
if get_all:
self.get_all()
[docs]
def get_pol_dac(self, channel):
'''
Returns the polarity of the dac channel specified
Input:
channel (int) : 1 based index of the dac
Output:
polarity (str) : 'BIP', 'POS' or 'NEG'
'''
val = self.pol_num[channel - 1]
if (val == -self.full_range):
return 'NEG'
elif (val == -self.half_range):
return 'BIP'
elif (val == 0):
return 'POS'
else:
return 'Invalid polarity in memory'
[docs]
def set_parameter_bounds(self, name, min_value, max_value):
parameter = self.parameters[name]
if not isinstance(parameter.vals, Numbers):
raise Exception('Only the Numbers validator is supported.')
parameter.vals._min_value = min_value
parameter.vals._max_value = max_value
def _gen_ch_set_func(self, fun, ch):
def set_func(val):
return fun(ch, val)
return set_func
def _gen_ch_get_func(self, fun, ch):
def get_func():
return fun(ch)
return get_func
def _send_trigger(self):
msg = bytes([2, 6])
self.write(msg)
self.read() # Flush the buffer, else the command will only work the first time.
[docs]
def round_dac(self, value, dacname=None):
""" Round a value to the interal precision of the instrument
Args:
value (float): value to be rounded
dacname (str or int or None): name or index of dac channel
Returns:
float: rounded value
"""
if dacname is None:
dacidx = 0 # assume all dacs have the same pol_num
elif isinstance(dacname, str):
dacidx = int(dacname[3:]) - 1
else:
dacidx = dacname
value_pol_corr = value - self.pol_num[dacidx]
value_bytes = self._mvoltage_to_bytes(value_pol_corr)
value_round = (value_bytes[0] * 256 + value_bytes[1]) / \
65535.0 * self.full_range + self.pol_num[dacidx]
return value_round
[docs]
def adjust_parameter_validator(self, param):
"""Adjust the parameter validator range based on the dac resolution.
The dac's of the IVVI have a finite resolution. If the validator range
min and max values are not values the dac's can actually have, then it
can occur that a set command results in the dac's going to a value just
outside the validator range. Adjusting the validators with this
function prevents that.
Args:
param (Parameter): a dac of the IVVI instrument
"""
if not isinstance(param.vals, Numbers):
raise Exception('Only the Numbers validator is supported.')
min_val = param.vals._min_value
max_val = param.vals._max_value
min_val_upd = self.round_dac(min_val, param.name)
max_val_upd = self.round_dac(max_val, param.name)
param.vals = Numbers(min_val_upd, max_val_upd)
'''
RS232 PROTOCOL
-----------------------
BAUTRATE 115200
DATA BITS 8
PARITY ODD
STOPBITS 1
Descriptor data PC-> MC
Byte Name Description value
--------------------------------------------------------------------------------------------------------
0 Descriptor size Size of this descriptor 4 (action 2,4,6,7)
5 (action 7)
7 (action 1,3)
11 (action 5)
1 Error 0
2 Data out size Number of bytes that has to be 2 (action 0,1,3,5,6)
send by the MC after receiving 3 (action 4)
descriptor 4 (action 7)
34 (action 2)
3 Action 0= no operation
1= set Dac value
2= request DAC data
3= continues send data to DAC
4= ask for Program ion
5= set bits interface
6= generate trigger output
7= request data from specified DAC
4 Dac nr Nr of DAC to be updated 1 to 16
5 DataH High byte to DAC 0 to $ff
6 DataL Low byte to DAC 0 to $ff
7 data bit 24-31 interfaceBit24_31 0 to $ff
8 data bit 16-23 interfaceBit16_23 0 to $ff
9 data bit 08-15 interfaceBit08_15 0 to $ff
10 data bit 00-07 interfaceBit00_07 0 to $ff
--------------------------------------------------------------------------------------------------------
Descriptor data MC-> PC
--------------------------------------------------------------------------------------------------------
0 Descriptor size Size of this descriptor 1
1 Error 0x00 = no Error detected 1
0x01 =
0x02 =
0x04 = Parity
0x08 = Overrun
0x10 = Frame Error
0x20 = WatchDog reset detected (32)
0x40 = DAC does not exist(64)
0x80 = WrongAction (128)
2 Version program version 1
2 DAC1 Value of DAC1 2
4 DAC2 Value of DAC2 2
..
32 DAC16 Value of DAC16 2
--------------------------------------------------------------------------------------------------------
'''