import binascii
import logging
from functools import partial
from typing import Any, Tuple
import numpy as np
from pyvisa.errors import VisaIOError
from typing_extensions import TypedDict
from qcodes import validators as vals
from qcodes.instrument import ChannelList, InstrumentChannel, VisaInstrument
from qcodes.parameters import ArrayParameter, ParamRawDataType
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class TraceNotReady(Exception):
    """
    Raised when a trace is requested before the scope has been prepared.

    ``ScopeArray.get_raw`` raises this when the root instrument's
    ``trace_ready`` flag is false, i.e. ``prepare_curvedata`` has not been
    run since the flag was last cleared.
    """
class OutputDict(TypedDict):
    """
    Typed layout of a parsed waveform preamble.

    Instances are produced by ``ScopeArray._preambleparser`` from the
    ';'-separated response of ``WFMPre?``.
    """

    # Data format of the transferred curve
    no_of_bytes: int
    no_of_bits: int
    encoding: str
    binary_format: str
    byte_order: str

    # Record description
    no_of_points: int
    waveform_ID: str
    point_format: str

    # Horizontal (x) axis scaling
    x_incr: float
    x_zero: float
    x_unit: str

    # Vertical (y) axis scaling
    y_multiplier: float
    y_zero: float
    y_offset: float
    y_unit: str
