from __future__ import annotations
import logging
import re
import time
import warnings
from collections import namedtuple
from typing import Any
import numpy as np
from packaging import version
from qcodes import validators as vals
from qcodes.instrument import ChannelList, InstrumentChannel, VisaInstrument
from qcodes.parameters import ArrayParameter, ParamRawDataType
log = logging.getLogger(__name__)
class RigolDS4000TraceNotReady(Exception):
pass
class ScopeArray(ArrayParameter):
def __init__(
self,
name: str,
instrument: RigolDS4000Channel,
channel: int,
raw: bool = False,
):
super().__init__(
name=name,
shape=(1400,),
label="Voltage",
unit="V",
setpoint_names=("Time",),
setpoint_labels=("Time",),
setpoint_units=("s",),
docstring="holds an array from scope",
instrument=instrument,
)
self.channel = channel
self.raw = raw
self.max_read_step = 50
self.trace_ready = False
def prepare_curvedata(self) -> None:
"""
Prepare the scope for returning curve data
"""
assert isinstance(self.instrument, RigolDS4000Channel)
if self.raw:
self.instrument.write(":STOP") # Stop acquisition
self.instrument.write(":WAVeform:MODE RAW") # Set RAW mode
else:
self.instrument.write(":WAVeform:MODE NORM") # Set normal mode
self.get_preamble()
p = self.preamble
# Generate time axis data
xdata = np.linspace(p.xorigin, p.xorigin + p.xincrement * p.points, p.points)
self.setpoints = (tuple(xdata),)
self.shape = (p.points,)
self.trace_ready = True
def get_raw(self) -> ParamRawDataType:
assert isinstance(self.instrument, RigolDS4000Channel)
assert isinstance(self.root_instrument, RigolDS4000)
if not self.trace_ready:
raise RigolDS4000TraceNotReady(
"Please run prepare_curvedata to prepare "
"the scope for giving a trace."
)
else:
self.trace_ready = False
# Set the data type for waveforms to "BYTE"
self.instrument.write(":WAVeform:FORMat BYTE")
# Set read channel
self.instrument.write(f":WAVeform:SOURce CHAN{self.channel}")
data_bin = bytearray()
if self.raw:
log.info("Readout of raw waveform started, %g points", self.shape[0])
# Ask for the right number of points
self.instrument.write(f":WAVeform:POINts {self.shape[0]}")
# Resets the waveform data reading
self.instrument.write(":WAVeform:RESet")
# Starts the waveform data reading
self.instrument.write(":WAVeform:BEGin")
for i in range(self.max_read_step):
status = self.instrument.ask(":WAVeform:STATus?").split(",")[0]
# Ask and retrieve waveform data
# It uses .read_raw() to get a byte
# string since our data is binary
self.instrument.write(":WAVeform:DATA?")
data_chunk = self.root_instrument.visa_handle.read_raw()
data_chuck = self._validate_strip_block(data_chunk)
data_bin.extend(data_chuck)
if status == "IDLE":
self.instrument.write(":WAVeform:END")
break
else:
# Wait some time to have the buffer re-filled
time.sleep(0.3)
log.info(
"chucks read: %d, last chuck points: " "%g, total read size: %g",
i,
len(data_chuck),
len(data_bin),
)
else:
raise ValueError("Communication error")
else:
# Ask and retrieve waveform data
# It uses .read_raw() to get a byte string since our data is binary
log.info("Readout of display waveform started, %d points", self.shape[0])
self.instrument.write(":WAVeform:DATA?") # Query data
data_chunk = self.root_instrument.visa_handle.read_raw()
data_bin.extend(self._validate_strip_block(data_chunk))
log.info("Readout ended, total read size: %g", len(data_bin))
log.info("Data conversion")
# Convert data to byte array
data_raw = np.frombuffer(data_bin, dtype=np.uint8).astype(float)
# Convert byte array to real data
p = self.preamble
data = (data_raw - p.yreference - p.yorigin) * p.yincrement
log.info("Data conversion done")
return data
@staticmethod
def _validate_strip_block(block: bytes) -> bytes:
"""
Given a block of raw data from the instrument, validate and
then strip the header with
size information. Raise ValueError if the sizes don't match.
Args:
block: The data block
Returns:
The stripped data
"""
# Validate header
header = block[:11].decode("ascii")
match = re.match(r"#9(\d{9})", header)
if match:
size = int(match[1])
block_nh = block[11:] # Strip header
block_nh = block_nh.strip() # Strip \n
if size == len(block_nh):
return block_nh
raise ValueError("Malformed data")
def get_preamble(self) -> None:
assert isinstance(self.instrument, RigolDS4000Channel)
preamble_nt = namedtuple(
"preamble_nt",
[
"format",
"mode",
"points",
"count",
"xincrement",
"xorigin",
"xreference",
"yincrement",
"yorigin",
"yreference",
],
)
def conv(x: str) -> float:
return int(x) if x.isdigit() else float(x)
preamble_raw = self.instrument.ask(":WAVeform:PREamble?")
preamble_num = [conv(x) for x in preamble_raw.strip().split(",")]
self.preamble = preamble_nt(*preamble_num)
[docs]class RigolDS4000Channel(InstrumentChannel):
def __init__(self, parent: RigolDS4000, name: str, channel: int):
super().__init__(parent, name)
self.add_parameter(
"amplitude", get_cmd=f":MEASure:VAMP? chan{channel}", get_parser=float
)
self.add_parameter(
"vertical_scale",
get_cmd=f":CHANnel{channel}:SCALe?",
set_cmd=":CHANnel{}:SCALe {}".format(channel, "{}"),
get_parser=float,
)
# Return the waveform displayed on the screen
self.add_parameter(
"curvedata", channel=channel, parameter_class=ScopeArray, raw=False
)
# Return the waveform in the internal memory
self.add_parameter(
"curvedata_raw", channel=channel, parameter_class=ScopeArray, raw=True
)
[docs]class RigolDS4000(VisaInstrument):
"""
This is the QCoDeS driver for the Rigol DS4000 series oscilloscopes.
"""
def __init__(self, name: str, address: str, timeout: float = 20, **kwargs: Any):
"""
Initialises the DS4000.
Args:
name: Name of the instrument used by QCoDeS
address: Instrument address as used by VISA
timeout: visa timeout, in secs. long default (180)
to accommodate large waveforms
"""
# Init VisaInstrument. device_clear MUST NOT be issued, otherwise communications hangs
# due a bug in firmware
super().__init__(name, address, device_clear=False, timeout=timeout, **kwargs)
self.connect_message()
self._check_firmware_version()
# functions
self.add_function("run", call_cmd=":RUN", docstring="Start acquisition")
self.add_function("stop", call_cmd=":STOP", docstring="Stop acquisition")
self.add_function(
"single", call_cmd=":SINGle", docstring="Single trace acquisition"
)
self.add_function(
"force_trigger", call_cmd="TFORce", docstring="Force trigger event"
)
self.add_function(
"auto_scale", call_cmd=":AUToscale", docstring="Perform autoscale"
)
# general parameters
self.add_parameter(
"trigger_type",
label="Type of the trigger",
get_cmd=":TRIGger:MODE?",
set_cmd=":TRIGger:MODE {}",
vals=vals.Enum(
"EDGE",
"PULS",
"RUNT",
"NEDG",
"SLOP",
"VID",
"PATT",
"RS232",
"IIC",
"SPI",
"CAN",
"FLEX",
"USB",
),
)
self.add_parameter(
"trigger_mode",
label="Mode of the trigger",
get_cmd=":TRIGger:SWEep?",
set_cmd=":TRIGger:SWEep {}",
vals=vals.Enum("AUTO", "NORM", "SING"),
)
self.add_parameter(
"time_base",
label="Horizontal time base",
get_cmd=":TIMebase:MAIN:SCALe?",
set_cmd=":TIMebase:MAIN:SCALe {}",
get_parser=float,
unit="s/div",
)
self.add_parameter(
"sample_point_count",
label="Number of the waveform points",
get_cmd=":WAVeform:POINts?",
set_cmd=":WAVeform:POINts {}",
get_parser=int,
vals=vals.Ints(min_value=1),
)
self.add_parameter(
"enable_auto_scale",
label="Enable or disable autoscale",
get_cmd=":SYSTem:AUToscale?",
set_cmd=":SYSTem:AUToscale {}",
get_parser=bool,
vals=vals.Bool(),
)
channels = ChannelList(self, "Channels", RigolDS4000Channel, snapshotable=False)
for channel_number in range(1, 5):
channel = RigolDS4000Channel(self, f"ch{channel_number}", channel_number)
channels.append(channel)
self.add_submodule("channels", channels.to_channel_tuple())
def _check_firmware_version(self) -> None:
# Require version 00.02.03
idn = self.get_idn()
verstr = idn["firmware"]
if verstr is None:
raise RuntimeError("Could not determine firmware version of DS4000.")
ver = version.parse(verstr)
if ver < version.parse("00.02.03"):
warnings.warn(
"Firmware version should be at least 00.02.03,"
"data transfer may not work correctly"
)