Source code for qcodes_contrib_drivers.drivers.Andor.Andor_iDus4xx

"""Qcodes driver for Andor iDus 4xx cameras.

Tested with a Andor iDus 416. A typical workflow would look something
like this::

    ccd = AndorIDus4xx('ccd')
    ccd.acquisition_mode('kinetics')
    ccd.read_mode('random track')
    ccd.preamp_gain(1.)
    ccd.number_accumulations(2)
    ccd.number_kinetics(10)
    ccd.random_track_settings((2, [20, 30, 40, 40]))
    # The following time parameters are computed to a closest valid
    # value by the SDK. Setting all to zero would hence result in the
    # shortest possible acquisition time. These values should be set
    # last since they depend on other settings.
    ccd.exposure_time(0)
    ccd.accumulation_cycle_time(0)
    ccd.kinetic_cycle_time(0)
    # Acquire data
    data = ccd.ccd_data()
    data.shape  # (10, 2, 2000)
    # The shape of the ccd_data parameter is automatically adjusted to
    # the acquisition and read modes
    ccd.acquisition_mode('single scan')
    ccd.read_mode('full vertical binning')
    data = ccd.ccd_data()
    data.shape  # (2000,)

TODO (thangleiter, 23/11/11):

 - Triggering
 - Handle shutter modes
 - Multiple cameras

"""
from __future__ import annotations

import itertools
import operator
import textwrap
import time
from collections.abc import Callable, Sequence, Iterator
from functools import partial, wraps
from typing import Any, Dict, Literal, Optional, Tuple, TypeVar, NamedTuple

import numpy as np
import numpy.typing as npt
from qcodes import validators
from qcodes.instrument import Instrument
from qcodes.parameters import (ParameterBase, ParamRawDataType, DelegateParameter,
                               ManualParameter, MultiParameter, Parameter, ParameterWithSetpoints,
                               create_on_off_val_mapping)
from qcodes.parameters.cache import _Cache, _CacheProtocol
from tqdm import tqdm

from . import post_processing
from .private.andor_sdk import atmcd64d, SDKError

_T = TypeVar('_T')

_ACQUISITION_TIMEOUT_FACTOR = 1.5
"""Relative overhead for acquisition timeout."""
_MINIMUM_ACQUISITION_TIMEOUT = 1.0
"""Minimum acquisition timeout in seconds."""


