Source code for qupulse.hardware.awgs.zihdawg

import numbers
from pathlib import Path
import functools
from typing import Tuple, Set, Callable, Optional, Mapping, Generator, Union, Sequence, Dict
from enum import Enum
import weakref
import logging
import warnings
import pathlib
import hashlib
import argparse
import re
from abc import abstractmethod, ABC

try:
    # zhinst fires a DeprecationWarning from its own code in some versions...
    with warnings.catch_warnings():
        warnings.simplefilter('ignore', DeprecationWarning)
        import zhinst.utils
except ImportError:
    warnings.warn('Zurich Instruments LabOne python API is distributed via the Python Package Index. Install with pip.')
    raise

try:
    from zhinst import core as zhinst_core
except ImportError:
    # backward compability
    from zhinst import ziPython as zhinst_core

import time

from qupulse.utils.types import ChannelID, TimeType, time_from_float
from qupulse.program.loop import Loop, make_compatible
from qupulse._program.seqc import HDAWGProgramManager, UserRegister, WaveformFileSystem
from qupulse.hardware.awgs.base import AWG, ChannelNotFoundException, AWGAmplitudeOffsetHandling
from qupulse.hardware.util import traced


logger = logging.getLogger('qupulse.hdawg')


[docs]def valid_channel(function_object): """Check if channel is a valid AWG channels. Expects channel to be 2nd argument after self.""" @functools.wraps(function_object) def valid_fn(*args, **kwargs): if len(args) < 2: raise HDAWGTypeError('Channel is an required argument.') channel = args[1] # Expect channel to be second positional argument after self. if channel not in range(1, 9): raise ChannelNotFoundException(channel) value = function_object(*args, **kwargs) return value return valid_fn
def _amplitude_scales(api_session, serial: str): return tuple( api_session.getDouble(f'/{serial}/awgs/{ch // 2:d}/outputs/{ch % 2:d}/amplitude') for ch in range(8) ) def _sigout_double(api_session, prop: str, serial: str, channel: int, value: float = None) -> float: """Query channel offset voltage and optionally set it.""" node_path = f'/{serial}/sigouts/{channel-1:d}/{prop}' if value is not None: api_session.setDouble(node_path, value) api_session.sync() # Global sync: Ensure settings have taken effect on the device. return api_session.getDouble(node_path) def _sigout_range(api_session, serial: str, channel: int, voltage: float = None) -> float: return _sigout_double(api_session, 'range', serial, channel, voltage) def _sigout_offset(api_session, serial: str, channel: int, voltage: float = None) -> float: return _sigout_double(api_session, 'offset', serial, channel, voltage) def _sigout_on(api_session, serial: str, channel: int, value: bool = None) -> bool: """Query channel signal output status (enabled/disabled) and optionally set it. Corresponds to front LED.""" node_path = f'/{serial}/sigouts/{channel-1:d}/on' if value is not None: api_session.setInt(node_path, value) api_session.sync() # Global sync: Ensure settings have taken effect on the device. return bool(api_session.getInt(node_path))
[docs]@traced class HDAWGRepresentation: """HDAWGRepresentation represents an HDAWG8 instruments and manages a LabOne data server api session. A data server must be running and the device be discoverable. Channels are per default grouped into pairs."""
[docs] def __init__(self, device_serial: str = None, device_interface: str = '1GbE', data_server_addr: str = 'localhost', data_server_port: int = 8004, api_level_number: int = 6, reset: bool = False, timeout: float = 20, grouping: 'HDAWGChannelGrouping' = None) -> None: """ :param device_serial: Device serial that uniquely identifies this device to the LabOne data server :param device_interface: Either '1GbE' for ethernet or 'USB' :param data_server_addr: Data server address. Must be already running. Default: localhost :param data_server_port: Data server port. Default: 8004 for HDAWG, MF and UHF devices :param api_level_number: Version of API to use for the session, higher number, newer. Default: 6 most recent :param reset: Reset device before initialization :param timeout: Timeout in seconds for uploading """ self._api_session = zhinst_core.ziDAQServer(data_server_addr, data_server_port, api_level_number) assert zhinst.utils.api_server_version_check(self.api_session) # Check equal data server and api version. self.api_session.connectDevice(device_serial, device_interface) self.default_timeout = timeout self._dev_ser = device_serial if reset: # Create a base configuration: Disable all available outputs, awgs, demods, scopes,... zhinst.utils.disable_everything(self.api_session, self.serial) self._initialize() waveform_path = pathlib.Path(self.api_session.awgModule().getString('directory'), 'awg', 'waves') self._waveform_file_system = WaveformFileSystem.get_waveform_file_system(waveform_path) self._channel_groups: Dict[HDAWGChannelGrouping, Tuple[HDAWGChannelGroup, ...]] = {} # TODO: lookup method to find channel count n_channels = 8 for grouping in HDAWGChannelGrouping: group_size = grouping.group_size() if group_size is None: # MDS groups = [ MDSChannelGroup(self.group_name(0, None), self.default_timeout) ] else: groups = [] for group_idx in range(n_channels // group_size): groups.append(SingleDeviceChannelGroup(group_idx, group_size, identifier=self.group_name(group_idx, group_size), timeout=self.default_timeout)) self._channel_groups[grouping] = tuple(groups) if grouping is None: grouping = self.channel_grouping # activates channel groups self.channel_grouping = grouping
@property def waveform_file_system(self) -> WaveformFileSystem: return self._waveform_file_system @property def channel_tuples(self) -> Tuple['HDAWGChannelGroup', ...]: return self._get_groups(self.channel_grouping) @property def channel_pair_AB(self) -> 'HDAWGChannelGroup': return self._channel_groups[HDAWGChannelGrouping.CHAN_GROUP_4x2][0] @property def channel_pair_CD(self) -> 'HDAWGChannelGroup': return self._channel_groups[HDAWGChannelGrouping.CHAN_GROUP_4x2][1] @property def channel_pair_EF(self) -> 'HDAWGChannelGroup': return self._channel_groups[HDAWGChannelGrouping.CHAN_GROUP_4x2][2] @property def channel_pair_GH(self) -> 'HDAWGChannelGroup': return self._channel_groups[HDAWGChannelGrouping.CHAN_GROUP_4x2][3] @property def api_session(self) -> zhinst_core.ziDAQServer: return self._api_session @property def serial(self) -> str: return self._dev_ser def _initialize(self) -> None: settings = [(f'/{self.serial}/awgs/*/userregs/*', 0), # Reset all user registers to 0. (f'/{self.serial}/*/single', 1)] # Single execution mode of sequence. for ch in range(0, 8): # Route marker 1 signal for each channel to marker output. if ch % 2 == 0: output = HDAWGTriggerOutSource.OUT_1_MARK_1.value else: output = HDAWGTriggerOutSource.OUT_1_MARK_2.value settings.append(['/{}/triggers/out/{}/source'.format(self.serial, ch), output]) self.api_session.set(settings) self.api_session.sync() # Global sync: Ensure settings have taken effect on the device.
[docs] def reset(self) -> None: zhinst.utils.disable_everything(self.api_session, self.serial) self._initialize() for tuple in self.channel_tuples: tuple.clear() self.api_session.set([ (f'/{self.serial}/awgs/*/time', 0), (f'/{self.serial}/sigouts/*/range', HDAWGVoltageRange.RNG_1V.value), (f'/{self.serial}/awgs/*/outputs/*/amplitude', 1.0), (f'/{self.serial}/outputs/*/modulation/mode', HDAWGModulationMode.OFF.value), ]) # marker outputs marker_settings = [] for ch in range(0, 8): # Route marker 1 signal for each channel to marker output. if ch % 2 == 0: output = HDAWGTriggerOutSource.OUT_1_MARK_1.value else: output = HDAWGTriggerOutSource.OUT_1_MARK_2.value marker_settings.append([f'/{self.serial}/triggers/out/{ch}/source', output]) self.api_session.set(marker_settings) self.api_session.sync()
[docs] def group_name(self, group_idx, group_size) -> str: if group_size is None: return f'{self.serial}_MDS' return str(self.serial) + '_' + 'ABCDEFGH'[group_idx*group_size:][:group_size]
def _get_groups(self, grouping: 'HDAWGChannelGrouping') -> Tuple['HDAWGChannelGroup', ...]: try: return self._channel_groups[grouping] except KeyError: # python reload... for grouping_key, group in self._channel_groups.items(): if grouping_key.value == grouping.value: return group else: raise @property def channel_grouping(self) -> 'HDAWGChannelGrouping': grouping = self.api_session.getInt(f'/{self.serial}/SYSTEM/AWG/CHANNELGROUPING') return HDAWGChannelGrouping(grouping) @channel_grouping.setter def channel_grouping(self, channel_grouping: 'HDAWGChannelGrouping'): # ipython reload ... if not type(channel_grouping).__name__ == 'HDAWGChannelGrouping': raise HDAWGTypeError('Channel grouping must be an enum of type "HDAWGChannelGrouping" to avoid confusions ' 'between enum value and group size.') old_channel_grouping = self.channel_grouping if old_channel_grouping != channel_grouping: self.api_session.setInt(f'/{self.serial}/AWGS/*/ENABLE', 0) self.api_session.setInt(f'/{self.serial}/SYSTEM/AWG/CHANNELGROUPING', channel_grouping.value) # disable old groups for group in self._get_groups(old_channel_grouping): group.disconnect_group() if channel_grouping.value == HDAWGChannelGrouping.MDS.value and not self._is_mds_master(): # do not connect channel group return for group in self._get_groups(channel_grouping): if not group.is_connected(): group.connect_group(self)
[docs] @valid_channel def offset(self, channel: int, voltage: float = None) -> float: """Query channel offset voltage and optionally set it.""" return _sigout_offset(self.api_session, self.serial, channel, voltage)
[docs] @valid_channel def range(self, channel: int, voltage: float = None) -> float: """Query channel voltage range and optionally set it. The instruments selects the next higher available range. This is the one-sided range Vp. Total range: -Vp...Vp""" return _sigout_range(self.api_session, self.serial, channel, voltage)
[docs] @valid_channel def output(self, channel: int, status: bool = None) -> bool: """Query channel signal output status (enabled/disabled) and optionally set it. Corresponds to front LED.""" return _sigout_on(self.api_session, self.serial, channel, status)
[docs] def get_status_table(self): """Return node tree of instrument with all important settings, as well as each channel group as tuple.""" return (self.api_session.get('/{}/*'.format(self.serial)), self.channel_pair_AB.awg_module.get('awgModule/*'), self.channel_pair_CD.awg_module.get('awgModule/*'), self.channel_pair_EF.awg_module.get('awgModule/*'), self.channel_pair_GH.awg_module.get('awgModule/*'))
def _get_mds_group_idx(self) -> Optional[int]: idx = 0 while True: try: if self.serial in self.api_session.getString(f'/ZI/MDS/GROUPS/{idx}/DEVICES'): return idx except RuntimeError: break idx += 1 def _is_mds_master(self) -> Optional[bool]: idx = 0 while True: try: devices = self.api_session.getString(f'/ZI/MDS/GROUPS/{idx}/DEVICES').split(',') except RuntimeError: break if self.serial in devices: return devices[0] == self.serial idx += 1 def __repr__(self): return f"{type(self).__name__}({self.serial}, ... {self.api_session})"
[docs]class HDAWGTriggerOutSource(Enum): """Assign a signal to a marker output. This is per AWG Core.""" AWG_TRIG_1 = 0 # Trigger output assigned to AWG trigger 1, controlled by AWG sequencer commands. AWG_TRIG_2 = 1 # Trigger output assigned to AWG trigger 2, controlled by AWG sequencer commands. AWG_TRIG_3 = 2 # Trigger output assigned to AWG trigger 3, controlled by AWG sequencer commands. AWG_TRIG_4 = 3 # Trigger output assigned to AWG trigger 4, controlled by AWG sequencer commands. OUT_1_MARK_1 = 4 # Trigger output assigned to output 1 marker 1. OUT_1_MARK_2 = 5 # Trigger output assigned to output 1 marker 2. OUT_2_MARK_1 = 6 # Trigger output assigned to output 2 marker 1. OUT_2_MARK_2 = 7 # Trigger output assigned to output 2 marker 2. TRIG_IN_1 = 8 # Trigger output assigned to trigger inout 1. TRIG_IN_2 = 9 # Trigger output assigned to trigger inout 2. TRIG_IN_3 = 10 # Trigger output assigned to trigger inout 3. TRIG_IN_4 = 11 # Trigger output assigned to trigger inout 4. TRIG_IN_5 = 12 # Trigger output assigned to trigger inout 5. TRIG_IN_6 = 13 # Trigger output assigned to trigger inout 6. TRIG_IN_7 = 14 # Trigger output assigned to trigger inout 7. TRIG_IN_8 = 15 # Trigger output assigned to trigger inout 8. HIGH = 17 # Trigger output is set to high. LOW = 18 # Trigger output is set to low.
[docs]class HDAWGChannelGrouping(Enum): """How many independent sequencers should run on the AWG and how the outputs should be grouped by sequencer.""" MDS = -1 # All channels that are in the current multi device synchronized group CHAN_GROUP_4x2 = 0 # 4x2 with HDAWG8; 2x2 with HDAWG4. /dev.../awgs/0..3/ CHAN_GROUP_2x4 = 1 # 2x4 with HDAWG8; 1x4 with HDAWG4. /dev.../awgs/0 & 2/ CHAN_GROUP_1x8 = 2 # 1x8 with HDAWG8. /dev.../awgs/0/
[docs] def group_size(self) -> int: return { HDAWGChannelGrouping.CHAN_GROUP_4x2: 2, HDAWGChannelGrouping.CHAN_GROUP_2x4: 4, HDAWGChannelGrouping.CHAN_GROUP_1x8: 8, HDAWGChannelGrouping.MDS: None }[self]
[docs]class HDAWGVoltageRange(Enum): """All available voltage ranges for the HDAWG wave outputs. Define maximum output voltage.""" RNG_5V = 5 RNG_4V = 4 RNG_3V = 3 RNG_2V = 2 RNG_1V = 1 RNG_800mV = 0.8 RNG_600mV = 0.6 RNG_400mV = 0.4 RNG_200mV = 0.2
[docs]class HDAWGModulationMode(Enum): """Modulation mode of waveform generator.""" OFF = 0 # AWG output goes directly to signal output. SINE_1 = 1 # AWG output multiplied with sine generator signal 0. SINE_2 = 2 # AWG output multiplied with sine generator signal 1. FG_1 = 3 # AWG output multiplied with function generator signal 0. Requires FG option. FG_2 = 4 # AWG output multiplied with function generator signal 1. Requires FG option. ADVANCED = 5 # AWG output modulates corresponding sines from modulation carriers.
[docs]@traced class HDAWGChannelGroup(AWG): MIN_WAVEFORM_LEN = 192 WAVEFORM_LEN_QUANTUM = 16 def __init__(self, identifier: str, timeout: float) -> None: super().__init__(identifier) self.timeout = timeout self._awg_module = None self._program_manager = HDAWGProgramManager() self._elf_manager = None self._required_seqc_source = self._program_manager.to_seqc_program() self._uploaded_seqc_source = None self._current_program = None # Currently armed program. self._upload_generator = () self._master_device = None def _initialize_awg_module(self): """Only run once""" if self._awg_module: self._awg_module.clear() self._awg_module = self.master_device.api_session.awgModule() self._awg_module.set('awgModule/device', self.master_device.serial) self._awg_module.set('awgModule/index', self.awg_group_index) self._awg_module.execute() self._elf_manager = ELFManager.DEFAULT_CLS(self._awg_module) self._upload_generator = () @property def master_device(self) -> HDAWGRepresentation: """Reference to HDAWG representation.""" if self._master_device is None: raise HDAWGValueError('Channel group is currently not connected') return self._master_device @property def awg_module(self) -> zhinst_core.AwgModule: """Each AWG channel group has its own awg module to manage program compilation and upload.""" if self._awg_module is None: raise HDAWGValueError('Channel group is not connected and was never initialized') return self._awg_module @property @abstractmethod def awg_group_index(self) -> int: raise NotImplementedError() @property def num_markers(self) -> int: """Number of marker channels""" return 2 * self.num_channels
[docs] def upload(self, name: str, program: Loop, channels: Tuple[Optional[ChannelID], ...], markers: Tuple[Optional[ChannelID], ...], voltage_transformation: Tuple[Callable, ...], force: bool = False) -> None: """Upload a program to the AWG. Physically uploads all waveforms required by the program - excluding those already present - to the device and sets up playback sequences accordingly. This method should be cheap for program already on the device and can therefore be used for syncing. Programs that are uploaded should be fast(~1 sec) to arm. Args: name: A name for the program on the AWG. program: The program (a sequence of instructions) to upload. channels: Tuple of length num_channels that ChannelIDs of in the program to use. Position in the list corresponds to the AWG channel markers: List of channels in the program to use. Position in the List in the list corresponds to the AWG channel voltage_transformation: transformations applied to the waveforms extracted rom the program. Position in the list corresponds to the AWG channel force: If a different sequence is already present with the same name, it is overwritten if force is set to True. (default = False) Known programs are handled in host memory most of the time. Only when uploading the device memory is touched at all. Returning from setting user register in seqc can take from 50ms to 60 ms. Fluctuates heavily. Not a good way to have deterministic behaviour "setUserReg(PROG_SEL, PROG_IDLE);". """ if len(channels) != self.num_channels: raise HDAWGValueError('Channel ID not specified') if len(markers) != self.num_markers: raise HDAWGValueError('Markers not specified') if len(voltage_transformation) != self.num_channels: raise HDAWGValueError('Wrong number of voltage transformations') if name in self.programs and not force: raise HDAWGValueError('{} is already known on {}'.format(name, self.identifier)) # Go to qupulse nanoseconds time base. q_sample_rate = self.sample_rate / 10**9 # Adjust program to fit criteria. make_compatible(program, minimal_waveform_length=self.MIN_WAVEFORM_LEN, waveform_quantum=self.WAVEFORM_LEN_QUANTUM, sample_rate=q_sample_rate) if self._amplitude_offset_handling == AWGAmplitudeOffsetHandling.IGNORE_OFFSET: voltage_offsets = (0.,) * self.num_channels elif self._amplitude_offset_handling == AWGAmplitudeOffsetHandling.CONSIDER_OFFSET: voltage_offsets = self.offsets() else: raise ValueError('{} is invalid as AWGAmplitudeOffsetHandling'.format(self._amplitude_offset_handling)) amplitudes = self.amplitudes() if name in self._program_manager.programs: self._program_manager.remove(name) self._program_manager.add_program(name, program, channels=channels, markers=markers, voltage_transformations=voltage_transformation, sample_rate=q_sample_rate, amplitudes=amplitudes, offsets=voltage_offsets) self._required_seqc_source = self._program_manager.to_seqc_program() self._program_manager.waveform_memory.sync_to_file_system(self.master_device.waveform_file_system) # start compiling the source (non-blocking) self._start_compile_and_upload()
def _start_compile_and_upload(self): self._uploaded_seqc_source = None self._upload_generator = self._elf_manager.compile_and_upload(self._required_seqc_source) logger.debug(f"_start_compile_and_upload: %r", next(self._upload_generator, "Finished")) def _wait_for_compile_and_upload(self): for state in self._upload_generator: logger.debug("wait_for_compile_and_upload: %r", state) time.sleep(.1) self._uploaded_seqc_source = self._required_seqc_source logger.debug("AWG %d: wait_for_compile_and_upload has finished", self.awg_group_index)
[docs] def was_current_program_finished(self) -> bool: """Return true if the current program has finished at least once""" playback_finished_mask = int(HDAWGProgramManager.Constants.PLAYBACK_FINISHED_MASK, 2) return bool(self.user_register(HDAWGProgramManager.Constants.PROG_SEL_REGISTER) & playback_finished_mask)
[docs] def set_volatile_parameters(self, program_name: str, parameters: Mapping[str, numbers.Real]): """Set the values of parameters which were marked as volatile on program creation.""" new_register_values = self._program_manager.get_register_values_to_update_volatile_parameters(program_name, parameters) if self._current_program == program_name: for register, value in new_register_values.items(): self.user_register(register, value)
[docs] def remove(self, name: str) -> None: """Remove a program from the AWG. Also discards all waveforms referenced only by the program identified by name. Args: name: The name of the program to remove. """ self._program_manager.remove(name) self._required_seqc_source = self._program_manager.to_seqc_program()
[docs] def clear(self) -> None: """Removes all programs and waveforms from the AWG. Caution: This affects all programs and waveforms on the AWG, not only those uploaded using qupulse! """ self._program_manager.clear() self._current_program = None self._required_seqc_source = self._program_manager.to_seqc_program() self._start_compile_and_upload() self.arm(None)
[docs] def arm(self, name: Optional[str]) -> None: """Load the program 'name' and arm the device for running it. If name is None the awg will "dearm" its current program. Currently hardware triggering is not implemented. The HDAWGProgramManager needs to emit code that calls `waitDigTrigger` to do that. """ if self.num_channels > 8: if name is None: self._required_seqc_source = "" else: self._required_seqc_source = self._program_manager.to_seqc_program(name) self._start_compile_and_upload() if self._required_seqc_source != self._uploaded_seqc_source: self._wait_for_compile_and_upload() assert self._required_seqc_source == self._uploaded_seqc_source, "_wait_for_compile_and_upload did not work " \ "as expected." self.user_register(self._program_manager.Constants.TRIGGER_REGISTER, 0) if name is None: self.user_register(self._program_manager.Constants.PROG_SEL_REGISTER, self._program_manager.Constants.PROG_SEL_NONE) self._current_program = None else: if name not in self.programs: raise HDAWGValueError('{} is unknown on {}'.format(name, self.identifier)) self._current_program = name # set the registers of initial repetition counts for register, value in self._program_manager.get_register_values(name).items(): assert register not in (self._program_manager.Constants.PROG_SEL_REGISTER, self._program_manager.Constants.TRIGGER_REGISTER) self.user_register(register, value) self.user_register(self._program_manager.Constants.PROG_SEL_REGISTER, self._program_manager.name_to_index(name) | int(self._program_manager.Constants.NO_RESET_MASK, 2)) if name is not None: self.enable(True)
[docs] def run_current_program(self) -> None: """Run armed program.""" if self._current_program is not None: if self._current_program not in self.programs: raise HDAWGValueError('{} is unknown on {}'.format(self._current_program, self.identifier)) if not self.enable(): self.enable(True) self.user_register(self._program_manager.Constants.TRIGGER_REGISTER, int(self._program_manager.Constants.TRIGGER_RESET_MASK, 2)) else: raise HDAWGRuntimeError('No program active')
@property def programs(self) -> Set[str]: """The set of program names that can currently be executed on the hardware AWG.""" return set(self._program_manager.programs.keys()) @property def sample_rate(self) -> TimeType: """The default sample rate of the AWG channel group.""" node_path = '/{}/awgs/{}/time'.format(self.master_device.serial, self.awg_group_index) sample_rate_num = self.master_device.api_session.getInt(node_path) node_path = '/{}/system/clocks/sampleclock/freq'.format(self.master_device.serial) sample_clock = self.master_device.api_session.getDouble(node_path) """Calculate exact rational number based on (sample_clock Sa/s) / 2^sample_rate_num. Otherwise numerical imprecision will give rise to errors for very long pulses. fractions.Fraction does not accept floating point numerator, which sample_clock could potentially be.""" return time_from_float(sample_clock) / 2 ** sample_rate_num
[docs] def connect_group(self, hdawg_device: HDAWGRepresentation): self.disconnect_group() self._master_device = weakref.proxy(hdawg_device) self._initialize_awg_module() # Seems creating AWG module sets SINGLE (single execution mode of sequence) to 0 per default. self.master_device.api_session.setInt(f'/{self.master_device.serial}/awgs/0/single', 1)
[docs] def disconnect_group(self): """Disconnect this group from device so groups of another size can be used""" if self._awg_module: self.awg_module.clear() self._master_device = None self._elf_manager = None self._upload_generator = ()
[docs] def is_connected(self) -> bool: return self._master_device is not None
[docs] def user_register(self, reg: UserRegister, value: int = None) -> int: """Query user registers (1-16) and optionally set it. Args: reg: User register. If it is an int, a warning is raised and it is interpreted as a one based index value: Value to set Returns: User Register value after setting it """ if isinstance(reg, int): warnings.warn("User register is not a UserRegister instance. It is interpreted as one based index.") reg = UserRegister(one_based_value=reg) if reg.to_web_interface() not in range(1, 17): raise HDAWGValueError(f'{reg:!r} not a valid (1-16) register.') node_path = '/{}/awgs/{:d}/userregs/{:labone}'.format(self.master_device.serial, self.awg_group_index, reg) if value is not None: self.master_device.api_session.setInt(node_path, value) # hackedy for mds_serial in getattr(self, '_mds_devices', [])[1:]: self.master_device.api_session.setInt(node_path.replace(self.master_device.serial, mds_serial), value) self.master_device.api_session.sync() # Global sync: Ensure settings have taken effect on the device. return self.master_device.api_session.getInt(node_path)
[docs]@traced class MDSChannelGroup(HDAWGChannelGroup): def __init__(self, identifier: str, timeout: float) -> None: super().__init__(identifier, timeout) self._master_device = None self._mds_devices = None @property def num_channels(self) -> int: """Number of channels""" return len(self._mds_devices) * 8 @property def awg_group_index(self): return 0
[docs] def disconnect_group(self): super().disconnect_group() self._mds_devices = None
[docs] def connect_group(self, hdawg_device: HDAWGRepresentation): mds_group = hdawg_device._get_mds_group_idx() if mds_group is None: raise HDAWGException("AWG not in any MDS group", hdawg_device) mds_devices = hdawg_device.api_session.getString(f'/ZI/MDS/GROUPS/{mds_group}/DEVICES').split(',') if hdawg_device.serial != mds_devices[0]: raise HDAWGException("Only the master device can connect to the HDAWG MDS channel group.") super().connect_group(hdawg_device) self._mds_devices = mds_devices
[docs] def enable(self, status: bool = None) -> bool: """Start the AWG sequencer.""" # There is also 'awgModule/awg/enable', which seems to have the same functionality. node_path = '/{}/awgs/{:d}/enable'.format(self.master_device.serial, 0) if status is not None: self.awg_module.set('awg/enable', int(status)) else: status = self.awg_module.get('awg/module') #return bool(status) """ if status is not None: self.master_device.api_session.setInt(node_path, int(status)) for mds_device in self._mds_devices[1:]: self.master_device.api_session.setInt(node_path.replace(self._mds_devices[0], mds_device), int(status)) self.master_device.api_session.sync() # Global sync: Ensure settings have taken effect on the device. """ return bool(self.master_device.api_session.getInt(node_path))
[docs] def amplitudes(self) -> Tuple[float, ...]: """Query AWG channel amplitude value (not peak to peak). From manual: The final signal amplitude is given by the product of the full scale output range of 1 V[in this example], the dimensionless amplitude scaling factor 1.0, and the actual dimensionless signal amplitude stored in the waveform memory.""" amplitudes = [] api_session = self.master_device.api_session for mds_device in self._mds_devices: amplitude_scales = _amplitude_scales(api_session, mds_device) ranges = [_sigout_range(api_session, mds_device, ch) for ch in range(1, 9)] amplitudes.extend(zi_amplitude * zi_range / 2 for zi_amplitude, zi_range in zip(amplitude_scales, ranges)) return tuple(amplitudes)
[docs] def offsets(self) -> Tuple[float, ...]: offsets = [] api_session = self.master_device.api_session for mds_device in self._mds_devices: offsets.extend(_sigout_offset(api_session, mds_device, ch) for ch in range(1, 9)) return tuple(offsets)
[docs]class SingleDeviceChannelGroup(HDAWGChannelGroup): def __init__(self, group_idx: int, group_size: int, identifier: str, timeout: float) -> None: super().__init__(identifier, timeout) self._device = None assert group_idx in range(4) assert group_size in (2, 4, 8) self._group_idx = group_idx self._group_size = group_size @property def num_channels(self) -> int: """Number of channels""" return self._group_size def _channels(self, index_start=1) -> Tuple[int, ...]: """1 indexed channel""" offset = index_start + self._group_size * self._group_idx return tuple(ch + offset for ch in range(self.num_channels)) @property def awg_group_index(self) -> int: """AWG node group index assuming 4x2 channel grouping. Then 0...3 will give appropriate index of group.""" return self._group_idx @property def user_directory(self) -> str: """LabOne user directory with subdirectories: "awg/src" (seqc sourcefiles), "awg/elf" (compiled AWG binaries), "awag/waves" (user defined csv waveforms).""" return self.awg_module.getString('awgModule/directory')
[docs] def enable(self, status: bool = None) -> bool: """Start the AWG sequencer.""" # There is also 'awgModule/awg/enable', which seems to have the same functionality. node_path = '/{}/awgs/{:d}/enable'.format(self.master_device.serial, self.awg_group_index) if status is not None: self.master_device.api_session.setInt(node_path, int(status)) self.master_device.api_session.sync() # Global sync: Ensure settings have taken effect on the device. return bool(self.master_device.api_session.getInt(node_path))
[docs] def amplitudes(self) -> Tuple[float, ...]: """Query AWG channel amplitude value (not peak to peak). From manual: The final signal amplitude is given by the product of the full scale output range of 1 V[in this example], the dimensionless amplitude scaling factor 1.0, and the actual dimensionless signal amplitude stored in the waveform memory.""" amplitudes = [] for ch, zi_amplitude in zip(self._channels(), _amplitude_scales(self.master_device.api_session, self.master_device.serial)): zi_range = self.master_device.range(ch) amplitudes.append(zi_amplitude * zi_range / 2) return tuple(amplitudes)
[docs] def offsets(self) -> Tuple[float, ...]: return tuple(map(self.master_device.offset, self._channels()))
[docs]class ELFManager(ABC): DEFAULT_CLS = None
[docs] class AWGModule:
[docs] def __init__(self, awg_module: zhinst_core.AwgModule): """Provide an easily mockable interface to the zhinst AwgModule object""" self._module = awg_module
@property def src_dir(self) -> pathlib.Path: return pathlib.Path(self._module.getString('directory'), 'awg', 'src') @property def elf_dir(self) -> pathlib.Path: return pathlib.Path(self._module.getString('directory'), 'awg', 'elf') @property def compiler_start(self) -> bool: """True if the compiler is running""" return self._module.getInt('compiler/start') == 1 @compiler_start.setter def compiler_start(self, value: bool): """Set true to start the compiler""" self._module.set('compiler/start', value) @property def compiler_status(self) -> Tuple[int, str]: return self._module.getInt('compiler/status'), self._module.getString('compiler/statusstring') @property def compiler_source_file(self) -> str: return self._module.getString('compiler/sourcefile') @compiler_source_file.setter def compiler_source_file(self, source_file: str): self._module.set('compiler/sourcefile', source_file) @property def compiler_source_string(self) -> str: return self._module.getString('compiler/sourcestring') @compiler_source_string.setter def compiler_source_string(self, source_string: str): self._module.set('compiler/sourcestring', source_string) @property def compiler_upload(self) -> bool: """auto upload after compiling""" return self._module.getInt('compiler/upload') == 1 @compiler_upload.setter def compiler_upload(self, value: bool): self._module.set('compiler/upload', value) @property def elf_file(self) -> str: return self._module.getString('elf/file') @elf_file.setter def elf_file(self, elf_file: str): self._module.set('elf/file', elf_file) @property def elf_upload(self) -> bool: return bool(self._module.getInt('elf/upload')) @elf_upload.setter def elf_upload(self, value: bool): self._module.set('elf/upload', value) @property def elf_status(self) -> Tuple[int, float]: return self._module.getInt('elf/status'), self._module.getDouble('progress') @property def index(self) -> int: return self._module.getInt('index')
[docs] def __init__(self, awg_module: zhinst_core.AwgModule): """This class organizes compiling and uploading of compiled programs. The source code file is named based on the code hash to cache compilation results. This requires that the waveform names are unique. The compilation and upload itself are done asynchronously by zhinst.core. To avoid spawning a useless thread for updating the status the method :py:meth:`~ELFManager.compile_and_upload` returns a generator which talks to the undelying library when needed.""" self.awg_module = self.AWGModule(awg_module) # automatically upload after successful compilation self.awg_module.compiler_upload = True self._compile_job = None # type: Optional[Union[str, Tuple[str, int, str]]] self._upload_job = None # type: Optional[Union[Tuple[str, float], Tuple[str, int]]]
[docs] def clear(self): """Deletes all files with a SHA512 hash name""" src_regex = re.compile(r'[a-z0-9]{128}\.seqc') elf_regex = re.compile(r'[a-z0-9]{128}\.elf') for p in self.awg_module.src_dir.iterdir(): if src_regex.match(p.name): p.unlink() for p in self.awg_module.elf_dir.iterdir(): if elf_regex.match(p.name): p.unlink()
@staticmethod def _source_hash(source_string: str) -> str: """Calulate the SHA512 hash of the given source. Args: source_string: seqc source code Returns: hex representation of SHA512 `source_string` hash """ # use utf-16 because str is UTF16 on most relevant machines (Windows) return hashlib.sha512(bytes(source_string, 'utf-16')).hexdigest()
[docs] @abstractmethod def compile_and_upload(self, source_string: str) -> Generator[str, str, None]: """The function returns a generator that yields the current state of the progress. The generator is empty iff the upload is complete. An exception is raised if there is an error. To abort send 'abort' to the generator. (not implemented :P) Example: >>> my_source = 'playWave("my_wave");' >>> for state in elf_manager.compile_and_upload(my_source): ... print('Current state:', state) ... time.sleep(1) Args: source_string: Source code to compile Returns: Generator object that needs to be consumed """
[docs]class SimpleELFManager(ELFManager):
[docs] def __init__(self, awg_module: zhinst.ziPython.AwgModule): """This implementation does not attempt to do something clever like caching.""" super().__init__(awg_module)
[docs] def compile_and_upload(self, source_string: str) -> Generator[str, str, None]: self.awg_module.compiler_upload = True self.awg_module.compiler_source_string = source_string while True: status, msg = self.awg_module.compiler_status if status == - 1: yield 'compiling' elif status == 0: break elif status == 1: raise HDAWGCompilationException(msg) elif status == 2: logger.warning("Compiler warings: %s", msg) break else: raise RuntimeError("Unexpected status", status, msg) while True: status_int, progress = self.awg_module.elf_status if progress == 1.0: break elif status_int == 1: HDAWGUploadException(self.awg_module.compiler_status) else: yield 'uploading @ %d%%' % (100*progress)
ELFManager.DEFAULT_CLS = SimpleELFManager
[docs]class CachingELFManager(ELFManager):
[docs] def __init__(self, awg_module: zhinst.ziPython.AwgModule): """FAILS TO UPLOAD THE CORRECT ELF FOR SOME REASON""" super().__init__(awg_module) # automatically upload after successful compilation self.awg_module.compiler_upload = True self._compile_job = None # type: Optional[Union[str, Tuple[str, int, str]]] self._upload_job = None # type: Optional[Union[Tuple[str, float], Tuple[str, int]]]
def _update_compile_job_status(self): """Store current compile status in self._compile_job.""" compiler_start = self.awg_module.compiler_start if self._compile_job is None: assert compiler_start == 0 elif isinstance(self._compile_job, str): if compiler_start: logger.debug("Compiler is running.") else: compiler_status, status_string = self.awg_module.compiler_status assert compiler_status in (-1, 0, 1, 2) if compiler_status == -1: raise RuntimeError('Compile job is set but no compilation is running', status_string) elif compiler_status == 2: logger.warning("AWG %d: Compilation finished with warning: %s", self.awg_module.index, status_string) self._compile_job = (self._compile_job, compiler_status, status_string) def _start_compile_job(self, source_file): logger.debug("Starting compilation of %r", source_file) self._update_compile_job_status() assert not isinstance(self._compile_job, str) self.awg_module.compiler_source_file = source_file self.awg_module.compiler_start = True self._compile_job = source_file logger.debug("AWG %d: Compilation of %r started", self.awg_module.index, source_file) def _compile(self, source_file) -> Generator[str, str, None]: self._start_compile_job(source_file) while True: self._update_compile_job_status() if not isinstance(self._compile_job, str): # finished compiling logger.debug("AWG %d: Compilation of %r finished", self.awg_module.index, source_file) break cmd = yield 'compiling' if cmd is None: logger.debug('No command received during compiling') elif cmd == 'abort': raise NotImplementedError('clean abort not implemented') else: raise HDAWGValueError('Unknown command', cmd) _, status_int, status_str = self._compile_job if status_int == 1: raise HDAWGRuntimeError('Compilation failed', status_str) logger.info("AWG %d: Compilation of %r successful", self.awg_module.index, source_file) def _start_elf_upload(self, elf_file): logger.debug("Uploading %r", elf_file) current_elf = self.awg_module.elf_file if current_elf != elf_file: logger.info("AWG %d: Overwriting elf file", self.awg_module.index) self.awg_module.elf_file = elf_file self.awg_module.elf_upload = True self._upload_job = (elf_file, None) time.sleep(.001) def _update_upload_job_status(self): elf_upload = self.awg_module.elf_upload if self._upload_job is None: assert not elf_upload return elf_file, old_status = self._upload_job assert self.awg_module.elf_file == elf_file if isinstance(old_status, float) or old_status is None: status_int, progress = self.awg_module.elf_status if status_int == 2: # in progress assert elf_upload == 1 self._upload_job = elf_file, progress else: # fetch new value here self._upload_job = elf_file, status_int else: logger.debug('AWG %d: _update_upload_job_status called on finished upload', self.awg_module.index) assert elf_upload == 0 def _upload(self, elf_file) -> Generator[str, str, None]: if self.awg_module.compiler_upload: pass else: self._start_elf_upload(elf_file) while True: self._update_upload_job_status() _, status = self._upload_job if isinstance(status, int): assert status in (-1, 0, 1) if status == 1: raise RuntimeError('ELF upload failed') else: break else: progress = status logger.debug('AWG %d: Upload progress is %d%%', self.awg_module.index, progress*100) cmd = yield 'uploading @ %d%%' % (100*progress) if cmd is None: logger.debug("No command received during upload") if cmd == 'abort': # TODO: check if this stops the upload self.awg_module.elf_upload = False raise NotImplementedError('Abort upload not cleanly implemented') else: raise HDAWGValueError('Unknown command', cmd) # enable auto upload on compilation again # TODO: research whether this is necessary # self.awg_module.elf_file = ''
[docs] def compile_and_upload(self, source_string: str) -> Generator[str, str, None]: """The source code is saved to a file determined by the source hash, compiled and uploaded to the instrument. The function returns a generator that yields the current state of the progress. The generator is empty iff the upload is complete. An exception is raised if there is an error. To abort send 'abort' to the generator. Example: >>> my_source = 'playWave("my_wave");' >>> for state in elf_manager.compile_and_upload(my_source): ... print('Current state:', state) ... time.sleep(1) Args: source_string: Source code to compile Returns: Generator object that needs to be consumed """ self._update_compile_job_status() if isinstance(self._compile_job, str): raise NotImplementedError('cannot upload: compilation in progress') source_hash = self._source_hash(source_string) seqc_file_name = '%s.seqc' % source_hash elf_file_name = '%s.elf' % source_hash full_source_name = self.awg_module.src_dir.joinpath(seqc_file_name) full_elf_name = self.awg_module.elf_dir.joinpath(elf_file_name) if not full_source_name.exists(): full_source_name.write_text(source_string, 'utf-8') # we assume same source == same program here if not full_elf_name.exists(): yield from self._compile(seqc_file_name) else: # set this so the web interface shows the correct source # self.awg_module.compiler_source_file = seqc_file_name logger.info('Already compiled. ELF: %r', elf_file_name) yield from self._upload(elf_file_name)
[docs]class HDAWGException(Exception): """Base exception class for HDAWG errors.""" pass
[docs]class HDAWGValueError(HDAWGException, ValueError): pass
[docs]class HDAWGTypeError(HDAWGException, TypeError): pass
[docs]class HDAWGRuntimeError(HDAWGException, RuntimeError): pass
[docs]class HDAWGIOError(HDAWGException, IOError): pass
[docs]class HDAWGTimeoutError(HDAWGException, TimeoutError): pass
[docs]class HDAWGCompilationException(HDAWGException): def __init__(self, msg): self.msg = msg def __str__(self) -> str: return "Compilation failed: {}".format(self.msg)
[docs]class HDAWGUploadException(HDAWGException): def __str__(self) -> str: return "Upload to the instrument failed."
[docs]def get_group_for_channels(hdawg: HDAWGRepresentation, channels: Set[int]) -> HDAWGChannelGroup: channels = set(channels) assert not channels - set(range(8)), "Channels must be in 0..=7" channel_range = range(min(channels) // 2 * 2, (max(channels) + 2) // 2 * 2) if len(channel_range) > 4 or len(channel_range) == 4 and channel_range.start == 2: c = (HDAWGChannelGrouping.CHAN_GROUP_1x8, 0) elif len(channel_range) == 4: assert channel_range.start in (0, 4) c = (HDAWGChannelGrouping.CHAN_GROUP_2x4, channel_range.start // 4) else: assert len(channel_range) == 2 c = (HDAWGChannelGrouping.CHAN_GROUP_4x2, channel_range.start // 2) hdawg.channel_grouping = c[0] return hdawg.channel_tuples[c[1]]
[docs]def example_upload(hdawg_kwargs: dict, channels: Set[int], markers: Set[Tuple[int, int]]): # pragma: no cover from qupulse.pulses import TablePT, SequencePT, RepetitionPT if isinstance(hdawg_kwargs, dict): hdawg = HDAWGRepresentation(**hdawg_kwargs) else: hdawg = hdawg_kwargs assert not set(channels) - set(range(8)), "Channels must be in 0..=7" channels = sorted(channels) required_channels = {*channels, *(ch for ch, _ in markers)} channel_group = get_group_for_channels(hdawg, required_channels) channel_group_channels = range(channel_group.awg_group_index * channel_group.num_channels, (channel_group.awg_group_index + 1) * channel_group.num_channels) # choose length based on minimal sample rate sample_rate = channel_group.sample_rate / 10**9 min_t = channel_group.MIN_WAVEFORM_LEN / sample_rate quant_t = channel_group.WAVEFORM_LEN_QUANTUM / sample_rate assert min_t > 4 * quant_t, "Example not updated" entry_list1 = [(0, 0), (quant_t * 2, .2, 'hold'), (min_t, .3, 'linear'), (min_t + 3*quant_t, 0, 'jump')] entry_list2 = [(0, 0), (quant_t * 3, -.2, 'hold'), (min_t, -.3, 'linear'), (min_t + 4*quant_t, 0, 'jump')] entry_list3 = [(0, 0), (quant_t * 1, -.2, 'linear'), (min_t, -.3, 'linear'), (2*min_t, 0, 'jump')] entry_lists = [entry_list1, entry_list2, entry_list3] entry_dict1 = {ch: entry_lists[:2][i % 2] for i, ch in enumerate(channels)} entry_dict2 = {ch: entry_lists[1::-1][i % 2] for i, ch in enumerate(channels)} entry_dict3 = {ch: entry_lists[2:0:-1][i % 2] for i, ch in enumerate(channels)} tpt1 = TablePT(entry_dict1, measurements=[('m', 20, 30)]) tpt2 = TablePT(entry_dict2) tpt3 = TablePT(entry_dict3, measurements=[('m', 10, 50)]) rpt = RepetitionPT(tpt1, 4) spt = SequencePT(tpt2, rpt) rpt2 = RepetitionPT(spt, 2) spt2 = SequencePT(rpt2, tpt3) p = spt2.create_program() upload_ch = tuple(ch if ch in channels else None for ch in channel_group_channels) upload_mk = (None,) * channel_group.num_markers upload_vt = (lambda x: x,) * channel_group.num_channels channel_group.upload('pulse_test1', p, upload_ch, upload_mk, upload_vt) if markers: markers = sorted(markers) assert len(markers) == len(set(markers)) channel_group_markers = tuple((ch, mk) for ch in channel_group_channels for mk in (0, 1)) full_on = [(0, 1), (min_t, 1)] two_3rd = [(0, 1), (min_t*2/3, 0), (min_t, 0)] one_3rd = [(0, 0), (min_t*2/3, 1), (min_t, 1)] marker_start = TablePT({'m0': full_on, 'm1': full_on}) marker_body = TablePT({'m0': two_3rd, 'm1': one_3rd}) marker_test_pulse = marker_start @ RepetitionPT(marker_body, 10000) marker_program = marker_test_pulse.create_program() upload_ch = (None, ) * channel_group.num_channels upload_mk = tuple(f"m{mk}" if (ch, mk) in markers else None for (ch, mk) in channel_group_markers) channel_group.upload('marker_test', marker_program, upload_ch, upload_mk, upload_vt) try: while True: for program in channel_group.programs: print(f'playing {program}') channel_group.arm(program) channel_group.run_current_program() while not channel_group.was_current_program_finished(): print(f'waiting for {program} to finish') time.sleep(1e-2) finally: channel_group.enable(False)
if __name__ == "__main__": import sys args = argparse.ArgumentParser('Upload an example pulse to a HDAWG') args.add_argument('device_serial', help='device serial of the form dev1234') args.add_argument('device_interface', help='device interface', choices=['USB', '1GbE'], default='1GbE', nargs='?') args.add_argument('--channels', help='channels to use', choices=range(8), default=[0, 1], type=int, nargs='+') args.add_argument('--markers', help='markers to use', choices=range(8*2), default=[], type=int, nargs='*') parsed = vars(args.parse_args()) channels = parsed.pop('channels') markers = [(m // 2, m % 2) for m in parsed.pop('markers')] logging.basicConfig(stream=sys.stdout) logger.setLevel(logging.DEBUG) example_upload(hdawg_kwargs=parsed, channels=channels, markers=markers)