Source code for qcodes_contrib_drivers.drivers.Spectrum.M4i

# **************************************************************************
#
# Driver file for M4i.44x-x8
#
# **************************************************************************
#
# QuTech
#
# Written by: Luka Bavdaz, Marco Tagliaferri, Pieter Eendebak
# Also see: http://spectrum-instrumentation.com/en/m4i-platform-overview
#

# %%
import os
import sys
import logging
import numpy as np
import ctypes as ct
from functools import partial
from typing import Union, Type

from qcodes.utils.validators import Enum, Numbers, Anything, Ints
from qcodes.instrument.base import Instrument

log = logging.getLogger(__name__)

# Import the vendor bindings. The pyspcm header files are expected to live
# next to this driver file, so that directory is put on sys.path first.
try:
    # add the location of the pyspcm header file manually
    header_dir = os.path.dirname(__file__)

    if header_dir not in sys.path:
        # lazy %-formatting: the message is only built if INFO is enabled
        log.info('M4i: adding header_dir %s to sys.path', header_dir)
        sys.path.append(header_dir)
    import pyspcm
    import py_header.spcerr
except (ImportError, OSError) as err:
    info_str = 'to use the M4i driver install the pyspcm module and the M4i libs'
    log.exception(info_str)
    # chain the original error so the real cause (missing module vs. missing
    # shared library) stays visible in the traceback
    raise ImportError(info_str) from err

# %% Helper functions


def szTypeToName(lCardType):
    """
    Convert card type to string

    This function is taken from an example provided by Spectrum GmbH

    Args:
        lCardType (int): card type code; the series is selected with
            ``pyspcm.TYP_SERIESMASK`` and the version with
            ``pyspcm.TYP_VERSIONMASK``

    Returns:
        str: formatted card name, or 'unknown type' if the series is not one
        of the series handled here
    """
    version = lCardType & pyspcm.TYP_VERSIONMASK
    series = lCardType & pyspcm.TYP_SERIESMASK

    # (series identifier, name template) pairs, checked in order
    name_templates = (
        (pyspcm.TYP_M2ISERIES, 'M2i.%04x'),
        (pyspcm.TYP_M2IEXPSERIES, 'M2i.%04x-Exp'),
        (pyspcm.TYP_M3ISERIES, 'M3i.%04x'),
        (pyspcm.TYP_M3IEXPSERIES, 'M3i.%04x-Exp'),
        (pyspcm.TYP_M4IEXPSERIES, 'M4i.%04x-x8'),
    )
    for series_id, template in name_templates:
        if series == series_id:
            return template % version
    return 'unknown type'
# Lookup table from numeric error code to its symbolic name, built from every
# attribute of py_header.spcerr whose name starts with 'ERR_'.
_errormsg_dict = dict(
    (getattr(py_header.spcerr, err_name), err_name)
    for err_name in dir(py_header.spcerr)
    if err_name.startswith('ERR_')
)

