# Source code for qcodes_contrib_drivers.drivers.Rigol.Rigol_DSA800

"""Driver for Rigol DSA800 series spectrum analyzers.

Programming reference:
https://m.testlink.co.kr/download/rigol/DSA800_ProgrammingGuide_EN.pdf

I have tested this driver on a DSA815-TG, but it should be compatible with other DSA800 models as well.

See `docs/examples/` for example notebooks.

Written by Edward Laird (http://wp.lancs.ac.uk/laird-group/).
"""

from __future__ import annotations

import logging
import re
from typing import Any, cast

import numpy as np
import numpy.typing as npt
from qcodes.instrument import InstrumentChannel, VisaInstrument
from qcodes.parameters import (
    ParameterWithSetpoints,
    create_on_off_val_mapping,
)
from qcodes.validators import Arrays, Enum, Ints, Numbers

# Module-level logger, named after this module.
log = logging.getLogger(__name__)


# Upper frequency limit in Hz, keyed by the three-digit number that follows
# "DSA" in the model name (e.g. 815 for a DSA815). Consulted by
# _infer_max_frequency_from_model.
_MODEL_MAX_FREQUENCY_HZ = {
    815: 1.5e9,
    832: 3.2e9,
    875: 7.5e9,
}

# Shared val_mapping for the boolean-style (on/off) parameters in this driver,
# built with on_val=1 and off_val=0.
_ON_OFF_MAPPING = create_on_off_val_mapping(on_val=1, off_val=0)


def _strip_ieee_block_header(payload: str) -> str:
    """Remove a definite-length IEEE 488.2 block header when present."""
    text = payload.strip()
    if not text.startswith("#") or len(text) < 2 or not text[1].isdigit():
        return text

    digits = int(text[1])
    if len(text) < 2 + digits:
        raise ValueError(f"Malformed IEEE block header: {payload!r}")

    header_end = 2 + digits
    data_length = int(text[2:header_end])
    data_end = header_end + data_length
    return text[header_end:data_end].strip()


def _parse_ascii_trace_data(payload: str) -> npt.NDArray[np.float64]:
    """Convert the ASCII reply of ``:TRACe:DATA?`` into a float array.

    Any IEEE block header is stripped first; the remaining text is read as
    comma-separated numbers.

    Args:
        payload: Raw ASCII reply from the instrument.

    Returns:
        A one-dimensional float array; empty when no data follows the header.
    """
    values_text = _strip_ieee_block_header(payload)
    if values_text:
        return np.fromstring(values_text, sep=",", dtype=float)
    return np.array([], dtype=float)


def _infer_max_frequency_from_model(model: str) -> float:
    """Look up the analyzer's upper frequency limit from its model name.

    The three digits following ``DSA`` (case-insensitive) are used as the key
    into ``_MODEL_MAX_FREQUENCY_HZ``.

    Args:
        model: Model string, e.g. as reported in the instrument's IDN.

    Returns:
        The frequency limit in Hz. Falls back to 7.5 GHz when the model code
        is not in the table, or (with a logged warning) when no ``DSA###``
        pattern is found at all.
    """
    fallback_hz = 7.5e9
    found = re.search(r"DSA(\d{3})", model.upper())
    if found is not None:
        return _MODEL_MAX_FREQUENCY_HZ.get(int(found.group(1)), fallback_hz)

    log.warning(
        "Could not infer maximum frequency from model %r. "
        "Falling back to 7.5 GHz.",
        model,
    )
    return fallback_hz


def _power_unit_label(power_unit: str) -> str:
    """Return a display-friendly unit label for trace-like parameters."""
    return power_unit


class RigolDSA800Trace(ParameterWithSetpoints):
    """Array-valued parameter that reads one analyzer trace.

    The setpoints (the trace axis) are supplied by the caller through the
    usual ``ParameterWithSetpoints`` keyword arguments.
    """

    def __init__(self, *args: Any, trace_index: int, **kwargs: Any) -> None:
        """Remember which trace to read and defer the rest to the base class.

        Args:
            *args: Positional arguments forwarded to ``ParameterWithSetpoints``.
            trace_index: Number of the trace this parameter reads.
            **kwargs: Keyword arguments forwarded to ``ParameterWithSetpoints``.
        """
        self._trace_index = trace_index
        super().__init__(*args, **kwargs)

    def get_raw(self) -> npt.NDArray[np.float64]:
        """Refresh the unit from the analyzer, then fetch the trace data.

        Returns:
            The trace values as returned by the root instrument's
            ``_query_trace_data``.
        """
        analyzer = cast(RigolDSA800, self.root_instrument)
        # The y-axis unit is configurable on the instrument, so re-read it on
        # every acquisition.
        current_unit = str(analyzer.power_unit())
        self.unit = _power_unit_label(current_unit)
        return analyzer._query_trace_data(self._trace_index)
