# Source code for broadbean.blueprint

# This file is for defining the blueprint object

import functools as ft
import json
import re
import warnings
from inspect import signature

import numpy as np

from .broadbean import PulseAtoms


class SegmentDurationError(Exception):
    """
    Raised when a segment is too short to be built, i.e. when its duration
    at the given sample rate corresponds to fewer than two points.
    """
class BluePrint:
    """
    The class of a waveform to become.

    A blueprint is an ordered list of segments. Each segment has a name,
    a function (or the special string 'waituntil'), a tuple of arguments,
    a duration and one bound marker specification per marker channel.
    On top of that, the blueprint holds two lists of absolute-time markers
    (marker1, marker2) and an optional sample rate.
    """

    def __init__(
        self,
        funlist=None,
        argslist=None,
        namelist=None,
        marker1=None,
        marker2=None,
        segmentmarker1=None,
        segmentmarker2=None,
        SR=None,
        durslist=None,
    ):
        """
        Create a BluePrint instance

        Args:
            funlist (list): List of functions
            argslist (list): List of tuples of arguments
            namelist (list): List of names for the functions
            marker1 (list): List of marker1 specification tuples
            marker2 (list): List of marker2 specification tuples
            segmentmarker1 (list): List of segment-bound marker1
                specification tuples, one per segment. Default: (0, 0)
                for every segment.
            segmentmarker2 (list): List of segment-bound marker2
                specification tuples, one per segment. Default: (0, 0)
                for every segment.
            SR: The sample rate
            durslist (list): List of durations

        Returns:
            BluePrint

        Raises:
            ValueError: If the four segment lists differ in length.
            ValueError: If a name is not a string or ends in a number.
        """
        # TODO: validate input

        # Sanitising. The lists are copied because they are modified below
        # and by later insert/remove calls; the caller's lists stay untouched.
        funlist = [] if funlist is None else list(funlist)
        argslist = [] if argslist is None else list(argslist)
        namelist = [] if namelist is None else list(namelist)
        durslist = [] if durslist is None else list(durslist)

        # Are the lists of matching lengths?
        lenlist = [len(funlist), len(argslist), len(namelist), len(durslist)]
        if any(elem != lenlist[0] for elem in lenlist):
            raise ValueError(
                "All input lists must be of same length. "
                f"Received lengths: {lenlist}"
            )

        # Are the names valid names?
        for name in namelist:
            if not isinstance(name, str):
                raise ValueError(f"All segment names must be strings. Received {name}.")
            if name != "" and name[-1].isdigit():
                raise ValueError(
                    "Segment names are not allowed to end"
                    f" in a number. {name} is "
                    "therefore not a valid name."
                )

        self._funlist = funlist

        # Make special functions live in the funlist but transfer their names
        # to the namelist
        # Infer names from signature if not given, i.e. allow for '' names
        for ii, (fun, name) in enumerate(zip(funlist, namelist)):
            if isinstance(fun, str):
                namelist[ii] = fun
            elif name == "":
                namelist[ii] = fun.__name__

        # Allow single arguments to be given as not tuples
        self._argslist = [
            args if isinstance(args, tuple) else (args,) for args in argslist
        ]

        self._namelist = self._make_names_unique(namelist)

        # initialise markers
        self.marker1 = [] if marker1 is None else marker1
        self.marker2 = [] if marker2 is None else marker2

        if segmentmarker1 is None:
            self._segmark1 = [(0, 0)] * len(funlist)
        else:
            self._segmark1 = segmentmarker1
        if segmentmarker2 is None:
            self._segmark2 = [(0, 0)] * len(funlist)
        else:
            self._segmark2 = segmentmarker2

        self._durslist = durslist
        self._SR = SR

    @staticmethod
    def _basename(string):
        """
        Remove trailing numbers from a string.

        Args:
            string (str): The string to strip.

        Returns:
            str: The input without its trailing digits.

        Raises:
            ValueError: If the input is not a string.
        """
        if not isinstance(string, str):
            raise ValueError(
                f"_basename received a non-string input! Got the following: {string}"
            )

        # str.isdigit is used (rather than stripping '0'-'9') so that exactly
        # the characters rejected by the name validation are removed
        end = len(string)
        while end > 0 and string[end - 1].isdigit():
            end -= 1
        return string[:end]

    @staticmethod
    def _make_names_unique(lst):
        """
        Make all strings in the input list unique
        by appending numbers to reoccuring strings

        The list is modified in place and also returned. The first
        occurrence of a name keeps the bare name, the second gets a '2'
        appended, the third a '3' and so on.

        Args:
            lst (list): List of strings. Intended for the _namelist

        Returns:
            list: The same list object, now with unique names.

        Raises:
            ValueError: If the input is not a list or holds a non-string.
        """
        if not isinstance(lst, list):
            raise ValueError(f"_make_names_unique received a non-list input! Got {lst}")

        seen = {}
        for ind, lstel in enumerate(lst):
            base = BluePrint._basename(lstel)
            count = seen.get(base, 0) + 1
            seen[base] = count
            # Do not append numbers to the first occurence
            lst[ind] = base if count == 1 else f"{base}{count}"

        return lst

    def _total_duration(self):
        """
        Sum of all segment durations, with 'waituntil' segments translated
        into actual durations. Shared by the duration and points properties.

        Raises:
            NotImplementedError: If the blueprint contains an
                'ensureaverage_fixed_level' segment.
        """
        if "ensureaverage_fixed_level" in self._funlist:
            # TODO: call the forger
            raise NotImplementedError(
                "ensureaverage_fixed_level does not exist yet. Cannot proceed"
            )
        if "waituntil" in self._funlist:
            return sum(self._makeWaitDurations())
        return sum(self._durslist)

    @property
    def length_segments(self):
        """
        Returns the number of segments in the blueprint
        """
        return len(self._namelist)

    @property
    def duration(self):
        """
        The total duration of the BluePrint. 'waituntil' segments are
        translated into durations before summing.

        Raises:
            NotImplementedError: If the blueprint contains an
                'ensureaverage_fixed_level' segment.
        """
        return self._total_duration()

    @property
    def points(self):
        """
        The total number of points in the BluePrint, i.e. the total
        duration times the sample rate, rounded to the nearest integer.

        Raises:
            ValueError: If no sample rate is set.
            NotImplementedError: If the blueprint contains an
                'ensureaverage_fixed_level' segment.
        """
        SR = self.SR
        if SR is None:
            raise ValueError(
                "No sample rate specified, can not return the number of points."
            )
        return int(np.round(self._total_duration() * SR))

    @property
    def durations(self):
        """
        The list of durations
        """
        return self._durslist

    @property
    def SR(self):
        """
        Sample rate of the blueprint
        """
        return self._SR

    @property
    def description(self):
        """
        Returns a dict describing the blueprint.

        There is one 'segment_XX' entry per segment (numbered from 01),
        each holding the keys 'name', 'function', 'durations' and
        'arguments', followed by the four marker entries 'marker1_abs',
        'marker2_abs', 'marker1_rel' and 'marker2_rel'.
        """
        desc = {}  # the dict to return

        segments = zip(self._namelist, self._funlist, self._argslist, self._durslist)
        for sn, (name, fun, args, dur) in enumerate(segments):
            if fun == "waituntil":
                funname = fun
                arguments = {"waittime": args}
            else:
                # turn e.g. '<function foo at 0x...>' into 'function foo'
                funname = str(fun)[1:]
                funname = funname[: funname.find(" at")]
                arguments = dict(zip(signature(fun).parameters, args))

            desc[f"segment_{sn+1:02d}"] = {
                "name": name,
                "function": funname,
                "durations": dur,
                "arguments": arguments,
            }

        desc["marker1_abs"] = self.marker1
        desc["marker2_abs"] = self.marker2
        desc["marker1_rel"] = self._segmark1
        desc["marker2_rel"] = self._segmark2

        return desc

    def write_to_json(self, path_to_file: str) -> None:
        """
        Writes blueprint to JSON file

        Args:
            path_to_file: the path to the file to write to ex:
                path_to_file/blueprint.json
        """
        with open(path_to_file, "w") as fp:
            json.dump(self.description, fp, indent=4)

    @classmethod
    def blueprint_from_description(cls, blue_dict):
        """
        Returns a blueprint from a description given as a dict

        Args:
            blue_dict: a dict in the same form as returned by
                BluePrint.description

        Returns:
            BluePrint: The blueprint described by the dict. The sample rate
                is not part of the description and is therefore not set.
        """
        knowfunctions = {
            f"function PulseAtoms.{fun}": getattr(PulseAtoms, fun)
            for fun in dir(PulseAtoms)
            if "__" not in fun
        }

        bp_sum = cls()
        for key, seg_dict in blue_dict.items():
            if "segment" not in key:
                continue
            bp_seg = BluePrint()
            if seg_dict["function"] == "waituntil":
                waittime = list(seg_dict["arguments"].values())[0]
                bp_seg.insertSegment(0, "waituntil", (waittime[0],))
            else:
                bp_seg.insertSegment(
                    0,
                    knowfunctions[seg_dict["function"]],
                    tuple(seg_dict["arguments"].values()),
                    # only the trailing uniqueness number is dropped; digits
                    # inside the name belong to the name and are kept
                    name=cls._basename(seg_dict["name"]),
                    dur=seg_dict["durations"],
                )
            bp_sum = bp_sum + bp_seg

        # JSON has no tuples, so marker specifications come back as lists
        # after a round trip through a file. Convert them back to tuples.
        bp_sum.marker1 = [tuple(mark) for mark in blue_dict["marker1_abs"]]
        bp_sum.marker2 = [tuple(mark) for mark in blue_dict["marker2_abs"]]
        bp_sum._segmark1 = [tuple(mark) for mark in blue_dict["marker1_rel"]]
        bp_sum._segmark2 = [tuple(mark) for mark in blue_dict["marker2_rel"]]
        return bp_sum

    @classmethod
    def init_from_json(cls, path_to_file: str) -> "BluePrint":
        """
        Reads blueprint from JSON file

        Args:
            path_to_file: the path to the file to be read ex:
                path_to_file/blueprint.json

        This function is the inverse of write_to_json
        The JSON file needs to be structured as if it was writen
        by the function write_to_json
        """
        with open(path_to_file) as fp:
            data_loaded = json.load(fp)
        return cls.blueprint_from_description(data_loaded)

    def _makeWaitDurations(self):
        """
        Translate waituntills into durations and return that list.

        Returns:
            list: A copy of the durations in which the entry of each
                'waituntil' segment is the wait time minus the time
                elapsed before that segment.

        Raises:
            NotImplementedError: If the blueprint contains an
                'ensureaverage_fixed_level' segment.
            ValueError: If more time has elapsed before a 'waituntil'
                segment than the time it should wait until.
        """
        if "ensureaverage_fixed_level" in self._funlist:
            raise NotImplementedError(
                'There is an "ensureaverage_fixed_level"'
                " in this BluePrint. Cannot compute."
            )

        durations = self._durslist.copy()
        waitpositions = [
            ii for ii, el in enumerate(self._funlist) if el == "waituntil"
        ]

        # Calculate elapsed times. Earlier waits are already translated when
        # a later wait sums up the time elapsed before it.
        for pos in waitpositions:
            elapsed_time = sum(durations[:pos])
            wait_time = self._argslist[pos][0]
            dur = wait_time - elapsed_time
            if dur < 0:
                raise ValueError(
                    "Inconsistent timing. Can not wait until "
                    f"{wait_time} at position {pos}."
                    f" {elapsed_time} elapsed already"
                )
            durations[pos] = dur

        return durations

    def showPrint(self):
        """
        Pretty-print the contents of the BluePrint. Not finished.
        """
        # TODO: tidy up this method and make it use the description property

        if self._durslist is None:
            dl = [None] * len(self._namelist)
        else:
            dl = self._durslist

        datalists = [self._namelist, self._funlist, self._argslist, dl]

        print("Legend: Name, function, arguments, timesteps, durations")

        for ind, (name, fun, args, dur) in enumerate(zip(*datalists)):
            if fun == "waituntil":
                fun_p = fun
            else:
                fun_p = fun.__str__().split(" ")[1]
            list_p = [ind + 1, name, fun_p, args, dur]
            print('Segment {}: "{}", {}, {}, {}'.format(*list_p))
        print("-" * 10)

    def changeArg(self, name, arg, value, replaceeverywhere=False):
        """
        Change an argument of one or more of the functions in the blueprint.

        Args:
            name (str): The name of the segment in which to change an argument
            arg (Union[int, str]): Either the position (int) or name (str) of
                the argument to change
            value (Union[int, float]): The new value of the argument
            replaceeverywhere (bool): If True, the same argument is overwritten
                in ALL segments where the name matches. E.g. 'gaussian1' will
                match 'gaussian', 'gaussian2', etc. If False, only the segment
                with exact name match gets a replacement.

        Raises:
            ValueError: If the argument can not be matched (either the argument
                name does not match or the argument number is wrong).
            ValueError: If the name can not be matched.
        """
        # TODO: is there any reason to use tuples internally?

        if replaceeverywhere:
            basename = BluePrint._basename
            name = basename(name)
            replacelist = [nm for nm in self._namelist if basename(nm) == name]
        else:
            replacelist = [name]

        # Validation
        if name not in self._namelist:
            raise ValueError(
                "No segment of that name in blueprint."
                f" Contains segments: {self._namelist}"
            )

        for segname in replacelist:
            position = self._namelist.index(segname)
            function = self._funlist[position]
            sig = signature(function)

            # Validation
            if isinstance(arg, str):
                if arg not in sig.parameters:
                    raise ValueError(
                        "No such argument of function "
                        f"{function.__name__}. Has arguments "
                        f"{sig.parameters.keys()}."
                    )
            # Each function has two 'secret' arguments, SR and dur
            user_params = len(sig.parameters) - 2
            if isinstance(arg, int) and (arg not in range(user_params)):
                raise ValueError(
                    f"No argument {arg} "
                    f"of function {function.__name__}."
                    f" Has {user_params} "
                    "arguments."
                )

            # allow the user to input single values instead of (val,)
            # NOTE(review): for a segment with exactly one argument the value
            # ends up stored wrapped in a 1-tuple inside the argument tuple.
            # This is kept as it was; confirm that it is intended.
            no_of_args = len(self._argslist[position])
            new_value = value
            if not isinstance(value, tuple) and no_of_args == 1:
                new_value = (value,)

            # The index is resolved per segment, so that a named argument is
            # looked up in the signature of each segment's own function
            if isinstance(arg, str):
                index = list(sig.parameters).index(arg)
            else:
                index = arg

            # Mutating the immutable...
            larg = list(self._argslist[position])
            larg[index] = new_value
            self._argslist[position] = tuple(larg)

    def changeDuration(self, name, dur, replaceeverywhere=False):
        """
        Change the duration of one or more segments in the blueprint

        Args:
            name (str): The name of the segment in which to change duration
            dur (Union[float, int]): The new duration.
            replaceeverywhere (Optional[bool]): If True, the duration(s)
                is(are) overwritten in ALL segments where the name matches.
                E.g. 'gaussian1' will match 'gaussian', 'gaussian2',
                etc. If False, only the segment with exact name match
                gets a replacement.

        Raises:
            ValueError: If dur is neither an int nor a float.
            ValueError: If no segment matches the name.
            ValueError: If dur is not positive
            ValueError: If SR is given for the blueprint and dur is less than
                1/SR.
        """
        if not (isinstance(dur, float)) and not (isinstance(dur, int)):
            raise ValueError(
                f"New duration must be an int or a float. Received {type(dur)}"
            )

        if replaceeverywhere:
            basename = BluePrint._basename
            name = basename(name)
            replacelist = [nm for nm in self._namelist if basename(nm) == name]
        else:
            replacelist = [name]

        # Validation
        if name not in self._namelist:
            raise ValueError(
                "No segment of that name in blueprint."
                f" Contains segments: {self._namelist}"
            )

        # The new duration is the same for all segments, so validate it once
        if dur <= 0:
            raise ValueError("Duration must be strictly greater than zero.")
        if self.SR is not None and dur * self.SR < 1:
            raise ValueError("Duration too short! Must be at least 1/sample rate.")

        for segname in replacelist:
            self._durslist[self._namelist.index(segname)] = dur

    def setSR(self, SR):
        """
        Set the associated sample rate

        Args:
            SR (Union[int, float]): The sample rate in Sa/s.
        """
        self._SR = SR

    def setSegmentMarker(self, name, specs, markerID):
        """
        Bind a marker to a specific segment.

        Args:
            name (str): Name of the segment
            specs (tuple): Marker specification tuple, (delay, duration),
                where the delay is relative to the segment start
            markerID (int): Which marker channel to output on. Must be 1 or 2.

        Raises:
            ValueError: If markerID is neither 1 nor 2.
            ValueError: If there is no segment of the given name.
        """
        if markerID not in [1, 2]:
            raise ValueError(f"MarkerID must be either 1 or 2. Received {markerID}.")

        markerselect = {1: self._segmark1, 2: self._segmark2}
        position = self._namelist.index(name)

        # TODO: Do we need more than one bound marker per segment?
        markerselect[markerID][position] = specs

    def removeSegmentMarker(self, name: str, markerID: int) -> None:
        """
        Remove all bound markers from a specific segment

        Args:
            name (str): Name of the segment
            markerID (int): Which marker channel to remove from (1 or 2).

        Raises:
            ValueError: If markerID is neither 1 nor 2.
            KeyError: If there is no segment of the given name.
        """
        if markerID not in [1, 2]:
            raise ValueError(f"MarkerID must be either 1 or 2. Received {markerID}.")

        markerselect = {1: self._segmark1, 2: self._segmark2}
        try:
            position = self._namelist.index(name)
        except ValueError as err:
            raise KeyError(f"No segment named {name} in this BluePrint.") from err
        markerselect[markerID][position] = (0, 0)

    def copy(self):
        """
        Returns a copy of the BluePrint
        """
        # Needed because of input validation in __init__
        namelist = [self._basename(name) for name in self._namelist]

        return BluePrint(
            self._funlist.copy(),
            self._argslist.copy(),
            namelist,
            self.marker1.copy(),
            self.marker2.copy(),
            self._segmark1.copy(),
            self._segmark2.copy(),
            self._SR,
            self._durslist,
        )

    def insertSegment(self, pos, func, args=(), dur=None, name=None, durs=None):
        """
        Insert a segment into the bluePrint.

        Args:
            pos (int): The position at which to add the segment. Counts like
                a python list; 0 is first, -1 is last. Values below -1 are
                not allowed, though.
            func (function): Function describing the segment. Must have its
               duration as the last argument (unless its a special function).
            args (Optional[Tuple[Any]]): Tuple of arguments BESIDES duration.
                Default: ()
            dur (Optional[Union[int, float]]): The duration of the
                segment. Must be given UNLESS the segment is
                'waituntil' or 'ensureaverage_fixed_level'
            name Optional[str]: Name of the segment. If none is given,
                the segment will receive the name of its function,
                possibly with a number appended.
            durs: Deprecated alias of dur. Triggers a warning when used.

        Raises:
            ValueError: If the position is below -1
            ValueError: If the name ends in a number
            ValueError: If both dur and durs are given
            ValueError: If an 'ensureaverage_fixed_level' segment is inserted
                into a blueprint that already has an 'ensureaverage' segment
        """
        # Validation
        has_ensureavg = (
            "ensureaverage_fixed_level" in self._funlist
            or "ensureaverage_fixed_dur" in self._funlist
        )
        if func == "ensureaverage_fixed_level" and has_ensureavg:
            raise ValueError(
                'Can not have more than one "ensureaverage" segment in a blueprint.'
            )

        if durs is not None:
            warnings.warn(
                'Deprecation warning: please specify "dur" rather '
                'than "durs" when inserting a segment'
            )
            if dur is None:
                dur = durs
            else:
                raise ValueError('You can not specify "durs" AND "dur"!')

        # allow users to input single values
        if not isinstance(args, tuple):
            args = (args,)

        if pos < -1:
            raise ValueError("Position must be strictly larger than -1")

        if name is None or name == "":
            # Special functions are given as strings and lend their own
            # string as the name, just like in __init__
            name = func if isinstance(func, str) else func.__name__
        elif isinstance(name, str) and name[-1].isdigit():
            raise ValueError("Segment name must not end in a number")

        # list.insert(-1, x) would put x BEFORE the last element, so the
        # 'last' position is translated into the current length instead
        index = len(self._namelist) if pos == -1 else pos

        self._namelist.insert(index, name)
        self._namelist = self._make_names_unique(self._namelist)
        self._funlist.insert(index, func)
        self._argslist.insert(index, args)
        self._segmark1.insert(index, (0, 0))
        self._segmark2.insert(index, (0, 0))
        self._durslist.insert(index, dur)

    def removeSegment(self, name):
        """
        Remove the specified segment from the blueprint.

        The names of the remaining segments are made unique again, so
        their trailing numbers may change.

        Args:
            name (str): The name of the segment to remove.

        Raises:
            KeyError: If there is no segment of the given name.
        """
        try:
            position = self._namelist.index(name)
        except ValueError as err:
            raise KeyError(f"No segment called {name} in blueprint.") from err

        del self._funlist[position]
        del self._argslist[position]
        del self._namelist[position]
        del self._segmark1[position]
        del self._segmark2[position]
        del self._durslist[position]

        self._namelist = self._make_names_unique(self._namelist)

    def __add__(self, other):
        """
        Add two BluePrints. The second argument is appended to the first
        and a new BluePrint is returned. The new blueprint gets the sample
        rate of the first one, if that has one.

        Args:
            other (BluePrint): A BluePrint instance

        Returns:
            BluePrint: A new blueprint.

        Raises:
            ValueError: If the input is not a BluePrint instance
        """
        if not isinstance(other, BluePrint):
            raise ValueError(
                f""" BluePrint can only be added to another Blueprint. Received an object of type {type(other)} """
            )

        nl = [self._basename(name) for name in self._namelist]
        nl += [self._basename(name) for name in other._namelist]

        # List concatenation creates new lists, so the new blueprint
        # shares no list with either of the two operands
        new_bp = BluePrint()
        new_bp._namelist = new_bp._make_names_unique(nl)
        new_bp._funlist = self._funlist + other._funlist
        new_bp._argslist = self._argslist + other._argslist
        new_bp.marker1 = self.marker1 + other.marker1
        new_bp.marker2 = self.marker2 + other.marker2
        new_bp._segmark1 = self._segmark1 + other._segmark1
        new_bp._segmark2 = self._segmark2 + other._segmark2
        new_bp._durslist = self._durslist + other._durslist

        if self.SR is not None:
            new_bp.setSR(self.SR)

        return new_bp

    def __eq__(self, other):
        """
        Compare two blueprints. They are the same iff the lists of names,
        functions, arguments and markers are identical.

        Args:
            other (BluePrint): A BluePrint instance

        Returns:
            bool: whether the two blueprints are identical

        Raises:
            ValueError: If the input is not a BluePrint instance
        """
        if not isinstance(other, BluePrint):
            raise ValueError(
                f""" Blueprint can only be compared to another Blueprint. Received an object of type {type(other)} """
            )

        # NOTE(review): durations and sample rate are not compared.
        # Kept as it was; confirm that this is intended.
        if not self._namelist == other._namelist:
            return False
        if not self._funlist == other._funlist:
            return False
        if not self._argslist == other._argslist:
            return False
        if not self.marker1 == other.marker1:
            return False
        if not self.marker2 == other.marker2:
            return False
        if not self._segmark1 == other._segmark1:
            return False
        if not self._segmark2 == other._segmark2:
            return False
        return True