# %% Main driver class
[docs] class M4i(Instrument): _NO_HF_MODE = -1
def __init__(self, name, cardid='spcm0', **kwargs):
    """
    Driver for the Spectrum M4i.44xx-x8 cards.

    For more information see: http://spectrum-instrumentation.com/en/m4i-platform-overview

    Example:

        Example usage for acquisition with channel 2 using an external trigger
        that triggers multiple times with trigger mode HIGH::

            m4 = M4i(name='M4i', server_name=None)
            m4.enable_channels(pyspcm.CHANNEL2)
            m4.set_channel_settings(2,mV_range, input_path, termination, coupling, compensation)
            m4.set_ext0_OR_trigger_settings(pyspcm.SPC_TM_HIGH,termination,coupling,level0)
            calc = m4.multiple_trigger_acquisition(mV_range,memsize,seg_size,posttrigger_size)

    Note:
        Error generated by the card can be retrieved with the method
        :func:`get_error_info32bit`. The card can be reset with :func:`reset`.
        Sometimes when an error occurs (including validation errors) the
        python console needs to be restarted

    Args:
        name (str): name of the instrument, passed to the Instrument base class
        cardid (str): identifier passed to ``pyspcm.spcm_hOpen`` to open the card
        **kwargs: forwarded to the Instrument base class
    """
    super().__init__(name, **kwargs)

    self.hCard = pyspcm.spcm_hOpen(cardid)
    if self.hCard is None:
        logging.warning("M4i: no card found\n")
    self._last_set_result = 0

    def reg_get(register):
        """Return a getter command reading the given 32 bit register."""
        return partial(self._param32bit, register)

    def reg_set(register):
        """Return a setter command writing the given 32 bit register."""
        return partial(self._set_param32bit, register)

    # add parameters for getting
    self.add_parameter('card_id',
                       label='card id',
                       get_cmd=None,
                       set_cmd=None,
                       initial_value=cardid,
                       vals=Anything(),
                       docstring='The card ID')
    self.add_parameter('max_sample_rate',
                       label='max sample rate',
                       unit='Hz',
                       get_cmd=self.get_max_sample_rate,
                       docstring='The maximumum sample rate')
    self.add_parameter('memory',
                       label='memory',
                       unit='bytes',
                       get_cmd=self.get_card_memory,
                       docstring='Amount of memory on card')
    self.add_parameter('resolution',
                       label='resolution',
                       unit='bits',
                       get_cmd=reg_get(pyspcm.SPC_MIINST_BITSPERSAMPLE),
                       docstring='Resolution of the card')
    self.add_parameter('pcidate',
                       label='pcidate',
                       get_cmd=reg_get(pyspcm.SPC_PCIDATE),
                       docstring='The PCI date')
    self.add_parameter('serial_number',
                       label='serial number',
                       get_cmd=reg_get(pyspcm.SPC_PCISERIALNO),
                       docstring='The serial number of the board')
    self.add_parameter('hardware_version',
                       label='hardware version',
                       get_cmd=self.get_hardware_version,
                       docstring='The hardware version of the board')
    self.add_parameter('firmware_version',
                       label='firmware version',
                       get_cmd=self.get_firmware_version,
                       docstring='The firmware version of the board')
    self.add_parameter('features',
                       label='features',
                       get_cmd=reg_get(pyspcm.SPC_PCIEXTFEATURES),
                       docstring='Installed options and features')
    self.add_parameter('channel_count',
                       label='channel count',
                       get_cmd=reg_get(pyspcm.SPC_CHCOUNT),
                       docstring='Return number of enabled channels')
    self.add_parameter('input_path_count',
                       label='input path count',
                       get_cmd=reg_get(pyspcm.SPC_READAIPATHCOUNT),
                       docstring='Return number of analog input paths')
    self.add_parameter('input_ranges_count',
                       label='input ranges count',
                       get_cmd=reg_get(pyspcm.SPC_READIRCOUNT),
                       docstring='Return number of input ranges for the current input path')
    self.add_parameter('input_path_features',
                       label='input path features',
                       get_cmd=reg_get(pyspcm.SPC_READAIFEATURES),
                       docstring='Return a bitmap of features for current input path')
    self.add_parameter('available_card_modes',
                       label='available card modes',
                       get_cmd=reg_get(pyspcm.SPC_AVAILCARDMODES),
                       docstring='Return a bitmap of available card modes')
    self.add_parameter('card_status',
                       label='card status',
                       get_cmd=reg_get(pyspcm.SPC_M2STATUS),
                       docstring='Return a bitmap for the status information')
    self.add_parameter('read_range_min_0',
                       label='read range min 0',
                       unit='mV',
                       get_cmd=reg_get(pyspcm.SPC_READRANGEMIN0),
                       docstring='Return the lower border of input range 0')

    # buffer handling
    self.add_parameter('user_available_length',
                       label='user available length',
                       get_cmd=reg_get(pyspcm.SPC_DATA_AVAIL_USER_LEN),
                       docstring='returns the number of currently to the user available bytes inside a sample data transfer')
    self.add_parameter('user_available_position',
                       label='user available position',
                       get_cmd=reg_get(pyspcm.SPC_DATA_AVAIL_USER_POS),
                       docstring='returns the position as byte index where the currently available data samles start')
    self.add_parameter('buffer_fill_size',
                       label='buffer fill size',
                       get_cmd=reg_get(pyspcm.SPC_FILLSIZEPROMILLE),
                       docstring='returns the current fill size of the on-board memory (FIFO buffer) in promille (1/1000)')

    # triggering
    self.add_parameter('available_trigger_or_mask',
                       label='available trigger or mask',
                       get_cmd=reg_get(pyspcm.SPC_TRIG_AVAILORMASK),
                       docstring='bitmask, in which all bits of sources for the OR mask are set, if available')
    self.add_parameter('available_channel_or_mask',
                       label='available channel or mask',
                       get_cmd=reg_get(pyspcm.SPC_TRIG_CH_AVAILORMASK0),
                       docstring='bitmask, in which all bits of sources/channels (0-31) for the OR mask are set, if available')
    self.add_parameter('available_trigger_and_mask',
                       label='available trigger and mask',
                       get_cmd=reg_get(pyspcm.SPC_TRIG_AVAILANDMASK),
                       docstring='bitmask, in which all bits of sources for the AND mask are set, if available')
    self.add_parameter('available_channel_and_mask',
                       label='available channel and mask',
                       get_cmd=reg_get(pyspcm.SPC_TRIG_CH_AVAILANDMASK0),
                       docstring='bitmask, in which all bits of sources/channels (0-31) for the AND mask are set, if available')
    self.add_parameter('available_trigger_delay',
                       label='available trigger delay',
                       get_cmd=reg_get(pyspcm.SPC_TRIG_AVAILDELAY),
                       docstring='contains the maximum available delay as decimal integer value')
    self.add_parameter('available_external_trigger_modes',
                       label='available external trigger modes',
                       get_cmd=reg_get(pyspcm.SPC_TRIG_EXT0_AVAILMODES),
                       docstring='bitmask showing all available trigger modes for external 0 (main analog trigger input)')
    self.add_parameter('external_trigger_min_level',
                       label='external trigger min level',
                       unit='mV',
                       get_cmd=reg_get(pyspcm.SPC_TRIG_EXT_AVAIL0_MIN),
                       docstring='returns the minimum trigger level')
    self.add_parameter('external_trigger_max_level',
                       label='external trigger max level',
                       unit='mV',
                       get_cmd=reg_get(pyspcm.SPC_TRIG_EXT_AVAIL0_MAX),
                       docstring='returns the maximum trigger level')
    self.add_parameter('external_trigger_level_step_size',
                       label='external trigger level step size',
                       unit='mV',
                       get_cmd=reg_get(pyspcm.SPC_TRIG_EXT_AVAIL0_STEP),
                       docstring='returns the step size of the trigger level')
    self.add_parameter('available_channel_trigger_modes',
                       label='available channel trigger modes',
                       get_cmd=reg_get(pyspcm.SPC_TRIG_CH_AVAILMODES),
                       docstring='bitmask, in which all bits of the modes for the channel trigger are set')
    self.add_parameter('trigger_counter',
                       label='trigger counter',
                       get_cmd=reg_get(pyspcm.SPC_TRIGGERCOUNTER),
                       docstring='returns the number of triger events since acquisition start')

    # data per sample
    self.add_parameter('bytes_per_sample',
                       label='bytes per sample',
                       get_cmd=reg_get(pyspcm.SPC_MIINST_BYTESPERSAMPLE),
                       docstring='returns the number of bytes per sample')
    self.add_parameter('bits_per_sample',
                       label='bits per sample',
                       get_cmd=reg_get(pyspcm.SPC_MIINST_BITSPERSAMPLE),
                       docstring='returns the number of bits per sample')

    # available clock modes
    self.add_parameter('available_clock_modes',
                       label='available clock modes',
                       get_cmd=reg_get(pyspcm.SPC_AVAILCLOCKMODES),
                       docstring='returns a bitmask in which the bits of the clock modes are set, if available')

    # converting ADC samples to voltage values
    self.add_parameter('ADC_to_voltage',
                       label='ADC to voltage',
                       get_cmd=reg_get(pyspcm.SPC_MIINST_MAXADCVALUE),
                       docstring='contains the decimal code (in LSB) of the ADC full scale value')

    self.add_parameter('box_averages',
                       label='number samples in box averaging',
                       get_cmd=reg_get(pyspcm.SPC_BOX_AVERAGES),
                       set_cmd=reg_set(pyspcm.SPC_BOX_AVERAGES),
                       vals=Enum(2, 4, 8, 16, 32, 64, 128, 256),
                       docstring='Defines the number of successive samples per channel that are summed together')

    self.add_parameter('oversampling_factor',
                       label='oversampling factor',
                       get_cmd=reg_get(pyspcm.SPC_OVERSAMPLINGFACTOR),
                       docstring='Reads the oversampling factor')

    # add parameters for setting and getting (read/write direction
    # registers)
    self.add_parameter('enable_channels',
                       label='Channels enabled',
                       get_cmd=reg_get(pyspcm.SPC_CHENABLE),
                       set_cmd=reg_set(pyspcm.SPC_CHENABLE),
                       vals=Enum(1, 2, 4, 8, 3, 5, 9, 6, 10, 12, 15),
                       docstring='Set and get enabled channels')

    # analog input path functions
    # TODO: change Enum validator to set_parser for the numbered functions
    # if we want string inputs
    self.add_parameter('read_input_path',
                       label='read input path',
                       get_cmd=reg_get(pyspcm.SPC_READAIPATH),
                       set_cmd=reg_set(pyspcm.SPC_READAIPATH),
                       vals=Enum(0, 1, 2, 3),
                       docstring='Select the input path which is used to read out the features')

    for i in [0, 1, 2, 3]:
        self.add_parameter(f'input_path_{i}',
                           label=f'input path {i}',
                           get_cmd=reg_get(getattr(pyspcm, f'SPC_PATH{i}')),
                           set_cmd=reg_set(getattr(pyspcm, f'SPC_PATH{i}')),
                           vals=Enum(0, 1),
                           docstring=f'Set and get analog input path for channel {i}')

        # channel range functions
        # TODO: check the input path to set the right validator (either by
        # directly calling input_path_x() or by storing a variable)
        self.add_parameter(f'range_channel_{i}',
                           label=f'range channel {i}',
                           get_cmd=reg_get(getattr(pyspcm, f'SPC_AMP{i}')),
                           set_cmd=reg_set(getattr(pyspcm, f'SPC_AMP{i}')),
                           vals=Enum(200, 500, 1000, 2000, 2500, 5000, 10000),
                           unit='mV',
                           docstring=f'Set and get input range of channel {i} (in mV)')

        # input termination functions
        self.add_parameter(f'termination_{i}',
                           label=f'termination {i}',
                           get_cmd=reg_get(getattr(pyspcm, f'SPC_50OHM{i}')),
                           set_cmd=reg_set(getattr(pyspcm, f'SPC_50OHM{i}')),
                           vals=Enum(0, 1),
                           docstring=f'if 1 sets termination to 50 Ohm, otherwise 1 MOhm for channel {i}')

        # input coupling
        ACDC_coupling_docstring = f'if 1 sets the AC coupling, otherwise sets the DC coupling for channel {i}'
        ACDC_coupling_docstring += '\nThe AC coupling only works if the card is in HF mode.'
        self.add_parameter(f'ACDC_coupling_{i}',
                           label=f'ACDC coupling {i}',
                           get_cmd=reg_get(getattr(pyspcm, f'SPC_ACDC{i}')),
                           set_cmd=reg_set(getattr(pyspcm, f'SPC_ACDC{i}')),
                           vals=Enum(0, 1),
                           docstring=ACDC_coupling_docstring)

        # AC/DC offset compensation
        # Both literals need the f prefix: adjacent string literals are
        # concatenated, but only f-prefixed parts are interpolated.
        self.add_parameter(f'ACDC_offs_compensation_{i}',
                           label=f'ACDC offs compensation {i}',
                           get_cmd=partial(self._get_compensation, i),
                           set_cmd=partial(self._set_compensation, i),
                           vals=Enum(0, 1, M4i._NO_HF_MODE),
                           docstring=(f'if 1 enables compensation, if 0 disables compensation for channel {i}. '
                                      f'Value {M4i._NO_HF_MODE} means the card is not in HF mode'))

        # anti aliasing filter (Bandwidth limit)
        self.add_parameter(f'anti_aliasing_filter_{i}',
                           label=f'anti aliasing filter {i}',
                           get_cmd=reg_get(getattr(pyspcm, f'SPC_FILTER{i}')),
                           set_cmd=reg_set(getattr(pyspcm, f'SPC_FILTER{i}')),
                           vals=Enum(0, 1),
                           docstring=f'if 1 selects bandwidth limit, if 0 sets to full bandwidth for channel {i}')

        self.add_parameter(f'channel_{i}',
                           label=f'channel {i}',
                           unit='a.u.',
                           get_cmd=partial(self._read_channel, i))

    # acquisition modes
    # TODO: If required, the other acquisition modes can be added to the
    # validator
    self.add_parameter('card_mode',
                       label='card mode',
                       get_cmd=reg_get(pyspcm.SPC_CARDMODE),
                       set_cmd=reg_set(pyspcm.SPC_CARDMODE),
                       vals=Enum(pyspcm.SPC_REC_STD_SINGLE, pyspcm.SPC_REC_STD_MULTI,
                                 pyspcm.SPC_REC_STD_GATE, pyspcm.SPC_REC_STD_ABA,
                                 pyspcm.SPC_REC_FIFO_SINGLE, pyspcm.SPC_REC_FIFO_MULTI,
                                 pyspcm.SPC_REC_FIFO_GATE, pyspcm.SPC_REC_FIFO_ABA,
                                 pyspcm.SPC_REC_STD_AVERAGE, pyspcm.SPC_REC_STD_BOXCAR),
                       docstring='defines the used operating mode')

    # wait command
    self.add_parameter('timeout',
                       label='timeout',
                       get_cmd=reg_get(pyspcm.SPC_TIMEOUT),
                       unit='ms',
                       set_cmd=reg_set(pyspcm.SPC_TIMEOUT),
                       docstring='defines the timeout for wait commands (in ms)')

    # Single acquisition mode memory, pre- and posttrigger (pretrigger = memory size - posttrigger)
    # TODO: improve the validators to make them take into account the
    # current state of the instrument
    self.add_parameter('data_memory_size',
                       label='data memory size',
                       get_cmd=reg_get(pyspcm.SPC_MEMSIZE),
                       set_cmd=reg_set(pyspcm.SPC_MEMSIZE),
                       vals=Numbers(min_value=16),
                       docstring='sets the memory size in samples per channel')
    self.add_parameter('posttrigger_memory_size',
                       label='posttrigger memory size',
                       get_cmd=reg_get(pyspcm.SPC_POSTTRIGGER),
                       set_cmd=reg_set(pyspcm.SPC_POSTTRIGGER),
                       docstring='sets the number of samples to be recorded after trigger event')

    # FIFO single acquisition length and pretrigger
    self.add_parameter('pretrigger_memory_size',
                       label='pretrigger memory size',
                       get_cmd=reg_get(pyspcm.SPC_PRETRIGGER),
                       set_cmd=reg_set(pyspcm.SPC_PRETRIGGER),
                       docstring='sets the number of samples to be recorded before trigger event')
    self.add_parameter('segment_size',
                       label='segment size',
                       get_cmd=reg_get(pyspcm.SPC_SEGMENTSIZE),
                       set_cmd=reg_set(pyspcm.SPC_SEGMENTSIZE),
                       docstring='length of segments to acquire')
    self.add_parameter('total_segments',
                       label='total segments',
                       get_cmd=reg_get(pyspcm.SPC_LOOPS),
                       set_cmd=reg_set(pyspcm.SPC_LOOPS),
                       docstring='number of segments to acquire in total. Setting 0 makes it run until stopped by user')

    # clock generation
    self.add_parameter('clock_mode',
                       label='clock mode',
                       get_cmd=reg_get(pyspcm.SPC_CLOCKMODE),
                       set_cmd=reg_set(pyspcm.SPC_CLOCKMODE),
                       vals=Enum(pyspcm.SPC_CM_INTPLL, pyspcm.SPC_CM_QUARTZ2,
                                 pyspcm.SPC_CM_EXTREFCLOCK, pyspcm.SPC_CM_PXIREFCLOCK),
                       docstring='defines the used clock mode or reads out the actual selected one')
    self.add_parameter('reference_clock',
                       label='frequency of external reference clock',
                       unit='Hz',
                       get_cmd=reg_get(pyspcm.SPC_REFERENCECLOCK),
                       set_cmd=reg_set(pyspcm.SPC_REFERENCECLOCK),
                       vals=Ints(),
                       docstring='defines the frequency of the external reference clock')
    self.add_parameter('sample_rate',
                       label='sample rate',
                       get_cmd=reg_get(pyspcm.SPC_SAMPLERATE),
                       unit='Hz',
                       set_cmd=reg_set(pyspcm.SPC_SAMPLERATE),
                       docstring='write the sample rate for internal sample generation or read rate nearest to desired. This sample rate is rounded to an integer number.')
    self.add_parameter('exact_sample_rate',
                       label='sample rate',
                       get_cmd=self._exact_sample_rate,
                       unit='Hz',
                       docstring='return the exact sampling rate in Hz. This is an integer divisor of the maximum sample rate')
    self.add_parameter('special_clock',
                       label='special clock',
                       get_cmd=reg_get(pyspcm.SPC_SPECIALCLOCK),
                       unit='Hz',
                       set_cmd=reg_set(pyspcm.SPC_SPECIALCLOCK),
                       docstring='Activate/Deactivate the special clock mode (lower and more sampling clock rates)')

    # triggering
    self.add_parameter('trigger_or_mask',
                       label='trigger or mask',
                       get_cmd=reg_get(pyspcm.SPC_TRIG_ORMASK),
                       set_cmd=reg_set(pyspcm.SPC_TRIG_ORMASK),
                       vals=Enum(pyspcm.SPC_TMASK_NONE, pyspcm.SPC_TMASK_SOFTWARE,
                                 pyspcm.SPC_TMASK_EXT0, pyspcm.SPC_TMASK_EXT1),
                       docstring='defines the events included within the trigger OR mask card')
    self.add_parameter('channel_or_mask',
                       label='channel or mask',
                       get_cmd=reg_get(pyspcm.SPC_TRIG_CH_ORMASK0),
                       set_cmd=reg_set(pyspcm.SPC_TRIG_CH_ORMASK0),
                       docstring='includes the channels (0-31) within the channel trigger OR mask of the card')
    self.add_parameter('trigger_and_mask',
                       label='trigger and mask',
                       get_cmd=reg_get(pyspcm.SPC_TRIG_ANDMASK),
                       set_cmd=reg_set(pyspcm.SPC_TRIG_ANDMASK),
                       vals=Enum(pyspcm.SPC_TMASK_NONE, pyspcm.SPC_TMASK_EXT0,
                                 pyspcm.SPC_TMASK_EXT1),
                       docstring='defines the events included within the trigger AND mask card')
    self.add_parameter('channel_and_mask',
                       label='channel and mask',
                       get_cmd=reg_get(pyspcm.SPC_TRIG_CH_ANDMASK0),
                       set_cmd=reg_set(pyspcm.SPC_TRIG_CH_ANDMASK0),
                       docstring='includes the channels (0-31) within the channel trigger AND mask of the card')
    self.add_parameter('trigger_delay',
                       label='trigger delay',
                       get_cmd=reg_get(pyspcm.SPC_TRIG_DELAY),
                       set_cmd=reg_set(pyspcm.SPC_TRIG_DELAY),
                       docstring='defines the delay for the detected trigger events')
    self.add_parameter('external_trigger_mode',
                       label='external trigger mode',
                       get_cmd=reg_get(pyspcm.SPC_TRIG_EXT0_MODE),
                       set_cmd=reg_set(pyspcm.SPC_TRIG_EXT0_MODE),
                       docstring='defines the external trigger mode for the external SMA connector trigger input')
    self.add_parameter('external_trigger_termination',
                       label='external trigger termination',
                       get_cmd=reg_get(pyspcm.SPC_TRIG_TERM),
                       set_cmd=reg_set(pyspcm.SPC_TRIG_TERM),
                       vals=Enum(0, 1),
                       docstring='A 1 sets the 50 Ohm termination, a 0 sets high impedance termination')
    self.add_parameter('external_trigger_input_coupling',
                       label='external trigger input coupling',
                       get_cmd=reg_get(pyspcm.SPC_TRIG_EXT0_ACDC),
                       set_cmd=reg_set(pyspcm.SPC_TRIG_EXT0_ACDC),
                       vals=Enum(0, 1),
                       docstring='A 1 sets the AC coupling for the external trigger, a 0 sets DC')

    for l in [0, 1]:
        self.add_parameter(f'external_trigger_level_{l}',
                           label=f'external trigger level {l}',
                           get_cmd=reg_get(getattr(pyspcm, f'SPC_TRIG_EXT0_LEVEL{l}')),
                           set_cmd=reg_set(getattr(pyspcm, f'SPC_TRIG_EXT0_LEVEL{l}')),
                           unit='mV',
                           docstring=f'trigger level {l} for external trigger')

    for i in [0, 1, 2, 3]:
        self.add_parameter(f'trigger_mode_channel_{i}',
                           label=f'trigger mode channel {i}',
                           get_cmd=reg_get(getattr(pyspcm, f'SPC_TRIG_CH{i}_MODE')),
                           set_cmd=reg_set(getattr(pyspcm, f'SPC_TRIG_CH{i}_MODE')),
                           docstring=f'sets the trigger mode for channel {i}')
        for l in [0, 1]:
            self.add_parameter(f'trigger_channel_{i}_level_{l}',
                               label=f'trigger channel {i} level {l}',
                               get_cmd=reg_get(getattr(pyspcm, f'SPC_TRIG_CH{i}_LEVEL{l}')),
                               set_cmd=reg_set(getattr(pyspcm, f'SPC_TRIG_CH{i}_LEVEL{l}')),
                               docstring=f'trigger level {l} channel {i}')

    # add parameters for setting (write only registers)
    # Buffer handling
    self.add_parameter('card_available_length',
                       label='card available length',
                       set_cmd=reg_set(pyspcm.SPC_DATA_AVAIL_CARD_LEN),
                       docstring='writes the number of bytes that the card can now use for sample data transfer again')

    # General
    self.add_parameter('general_command',
                       label='general command',
                       set_cmd=reg_set(pyspcm.SPC_M2CMD),
                       docstring='executes a command for the card or data transfer')

    # memsize used for simple channel read-out
    self._channel_memsize = 2**12
    # pretrigger size used for simple channel read-out; initialised here so
    # the channel_{i} parameters work before initialize_channels() is called
    # (16 is the default of initialize_channels)
    self._channel_pretrigger_memsize = 16