class RigolDSA800Marker(InstrumentChannel):
    """Channel exposing the settings and readout of one analyzer marker."""

    def __init__(self, parent: "RigolDSA800", name: str, marker_index: int) -> None:
        """Register the parameters of marker ``marker_index``.

        Args:
            parent: The analyzer this marker belongs to.
            name: Channel name.
            marker_index: Marker number used in the SCPI commands.
        """
        super().__init__(parent, name)
        self._marker_index = marker_index

        # Every command and label of this channel shares these prefixes.
        scpi = f":CALCulate:MARKer{marker_index}"
        title = f"Marker {marker_index}"

        self.enabled = self.add_parameter(
            "enabled",
            label=f"{title} enabled",
            set_cmd=f"{scpi}:STATe {{}}",
            get_cmd=f"{scpi}:STATe?",
            val_mapping=_ON_OFF_MAPPING,
        )
        self.mode = self.add_parameter(
            "mode",
            label=f"{title} mode",
            set_cmd=f"{scpi}:MODE {{}}",
            get_cmd=f"{scpi}:MODE?",
            vals=Enum("POS", "DELT", "BAND", "SPAN"),
        )
        self.trace = self.add_parameter(
            "trace",
            label=f"{title} trace",
            set_cmd=f"{scpi}:TRACe {{}}",
            get_cmd=f"{scpi}:TRACe?",
            get_parser=int,
            vals=Ints(1, 3),
        )
        self.trace_auto = self.add_parameter(
            "trace_auto",
            label=f"{title} auto trace assignment",
            set_cmd=f"{scpi}:TRACe:AUTO {{}}",
            get_cmd=f"{scpi}:TRACe:AUTO?",
            val_mapping=_ON_OFF_MAPPING,
        )
        self.x = self.add_parameter(
            "x",
            label=f"{title} x-value",
            set_cmd=f"{scpi}:X {{}}",
            get_cmd=f"{scpi}:X?",
            get_parser=float,
        )
        # The y-value is read-only: no set command is registered.
        self.y = self.add_parameter(
            "y",
            label=f"{title} y-value",
            get_cmd=f"{scpi}:Y?",
            get_parser=float,
        )
        self.peak_excursion = self.add_parameter(
            "peak_excursion",
            label=f"{title} peak excursion",
            unit="dB",
            set_cmd=f"{scpi}:PEAK:EXCursion {{}}",
            get_cmd=f"{scpi}:PEAK:EXCursion?",
            get_parser=float,
            vals=Numbers(0, 200),
        )

    def _move_to_peak(self, target: str) -> None:
        """Send the ``MAXimum`` sub-command ``target`` for this marker."""
        self.write(f":CALCulate:MARKer{self._marker_index}:MAXimum:{target}")

    def next_peak(self) -> None:
        """Move the marker to the next peak."""
        self._move_to_peak("NEXT")

    def peak_left(self) -> None:
        """Move the marker to the nearest peak on the left."""
        self._move_to_peak("LEFT")

    def peak_right(self) -> None:
        """Move the marker to the nearest peak on the right."""
        self._move_to_peak("RIGHt")