class ScopeArray(ArrayParameter):
    """
    Array parameter that returns one voltage trace from a scope channel.

    The setpoints (time axis) and the shape are refreshed by
    :meth:`prepare_curvedata`. Reading the parameter while the root
    instrument's ``trace_ready`` flag is false raises
    :class:`TraceNotReady`.
    """

    def __init__(
        self,
        name: str,
        instrument: "TektronixTPS2012Channel",
        channel: int,
        **kwargs: Any,
    ):
        """
        Args:
            name: Name of the parameter.
            instrument: The channel this parameter is attached to.
            channel: Channel number; used to build the ``CH<n>`` source name.
            **kwargs: Forwarded to ``ArrayParameter``.
        """
        super().__init__(
            name=name,
            # Initial shape only; prepare_curvedata overwrites it with the
            # number of points reported by the scope.
            shape=(2500,),
            label="Voltage",
            unit="V",
            setpoint_names=("Time",),
            setpoint_labels=("Time",),
            setpoint_units=("s",),
            docstring="holds an array from scope",
            instrument=instrument,
            **kwargs,
        )
        self.channel = channel

    @staticmethod
    def _time_axis(x_zero: float, x_incr: float, no_of_points: int) -> np.ndarray:
        """
        Build the time axis of a record.

        Sample ``k`` (``k = 0 .. no_of_points - 1``) is placed at
        ``x_zero + k * x_incr`` so that neighbouring samples are exactly
        ``x_incr`` apart.
        """
        return x_zero + x_incr * np.arange(no_of_points)

    def calc_set_points(self) -> Tuple[np.ndarray, int]:
        """
        Query the waveform preamble and derive the time axis from it.

        Returns:
            The time axis in units of s and the number of points.
        """
        assert isinstance(self.instrument, TektronixTPS2012Channel)
        preamble = self._preambleparser(self.instrument.ask('WFMPre?'))
        no_of_points = preamble['no_of_points']
        xdata = self._time_axis(
            preamble['x_zero'], preamble['x_incr'], no_of_points
        )
        return xdata, no_of_points

    def prepare_curvedata(self) -> None:
        """
        Prepare the scope for returning curve data
        """
        # To calculate set points, we must have the full preamble
        # For the instrument to return the full preamble, the channel
        # in question must be displayed
        assert isinstance(self.instrument, TektronixTPS2012Channel)
        assert isinstance(self.root_instrument, TektronixTPS2012)
        self.instrument.parameters['state'].set('ON')
        self.root_instrument.data_source(f'CH{self.channel}')

        xdata, no_of_points = self.calc_set_points()
        self.setpoints = (tuple(xdata), )
        self.shape = (no_of_points, )

        self.root_instrument.trace_ready = True

    def get_raw(self) -> ParamRawDataType:
        """
        Fetch the trace of this channel and return the voltages.

        Raises:
            TraceNotReady: If the root instrument's ``trace_ready`` flag
                is false.
        """
        assert isinstance(self.root_instrument, TektronixTPS2012)
        if not self.root_instrument.trace_ready:
            raise TraceNotReady('Please run prepare_curvedata to prepare '
                                'the scope for giving a trace.')
        message = self._curveasker(self.channel)
        _, ydata, _ = self._curveparameterparser(message)
        # Due to the limitations in the current api the setpoints cannot
        # be updated here: they have already been copied to the dataset
        # when get is called. They are set in prepare_curvedata instead.
        return ydata

    def _curveasker(self, ch: int) -> str:
        """
        Select channel ``ch`` as data source and query ``WAVFrm?``.

        Returns:
            The raw response string (preamble followed by curve data).
        """
        assert isinstance(self.instrument, TektronixTPS2012Channel)
        self.instrument.write(f'DATa:SOURce CH{ch}')
        message = self.instrument.ask('WAVFrm?')
        self.instrument.write('*WAI')
        return message

    @staticmethod
    def _binaryparser(curve: str) -> np.ndarray:
        """
        Helper function for parsing the curve data

        Args:
            curve: the return value of 'CURVe?' when
                DATa:ENCdg is set to RPBinary.
                Note: The header and final newline character
                must be removed.

        Returns:
            One float per 2-character sample, computed as
            ``(first_byte - 128) * 256``, i.e. values in the range
            -32768 .. 32512. An odd trailing character is ignored.
        """
        # TODO: Add support for data width = 1 mode?
        # NOTE(review): only the first byte of every 2-byte sample is
        # used; the second byte is discarded. This assumes the first byte
        # is the most significant one -- confirm against the scope's
        # byte-order setting.
        n_samples = len(curve) // 2  # data width 2
        first_bytes = np.fromiter(
            curve[:2 * n_samples:2].encode('latin-1'),
            dtype=np.float64,
            count=n_samples,
        )
        return (first_bytes - 128) * 256

    @staticmethod
    def _preambleparser(response: str) -> OutputDict:
        """
        Parser function for the curve preamble

        Args:
            response: The response of WFMPre?

        Returns:
            A dictionary containing the following keys:
              no_of_bytes, no_of_bits, encoding, binary_format,
              byte_order, no_of_points, waveform_ID, point_format,
              x_incr, x_zero, x_unit, y_multiplier, y_zero, y_offset, y_unit

        Raises:
            IndexError: If the response has fewer than 16 ';'-separated
                fields.
            ValueError: If a numeric field cannot be converted.
        """
        response_list = response.split(';')
        outdict: OutputDict = {
            'no_of_bytes': int(response_list[0]),
            'no_of_bits': int(response_list[1]),
            'encoding': response_list[2],
            'binary_format': response_list[3],
            'byte_order': response_list[4],
            'no_of_points': int(response_list[5]),
            'waveform_ID': response_list[6],
            'point_format': response_list[7],
            'x_incr': float(response_list[8]),
            # response_list[9] is the point offset; it is not stored
            'x_zero': float(response_list[10]),
            'x_unit': response_list[11],
            'y_multiplier': float(response_list[12]),
            'y_zero': float(response_list[13]),
            'y_offset': float(response_list[14]),
            'y_unit': response_list[15],
        }
        return outdict

    def _curveparameterparser(
        self,
        waveform: str
    ) -> Tuple[np.ndarray, np.ndarray, int]:
        """
        The parser for the curve parameter. Note that WAVFrm? is equivalent
        to WFMPre?; CURVe?

        Args:
            waveform: The return value of WAVFrm?

        Returns:
            Two numpy arrays with the time axis in units
            of s and curve values in units of V; (time, voltages) and
            the number of points as an integer
        """
        fulldata = waveform.split(';')
        # The first 16 fields are the preamble. The binary curve data may
        # itself contain ';', so everything after is joined back together.
        preamble = self._preambleparser(';'.join(fulldata[:16]))
        curvestr = ';'.join(fulldata[16:])

        # The raw curve data starts with a header containing the char #
        # followed by one digit giving the number of digits in the length
        # of the array in bytes, and then the length of the array.
        # I.e. the string #45000 is 5000 bytes represented by 4 digits.
        total_number_of_bytes = preamble['no_of_bytes']*preamble['no_of_points']
        raw_data_offset = 2 + len(str(total_number_of_bytes))
        # Strip the header and the final newline character
        curvestr = curvestr[raw_data_offset:-1]
        rawcurve = self._binaryparser(curvestr)

        # _binaryparser shifts the samples down by 2**15, so the offset
        # is shifted by the same amount before it is subtracted.
        yoff = preamble['y_offset'] - 2**15  # data width 2
        ymult = preamble['y_multiplier']
        # Conversion to y units: (raw - YOFF) * YMULT + YZERO
        ydata = ymult*(rawcurve - yoff) + preamble['y_zero']
        assert len(ydata) == preamble['no_of_points']

        xdata = self._time_axis(
            preamble['x_zero'], preamble['x_incr'], len(ydata)
        )
        return xdata, ydata, preamble['no_of_points']