# checks if requirements for the compensation get and set functions are met
def _get_compensation(self, i):
    """
    Read the ACDC offset compensation setting of channel `i`.

    The register is only read when the `input_path_{i}` parameter reports 1
    (HF path); otherwise a message is logged and `M4i._NO_HF_MODE` is returned.
    """
    hf_path_selected = self.parameters[f'input_path_{i}']() == 1
    if not hf_path_selected:
        logging.info("M4i: HF path not set, ACDC offset compensation parameter will be ignored by the M4i card\n")
        return M4i._NO_HF_MODE
    return self._param32bit(getattr(pyspcm, f'SPC_ACDC_OFFS_COMPENSATION{i}'))


def _set_compensation(self, i, value):
    """
    Write the ACDC offset compensation setting of channel `i`.

    The register is only written when the `input_path_{i}` parameter reports
    1 (HF path); otherwise a warning is logged and nothing is written.
    """
    hf_path_selected = self.parameters[f'input_path_{i}']() == 1
    if not hf_path_selected:
        logging.warning("M4i: HF path not set, ignoring ACDC offset compensation set\n")
        return
    register = getattr(pyspcm, f'SPC_ACDC_OFFS_COMPENSATION{i}')
    self._set_param32bit(register, value)
def get_hardware_version(self):
    """Return the bits above the lower 16 of the SPC_PCIVERSION register value."""
    return self._param32bit(pyspcm.SPC_PCIVERSION) >> 16