class RigolDSA800TraceChannel(InstrumentChannel):
    """Channel exposing the settings and data of one displayed trace."""

    def __init__(self, parent: "RigolDSA800", name: str, trace_index: int) -> None:
        """Register the parameters of trace ``trace_index``.

        Args:
            parent: The analyzer this trace belongs to; its ``trace_axis``
                parameter and ``_point_count`` method supply the setpoints and
                the expected array length for ``data``.
            name: Channel name.
            trace_index: Trace number used in the SCPI commands.
        """
        super().__init__(parent, name)
        self._trace_index = trace_index

        # Every command and label of this channel shares these prefixes.
        scpi = f":TRACe{trace_index}"
        title = f"Trace {trace_index}"

        self.mode = self.add_parameter(
            "mode",
            label=f"{title} mode",
            set_cmd=f"{scpi}:MODE {{}}",
            get_cmd=f"{scpi}:MODE?",
            vals=Enum("WRIT", "MAXH", "MINH", "VIEW", "BLANK", "VID", "POW"),
        )
        self.average_type = self.add_parameter(
            "average_type",
            label=f"{title} average type",
            set_cmd=f"{scpi}:AVERage:TYPE {{}}",
            get_cmd=f"{scpi}:AVERage:TYPE?",
            vals=Enum("VID", "RMS"),
        )
        # The shape is given as a callable so that the validator follows the
        # analyzer's current number of sweep points.
        self.data = self.add_parameter(
            "data",
            label=f"{title} data",
            unit="dBm",
            parameter_class=RigolDSA800Trace,
            trace_index=trace_index,
            setpoints=(parent.trace_axis,),
            vals=Arrays(shape=(parent._point_count,)),
        )
class RigolDSA800TrackingGenerator(InstrumentChannel):
    """Channel controlling the tracking generator of TG-equipped analyzers."""

    def __init__(self, parent: "RigolDSA800", name: str) -> None:
        """Register the tracking-generator parameters.

        Args:
            parent: The analyzer this channel belongs to.
            name: Channel name.
        """
        super().__init__(parent, name)

        # Prefixes shared by the commands and labels below.
        title = "Tracking generator"
        source = ":SOURce"
        power = f"{source}:POWer"

        self.enabled = self.add_parameter(
            "enabled",
            label=f"{title} output",
            set_cmd=":OUTPut:STATe {}",
            get_cmd=":OUTPut:STATe?",
            val_mapping=_ON_OFF_MAPPING,
        )
        self.amplitude_offset = self.add_parameter(
            "amplitude_offset",
            label=f"{title} amplitude offset",
            unit="dB",
            set_cmd=f"{source}:CORRection:OFFSet {{}}",
            get_cmd=f"{source}:CORRection:OFFSet?",
            get_parser=float,
            vals=Numbers(-200, 200),
        )
        self.fixed_power_amplitude = self.add_parameter(
            "fixed_power_amplitude",
            label=f"{title} fixed power amplitude",
            unit="dBm",
            set_cmd=f"{power}:LEVel:IMMediate:AMPLitude {{}}",
            get_cmd=f"{power}:LEVel:IMMediate:AMPLitude?",
            get_parser=float,
            vals=Numbers(-40, 0),
        )
        self.power_mode = self.add_parameter(
            "power_mode",
            label=f"{title} power mode",
            set_cmd=f"{power}:MODE {{}}",
            get_cmd=f"{power}:MODE?",
            vals=Enum("FIX", "SWE"),
        )
        self.fixed_power_span = self.add_parameter(
            "fixed_power_span",
            label=f"{title} fixed power span",
            unit="dB",
            set_cmd=f"{power}:SPAN {{}}",
            get_cmd=f"{power}:SPAN?",
            get_parser=float,
            vals=Numbers(0, 20),
        )
        self.sweep_start_amplitude = self.add_parameter(
            "sweep_start_amplitude",
            label=f"{title} sweep start amplitude",
            unit="dBm",
            set_cmd=f"{power}:STARt {{}}",
            get_cmd=f"{power}:STARt?",
            get_parser=float,
            vals=Numbers(-40, 0),
        )
        self.sweep_amplitude_range = self.add_parameter(
            "sweep_amplitude_range",
            label=f"{title} sweep amplitude range",
            unit="dB",
            set_cmd=f"{power}:SWEep {{}}",
            get_cmd=f"{power}:SWEep?",
            get_parser=float,
            vals=Numbers(0, 20),
        )
        self.reference_trace_enabled = self.add_parameter(
            "reference_trace_enabled",
            label=f"{title} normalization reference trace",
            set_cmd=f"{source}:TRACe:REF:STATe {{}}",
            get_cmd=f"{source}:TRACe:REF:STATe?",
            val_mapping=_ON_OFF_MAPPING,
        )

    def enable(self) -> None:
        """Enable the tracking-generator output."""
        self.enabled.set("ON")

    def disable(self) -> None:
        """Disable the tracking-generator output."""
        self.enabled.set("OFF")

    def store_reference_trace(self) -> None:
        """Store the normalization reference trace."""
        self.write(":SOURce:TRACe:STORref")
