"""Interface for the tinySA Basic spectrum analyser.
Provides a Python driver for the tinySA Basic spectrum analyser.
Documentation:
- tinySA: https://www.tinysa.org/wiki/
- Unofficial Python API: https://github.com/LC-Linkous/tinySA_python
See `docs/examples/` for example notebooks.
Written by Edward Laird (http://wp.lancs.ac.uk/laird-group/).
"""
from __future__ import annotations
import re
import time
from typing import Any, Protocol, Sequence, cast
import numpy as np
from qcodes.instrument import Instrument
from qcodes.parameters import Parameter, ParameterWithSetpoints
from qcodes.validators import Arrays, Enum, Numbers
class SerialHandle(Protocol):
    """Protocol for the subset of pyserial's Serial API used by this driver.

    ``TinySASerialBackend`` calls every method declared here on the object it
    gets back from ``serial.Serial``. Any stand-in (for example a test double)
    has to provide all of them.
    """

    def close(self) -> None:
        """Close the port."""
        ...

    def reset_input_buffer(self) -> None:
        """Discard received bytes that have not been read yet.

        Called by the backend right after opening the port and before every
        command, so it must be part of the protocol.
        """
        ...

    def reset_output_buffer(self) -> None:
        """Discard bytes that are queued for sending."""
        ...

    def write(self, data: bytes) -> int:
        """Send ``data`` and return the number of bytes written."""
        ...

    def flush(self) -> None:
        """Wait until all queued output has been written."""
        ...

    def read(self, size: int = 1) -> bytes:
        """Read up to ``size`` bytes; may return fewer (or none) on timeout."""
        ...
class ListPortsModule(Protocol):
    """Structural type for the ``list_ports`` module used by USB autodetection.

    Only ``comports`` is required; autodetection iterates over its result and
    inspects each entry's ``vid``, ``pid`` and ``device`` attributes.
    """

    def comports(self) -> Sequence[Any]:
        """Return the port descriptions that autodetection iterates over."""
# Module-level slots for the optional pyserial dependency. Both stay None
# until _ensure_pyserial() imports pyserial and fills them in.
serial: Any | None = None
list_ports: ListPortsModule | None = None
def _ensure_pyserial() -> None:
    """Populate the module-level ``serial`` and ``list_ports`` names on demand.

    The import is deferred so that this module can be loaded on systems where
    pyserial is not installed. Once both names are set, later calls return
    without importing again.

    Raises:
        ImportError: If pyserial cannot be imported; the original error is
            attached as the cause.
    """
    global serial, list_ports

    already_loaded = serial is not None and list_ports is not None
    if not already_loaded:
        try:
            import serial as _serial
            from serial.tools import list_ports as _list_ports
        except ImportError as exc:
            message = (
                "TinySABasic requires the optional dependency 'pyserial'. "
                "Install it in the active environment to use this driver."
            )
            raise ImportError(message) from exc
        serial, list_ports = _serial, _list_ports
