"""
This is a driver for the Stahl power supplies
"""
import logging
import re
from collections import OrderedDict
from functools import partial
from typing import Any, Callable, Dict, Iterable, Optional
import numpy as np
from pyvisa.resources.serial import SerialInstrument
from qcodes.instrument import ChannelList, InstrumentChannel, VisaInstrument
from qcodes.validators import Numbers
# Module-scoped logger: records carry this module's name and still propagate
# to the root logger's handlers, instead of being emitted as the root logger.
logger = logging.getLogger(__name__)
def chain(*functions: Callable[..., Any]) -> Callable[..., Any]:
    """
    Compose callables from left to right.

    The output of the first callable is piped to the input of the second, etc.
    An iterable result (other than ``str``/``bytes``) is unpacked into the
    positional arguments of the next callable; any other result is passed on
    as a single argument.

    Example:
        >>> def f():
        ...     return "1.2"
        >>> chain(f, float)()  # return 1.2 as float
        1.2

    Args:
        functions: The callables to apply, in order.

    Returns:
        A callable that forwards its positional arguments to the first
        function and returns the result of the last one. When no functions
        are given, it returns its arguments unchanged as a tuple.
    """

    def make_iter(args: Any) -> Iterable[Any]:
        # Strings and bytes are iterable, but they are meant to travel
        # through the chain as one value rather than element by element.
        if not isinstance(args, Iterable) or isinstance(args, (str, bytes)):
            return (args,)
        return args

    def inner(*args: Any) -> Any:
        result: Any = args
        for fun in functions:
            result = fun(*make_iter(result))
        return result

    return inner
class StahlChannel(InstrumentChannel):
    """
    A Stahl source channel.

    Args:
        parent: The instrument this channel belongs to. It must expose the
            attributes ``identifier`` and ``voltage_range``, which are read
            while building the parameters and commands.
        name: Name of the channel.
        channel_number: One-based channel number; it is sent to the
            instrument as a zero-padded two-digit string.
    """

    # ASCII ACK (0x06): the reply expected after a successful set command.
    acknowledge_reply = chr(6)

    def __init__(self, parent: VisaInstrument, name: str, channel_number: int):
        super().__init__(parent, name)

        self._channel_string = f"{channel_number:02d}"
        self._channel_number = channel_number

        self.add_parameter(
            "voltage",
            get_cmd=f"{self.parent.identifier} U{self._channel_string}",
            # The reply uses a decimal comma (e.g. "+1,234 V"): extract the
            # number, swap the comma for a dot, then convert to float.
            get_parser=chain(
                re.compile(r"^([+\-]\d+,\d+) V$").findall,
                partial(re.sub, ",", "."),
                float
            ),
            set_cmd=self._set_voltage,
            unit="V",
            vals=Numbers(
                -self.parent.voltage_range,
                self.parent.voltage_range
            )
        )

        self.add_parameter(
            "current",
            get_cmd=f"{self.parent.identifier} I{self._channel_string}",
            get_parser=chain(
                re.compile(r"^([+\-]\d+,\d+) mA$").findall,
                partial(re.sub, ",", "."),
                lambda ma: float(ma) / 1000  # Convert mA to A
            ),
            unit="A",
        )

        self.add_parameter(
            "is_locked",
            get_cmd=self._get_lock_status
        )

    def _set_voltage(self, voltage: float) -> None:
        """
        Send a voltage set point to this channel.

        The instrument takes the set point as a fraction of its full range,
        so the voltage is mapped linearly from
        ``[-voltage_range, +voltage_range]`` onto ``[0, 1]`` before sending.
        A warning is logged when the reply is not the acknowledge character.

        Args:
            voltage: The voltage to set, in volts.
        """
        # Normalize the voltage in the range 0 to 1, where 0 is maximum negative
        # voltage and 1 is maximum positive voltage
        voltage_normalized = np.interp(
            voltage,
            self.parent.voltage_range * np.array([-1, 1]),
            [0, 1]
        )

        send_string = f"{self.parent.identifier} CH{self._channel_string} " \
                      f"{voltage_normalized:.5f}"
        response = self.ask(send_string)

        if response != self.acknowledge_reply:
            self.log.warning(
                f"Command {send_string} did not produce an acknowledge reply")

    def _get_lock_status(self) -> bool:
        """
        Query whether this channel is locked.

        A lock occurs when an output is overloaded.

        Return:
            lock_status: True when locked
        """
        send_string = f"{self.parent.identifier} LOCK"

        # The reply is read as a sequence of unsigned bytes without a header.
        response = self.parent.visa_handle.query_binary_values(
            send_string,
            datatype='B',
            header_fmt="empty"
        )

        # Each status byte covers a group of four consecutive channels.
        channel_index = self._channel_number - 1
        channel_group = channel_index // 4
        lock_code_group = response[channel_group]

        # NOTE(review): format(..., "b") is not zero-padded, so the position
        # is counted from the most significant set bit. This presumably
        # relies on the instrument always setting a leading bit in each
        # status byte - confirm against the instrument manual.
        return format(lock_code_group, "b")[channel_index % 4 + 1] == "1"
