# this file defines the sequence object
# along with a few helpers
import json
import logging
import typing
import warnings
from copy import deepcopy
from typing import Union, cast
import numpy as np
from schema import Optional, Or, Schema
from broadbean.blueprint import BluePrint
from broadbean.element import Element # TODO: change import to element.py
from broadbean.ripasso import applyInverseRCFilter
from .broadbean import (
PulseAtoms,
_AWGOutput,
_channelListSorter, # TODO: change import to helpers.py
)
log = logging.getLogger(__name__)
fs_schema = Schema(
{
int: {
"type": Or("subsequence", "element"),
"content": {
int: {
"data": {Or(str, int): {str: np.ndarray}},
Optional("sequencing"): {Optional(str): int},
}
},
"sequencing": {Optional(str): int},
}
}
)
[docs]
class SequencingError(Exception):
pass
[docs]
class SequenceConsistencyError(Exception):
pass
[docs]
class InvalidForgedSequenceError(Exception):
pass
[docs]
class SequenceCompatibilityError(Exception):
pass
[docs]
class SpecificationInconsistencyError(Exception):
pass
[docs]
class Sequence:
"""
Sequence object
"""
def __init__(self):
"""
Not much to see here...
"""
# the internal data structure, a dict with tuples as keys and values
# the key is sequence position (int), the value is element (Element)
# or subsequence (Sequence)
self._data = {}
# Here goes the sequencing info. Key: position
# value: dict with keys 'twait', 'nrep', 'jump_input',
# 'jump_target', 'goto'
#
# the sequencing is filled out automatically with default values
# when an element is added
# Note that not all output backends use all items in the list
self._sequencing = {}
# The dictionary to store AWG settings
# Keys will include:
# 'SR', 'channelX_amplitude', 'channelX_offset', 'channelX_filter'
self._awgspecs = {}
# The metainfo to be extracted by measurements
# todo: I'm pretty sure this is obsolete now that description exists
self._meta = {}
# some backends (seqx files) allow for a sequence to have a name
# we make the name a property of the sequence
self._name = ""
def __eq__(self, other):
if not isinstance(other, Sequence):
return False
elif not self._data == other._data:
return False
elif not self._meta == other._meta:
return False
elif not self._awgspecs == other._awgspecs:
return False
elif not self._sequencing == other._sequencing:
return False
else:
return True
def __add__(self, other):
"""
Add two sequences.
Return a new sequence with is the right argument appended to the
left argument.
"""
# Validation
if not self.checkConsistency():
raise SequenceConsistencyError("Left hand sequence inconsistent!")
if not other.checkConsistency():
raise SequenceConsistencyError("Right hand sequence inconsistent!")
if not self._awgspecs == other._awgspecs:
raise SequenceCompatibilityError(
"Incompatible sequences: different AWGspecifications."
)
newseq = Sequence()
N = len(self._data)
newdata1 = {key: self.element(key).copy() for key in self._data.keys()}
newdata2 = {key + N: other.element(key).copy() for key in other._data.keys()}
newdata1.update(newdata2)
newseq._data = newdata1
newsequencing1 = {
key: self._sequencing[key].copy() for key in self._sequencing.keys()
}
newsequencing2 = dict()
for key, item in other._sequencing.items():
newitem = item.copy()
# update goto and jump according to new sequence length
if newitem["goto"] > 0:
newitem["goto"] += N
if newitem["jump_target"] > 0:
newitem["jump_target"] += N
newsequencing2.update({key + N: newitem})
newsequencing1.update(newsequencing2)
newseq._sequencing = newsequencing1
newseq._awgspecs = other._awgspecs.copy()
return newseq
[docs]
def copy(self):
"""
Returns a copy of the sequence.
"""
newseq = Sequence()
newseq._data = deepcopy(self._data)
newseq._meta = deepcopy(self._meta)
newseq._awgspecs = deepcopy(self._awgspecs)
newseq._sequencing = deepcopy(self._sequencing)
return newseq
[docs]
def setSequenceSettings(self, pos, wait, nreps, jump, goto):
"""
Set the sequence setting for the sequence element at pos.
Args:
pos (int): The sequence element (counting from 1)
wait (int): The wait state specifying whether to wait for a
trigger. 0: OFF, don't wait, 1: ON, wait. For some backends,
additional integers are allowed to specify the trigger input.
0 always means off.
nreps (int): Number of repetitions. 0 corresponds to infinite
repetitions
jump (int): Event jump target, the position of a sequence element.
If 0, the event jump state is off.
goto (int): Goto target, the position of a sequence element.
0 means next.
"""
warnings.warn(
"Deprecation warning. This function is only compatible "
"with AWG5014 output and will be removed. "
"Please use the specific setSequencingXXX methods."
)
# Validation (some validation 'postponed' and put in checkConsistency)
#
# Because of different compliances for different backends,
# most validation of these settings is deferred and performed
# in the outputForXXX methods
self._sequencing[pos] = {
"twait": wait,
"nrep": nreps,
"jump_target": jump,
"goto": goto,
"jump_input": 0,
}
[docs]
def setSequencingTriggerWait(self, pos: int, wait: int) -> None:
"""
Set the trigger wait for the sequence element at pos. For
AWG 5014 out, this can be 0 or 1, For AWG 70000A output, this
can be 0, 1, 2, or 3.
Args:
pos: The sequence element (counting from 1)
wait: The wait state/input depending on backend.
"""
self._sequencing[pos]["twait"] = wait
[docs]
def setSequencingNumberOfRepetitions(self, pos: int, nrep: int) -> None:
"""
Set the number of repetitions for the sequence element at pos.
Args:
pos: The sequence element (counting from 1)
nrep: The number of repetitions (0 means infinite)
"""
self._sequencing[pos]["nrep"] = nrep
[docs]
def setSequencingEventJumpTarget(self, pos: int, jump_target: int) -> None:
"""
Set the event jump target for the sequence element at pos.
Args:
pos: The sequence element (counting from 1)
jump_target: The sequence element to jump to (counting from 1)
"""
self._sequencing[pos]["jump_target"] = jump_target
[docs]
def setSequencingGoto(self, pos: int, goto: int) -> None:
"""
Set the goto target (which element to play after the current one ends)
for the sequence element at pos.
Args:
pos: The sequence element (counting from 1)
goto: The position of the element to play. 0 means 'next in line'
"""
self._sequencing[pos]["goto"] = goto
[docs]
def setSR(self, SR):
"""
Set the sample rate for the sequence
"""
self._awgspecs["SR"] = SR
[docs]
def setChannelVoltageRange(self, channel, ampl, offset):
"""
Assign the physical voltages of the channel. This is used when making
output for .awg files. The corresponding parameters in the QCoDeS
AWG5014 driver are called chXX_amp and chXX_offset. Please ensure that
the channel in question is indeed in ampl/offset mode and not in
high/low mode.
Args:
channel (int): The channel number
ampl (float): The channel peak-to-peak amplitude (V)
offset (float): The channel offset (V)
"""
warnings.warn(
"Deprecation warning. This function is deprecated."
" Use setChannelAmplitude and SetChannelOffset "
"instead."
)
keystr = f"channel{channel}_amplitude"
self._awgspecs[keystr] = ampl
keystr = f"channel{channel}_offset"
self._awgspecs[keystr] = offset
[docs]
def setChannelAmplitude(self, channel: Union[int, str], ampl: float) -> None:
"""
Assign the physical voltage amplitude of the channel. This is used
when making output for real instruments.
Args:
channel: The channel number
ampl: The channel peak-to-peak amplitude (V)
"""
keystr = f"channel{channel}_amplitude"
self._awgspecs[keystr] = ampl
[docs]
def setChannelOffset(self, channel: Union[int, str], offset: float) -> None:
"""
Assign the physical voltage offset of the channel. This is used
by some backends when making output for real instruments
Args:
channel: The channel number/name
offset: The channel offset (V)
"""
keystr = f"channel{channel}_offset"
self._awgspecs[keystr] = offset
[docs]
def setChannelDelay(self, channel: Union[int, str], delay: float) -> None:
"""
Assign a delay to a channel. This is used when making output for .awg
files. Use the delay to compensate for cable length differences etc.
Zeros are prepended to the waveforms to delay them and correspondingly
appended to non (or less) delayed channels.
Args:
channel: The channel number/name
delay: The required delay (s)
Raises:
ValueError: If a non-integer or non-non-negative channel number is
given.
"""
self._awgspecs[f"channel{channel}_delay"] = delay
[docs]
def setChannelFilterCompensation(
self,
channel: Union[str, int],
kind: str,
order: int = 1,
f_cut: typing.Optional[float] = None,
tau: typing.Optional[float] = None,
) -> None:
"""
Specify a filter to compensate for.
The specified channel will get a compensation (pre-distorion) to
compensate for the specified frequency filter. Just to be clear:
the INVERSE transfer function of the one you specify is applied.
Only compensation for simple RC-circuit type high pass and low
pass is supported.
Args:
channel: The channel to apply this to.
kind: Either 'LP' or 'HP'
order: The order of the filter to compensate for.
May be negative. Default: 1.
f_cut: The cut_off frequency (Hz).
tau): The time constant (s). Note that
tau = 1/f_cut and that only one of the two can be specified.
Raises:
ValueError: If kind is not 'LP' or 'HP'
ValueError: If order is not an int.
SpecificationInconsistencyError: If both f_cut and tau are given.
"""
if kind not in ["HP", "LP"]:
raise ValueError(
'Filter kind must either be "LP" (low pass) or "HP" (high pass).'
)
if not isinstance(order, int):
raise ValueError("Filter order must be an integer.")
if (f_cut is not None) and (tau is not None):
raise SpecificationInconsistencyError(
"Can not specify BOTH a time constant and a cut-off frequency."
)
keystr = f"channel{channel}_filtercompensation"
self._awgspecs[keystr] = {
"kind": kind,
"order": order,
"f_cut": f_cut,
"tau": tau,
}
[docs]
def addElement(self, position: int, element: Element) -> None:
"""
Add an element to the sequence. Overwrites previous values.
Args:
position (int): The sequence position of the element (lowest: 1)
element (Element): An element instance
Raises:
ValueError: If the element has inconsistent durations
"""
# Validation
element.validateDurations()
# make a new copy of the element
newelement = element.copy()
# Data mutation
self._data.update({position: newelement})
# insert default sequencing settings
self._sequencing[position] = {
"twait": 0,
"nrep": 1,
"jump_input": 0,
"jump_target": 0,
"goto": 0,
}
[docs]
def addSubSequence(self, position: int, subsequence: "Sequence") -> None:
"""
Add a subsequence to the sequence. Overwrites anything previously
assigned to this position. The subsequence can not contain any
subsequences itself.
Args:
position: The sequence position (starting from 1)
subsequence: The subsequence to add
"""
if not isinstance(subsequence, Sequence):
raise ValueError(
"Subsequence must be a sequence object. "
"Received object of type "
f"{type(subsequence)}."
)
for elem in subsequence._data.values():
if isinstance(elem, Sequence):
raise ValueError("Subsequences can not contain subsequences.")
if subsequence.SR != self.SR:
raise ValueError(
"Subsequence SR does not match (main) sequence SR"
f". ({subsequence.SR} and {self.SR})."
)
self._data[position] = subsequence.copy()
self._sequencing[position] = {
"twait": 0,
"nrep": 1,
"jump_input": 0,
"jump_target": 0,
"goto": 0,
}
[docs]
def checkConsistency(self, verbose=False):
"""
Checks wether the sequence can be built, i.e. wether all elements
have waveforms on the same channels and of the same length.
"""
# TODO: Give helpful info if the check fails
try:
self._awgspecs["SR"]
except KeyError:
raise KeyError("No sample rate specified. Can not perform check")
# First check that all sample rates agree
# Since all elements are validated on input, the SR exists
SRs = [elem.SR for elem in self._data.values()]
if SRs == []: # case of empty Sequence
SRs = [None]
if SRs.count(SRs[0]) != len(SRs):
failmssg = "checkConsistency failed: inconsistent sample rates."
log.info(failmssg)
if verbose:
print(failmssg)
return False
# Then check that elements use the same channels
specchans = []
for elem in self._data.values():
chans = _channelListSorter(elem.channels)
specchans.append(chans)
if specchans == []: # case of empty Sequence
chans = None
specchans = [None]
if specchans.count(chans) != len(specchans):
failmssg = (
"checkConsistency failed: different elements specify "
"different channels"
)
log.info(failmssg)
if verbose:
print(failmssg)
return False
# TODO: must all elements have same length? Does any AWG require this?
# Finally, check that all positions are filled
positions = list(self._data.keys())
if positions == []: # case of empty Sequence
positions = [1]
if not positions == list(range(1, len(positions) + 1)):
failmssg = (
"checkConsistency failed: inconsistent sequence"
"positions. Must be 1, 2, 3, ..."
)
log.info(failmssg)
if verbose:
print(failmssg)
return False
# If all three tests pass...
return True
@property
def description(self):
"""
Return a dictionary fully describing the Sequence.
"""
desc = {}
for pos, elem in self._data.items():
desc[str(pos)] = {}
desc[str(pos)]["channels"] = elem.description
try:
sequencing = self._sequencing[pos]
seqdict = {
"Wait trigger": sequencing["twait"],
"Repeat": sequencing["nrep"],
"jump_input": sequencing["jump_input"],
"jump_target": sequencing["jump_target"],
"Go to": sequencing["goto"],
}
desc[str(pos)]["sequencing"] = seqdict
except KeyError:
desc[str(pos)]["sequencing"] = "Not set"
desc["awgspecs"] = self._awgspecs
return desc
[docs]
def write_to_json(self, path_to_file: str) -> None:
"""
Writes sequences to JSON file
Args:
path_to_file: the path to the file to write to ex:
path_to_file/sequense.json
"""
with open(path_to_file, "w") as fp:
json.dump(self.description, fp, indent=4)
[docs]
@classmethod
def sequence_from_description(cls, seq_dict: dict) -> "Sequence":
"""
Returns a sequence from a description given as a dict
Args:
seq_dict: a dict in the same form as returned by
Sequence.description
"""
awgspecs = seq_dict["awgspecs"]
SR = awgspecs["SR"]
elem_list = list(seq_dict.keys())
new_instance = cls()
for ele in elem_list[:-1]:
channels_list = list(seq_dict[ele]["channels"].keys())
elem = Element()
for chan in channels_list:
bp_sum = BluePrint.blueprint_from_description(
seq_dict[ele]["channels"][chan]
)
bp_sum.setSR(SR)
elem.addBluePrint(int(chan), bp_sum)
if "flags" in seq_dict[ele]["channels"][chan]:
flags = seq_dict[ele]["channels"][chan]["flags"]
elem.addFlags(int(chan), flags)
ChannelAmplitude = awgspecs[f"channel{chan}_amplitude"]
new_instance.setChannelAmplitude(
int(chan), ChannelAmplitude
) # Call signature: channel, amplitude (peak-to-peak)
ChannelOffset = awgspecs[f"channel{chan}_offset"]
new_instance.setChannelOffset(int(chan), ChannelOffset)
new_instance.addElement(int(ele), elem)
sequencedict = seq_dict[ele]["sequencing"]
new_instance.setSequencingTriggerWait(
int(ele), sequencedict["Wait trigger"]
)
new_instance.setSequencingNumberOfRepetitions(
int(ele), sequencedict["Repeat"]
)
new_instance.setSequencingEventInput(int(ele), sequencedict["jump_input"])
new_instance.setSequencingEventJumpTarget(
int(ele), sequencedict["jump_target"]
)
new_instance.setSequencingGoto(int(ele), sequencedict["Go to"])
new_instance.setSR(SR)
return new_instance
[docs]
@classmethod
def init_from_json(cls, path_to_file: str) -> "Sequence":
"""
Reads sequense from JSON file
Args:
path_to_file: the path to the file to be read ex:
path_to_file/sequense.json
This function is the inverse of write_to_json
The JSON file needs to be structured as if it was writen
by the function write_to_json
"""
new_instance = cls()
with open(path_to_file) as fp:
data_loaded = json.load(fp)
new_instance = Sequence.sequence_from_description(data_loaded)
return new_instance
@property
def name(self):
return self._name
@name.setter
def name(self, newname):
if not isinstance(newname, str):
raise ValueError("The sequence name must be a string")
self._name = newname
@property
def length_sequenceelements(self):
"""
Returns the current number of specified sequence elements
"""
return len(self._data)
@property
def SR(self):
"""
Returns the sample rate, if defined. Else returns -1.
"""
try:
SR = self._awgspecs["SR"]
except KeyError:
SR = -1
return SR
@property
def channels(self):
"""
Returns a list of the specified channels of the sequence
"""
if self.checkConsistency():
return self.element(1).channels
else:
raise SequenceConsistencyError(
"Sequence not consistent. Can not figure out the channels."
)
@property
def points(self):
"""
Returns the number of points of the sequence, disregarding
sequencing info (like repetitions). Useful for asserting upload
times, i.e. the size of the built sequence.
"""
total = 0
for elem in self._data.values():
total += elem.points
return total
@property
def duration(self) -> float:
"""
Returns the duration in seconds of the sequence.
"""
duration = 0.0
for pos, elem in self._data.items():
nrep = self._sequencing[pos]["nrep"]
duration += nrep * elem.duration
return duration
[docs]
def element(self, pos):
"""
Returns the element at the given position. Changes made to the return
value of this methods will apply to the sequence. If this is undesired,
make a copy of the returned element using Element.copy
Args:
pos (int): The sequence position
Raises:
KeyError: If no element is specified at the given position
"""
try:
elem = self._data[pos]
except KeyError:
raise KeyError(f"No element specified at sequence position {pos}")
return elem
@staticmethod
def _plotSummary(seq: dict[int, dict]) -> dict[int, dict[str, np.ndarray]]:
"""
Return a plotting summary of a subsequence.
Args:
seq: The 'content' value of a forged sequence where a
subsequence resides
Returns:
A dict that looks like a forged element, but all waveforms
are just two points, np.array([min, max])
"""
output = {}
# we assume correctness, all postions specify the same channels
chans = seq[1]["data"].keys()
minmax = dict(zip(chans, [(0, 0)] * len(chans)))
for element in seq.values():
arr_dict = element["data"]
for chan in chans:
wfm = arr_dict[chan]["wfm"]
if wfm.min() < minmax[chan][0]:
minmax[chan] = (wfm.min(), minmax[chan][1])
if wfm.max() > minmax[chan][1]:
minmax[chan] = (minmax[chan][0], wfm.max())
output[chan] = {
"wfm": np.array(minmax[chan]),
"m1": np.zeros(2),
"m2": np.zeros(2),
"time": np.linspace(0, 1, 2),
}
return output
[docs]
def forge(
self,
apply_delays: bool = True,
apply_filters: bool = True,
includetime: bool = False,
) -> dict[int, dict]:
"""
Forge the sequence, applying all specified transformations
(delays and ripasso filter corrections). Copies the data, so
that the sequence is not modified by forging.
Args:
apply_delays: Whether to apply the assigned channel delays
(if any)
apply_filters: Whether to apply the assigned channel filters
(if any)
includetime: Whether to include the time axis and the segment
durations (a list) with the arrays. Used for plotting.
Returns:
A nested dictionary holding the forged sequence.
"""
# Validation
if not self.checkConsistency():
raise ValueError(
"Can not generate output. Something is "
"inconsistent. Please run "
"checkConsistency(verbose=True) for more details"
)
output: dict[int, dict] = {}
channels = self.channels
data = deepcopy(self._data)
seqlen = len(data.keys())
# TODO: in this function, we iterate through the sequence three times
# It is probably worth considering refactoring that into a single
# iteration, although that may compromise readability
# Apply channel delays.
if apply_delays:
delays = []
for chan in channels:
try:
delays.append(self._awgspecs[f"channel{chan}_delay"])
except KeyError:
delays.append(0)
for pos in range(1, seqlen + 1):
if isinstance(data[pos], Sequence):
subseq = data[pos]
for elem in subseq._data.values():
elem._applyDelays(delays)
elif isinstance(data[pos], Element):
data[pos]._applyDelays(delays)
# forge arrays and form the output dict
for pos in range(1, seqlen + 1):
output[pos] = {}
output[pos]["sequencing"] = self._sequencing[pos]
if isinstance(data[pos], Sequence):
subseq = data[pos]
output[pos]["type"] = "subsequence"
output[pos]["content"] = {}
for pos2 in range(1, subseq.length_sequenceelements + 1):
output[pos]["content"][pos2] = {"data": {}, "sequencing": {}}
elem = subseq.element(pos2)
dictdata = elem.getArrays(includetime=includetime)
output[pos]["content"][pos2]["data"] = dictdata
seqing = subseq._sequencing[pos2]
output[pos]["content"][pos2]["sequencing"] = seqing
# TODO: update sequencing
elif isinstance(data[pos], Element):
elem = data[pos]
output[pos]["type"] = "element"
dictdata = elem.getArrays(includetime=includetime)
output[pos]["content"] = {1: {"data": dictdata}}
# apply filter corrections to forged arrays
if apply_filters:
for pos1 in range(1, seqlen + 1):
thiselem = output[pos1]["content"]
for pos2 in thiselem.keys():
data = thiselem[pos2]["data"]
for channame in data.keys():
keystr = f"channel{channame}_filtercompensation"
if keystr in self._awgspecs.keys():
kind = self._awgspecs[keystr]["kind"]
order = self._awgspecs[keystr]["order"]
f_cut = self._awgspecs[keystr]["f_cut"]
tau = self._awgspecs[keystr]["tau"]
if f_cut is None:
f_cut = 1 / tau
prefilter = data[channame]["wfm"]
postfilter = applyInverseRCFilter(
prefilter, self.SR, kind, f_cut, order, DCgain=1
)
(
output[pos1]["content"][pos2]["data"][channame]["wfm"]
) = postfilter
return output
def _prepareForOutputting(self) -> list[dict[int, np.ndarray]]:
"""
The preparser for numerical output. Applies delay and ripasso
corrections.
Returns:
A list of outputs of the Element's getArrays functions, i.e.
a list of dictionaries with key position (int) and value
an np.ndarray of array([wfm, m1, m2, time]), where the
wfm values are still in V. The particular backend output
function must rescale to the specific format it adheres to.
"""
# Validation
if not self.checkConsistency():
raise ValueError(
"Can not generate output. Something is "
"inconsistent. Please run "
"checkConsistency(verbose=True) for more details"
)
#
#
channels = self.element(1).channels # all elements have ident. chans
# We copy the data so that the state of the Sequence is left unaltered
# by outputting for AWG
data = deepcopy(self._data)
seqlen = len(data.keys())
# check if sequencing information is specified for each element
if not sorted(list(self._sequencing.keys())) == list(range(1, seqlen + 1)):
raise ValueError(
"Can not generate output for file; incorrect sequencer information."
)
# Verify physical amplitude specifiations
for chan in channels:
ampkey = f"channel{chan}_amplitude"
if ampkey not in self._awgspecs.keys():
raise KeyError(
"No amplitude specified for channel {chan}. Can not continue."
)
# Apply channel delays.
delays = []
for chan in channels:
try:
delays.append(self._awgspecs[f"channel{chan}_delay"])
except KeyError:
delays.append(0)
maxdelay = max(delays)
for pos in range(1, seqlen + 1):
for chanind, chan in enumerate(channels):
element = data[pos]
delay = delays[chanind]
if "blueprint" in element._data[chan].keys():
blueprint = element._data[chan]["blueprint"]
# prevent information about flags to be lost
if "flags" in element._data[chan].keys():
flags = element._data[chan]["flags"]
else:
flags = None
# update existing waituntils
for segpos in range(len(blueprint._funlist)):
if blueprint._funlist[segpos] == "waituntil":
oldwait = blueprint._argslist[segpos][0]
blueprint._argslist[segpos] = (oldwait + delay,)
# insert delay before the waveform
if delay > 0:
blueprint.insertSegment(0, "waituntil", (delay,), "waituntil")
# add zeros at the end
if maxdelay - delay > 0:
blueprint.insertSegment(
-1, PulseAtoms.ramp, (0, 0), dur=maxdelay - delay
)
# TODO: is the next line even needed?
# If not, remove the code updating the flags below
# and the one remembering them above
element.addBluePrint(chan, blueprint)
if flags is not None:
element.addFlags(chan, flags)
else:
arrays = element._data[chan]["array"]
for name, arr in arrays.items():
pre_wait = np.zeros(int(delay / self.SR))
post_wait = np.zeros(int((maxdelay - delay) / self.SR))
arrays[name] = np.concatenate((pre_wait, arr, post_wait))
# Now forge all the elements as specified
elements = [] # the forged elements
for pos in range(1, seqlen + 1):
elements.append(data[pos].getArrays())
# Now that the numerical arrays exist, we can apply filter compensation
for chan in channels:
keystr = f"channel{chan}_filtercompensation"
if keystr in self._awgspecs.keys():
kind = self._awgspecs[keystr]["kind"]
order = self._awgspecs[keystr]["order"]
f_cut = self._awgspecs[keystr]["f_cut"]
tau = self._awgspecs[keystr]["tau"]
if f_cut is None:
f_cut = 1 / tau
for pos in range(seqlen):
prefilter = elements[pos][chan]["wfm"]
postfilter = applyInverseRCFilter(
prefilter, self.SR, kind, f_cut, order, DCgain=1
)
elements[pos][chan]["wfm"] = postfilter
return elements
[docs]
def outputForSEQXFile(
self,
) -> tuple[
list[int],
list[int],
list[int],
list[int],
list[int],
list[list[np.ndarray]],
list[float],
str,
]:
"""
Generate a tuple matching the call signature of the QCoDeS
AWG70000A driver's `makeSEQXFile` function. If channel delays
have been specified, they are added to the ouput before exporting.
The intended use of this function together with the QCoDeS driver is
.. code:: python
pkg = seq.outputForSEQXFile()
seqx = awg70000A.makeSEQXFile(*pkg)
Returns:
A tuple holding (trig_waits, nreps, event_jumps, event_jump_to,
go_to, wfms, amplitudes, seqname)
"""
# most of the footwork is done by the following function
elements = self._prepareForOutputting()
# _prepareForOutputting asserts that channel amplitudes and
# full sequencing is specified
seqlen = len(elements)
# all elements have ident. chans since _prepareForOutputting
# did not raise an exception
channels = self.element(1).channels
for chan in channels:
offkey = f"channel{chan}_offset"
if offkey in self._awgspecs.keys():
log.warning(
"Found a specified offset for channel "
f"{chan}, but .seqx files can't contain offset "
"information. Will ignore the offset."
""
)
# now check that the amplitudes are within the allowed limits
# also verify that all waveforms are at least 2400 points
# No rescaling because the driver's _makeWFMXBinaryData does
# the rescaling
amplitudes = []
for chan in channels:
ampl = self._awgspecs[f"channel{chan}_amplitude"]
amplitudes.append(ampl)
if len(amplitudes) == 1:
amplitudes.append(0)
for pos in range(1, seqlen + 1):
element = elements[pos - 1]
for chan in channels:
ampl = self._awgspecs[f"channel{chan}_amplitude"]
wfm = element[chan]["wfm"]
# check the waveform length
if len(wfm) < 2400:
raise ValueError(
"Waveform too short on channel "
f"{chan} at step {pos}; only {len(wfm)} points. "
"The required minimum is 2400 points."
""
)
# check whether the waveform voltages can be realised
if wfm.max() > ampl / 2:
raise ValueError(
"Waveform voltages exceed channel range "
f"on channel {chan}"
f" sequence element {pos}."
f" {wfm.max()} > {ampl/2}!"
)
if wfm.min() < -ampl / 2:
raise ValueError(
"Waveform voltages exceed channel range "
f"on channel {chan}"
f" sequence element {pos}. "
f"{wfm.min()} < {-ampl/2}!"
)
element[chan]["wfm"] = wfm
elements[pos - 1] = element
# Finally cast the lists into the shapes required by the AWG driver
waveforms = cast(list[list[np.ndarray]], [[] for dummy in range(len(channels))])
nreps = []
trig_waits = []
gotos = []
jump_states = []
jump_tos = []
# Since sequencing options are valid/invalid differently for
# different backends, we make the validation here
for pos in range(1, seqlen + 1):
for chanind, chan in enumerate(channels):
wfm = elements[pos - 1][chan]["wfm"]
m1 = elements[pos - 1][chan]["m1"]
m2 = elements[pos - 1][chan]["m2"]
waveforms[chanind].append(np.array([wfm, m1, m2]))
twait = self._sequencing[pos]["twait"]
nrep = self._sequencing[pos]["nrep"]
jump_to = self._sequencing[pos]["jump_target"]
jump_state = self._sequencing[pos]["jump_input"]
goto = self._sequencing[pos]["goto"]
if twait not in [0, 1, 2, 3]:
raise SequencingError(
"Invalid trigger input at position"
f"{pos}: {twait}. Must be 0, 1, 2, or 3."
""
)
if jump_state not in [0, 1, 2, 3]:
raise SequencingError(
"Invalid event jump input at position"
f"{pos}: {twait}. Must be either 0, 1, 2, or 3."
""
)
if nrep not in range(0, 16384):
raise SequencingError(
"Invalid number of repetions at position"
f"{pos}: {nrep}. Must be either 0 (infinite) "
"or 1-16,383."
)
if jump_to not in range(-1, seqlen + 1):
raise SequencingError(
"Invalid event jump target at position"
f"{pos}: {jump_to}. Must be either -1 (next),"
f" 0 (off), or 1-{seqlen}."
""
)
if goto not in range(0, seqlen + 1):
raise SequencingError(
"Invalid goto target at position"
f"{pos}: {goto}. Must be either 0 (next),"
f" or 1-{seqlen}."
""
)
trig_waits.append(twait)
nreps.append(nrep)
jump_tos.append(jump_to)
jump_states.append(jump_state)
gotos.append(goto)
return (
trig_waits,
nreps,
jump_states,
jump_tos,
gotos,
waveforms,
amplitudes,
self.name,
)
[docs]
def outputForSEQXFileWithFlags(
self,
) -> tuple[
list[int],
list[int],
list[int],
list[int],
list[int],
list[list[np.ndarray]],
list[float],
str,
list[list[list[int]]],
]:
"""
Generate a tuple matching the call signature of the QCoDeS
AWG70000A driver's `makeSEQXFile` function. Same as outputForSEQXFile(),
but also includes information about the flags.
Returns:
A tuple holding (trig_waits, nreps, event_jumps, event_jump_to,
go_to, wfms, amplitudes, seqname, flags)
"""
elements = self._prepareForOutputting()
seqlen = len(elements)
channels = self.element(1).channels
# add flags for every element and channel
all_flags = []
for chanind, chan in enumerate(channels):
flags_pos = []
for pos in range(1, seqlen + 1):
if "flags" in elements[pos - 1][chan]:
flags = elements[pos - 1][chan]["flags"].tolist()
else:
flags = [0, 0, 0, 0]
flags_pos.append(flags)
all_flags.append(flags_pos)
return self.outputForSEQXFile() + (all_flags,)
[docs]
def outputForAWGFile(self):
"""
Returns a sliceable object with items matching the call
signature of the 'make_*_awg_file' functions of the QCoDeS
AWG5014 driver. One may then construct an awg file as follows
(assuming that seq is the sequence object):
.. code:: python
package = seq.outputForAWGFile()
make_awg_file(*package[:], **kwargs)
"""
elements = self._prepareForOutputting()
seqlen = len(elements)
# all elements have ident. chans since _prepareForOutputting
# did not raise an exception
channels = self.element(1).channels
for chan in channels:
offkey = f"channel{chan}_offset"
if offkey not in self._awgspecs.keys():
raise ValueError(
f"No specified offset for channel {chan}, can not continue."
)
# Apply channel scaling
# We must rescale to the interval -1, 1 where 1 is ampl/2+off and -1 is
# -ampl/2+off.
#
def rescaler(val, ampl, off):
return val / ampl * 2 - off
for pos in range(1, seqlen + 1):
element = elements[pos - 1]
for chan in channels:
ampl = self._awgspecs[f"channel{chan}_amplitude"]
off = self._awgspecs[f"channel{chan}_offset"]
wfm = element[chan]["wfm"]
# check whether the waveform voltages can be realised
if wfm.max() > ampl / 2 + off:
raise ValueError(
"Waveform voltages exceed channel range "
f"on channel {chan}"
f" sequence element {pos}."
f" {wfm.max()} > {ampl/2+off}!"
)
if wfm.min() < -ampl / 2 + off:
raise ValueError(
"Waveform voltages exceed channel range "
f"on channel {chan}"
f" sequence element {pos}. "
f"{wfm.min()} < {-ampl/2+off}!"
)
wfm = rescaler(wfm, ampl, off)
element[chan]["wfm"] = wfm
elements[pos - 1] = element
# Finally cast the lists into the shapes required by the AWG driver
waveforms = [[] for dummy in range(len(channels))]
m1s = [[] for dummy in range(len(channels))]
m2s = [[] for dummy in range(len(channels))]
nreps = []
trig_waits = []
gotos = []
jump_tos = []
# Since sequencing options are valid/invalid differently for
# different backends, we make the validation here
for pos in range(1, seqlen + 1):
for chanind, chan in enumerate(channels):
waveforms[chanind].append(elements[pos - 1][chan]["wfm"])
m1s[chanind].append(elements[pos - 1][chan]["m1"])
m2s[chanind].append(elements[pos - 1][chan]["m2"])
twait = self._sequencing[pos]["twait"]
nrep = self._sequencing[pos]["nrep"]
jump_to = self._sequencing[pos]["jump_target"]
goto = self._sequencing[pos]["goto"]
if twait not in [0, 1]:
raise SequencingError(
"Invalid trigger wait state at position"
f"{pos}: {twait}. Must be either 0 or 1."
""
)
if nrep not in range(0, 65537):
raise SequencingError(
"Invalid number of repetions at position"
f"{pos}: {nrep}. Must be either 0 (infinite) "
"or 1-65,536."
)
if jump_to not in range(-1, seqlen + 1):
raise SequencingError(
"Invalid event jump target at position"
f"{pos}: {jump_to}. Must be either -1 (next),"
f" 0 (off), or 1-{seqlen}."
""
)
if goto not in range(0, seqlen + 1):
raise SequencingError(
"Invalid goto target at position"
f"{pos}: {goto}. Must be either 0 (next),"
f" or 1-{seqlen}."
""
)
trig_waits.append(twait)
nreps.append(nrep)
jump_tos.append(jump_to)
gotos.append(goto)
# ...and make a sliceable object out of them
output = _AWGOutput(
(waveforms, m1s, m2s, nreps, trig_waits, gotos, jump_tos), self.channels
)
return output