# Default USB identifiers that TinySASerialBackend.autodetect_port() compares
# against each port reported by list_ports.comports().
VID = 0x0483 # Vendor ID
PID = 0x5740 # Product ID
class TinySASerialBackend:
    """
    Minimal serial backend for tinySA.

    The command semantics follow the documented tinySA_python API:

    - `mode {low|high} {input|output}`
    - `scan {start} {stop} [npts] [outmask]`
    - `output on|off`

    Every command is sent with a trailing carriage return and the reply is
    read until the ``ch>`` prompt appears. The serial port is opened lazily on
    the first command, or explicitly through :meth:`connect`.
    """

    PROMPT = b"ch>"

    # Patterns used by get_rbw(); compiled once rather than on every call.
    _RBW_MARKED_PATTERN = re.compile(r"\[\s*(auto|[-+]?\d*\.?\d+)\s*\]")
    _RBW_TOKEN_PATTERN = re.compile(r"auto|[-+]?\d*\.?\d+")

    def __init__(
        self,
        port: str | None = None,
        *,
        timeout: float = 5.0,
        vid: int = VID,
        pid: int = PID,
    ) -> None:
        """
        Store the connection settings; the port itself is opened later.

        Args:
            port: Serial device name. When falsy, the port is autodetected
                from ``vid`` and ``pid``.
            timeout: Passed to ``serial.Serial`` as its timeout and also used
                as the inactivity limit while waiting for the prompt.
            vid: USB vendor ID used for autodetection.
            pid: USB product ID used for autodetection.

        Raises:
            ImportError: If autodetection is needed and pyserial is missing.
            OSError: If autodetection finds no matching device.
        """
        self._port = port or self.autodetect_port(vid=vid, pid=pid)
        self._timeout = timeout
        self._serial: SerialHandle | None = None

    @property
    def port(self) -> str:
        """Serial device name this backend talks to."""
        return self._port

    @staticmethod
    def autodetect_port(*, vid: int = VID, pid: int = PID) -> str:
        """
        Return the device name of the first port matching ``vid`` and ``pid``.

        Raises:
            ImportError: If pyserial is not installed.
            OSError: If no listed port matches.
        """
        _ensure_pyserial()
        assert list_ports is not None
        for device in list_ports.comports():
            if device.vid == vid and device.pid == pid:
                return device.device
        raise OSError("tinySA device not found")

    def connect(self) -> None:
        """Open the serial port if it is not open yet and clear both buffers."""
        _ensure_pyserial()
        if self._serial is None:
            assert serial is not None
            self._serial = cast(
                SerialHandle,
                serial.Serial(self._port, timeout=self._timeout),
            )
            # Start from a clean slate: drop anything left over in the
            # driver buffers from before the port was opened.
            self._serial.reset_input_buffer()
            self._serial.reset_output_buffer()

    def disconnect(self) -> None:
        """Close the serial port; does nothing when it is not open."""
        # Drop our reference first so that a failing close() cannot leave the
        # backend holding on to a handle it believes is still usable.
        handle, self._serial = self._serial, None
        if handle is not None:
            handle.close()

    def _serial_handle(self) -> SerialHandle:
        """Return the open serial handle, connecting first when necessary."""
        self.connect()
        assert self._serial is not None
        return self._serial

    def _write_command(self, command: str) -> None:
        """Send ``command`` followed by a carriage return.

        Both buffers are cleared beforehand so that the reply read afterwards
        belongs to this command only.
        """
        handle = self._serial_handle()
        handle.reset_input_buffer()
        handle.reset_output_buffer()
        handle.write((command + "\r").encode("ascii"))
        handle.flush()

    def _read_until_prompt(self) -> bytes:
        """
        Read byte by byte until the reply ends with :attr:`PROMPT`.

        The deadline is an inactivity limit: it is pushed back every time a
        byte arrives, so a long but steadily streaming reply does not time
        out.

        Raises:
            TimeoutError: If no byte arrives within the configured timeout.
        """
        handle = self._serial_handle()
        buffer = bytearray()
        deadline = time.monotonic() + self._timeout
        while True:
            chunk = handle.read(1)
            if chunk:
                buffer.extend(chunk)
                if buffer.endswith(self.PROMPT):
                    return bytes(buffer)
                deadline = time.monotonic() + self._timeout
                continue
            # Empty read: only give up once the inactivity limit has passed.
            if time.monotonic() >= deadline:
                raise TimeoutError("Timed out waiting for tinySA prompt")

    @staticmethod
    def _strip_prompt(payload: bytes) -> bytes:
        """Remove carriage returns, a trailing prompt and surrounding newlines."""
        cleaned = payload.replace(b"\r", b"")
        if cleaned.endswith(TinySASerialBackend.PROMPT):
            cleaned = cleaned[: -len(TinySASerialBackend.PROMPT)]
        return cleaned.strip(b"\n")

    def command_bytes(self, command: str) -> bytes:
        """
        Send ``command`` and return the reply body as bytes.

        The prompt is removed, and so is the first line when it is an echo of
        the command that was sent.

        Raises:
            TimeoutError: If the prompt does not arrive in time.
        """
        self._write_command(command)
        cleaned = self._strip_prompt(self._read_until_prompt())
        lines = cleaned.splitlines()
        if lines and lines[0].strip() == command.encode("ascii"):
            cleaned = b"\n".join(lines[1:])
        return cleaned.strip(b"\n")

    def command_text(self, command: str) -> str:
        """Send ``command`` and return the stripped reply decoded as UTF-8.

        Undecodable bytes are replaced rather than raising.
        """
        return (
            self.command_bytes(command)
            .decode(
                "utf-8",
                errors="replace",
            )
            .strip()
        )

    def version(self) -> str:
        """Return the reply to the ``version`` command."""
        return self.command_text("version")

    def set_mode(self, rf_path: str, io_mode: str) -> None:
        """Send ``mode {rf_path} {io_mode}``."""
        self.command_bytes(f"mode {rf_path} {io_mode}")

    def set_output(self, enabled: bool) -> None:
        """Send ``output on`` or ``output off``."""
        self.command_bytes(f"output {'on' if enabled else 'off'}")

    def set_level(self, value: float) -> None:
        """Send ``level`` with ``value`` formatted in general (``g``) notation."""
        self.command_bytes(f"level {value:g}")

    def set_frequency(self, value: float) -> None:
        """Send ``freq`` with ``value`` rounded to the nearest integer."""
        self.command_bytes(f"freq {int(round(value))}")

    def set_rbw(self, value: str | int) -> None:
        """
        Set RBW using the driver-facing unit convention.

        The QCoDeS layer uses Hz, but the tinySA command expects kHz, so
        numeric values are converted here before being sent to the instrument.

        Raises:
            ValueError: If ``value`` is a string other than ``"auto"``
                (case and surrounding whitespace are ignored).
        """
        if isinstance(value, str):
            text = value.strip().lower()
            if text != "auto":
                raise ValueError("rbw string value must be 'auto'")
            self.command_bytes("rbw auto")
            return
        self.command_bytes(f"rbw {int(round(value / 1000))}")

    def get_rbw(self) -> str | int:
        """
        Read RBW from the instrument and return it in Hz.

        The tinySA reports numeric RBW values in kHz. This method normalises
        the response to the driver-facing Hz convention.

        A value in square brackets takes precedence; otherwise the reply must
        contain exactly one ``auto`` or numeric token.

        Raises:
            ValueError: If the reply cannot be reduced to a single value.
        """
        response = self.command_text("rbw").lower()
        marked_match = self._RBW_MARKED_PATTERN.search(response)
        if marked_match is not None:
            token = marked_match.group(1)
        else:
            tokens = self._RBW_TOKEN_PATTERN.findall(response)
            if len(tokens) != 1:
                raise ValueError(f"Could not parse rbw response: {response!r}")
            token = tokens[0]
        if token == "auto":
            return "auto"
        return int(round(float(token) * 1000))

    def set_sweep_start(self, value: float) -> None:
        """Send ``sweep start`` with ``value`` rounded to the nearest integer."""
        self.command_bytes(f"sweep start {int(round(value))}")

    def set_sweep_stop(self, value: float) -> None:
        """Send ``sweep stop`` with ``value`` rounded to the nearest integer."""
        self.command_bytes(f"sweep stop {int(round(value))}")

    def pause(self) -> None:
        """Send the ``pause`` command."""
        self.command_bytes("pause")

    def resume(self) -> None:
        """Send the ``resume`` command."""
        self.command_bytes("resume")

    @staticmethod
    def _coerce_trace_length(
        values: Sequence[float],
        expected_npts: int,
    ) -> np.ndarray:
        """Coerce a trace to the expected number of points by padding or truncating.

        Surplus points are dropped from the end. A short trace is padded by
        repeating its last value, and an empty one becomes all NaN.
        """
        array = np.asarray(values, dtype=float)
        if array.size == expected_npts:
            return array
        if array.size > expected_npts:
            return array[:expected_npts]
        if array.size == 0:
            return np.full(expected_npts, np.nan, dtype=float)
        padding = np.full(expected_npts - array.size, array[-1], dtype=float)
        return np.concatenate((array, padding))

    @staticmethod
    def _parse_scan_column(payload: bytes, expected_npts: int) -> np.ndarray:
        """Parse a ``scan`` reply after repairing a known malformed token."""
        # tinySA occasionally returns a malformed token `-:.0`; tinySA_python
        # replaces it with `-10.0` before parsing.
        fixed = payload.replace(b"-:.0", b"-10.0")
        return TinySASerialBackend._parse_numeric_column(
            fixed,
            expected_npts,
        )

    @staticmethod
    def _parse_numeric_column(
        payload: bytes,
        expected_npts: int,
    ) -> np.ndarray:
        """
        Extract one number per non-empty line and coerce to ``expected_npts``.

        For each line the first whitespace-separated token that parses as a
        float is taken; lines without any numeric token contribute nothing.
        """
        values: list[float] = []
        for raw_line in payload.replace(b"\r", b"").splitlines():
            line = raw_line.strip()
            if not line:
                continue
            for token in line.split():
                try:
                    values.append(float(token))
                    break
                except ValueError:
                    continue
        return TinySASerialBackend._coerce_trace_length(
            values,
            expected_npts,
        )

    def scan(
        self,
        start: float,
        stop: float,
        npts: int,
        *,
        outmask: int = 2,
    ) -> np.ndarray:
        """
        Run a sweep and return the first numeric column of the response.

        Args:
            start: Start frequency, rounded to an integer for the command.
            stop: Stop frequency, rounded to an integer for the command.
            npts: Number of points requested; also the length of the result.
            outmask: Passed through unchanged as the command's last argument.

        Returns:
            Array of ``npts`` floats (padded or truncated if the reply has a
            different number of values).

        Raises:
            TimeoutError: If the instrument does not answer in time.
        """
        point_count = int(npts)
        command = (
            f"scan {int(round(start))} {int(round(stop))} {point_count} {int(outmask)}"
        )
        try:
            # The scan command is inside the try block so that `resume` is
            # also sent when the command itself fails, not only when parsing
            # does.
            payload = self.command_bytes(command)
            return self._parse_scan_column(payload, expected_npts=point_count)
        finally:
            self.resume()
