Source code for qupulse.hardware.dacs.alazar

import dataclasses
from typing import Dict, Any, Optional, Tuple, List, Iterable, Callable, Sequence
from collections import defaultdict
import copy
import warnings
import math
import functools
import abc
import logging

import numpy as np

from atsaverage.config import ScanlineConfiguration
from atsaverage.masks import CrossBufferMask, Mask

from qupulse.utils.types import TimeType
from qupulse.hardware.dacs.dac_base import DAC
from qupulse.hardware.util import traced
from qupulse.utils.performance import time_windows_to_samples

logger = logging.getLogger(__name__)


def _windows_to_samples(begins: np.ndarray, lengths: np.ndarray,
                        sample_rate: TimeType) -> Tuple[np.ndarray, np.ndarray]:
    return time_windows_to_samples(begins, lengths, float(sample_rate))


[docs]@dataclasses.dataclass class AcquisitionProgram: _sample_rate: Optional[TimeType] = dataclasses.field(default=None) _masks: dict = dataclasses.field(default_factory=dict) @property def sample_rate(self) -> Optional[TimeType]: return self._sample_rate
[docs] def set_measurement_mask(self, mask_name: str, sample_rate: TimeType, begins: np.ndarray, lengths: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: """Raise error if sample factor has changed""" if self._sample_rate is None: self._sample_rate = sample_rate elif sample_rate != self.sample_rate: raise RuntimeError('class AcquisitionProgram has already masks with differing sample rate.') assert begins.dtype == float and lengths.dtype == float begins, lengths = self._masks[mask_name] = _windows_to_samples(begins, lengths, sample_rate) return begins, lengths
[docs] def clear_masks(self): self._masks.clear() self._sample_rate = None
[docs]@dataclasses.dataclass class AlazarProgram(AcquisitionProgram): operations: Sequence = dataclasses.field(default_factory=list) _total_length: Optional[int] = dataclasses.field(default=None) _auto_rearm_count: int = dataclasses.field(default=1) buffer_strategy: Optional = dataclasses.field(default=None)
[docs] def masks(self, mask_maker: Callable[[str, np.ndarray, np.ndarray], Mask]) -> List[Mask]: return [mask_maker(mask_name, *data) for mask_name, data in self._masks.items()]
@property def total_length(self) -> int: if not self._total_length: total_length = 0 for begins, lengths in self._masks.values(): total_length = max(begins[-1] + lengths[-1], total_length) return total_length else: return self._total_length @total_length.setter def total_length(self, val: int): self._total_length = val @property def auto_rearm_count(self) -> int: """This is passed to AlazarCard.startAcquisition. The card will (re-)arm automatically for this many times.""" return self._auto_rearm_count @auto_rearm_count.setter def auto_rearm_count(self, value: int): trigger_count = int(value) if trigger_count == 0: raise ValueError("Trigger count of 0 is not supported in qupulse (yet) because tracking the number of " "remaining triggers is too hard in case of infinity :(") if not 0 < trigger_count < 2**64: raise ValueError("Trigger count has to be in the interval [0, 2**64-1]") self._auto_rearm_count = trigger_count
[docs] def iter(self, mask_maker): yield self.masks(mask_maker) yield self.operations yield self.total_length
[docs]def gcd_set(data): return functools.reduce(math.gcd, data)
[docs]class BufferStrategy(metaclass=abc.ABCMeta): """This class defines the strategy how the buffer size is chosen. Buffers might impact the signal due to hardware imperfections. The aim of this class is to allow the user to work around that."""
[docs] @abc.abstractmethod def calculate_acquisition_properties(self, masks: Sequence[CrossBufferMask], buffer_length_divisor: int) -> Tuple[int, int]: """ Args: windows: Measurement windows in samples buffer_length_divisor: Necessary divisor of the buffer length Returns: A tuple (buffer_length, total_acquisition_length) """
[docs] @staticmethod def minimum_total_length(masks: Sequence[CrossBufferMask]) -> int: mtl = 0 for mask in masks: mtl = max(mtl, mask.begin[-1] + mask.length[-1]) return mtl
[docs]class ForceBufferSize(BufferStrategy):
[docs] def __init__(self, target_size: int): """ Args: aimed_size: Try to use that length """ super().__init__() self.target_buffer_size = target_size
[docs] def calculate_acquisition_properties(self, masks: Sequence[CrossBufferMask], buffer_length_divisor: int) -> Tuple[int, int]: buffer_size = int(self.target_buffer_size or buffer_length_divisor) if buffer_size % buffer_length_divisor: raise ValueError('Target size not possible for required buffer length divisor', buffer_size, buffer_length_divisor) mtl = self.minimum_total_length(masks) total_length = int(math.ceil(mtl / buffer_size) * buffer_size) return buffer_size, total_length
def __repr__(self): return 'ForceBufferSize(target_size=%r)' % self.target_buffer_size
[docs]class AvoidSingleBufferAcquisition(BufferStrategy): def __init__(self, wrapped_strategy: BufferStrategy): self.wrapped_strategy = wrapped_strategy
[docs] def calculate_acquisition_properties(self, masks: Sequence[CrossBufferMask], buffer_length_divisor: int) -> Tuple[int, int]: buffer_size, total_length = self.wrapped_strategy.calculate_acquisition_properties(masks, buffer_length_divisor) if buffer_size == total_length and buffer_size != buffer_length_divisor: # resize the buffer and recalculate total length # n is at least 2 n = total_length // buffer_length_divisor buffer_size = (n // 2) * buffer_length_divisor mtl = self.minimum_total_length(masks) total_length = int(math.ceil(mtl / buffer_size) * buffer_size) return buffer_size, total_length
def __repr__(self): return 'AvoidSingleBufferAcquisition(wrapped_strategy=%r)' % self.wrapped_strategy
[docs]class OneBufferPerWindow(BufferStrategy): """Choose the greatest common divisor of all window periods (diff(begin)) as buffer size. Aim is to only have an integer number of buffers in a measurement window."""
[docs] def calculate_acquisition_properties(self, masks: Sequence[CrossBufferMask], buffer_length_divisor: int) -> Tuple[int, int]: gcd = None for mask in masks: c_gcd = gcd_set(np.unique(np.diff(mask.begin.as_ndarray()))) if gcd is None: gcd = c_gcd else: gcd = math.gcd(gcd, c_gcd) buffer_size = max((gcd // buffer_length_divisor) * buffer_length_divisor, buffer_length_divisor) mtl = self.minimum_total_length(masks) total_length = int(math.ceil(mtl / buffer_size) * buffer_size) return buffer_size, total_length
def __repr__(self): return 'OneBufferPerWindow()'
[docs]@traced class AlazarCard(DAC): def __init__(self, card, config: Optional[ScanlineConfiguration]=None): self.__card = card self.__armed_program = None self.update_settings = True self.__definitions = dict() # this ScanlineConfig is used by default for each program # masks and operations are overwritten self.default_config = config # the currently active ScanlineConfig self._current_config = None self._buffer_strategy = None self._remaining_auto_triggers = 0 self._mask_prototypes = dict() # type: Dict self._registered_programs = defaultdict(AlazarProgram) # type: Dict[str, AlazarProgram] # defaults to self.__card.minimum_record_size if None # we use a page size here because this is allocated anyways for a buffer # This might lead to problems with small sample rates self._record_size_factor = 1024 * 4 @property def card(self) -> Any: return self.__card @property def record_size_factor(self) -> int: """The total record size of each measurement gets extended to be a multiple of this. None means that the minimal value supported by the card is taken.""" if self._record_size_factor is None: return self.__card.minimum_record_size else: return self._record_size_factor @record_size_factor.setter def record_size_factor(self, value: Optional[int]): self._record_size_factor = value @property def config(self): warnings.warn("AlazarCard.config is deprecated. Use AlazarCard.default_config or AlazarCard.current_config", DeprecationWarning) if self._current_config is None: return self.default_config else: return self._current_config @property def current_config(self): return self._current_config @property def buffer_strategy(self) -> BufferStrategy: if self._buffer_strategy is None: return AvoidSingleBufferAcquisition(ForceBufferSize(self.default_config.aimedBufferSize)) else: return self._buffer_strategy @buffer_strategy.setter def buffer_strategy(self, strategy): if strategy is not None and not isinstance(strategy, BufferStrategy): raise TypeError('Buffer strategy must be of type BufferStrategy or None') self._buffer_strategy = strategy def _make_mask(self, mask_id: str, begins, lengths) -> Mask: if mask_id not in self._mask_prototypes: raise KeyError('Measurement window {} can not be converted as it is not registered.'.format(mask_id)) hardware_channel, mask_type = self._mask_prototypes[mask_id] if mask_type not in ('auto', 'cross_buffer', None): warnings.warn("Currently only CrossBufferMask is implemented.") if np.any(begins[:-1]+lengths[:-1] > begins[1:]): raise ValueError('Found overlapping windows in begins') mask = CrossBufferMask() mask.identifier = mask_id mask.begin = begins mask.length = lengths mask.channel = hardware_channel return mask
[docs] def set_measurement_mask(self, program_name, mask_name, begins, lengths) -> Tuple[np.ndarray, np.ndarray]: sample_rate = TimeType.from_fraction(int(self.default_config.captureClockConfiguration.numeric_sample_rate(self.card.model)), 10**9) return self._registered_programs[program_name].set_measurement_mask(mask_name, sample_rate, begins, lengths)
[docs] def register_measurement_windows(self, program_name: str, windows: Dict[str, Tuple[np.ndarray, np.ndarray]]) -> None: program = self._registered_programs[program_name] sample_rate = TimeType.from_fraction(int(self.default_config.captureClockConfiguration.numeric_sample_rate(self.card.model)), 10 ** 9) program.clear_masks() for mask_name, (begins, lengths) in windows.items(): program.set_measurement_mask(mask_name, sample_rate, begins, lengths)
[docs] def register_operations(self, program_name: str, operations) -> None: self._registered_programs[program_name].operations = operations
[docs] def arm_program(self, program_name: str) -> None: logger.debug("Arming program %s on %r", program_name, self.__card) to_arm = self._registered_programs[program_name] if self.update_settings or self.__armed_program is not to_arm: logger.info("Arming %r by calling applyConfiguration. Update settings flag: %r", self.__card, self.update_settings) config = copy.deepcopy(self.default_config) config.masks, config.operations, total_record_size = self._registered_programs[program_name].iter( self._make_mask) sample_rate_in_hz = config.captureClockConfiguration.numeric_sample_rate(self.card.model) # sample rate in GHz sample_rate = TimeType.from_fraction(sample_rate_in_hz, 10 ** 9) if not config.operations: raise RuntimeError("No operations: Arming program without operations is an error as there will " "be no result: %r" % program_name) elif not config.masks: raise RuntimeError("No masks although there are operations in program: %r" % program_name) elif self._registered_programs[program_name].sample_rate != sample_rate: raise RuntimeError("Masks were registered with a different sample rate {}!={}".format( self._registered_programs[program_name].sample_rate, sample_rate)) assert total_record_size > 0 # extend the total record size to be a multiple of record_size_factor record_size_factor = self.record_size_factor total_record_size = (((total_record_size - 1) // record_size_factor) + 1) * record_size_factor if config.totalRecordSize == 0: config.totalRecordSize = total_record_size elif config.totalRecordSize < total_record_size: raise ValueError('specified total record size is smaller than needed {} < {}'.format(config.totalRecordSize, total_record_size)) self.__card.applyConfiguration(config, True) self._current_config = config self.update_settings = False self.__armed_program = to_arm elif self.__armed_program is to_arm and self._remaining_auto_triggers > 0: self._remaining_auto_triggers -= 1 logger.info("Relying on atsaverage auto-arm with %d auto triggers remaining after this one", self._remaining_auto_triggers) return self.__card.startAcquisition(to_arm.auto_rearm_count) self._remaining_auto_triggers = to_arm.auto_rearm_count - 1
[docs] def delete_program(self, program_name: str) -> None: self._registered_programs.pop(program_name)
# todo [2018-06-14]: what if program to delete is currently armed?
[docs] def clear(self) -> None: self._registered_programs.clear() self.__armed_program = None
@property def mask_prototypes(self) -> Dict[str, Tuple[int, str]]: return self._mask_prototypes
[docs] def register_mask_for_channel(self, mask_id: str, hw_channel: int, mask_type='auto') -> None: """ Args: mask_id: Identifier of the measurement windows hw_channel: Associated hardware channel (0, 1, 2, 3) mask_type: Either 'auto' or 'periodical """ if hw_channel not in range(4): raise ValueError('{} is not a valid hw channel'.format(hw_channel)) if mask_type not in ('auto', 'cross_buffer', None): raise NotImplementedError('Currently only can do cross buffer mask') self._mask_prototypes[mask_id] = (hw_channel, mask_type)
[docs] def measure_program(self, channels: Iterable[str]) -> Dict[str, np.ndarray]: """ Get all measurements at once and write them in a dictionary. """ scanline_data = self.__card.extractNextScanline() scanline_definition = scanline_data.definition operation_definitions = {operation.identifier: operation for operation in scanline_definition.operations} mask_definitions = {mask.identifier: mask for mask in scanline_definition.masks} def get_input_range(operation_id: str): # currently does not work for ComputeMomentDefinition :( mask_id = operation_definitions[operation_id].maskID hw_channel = int(mask_definitions[mask_id].channel) # This fails if new changes have been applied to the card in the meantime # It is better than self.config.inputConfiguration but still return self.__card.scanConfiguration.inputConfiguration[hw_channel].inputRange data = {} for op_name in channels: input_range = get_input_range(op_name) data[op_name] = scanline_data.operationResults[op_name].getAsVoltage(input_range) return data