Source code for qcodes_contrib_drivers.drivers.Keysight.SD_common.memory_manager
# -*- coding: utf-8 -*-
from dataclasses import dataclass
from typing import List, Dict
import logging
from datetime import datetime
[docs]
class MemoryManager:
"""
Memory manager for AWG memory.
AWG memory is reserved in slots of sizes from 1e4 till 1e8 samples.
Allocation of memory takes time. So, only request a high maximum waveform size when it is needed.
Memory slots (number: size):
400: 1e4 samples
100: 1e5 samples
20: 1e6 samples
8: 1e7 samples
4: 1e8 samples
Args:
waveform_size_limit: maximum waveform size to support.
"""
verbose = False
[docs]
@dataclass
class AllocatedSlot:
number: int
allocation_ref: int
memory_manager: 'MemoryManager'
[docs]
def release(self) -> None:
self.memory_manager.release(self)
@dataclass
class _MemorySlot:
number: int
size: int
allocated: bool
initialized: bool
allocation_ref: int
'''Unique reference value when allocated.
Used to check for incorrect or missing release calls.
'''
allocation_time: str = ''
# Note (M3202A): size must be multiples of 10 and >= 2000
memory_sizes = [
(int(1e4), 400),
(int(1e5), 100),
(int(1e6), 20),
(int(1e7), 8), # Uploading 8e7 samples takes 1.5s.
(int(1e8), 4) # Uploading 4e8 samples takes 7.3s.
]
def __init__(self, log, waveform_size_limit: int = int(1e6)) -> None:
self._log = log
self._allocation_ref_count: int = 0
self._created_size: int = 0
self._max_waveform_size: int = 0
self._free_memory_slots: Dict[int, List[int]] = {}
self._slots: List[MemoryManager._MemorySlot] = []
self._slot_sizes = sorted([size for size, _ in
MemoryManager.memory_sizes])
self.set_waveform_limit(waveform_size_limit)
[docs]
def get_uninitialized_slots(self) -> List['MemoryManager._MemorySlot']:
"""
Returns list of slots that must be initialized (reserved in AWG)
"""
new_slots = []
slots = self._slots.copy()
for slot in slots:
if not slot.initialized:
new_slots.append(slot)
slot.initialized = True
return new_slots
[docs]
def allocate(self, wave_size: int) -> 'MemoryManager.AllocatedSlot':
"""
Allocates a memory slot with at least the specified wave size.
Args:
wave_size: number of samples of the waveform
Returns:
allocated slot
"""
if wave_size > self._max_waveform_size:
raise Exception(f'AWG wave with {wave_size} samples is too long. '
f'Max size={self._max_waveform_size}. Increase '
f'waveform size limit with set_waveform_limit().')
for slot_size in self._slot_sizes:
if wave_size > slot_size:
continue
if slot_size > self._created_size:
# slots of this size are not initialized.
break
if len(self._free_memory_slots[slot_size]) > 0:
slot = self._free_memory_slots[slot_size].pop(0)
self._allocation_ref_count += 1
self._slots[slot].allocation_ref = self._allocation_ref_count
self._slots[slot].allocated = True
self._slots[slot].allocation_time = datetime.now().strftime('%H:%M:%S.%f')
if MemoryManager.verbose:
self._log.debug(f'Allocated slot {slot}')
return MemoryManager.AllocatedSlot(slot, self._slots[slot].allocation_ref, self)
raise Exception(f'No free memory slots left for waveform with'
f' {wave_size} samples.')
[docs]
def release(self, allocated_slot: AllocatedSlot) -> None:
"""
Releases the `allocated_slot`.
"""
slot_number = allocated_slot.number
slot = self._slots[slot_number]
if not slot.allocated:
raise Exception(f'memory slot {slot_number} not in use')
if slot.allocation_ref != allocated_slot.allocation_ref:
raise Exception(f'memory slot {slot_number} allocation reference '
f'mismatch:{slot.allocation_ref} is not equal to '
f'{allocated_slot.allocation_ref}')
slot.allocated = False
slot.allocation_ref = 0
self._free_memory_slots[slot.size].append(slot_number)
if MemoryManager.verbose:
try:
self._log.debug(f'Released slot {slot_number}')
except:
# self._log throws exception when instrument has been closed.
logging.debug(f'Released slot {slot_number}')
[docs]
def release_all(self) -> None:
"""
Release all allocated slots regardless of external references.
"""
for slot in self._slots:
if slot.allocated:
self._log.info(f'Forced release of slot {slot.number} '
f'allocated at {slot.allocation_time}')
slot.allocated = False
slot.allocation_ref = 0
self._free_memory_slots[slot.size].append(slot.number)
def _create_memory_slots(self, max_size: int) -> None:
creation_limit = self._get_slot_size(max_size)
free_slots = self._free_memory_slots
slots = self._slots
for size, amount in sorted(MemoryManager.memory_sizes):
if size > creation_limit:
break
if size <= self._created_size:
continue
free_slots[size] = []
for i in range(amount):
number = len(slots)
free_slots[size].append(number)
slots.append(MemoryManager._MemorySlot(number, size, False, False, 0))
self._free_memory_slots = free_slots
self._slots = slots
self._created_size = creation_limit
def _get_slot_size(self, size: int) -> int:
for slot_size in self._slot_sizes:
if slot_size >= size:
return slot_size
raise Exception(f'Requested waveform size {size} is too big')
[docs]
def mem_usage(self):
'''
Example:
pprint(awg._memory_manager.mem_usage(), sort_dicts=False)
'''
result = {}
result[' Block size'] = ['Created', 'Allocated']
for size in self._slot_sizes:
result[str(size)] = [0,0,0]
for slot in self._slots:
stats = result[str(slot.size)]
stats[0] += 1
stats[1] += slot.allocated
result['Free'] = {size:len(slots) for size,slots in self._free_memory_slots.items()}
return result
[docs]
def allocation_state(self):
'''
Example:
pprint(awg._memory_manager.allocation_state())
'''
result = {}
result[' Free'] = {size:len(slots) for size,slots in self._free_memory_slots.items()}
result['Allocated'] = [slot for slot in self._slots if slot.allocated]
return result