class TinySABasic(Instrument):
    """
    QCoDeS driver for the tinySA Basic spectrum analyser.

    This driver communicates with the instrument over its serial command
    interface via `pyserial`. It provides QCoDeS parameters for sweep
    configuration, output configuration, and trace acquisition.

    Reading `measurement_trace` triggers a fresh sweep and returns the trace
    data with `frequency` as the corresponding setpoints.

    Apart from `rbw`, the getters return the value last set through this
    driver (or the constructor default); they do not query the instrument.
    `start`, `stop` and `npts` are only sent to the instrument as arguments of
    the scan command when a trace is acquired.
    """

    ALLOWED_SWEEP_NPTS = (51, 101, 145, 290)
    ALLOWED_RBW_HZ = ("auto", 3000, 10000, 30000, 100000, 300000, 600000)
    MODE_VALUES = {
        "low_input": ("low", "input"),
        "high_input": ("high", "input"),
        "low_output": ("low", "output"),
        "high_output": ("high", "output"),
    }

    def __init__(
        self,
        name: str,
        port: str | None = None,
        *,
        timeout: float = 5.0,
        start: float = 1e6,
        stop: float = 300e6,
        npts: int = 290,
        **kwargs: Any,
    ) -> None:
        """
        Open the serial connection and register the QCoDeS parameters.

        Args:
            name: QCoDeS instrument name.
            port: Serial device name; autodetected by the backend when omitted.
            timeout: Serial timeout in seconds, forwarded to the backend.
            start: Initial sweep start frequency in Hz.
            stop: Initial sweep stop frequency in Hz.
            npts: Initial number of sweep points.
            **kwargs: Forwarded to ``Instrument.__init__``.
        """
        # Import the optional serial dependency only when the driver is used.
        _ensure_pyserial()
        self._backend = TinySASerialBackend(port=port, timeout=timeout)
        self._backend.connect()
        try:
            self._start_hz = float(start)
            self._stop_hz = float(stop)
            self._npts = int(npts)
            self._mode_state = "unknown"
            self._rf_output_state = "unknown"
            self._rbw_state: str | int = "auto"
            self._level_dbm = 0.0
            self._output_frequency_hz = 1e6
            self._frequency_cache: np.ndarray | None = None
            self._measurement_trace_cache: np.ndarray | None = None
            super().__init__(name, **kwargs)
            self.mode: Parameter = self.add_parameter(
                "mode",
                label="tinySA Mode",
                get_cmd=self._get_mode,
                set_cmd=self._set_mode,
                vals=Enum(*self.MODE_VALUES),
            )
            """Which port to use and whether it's an input or output mode."""
            self.rf_output: Parameter = self.add_parameter(
                "rf_output",
                label="RF Output",
                get_cmd=self._get_rf_output,
                set_cmd=self._set_rf_output,
                vals=Enum("on", "off", "unknown"),
            )
            """Whether the RF output is enabled. Only meaningful in output modes, but can be read in any mode."""
            self.rbw: Parameter = self.add_parameter(
                "rbw",
                label="Resolution Bandwidth",
                unit="Hz",
                get_cmd=self._get_rbw,
                set_cmd=self._set_rbw,
                vals=Enum(*self.ALLOWED_RBW_HZ),
            )
            """Resolution Bandwidth."""
            self.level: Parameter = self.add_parameter(
                "level",
                label="Output Level",
                unit="dBm",
                get_cmd=self._get_level,
                set_cmd=self._set_level,
                vals=Numbers(),
            )
            """RF output level."""
            self.output_frequency: Parameter = self.add_parameter(
                "output_frequency",
                label="Output Frequency",
                unit="Hz",
                get_cmd=self._get_output_frequency,
                set_cmd=self._set_output_frequency,
                vals=Numbers(min_value=0),
            )
            """RF output frequency."""
            self.start: Parameter = self.add_parameter(
                "start",
                label="Start Frequency",
                unit="Hz",
                get_cmd=self._get_start,
                set_cmd=self._set_start,
                vals=Numbers(min_value=0),
            )
            """Start frequency for the sweep."""
            self.stop: Parameter = self.add_parameter(
                "stop",
                label="Stop Frequency",
                unit="Hz",
                get_cmd=self._get_stop,
                set_cmd=self._set_stop,
                vals=Numbers(min_value=0),
            )
            """Stop frequency for the sweep."""
            self.npts: Parameter = self.add_parameter(
                "npts",
                label="Sweep Points",
                get_cmd=self._get_npts,
                set_cmd=self._set_npts,
                vals=Enum(*self.ALLOWED_SWEEP_NPTS),
            )
            """Number of points for the sweep."""
            self.frequency: Parameter = self.add_parameter(
                "frequency",
                label="Frequency",
                unit="Hz",
                get_cmd=self._get_frequency_axis,
                set_cmd=False,
                vals=Arrays(shape=(self._point_count,)),
            )
            """Parameter frequency, which serves as the setpoints for measurement_trace."""
            self.measurement_trace: ParameterWithSetpoints = self.add_parameter(
                "measurement_trace",
                label="Measurement Trace",
                unit="dBm",
                get_cmd=self._get_measurement_trace,
                set_cmd=False,
                parameter_class=ParameterWithSetpoints,
                setpoints=(self.frequency,),
                vals=Arrays(shape=(self._point_count,)),
            )
            """The measurement trace acquired from the instrument. Reading this parameter triggers a new sweep, and the frequency parameter is updated as the corresponding setpoints."""
            self.connect_message()
        except BaseException:
            # Construction failed after the port was opened. Release it so a
            # retry is not blocked by a handle nobody can reach any more.
            self._backend.disconnect()
            raise

    def _point_count(self) -> int:
        """Return the current number of sweep points (used as a dynamic array shape)."""
        return int(self._npts)

    def _invalidate_trace_cache(self) -> None:
        """Forget the cached frequency axis and trace after a setting changed."""
        self._frequency_cache = None
        self._measurement_trace_cache = None

    @staticmethod
    def _normalise_mode(value: str) -> str:
        """
        Map a mode name to its key in ``MODE_VALUES``.

        Case, surrounding whitespace, and ``-`` or spaces used instead of
        ``_`` are tolerated.

        Raises:
            ValueError: If the normalised name is not a known mode.
        """
        text = value.strip().lower().replace("-", "_").replace(" ", "_")
        if text not in TinySABasic.MODE_VALUES:
            valid = ", ".join(sorted(TinySABasic.MODE_VALUES))
            raise ValueError(
                f"Unsupported tinySA mode {value!r}. Valid modes: {valid}",
            )
        return text

    def _mode_is_output(self) -> bool:
        """Return True when the last mode set through the driver is an output mode."""
        return self._mode_state.endswith("_output")

    def _require_input_mode(self, operation: str) -> None:
        """Raise ``RuntimeError`` if ``operation`` is attempted in an output mode.

        The initial ``"unknown"`` mode is not treated as an output mode.
        """
        if self._mode_is_output():
            raise RuntimeError(
                f"{operation} requires an input mode. "
                "Set sa.mode('low_input') or sa.mode('high_input') first."
            )

    def _set_mode(self, value: str) -> None:
        """Send the mode to the instrument, remember it and drop cached traces."""
        mode = self._normalise_mode(value)
        rf_path, io_mode = self.MODE_VALUES[mode]
        self._backend.set_mode(rf_path, io_mode)
        self._mode_state = mode
        self._invalidate_trace_cache()

    def _get_mode(self) -> str:
        """Return the mode last set through the driver (``"unknown"`` initially)."""
        return self._mode_state

    def _set_rf_output(self, value: str) -> None:
        """Switch the RF output on or off.

        Raises:
            ValueError: If ``value`` is neither ``"on"`` nor ``"off"``.
        """
        state = value.strip().lower()
        if state not in {"on", "off"}:
            raise ValueError("rf_output must be 'on' or 'off'")
        self._backend.set_output(state == "on")
        self._rf_output_state = state

    def _get_rf_output(self) -> str:
        """Return the output state last set through the driver (``"unknown"`` initially)."""
        return self._rf_output_state

    def _set_rbw(self, value: str | int) -> None:
        """Set the resolution bandwidth in Hz (or ``"auto"``) and drop cached traces.

        Raises:
            ValueError: If the value is not one of ``ALLOWED_RBW_HZ``.
        """
        rbw: str | int
        if isinstance(value, str):
            rbw = value.strip().lower()
        else:
            rbw = int(value)
        if rbw not in self.ALLOWED_RBW_HZ:
            raise ValueError(
                "tinySA supports only these RBW values: "
                f"{', '.join(str(v) for v in self.ALLOWED_RBW_HZ)}. "
                f"Requested: {rbw}."
            )
        self._backend.set_rbw(rbw)
        self._rbw_state = rbw
        self._invalidate_trace_cache()

    def _get_rbw(self) -> str | int:
        """Query the RBW from the instrument, falling back to the last known value.

        An unparseable reply is deliberately not fatal, but it is logged so
        that a stale value does not go unnoticed.
        """
        try:
            self._rbw_state = self._backend.get_rbw()
        except ValueError as exc:
            self.log.warning(
                "Could not parse RBW reply (%s); returning last known value %r",
                exc,
                self._rbw_state,
            )
        return self._rbw_state

    def _set_level(self, value: float) -> None:
        """Send the output level to the instrument and remember it."""
        self._backend.set_level(float(value))
        self._level_dbm = float(value)

    def _get_level(self) -> float:
        """Return the output level last set through the driver."""
        return self._level_dbm

    def _set_output_frequency(self, value: float) -> None:
        """Send the output frequency to the instrument and remember it."""
        self._backend.set_frequency(float(value))
        self._output_frequency_hz = float(value)

    def _get_output_frequency(self) -> float:
        """Return the output frequency last set through the driver."""
        return self._output_frequency_hz

    def _set_start(self, value: float) -> None:
        """Store the sweep start frequency; it is sent with the next scan."""
        start_hz = float(value)
        self._start_hz = start_hz
        self._invalidate_trace_cache()

    def _get_start(self) -> float:
        """Return the stored sweep start frequency in Hz."""
        return self._start_hz

    def _set_stop(self, value: float) -> None:
        """Store the sweep stop frequency; it is sent with the next scan."""
        stop_hz = float(value)
        self._stop_hz = stop_hz
        self._invalidate_trace_cache()

    def _get_stop(self) -> float:
        """Return the stored sweep stop frequency in Hz."""
        return self._stop_hz

    def _set_npts(self, value: int) -> None:
        """Store the number of sweep points; it is sent with the next scan.

        Raises:
            ValueError: If the value is not one of ``ALLOWED_SWEEP_NPTS``.
        """
        npts = int(value)
        if npts not in self.ALLOWED_SWEEP_NPTS:
            raise ValueError(
                "tinySA supports only these sweep point counts: "
                f"{', '.join(str(p) for p in self.ALLOWED_SWEEP_NPTS)}. "
                f"Requested: {npts}."
            )
        self._npts = npts
        self._invalidate_trace_cache()

    def _get_npts(self) -> int:
        """Return the stored number of sweep points."""
        return self._npts

    def _validate_sweep_range(self) -> None:
        """Raise ``ValueError`` unless the start frequency is below the stop frequency."""
        if self._start_hz >= self._stop_hz:
            raise ValueError(
                "Invalid sweep range: "
                f"start={self._start_hz:g} Hz, stop={self._stop_hz:g} Hz. "
                "Set start lower than stop before acquiring data."
            )

    def _make_frequency_axis(self) -> np.ndarray:
        """Build a linearly spaced axis from start to stop with ``npts`` points.

        Raises:
            ValueError: If the sweep range is invalid.
        """
        self._validate_sweep_range()
        return np.linspace(
            self._start_hz,
            self._stop_hz,
            num=self._npts,
            dtype=float,
        )

    def _get_frequency_axis(self) -> np.ndarray:
        """Return a copy of the frequency axis, rebuilding it when missing or stale."""
        if self._frequency_cache is None or self._frequency_cache.size != self._npts:
            self._frequency_cache = self._make_frequency_axis()
        return self._frequency_cache.copy()

    def _update_parameter_caches(self) -> None:
        """Copy the driver-side caches into the QCoDeS parameter caches."""
        if self._frequency_cache is not None:
            self.frequency.cache.set(self._frequency_cache.copy())
        if self._measurement_trace_cache is not None:
            self.measurement_trace.cache.set(
                self._measurement_trace_cache.copy(),
            )

    def refresh_sweep(self) -> np.ndarray:
        """
        Acquire a fresh trace and update the parameter caches.

        The caches are kept only so QCoDeS can snapshot the trace and matching
        frequency setpoints consistently after the acquisition.

        Raises:
            RuntimeError: If the driver is in an output mode.
            ValueError: If the sweep range is invalid.
        """
        self._require_input_mode("refresh_sweep")
        frequency_axis = self._make_frequency_axis()
        trace = self._backend.scan(
            self._start_hz,
            self._stop_hz,
            self._npts,
            outmask=2,
        )
        # Store both caches together, and only after the scan succeeded, so
        # the axis and the trace always belong to the same acquisition.
        self._frequency_cache = frequency_axis
        self._measurement_trace_cache = trace
        self._update_parameter_caches()
        return trace.copy()

    def _get_measurement_trace(self) -> np.ndarray:
        """Return a newly acquired trace on every call."""
        return self.refresh_sweep()

    def ask_raw(self, command: str) -> str:
        """Send ``command`` and return the decoded reply."""
        return self._backend.command_text(command)

    def write_raw(self, command: str) -> None:
        """Send ``command`` and discard the reply."""
        self._backend.command_bytes(command)

    def get_idn(self) -> dict[str, str | None]:
        """Return identification info; firmware is the first line of ``version``.

        The firmware entry is ``None`` when the instrument returns an empty
        reply, rather than failing on the missing first line.
        """
        version_lines = self.ask_raw("version").splitlines()
        firmware = version_lines[0] if version_lines else None
        return {
            "vendor": "tinySA",
            "model": "tinySA",
            "serial": None,
            "firmware": firmware,
        }

    def close(self) -> None:
        """Close the serial port, then run the standard QCoDeS teardown."""
        try:
            self._backend.disconnect()
        finally:
            super().close()