class TektronixTPS2012Channel(InstrumentChannel):
    """
    One input channel of the scope.

    Adds the parameters ``scale``, ``position``, ``curvedata`` and
    ``state`` for the given channel number.
    """

    def __init__(
        self, parent: "TektronixTPS2012", name: str, channel: int, **kwargs: Any
    ):
        """
        Args:
            parent: The instrument this channel belongs to.
            name: Name of the channel.
            channel: Channel number used in the ``CH<n>`` commands.
            **kwargs: Forwarded to ``InstrumentChannel``.
        """
        super().__init__(parent, name, **kwargs)

        # The doubled braces leave a literal '{}' placeholder in the
        # command string for the value being set.
        self.add_parameter(
            'scale',
            label=f'Channel {channel} Scale',
            unit='V/div',
            get_cmd=f'CH{channel}:SCAle?',
            set_cmd=f'CH{channel}:SCAle {{}}',
            get_parser=float,
        )

        self.add_parameter(
            'position',
            label=f'Channel {channel} Position',
            unit='div',
            get_cmd=f'CH{channel}:POSition?',
            set_cmd=f'CH{channel}:POSition {{}}',
            get_parser=float,
        )

        self.add_parameter(
            'curvedata',
            channel=channel,
            parameter_class=ScopeArray,
        )

        self.add_parameter(
            'state',
            label=f'Channel {channel} display state',
            set_cmd=f'SELect:CH{channel} {{}}',
            get_cmd=partial(self._get_state, channel),
            val_mapping={'ON': 1, 'OFF': 0},
            vals=vals.Enum('ON', 'OFF'),
        )

    def _get_state(self, ch: int) -> int:
        """
        get_cmd for the ``state`` parameter of channel ``ch``.

        Returns:
            The entry for channel ``ch`` from the ``SELect?`` response.
        """
        # 'SELect?' returns a ';'-separated string of 0s and 1s
        # denoting the display state of ch1, ch2, ?, ?, ?
        # (maybe ch1, ch2, math, ref1, ref2 ..?)
        display_flags = [int(flag) for flag in self.ask('SELect?').split(';')]
        return display_flags[ch - 1]