def get_firmware_version(self):
    """Return the lower 16 bits of the SPC_PCIVERSION register value."""
    lower_16_bits = 0xFFFF
    return self._param32bit(pyspcm.SPC_PCIVERSION) & lower_16_bits
def active_channels(self):
    """ Return a list with the indices of the active channels

    The indices are the positions of the set bits (least significant bit is
    index 0) in the cached value of the `enable_channels` parameter.
    """
    bits_msb_first = bin(self.enable_channels.cache())[2:]
    return [index for index, bit in enumerate(reversed(bits_msb_first)) if int(bit)]
def get_idn(self):
    """Return a dict with the keys 'vendor', 'model', 'serial' and 'firmware'."""
    model = szTypeToName(self.get_card_type())
    serial = self.serial_number()
    firmware = self.firmware_version()
    return {
        'vendor': 'Spectrum_GMBH',
        'model': model,
        'serial': serial,
        'firmware': firmware,
    }
def reset(self):
    """ Reset the card

    The pyspcm.M2CMD_CARD_RESET command is executed.
    """
    reset_command = pyspcm.M2CMD_CARD_RESET
    self.general_command(reset_command)
def convert_to_voltage(self, data, input_range):
    """Convert an array of numbers to an array of voltages.

    Args:
        data: raw values to scale
        input_range: value that the ADC full scale code corresponds to

    Returns:
        `data` multiplied by `input_range` divided by the cached value of the
        `ADC_to_voltage` parameter
    """
    full_scale_code = self.ADC_to_voltage.cache()
    scale = input_range / full_scale_code
    return data * scale