[docs] class AcquisitionTimings(NamedTuple): """Timings computed by the SDK.""" exposure_time: float accumulation_cycle_time: float kinetic_cycle_time: float
class _AcquisitionParams(NamedTuple): """Data tuple defining acquisition parameters and objects. Used internally. """ timeout_ms: int cycle_time: float number_accumulations: int number_frames_acquired: int buffer: npt.NDArray[np.int32] shape: tuple[int, ...] fetch_lazy: bool @wraps(textwrap.dedent) def dedent(text: str | None) -> str | None: """Wrap textwrap.dedent for mypy for use with __doc__ attributes.""" return textwrap.dedent(text) if text is not None else None def _merge_docstrings(*objs) -> str | None: doc = '' for obj in objs: if obj.__doc__ is not None: doc = doc + obj.__doc__ return doc if doc != '' else None class _HeterogeneousSequence(validators.Validator[Sequence[Any]]): """A validator for heterogeneous sequences.""" def __init__( self, elt_validators: Sequence[validators.Validator[Any]] = (validators.Anything(),) ) -> None: self._elt_validators = elt_validators self._valid_values = ([vval for vval in itertools.chain(*( elt_validator._valid_values for elt_validator in self._elt_validators ))],) def validate(self, value: Sequence[Any], context: str = "") -> None: if not isinstance(value, Sequence): raise TypeError(f"{value!r} is not a sequence; {context}") if len(value) != self.length: raise ValueError( f"{value!r} has not length {self.length} but {len(value)}" ) for elt, validator in zip(value, self._elt_validators): if not isinstance(validator, validators.Anything): validator.validate(elt) @property def elt_validators(self) -> Sequence[validators.Validator[Any]]: return self._elt_validators @property def length(self) -> int: return len(self.elt_validators) class _PostProcessingCallable(validators.Validator[Callable[[npt.NDArray[np.int32]], npt.NDArray[np.int32]]]): """A validator for post-processing functions.""" def __init__(self) -> None: self._valid_values = (lambda x: x,) def __repr__(self) -> str: return '<Callable[[npt.NDArray[np.int32]], npt.NDArray[np.int32]>' def validate(self, value: Callable[..., Any], context: str = "") -> None: if not callable(value) and isinstance(value, post_processing.PostProcessingFunction): raise TypeError(f"{value!r} is not a post-processing function; {context}")
[docs] class DetectorPixelsParameter(MultiParameter): """Stores the detector size in pixels.""" instrument: AndorIDus4xx
[docs] class DetectorPixels(NamedTuple): horizontal: int vertical: int
[docs] def get_raw(self) -> DetectorPixels: if self.instrument is None: raise RuntimeError("No instrument attached to Parameter.") return self.DetectorPixels(*self.instrument.atmcd64d.get_detector())
[docs] class PixelSizeParameter(MultiParameter): """Stores the pixel size in microns.""" instrument: AndorIDus4xx
[docs] class PixelSize(NamedTuple): horizontal: float vertical: float
[docs] def get_raw(self) -> PixelSize: if self.instrument is None: raise RuntimeError("No instrument attached to Parameter.") return self.PixelSize(*self.instrument.atmcd64d.get_pixel_size())
[docs] class DetectorSizeParameter(MultiParameter): """Stores the detector size in microns.""" instrument: AndorIDus4xx
[docs] class DetectorSize(NamedTuple): horizontal: float vertical: float
[docs] def get_raw(self) -> DetectorSize: if self.instrument is None: raise RuntimeError("No instrument attached to Parameter.") px_x, px_y = self.instrument.atmcd64d.get_detector() size_x, size_y = self.instrument.atmcd64d.get_pixel_size() return self.DetectorSize(px_x * size_x, px_y * size_y)
[docs] class AcquiredPixelsParameter(MultiParameter): """Returns the shape of a single frame for the current settings.""" instrument: AndorIDus4xx
[docs] class AcquiredPixels(NamedTuple): horizontal: int vertical: int
[docs] def get_raw(self) -> AcquiredPixels: if self.instrument is None: raise RuntimeError("No instrument attached to Parameter.") width, height = self.instrument.detector_pixels.get_latest() read_mode = self.instrument.read_mode.get_latest() if read_mode == 'image': try: hbin, vbin, hstart, hend, vstart, vend = self.instrument.image_settings.get() except TypeError: raise RuntimeError('Please set the image_settings parameter.') from None width = (hend - hstart + 1) // hbin height = (vend - vstart + 1) // vbin elif read_mode == 'multi track': try: height, *_ = self.instrument.multi_track_settings.get() except TypeError: raise RuntimeError('Please set the multi_track_settings parameter.') from None elif read_mode == 'random track': try: height, _ = self.instrument.random_track_settings.get() except TypeError: raise RuntimeError('Please set the random_track_settings parameter.') from None elif read_mode in ('single track', 'full vertical binning'): height = 1 return self.AcquiredPixels(width, height)
[docs] class SingleTrackSettingsParameter(MultiParameter): """Represents the settings for single-track acquisition.""" instrument: AndorIDus4xx
[docs] class SingleTrackSettings(NamedTuple): centre: int height: int
[docs] def get_raw(self) -> Optional[SingleTrackSettings]: if self.instrument is None: raise RuntimeError("No instrument attached to Parameter.") val = self.cache.get(False) return self.SingleTrackSettings(*val) if val is not None else None
[docs] def set_raw(self, val: SingleTrackSettings): if self.instrument is None: raise RuntimeError("No instrument attached to Parameter.") self.instrument.atmcd64d.set_single_track(*val)
[docs] class MultiTrackSettingsParameter(MultiParameter): """ Represents the settings for multi-track acquisition. When setting, a sequence of *three* numbers (number, height, and offset). When getting, a tuple of *three* numbers (number, height, offset) is again returned. In addition, the CCD calculates the gap between tracks and the offset from the bottom row. These are stored as properties of the parameter, i.e., accessible through :attr:`gap` and :attr:`bottom`, respectively. """ instrument: AndorIDus4xx
[docs] class MultiTrackSettings(NamedTuple): number: int height: int offset: int
_bottom: int | None = None _gap: int | None = None @property def bottom(self) -> int | None: """The bottom row as computed by the CCD.""" return self._bottom @property def gap(self) -> int | None: """The gap between rows as computed by the CCD.""" return self._gap
[docs] def get_raw(self) -> Optional[MultiTrackSettings]: if self.instrument is None: raise RuntimeError("No instrument attached to Parameter.") val = self.cache.get(False) return self.MultiTrackSettings(*val) if val is not None else None
[docs] def set_raw(self, val: Tuple[int, int, int]): if self.instrument is None: raise RuntimeError("No instrument attached to Parameter.") self._bottom, self._gap = self.instrument.atmcd64d.set_multi_track(*val)
[docs] class RandomTrackSettingsParameter(MultiParameter): """Represents the settings for random-track acquisition.""" instrument: AndorIDus4xx
[docs] class RandomTrackSettings(NamedTuple): number_tracks: int areas: Sequence[int]
[docs] def get_raw(self) -> Optional[RandomTrackSettings]: if self.instrument is None: raise RuntimeError("No instrument attached to Parameter.") val = self.cache.get(False) return self.RandomTrackSettings(*val) if val is not None else None
[docs] def set_raw(self, val: RandomTrackSettings): if self.instrument is None: raise RuntimeError("No instrument attached to Parameter.") self.instrument.atmcd64d.set_random_tracks(*val)
[docs] class ImageSettingsParameter(MultiParameter): """Represents the settings for image acquisition.""" instrument: AndorIDus4xx
[docs] class ImageSettings(NamedTuple): hbin: int vbin: int hstart: int hend: int vstart: int vend: int
[docs] def get_raw(self) -> Optional[ImageSettings]: if self.instrument is None: raise RuntimeError("No instrument attached to Parameter.") val = self.cache.get(False) return self.ImageSettings(*val) if val is not None else None
[docs] def set_raw(self, val: ImageSettings): if self.instrument is None: raise RuntimeError("No instrument attached to Parameter.") self.instrument.atmcd64d.set_image(*val)
[docs] class FastKineticsSettingsParameter(MultiParameter): """Represents fast kinetics settings.""" instrument: AndorIDus4xx
[docs] class FastKineticsSettings(NamedTuple): exposed_rows: int series_length: int time: float mode: int hbin: int vbin: int offset: int
[docs] def get_raw(self) -> Optional[FastKineticsSettings]: if self.instrument is None: raise RuntimeError("No instrument attached to Parameter.") val = self.cache.get(False) return self.FastKineticsSettings(*val) if val is not None else None
[docs] def set_raw(self, val: FastKineticsSettings): if self.instrument is None: raise RuntimeError("No instrument attached to Parameter.") self.instrument.atmcd64d.set_fast_kinetics(*val) # Set the number of frames so that CCDData knows the correct # shape of the data. self.instrument.number_kinetics.set(val[1]) # The exposure time always seems to be 0 in fast kinetics mode # self.instrument.exposure_time.set(val[2]) self.instrument.read_mode.set(self.instrument.read_mode.inverse_val_mapping[val[3]])
[docs] class ParameterWithSetSideEffect(Parameter): """A :class:`Parameter` allowing for side effects on set events. Parameters ---------- set_side_effect : A callable that is run before every set event. Receives the set value as sole argument. """ def __init__(self, name: str, set_side_effect: Callable[[Any], None], **kwargs: Any) -> None: if not callable(set_cmd := kwargs.pop('set_cmd', False)): raise ValueError('ParameterWithSetSideEffect requires a set_cmd') def set_raw(value: ParamRawDataType) -> None: # Parameter does not allow overriding set_raw method set_side_effect(value) set_cmd(value) super().__init__(name, set_cmd=set_raw, **kwargs)
[docs] class PixelAxis(Parameter): """ A parameter that enumerates the pixels along an axis. If acquisition is in 'image' mode, getting this parameter returns the center pixels of subpixels binned together, e.g. for the horizontal axis:: px = [hstart + i * hbin + hbin // 2 for i in range((hend - hstart) // hbin + 1)] Otherwise, simply enumerates the number of pixels starting from 1. If you have a calibration of horizontal pixels to, for example in a spectrograph, wavelength at hand, set this parameter's get_parser and unit. """ instrument: AndorIDus4xx def __init__(self, name: str, dimension: Literal[0, 1], instrument: 'AndorIDus4xx', **kwargs: Any) -> None: self.dimension = dimension super().__init__(name, instrument, **kwargs)
[docs] def get_raw(self) -> npt.NDArray[np.int_]: if self.instrument is None: raise RuntimeError("No instrument attached to Parameter.") if self.instrument.read_mode.get_latest() == 'image': try: hbin, vbin, hstart, hend, vstart, vend = self.instrument.image_settings.get() except TypeError: raise RuntimeError('Please set the image_settings parameter.') from None bins = [hbin, vbin] starts = [hstart, vstart] ends = [hend, vend] return np.arange(starts[self.dimension], ends[self.dimension] + 1, bins[self.dimension], dtype=np.int_) + bins[self.dimension] // 2 # In the other read modes there is no horizontal binning whereas for the vertical axis # select rows may be read out and binned, so we cannot assign meaningful 'pixel' values to # them. return np.arange(1, self.instrument.acquired_pixels.get_latest()[self.dimension] + 1, dtype=np.int_)
[docs] class TimeAxis(Parameter): """ A parameter that holds the start of each exposure window. If the acquisition mode is a kinetic series, the size corresponds to number_kinetics(), otherwise it's always 1. """ instrument: AndorIDus4xx
[docs] def get_raw(self) -> npt.NDArray[np.float64]: if self.instrument is None: raise RuntimeError("No instrument attached to Parameter.") n_pts = self.instrument.acquired_frames.get() dt = self.instrument.kinetic_cycle_time.get() return np.arange(0, dt * n_pts, dt)
[docs] class PersistentDelegateParameter(DelegateParameter): """A delegate parameter with an independent cache.""" def __init__(self, name: str, source: Parameter | None, *args: Any, **kwargs: Any): super().__init__(name, source, *args, **kwargs) self.cache: _CacheProtocol = _Cache(self, max_val_age=kwargs.get('max_val_age', None))
[docs] class CCDData(ParameterWithSetpoints): """ Parameter class for data taken with an Andor CCD. The data is saved in an integer array with dynamic shape depending on the acquisition and readout modes. - If the acquisition mode is a kinetic series, the first axis is a :class:`TimeAxis` with size the number of frames, otherwise it is empty. - The last axes correspond to the image dimensions, which may be 1d or 2d depending on the readout mode. If 2d, the y-axis (vertical dimension) is stored first. Note: In 2d mode, the last two axes are switched around compared to the rest of this driver. """ instrument: AndorIDus4xx _delegates: set['CCDDataDelegateParameter'] = set()
[docs] def get_raw(self) -> npt.NDArray[np.int32]: if self.instrument is None: raise RuntimeError("No instrument attached to Parameter.") # Calls get_acquisition_timings() to get the correct timing info. data = self.instrument._get_acquisition_data() self.instrument.arm() self.instrument.start_acquisition() t0 = time.perf_counter() try: for frame in range(data.number_frames_acquired): self.instrument.log.debug('Acquiring frame ' f'{frame}/{data.number_frames_acquired}.') for accumulation in range(data.number_accumulations): self.instrument.log.debug('Acquiring accumulation ' f'{accumulation}/{data.number_accumulations}.') self.instrument.atmcd64d.wait_for_acquisition_timeout(data.timeout_ms) if not data.fetch_lazy: self.instrument.log.debug('Fetching frame ' f'{frame}/{data.number_frames_acquired}.') self.instrument.atmcd64d.get_oldest_image_by_reference(data.buffer[frame]) if data.fetch_lazy: self.instrument.log.debug('Fetching all frames.') self.instrument.atmcd64d.get_acquired_data_by_reference(data.buffer.reshape(-1)) except KeyboardInterrupt: self.instrument.log.debug('Aborting acquisition after ' f'{time.perf_counter() - t0:.3g} s.') self.instrument.abort_acquisition() else: self.instrument.log.debug('Finished acquisition after ' f'{time.perf_counter() - t0:.3g} s.') finally: return data.buffer.reshape(data.shape)
[docs] def register_delegate(self, delegate: 'CCDDataDelegateParameter'): self._delegates.add(delegate)
@property def setpoints(self) -> Sequence[ParameterBase]: # Only here for mypy: https://github.com/python/mypy/issues/5936 return super().setpoints @setpoints.setter def setpoints(self, setpoints: Sequence[ParameterBase]) -> None: # https://github.com/python/mypy/issues/5936#issuecomment-1429175144 ParameterWithSetpoints.setpoints.fset(self, setpoints) # type: ignore[attr-defined] for delegate in self._delegates: delegate.setpoints = setpoints @property def vals(self) -> validators.Validator | None: # Only here for mypy: https://github.com/python/mypy/issues/5936 return super().vals @vals.setter def vals(self, vals: validators.Validator | None) -> None: # https://github.com/python/mypy/issues/5936#issuecomment-1429175144 ParameterWithSetpoints.vals.fset(self, vals) # type: ignore[attr-defined] for delegate in self._delegates: delegate.vals = vals
[docs] class CCDDataDelegateParameter(DelegateParameter, ParameterWithSetpoints): """A DelegateParameter that can be used as a ParameterWithSetpoints.""" def __init__(self, name: str, source: CCDData, **kwargs: Any): kwargs.setdefault('vals', getattr(source, 'vals')) kwargs.setdefault('setpoints', getattr(source, 'setpoints')) kwargs.setdefault('snapshot_get', getattr(source, '_snapshot_get')) kwargs.setdefault('snapshot_value', getattr(source, '_snapshot_value')) super().__init__(name, source, **kwargs) self._register_with_source(source) def _register_with_source(self, source): while not isinstance(source, CCDData): try: source = source.source except AttributeError: raise ValueError('Expected source to be CCData or delegate thereof.') source.register_delegate(self)
[docs] class AndorIDus4xx(Instrument): """ Instrument driver for the Andor iDus 4xx family CCDs. Args: name: Instrument name. dll_path: Path to the atmcd64.dll file. If not set, a default path is used. camera_id: ID for the desired CCD. min_temperature: The minimum temperature of operation for the CCD. Defaults to the value the model supports. Note that that might apply for water cooling only. Attributes: serial_number: Serial number of the CCD. head_model: Head model of the CCD. firmware_version: Firmware version of the CCD. firmware_build: Firmware build of the CCD. acquisition_mode_capabilities: Available acquisition modes. read_mode_capabilities: Available read modes. trigger_mode_capabilities: Available trigger modes. pixel_mode_capabilities: Bit-depth and color mode. feature_capabilities: Available camera and SDK features. """ # For iDus models, there are only a single channel and amplifier each AFAIK _CHANNEL: int = 0 _AMPLIFIER: int = 0 # TODO (SvenBo90): implement further trigger modes # TODO (SvenBo90): handle shutter closing and opening timings # TODO (thangleiter): implement further real-time filter modes def __init__(self, name: str, dll_path: Optional[str] = None, camera_id: int = 0, min_temperature: Optional[int] = None, **kwargs): super().__init__(name, **kwargs) # link to dll self.atmcd64d = atmcd64d(dll_path=dll_path) # initialization self.atmcd64d.initialize(' ') self.atmcd64d.set_current_camera(self.atmcd64d.get_camera_handle(camera_id)) # get camera information self.serial_number = self.atmcd64d.get_camera_serial_number() self.head_model = self.atmcd64d.get_head_model() self.firmware_version = self.atmcd64d.get_hardware_version()[4] self.firmware_build = self.atmcd64d.get_hardware_version()[5] self.acquisition_mode_capabilities = self.atmcd64d.get_capabilities()[0] self.read_mode_capabilities = self.atmcd64d.get_capabilities()[1] self.trigger_mode_capabilities = self.atmcd64d.get_capabilities()[2] self.pixel_mode_capabilities = self.atmcd64d.get_capabilities()[4] self.feature_capabilities = self.atmcd64d.get_capabilities()[5] # add the instrument parameters self.add_parameter('accumulation_cycle_time', get_cmd=self.atmcd64d.get_acquisition_timings, set_cmd=self.atmcd64d.set_accumulation_cycle_time, get_parser=lambda ans: float(ans[1]), max_val_age=0, unit='s', label='accumulation cycle time', docstring=dedent(self.atmcd64d.set_accumulation_cycle_time.__doc__)) self.add_parameter('cooler', get_cmd=self.atmcd64d.is_cooler_on, set_cmd=self._set_cooler, val_mapping=create_on_off_val_mapping(on_val=1, off_val=0), label='cooler', docstring=dedent(self.atmcd64d.cooler_on.__doc__)) self.add_parameter('cooler_mode', set_cmd=self.atmcd64d.set_cooler_mode, val_mapping={'maintain': 1, 'return': 0}, label='Cooler mode', initial_value='return', docstring=dedent(""" Determines whether the cooler is switched off when the camera is shut down. 'maintain' means it is maintained on shutdown, 'return' means the camera returns to ambient temperature. Defaults to 'return'. """)) self.add_parameter('cosmic_ray_filter_mode', get_cmd=self.atmcd64d.get_filter_mode, set_cmd=self.atmcd64d.set_filter_mode, val_mapping=create_on_off_val_mapping(on_val=2, off_val=0), label='Cosmic ray filter mode', docstring=dedent(self.atmcd64d.set_filter_mode.__doc__)) self.add_parameter('data_averaging_filter_mode', get_cmd=self.atmcd64d.filter_get_data_averaging_mode, set_cmd=self.atmcd64d.filter_set_data_averaging_mode, val_mapping={'No Averaging Filter': 0, 'Recursive Averaging Filter': 5, 'Frame Averaging Filter': 6}, label='Data averaging filter mode', docstring=dedent(self.atmcd64d.filter_set_data_averaging_mode.__doc__)) self.add_parameter('data_averaging_filter_factor', get_cmd=self.atmcd64d.filter_get_averaging_factor, set_cmd=self.atmcd64d.filter_set_averaging_factor, vals=validators.Ints(1), label='Data averaging filter factor', docstring=dedent(self.atmcd64d.filter_set_averaging_factor.__doc__)) self.add_parameter('data_averaging_filter_frame_count', get_cmd=self.atmcd64d.filter_get_averaging_frame_count, set_cmd=self.atmcd64d.filter_set_averaging_frame_count, vals=validators.Ints(1), label='Data averaging filter frame count', docstring=dedent( self.atmcd64d.filter_set_averaging_frame_count.__doc__ )) self.add_parameter('detector_size', parameter_class=DetectorSizeParameter, names=DetectorSizeParameter.DetectorSize._fields, shapes=((), ()), units=('μm', 'μm'), labels=('Horizontal chip size', 'Vertical chip size'), docstring=DetectorSizeParameter.__doc__, snapshot_value=True) self.add_parameter('exposure_time', get_cmd=self.atmcd64d.get_acquisition_timings, set_cmd=self.atmcd64d.set_exposure_time, get_parser=lambda ans: float(ans[0]), max_val_age=0, unit='s', label='exposure time', docstring=dedent(self.atmcd64d.set_exposure_time.__doc__)) self.add_parameter('fast_kinetics_settings', parameter_class=FastKineticsSettingsParameter, names=FastKineticsSettingsParameter.FastKineticsSettings._fields, shapes=((), (), (), (), (), (), ()), units=('px', 'px', 's', '', 'px', 'px', 'px'), vals=_HeterogeneousSequence([validators.Ints(1), validators.Ints(1), validators.Numbers(0), validators.Enum(0, 4), validators.Ints(1), validators.Ints(1), validators.Ints(0)]), docstring=dedent(self.atmcd64d.set_fast_kinetics.__doc__), snapshot_value=True) self.add_parameter('fast_external_trigger', set_cmd=self.atmcd64d.set_fast_ext_trigger, val_mapping=( create_on_off_val_mapping(on_val=1, off_val=0) | {'Unset': -1} ), initial_cache_value='Unset', label='Fast external trigger mode', docstring=dedent(self.atmcd64d.set_fast_ext_trigger.__doc__)) self.add_parameter('fastest_recommended_vertical_shift_speed', get_cmd=self.atmcd64d.get_fastest_recommended_vs_speed, get_parser=operator.itemgetter(1), unit='μs/px', docstring=dedent( self.atmcd64d.get_fastest_recommended_vs_speed.__doc__ )) speeds = [round(self.atmcd64d.get_hs_speed(self._CHANNEL, self._AMPLIFIER, index), 3) for index in range(self.atmcd64d.get_number_hs_speeds(self._CHANNEL, self._AMPLIFIER))] self.add_parameter('horizontal_shift_speed', label='Horizontal shift speed', set_cmd=partial(self.atmcd64d.set_hs_speed, self._AMPLIFIER), val_mapping={speed: index for index, speed in enumerate(speeds)} | {'Unset': -1}, initial_cache_value='Unset', unit='MHz', docstring=dedent(self.atmcd64d.set_hs_speed.__doc__)) self.add_parameter('keep_clean_time', get_cmd=self.atmcd64d.get_keep_clean_time, unit='s', label='Keep clean cycle duration', docstring=dedent(self.atmcd64d.get_keep_clean_time.__doc__)) self.add_parameter('kinetic_cycle_time', get_cmd=self.atmcd64d.get_acquisition_timings, set_cmd=self.atmcd64d.set_kinetic_cycle_time, get_parser=lambda ans: float(ans[2]), max_val_age=0, unit='s', label='Kinetic cycle time', docstring=dedent(self.atmcd64d.set_kinetic_cycle_time.__doc__)) self.add_parameter('multi_track_settings', parameter_class=MultiTrackSettingsParameter, names=MultiTrackSettingsParameter.MultiTrackSettings._fields, shapes=((), (), ()), units=('px', 'px', 'px'), vals=_HeterogeneousSequence([validators.Ints(1), validators.Ints(1), validators.Ints(0)]), docstring=dedent(self.atmcd64d.set_multi_track.__doc__), snapshot_value=True) self.add_parameter('number_accumulations', set_cmd=self.atmcd64d.set_number_accumulations, initial_value=1, label='number accumulations', docstring=dedent(self.atmcd64d.set_number_accumulations.__doc__)) self.add_parameter('number_kinetics', set_cmd=self.atmcd64d.set_number_kinetics, initial_value=1, label='number of frames', docstring=dedent(self.atmcd64d.set_number_kinetics.__doc__)) self.add_parameter('detector_pixels', parameter_class=DetectorPixelsParameter, names=DetectorPixelsParameter.DetectorPixels._fields, shapes=((), ()), units=('px', 'px'), labels=('Horizontal number of pixels', 'Vertical number of pixels'), docstring=DetectorPixelsParameter.__doc__, snapshot_value=True) self.add_parameter('pixel_size', parameter_class=PixelSizeParameter, names=PixelSizeParameter.PixelSize._fields, shapes=((), ()), units=('μm', 'μm'), labels=('Horizontal pixel size', 'Vertical pixel size'), docstring=PixelSizeParameter.__doc__, snapshot_value=True) # Real-time photon counting self.add_parameter('photon_counting', set_cmd=self.atmcd64d.set_photon_counting, val_mapping=create_on_off_val_mapping(), label='Photon counting enabled', initial_cache_value=False, docstring=dedent(self.atmcd64d.set_photon_counting.__doc__)) no_of_divisions = self.atmcd64d.get_number_photon_counting_divisions() self.add_parameter('photon_counting_divisions', set_cmd=partial(self.atmcd64d.set_photon_counting_divisions, no_of_divisions), vals=validators.Sequence(validators.Ints(1, 65535), length=no_of_divisions + 1, require_sorted=True), label='Photon counting divisions', docstring=dedent(self.atmcd64d.set_photon_counting_divisions.__doc__)) self.add_parameter('photon_counting_threshold', set_cmd=self.atmcd64d.set_photon_counting_threshold, vals=validators.Sequence(validators.Ints(1, 65535), length=2, require_sorted=True), label='Photon counting threshold', docstring=dedent(self.atmcd64d.set_photon_counting_threshold.__doc__)) self.add_parameter('post_processing_function', label='Post processing function', parameter_class=ManualParameter, initial_value=post_processing.Identity(), vals=_PostProcessingCallable(), set_parser=self._parse_post_processing_function, docstring="A callable with signature f(data) -> processed_data that is " "used as the ccd_data parameter get_parser.") gains = [round(self.atmcd64d.get_preamp_gain(index), 3) for index in range(self.atmcd64d.get_number_preamp_gains())] self.add_parameter('preamp_gain', label='Pre-Amplifier gain', set_cmd=self.atmcd64d.set_preamp_gain, val_mapping={gain: index for index, gain in enumerate(gains)} | {'Unset': -1}, initial_cache_value='Unset', docstring=dedent(self.atmcd64d.set_preamp_gain.__doc__)) self.add_parameter('random_track_settings', parameter_class=RandomTrackSettingsParameter, names=RandomTrackSettingsParameter.RandomTrackSettings._fields, shapes=((), ()), units=('', 'px'), vals=_HeterogeneousSequence([validators.Ints(1), validators.Sequence(validators.Ints(1))]), docstring=dedent(self.atmcd64d.set_random_tracks.__doc__), snapshot_value=True) self.add_parameter('readout_time', get_cmd=self.atmcd64d.get_readout_time, label='Readout time', docstring=dedent(self.atmcd64d.get_readout_time.__doc__)) temperature_range = self.atmcd64d.get_temperature_range() self.add_parameter('set_temperature', set_cmd=self.atmcd64d.set_temperature, vals=validators.Ints(min_value=min_temperature or temperature_range[0], max_value=temperature_range[1]), unit=u"\u00b0" + 'C', label='set temperature', docstring=dedent(self.atmcd64d.set_temperature.__doc__)) self.add_parameter('shutter_mode', set_cmd=self._set_shutter_mode, val_mapping={'fully auto': 0, 'permanently open': 1, 'permanently closed': 2}, label='shutter mode', initial_value='fully auto', docstring=dedent(self.atmcd64d.set_shutter.__doc__)) self.add_parameter('single_track_settings', parameter_class=SingleTrackSettingsParameter, names=SingleTrackSettingsParameter.SingleTrackSettings._fields, shapes=((), ()), units=('px', 'px'), vals=validators.Sequence(validators.Ints(1), length=2), docstring=dedent(self.atmcd64d.set_single_track.__doc__), snapshot_value=True) # Real-time spurious noise filter self.add_parameter('spurious_noise_filter_mode', get_cmd=self.atmcd64d.filter_get_mode, set_cmd=self.atmcd64d.filter_set_mode, val_mapping={'No Filter': 0, 'Median Filter': 1, 'Level Above Filter': 2, 'Interquartile Range Filter': 3, 'Noise Threshold Filter': 4}, label='Spurious noise filter mode', docstring=dedent(self.atmcd64d.filter_set_mode.__doc__)) self.add_parameter('spurious_noise_filter_threshold', get_cmd=self.atmcd64d.filter_get_threshold, set_cmd=self.atmcd64d.filter_set_threshold, vals=validators.Numbers(0, 65535), label='Spurious noise threshold', docstring=dedent(self.atmcd64d.filter_set_threshold.__doc__)) self.add_parameter('status', label='Camera Status', get_cmd=self.atmcd64d.get_status, get_parser=self._parse_status, docstring=dedent(self.atmcd64d.get_status.__doc__)) self.add_parameter('temperature', get_cmd=self.atmcd64d.get_temperature, unit=u"\u00b0" + 'C', label='temperature', docstring=dedent(self.atmcd64d.get_temperature.__doc__)) self.add_parameter('trigger_mode', set_cmd=self.atmcd64d.set_trigger_mode, val_mapping={'internal': 0, 'external': 1, 'external start': 6, 'external exposure': 7, 'software trigger': 10}, initial_value='internal', docstring=dedent(self.atmcd64d.set_trigger_mode.__doc__)) speeds = [self.atmcd64d.get_vs_speed(index) for index in range(self.atmcd64d.get_number_vs_speeds())] self.add_parameter('vertical_shift_speed', label='Vertical shift speed', set_cmd=self.atmcd64d.set_vs_speed, val_mapping={speed: index for index, speed in enumerate(speeds)} | {'Unset': -1}, initial_cache_value='Unset', unit='μs/px', docstring=dedent(self.atmcd64d.set_vs_speed.__doc__)) # Parameters that depend on other parameters and therefore cannot be sorted alphabetically self.add_parameter('image_settings', parameter_class=ImageSettingsParameter, names=ImageSettingsParameter.ImageSettings._fields, shapes=((), (), (), (), (), ()), units=('px', 'px', 'px', 'px', 'px', 'px'), vals=_HeterogeneousSequence([ validators.Ints(1, self.detector_pixels.get_latest()[0]), validators.Ints(1, self.detector_pixels.get_latest()[1]), validators.Ints(1, self.detector_pixels.get_latest()[0]), validators.Ints(1, self.detector_pixels.get_latest()[0]), validators.Ints(1, self.detector_pixels.get_latest()[1]), validators.Ints(1, self.detector_pixels.get_latest()[1]) ]), docstring=dedent(self.atmcd64d.set_image.__doc__), snapshot_value=True) self.add_parameter('acquired_accumulations', get_cmd=self._get_acquired_accumulations, # Always infer from acquisition mode, never use cache max_val_age=0, docstring='Number of accumulations per frame.') self.add_parameter('acquired_frames', get_cmd=self._get_acquired_frames, # Always infer from acquisition mode, never use cache max_val_age=0, docstring='Number of frames that will be acquired.') self.add_parameter('acquired_pixels', parameter_class=AcquiredPixelsParameter, names=AcquiredPixelsParameter.AcquiredPixels._fields, shapes=((), ()), units=('px', 'px'), docstring=AcquiredPixelsParameter.__doc__, snapshot_value=True) self.add_parameter('time_axis', parameter_class=TimeAxis, vals=validators.Arrays(shape=(self.acquired_frames.get_latest,)), unit='s', label='Time axis', docstring=TimeAxis.__doc__) self.add_parameter('horizontal_axis', parameter_class=PixelAxis, dimension=0, vals=validators.Arrays(shape=(self._acquired_horizontal_pixels,)), unit='px', label='Horizontal axis', docstring=PixelAxis.__doc__) self.add_parameter('vertical_axis', parameter_class=PixelAxis, dimension=1, vals=validators.Arrays(shape=(self._acquired_vertical_pixels,)), unit='px', label='Vertical axis', docstring=PixelAxis.__doc__) self.add_parameter('ccd_data', setpoints=(self.time_axis, self.vertical_axis, self.horizontal_axis,), parameter_class=CCDData, get_parser=self._parse_ccd_data, vals=validators.Arrays(shape=( self.acquired_frames.get_latest, self._acquired_vertical_pixels, self._acquired_horizontal_pixels )), unit='cts', label='CCD Data', docstring=CCDData.__doc__) self.add_parameter('ccd_data_bg_corrected', parameter_class=CCDDataDelegateParameter, source=self.ccd_data, get_parser=self._subtract_background, label='CCD Data (bg corrected)', unit='cts', docstring="CCD data with a background image previously taken " "subtracted.") self.add_parameter('ccd_data_rate', parameter_class=CCDDataDelegateParameter, source=self.ccd_data, get_parser=self._to_count_rate, label='CCD Data rate', unit='cps', docstring="CCD data (counts) divided by the total exposure time per " "frame (including accumulations).") self.add_parameter('ccd_data_rate_bg_corrected', parameter_class=CCDDataDelegateParameter, source=self.ccd_data_bg_corrected, get_parser=self._to_count_rate, label='CCD Data rate (bg corrected)', unit='cps', docstring="CCD data with a background image previously taken " "subtracted and divided by the total exposure time per " "frame (including accumulations).") self.add_parameter('background', parameter_class=PersistentDelegateParameter, source=self.ccd_data, get_parser=self._parse_background, docstring=dedent(""" Takes a background image for the current acquisition settings. Note that data conversions set in post_processing_function are still run. """)) self.add_parameter('acquisition_mode', parameter_class=ParameterWithSetSideEffect, set_cmd=self.atmcd64d.set_acquisition_mode, set_side_effect=self._process_acquisition_mode, val_mapping={'single scan': 1, 'accumulate': 2, 'kinetics': 3, 'fast kinetics': 4, 'run till abort': 5}, initial_value='single scan', label='acquisition mode', docstring=dedent(self.atmcd64d.set_acquisition_mode.__doc__)) self.add_parameter('read_mode', parameter_class=ParameterWithSetSideEffect, set_cmd=self.atmcd64d.set_read_mode, set_side_effect=self._process_read_mode, val_mapping={'full vertical binning': 0, 'multi track': 1, 'random track': 2, 'single track': 3, 'image': 4}, initial_value='full vertical binning', label='read mode', docstring=dedent(self.atmcd64d.set_read_mode.__doc__)) self.connect_message() # get methods def _get_acquired_accumulations(self) -> int: # Fast kinetics does not seem to support accumulations if self.acquisition_mode.get_latest() in {'single scan', 'fast kinetics'}: return 1 return self.number_accumulations.get_latest() def _get_acquired_frames(self) -> int: if 'kinetics' in self.acquisition_mode.get_latest(): return self.number_kinetics.get_latest() return 1 def _get_acquisition_data(self) -> _AcquisitionParams: """Prepare all relevant data needed by acquisition functions.""" settings = self.freeze_acquisition_settings() if settings['acquisition_mode'] == 'run till abort': cycle_time = settings['acquisition_timings'].kinetic_cycle_time else: cycle_time = settings['acquisition_timings'].accumulation_cycle_time timeout_ms = max(_ACQUISITION_TIMEOUT_FACTOR * cycle_time, _MINIMUM_ACQUISITION_TIMEOUT) * 1e3 number_frames = settings['acquired_frames'] number_accumulations = settings['acquired_accumulations'] number_pixels = np.prod(settings['acquired_pixels']) # In fast kinetics mode, an acquisition event only occurs once per series, # not number_frames times like in regular kinetic series mode. if settings['acquisition_mode'] != 'fast kinetics': number_frames_acquired = number_frames else: number_frames_acquired = 1 # We decide here which method we use to fetch data from the SDK. The CCD # has a circular buffer, so we should fetch data during acquisition if # the desired result is larger. fetch_lazy = number_frames < self.atmcd64d.get_size_of_circular_buffer() # TODO (thangleiter): for fast kinetics, one might want to fetch a number of images # every few acquisitions. if fetch_lazy: buffer = np.empty(number_frames * number_pixels, dtype=np.int32) else: buffer = np.empty((number_frames, number_pixels), dtype=np.int32) shape = tuple(setpoints.get().size for setpoints in self.ccd_data.setpoints) return _AcquisitionParams(timeout_ms, cycle_time, number_accumulations, number_frames_acquired, buffer, shape, fetch_lazy)
[docs] def get_idn(self) -> Dict[str, Optional[str]]: return {'vendor': 'Andor', 'model': self.head_model, 'serial': str(self.serial_number), 'firmware': f'{self.firmware_version}.{self.firmware_build}'}
# set methods def _set_cooler(self, cooler_on: int) -> None: if cooler_on == 1: self.atmcd64d.cooler_on() elif cooler_on == 0: self.atmcd64d.cooler_off() def _set_shutter_mode(self, shutter_mode: int) -> None: self.atmcd64d.set_shutter(1, shutter_mode, 30, 30) # further methods def _acquired_horizontal_pixels(self) -> int: return self.acquired_pixels.get_latest()[0] def _acquired_vertical_pixels(self) -> int: return self.acquired_pixels.get_latest()[1] def _process_acquisition_mode(self, param_val: str): # Invalidate relevant caches self.acquired_frames.cache.invalidate() self.acquired_accumulations.cache.invalidate() # Update self.ccd_data with correct dimensions setpoints: tuple[PixelAxis] | tuple[PixelAxis, PixelAxis] shape: tuple[Callable[[], int]] | tuple[Callable[[], int], Callable[[], int]] if self.vertical_axis in self.ccd_data.setpoints: setpoints = (self.vertical_axis, self.horizontal_axis) shape = (self._acquired_vertical_pixels, self._acquired_horizontal_pixels) else: setpoints = (self.horizontal_axis,) shape = (self._acquired_horizontal_pixels,) if self._has_time_dimension(param_val): self.ccd_data.setpoints = (self.time_axis,) + setpoints self.ccd_data.vals = validators.Arrays( shape=(self.acquired_frames.get_latest,) + shape ) else: self.ccd_data.setpoints = setpoints self.ccd_data.vals = validators.Arrays(shape=shape) def _process_read_mode(self, param_val: str): # Invalidate relevant caches self.acquired_pixels.cache.invalidate() # Update self.ccd_data with correct dimensions setpoints: tuple[TimeAxis] | tuple[()] shape: tuple[Callable[[], int]] | tuple[()] if self.time_axis in self.ccd_data.setpoints: setpoints = (self.time_axis,) shape = (self.acquired_frames.get_latest,) else: setpoints = () shape = () if self._has_vertical_dimension(param_val): self.ccd_data.setpoints = setpoints + (self.vertical_axis, self.horizontal_axis) self.ccd_data.vals = validators.Arrays( shape=shape + (self._acquired_vertical_pixels, self._acquired_horizontal_pixels) ) else: self.ccd_data.setpoints = setpoints + (self.horizontal_axis,) self.ccd_data.vals = validators.Arrays( shape=shape + (self._acquired_horizontal_pixels,) ) @staticmethod def _has_vertical_dimension(read_mode) -> bool: return read_mode in (1, 2, 4) @staticmethod def _has_time_dimension(acquisition_mode) -> bool: return acquisition_mode not in (1, 2) def _to_count_rate(self, val: npt.NDArray[np.int_]) -> npt.NDArray[np.float64]: """Parser to convert counts into count rate.""" # Always get the exposure time since it's computed by the camera lazily. total_exposure_time = self.exposure_time.get() if self.acquisition_mode.get_latest() not in {'single scan', 'run till abort'}: total_exposure_time *= self.number_accumulations.get_latest() return val / total_exposure_time def _parse_background(self, data: npt.NDArray) -> npt.NDArray: """Stores current acquisition settings as parameter metadata.""" self.background.load_metadata(self.freeze_acquisition_settings()) return data def _parse_ccd_data(self, val: npt.NDArray) -> npt.NDArray: # Make sure post_processing_function always gets a 3d array but return # the shape that CCDData returns. shp = list(val.shape) if not self._has_time_dimension(self.acquisition_mode.get_latest()): shp.insert(0, 1) if not self._has_vertical_dimension(self.read_mode.get_latest()): shp.insert(1, 1) return self.post_processing_function()(val.reshape(shp)).reshape(val.shape) def _parse_post_processing_function(self, val: _T) -> _T: # Make sure the post-processing function knows the dll if not hasattr(val, 'atmcd64d') or val.atmcd64d is None: setattr(val, 'atmcd64d', self.atmcd64d) return val def _parse_status(self, code: int) -> str: status = { 'DRV_IDLE': 'IDLE waiting on instructions.', 'DRV_TEMPCYCLE': 'Executing temperature cycle.', 'DRV_ACQUIRING': 'Acquisition in progress.', 'DRV_ACCUM_TIME_NOT_MET': 'Unable to meet Accumulate cycle time.', 'DRV_KINETIC_TIME_NOT_MET': 'Unable to meet Kinetic cycle time.', 'DRV_ERROR_ACK': 'Unable to communicate with card.', 'DRV_ACQ_BUFFER': 'Computer unable to read the data via the ISA ' 'slot at the required rate.', 'DRV_ACQ_DOWNFIFO_FULL': 'Computer unable to read data fast ' 'enough to stop camera memory going full.', 'DRV_SPOOLERROR': 'Overflow of the spool buffer.' } status_code = self.atmcd64d.error_codes[code] return f'{status_code}: {status[status_code]}' def _subtract_background(self, data: npt.NDArray) -> npt.NDArray: if (background := self.background.cache.get(False)) is None: raise RuntimeError("No background acquired. Perform a get on the 'background' " "parameter") if not self.background_is_valid: background_settings = {key: self.background.metadata.get(key, None) for key in self.freeze_acquisition_settings()} raise RuntimeError('Background was acquired for different settings; cannot subtract ' 'it. Consider taking a new background or changing the settings. ' f'Previous settings were: {background_settings}') return data - background
[docs] def freeze_acquisition_settings(self) -> dict[str, Any]: return {'acquisition_mode': self.acquisition_mode.get_latest(), 'acquisition_timings': self.get_acquisition_timings(), 'acquired_frames': self._get_acquired_frames(), 'acquired_accumulations': self._get_acquired_accumulations(), # acquired_pixels can change depending on other settings; perform new get 'acquired_pixels': self.acquired_pixels.get(), 'fast_kinetics_settings': self.fast_kinetics_settings.get_latest(), 'read_mode': self.read_mode.get_latest(), 'single_track_settings': self.single_track_settings.get_latest(), 'multi_track_settings': self.multi_track_settings.get_latest(), 'random_track_settings': self.random_track_settings.get_latest(), 'image_settings': self.image_settings.get_latest()}
@property def background_is_valid(self) -> bool: if not self.background.cache.valid: return False current_settings = self.freeze_acquisition_settings() background_settings = {key: self.background.metadata.get(key, None) for key in current_settings} keys_to_ignore = {'single_track_settings', 'multi_track_settings', 'random_track_settings', 'image_settings', 'fast_kinetics_settings'} if current_settings['acquisition_mode'] == 'fast kinetics': keys_to_ignore.remove('fast_kinetics_settings') if current_settings['read_mode'] != 'full vertical binning': keys_to_ignore.remove(current_settings['read_mode'].replace(' ', '_') + '_settings') return all(current_settings[key] == background_settings[key] for key in keys_to_ignore.symmetric_difference(current_settings.keys()))
[docs] def close(self) -> None: self.atmcd64d.shut_down() super().close()
[docs] def arm(self) -> None: """Arm the CCD for acquisition. This method ensures the camera is not yet acquiring, frees the internal memory, and finally prepares it for acquisition using the current acquisition timings. While not required, calling this method ahead of acquisition reduces the overhead when actually starting it. """ status = self.status.get() if not (status.startswith('DRV_IDLE') or status.startswith('DRV_ACQUIRING')): raise RuntimeError(f'Device not ready to acquire data. {status}') self.free_internal_memory() self.prepare_acquisition()
# Some methods of the dll that we expose directly on the instrument
[docs] def cancel_wait(self): self.atmcd64d.cancel_wait()
cancel_wait.__doc__ = atmcd64d.cancel_wait.__doc__
[docs] def abort_acquisition(self) -> None: if self.status().startswith('DRV_ACQUIRING'): self.log.debug('Aborting acquisition.') self.atmcd64d.abort_acquisition()
abort_acquisition.__doc__ = atmcd64d.abort_acquisition.__doc__
[docs] def prepare_acquisition(self) -> None: self.log.debug('Preparing acquisition.') self.atmcd64d.prepare_acquisition()
prepare_acquisition.__doc__ = atmcd64d.prepare_acquisition.__doc__
[docs] def start_acquisition(self) -> None: """Start the acquisition. Exposed for 'run till abort' acquisition mode and external triggering.""" if not self.status().startswith('DRV_ACQUIRING'): self.log.debug('Starting acquisition.') self.atmcd64d.start_acquisition()
start_acquisition.__doc__ = atmcd64d.start_acquisition.__doc__
[docs] def send_software_trigger(self) -> None: self.atmcd64d.send_software_trigger()
send_software_trigger.__doc__ = atmcd64d.send_software_trigger.__doc__
[docs] def free_internal_memory(self) -> None: self.log.debug('Clearing internal buffer.') self.atmcd64d.free_internal_memory()
free_internal_memory.__doc__ = atmcd64d.free_internal_memory.__doc__
[docs] def get_acquisition_timings(self) -> AcquisitionTimings: """The current acquisition timing parameters actually used by the device. This method also updates the caches of the corresponding parameters. This can be used to ensure they are snapshotted correctly when measuring. Docstring of the dll function: ------------------------------ """ timings = AcquisitionTimings(*self.atmcd64d.get_acquisition_timings()) self.exposure_time.cache.set(timings.exposure_time) self.accumulation_cycle_time.cache.set(timings.accumulation_cycle_time) self.kinetic_cycle_time.cache.set(timings.kinetic_cycle_time) return timings
get_acquisition_timings.__doc__ = _merge_docstrings(get_acquisition_timings, atmcd64d.get_acquisition_timings)
[docs] def yield_till_abort(self) -> Iterator[npt.NDArray]: """Yields data from the CCD until aborted. This method uses 'run-till-abort' mode to continuously acquire data. To use it in the main thread, simply iterate over it:: for data in ccd.yield_till_abort(): ... A concurrent application could look something like this:: import queue import threading def put(queue: queue.Queue, stop_flag: threading.Event): gen = ccd.yield_till_abort() while not stop_flag.is_set(): queue.put(next(gen)) queue = queue.Queue() stop_flag = threading.Event() thread = threading.Thread(target=put, args=(queue, stop_flag)) thread.start() # queue is continuously filled with data. current_data = queue.get() # Stop the data streaming: stop_flag.set() # Optionally cancel waiting for the next acquisition: ccd.cancel_wait() .. note:: Each iteration of this method yields a single frame of data. However, since it is technically a time-series, the yielded data has a time axis of size 1 so that for ``read_mode('full vertical binning')`` for example the shape is ``(1, horizontal_pixels)``. """ # Awkward. RLock does not have `Lock`s locked() method if not self.atmcd64d.lock.acquire(blocking=False): raise RuntimeError('Another thread is currently locking the CCD.') else: self.atmcd64d.lock.release() with self.acquisition_mode.set_to('run till abort'): data = self._get_acquisition_data() self.arm() self.start_acquisition() self.log.debug('Started acquisition in run-till-abort mode.') t_start = time.perf_counter() taken = 0 try: while True: try: self.atmcd64d.wait_for_acquisition_timeout(data.timeout_ms) except SDKError as error: # Most likely timeout before acquisition event. self.log.error(f'Error during wait_for_acquisition_timeout(): {error}') continue # Make absolutely sure a new image has arrived while not (new := self.atmcd64d.get_acquisition_progress()[1] - taken): continue if new > 1: self.log.warning(f'Skipping {new-1} new frames.') self.atmcd64d.get_most_recent_image_by_reference(data.buffer) t_elapsed = time.perf_counter() - t_start self.log.debug(f'Got new frame in {t_elapsed:.4g} s.') t_start += t_elapsed taken += new yield data.buffer.reshape(data.shape) finally: self.abort_acquisition() self.log.debug('Stopped acquisition in run-till-abort mode')
[docs] def cool_down(self, setpoint: int | None = None, target: Literal['stabilized', 'reached'] = 'reached', show_progress: bool = True) -> None: """Turn the cooler on and wait for the temperature to stabilize. Args: setpoint: The target temperature. Required if *set_temperature* is not initialized. target: Finish if temperature is reached or reached and stabilized. show_progress: Show a progressbar. The default is True. """ if setpoint is None and (setpoint := self.set_temperature.get()) is None: raise ValueError('Please set the set_temperature first or specify setpoint.') targets: tuple[str, str] | tuple[str] if target.lower() == 'reached': targets = ('DRV_TEMP_NOT_STABILIZED', 'DRV_TEMP_STABILIZED') elif target.lower() == 'stabilized': targets = ('DRV_TEMP_STABILIZED',) else: raise ValueError('target should be one of reached or stabilized.') self.set_temperature(setpoint) self.cooler.set('on') # bar does not show for negative totals, but ok with tqdm( total=(initial := self.temperature.get()) - setpoint, desc=f'{self.label} cooling down from {initial}{self.temperature.unit} ' f'to {setpoint}{self.temperature.unit}. |Delta|', unit=self.temperature.unit, disable=not show_progress ) as pbar: while (status := self.atmcd64d.get_cooling_status()[0]) not in targets: # For lack of a better method: # https://github.com/tqdm/tqdm/issues/1264 pbar.postfix = f'status={status}' pbar.n = initial - self.temperature.get() pbar.refresh() time.sleep(1) pbar.postfix = status pbar.n = pbar.total pbar.refresh()
[docs] def warm_up(self, target: int = 15, show_progress: bool = True) -> None: """Turn the cooler off and wait for the temperature to reach target. Parameters ---------- target : int, optional The target temperature. Defaults to 15C. show_progress : bool, optional Show a progressbar. The default is True. """ self.cooler.set('off') # bar does not show for negative totals, but ok with tqdm( total=target - (initial := self.temperature.get()), desc=f'{self.name} warming up from {initial}{self.temperature.unit} ' f'to {target}{self.temperature.unit}. Delta', unit=self.temperature.unit, disable=not show_progress ) as pbar: while (temp := self.temperature.get()) < target: # For lack of a better method: # https://github.com/tqdm/tqdm/issues/1264 pbar.n = temp - initial pbar.refresh() time.sleep(1) pbar.n = pbar.total pbar.refresh()