class Stahl(VisaInstrument):
    """
    Stahl driver.

    On construction the instrument is asked for its IDN string; the parsed
    fields (``model``, ``serial_number``, ``voltage_range``, ``n_channels``
    and ``output_type``) are stored as attributes on the instance, and one
    :class:`StahlChannel` is created per reported channel.

    Args:
        name: Name of the instrument.
        address: A serial port address
        **kwargs: Forwarded to ``VisaInstrument``.
    """

    def __init__(self, name: str, address: str, **kwargs: Any):
        super().__init__(name, address, terminator="\r", **kwargs)
        assert isinstance(self.visa_handle, SerialInstrument)

        self.visa_handle.baud_rate = 115200

        # The IDN reply determines the command prefix (see ``identifier``),
        # the voltage limits and the number of channels, so it has to be
        # parsed before any parameter or channel is created.
        instrument_info = self.parse_idn_string(
            self.ask("IDN")
        )

        for key, value in instrument_info.items():
            setattr(self, key, value)

        channels = ChannelList(
            self, "channel", StahlChannel, snapshotable=False
        )

        # Channel numbers are one-based.
        for channel_number in range(1, self.n_channels + 1):
            name = f"channel{channel_number}"
            channel = StahlChannel(
                self,
                name,
                channel_number
            )
            self.add_submodule(name, channel)
            channels.append(channel)

        self.add_submodule("channel", channels)

        self.add_parameter(
            "temperature",
            get_cmd=f"{self.identifier} TEMP",
            get_parser=chain(
                re.compile("^TEMP (.*)°C$").findall,
                float
            ),
            unit="C"
        )

        self.connect_message()

    def ask_raw(self, cmd: str) -> str:
        """
        Write a command and read the reply decoded as latin-1.

        Sometimes the instrument returns non-ASCII characters in response
        strings, so the encoding is manually set to latin-1 when reading.

        Args:
            cmd: The command string to send.

        Returns:
            The response string read from the instrument.
        """
        self.visa_log.debug(f"Querying: {cmd}")
        self.visa_handle.write(cmd)
        response = self.visa_handle.read(encoding="latin-1")
        self.visa_log.debug(f"Response: {response}")
        return response

    @staticmethod
    def parse_idn_string(idn_string: str) -> Dict[str, Any]:
        """
        Parse the IDN reply of the instrument.

        Args:
            idn_string: The raw reply to the "IDN" query.

        Return:
            dict: The dict contains the following keys "model",
            "serial_number", "voltage_range","n_channels", "output_type"

        Raises:
            RuntimeError: If the reply does not match the expected format.
        """
        result = re.search(
            r"(HV|BS)(\d{3}) (\d{3}) (\d{2}) ([buqsm])",
            idn_string
        )

        if result is None:
            raise RuntimeError(
                "Unexpected instrument response. Perhaps the model of the "
                "instrument does not match the drivers expectation or a "
                "firmware upgrade has taken place. Please get in touch "
                "with a QCoDeS core developer"
            )

        # One converter per regex group, in group order; the insertion order
        # of this mapping is what pairs each name with its captured value.
        converters: Dict[str, Callable[..., Any]] = OrderedDict({
            "model": str,
            "serial_number": str,
            "voltage_range": float,
            "n_channels": int,
            "output_type": {
                "b": "bipolar",
                "u": "unipolar",
                "q": "quadrupole",
                "s": "steerer",
                "m": "bipolar milivolt"
            }.get
        })

        return {
            name: converter(value)
            for (name, converter), value in zip(converters.items(), result.groups())
        }

    def get_idn(self) -> Dict[str, Optional[str]]:
        """
        Return the identification of the instrument.

        The Stahl sends an uncommon IDN string which does not include a
        firmware version, so "firmware" is always None.
        """
        return {
            "vendor": "Stahl",
            "model": self.model,
            "serial": self.serial_number,
            "firmware": None
        }

    @property
    def identifier(self) -> str:
        """Model followed by serial number; used as prefix of the commands."""
        return f"{self.model}{self.serial_number}"