class RigolDSA800(VisaInstrument):
    """QCoDeS driver for Rigol DSA800 series spectrum analyzers.

    Besides the top-level parameters, the instrument exposes the submodules
    ``trace1``..``trace3``, ``tracking_generator`` and ``marker1``..``marker4``.
    """

    def __init__(self, name: str, address: str, **kwargs: Any) -> None:
        """Connect to the analyzer and register parameters and submodules.

        Args:
            name: Instrument name.
            address: VISA resource address.
            **kwargs: Forwarded to ``VisaInstrument``. ``terminator`` defaults
                to ``"\\n"`` and ``device_clear`` to ``False`` when not given.
        """
        kwargs.setdefault("terminator", "\n")
        kwargs.setdefault("device_clear", False)
        super().__init__(name, address, **kwargs)

        # The model reported in the IDN decides the frequency validator range.
        idn = self.get_idn()
        self._model = idn.get("model") or "DSA800"
        self.max_frequency_hz = _infer_max_frequency_from_model(self._model)

        self.frequency_start = self.add_parameter(
            "frequency_start",
            label="Start frequency",
            unit="Hz",
            set_cmd=":SENSe:FREQuency:STARt {}",
            get_cmd=":SENSe:FREQuency:STARt?",
            get_parser=float,
            vals=Numbers(0, self.max_frequency_hz),
        )
        self.frequency_stop = self.add_parameter(
            "frequency_stop",
            label="Stop frequency",
            unit="Hz",
            set_cmd=":SENSe:FREQuency:STOP {}",
            get_cmd=":SENSe:FREQuency:STOP?",
            get_parser=float,
            vals=Numbers(0, self.max_frequency_hz),
        )
        self.frequency_center = self.add_parameter(
            "frequency_center",
            label="Center frequency",
            unit="Hz",
            set_cmd=":SENSe:FREQuency:CENTer {}",
            get_cmd=":SENSe:FREQuency:CENTer?",
            get_parser=float,
            vals=Numbers(0, self.max_frequency_hz),
        )
        self.frequency_span = self.add_parameter(
            "frequency_span",
            label="Frequency span",
            unit="Hz",
            set_cmd=":SENSe:FREQuency:SPAN {}",
            get_cmd=":SENSe:FREQuency:SPAN?",
            get_parser=float,
            vals=Numbers(0, self.max_frequency_hz),
        )
        self.resolution_bandwidth = self.add_parameter(
            "resolution_bandwidth",
            label="Resolution bandwidth",
            unit="Hz",
            set_cmd=":SENSe:BANDwidth:RESolution {}",
            get_cmd=":SENSe:BANDwidth:RESolution?",
            get_parser=float,
            vals=Numbers(10, 1e6),
        )
        self.resolution_bandwidth_auto = self.add_parameter(
            "resolution_bandwidth_auto",
            label="Auto RBW",
            set_cmd=":SENSe:BANDwidth:RESolution:AUTO {}",
            get_cmd=":SENSe:BANDwidth:RESolution:AUTO?",
            val_mapping=_ON_OFF_MAPPING,
        )
        self.video_bandwidth = self.add_parameter(
            "video_bandwidth",
            label="Video bandwidth",
            unit="Hz",
            set_cmd=":SENSe:BANDwidth:VIDeo {}",
            get_cmd=":SENSe:BANDwidth:VIDeo?",
            get_parser=float,
            vals=Numbers(1, 3e6),
        )
        self.video_bandwidth_auto = self.add_parameter(
            "video_bandwidth_auto",
            label="Auto VBW",
            set_cmd=":SENSe:BANDwidth:VIDeo:AUTO {}",
            get_cmd=":SENSe:BANDwidth:VIDeo:AUTO?",
            val_mapping=_ON_OFF_MAPPING,
        )
        self.detector = self.add_parameter(
            "detector",
            label="Detector",
            set_cmd=":SENSe:DETector:FUNCtion {}",
            get_cmd=":SENSe:DETector:FUNCtion?",
            vals=Enum("NEG", "NORM", "POS", "RMS", "SAMP", "VAV", "QPEAK"),
        )
        self.attenuation = self.add_parameter(
            "attenuation",
            label="Input attenuation",
            unit="dB",
            set_cmd=":SENSe:POWer:RF:ATTenuation {}",
            get_cmd=":SENSe:POWer:RF:ATTenuation?",
            get_parser=float,
            vals=Numbers(0, 30),
        )
        self.attenuation_auto = self.add_parameter(
            "attenuation_auto",
            label="Auto attenuation",
            set_cmd=":SENSe:POWer:RF:ATTenuation:AUTO {}",
            get_cmd=":SENSe:POWer:RF:ATTenuation:AUTO?",
            val_mapping=_ON_OFF_MAPPING,
        )
        self.preamplifier_enabled = self.add_parameter(
            "preamplifier_enabled",
            label="Preamplifier",
            set_cmd=":SENSe:POWer:RF:GAIN:STATe {}",
            get_cmd=":SENSe:POWer:RF:GAIN:STATe?",
            val_mapping=_ON_OFF_MAPPING,
        )
        self.reference_level = self.add_parameter(
            "reference_level",
            label="Reference level",
            unit="dBm",
            set_cmd=":DISPlay:WINdow:TRACe:Y:SCALe:RLEVel {}",
            get_cmd=":DISPlay:WINdow:TRACe:Y:SCALe:RLEVel?",
            get_parser=float,
            vals=Numbers(-100, 20),
        )
        self.reference_level_offset = self.add_parameter(
            "reference_level_offset",
            label="Reference level offset",
            unit="dB",
            set_cmd=":DISPlay:WINdow:TRACe:Y:SCALe:RLEVel:OFFSet {}",
            get_cmd=":DISPlay:WINdow:TRACe:Y:SCALe:RLEVel:OFFSet?",
            get_parser=float,
            vals=Numbers(-300, 300),
        )
        self.scale_per_division = self.add_parameter(
            "scale_per_division",
            label="Y scale per division",
            unit="dB",
            set_cmd=":DISPlay:WINdow:TRACe:Y:SCALe:PDIVision {}",
            get_cmd=":DISPlay:WINdow:TRACe:Y:SCALe:PDIVision?",
            get_parser=float,
            vals=Numbers(0.1, 20),
        )
        self.power_unit = self.add_parameter(
            "power_unit",
            label="Y-axis unit",
            set_cmd=":UNIT:POWer {}",
            get_cmd=":UNIT:POWer?",
            vals=Enum("DBM", "DBMV", "DBUV", "V", "W"),
        )
        self.sweep_points = self.add_parameter(
            "sweep_points",
            label="Sweep points",
            set_cmd=":SENSe:SWEep:POINts {}",
            get_cmd=":SENSe:SWEep:POINts?",
            get_parser=int,
            vals=Ints(101, 3001),
        )
        self.sweep_count = self.add_parameter(
            "sweep_count",
            label="Sweeps per single acquisition",
            set_cmd=":SENSe:SWEep:COUNt {}",
            get_cmd=":SENSe:SWEep:COUNt?",
            get_parser=int,
            vals=Ints(1, 9999),
        )
        self.sweep_time = self.add_parameter(
            "sweep_time",
            label="Sweep time",
            unit="s",
            set_cmd=":SENSe:SWEep:TIME {}",
            get_cmd=":SENSe:SWEep:TIME?",
            get_parser=float,
            vals=Numbers(20e-6, 7500),
        )
        self.sweep_time_auto = self.add_parameter(
            "sweep_time_auto",
            label="Auto sweep time",
            set_cmd=":SENSe:SWEep:TIME:AUTO {}",
            get_cmd=":SENSe:SWEep:TIME:AUTO?",
            val_mapping=_ON_OFF_MAPPING,
        )
        self.continuous_sweep = self.add_parameter(
            "continuous_sweep",
            label="Continuous sweep",
            set_cmd=":INITiate:CONTinuous {}",
            get_cmd=":INITiate:CONTinuous?",
            val_mapping=_ON_OFF_MAPPING,
        )
        self.trace_average_count = self.add_parameter(
            "trace_average_count",
            label="Trace average count",
            set_cmd=":TRACe:AVERage:COUNt {}",
            get_cmd=":TRACe:AVERage:COUNt?",
            get_parser=int,
            vals=Ints(1, 1000),
        )
        self.trigger_source = self.add_parameter(
            "trigger_source",
            label="Trigger source",
            set_cmd=":TRIGger:SEQuence:SOURce {}",
            get_cmd=":TRIGger:SEQuence:SOURce?",
            vals=Enum("IMM", "VID", "EXT"),
        )
        self.external_trigger_slope = self.add_parameter(
            "external_trigger_slope",
            label="External trigger slope",
            set_cmd=":TRIGger:SEQuence:EXTernal:SLOPe {}",
            get_cmd=":TRIGger:SEQuence:EXTernal:SLOPe?",
            vals=Enum("POS", "NEG"),
        )
        self.video_trigger_level = self.add_parameter(
            "video_trigger_level",
            label="Video trigger level",
            unit="dBm",
            set_cmd=":TRIGger:SEQuence:VIDeo:LEVel {}",
            get_cmd=":TRIGger:SEQuence:VIDeo:LEVel?",
            get_parser=float,
            vals=Numbers(-300, 50),
        )
        self.trace_data_format = self.add_parameter(
            "trace_data_format",
            label="Trace data format",
            set_cmd=":FORMat:TRACe:DATA {}",
            get_cmd=":FORMat:TRACe:DATA?",
            vals=Enum("ASCII", "REAL,32"),
        )
        self.binary_byte_order = self.add_parameter(
            "binary_byte_order",
            label="Binary byte order",
            set_cmd=":FORMat:BORDer {}",
            get_cmd=":FORMat:BORDer?",
            vals=Enum("NORM", "SWAP"),
        )
        # Setpoint axis shared by all trace data parameters. The shape is a
        # callable so that it follows the current number of sweep points.
        self.trace_axis = self.add_parameter(
            "trace_axis",
            label="Trace axis",
            unit="Hz",
            get_cmd=self._get_trace_axis,
            vals=Arrays(shape=(self._point_count,)),
        )

        for trace_index in range(1, 4):
            trace_channel = RigolDSA800TraceChannel(
                self,
                f"trace{trace_index}",
                trace_index,
            )
            self.add_submodule(f"trace{trace_index}", trace_channel)

        tracking_generator = RigolDSA800TrackingGenerator(
            self,
            "tracking_generator",
        )
        self.add_submodule("tracking_generator", tracking_generator)

        for marker_index in range(1, 5):
            marker = RigolDSA800Marker(self, f"marker{marker_index}", marker_index)
            self.add_submodule(f"marker{marker_index}", marker)

        self.connect_message()

    def _point_count(self) -> int:
        """Return the current trace length for dynamic array validators."""
        return int(self.sweep_points())

    def _get_trace_axis(self) -> npt.NDArray[np.float64]:
        """Return the current setpoint axis for trace acquisition.

        With a span of zero the axis runs from 0 to the sweep time and
        ``trace_axis`` is relabelled as a time axis in seconds. Otherwise it
        runs from the start to the stop frequency and is labelled as a
        frequency axis in Hz. Both cases use ``sweep_points`` samples.
        """
        span_hz = float(self.frequency_span())
        points = int(self.sweep_points())

        if span_hz == 0:
            self.trace_axis.label = "Time axis"
            self.trace_axis.unit = "s"
            sweep_time = float(self.sweep_time())
            return np.linspace(0, sweep_time, num=points, dtype=float)

        self.trace_axis.label = "Frequency axis"
        self.trace_axis.unit = "Hz"
        start = float(self.frequency_start())
        stop = float(self.frequency_stop())
        return np.linspace(start, stop, num=points, dtype=float)

    def _query_trace_data(self, trace_index: int) -> npt.NDArray[np.float64]:
        """Query trace data using the instrument's currently selected transfer format.

        Args:
            trace_index: Number of the trace to read.

        Returns:
            The trace values as a float array. In ASCII format the reply is
            parsed as text; otherwise it is read as binary 32-bit floats using
            the byte order reported by ``binary_byte_order``.
        """
        trace_format = str(self.trace_data_format()).upper()
        command = f":TRACe:DATA? TRACE{trace_index}"

        if trace_format == "ASCII":
            return _parse_ascii_trace_data(self.ask_raw(command))

        # "NORM" byte order is read as big-endian, anything else as little-endian.
        is_big_endian = str(self.binary_byte_order()).upper() == "NORM"
        data = self.visa_handle.query_binary_values(
            command,
            datatype="f",
            is_big_endian=is_big_endian,
            container=list,
        )
        return np.asarray(data, dtype=float)

    def abort(self) -> None:
        """Abort the current operation."""
        self.write(":ABORt")

    def reset(self) -> None:
        """Reset the analyzer to its default state."""
        self.write("*RST")

    def autoscale(self) -> None:
        """Adjust reference level and scale for an easy-to-read display."""
        self.write(":SENSe:POWer:ASCale")

    def autorange(self) -> None:
        """Adjust amplitude-related parameters within the current span."""
        self.write(":SENSe:POWer:ARANge")

    def autotune(self) -> None:
        """Search the full frequency range and optimize the display."""
        self.write(":SENSe:POWer:ATUNe")

    def clear_all_markers(self) -> None:
        """Disable all active markers."""
        self.write(":CALCulate:MARKer:AOFF")

    def restart_trace_average(self) -> None:
        """Restart the averaging accumulation for averaged traces."""
        self.write(":TRACe:AVERage:RESet")

    def run_continuous(self) -> None:
        """Enable continuous sweeping."""
        self.continuous_sweep("ON")

    def hold(self) -> None:
        """Disable continuous sweeping."""
        self.continuous_sweep("OFF")

    def _recommended_sweep_timeout(self) -> float:
        """Estimate a safe timeout for one initiated acquisition.

        The estimate is based on the instrument-reported sweep time and sweep
        count, with extra headroom for analyzer overhead and command latency.

        Returns:
            Timeout in seconds, never less than 5 s.
        """
        sweep_duration = float(self.sweep_time()) * int(self.sweep_count())
        return max(5.0, sweep_duration * 1.5 + 1.0)

    def wait_for_operation_complete(self, timeout: float | None = None) -> None:
        """Block until the current operation is complete.

        The VISA timeout is changed only for the duration of the ``*OPC?``
        query and is restored afterwards, even if the query fails.

        Args:
            timeout: Optional timeout in seconds for the wait. When omitted,
                the larger of the recommended sweep timeout and the current
                VISA timeout is used. An infinite value waits without limit.
        """
        visa_timeout = self.visa_handle.timeout
        resolved_timeout = timeout
        if resolved_timeout is None and visa_timeout is not None:
            resolved_timeout = max(
                self._recommended_sweep_timeout(),
                float(visa_timeout) / 1000,
            )

        try:
            if resolved_timeout is not None:
                timeout_ms = resolved_timeout * 1000
                if timeout_ms == float("inf"):
                    # PyVISA reports and accepts "no timeout" as +inf;
                    # int(inf) would raise OverflowError.
                    self.visa_handle.timeout = timeout_ms
                else:
                    self.visa_handle.timeout = int(timeout_ms)
            self.ask("*OPC?")
        finally:
            self.visa_handle.timeout = visa_timeout

    def trigger_sweep(self, timeout: float | None = None) -> None:
        """Initiate a sweep without changing the current sweep mode.

        Args:
            timeout: Optional timeout in seconds for the sweep.
        """
        self.write(":INITiate:IMMediate")
        self.wait_for_operation_complete(timeout=timeout)

    def single_sweep(self, timeout: float | None = None) -> None:
        """Run a single sweep and wait for completion.

        Continuous sweeping is switched off first and is not re-enabled.

        Args:
            timeout: Optional timeout in seconds for the sweep.
        """
        self.hold()
        self.trigger_sweep(timeout=timeout)

    def acquire_trace(
        self,
        trace_index: int = 1,
        timeout: float | None = None,
    ) -> npt.NDArray[np.float64]:
        """Run a single sweep and return the requested trace.

        Args:
            trace_index: Trace number to read back.
            timeout: Optional timeout in seconds for the sweep.

        Returns:
            The acquired trace data.

        Raises:
            ValueError: If ``trace_index`` is not 1, 2 or 3.
        """
        if trace_index not in (1, 2, 3):
            raise ValueError("trace_index must be 1, 2, or 3")

        self.single_sweep(timeout=timeout)
        return self._query_trace_data(trace_index)