# Alias: binds the shorter name ``TPS2012Channel`` to the same class object.
# NOTE(review): presumably kept for backwards compatibility -- confirm.
TPS2012Channel = TektronixTPS2012Channel
class TektronixTPS2012(VisaInstrument):
    """
    This is the QCoDeS driver for the Tektronix 2012B oscilloscope.
    """

    def __init__(self, name: str, address: str,
                 timeout: float = 20, **kwargs: Any):
        """
        Initialises the TPS2012.

        Args:
            name: Name of the instrument used by QCoDeS
            address: Instrument address as used by VISA
            timeout: visa timeout, in secs. The default (20) is long
                to accommodate large waveforms
            **kwargs: Forwarded to ``VisaInstrument``.
        """
        super().__init__(name, address, timeout=timeout, **kwargs)
        self.connect_message()

        # Scope trace boolean: set by ScopeArray.prepare_curvedata and
        # cleared whenever the horizontal scale is changed.
        self.trace_ready = False

        # functions

        self.add_function('force_trigger',
                          call_cmd='TRIGger FORce',
                          docstring='Force trigger event')
        self.add_function('run',
                          call_cmd='ACQuire:STATE RUN',
                          docstring='Start acquisition')
        self.add_function('stop',
                          call_cmd='ACQuire:STATE STOP',
                          docstring='Stop acquisition')

        # general parameters

        self.add_parameter('trigger_type',
                           label='Type of the trigger',
                           get_cmd='TRIGger:MAIn:TYPe?',
                           set_cmd='TRIGger:MAIn:TYPe {}',
                           vals=vals.Enum('EDGE', 'VIDEO', 'PULSE')
                           )
        self.add_parameter('trigger_source',
                           label='Source for the trigger',
                           get_cmd='TRIGger:MAIn:EDGE:SOURce?',
                           set_cmd='TRIGger:MAIn:EDGE:SOURce {}',
                           vals=vals.Enum('CH1', 'CH2')
                           )
        self.add_parameter('trigger_edge_slope',
                           label='Slope for edge trigger',
                           get_cmd='TRIGger:MAIn:EDGE:SLOpe?',
                           set_cmd='TRIGger:MAIn:EDGE:SLOpe {}',
                           vals=vals.Enum('FALL', 'RISE')
                           )
        self.add_parameter('trigger_level',
                           label='Trigger level',
                           unit='V',
                           get_cmd='TRIGger:MAIn:LEVel?',
                           set_cmd='TRIGger:MAIn:LEVel {}',
                           vals=vals.Numbers()
                           )
        self.add_parameter('data_source',
                           label='Data source',
                           get_cmd='DATa:SOUrce?',
                           set_cmd='DATa:SOURce {}',
                           vals=vals.Enum('CH1', 'CH2')
                           )
        self.add_parameter('horizontal_scale',
                           label='Horizontal scale',
                           unit='s',
                           get_cmd='HORizontal:SCAle?',
                           set_cmd=self._set_timescale,
                           get_parser=float,
                           vals=vals.Enum(5e-9, 10e-9, 25e-9, 50e-9, 100e-9,
                                          250e-9, 500e-9, 1e-6, 2.5e-6, 5e-6,
                                          10e-6, 25e-6, 50e-6, 100e-6, 250e-6,
                                          500e-6, 1e-3, 2.5e-3, 5e-3, 10e-3,
                                          25e-3, 50e-3, 100e-3, 250e-3, 500e-3,
                                          1, 2.5, 5, 10, 25, 50))

        # channel-specific parameters
        channels = ChannelList(
            self, "ScopeChannels", TektronixTPS2012Channel, snapshotable=False
        )
        for ch_num in range(1, 3):
            ch_name = f"ch{ch_num}"
            channel = TektronixTPS2012Channel(self, ch_name, ch_num)
            channels.append(channel)
            self.add_submodule(ch_name, channel)
        self.add_submodule("channels", channels.to_channel_tuple())

        # Necessary settings for parsing the binary curve data
        self.visa_handle.encoding = 'latin-1'
        log.info('Set VISA encoding to latin-1')
        self.write('DATa:ENCdg RPBinary')
        log.info('Set TPS2012 data encoding to RPBinary' +
                 ' (Positive Integer Binary)')
        self.write('DATa:WIDTh 2')
        log.info('Set TPS2012 data width to 2')
        # Note: using data width 2 has been tested to not add
        # significantly to transfer times. The maximal length
        # of an array in one transfer is 2500 points.

    def _set_timescale(self, scale: float) -> None:
        """
        set_cmd for the horizontal_scale
        """
        # Clear the flag so that a trace cannot be read until
        # prepare_curvedata has been run again.
        self.trace_ready = False
        self.write(f'HORizontal:SCAle {scale}')

    ##################################################
    # METHODS FOR THE USER                           #
    ##################################################

    def clear_message_queue(self, verbose: bool = False) -> None:
        """
        Function to clear up (flush) the VISA message queue of the
        oscilloscope. Reads all messages in the queue.

        Args:
            verbose: If True, the read messages are printed.
                Default: False.
        """
        original_timeout = self.visa_handle.timeout
        self.visa_handle.timeout = 1000  # 1 second as VISA counts in ms
        try:
            # Keep reading until a read fails with a VisaIOError.
            while True:
                try:
                    message = self.visa_handle.read()
                except VisaIOError:
                    break
                if verbose:
                    print(message)
        finally:
            # Restore the timeout even if an unexpected error escapes.
            self.visa_handle.timeout = original_timeout
class TPS2012(TektronixTPS2012):
    """
    Deprecated alias for ``TektronixTPS2012``.

    Adds nothing to the parent class; it only provides the old name.
    """