def initialize_channels(self, channels=None, mV_range=1000, input_path=0,
                        termination=0, coupling=0, compensation=None, memsize=2**12,
                        pretrigger_memsize=16, lp_filter=None):
    """ Setup channels of the digitizer for simple readout using Parameters

    The channels can be read out using the Parameters `channel_0`, `channel_1`, ...

    Args:
        channels (list): list of channels to setup. If None, channels 0-3
            are used.
        mV_range, input_path, termination, coupling, compensation, lp_filter:
            passed to the set_channel_settings function
        memsize (int): memory size to use for simple channel readout
        pretrigger_memsize (int): Pretrigger memory size to use. The default
            value used is 16, which is the smallest value possible.
    """
    # remember the sizes for the simple channel read-out
    self._channel_memsize = memsize
    self._channel_pretrigger_memsize = pretrigger_memsize
    self.data_memory_size(memsize)

    if channels is None:
        channels = range(4)

    channel_mask = 0
    for ch in channels:
        self.set_channel_settings(ch, mV_range,
                                  input_path=input_path,
                                  termination=termination,
                                  coupling=coupling,
                                  compensation=compensation,
                                  lp_filter=lp_filter)
        channel_mask += getattr(pyspcm, 'CHANNEL%d' % ch)

    self.enable_channels(channel_mask)
def _channel_mask(self, channels=range(4)):
    """ Return mask for specified channels

    Args:
        channels (list): list of channel indices

    Returns:
        cx (int): channel mask, the sum of the pyspcm.CHANNEL<n> constants
    """
    return sum(getattr(pyspcm, f'CHANNEL{c}') for c in channels)


def _read_channel(self, channel, memsize=None):
    """ Helper function to read out a channel

    Before a channel is measured all channels are enabled to ensure we can
    read out channels without the overhead of changing channels.

    Args:
        channel (int): index of the channel to read
        memsize (int or None): number of samples to acquire. If None, the
            size stored in `_channel_memsize` is used.

    Returns:
        mean of the acquired samples of the requested channel
    """
    if memsize is None:
        memsize = self._channel_memsize
    # fall back to 16 (the default of initialize_channels) when
    # initialize_channels() has not been called yet
    pretrigger_size = getattr(self, '_channel_pretrigger_memsize', 16)
    posttrigger_size = memsize - pretrigger_size

    # the parameter dictionary of the instrument is `parameters`
    mV_range = self.parameters[f'range_channel_{channel}']()
    self.enable_channels(self._channel_mask())

    data = self.single_software_trigger_acquisition(
        mV_range, memsize, posttrigger_size)
    # the samples of the active channels are interleaved: one column per channel
    active = self.active_channels()
    data = data.reshape((-1, len(active)))
    return np.mean(data[:, channel])
def set_channel_settings(self, channel_index, mV_range, input_path, termination,
                         coupling, compensation=None, lp_filter=None):
    """ Update settings of the specified channel

    Args:
        channel_index (int): channel to update
        mV_range (float): measurement range for the channel
            (buffered path: 200, 500, 1000, 2000, 5000, or 10000)
            (HF path: 500, 1000, 2500, or 5000)
        input_path (int): input path (0: default/buffered; 1: HF/50 Ohm)
        termination (None or int): If None, then do not update the
            termination (0: 1 MOhm; 1: 50 Ohm)
        coupling (None or int): Set the ACDC_coupling. If None, then do not
            update the coupling (0: DC; 1: AC)
        compensation (None or int): If None, then do not update the
            compensation (0: off, 1: on)
        lp_filter (Optional[int]): enable (1) or disable (0) the 20 MHz low
            pass filter
    """
    def channel_parameter(prefix):
        # parameter of this channel, e.g. 'termination' -> 'termination_<n>'
        return self.parameters[f'{prefix}_{channel_index}']

    # initialize
    channel_parameter('input_path')(input_path)

    # optional settings are only written when a value is given
    optional_settings = (
        ('termination', termination),
        ('ACDC_coupling', coupling),
        ('anti_aliasing_filter', lp_filter),
    )
    for prefix, value in optional_settings:
        if value is not None:
            channel_parameter(prefix)(value)

    channel_parameter('range_channel')(mV_range)

    # note: set after voltage range
    # can only be used with DC coupling and 50 Ohm path (hf)
    if compensation is not None:
        channel_parameter('ACDC_offs_compensation')(compensation)
def set_ext0_OR_trigger_settings(self, trig_mode, termination, coupling, level0, level1=None):
    """ Configures ext0 trigger

    The channel OR mask is cleared and the trigger OR mask is set to
    pyspcm.SPC_TMASK_EXT0.

    Args:
        trig_mode: 0: None, 1: Positive edge, 2: Negative edge, 4: Both,
            8: High, 16: Low, 32: Enter window, 64: Leave window,
            128: Inside window, 256: Outside window,
            0x01000001: Positive + re-arm,
            0x01000002: Negative + re-arm
        termination: input termination 0: 1 MOhm, 1: 50 Ohm
        coupling: DC/AC input coupling (0: DC, 1: AC)
        level0: trigger level [mV]
        level1: 2nd level for re-arm and windowed modes. [mV]
            If None, the second level is not written.
    """
    self.channel_or_mask(0)
    self.external_trigger_mode(trig_mode)
    self.trigger_or_mask(pyspcm.SPC_TMASK_EXT0)

    self.external_trigger_termination(termination)
    self.external_trigger_input_coupling(coupling)
    self.external_trigger_level_0(level0)
    # identity test: None is a singleton and a level of 0 is a valid value
    if level1 is not None:
        self.external_trigger_level_1(level1)
# Note: the levels need to be set in bits, not voltages (between -8191 and
# 8191 for 14 bits).
def set_channel_OR_trigger_settings(self, i, trig_mode, bitlevel0, bitlevel1=None):
    """Configure channel `i` as the trigger source in the channel OR mask.

    When a channel is used for triggering it must be enabled during the
    acquisition.

    Args:
        i (int): index of the channel to trigger on
        trig_mode: trigger mode written to `trigger_mode_channel_{i}`
        bitlevel0: trigger level 0, written to `trigger_channel_{i}_level_0`
        bitlevel1: trigger level 1, written to `trigger_channel_{i}_level_1`.
            If None, the second level is not written.
    """
    self.trigger_or_mask(0)
    self.channel_or_mask(getattr(pyspcm, f'SPC_TMASK0_CH{i}'))

    self.parameters[f'trigger_channel_{i}_level_0'](bitlevel0)
    # identity test: None is a singleton and a level of 0 is a valid value
    if bitlevel1 is not None:
        self.parameters[f'trigger_channel_{i}_level_1'](bitlevel1)

    self.parameters[f'trigger_mode_channel_{i}'](trig_mode)  # trigger mode
def setup_multi_recording(self, posttrigger_size, n_triggers=1,
                          pretrigger_size=None, boxcar_average=False):
    """ Setup multi recording.

    Triggering must have been configured separately.
    Data acquisition must started with start_triggered().
    The collected data can be acquired with the function get_data().

    Args:
        posttrigger_size (int): size of data trace after triggering
        n_triggers (int): total number of triggers
        pretrigger_size (int): size of data trace before triggering. When
            not given, the current value of `pretrigger_memory_size` is used.
        boxcar_average (bool): use mode 'boxcar average'

    Raises:
        Exception: if boxcar averaging is requested while the oversampling
            factor is not 1

    Example:
        digitizer.setup_multi_recording(size, n_triggers)
        digitizer.start_triggered()
        data = digitizer.get_data()
        # do another measurement with same settings
        digitizer.start_triggered()
        data = digitizer.get_data()
    """
    if not boxcar_average:
        self.card_mode(pyspcm.SPC_REC_STD_MULTI)
    else:
        if self.oversampling_factor() != 1:
            raise Exception('Averaging with BOXCAR can only be used with '
                            'maximum sample rate')
        self.card_mode(pyspcm.SPC_REC_STD_BOXCAR)

    if not pretrigger_size:
        pretrigger_size = self.pretrigger_memory_size()

    # sizes are passed through _hw_memsize before being written to the card
    posttrigger_size = self._hw_memsize(posttrigger_size)
    segment_length = self._hw_memsize(posttrigger_size + pretrigger_size)
    total_length = self._hw_memsize(n_triggers * segment_length)

    self.data_memory_size(total_length)
    self.segment_size(segment_length)
    self.posttrigger_memory_size(posttrigger_size)
