Source code for qcodes_contrib_drivers.drivers.Newport.AG_UC8

"""
QCoDeS driver for Newport AGILIS AG-UC8 Piezo Stepper Controller.
"""

import logging
import time
from typing import Optional, Dict, Callable

from pyvisa.resources.serial import SerialInstrument

from qcodes import VisaInstrument, InstrumentChannel, ChannelList
from qcodes.utils.validators import Ints

log = logging.getLogger(__name__)


# Meaning of error codes returned by the device.
ERROR_CODES = {
    0: "No error",
    -1: "Unknown command",
    -2: "Axis out of range",
    -3: "Wrong format for parameter",
    -4: "Parameter out of range",
    -5: "Not allowed in local mode",
    -6: "Not allowed in current state",
}


def _make_get_parser(cmd: str) -> Callable[[str], int]:
    """Return a parser function which expects a line
    starting with <cmd> followed by an integer, and returns
    only the integer.
    """

    def get_parser(resp: str) -> int:
        if resp.startswith(cmd):
            try:
                return int(resp[len(cmd):].strip())
            except ValueError:
                # Parsing return value failed.
                # Ignore the error here, we will report it below.
                pass
        log.warning("Unexpected response from %s: %r" % (cmd, resp))
        raise Newport_AG_UC8_Exception("Unexpected response from %s: %r"
                                       % (cmd, resp))

    return get_parser