def _subelementBuilder( blueprint: BluePrint, SR: int, durs: list[float] ) -> dict[str, np.ndarray]: """ The function building a blueprint, returning a numpy array. This is the core translater from description of pulse to actual data points All arrays must be made with this function """ # Important: building the element must NOT modify any of the mutable # inputs, therefore all lists are copied funlist = blueprint._funlist.copy() argslist = blueprint._argslist.copy() namelist = blueprint._namelist.copy() marker1 = blueprint.marker1.copy() marker2 = blueprint.marker2.copy() segmark1 = blueprint._segmark1.copy() segmark2 = blueprint._segmark2.copy() durations = durs.copy() no_of_waits = funlist.count("waituntil") # handle waituntil by translating it into a normal function waitpositions = [ii for ii, el in enumerate(funlist) if el == "waituntil"] # Calculate elapsed times for nw in range(no_of_waits): pos = waitpositions[nw] funlist[pos] = PulseAtoms.waituntil elapsed_time = sum(durations[:pos]) wait_time = argslist[pos][0] dur = wait_time - elapsed_time if dur < 0: raise ValueError( "Inconsistent timing. Can not wait until " f"{wait_time} at position {pos}." f" {elapsed_time} elapsed already" ) else: durations[pos] = dur # When special segments like 'waituntil' and 'ensureaverage' get # evaluated, the list of durations gets updated. That new list # is newdurations newdurations = np.array(durations) # All waveforms must ultimately have an integer number of samples # Now figure out from the durations what these integers are # # The most honest thing to do is to simply round off dur*SR # and raise an exception if the segment ends up with less than # two points intdurations = np.zeros(len(newdurations), dtype=int) for ii, dur in enumerate(newdurations): int_dur = round(dur * SR) if int_dur < 2: raise SegmentDurationError( "Too short segment detected! 
" f'Segment "{namelist[ii]}" at position {ii} ' f"has a duration of {newdurations[ii]} which at " f"an SR of {SR:.3E} leads to just {int_dur} " "point(s). There must be at least " "2 points in each segment." "" ) else: intdurations[ii] = int_dur newdurations[ii] = int_dur / SR # The actual forging of the waveform wf_length = np.sum(intdurations) parts = [ft.partial(fun, *args) for (fun, args) in zip(funlist, argslist)] blocks = [p(SR, d) for (p, d) in zip(parts, intdurations)] output = np.fromiter( (block for sl in blocks for block in sl), float, count=wf_length ) # now make the markers time = np.linspace(0, sum(newdurations), wf_length, endpoint=False) m1 = np.zeros_like(time) m2 = np.zeros_like(time) # update the 'absolute time' marker list with 'relative time' # (segment bound) markers converted to absolute time elapsed_times = np.cumsum([0.0] + list(newdurations)) for pos, spec in enumerate(segmark1): if spec[1] != 0: ontime = elapsed_times[pos] + spec[0] # spec is (delay, duration) marker1.append((ontime, spec[1])) for pos, spec in enumerate(segmark2): if spec[1] != 0: ontime = elapsed_times[pos] + spec[0] # spec is (delay, duration) marker2.append((ontime, spec[1])) msettings = [marker1, marker2] marks = [m1, m2] for marker, setting in zip(marks, msettings): for t, dur in setting: ind = np.abs(time - t).argmin() chunk = int(np.round(dur * SR)) marker[ind : ind + chunk] = 1 outdict = { "wfm": output, "m1": m1, "m2": m2, "time": time, "newdurations": newdurations, } return outdict