import dataclasses
from typing import Union, Iterable, Dict, Tuple, Mapping, Optional
from types import MappingProxyType
import logging
import numpy
from qupulse.utils.types import TimeType
from qupulse.hardware.dacs.dac_base import DAC
from qupulse.hardware.dacs.alazar import AlazarProgram
import atsaverage
from atsaverage.masks import make_best_mask
from atsaverage.config2 import BoardConfiguration, create_scanline_definition, BufferStrategySettings
logger = logging.getLogger(__name__)
[docs]class AlazarCard(DAC):
def __init__(self, atsaverage_card: 'atsaverage.core.AlazarCard'):
super().__init__()
self._atsaverage_card = atsaverage_card
self._registered_programs = {}
# for auto retrigger
self._armed_program: Optional[str] = None
self._remaining_auto_triggers = 0
# for debugging purposes
self._raw_data_mask = None
self.default_buffer_strategy: Optional[BufferStrategySettings] = None
# sadly this is required to associate masks with their corresponding channels
# the better place for this would be in the MeasurementMask class but we do not want to touch it
# to avoid breaking experiments
# TODO: possible improvement is wildcard/regex support but this is complicated to maintain
# (competing matches etc)
self._mask_name_to_hw_channel = {}
@property
def atsaverage_card(self):
return self._atsaverage_card
@property
def registered_programs(self) -> Mapping[str, AlazarProgram]:
return MappingProxyType(self._registered_programs)
@property
def current_sample_rate_in_giga_herz(self) -> TimeType:
numeric_sample_rate = self._atsaverage_card.board_configuration_cache.get_numeric_sample_rate()
if numeric_sample_rate is None:
raise RuntimeError("The sample rate was not set yet. The instrument does not support retrieving the sample "
"rate via an API. We need to cache a set command.")
return TimeType.from_fraction(numeric_sample_rate, 10 ** 9)
[docs] def register_measurement_windows(self, program_name: str, windows: Dict[str, Tuple[numpy.ndarray,
numpy.ndarray]]) -> None:
program = self._registered_programs.setdefault(program_name, AlazarProgram())
sample_rate = self.current_sample_rate_in_giga_herz
program.clear_masks()
for mask_name, (begins, lengths) in windows.items():
program.set_measurement_mask(mask_name, sample_rate, begins, lengths)
[docs] def set_measurement_mask(self, program_name: str, mask_name: str,
begins: numpy.ndarray, lengths: numpy.ndarray) -> Tuple[numpy.ndarray, numpy.ndarray]:
program = self._registered_programs.setdefault(program_name, AlazarProgram())
return program.set_measurement_mask(mask_name, self.current_sample_rate_in_giga_herz, begins, lengths)
[docs] def register_operations(self, program_name: str, operations) -> None:
self._registered_programs.setdefault(program_name, AlazarProgram()).operations = operations
def _make_scanline_definition(self, program: AlazarProgram):
sample_rate_in_ghz = self.current_sample_rate_in_giga_herz
sample_rate_in_hz = int(sample_rate_in_ghz * 10 ** 9)
masks = program.masks(make_best_mask)
for mask in masks:
try:
mask.channel = self._mask_name_to_hw_channel[mask.identifier]
except KeyError as err:
raise KeyError(f"There was no hardware channel registered for the mask {mask!r}",
mask.identifier) from err
if sample_rate_in_ghz != program.sample_rate:
raise RuntimeError("Masks were registered with a different sample rate")
return create_scanline_definition(masks, program.operations,
raw_data_mask=self._raw_data_mask,
board_spec=self._atsaverage_card.get_board_spec(),
buffer_strategy=program.buffer_strategy,
numeric_sample_rate=sample_rate_in_hz)
def _prepare_program(self, program: AlazarProgram):
scanline_definition = self._make_scanline_definition(program)
self._atsaverage_card.configureMeasurement(scanline_definition)
[docs] def arm_program(self, program_name: str) -> None:
logger.debug("Arming program %s on %r", program_name, self._atsaverage_card)
if program_name == self._armed_program and self._remaining_auto_triggers > 0:
logger.info("Relying on atsaverage auto-arm with %d auto triggers remaining after this one: %d",
self._remaining_auto_triggers)
else:
program = self._registered_programs[program_name]
scanline_definition = self._make_scanline_definition(program)
self._atsaverage_card.configureMeasurement(scanline_definition)
self._atsaverage_card.startAcquisition(program.auto_rearm_count)
self._remaining_auto_triggers = program.auto_rearm_count - 1
[docs] def delete_program(self, program_name: str) -> None:
self._registered_programs.pop(program_name)
[docs] def clear(self) -> None:
self._registered_programs.clear()
[docs] def measure_program(self, channels: Iterable[str] = None) -> Dict[str, numpy.ndarray]:
scanline_data = self._atsaverage_card.extractNextScanline()
if channels is None:
channels = scanline_data.operationResults.keys()
scanline_definition = scanline_data.definition
operation_definitions = {operation.identifier: operation
for operation in scanline_definition.operations}
mask_definitions = {mask.identifier: mask
for mask in scanline_definition.masks}
def get_input_range(operation_id: str):
mask_id = operation_definitions[operation_id].maskID
hw_channel = int(mask_definitions[mask_id].channel)
return self.get_current_input_range(hw_channel)
data = {}
for op_name in channels:
input_range = get_input_range(op_name)
data[op_name] = scanline_data.operationResults[op_name].getAsVoltage(input_range)
return data
[docs] def register_mask_for_channel(self, mask_id: str, hw_channel: int) -> None:
"""
Args:
mask_id: Identifier of the measurement windows
hw_channel: Associated hardware channel (0, 1, 2, 3)
"""
if hw_channel not in range(4):
raise ValueError('{} is not a valid hw channel'.format(hw_channel))
self._mask_name_to_hw_channel[mask_id] = hw_channel