Source code for qcodes_contrib_drivers.drivers.Attocube.AMC100

from __future__ import annotations

import dataclasses
import os
import sys
from collections.abc import Mapping, Sequence
from functools import partial
from itertools import compress, zip_longest
from typing import Any, overload

import numpy as np
from qcodes import validators
from qcodes.instrument import InstrumentChannel, Instrument, ChannelTuple
from qcodes.parameters import (Parameter, MultiParameter, create_on_off_val_mapping,
                               ParamRawDataType)

_POSITION_SCALE = 10 ** 6


[docs] @dataclasses.dataclass(frozen=True) class MultiAxisPosition(Sequence[float]): """A tuple-like representation of (a subset of) axis positions. Use this class to set any number of positions simultaneously using :attr:`AttocubeAMC100.multi_axis_position`. """ axis_1: float = np.nan axis_2: float = np.nan axis_3: float = np.nan @overload def __getitem__(self, index: int, /) -> float: ... @overload def __getitem__(self, index: slice, /) -> Sequence[float]: ... def __getitem__(self, index): return dataclasses.astuple(self)[index] def __len__(self) -> int: return 3 def _JSONEncoder(self) -> dict[str, Any]: return dataclasses.asdict(self)
[docs] class MultiAxisPositionParameter(MultiParameter): """A parameter that simulatenously sets multiple axis positions. Always returns all three axes, but accepts a variable number for setting. The following are allowed for the single value argument: - an instance of :class:`MultiAxisPosition` - a mapping with possible keys ``axis_1``, ``axis_2``, ``axis_3`` and float values - a sequence of floats which will be interpreted as `(axis_1, axis_2, ...)` """
[docs] def get_raw(self) -> ParamRawDataType: if self.instrument is None: raise RuntimeError("No instrument attached to Parameter.") try: x, y, z, *_ = self.instrument.device.control.getPositionsAndVoltages() except self.instrument.exception_type as err: raise NotImplementedError from err else: self._update_cache(x, y, z) return MultiAxisPosition(x / _POSITION_SCALE, y / _POSITION_SCALE, z / _POSITION_SCALE)
[docs] def set_raw(self, value: ParamRawDataType) -> None: if self.instrument is None: raise RuntimeError("No instrument attached to Parameter.") value_dict = dict.fromkeys(['axis_1', 'axis_2', 'axis_3'], np.nan) if isinstance(value, MultiAxisPosition): value_dict.update(dataclasses.asdict(value)) elif isinstance(value, Mapping): value_dict.update(value) elif len(value) <= 3: value_dict.update({f'axis_{i}': val for i, val in enumerate(value, start=1)}) else: raise ValueError('Too many values, expected at most 3') # tuples of boolean indicating whether axis is set and position vals = [(setit := not np.isnan(value_dict[key]), value_dict[key] * _POSITION_SCALE if setit else 0.0) for key in ('axis_1', 'axis_2', 'axis_3')] sets, targets = zip(*vals) axes_to_move = list(compress(range(3), sets)) try: # Set target positions self.instrument.device.control.MultiAxisPositioning(*sets, *targets) # Start moving for axis in axes_to_move: self.instrument.device.control.setControlMove(axis, True) # Wait for target reached while not all(self.instrument.device.status.getStatusTargetRange(axis) for axis in axes_to_move): pass # Stop moving for axis in axes_to_move: self.instrument.device.control.setControlMove(axis, False) except self.instrument.exception_type as err: raise NotImplementedError from err else: # Cache target position on axis channels self._update_cache(**value_dict)
def _update_cache(self, axis_1: float = np.nan, axis_2: float = np.nan, axis_3: float = np.nan): if self.instrument is None: # mypy :) raise RuntimeError("No instrument attached to Parameter.") if not np.isnan(axis_1): self.instrument.axis_1.position.cache.set(axis_1) if not np.isnan(axis_2): self.instrument.axis_2.position.cache.set(axis_2) if not np.isnan(axis_3): self.instrument.axis_3.position.cache.set(axis_3)
[docs] class AMC100Axis(InstrumentChannel): def __init__(self, parent: 'AttocubeAMC100', name: str, axis: int, label: str | None = None, **kwargs: Any) -> None: super().__init__(parent, name, label=label, **kwargs) self._axis = axis - 1 self.actor_type = Parameter( 'actor_type', get_cmd=partial(self.parent.device.control.getActorType, self._axis), val_mapping={'linear': 0, 'rotator': 1, 'goniometer': 2}, label='Actor type', instrument=self ) self.open_loop_status = Parameter( 'open_loop_status', get_cmd=partial(self.parent.device.status.getOlStatus, self._axis), val_mapping={'NUM': 0, 'OL': 1, 'None': 2, 'RES': 3}, label='Feedback status', instrument=self ) self.reference_position_valid = Parameter( 'reference_position_valid', get_cmd=partial(self.parent.device.status.getStatusReference, self._axis), label='Refrence position valid', instrument=self ) self.reference_position = Parameter( 'reference_position', get_cmd=self._get_reference_position, scale=_POSITION_SCALE, label=f"Reference Position {f'axis {axis}' if label is None else label}", unit='mm' if self.actor_type() == 'linear' else '°', instrument=self ) self.position = Parameter( 'position', get_cmd=partial(self.parent.device.move.getPosition, self._axis), set_cmd=self._move_to_target_position, set_parser=int, scale=10**6, label=f"Position {f'axis {axis}' if label is None else label}", unit='mm' if self.actor_type() == 'linear' else '°', instrument=self ) self.frequency = Parameter( 'frequency', get_cmd=partial(self.parent.device.control.getControlFrequency, self._axis), set_cmd=partial(self.parent.device.control.setControlFrequency, self._axis), set_parser=int, scale=1e3, # Validator is not int because it's run before the parsers, # and we allow numbers that can be cast to ints vals=validators.Numbers(3, 5000), label=f"Frequency {f'axis {axis}' if label is None else label}", unit='Hz', instrument=self ) self.amplitude = Parameter( 'amplitude', get_cmd=partial(self.parent.device.control.getControlAmplitude, self._axis), set_cmd=partial(self.parent.device.control.setControlAmplitude, self._axis), set_parser=int, scale=1e3, # Validator is not int because it's run before the parsers, # and we allow numbers that can be cast to ints vals=validators.Numbers(0, 45), label=f"Amplitude {f'axis {axis}' if label is None else label}", unit='V', instrument=self ) self.output = Parameter( 'output', get_cmd=partial(self.parent.device.control.getControlOutput, self._axis), set_cmd=partial(self.parent.device.control.setControlOutput, self._axis), val_mapping=create_on_off_val_mapping(), label=f"Output {f'axis {axis}' if label is None else label}", instrument=self ) def _get_reference_position(self) -> int: if not self.reference_position_valid(): self.parent.device.control.searchReferencePosition(self._axis) return self.parent.device.control.getReferencePosition(self._axis) def _move_to_target_position(self, position: int): self.parent.device.move.setControlTargetPosition(self._axis, position) self.parent.device.control.setControlMove(self._axis, True) while not self.parent.device.status.getStatusTargetRange(self._axis): # Update qcodes cache self.position.get() self.parent.device.control.setControlMove(self._axis, False)
[docs] def move_to_reference_position(self): """This function starts an approach to the reference position. A running motion command is aborted; closed loop moving is switched on. Requires a valid reference position. """ self.parent.device.move.moveReference(self._axis)
[docs] def single_step(self, backward: bool): """This function triggers one step on the selected axis in desired direction. Parameters ---------- backward : Selects the desired direction. False triggers a forward step, true a backward step. """ self.parent.device.move.setSingleStep(self._axis, backward)
[docs] class AttocubeAMC100(Instrument): """Driver for the AMC100 position controller.""" # Tested with fw 1.3.23 def __init__(self, name: str, api_dir: os.PathLike, address: str | None = None, axis_labels: Sequence[str] = (), **kwargs: Any): super().__init__(name, **kwargs) try: sys.path.append(str(api_dir)) import AMC from ACS import AttoException except ImportError as err: sys.path.remove(str(api_dir)) raise ImportError('This driver requires the AMC-APIs package which comes packaged ' 'with the AMC software or can be downloaded from here:\n' 'https://github.com/attocube-systems/AMC-APIs') from err else: self._exception_type = AttoException if address is None: if not (discovered := AMC.discover()): raise ValueError('No devices discovered') address = list(discovered)[0] self.device = AMC.Device(address) self.device.connect() axes = [] for i, label in zip_longest((1, 2, 3), axis_labels, fillvalue=None): # Mypy ... if i is None: break axes.append(axis := AMC100Axis(self, name := f'axis_{i}', i, label)) self.add_submodule(name, axis) self.add_submodule('axis_channels', ChannelTuple(self, 'axis_channels', AMC100Axis, axes)) self.add_parameter('multi_axis_position', parameter_class=MultiAxisPositionParameter, names=('axis_1', 'axis_2', 'axis_3'), shapes=((), (), ()), units=[ax.position.unit for ax in self.axis_channels], labels=[ax.position.label for ax in self.axis_channels]) self.connect_message() @property def exception_type(self) -> Exception: return self._exception_type
[docs] def close(self): self.device.close() super().close()
[docs] def get_idn(self) -> dict[str, str | None]: return {"vendor": "Attocube", "model": self.device.description.getDeviceType(), "serial": self.device.system_service.getSerialNumber(), "firmware": self.device.system_service.getFirmwareVersion()}