Source code for qcodes_contrib_drivers.drivers.Horiba.Horiba_FHR

from __future__ import annotations

import abc
import configparser
import ctypes
import os
import pathlib
import sys
from typing import Any, Dict, Mapping, cast

from qcodes.parameters import DelegateParameter, Parameter
from qcodes import validators
from qcodes.instrument import (ChannelList, Instrument, InstrumentBase,
                               InstrumentChannel)
from qcodes_contrib_drivers.drivers.Horiba.private.fhr_client import FHRClient
from typing_extensions import Literal


[docs] class SpeError(Exception): """Error raised by the dll."""
[docs] class Dispatcher: """Implements the interface to the motors.""" _ERROR_CODES = {0: 'NoErr', 1: 'errInvalidDispatcherName', 2: 'errInvalidFunctionName', 3: 'errInvalidParameter', 4: 'errConnectionError', 5: 'errConnectionTimeout', 6: 'errWrongResult', 7: 'errAbort', 0xFFFFFFFF: 'errForce32bit'} def __init__(self, cli, handle): self.cli = cli self.handle = handle self.config: dict[str, str]
[docs] def error_check(self, code: int): if code != 0: raise SpeError(self._ERROR_CODES.get(code))
[docs] class PortChannel(Dispatcher, InstrumentChannel): """Manages instrument communication.""" def __init__(self, parent: InstrumentBase, name: str, cli, handle, port: int): Dispatcher.__init__(self, cli, handle) InstrumentChannel.__init__(self, parent, name) self.port = ctypes.c_int(port) self.add_parameter( 'open', get_cmd=self._is_open, set_cmd=lambda flag: self._open() if flag else self._close() ) def _open(self): """Open serial port.""" code, _ = self.cli.SpeCommand(self.handle, 'Port', 'Open', self.port) self.error_check(code) def _close(self): """Close serial port.""" code, _ = self.cli.SpeCommand(self.handle, 'Port', 'Close') self.error_check(code) def _is_open(self) -> bool: """Check if the port is open.""" code, value = self.cli.SpeCommand(self.handle, 'Port', 'IsOpen', ctypes.c_int()) self.error_check(code) return bool(value)
[docs] def set_baud_rate(self, baud_rate: int = 115200): """Set port baud rate for opened pot. Should be the default.""" code, _ = self.cli.SpeCommand(self.handle, 'Port', 'SetBaudRate', ctypes.c_int(baud_rate)) self.error_check(code)
[docs] def set_timeout(self, timeout: int = 90000): """Set timeout in milliseconds.""" code, _ = self.cli.SpeCommand(self.handle, 'Port', 'SetTimeout', ctypes.c_int(timeout)) self.error_check(code)
[docs] class MotorChannel(Dispatcher, InstrumentChannel, metaclass=abc.ABCMeta): """ABC for the various motors of the device.""" def __init__(self, parent: InstrumentBase, name: str, cli, handle, motor: int, metadata: Mapping[Any, Any] | None = None, label: str | None = None): Dispatcher.__init__(self, cli, handle) InstrumentChannel.__init__(self, parent, name, metadata=metadata, label=label) self.motor = motor
[docs] @classmethod def type(cls) -> str: return cls.__name__.removesuffix('Channel')
[docs] def set_id(self, i: int): """Set motor ID. This is the `Addr` address in LabSpec6.""" code, _ = self.cli.SpeCommand(self.handle, f'{self.type()}{self.motor}', 'SetID', ctypes.c_int(i)) self.error_check(code)
[docs] def get_id(self) -> int: """Get motor ID.""" code, value = self.cli.SpeCommand(self.handle, f'{self.type()}{self.motor}', 'GetID', ctypes.c_int()) self.error_check(code) return value
[docs] def stop(self, raise_exception: bool = False): """Stop motor. Parameters ---------- raise_exception : bool, default: False Raise an 'errAbort' exception upon successful stop. """ code, _ = self.cli.SpeCommand(self.handle, f'{self.type()}{self.motor}', 'Stop') try: self.error_check(code) except SpeError as se: if raise_exception or str(se) != 'errAbort': raise
def _set_position(self, pos: int): """Set motor position. It return final motor position after movement. For grating motor the position unit depends on 'Step' value in SpeSetup structure. If 'Step' = 1 the motor position is raw motor steps. Otherwise it is the position in picometers. For slit motors the position in motor steps is the 'Position' value multiplied by 'Step' value from SpeSetup. For DC motors, the position is binary. """ code, _ = self.cli.SpeCommand(self.handle, f'{self.type()}{self.motor}', 'SetPosition', ctypes.c_int(pos)) self.error_check(code)
[docs] class PrecisionMotorChannel(MotorChannel, metaclass=abc.ABCMeta): """ABC for the precision motors of the device.""" def __init__(self, parent: InstrumentBase, name: str, cli, handle, motor: int, min_value: int = 0, max_value: int = sys.maxsize, offset: int = 0, metadata: Mapping[Any, Any] | None = None, label: str | None = None): super().__init__(parent, name, cli, handle, motor, metadata, label) self._step: int = 1 self._offset: int = offset self.add_parameter('position', label=f'{label} position', get_cmd=self._get_position, set_cmd=self._set_position, set_parser=int, vals=validators.Numbers(min_value, max_value), unit=self.unit) @property @abc.abstractmethod def unit(self) -> str: pass
[docs] def init(self): """Initialize motor with offset position (optical zero order position in motor steps).""" code, _ = self.cli.SpeCommand(self.handle, f'{self.type()}{self.motor}', 'Init', ctypes.c_int(self._offset)) self.error_check(code)
[docs] def set_setup(self, min_speed: int = 50, max_speed: int = 600, ramp: int = 650, backlash: int = 500, step: int = 2, reverse: bool = False): """Write motor setup data. Parameters ---------- min_speed : int Minimal speed max_speed : int Maximal speed ramp : int Acceleration backlash : int Backlash (~+-500..2000) step : int Operation mode: - 1: position in motor steps - 0, 2, 3 ...: position in picometers reverse : bool Rotation direction: - 0: direct - 1: inverse """ code, _ = self.cli.SpeCommandSetup( self.handle, f'{self.type()}{self.motor}', fields=(min_speed, max_speed, ramp, backlash, step, reverse) ) self.error_check(code) self._step = step self.position.unit = self.unit
def _get_position(self) -> int: """Get current position. The result depends on 'Step' value similar to "SetPosition" parameter value.""" code, value = self.cli.SpeCommand(self.handle, f'{self.type()}{self.motor}', 'GetPosition', ctypes.c_int()) self.error_check(code) return value
[docs] class DCChannel(MotorChannel): """Handles DC motors (with binary positions).""" def __init__(self, parent: InstrumentBase, name: str, cli, handle, motor: int, val_mapping: Mapping[str, Literal[0, 1]] | None = None, metadata: Mapping[Any, Any] | None = None, label: str | None = None): label = label or f'DC {motor}' MotorChannel.__init__(self, parent, name, cli, handle, motor, metadata, label) self.add_parameter('position', label=f'{label} position', get_cmd=False, set_parser=int, set_cmd=self._set_position, val_mapping=val_mapping)
[docs] class SlitChannel(PrecisionMotorChannel): """Handles the linear slit motors of the device.""" def __init__(self, parent: InstrumentBase, name: str, cli, handle, motor: int, min_value: int = 0, max_value: int = sys.maxsize, offset: int = 0, metadata: Mapping[Any, Any] | None = None, label: str | None = None): label = label or f'Slit {motor}' super().__init__(parent, name, cli, handle, motor, min_value + offset, max_value + offset, offset, metadata, label) self._offset = offset self.add_parameter('width', parameter_class=DelegateParameter, source=self.position, label=f'{label} width', vals=validators.Numbers(min_value, max_value), offset=self._offset, docstring="Actual slit opening width") @property def unit(self) -> str: # From the manual: # For slit motors the position in motor steps is the # 'Position' value multiplied by 'Step' value from SpeSetup. # What I take from this is that it always returns microns. return 'μm'
[docs] class GratingChannel(PrecisionMotorChannel): """Handles the grating rotation motors of the device.""" def __init__(self, parent: HoribaFHR, name: str, cli, handle, motor: int, min_value: int = 0, max_value: int = sys.maxsize, offset: int = 0, metadata: Mapping[Any, Any] | None = None, label: str | None = None): label = label or f'Grating {motor}' super().__init__(parent, name, cli, handle, motor, min_value, max_value, offset, metadata, label) self.add_parameter('shift', label='Zero order shift', get_cmd=False, set_cmd=self._set_shift, set_parser=int, unit=self.unit) @property def unit(self) -> str: return 'motor steps' if self._step == 1 else 'pm'
[docs] def set_ini_params(self, phase: Literal[1, 2, 3], min_speed: int = 50, max_speed: int = 600, ramp: int = 600): """Motor initialization parameters. Parameters ---------- phase : {1, 2, 3} Initialization phase. min_speed : int Minimal speed max_speed : int Maximal speed ramp : int Acceleration """ code, _ = self.cli.SpeCommandIniParams( self.handle, f'{self.type()}{self.motor}', fields=(phase, min_speed, max_speed, ramp) ) self.error_check(code)
def _set_position(self, pos: int): # Override to keep track of currently active grating in parent instrument if not isinstance(self.parent, HoribaFHR): # mypy raise RuntimeError super()._set_position(pos) self.parent._active_grating = self def _set_shift(self, shift: int): """Set zero order shift.""" code, _ = self.cli.SpeCommand(self.handle, f'{self.type()}{self.motor}', 'SetShift', ctypes.c_int(shift)) self.error_check(code)
[docs] class HoribaFHR(Instrument): """Horiba FHR driver for a 32-bit dll. This driver uses ``msl.loadlib`` to communicate with the 32-bit dll through a 32-bit server from a 64-bit client. Args: dll_dir (path_like): Directory to search for SpeControl.dll config_file (path_like): Configuration file (see below) dc_val_mappings (dict): val_mappings for the DC motors (mirrors). Should be a dict of mappings with integer keys corresponding to the mirror id. For example, for an instrument with only one mirror "Mirror2", ``{2: {'front': 0, 'side': 1}}``. Notes: The configuration file should be an ``ini``-like file with sections - ``[Firmware]`` - ``[Port]`` - ``[Spectrometer]`` - ``[Grating1]`` etc. - ``[Slit1]`` etc. - ``[Mirror1]`` etc. Examples: See ``docs/examples`` for an example notebook. """ _active_grating: GratingChannel | None = None """Stores the currently selected grating. Needs to be initialized by setting the position of any grating.""" def __init__( self, name: str, dll_dir: str | os.PathLike | pathlib.Path, config_file: str | os.PathLike | pathlib.Path, dc_val_mappings: Dict[int, Dict[str, Literal[0, 1]] | None] = {}, metadata: Mapping[Any, Any] | None = None, label: str | None = None ): self.cli = FHRClient(dll_dir) self.handle: int = self.cli.CreateSpe() self.config = configparser.ConfigParser(comment_prefixes=('==',)) self.config.read(config_file) additional_metadata = { 'Focal length': self.config['Spectrometer'].getint('Focal'), 'Coefficient of angle': self.config['Spectrometer'].getfloat( 'CoefficientOfAngle' ), 'Number of gratings': self.config['Spectrometer'].getint( 'GratingNumber' ), 'Number of slits': self.config['Spectrometer'].getint( 'SlitNumber' ) } super().__init__(name, metadata=additional_metadata | dict(metadata or {}), label=label) gratings = ChannelList(self, 'gratings', GratingChannel) slits = ChannelList(self, 'slits', SlitChannel) mirrors = ChannelList(self, 'mirrors', DCChannel) for name, section in self.config.items(): if name == 'Port': # This relies on Port being the first section because otherwise # communication with the device will fail. port = PortChannel(self, 'port', self.cli, self.handle, section.getint('ComPort')) port.open.set(True) port.set_baud_rate(section.getint('Baudrate')) port.set_timeout(section.getint('Timeout')) port.config = dict(section) elif name.startswith('Grating'): # Grating1, Grating2, etc grating = GratingChannel( self, # Cannot use section['Name'] b/c of invalid identifier # chars. f"grating_{section['Value']}", self.cli, self.handle, motor=int(name[-1]), # TODO: this assumes MotorStepUnit to be != 1 min_value=section.getint('MinNm')*1000, # pm max_value=section.getint('MaxNm')*1000, offset=section.getint('Offset'), # motor steps label=section['Name'], metadata={'Coefficient of linearity': section.getfloat( 'CoefficientOfLinearity' )} ) grating.set_id(section.getint('AddrAxe')) grating.set_setup( # For whatever reason these parameters are in the # Spectrometer section... min_speed=self.config['Spectrometer'].getint('SpeedMin'), max_speed=self.config['Spectrometer'].getint('SpeedMax'), ramp=self.config['Spectrometer'].getint('Acceleration'), backlash=self.config['Spectrometer'].getint('Backlash'), step=self.config['Spectrometer'].getint('MotorStepUnit'), reverse=self.config['Spectrometer'].getboolean('Reverse') ) # Hardcoded since it is not present in my ini file. # Taken from 'SDK FHR Express -additional informations-.pdf' grating.set_ini_params(phase=1, min_speed=2000, max_speed=100000, ramp=400) grating.set_ini_params(phase=2, min_speed=2000, max_speed=100000, ramp=400) grating.set_ini_params(phase=3, min_speed=2000, max_speed=10000, ramp=400) grating.shift(section.getint('Shift')) grating.config = dict(section) gratings.append(grating) elif name.startswith('Slit'): # Slit1, Slit2, etc slit = SlitChannel( self, section['Name'].lower().replace(' ', '_'), self.cli, self.handle, motor=int(name[-1]), # min_value and max_value are used for the slit width, # not absolute position. min_value=section.getint('Minum'), max_value=section.getint('Maxum'), offset=section.getint('Offset'), label=section['Name'], metadata={'Coefficient of linearity': section.getfloat( 'CoefficientOfLinearity' )} ) slit.set_id(section.getint('AddrAxe')) slit.set_setup(min_speed=section.getint('SpeedMin'), max_speed=section.getint('SpeedMax'), ramp=section.getint('Acceleration'), backlash=section.getint('Backlash'), step=section.getint('MotorStepUnit'), reverse=section.getboolean('Reverse')) slit.config = dict(section) slits.append(slit) elif name.startswith('Mirror'): # Mirror1, Mirror2, etc mirror = DCChannel( self, section['Name'].lower().replace(' ', '_'), self.cli, self.handle, motor=int(name[-1]), val_mapping=dc_val_mappings.get(int(name[-1])), label=section['Name'], metadata={'Delay (ms)': section.getint('Delayms'), 'Duty cycle (%)': section.getint('DutyCycle%')} ) mirror.set_id(section.getint('AddrAxe')) mirror.config = dict(section) mirrors.append(mirror) self.add_submodule('port', port) self.add_submodule('mirrors', mirrors.to_channel_tuple()) self.add_submodule('slits', slits.to_channel_tuple()) self.add_submodule('gratings', gratings.to_channel_tuple()) self.active_grating = Parameter( 'active_grating', get_cmd=lambda: getattr(self, '_active_grating', None), set_cmd=self._set_active_grating, set_parser=self._parse_grating, label='Active grating', instrument=self ) """The currently active grating. It can be set using either the number of lines (eg ``600``) or the :class:`GratingChannel` object itself. If the set value is not the currently active one, the selected grating will be moved to the current one's position. """ self.connect_message() def _set_active_grating(self, grating: GratingChannel): if (active_grating := self.active_grating.get()) is None: raise ValueError('No grating has previously been moved. Please ' 'do so before setting this parameter.') if grating is not active_grating: grating.position(active_grating.position()) def _parse_grating(self, grating: str | int | GratingChannel) -> GratingChannel: if isinstance(grating, (int, str)): grating = cast(GratingChannel, self.gratings.get_channel_by_name(f'grating_{grating}')) return grating
[docs] def get_idn(self) -> Dict[str, str | None]: return {'serial': self.config['Firmware']['SerialNumber'], 'firmware': self.config['Firmware']['VersionNumber'], 'model': f"FHR{self.config['Spectrometer']['Focal']}", 'vendor': 'Horiba'}
[docs] def disconnect(self): if self.port.open.get(): self.port.open.set(False)
[docs] def close(self) -> None: self.disconnect() self.cli.DeleteSpe(self.handle) super().close()