[docs] class Newport_AG_UC8_Exception(Exception): pass
[docs] class Newport_AG_UC8_ErrorCode(Newport_AG_UC8_Exception): def __init__(self, cmd: str, err: int) -> None: self.failed_command = cmd self.error_code = err error_string = ERROR_CODES.get(err, "unknown error") super().__init__("Command %s failed with error code %d (%s)" % (cmd, err, error_string))
[docs] class Newport_AG_UC8_Axis(InstrumentChannel): """Represents one of the 2 axes of one channel of an AG-UC8 controller.""" SPEED_TABLE = { 1: "5 steps/second at defined step amplitude", 2: "100 steps/second at maximum step amplitude", 3: "1700 steps/second at maximum step amplitude", 4: "666 steps/second at defined step amplitude", } def __init__(self, parent: 'Newport_AG_UC8_Channel', axis: int) -> None: assert axis in (1, 2) super().__init__(parent, "axis_%d" % axis) self.axis = axis self.add_parameter("step_delay", label="Step delay in units of 10 us", get_cmd="%dDL?" % axis, set_cmd="%dDL{}" % axis, get_parser=_make_get_parser("%dDL" % axis), vals=Ints(0, 200000)) self.add_parameter("step_amplitude_pos", label="Step amplitude in positive direction", get_cmd="%dSU+?" % axis, set_cmd="%dSU+{}" % axis, get_parser=_make_get_parser("%dSU+" % axis), vals=Ints(1, 50)) self.add_parameter("step_amplitude_neg", label="Step amplitude in negative direction", get_cmd="%dSU-?" % axis, set_cmd="%dSU-{}" % axis, get_parser=_make_get_parser("%dSU-" % axis), vals=Ints(1, 50)) self.add_parameter("steps", label="Accumulated number of steps since last " + "reset of the step counter", get_cmd="%dTP" % axis, get_parser=_make_get_parser("%dTP" % axis)) self.add_parameter("status", label="Status of the axis", get_cmd="%dTS" % axis, get_parser=_make_get_parser("%dTS" % axis), val_mapping={"ready": 0, "stepping": 1, "jogging": 2, "moving_to_limit": 3})
[docs] def jog(self, speed: int) -> None: """Start moving in positive (speed > 0) or negative (speed < 0) direction. Args: speed (int): speed and direction of movement. Negative values (-1 .. -4) start moving in negative direction. Positive values (1 .. 4) start moving in positive direction. Magnitude determines speed according to Newport_AG_UC8_Axis.SPEED_TABLE. """ assert speed >= -4 and speed <= 4 self.write("%dJA%d" % (self.axis, speed))
[docs] def move_limit(self, speed: int) -> None: """Start moving to positive (speed > 0) or negative (speed < 0) limit. Args: speed (int): speed and direction of movement. Negative values (-1 .. -4) start moving in negative direction. Positive values (1 .. 4) start moving in positive direction. Magnitude determines speed according to Newport_AG_UC8_Axis.SPEED_TABLE. """ assert speed >= -4 and speed <= 4 self.write("%dMV%d" % (self.axis, speed))
[docs] def measure_position(self) -> int: """Measure current position. This is a slow command which may take up to 2 minutes to finish. Returns: int: Current position in range 0 .. 1000 representing steps of 1/1000 of total travel. """ return self._slow_command("%dMA" % self.axis, "")
[docs] def move_abs(self, position: int) -> int: """Move to absolute position. This is a slow command which may take up to 2 minutes to finish. Args: position (int): Target position in range 0 .. 1000 representing steps of 1/1000 of total travel. """ assert position >= 0 and position <= 1000 return self._slow_command("%dPA" % self.axis, "%d" % position)
[docs] def move_rel(self, steps: int) -> None: """Start a relative move to current position. Args: steps (int): Number of steps to move relative to current position. """ assert steps >= -(2**31) and steps < 2**31 self.write("%dPR%d" % (self.axis, steps))
[docs] def stop(self): """Stop current movement.""" self.write("%dST" % self.axis)
[docs] def zero_position(self): """Reset the step counter to zero.""" self.write("%dZP" % self.axis)
def _slow_command(self, cmd: str, arg: str) -> int: """Execute a slow command with longer timeout and parse return value.""" # Temporarily set long timeout to support slow command. tmo = self.root_instrument.slow_command_timeout self.root_instrument.timeout(tmo) try: # Execute command. resp = self.ask(cmd + arg) finally: # Restore normal timeout. tmo = self.root_instrument.default_timeout self.root_instrument.timeout(tmo) # Parse response. if resp.startswith(cmd): try: return int(resp[len(cmd):].strip()) except ValueError: # Parsing return value failed. # Ignore the error here, we will report it below. pass log.warning("Unexpected response from %s: %r" % (cmd, resp)) raise Newport_AG_UC8_Exception("Unexpected response from %s: %r" % (cmd, resp))
[docs] class Newport_AG_UC8_Channel(InstrumentChannel): """Represents one of the 4 channels of an AG-UC8 controller. Each channel drives 2 axes of an optical mount. """ def __init__(self, parent: 'Newport_AG_UC8', channel_number: int) -> None: assert channel_number in (1, 2, 3, 4) super().__init__(parent, "channel_%d" % channel_number) self._channel_number = channel_number self.add_submodule("axis1", Newport_AG_UC8_Axis(self, 1)) self.add_submodule("axis2", Newport_AG_UC8_Axis(self, 2)) self.add_parameter("limit_status", label="Limit switch status", get_cmd="PH", get_parser=_make_get_parser('PH'), val_mapping={"not_active": 0, "active_on_axis_1": 1, "active_on_axis_2": 2, "active_on_both_axes": 3})
[docs] def write(self, cmd: str) -> None: return self.parent.write_channel(self._channel_number, cmd)
[docs] def ask(self, cmd: str) -> str: return self.parent.ask_channel(self._channel_number, cmd)
[docs] class Newport_AG_UC8(VisaInstrument): """ QCoDeS driver for the Newport AGILIS AG-UC8 Piezo Stepper Controller. Args: name (str): name of the instrument. address (str): VISA string describing the serial port, for example "ASRL3" for COM3. """ # By default, expect response to command within 1 second. default_timeout = 1.0 # Some commands (position measurement and absolute move) can take # up to 2 minutes to complete. slow_command_timeout = 120.0 # After a command which does not generate a response, a short # delay is needed before we can send the following command. command_delay = 0.002 # After a reset command, a longer delay is needed before # we can send the following command. reset_delay = 0.05 def __init__(self, name: str, address: str) -> None: log.debug("Opening Newport_AG_UC8 at %s" % address) super().__init__(name, address, timeout=self.default_timeout, terminator="\r\n") assert isinstance(self.visa_handle, SerialInstrument) self.visa_handle.baud_rate = 912600 self._current_channel: Optional[int] = None channels = [Newport_AG_UC8_Channel(self, channel_number) for channel_number in range(1, 4+1)] channel_list = ChannelList(self, "channels", Newport_AG_UC8_Channel, channels) self.add_submodule("channels", channel_list) self.add_function("reset", call_cmd=self.reset, args=()) # Set controller in remote mode (otherwise many commands don't work). self.write("MR")
[docs] def get_last_error(self) -> int: """Send a TE command (get error of previous command) and return a numerical error code. Returns: int: Error code for previous command. Value 0 means no error (success). See global table ERROR_CODES for the meaning of the codes. This function is called automatically after each command sent to the device. When a command results in error, exception Newport_AG_UC8_ErrorCode is raised. """ resp = self.ask('TE') if resp.startswith("TE"): try: return int(resp.strip()[2:]) except ValueError: # Parsing error code failed. # Ignore the error here, we will report it below. pass log.warning("Unexpected response to TE command: %r" % resp) raise Newport_AG_UC8_Exception("Unexpected response to TE command: %r" % resp)
[docs] def reset(self) -> None: """Reset the motor controller.""" self._current_channel = None # Send reset command. super().write("RS") # Sleep until reset completed. time.sleep(self.reset_delay) # Switch controller to remote mode (many commands require remote mode). self.write("MR")
[docs] def get_idn(self) -> Dict[str, Optional[str]]: resp = self.ask("VE") words = resp.strip().split() if len(words) == 2: model = words[0] version = words[1] else: log.warning("Unexpected response to VE command: %r" % resp) raise Newport_AG_UC8_Exception( "Unexpected response to VE command: %r" % resp) return {"vendor": "Newport", "model": model, "firmware": version}
[docs] def write(self, cmd: str) -> None: # Send command. super().write(cmd) # Sleep until command completed. time.sleep(self.command_delay) # Check if command produced an error. err = self.get_last_error() if err != 0: log.warning("Command %s failed with error %d" % (cmd, err)) raise Newport_AG_UC8_ErrorCode(cmd, err)
def _select_channel(self, channel_number: int) -> None: """Make sure the specified channel is selected.""" if self._current_channel != channel_number: # Switch to channel. super().write("CC%d" % channel_number) # Sleep until change channel command completed. time.sleep(self.command_delay) self._current_channel = channel_number
[docs] def write_channel(self, channel_number: int, cmd: str) -> None: """Select specified channel, then apply specified command.""" self._select_channel(channel_number) self.write(cmd)
[docs] def ask_channel(self, channel_number: int, cmd: str) -> str: """Select specified channel, then apply specified query and return response.""" self._select_channel(channel_number) return self.ask(cmd)