def start_triggered(self):
    """ Starts triggered acquisition

    Issues the combined M2CMD_CARD_START and M2CMD_CARD_ENABLETRIGGER command.
    """
    start_command = pyspcm.M2CMD_CARD_START | pyspcm.M2CMD_CARD_ENABLETRIGGER
    self.general_command(start_command)
def get_data(self):
    """ Reads measurement data from the digitizer.

    The data acquisition must have been started by start_acquisition()
    or start_triggered().

    Returns:
        2D array with voltages per channel in V.

    Raises:
        Exception: if waiting for the data returns an error other than a
            timeout. A timeout is only logged.
    """
    channels = self.active_channels()
    n_samples = self.data_memory_size.cache()
    n_channels = len(channels)

    status = self.wait_ready()
    if status == pyspcm.ERR_TIMEOUT:
        logging.error(f'Timeout waiting for data (timeout: {self.timeout()} ms)')
    elif status != pyspcm.ERR_OK:
        raise Exception(f'Error waiting for data: (0x{status:04x})')

    try:
        if self.card_mode() == pyspcm.SPC_REC_STD_BOXCAR:
            # boxcar mode: 4 bytes per sample, values scaled by the number of averages
            box_averages = self.box_averages()
            raw_data = self._transfer_buffer_numpy(n_samples, n_channels, bytes_per_sample=4)
        else:
            box_averages = 1
            raw_data = self._transfer_buffer_numpy(n_samples, n_channels)
    finally:
        # always stop the acquisition, also when the transfer failed
        self._stop_acquisition()

    resolution = self.ADC_to_voltage.cache()
    voltages = np.zeros((n_channels, len(raw_data) // n_channels))
    # the raw samples are interleaved per channel; one output row per channel
    for row, ch in enumerate(channels):
        mV_range = self.parameters[f'range_channel_{ch}']()
        voltages[row, :] = raw_data[row::n_channels] * (mV_range / 1000 / resolution / box_averages)

    return voltages
def _stop_acquisition(self):
    """ Stop the data transfer and the card after an acquisition

    Issues M2CMD_DATA_STOPDMA, invalidates the SPCM_BUF_DATA buffer and
    then issues M2CMD_CARD_STOP, in that order.
    """
    # close acquisition
    self.general_command(pyspcm.M2CMD_DATA_STOPDMA)

    # invalidate buffer
    self._invalidate_buf(pyspcm.SPCM_BUF_DATA)

    self.general_command(pyspcm.M2CMD_CARD_STOP)

# TODO: if multiple channels are used at the same time, the voltage conversion needs to be updated
# TODO: the data also needs to be organized nicely (currently it
# interleaves the data)
def multiple_trigger_acquisition(self, mV_range, memsize, seg_size, posttrigger_size):
    """ Acquire traces with the SPC_REC_STD_MULTI mode

    This method does not update the triggering properties.

    Args:
        mV_range (float): Input range used for conversion to voltage
        memsize (int): Size of total buffer to acquire
        seg_size (int): Size of segments to record
        posttrigger_size (int): Size of the post trigger buffer

    Returns:
        Array with measured voltages
    """
    self.card_mode(pyspcm.SPC_REC_STD_MULTI)  # multi

    self.data_memory_size(memsize)
    self.segment_size(seg_size)
    self.posttrigger_memory_size(posttrigger_size)
    n_channels = self._num_channels()

    start_command = pyspcm.M2CMD_CARD_START | pyspcm.M2CMD_CARD_ENABLETRIGGER
    self.general_command(start_command)
    self.wait_ready()

    # convert transfer data to numpy array
    try:
        raw = self._transfer_buffer_numpy(memsize, n_channels, bytes_per_sample=2)
    finally:
        # always stop the acquisition, also when the transfer failed
        self._stop_acquisition()

    return self.convert_to_voltage(raw, mV_range / 1000)
def start_acquisition(self, mV_range, memsize, posttrigger_size=None, verbose=0):
    """ Start data acquisition of a single data trace

    The card is put in SPC_REC_STD_SINGLE mode and started with a software
    trigger. The resulting data can be acquired with the function
    retrieve_data.

    Args:
        mV_range (float): range in mV
        memsize (int): size of data trace
        posttrigger_size (int or None): size of data trace after triggering.
            If None, `memsize - 16` is used.
        verbose (int): not used by this method

    Returns:
        trace as a dict with keys 'memsize', 'numch' and 'mV_range'.
    """
    self.card_mode(pyspcm.SPC_REC_STD_SINGLE)  # single

    self.data_memory_size(memsize)
    post_size = memsize - 16 if posttrigger_size is None else posttrigger_size
    self.posttrigger_memory_size(post_size)
    n_channels = self._num_channels()

    # start/enable trigger/wait ready
    self.trigger_or_mask(pyspcm.SPC_TMASK_SOFTWARE)  # software trigger
    start_command = pyspcm.M2CMD_CARD_START | pyspcm.M2CMD_CARD_ENABLETRIGGER
    self.general_command(start_command)

    return {'memsize': memsize, 'numch': n_channels, 'mV_range': mV_range}
def _transfer_buffer_numpy(self, memsize: int, numch: int, bytes_per_sample=2) -> np.ndarray:
    """ Transfer buffer to numpy array

    Args:
        memsize (int): number of samples to transfer
        numch (int): number of channels
        bytes_per_sample (int): specifies the datatype. 2 for int16, 4 for int32

    Returns:
        array: transferred data as a one-dimensional array holding
        memsize * numch samples

    Raises:
        ValueError: if bytes_per_sample is not 2 or 4
        Exception: if the result stored by the DMA command is not ERR_OK
    """
    # setup software buffer
    sample_ctype: Union[Type[ct.c_int16], Type[ct.c_int32]]
    if bytes_per_sample == 2:
        sample_ctype = ct.c_int16
    elif bytes_per_sample == 4:
        sample_ctype = ct.c_int32
    else:
        raise ValueError('bytes_per_sample should be 2 or 4')

    ctype_buffer_type = sample_ctype * memsize * numch
    data_buffer = (ctype_buffer_type)()
    data_pointer = ct.cast(data_buffer, ct.c_void_p)

    # data acquisition
    self._def_transfer64bit(
        pyspcm.SPCM_BUF_DATA, pyspcm.SPCM_DIR_CARDTOPC, 0, data_pointer, 0,
        bytes_per_sample * memsize * numch)
    self.general_command(pyspcm.M2CMD_DATA_STARTDMA | pyspcm.M2CMD_DATA_WAITDMA)
    if self._last_set_result != pyspcm.ERR_OK:
        res = self._last_set_result
        # a return code without a known name must not turn the transfer
        # error into a KeyError
        error_name = _errormsg_dict.get(res, 'unknown error')
        raise Exception(f'Error transferring data: {error_name} (0x{res:04x})')

    # convert buffer to numpy array
    # this does not typecheck with numpy 1.22 should be updated
    # by someone with access to test on the real data.
    # for some reason this does type check with 3.12 but not earlier
    output = np.frombuffer(data_buffer, dtype=sample_ctype)  # type: ignore[call-overload,unused-ignore]
    return output
def retrieve_data(self, trace):
    """ Retrieve data from the digitizer

    The data acquisition must have been started by start_acquisition.

    Args:
        trace (dict): dictionary with acquisition settings, with the keys
            'memsize', 'numch' and 'mV_range' as returned by
            start_acquisition

    Returns:
        voltages (array)
    """
    memsize, numch, mV_range = (trace[key] for key in ('memsize', 'numch', 'mV_range'))

    self.wait_ready()

    # the acquisition is stopped even when the transfer raises
    try:
        raw = self._transfer_buffer_numpy(memsize, numch)
    finally:
        self._stop_acquisition()

    return self.convert_to_voltage(raw, mV_range / 1000)
def single_trigger_acquisition(self, mV_range, memsize, posttrigger_size):
    """ Acquire traces with the SPC_REC_STD_SINGLE mode

    The triggering properties are left untouched by this method.

    Args:
        mV_range (float): Input range in mV, used for conversion to voltage
        memsize (int): Size of total buffer to acquire
        posttrigger_size (int): Size of the post trigger buffer

    Returns:
        Array with measured voltages
    """
    self.card_mode(pyspcm.SPC_REC_STD_SINGLE)

    # set memsize and posttrigger
    self.data_memory_size(memsize)
    self.posttrigger_memory_size(posttrigger_size)
    n_channels = self._num_channels()

    start_flags = pyspcm.M2CMD_CARD_START | pyspcm.M2CMD_CARD_ENABLETRIGGER
    self.general_command(start_flags)
    self.wait_ready()

    # the acquisition is stopped even when the transfer raises
    try:
        raw = self._transfer_buffer_numpy(memsize, n_channels)
    finally:
        self._stop_acquisition()

    return self.convert_to_voltage(raw, mV_range / 1000)
def gated_trigger_acquisition(self, mV_range, memsize, pretrigger_size, posttrigger_size):
    """ Acquire traces with the SPC_REC_STD_GATE mode

    Known issues: this doesn't work completely as expected. It triggers even
    when the trigger level is set outside of the signal range. It also seems
    to additionally acquire some wrong parts of the wave, but this also
    exists in SBench6, so it is not a problem caused by this code.

    Args:
        mV_range (float): Input range in mV, used for conversion to voltage
        memsize (int): Size of total buffer to acquire
        pretrigger_size (int): Size of the pre trigger buffer
        posttrigger_size (int): Size of the post trigger buffer

    Returns:
        Array with measured voltages
    """
    self.card_mode(pyspcm.SPC_REC_STD_GATE)

    # set memsize, pretrigger and posttrigger
    self.data_memory_size(memsize)
    self.pretrigger_memory_size(pretrigger_size)
    self.posttrigger_memory_size(posttrigger_size)
    n_channels = self._num_channels()

    start_flags = pyspcm.M2CMD_CARD_START | pyspcm.M2CMD_CARD_ENABLETRIGGER
    self.general_command(start_flags)
    self.wait_ready()

    # the acquisition is stopped even when the transfer raises
    try:
        raw = self._transfer_buffer_numpy(memsize, n_channels)
    finally:
        self._stop_acquisition()

    return self.convert_to_voltage(raw, mV_range / 1000)
def single_software_trigger_acquisition_boxcar(self, mV_range, memsize, posttrigger_size):
    """ Acquire a single data trace with boxcar averaging

    The card runs in SPC_REC_STD_BOXCAR mode with a software trigger. The
    samples are transferred as 32-bit values and the converted voltages are
    divided by the value of the box_averages parameter.

    Args:
        mV_range (float): range in mV
        memsize (int): size of data trace
        posttrigger_size (int): size of data trace after triggering

    Returns:
        voltages (array)
    """
    self.card_mode(pyspcm.SPC_REC_STD_BOXCAR)
    self.segment_size(memsize)
    self.posttrigger_memory_size(posttrigger_size)
    total_size = memsize * self.box_averages()
    self.data_memory_size(total_size)
    n_channels = self._num_channels()

    # select the software trigger, start the card and wait until it is ready
    self.trigger_or_mask(pyspcm.SPC_TMASK_SOFTWARE)
    start_flags = pyspcm.M2CMD_CARD_START | pyspcm.M2CMD_CARD_ENABLETRIGGER
    self.general_command(start_flags)
    self.wait_ready()

    # the acquisition is stopped even when the transfer raises
    try:
        raw = self._transfer_buffer_numpy(memsize, n_channels, bytes_per_sample=4)
    finally:
        self._stop_acquisition()

    scaled = self.convert_to_voltage(raw, mV_range / 1000)
    return scaled / self.box_averages()
def single_software_trigger_acquisition(self, mV_range, memsize, posttrigger_size):
    """ Acquire a single data trace

    The card runs in SPC_REC_STD_SINGLE mode with a software trigger.

    Args:
        mV_range (float): range in mV
        memsize (int): size of data trace
        posttrigger_size (int): size of data trace after triggering

    Returns:
        voltages (array)
    """
    self.card_mode(pyspcm.SPC_REC_STD_SINGLE)
    self.data_memory_size(memsize)
    self.posttrigger_memory_size(posttrigger_size)
    n_channels = self._num_channels()

    # select the software trigger, start the card and wait until it is ready
    self.trigger_or_mask(pyspcm.SPC_TMASK_SOFTWARE)
    start_flags = pyspcm.M2CMD_CARD_START | pyspcm.M2CMD_CARD_ENABLETRIGGER
    self.general_command(start_flags)
    self.wait_ready()

    # the acquisition is stopped even when the transfer raises
    try:
        raw = self._transfer_buffer_numpy(memsize, n_channels)
    finally:
        self._stop_acquisition()

    return self.convert_to_voltage(raw, mV_range / 1000)
def _check_buffers(self):
    """ Check validity of buffers

    See: manual section "Limits of pre trigger, post trigger, memory size"

    Raises:
        Exception: if the data memory size minus the post trigger memory
            size is larger than 2**13
    """
    max_pretrigger = 2**13
    total_size = self.data_memory_size()
    post_size = self.posttrigger_memory_size()
    if total_size - post_size > max_pretrigger:
        raise Exception('value of SPC_PRETRIGGER is invalid')


def _num_channels(self):
    """ Return number of channels that is enabled

    This is the number of bits set in the value of enable_channels.
    """
    channel_mask = self.enable_channels()
    mask_bits = bin(channel_mask)
    return mask_bits.count("1")
def wait_ready(self) -> int:
    """ Wait for the M4i card to be ready using M2CMD_CARD_WAITREADY

    The command is written directly to the SPC_M2CMD register.

    Returns:
        Return code of the M4i general command used to wait for the card to be ready
    """
    wait_command = int(pyspcm.M2CMD_CARD_WAITREADY)
    return pyspcm.spcm_dwSetParam_i32(self.hCard, pyspcm.SPC_M2CMD, wait_command)
def blockavg_hardware_trigger_acquisition(self, mV_range, nr_averages=10, verbose=0, post_trigger=None):
    """ Acquire data using block averaging and hardware triggering

    To read out multiple channels, use `initialize_channels`. This method
    updates the external_trigger_mode and trigger_or_mask parameters, except
    when nr_averages is 1: in that case the call is passed on to
    single_trigger_acquisition before these parameters are touched.

    The segment size is set to the current data memory size.

    Args:
        mV_range (float): range in mV, used for conversion to voltage
        nr_averages (int): number of averages to take
        verbose (int): output level
        post_trigger (None or int): optional size of post_trigger buffer.
            If None, the pre trigger size is half the memory size rounded
            down to a multiple of 16 (at most 2**13) and the post trigger
            size is the remainder of the memory size

    Returns:
        An array of voltages. If multiple channels are read, then the data is interleaved
    """
    # self.available_card_modes()
    memsize = self.data_memory_size()
    self.segment_size(memsize)

    # split the memory into a pre trigger and a post trigger part
    if post_trigger is None:
        pre_trigger = min(2**13, 16 * int((memsize / 2) // 16))
        post_trigger = memsize - pre_trigger
    else:
        pre_trigger = memsize - post_trigger
    self.posttrigger_memory_size(post_trigger)
    self.pretrigger_memory_size(pre_trigger)
    self._check_buffers()

    if verbose:
        print('blockavg_hardware_trigger_acquisition: errors %s' %
              (self.get_error_info32bit(), ))
        print('blockavg_hardware_trigger_acquisition: card_status %s' %
              (self.card_status(), ))

    if nr_averages == 1:
        # special case since SPC_AVERAGES cannot handle 1
        if verbose:
            print(
                'blockavg_hardware_trigger_acquisition: pass to single_trigger_acquisition')
        return self.single_trigger_acquisition(mV_range=mV_range, memsize=memsize, posttrigger_size=post_trigger)

    self.card_mode(pyspcm.SPC_REC_STD_AVERAGE)
    self._set_param32bit(pyspcm.SPC_AVERAGES, nr_averages)
    numch = self._num_channels()

    # trigger on the external trigger input EXT0 with mode SPC_TM_POS
    self.external_trigger_mode(pyspcm.SPC_TM_POS)
    self.trigger_or_mask(pyspcm.SPC_TMASK_EXT0)
    self.general_command(pyspcm.M2CMD_CARD_START | pyspcm.M2CMD_CARD_ENABLETRIGGER)

    self.wait_ready()

    # the samples are transferred as 32-bit values and divided by the number
    # of averages; the acquisition is stopped even when the transfer raises
    try:
        output = self._transfer_buffer_numpy(memsize, numch, bytes_per_sample=4) / nr_averages
    finally:
        self._stop_acquisition()

    voltages = self.convert_to_voltage(output, mV_range / 1000)

    return voltages
def close(self):
    """Close handle to the card.

    The handle is only closed when it is not None, and is set to None
    afterwards. The close method of the parent class is always called.
    """
    handle = self.hCard
    if handle is not None:
        pyspcm.spcm_vClose(handle)
        self.hCard = None
    super().close()
def get_card_type(self, verbose=0):
    """Read card type.

    Args:
        verbose (int): if nonzero, print the name of the card type

    Returns:
        int: value read from the SPC_PCITYP register
    """
    card_type = pyspcm.int32(0)
    pyspcm.spcm_dwGetParam_i32(self.hCard, pyspcm.SPC_PCITYP, pyspcm.byref(card_type))
    type_code = card_type.value
    if verbose:
        print('card_type: %s' % szTypeToName(type_code))
    return type_code
# only works if the error was not caused by running the entire program
# (and therefore making a new M4i object)
def get_error_info32bit(self, verbose=False):
    """Read an error from the error register.

    Args:
        verbose (bool): If True then also fetch the error text and print the
            error message to stdout

    Returns:
        errorreg (int)
        errorvalue (int)
    """
    error_reg = pyspcm.uint32(0)
    error_val = pyspcm.int32(0)

    # the text buffer is only handed to the driver when the text is printed
    text_buffer = (ct.c_uint8 * pyspcm.ERRORTEXTLEN)() if verbose else None
    pyspcm.spcm_dwGetErrorInfo_i32(
        self.hCard, pyspcm.byref(error_reg), pyspcm.byref(error_val), text_buffer)

    if text_buffer is not None:
        message = bytearray(text_buffer).decode().strip('\x00')
        print('get_error_info32bit: %d %d: %s' %
              (error_reg.value, error_val.value, message))
    return (error_reg.value, error_val.value)
def _param64bit(self, param):
    """Read a 64-bit parameter from the device.

    Args:
        param (int): register to read

    Returns:
        int: value of the register
    """
    data = pyspcm.int64(0)
    pyspcm.spcm_dwGetParam_i64(self.hCard, param, pyspcm.byref(data))
    return data.value


def _param32bit(self, param):
    """Read a 32-bit parameter from the device.

    Args:
        param (int): register to read

    Returns:
        int: value of the register
    """
    data = pyspcm.int32(0)
    pyspcm.spcm_dwGetParam_i32(self.hCard, param, pyspcm.byref(data))
    return data.value


def _set_param32bit(self, param, value):
    """ Set a 32-bit parameter on the device.

    The return code of the driver call is stored in self._last_set_result.
    A timeout is logged as a warning and does not raise.

    Args:
        param (int): register to write
        value (int or float): value to write; converted with int()

    Raises:
        Exception: if the return code is neither ERR_OK nor ERR_TIMEOUT
    """
    value = int(value)  # convert floating point to int if necessary
    res = pyspcm.spcm_dwSetParam_i32(self.hCard, param, value)
    self._last_set_result = res
    if res == pyspcm.ERR_TIMEOUT:
        # use the module logger, like the rest of this file
        log.warning('SetParam timeout')
    elif res != pyspcm.ERR_OK:
        # a return code without a known name must not turn the failure
        # into a KeyError
        error_name = _errormsg_dict.get(res, 'unknown error')
        raise Exception(f'SetParam failed. param:0x{param:08X} value:0x{value:08X}: '
                        f'result: {error_name} (0x{res:08X})')


def _invalidate_buf(self, buf_type):
    """Invalidate device buffer."""
    pyspcm.spcm_dwInvalidateBuf(self.hCard, buf_type)


def _def_transfer64bit(self, buffer_type, direction, bytes_till_event, data_pointer, offset, buffer_length):
    """Define a 64-bit transfer between the device and the computer.

    All arguments are passed unchanged to spcm_dwDefTransfer_i64; the return
    code of that call is not checked.
    """
    pyspcm.spcm_dwDefTransfer_i64(
        self.hCard, buffer_type, direction, bytes_till_event, data_pointer, offset, buffer_length)


def _exact_sample_rate(self):
    """ Return exact sampling rate as a floating point number

    The ratio of the maximum sample rate to the sample_rate parameter is
    rounded to the nearest integer, and the maximum sample rate divided by
    that integer is returned.
    """
    sample_rate_hz = self.sample_rate()
    max_sample_rate = self.get_max_sample_rate()
    factor = int(np.round(max_sample_rate / sample_rate_hz))
    return max_sample_rate / factor
def get_max_sample_rate(self, verbose=0):
    """Return max sample rate.

    Args:
        verbose (int): if nonzero, print the value

    Returns:
        int: value read from the SPC_PCISAMPLERATE register
    """
    max_rate = self._param32bit(pyspcm.SPC_PCISAMPLERATE)
    if verbose:
        print('max_sample_rate: %s' % (max_rate))
    return max_rate
def get_card_memory(self, verbose=0):
    """Return the memory size of the card.

    Args:
        verbose (int): if nonzero, print the value

    Returns:
        int: value read from the 64-bit SPC_PCIMEMSIZE register
    """
    card_memory = self._param64bit(pyspcm.SPC_PCIMEMSIZE)
    if verbose:
        print('card_memory: %s' % (card_memory))
    return card_memory
def _hw_memsize(self, size):
    """Return size rounded up to a multiple of 16 (for integer size)."""
    n_blocks = (size + 15) // 16
    return n_blocks * 16