import functools
import logging
import numbers
import sys
import weakref
import warnings
from typing import List, Tuple, Set, Callable, Optional, Any, cast, Union, Dict, Mapping, NamedTuple, Iterable,\
Collection, Sequence
from collections import OrderedDict
import numpy as np
from qupulse import ChannelID
from qupulse.program.loop import Loop, make_compatible
from qupulse.hardware.feature_awg.channel_tuple_wrapper import ChannelTupleAdapter
from qupulse.hardware.feature_awg.features import ChannelSynchronization, AmplitudeOffsetHandling, VoltageRange, \
ProgramManagement, ActivatableChannels, DeviceControl, StatusTable, SCPI, VolatileParameters, \
ReadProgram, RepetitionMode
from qupulse.hardware.util import voltage_to_uint16, find_positions
from qupulse.utils.types import TimeType
from qupulse.hardware.feature_awg.base import AWGChannelTuple, AWGChannel, AWGDevice, AWGMarkerChannel
from qupulse._program.tabor import TaborSegment, TaborException, TaborProgram, PlottableProgram, TaborSequencing, \
make_combined_wave
import tabor_control.device
import pyvisa
assert (sys.byteorder == "little")
__all__ = ["TaborDevice", "TaborChannelTuple", "TaborChannel"]
TaborProgramMemory = NamedTuple("TaborProgramMemory", [("waveform_to_segment", np.ndarray),
("program", TaborProgram)])
def with_configuration_guard(function_object: Callable[["TaborChannelTuple", Any], Any]) -> Callable[
["TaborChannelTuple"], Any]:
"""This decorator assures that the AWG is in configuration mode while the decorated method runs."""
@functools.wraps(function_object)
def guarding_method(channel_pair: "TaborChannelTuple", *args, **kwargs) -> Any:
if channel_pair._configuration_guard_count == 0:
channel_pair._enter_config_mode()
channel_pair._configuration_guard_count += 1
try:
return function_object(channel_pair, *args, **kwargs)
finally:
channel_pair._configuration_guard_count -= 1
if channel_pair._configuration_guard_count == 0:
channel_pair._exit_config_mode()
return guarding_method
def with_select(function_object: Callable[["TaborChannelTuple", Any], Any]) -> Callable[["TaborChannelTuple"], Any]:
"""Asserts the channel pair is selcted when the wrapped function is called"""
@functools.wraps(function_object)
def selector(channel_tuple: "TaborChannelTuple", *args, **kwargs) -> Any:
channel_tuple._select()
return function_object(channel_tuple, *args, **kwargs)
return selector
########################################################################################################################
# Device
########################################################################################################################
# Features
class TaborSCPI(SCPI):
def __init__(self, device: "TaborDevice", visa: pyvisa.resources.MessageBasedResource):
super().__init__(visa)
self._parent = weakref.ref(device)
def send_cmd(self, cmd_str, paranoia_level=None):
for instr in self._parent().all_devices:
instr.send_cmd(cmd_str=cmd_str, paranoia_level=paranoia_level)
def send_query(self, query_str, query_mirrors=False) -> Any:
if query_mirrors:
return tuple(instr.send_query(query_str) for instr in self._parent().all_devices)
else:
return self._parent().main_instrument.send_query(query_str)
def _send_cmd(self, cmd_str, paranoia_level=None) -> Any:
"""Overwrite send_cmd for paranoia_level > 3"""
if paranoia_level is None:
paranoia_level = self._parent().paranoia_level
if paranoia_level < 3:
self._parent().super().send_cmd(cmd_str=cmd_str, paranoia_level=paranoia_level) # pragma: no cover
else:
cmd_str = cmd_str.rstrip()
if len(cmd_str) > 0:
ask_str = cmd_str + "; *OPC?; :SYST:ERR?"
else:
ask_str = "*OPC?; :SYST:ERR?"
*answers, opc, error_code_msg = self._parent()._visa_inst.ask(ask_str).split(";")
error_code, error_msg = error_code_msg.split(",")
error_code = int(error_code)
if error_code != 0:
_ = self._parent()._visa_inst.ask("*CLS; *OPC?")
if error_code == -450:
# query queue overflow
self.send_cmd(cmd_str)
else:
raise RuntimeError("Cannot execute command: {}\n{}: {}".format(cmd_str, error_code, error_msg))
assert len(answers) == 0
class TaborChannelSynchronization(ChannelSynchronization):
"""This Feature is used to synchronise a certain ammount of channels"""
def __init__(self, device: "TaborDevice"):
super().__init__()
self._parent = weakref.ref(device)
def synchronize_channels(self, group_size: int) -> None:
"""
Synchronize in groups of `group_size` channels. Groups of synchronized channels will be provided as
AWGChannelTuples. The channel_size must be evenly dividable by the number of channels
Args:
group_size: Number of channels per channel tuple
"""
if group_size == 2:
self._parent()._channel_tuples = []
for i in range((int)(len(self._parent().channels) / group_size)):
self._parent()._channel_tuples.append(
TaborChannelTuple((i + 1),
self._parent(),
self._parent().channels[(i * group_size):((i * group_size) + group_size)],
self._parent().marker_channels[(i * group_size):((i * group_size) + group_size)])
)
self._parent()[SCPI].send_cmd(":INST:COUP:STAT OFF")
elif group_size == 4:
self._parent()._channel_tuples = [TaborChannelTuple(1,
self._parent(),
self._parent().channels,
self._parent().marker_channels)]
self._parent()[SCPI].send_cmd(":INST:COUP:STAT ON")
else:
raise TaborException("Invalid group size")
class TaborDeviceControl(DeviceControl):
"""This feature is used for basic communication with a AWG"""
def __init__(self, device: "TaborDevice"):
super().__init__()
self._parent = weakref.ref(device)
def reset(self) -> None:
"""
Resetting the whole device. A command for resetting is send to the Device, the device is initialized again and
all channel tuples are cleared.
"""
self._parent()[SCPI].send_cmd(":RES")
self._parent()._coupled = None
self._parent()._initialize()
for channel_tuple in self._parent().channel_tuples:
channel_tuple[TaborProgramManagement].clear()
def trigger(self) -> None:
"""
This method triggers a device remotely.
"""
self._parent()[SCPI].send_cmd(":TRIG")
class TaborStatusTable(StatusTable):
def __init__(self, device: "TaborDevice"):
super().__init__()
self._parent = device
def get_status_table(self) -> Dict[str, Union[str, float, int]]:
"""
Send a lot of queries to the AWG about its settings. A good way to visualize is using pandas.DataFrame
Returns:
An ordered dictionary with the results
"""
name_query_type_list = [("channel", ":INST:SEL?", int),
("coupling", ":OUTP:COUP?", str),
("volt_dc", ":SOUR:VOLT:LEV:AMPL:DC?", float),
("volt_hv", ":VOLT:HV?", float),
("offset", ":VOLT:OFFS?", float),
("outp", ":OUTP?", str),
("mode", ":SOUR:FUNC:MODE?", str),
("shape", ":SOUR:FUNC:SHAPE?", str),
("dc_offset", ":SOUR:DC?", float),
("freq_rast", ":FREQ:RAST?", float),
("gated", ":INIT:GATE?", str),
("continuous", ":INIT:CONT?", str),
("continuous_enable", ":INIT:CONT:ENAB?", str),
("continuous_source", ":INIT:CONT:ENAB:SOUR?", str),
("marker_source", ":SOUR:MARK:SOUR?", str),
("seq_jump_event", ":SOUR:SEQ:JUMP:EVEN?", str),
("seq_adv_mode", ":SOUR:SEQ:ADV?", str),
("aseq_adv_mode", ":SOUR:ASEQ:ADV?", str),
("marker", ":SOUR:MARK:SEL?", int),
("marker_high", ":MARK:VOLT:HIGH?", str),
("marker_low", ":MARK:VOLT:LOW?", str),
("marker_width", ":MARK:WIDT?", int),
("marker_state", ":MARK:STAT?", str)]
data = OrderedDict((name, []) for name, *_ in name_query_type_list)
for ch in (1, 2, 3, 4):
self._parent.channels[ch - 1]._select()
self._parent.marker_channels[(ch - 1) % 2]._select()
for name, query, dtype in name_query_type_list:
data[name].append(dtype(self._parent[SCPI].send_query(query)))
return data
# Implementation
[docs]class TaborDevice(AWGDevice):
[docs] def __init__(self, device_name: str, instr_addr=None, paranoia_level=1, external_trigger=False, reset=False,
mirror_addresses=()):
"""
Constructor for a Tabor device
Args:
device_name (str): Name of the device
instr_addr: Instrument address that is forwarded to tabor_control
paranoia_level (int): Paranoia level that is forwarded to tabor_control
external_trigger (bool): Not supported yet
reset (bool):
mirror_addresses: list of devices on which the same things as on the main device are done.
For example you can a simulator and a real Device at once
"""
super().__init__(device_name)
self._instr = tabor_control.device.TEWXAwg(tabor_control.open_session(instr_addr), paranoia_level)
self._mirrors = tuple(tabor_control.device.TEWXAwg(tabor_control.open_session(address), paranoia_level)
for address in mirror_addresses)
self._coupled = None
self._clock_marker = [0, 0, 0, 0]
self.add_feature(TaborSCPI(self, self.main_instrument._visa_inst))
self.add_feature(TaborDeviceControl(self))
self.add_feature(TaborStatusTable(self))
if reset:
self[SCPI].send_cmd(":RES")
# Channel
self._channels = [TaborChannel(i + 1, self) for i in range(4)]
# MarkerChannels
self._marker_channels = [TaborMarkerChannel(i + 1, self) for i in range(4)]
self._initialize()
# ChannelTuple
self._channel_tuples = []
self.add_feature(TaborChannelSynchronization(self))
self[TaborChannelSynchronization].synchronize_channels(2)
if external_trigger:
raise NotImplementedError() # pragma: no cover
[docs] def enable(self) -> None:
"""
This method immediately generates the selected output waveform, if the device is in continuous and armed
repetition mode.
"""
self[SCPI].send_cmd(":ENAB")
[docs] def abort(self) -> None:
"""
With abort you can terminate the current generation of the output waveform. When the output waveform is
terminated the output starts generating an idle waveform.
"""
self[SCPI].send_cmd(":ABOR")
[docs] def set_coupled(self, coupled: bool) -> None:
"""
Thats the coupling of the device to 'coupled'
"""
if coupled:
self[SCPI].send_cmd("INST:COUP:STAT ON")
else:
self[SCPI].send_cmd("INST:COUP:STAT OFF")
def _is_coupled(self) -> bool:
"""
Returns true if the coupling of the device is 'coupled' otherwise false
"""
if self._coupled is None:
return self[SCPI].send_query(":INST:COUP:STAT?") == "ON"
else:
return self._coupled
[docs] def cleanup(self) -> None:
for channel_tuple in self.channel_tuples:
channel_tuple.cleanup()
@property
def channels(self) -> Collection["TaborChannel"]:
"""Returns a list of all channels of a Device"""
return self._channels
@property
def marker_channels(self) -> Collection["TaborMarkerChannel"]:
"""Returns a list of all marker channels of a device. The collection may be empty"""
return self._marker_channels
@property
def channel_tuples(self) -> Collection["TaborChannelTuple"]:
"""Returns a list of all channel tuples of a list"""
return self._channel_tuples
@property
def main_instrument(self) -> tabor_control.device.TEWXAwg:
return self._instr
@property
def mirrored_instruments(self) -> Sequence[tabor_control.device.TEWXAwg]:
return self._mirrors
@property
def all_devices(self) -> Sequence[tabor_control.device.TEWXAwg]:
return (self._instr,) + self._mirrors
@property
def _paranoia_level(self) -> tabor_control.ParanoiaLevel:
return self._instr.paranoia_level
@_paranoia_level.setter
def _paranoia_level(self, val):
for instr in self.all_devices:
instr.paranoia_level = val
@property
def dev_properties(self) -> dict:
return self._instr.dev_properties.as_dict()
def _send_binary_data(self, bin_dat, paranoia_level=None):
for instr in self.all_devices:
instr.write_segment_data(bin_dat, paranoia_level=paranoia_level)
def _download_segment_lengths(self, seg_len_list, paranoia_level=None):
for instr in self.all_devices:
instr.write_segment_lengths(seg_len_list, paranoia_level=paranoia_level)
def _download_sequencer_table(self, seq_table, paranoia_level=None):
for instr in self.all_devices:
instr.write_sequencer_table(seq_table, paranoia_level=paranoia_level)
def _download_adv_seq_table(self, seq_table, paranoia_level=None):
for instr in self.all_devices:
instr.write_advanced_sequencer_table(seq_table, paranoia_level=paranoia_level)
def _initialize(self) -> None:
# 1. Select channel
# 2. Turn off gated mode
# 3. Turn on continous mode
# 4. Armed mode (only generate waveforms after enab command)
# 5. Expect enable signal from (USB / LAN / GPIB)
# 6. Use arbitrary waveforms as marker source
# 7. Expect jump command for sequencing from (USB / LAN / GPIB)
setup_command = (
":INIT:GATE OFF; :INIT:CONT ON; "
":INIT:CONT:ENAB ARM; :INIT:CONT:ENAB:SOUR BUS;"
":SOUR:MARK:SOUR USER; :SOUR:SEQ:JUMP:EVEN BUS ")
self[SCPI].send_cmd(":INST:SEL 1")
self[SCPI].send_cmd(setup_command)
self[SCPI].send_cmd(":INST:SEL 3")
self[SCPI].send_cmd(setup_command)
def _get_readable_device(self, simulator=True) -> tabor_control.device.TEWXAwg:
"""
A method to get the first readable device out of all devices.
A readable device is a device which you can read data from like a simulator.
Returns:
The first readable device out of all devices
Throws:
TaborException: this exception is thrown if there is no readable device in the list of all devices
"""
for device in self.all_devices:
if device.supports_basic_reading():
if simulator:
if device.is_simulator:
return device
else:
return device
raise TaborException("No device capable of device data read")
########################################################################################################################
# Channel
########################################################################################################################
# Features
class TaborVoltageRange(VoltageRange):
def __init__(self, channel: "TaborChannel"):
super().__init__()
self._parent = weakref.ref(channel)
@property
@with_select
def offset(self) -> float:
"""Get offset of AWG channel"""
return float(
self._parent().device[SCPI].send_query(":VOLT:OFFS?".format(channel=self._parent().idn)))
@property
@with_select
def amplitude(self) -> float:
"""Get amplitude of AWG channel"""
coupling = self._parent().device[SCPI].send_query(":OUTP:COUP?")
if coupling == "DC":
return float(self._parent().device[SCPI].send_query(":VOLT?"))
elif coupling == "HV":
return float(self._parent().device[SCPI].send_query(":VOLT:HV?"))
else:
raise TaborException("Unknown coupling: {}".format(coupling))
@property
def amplitude_offset_handling(self) -> AmplitudeOffsetHandling:
"""
Gets the amplitude and offset handling of this channel. The amplitude-offset controls if the amplitude and
offset settings are constant or if these should be optimized by the driver
"""
return self._parent()._amplitude_offset_handling
@amplitude_offset_handling.setter
def amplitude_offset_handling(self, amp_offs_handling: Union[AmplitudeOffsetHandling, str]) -> None:
"""
amp_offs_handling: See possible values at `AWGAmplitudeOffsetHandling`
"""
amp_offs_handling = AmplitudeOffsetHandling(AmplitudeOffsetHandling)
self._parent()._amplitude_offset_handling = amp_offs_handling
def _select(self) -> None:
self._parent()._select()
class TaborActivatableChannels(ActivatableChannels):
def __init__(self, channel: "TaborChannel"):
super().__init__()
self._parent = weakref.ref(channel)
@property
def enabled(self) -> bool:
"""
Returns the the state a channel has at the moment. A channel is either activated or deactivated
True stands for activated and false for deactivated
"""
return self._parent().device[SCPI].send_query(":OUTP ?") == "ON"
@with_select
def enable(self):
"""Enables the output of a certain channel"""
command_string = ":OUTP ON".format(ch_id=self._parent().idn)
self._parent().device[SCPI].send_cmd(command_string)
@with_select
def disable(self):
"""Disables the output of a certain channel"""
command_string = ":OUTP OFF".format(ch_id=self._parent().idn)
self._parent().device[SCPI].send_cmd(command_string)
def _select(self) -> None:
self._parent()._select()
# Implementation
[docs]class TaborChannel(AWGChannel):
def __init__(self, idn: int, device: TaborDevice):
super().__init__(idn)
self._device = weakref.ref(device)
self._amplitude_offset_handling = AmplitudeOffsetHandling.IGNORE_OFFSET
# adding Features
self.add_feature(TaborVoltageRange(self))
self.add_feature(TaborActivatableChannels(self))
@property
def device(self) -> TaborDevice:
"""Returns the device that the channel belongs to"""
return self._device()
@property
def channel_tuple(self) -> "TaborChannelTuple":
"""Returns the channel tuple that this channel belongs to"""
return self._channel_tuple()
def _set_channel_tuple(self, channel_tuple: "TaborChannelTuple") -> None:
"""
The channel tuple "channel_tuple" is assigned to this channel
Args:
channel_tuple (TaborChannelTuple): the channel tuple that this channel belongs to
"""
self._channel_tuple = weakref.ref(channel_tuple)
def _select(self) -> None:
self.device[SCPI].send_cmd(":INST:SEL {channel}".format(channel=self.idn))
########################################################################################################################
# ChannelTuple
########################################################################################################################
# Features
class TaborProgramManagement(ProgramManagement):
def __init__(self, channel_tuple: "TaborChannelTuple"):
super().__init__(channel_tuple)
self._programs = {}
self._armed_program = None
self._idle_sequence_table = [(1, 1, 0), (1, 1, 0), (1, 1, 0)]
self._trigger_source = 'BUS'
def get_repetition_mode(self, program_name: str) -> str:
"""
Returns the default repetition mode of a certain program
Args:
program_name (str): name of the program whose repetition mode should be returned
"""
return self._channel_tuple._known_programs[program_name].program._repetition_mode
def set_repetition_mode(self, program_name: str, repetition_mode: str) -> None:
"""
Changes the default repetition mode of a certain program
Args:
program_name (str): name of the program whose repetition mode should be changed
Throws:
ValueError: this Exception is thrown when an invalid repetition mode is given
"""
if repetition_mode in ("infinite", "once"):
self._channel_tuple._known_programs[program_name].program._repetition_mode = repetition_mode
else:
raise ValueError("{} is no vaild repetition mode".format(repetition_mode))
@property
def supported_repetition_modes(self) -> Set[RepetitionMode]:
return {RepetitionMode.INFINITE}
@with_configuration_guard
@with_select
def upload(self, name: str,
program: Loop,
channels: Tuple[Optional[ChannelID], Optional[ChannelID]],
marker_channels: Tuple[Optional[ChannelID], Optional[ChannelID]],
voltage_transformation: Tuple[Callable, Callable],
repetition_mode: str = None,
force: bool = False) -> None:
"""
Upload a program to the AWG.
The policy is to prefer amending the unknown waveforms to overwriting old ones.
"""
if repetition_mode is None:
repetition_mode = self._default_repetition_mode
else:
repetition_mode = RepetitionMode(repetition_mode)
if repetition_mode not in self.supported_repetition_modes:
raise ValueError(f"{repetition_mode} is not supported on {self._channel_tuple}")
if len(channels) != len(self._channel_tuple.channels):
raise ValueError("Wrong number of channels")
if len(marker_channels) != len(self._channel_tuple.marker_channels):
raise ValueError("Wrong number of marker")
if len(voltage_transformation) != len(self._channel_tuple.channels):
raise ValueError("Wrong number of voltage transformations")
# adjust program to fit criteria
sample_rate = self._channel_tuple.device.channel_tuples[0].sample_rate
make_compatible(program,
minimal_waveform_length=192,
waveform_quantum=16,
sample_rate=sample_rate / 10 ** 9)
if name in self._channel_tuple._known_programs:
if force:
self._channel_tuple.free_program(name)
else:
raise ValueError('{} is already known on {}'.format(name, self._channel_tuple.idn))
# They call the peak to peak range amplitude
ranges = tuple(ch[VoltageRange].amplitude for ch in self._channel_tuple.channels)
voltage_amplitudes = tuple(range / 2 for range in ranges)
voltage_offsets = []
for channel in self._channel_tuple.channels:
if channel._amplitude_offset_handling == AmplitudeOffsetHandling.IGNORE_OFFSET:
voltage_offsets.append(0)
elif channel._amplitude_offset_handling == AmplitudeOffsetHandling.CONSIDER_OFFSET:
voltage_offsets.append(channel[VoltageRange].offset)
else:
raise NotImplementedError(
'{} is invalid as AWGAmplitudeOffsetHandling'.format(channel._amplitude_offset_handling))
voltage_offsets = tuple(voltage_offsets)
# parse to tabor program
tabor_program = TaborProgram(program,
channels=tuple(channels),
markers=marker_channels,
device_properties=self._channel_tuple.device.dev_properties,
sample_rate=sample_rate / 10 ** 9,
amplitudes=voltage_amplitudes,
offsets=voltage_offsets,
voltage_transformations=voltage_transformation)
segments, segment_lengths = tabor_program.get_sampled_segments()
waveform_to_segment, to_amend, to_insert = self._channel_tuple._find_place_for_segments_in_memory(segments,
segment_lengths)
self._channel_tuple._segment_references[waveform_to_segment[waveform_to_segment >= 0]] += 1
for wf_index in np.flatnonzero(to_insert > 0):
segment_index = to_insert[wf_index]
self._channel_tuple._upload_segment(to_insert[wf_index], segments[wf_index])
waveform_to_segment[wf_index] = segment_index
if np.any(to_amend):
segments_to_amend = [segments[idx] for idx in np.flatnonzero(to_amend)]
waveform_to_segment[to_amend] = self._channel_tuple._amend_segments(segments_to_amend)
self._channel_tuple._known_programs[name] = TaborProgramMemory(waveform_to_segment=waveform_to_segment,
program=tabor_program)
# set the default repetionmode for a programm
self.set_repetition_mode(program_name=name, repetition_mode=repetition_mode)
def remove(self, name: str) -> None:
"""
Remove a program from the AWG.
Also discards all waveforms referenced only by the program identified by name.
Args:
name (str): The name of the program to remove.
"""
self._channel_tuple.free_program(name)
self._channel_tuple.cleanup()
def clear(self) -> None:
"""
Removes all programs and waveforms from the AWG.
Caution: This affects all programs and waveforms on the AWG, not only those uploaded using qupulse!
"""
self._channel_tuple.device.channels[0]._select()
self._channel_tuple.device[SCPI].send_cmd(":TRAC:DEL:ALL")
self._channel_tuple.device[SCPI].send_cmd(":SOUR:SEQ:DEL:ALL")
self._channel_tuple.device[SCPI].send_cmd(":ASEQ:DEL")
self._channel_tuple.device[SCPI].send_cmd(":TRAC:DEF 1, 192")
self._channel_tuple.device[SCPI].send_cmd(":TRAC:SEL 1")
self._channel_tuple.device[SCPI].send_cmd(":TRAC:MODE COMB")
self._channel_tuple.device._send_binary_data(bin_dat=self._channel_tuple._idle_segment.get_as_binary())
self._channel_tuple._segment_lengths = 192 * np.ones(1, dtype=np.uint32)
self._channel_tuple._segment_capacity = 192 * np.ones(1, dtype=np.uint32)
self._channel_tuple._segment_hashes = np.ones(1, dtype=np.int64) * hash(self._channel_tuple._idle_segment)
self._channel_tuple._segment_references = np.ones(1, dtype=np.uint32)
self._channel_tuple._advanced_sequence_table = []
self._channel_tuple._sequencer_tables = []
self._channel_tuple._known_programs = dict()
self._change_armed_program(None)
@with_select
def arm(self, name: Optional[str]) -> None:
"""
Load the program 'name' and arm the device for running it.
Args:
name (str): the program the device should change to
"""
if self._channel_tuple._current_program == name:
self._channel_tuple.device[SCPI].send_cmd("SEQ:SEL 1")
else:
self._change_armed_program(name)
@property
def programs(self) -> Set[str]:
"""The set of program names that can currently be executed on the hardware AWG."""
return set(program.name for program in self._channel_tuple._known_programs.keys())
@with_select
def run_current_program(self) -> None:
"""
This method starts running the active program
Throws:
RuntimeError: This exception is thrown if there is no active program for this device
"""
if (self._channel_tuple.device._is_coupled()):
# channel tuple is the first channel tuple
if (self._channel_tuple.device._channel_tuples[0] == self):
if self._channel_tuple._current_program:
repetition_mode = self._channel_tuple._known_programs[
self._channel_tuple._current_program].program._repetition_mode
if repetition_mode == "infinite":
self._cont_repetition_mode()
self._channel_tuple.device[SCPI].send_cmd(':TRIG',
paranoia_level=self._channel_tuple.internal_paranoia_level)
else:
raise ValueError("{} is no vaild repetition mode".format(repetition_mode))
else:
raise RuntimeError("No program active")
else:
warnings.warn(
"TaborWarning - run_current_program() - the device is coupled - runthe program via the first channel tuple")
else:
if self._channel_tuple._current_program:
repetition_mode = self._channel_tuple._known_programs[
self._channel_tuple._current_program].program._repetition_mode
if repetition_mode == "infinite":
self._cont_repetition_mode()
self._channel_tuple.device[SCPI].send_cmd(':TRIG', paranoia_level=self._channel_tuple.internal_paranoia_level)
else:
raise ValueError("{} is no vaild repetition mode".format(repetition_mode))
else:
raise RuntimeError("No program active")
@with_select
@with_configuration_guard
def _change_armed_program(self, name: Optional[str]) -> None:
"""The armed program of the channel tuple is changed to the program with the name 'name'"""
if name is None:
sequencer_tables = [self._idle_sequence_table]
advanced_sequencer_table = [(1, 1, 0)]
else:
waveform_to_segment_index, program = self._channel_tuple._known_programs[name]
waveform_to_segment_number = waveform_to_segment_index + 1
# translate waveform number to actual segment
sequencer_tables = [[(rep_count, waveform_to_segment_number[wf_index], jump_flag)
for ((rep_count, wf_index, jump_flag), _) in sequencer_table]
for sequencer_table in program.get_sequencer_tables()]
# insert idle sequence
sequencer_tables = [self._idle_sequence_table] + sequencer_tables
# adjust advanced sequence table entries by idle sequence table offset
advanced_sequencer_table = [(rep_count, seq_no + 1, jump_flag)
for rep_count, seq_no, jump_flag in program.get_advanced_sequencer_table()]
if program.waveform_mode == TaborSequencing.SINGLE:
assert len(advanced_sequencer_table) == 1
assert len(sequencer_tables) == 2
while len(sequencer_tables[1]) < self._channel_tuple.device.dev_properties["min_seq_len"]:
assert advanced_sequencer_table[0][0] == 1
sequencer_tables[1].append((1, 1, 0))
# insert idle sequence in advanced sequence table
advanced_sequencer_table = [(1, 1, 0)] + advanced_sequencer_table
while len(advanced_sequencer_table) < self._channel_tuple.device.dev_properties["min_aseq_len"]:
advanced_sequencer_table.append((1, 1, 0))
self._channel_tuple.device[SCPI].send_cmd("SEQ:DEL:ALL", paranoia_level=self._channel_tuple.internal_paranoia_level)
self._channel_tuple._sequencer_tables = []
self._channel_tuple.device[SCPI].send_cmd("ASEQ:DEL", paranoia_level=self._channel_tuple.internal_paranoia_level)
self._channel_tuple._advanced_sequence_table = []
# download all sequence tables
for i, sequencer_table in enumerate(sequencer_tables):
self._channel_tuple.device[SCPI].send_cmd("SEQ:SEL {}".format(i + 1),
paranoia_level=self._channel_tuple.internal_paranoia_level)
self._channel_tuple.device._download_sequencer_table(sequencer_table)
self._channel_tuple._sequencer_tables = sequencer_tables
self._channel_tuple.device[SCPI].send_cmd("SEQ:SEL 1", paranoia_level=self._channel_tuple.internal_paranoia_level)
self._channel_tuple.device._download_adv_seq_table(advanced_sequencer_table)
self._channel_tuple._advanced_sequence_table = advanced_sequencer_table
self._channel_tuple._current_program = name
def _select(self):
self._channel_tuple.channels[0]._select()
@property
def _configuration_guard_count(self):
return self._channel_tuple._configuration_guard_count
@_configuration_guard_count.setter
def _configuration_guard_count(self, configuration_guard_count):
self._channel_tuple._configuration_guard_count = configuration_guard_count
def _enter_config_mode(self):
self._channel_tuple._enter_config_mode()
def _exit_config_mode(self):
self._channel_tuple._exit_config_mode()
@with_select
def _cont_repetition_mode(self):
"""Changes the run mode of this channel tuple to continous mode"""
self._channel_tuple.device[SCPI].send_cmd(f":TRIG:SOUR:ADV EXT")
self._channel_tuple.device[SCPI].send_cmd(
f":INIT:GATE OFF; :INIT:CONT ON; :INIT:CONT:ENAB ARM; :INIT:CONT:ENAB:SOUR {self._trigger_source}")
class TaborVolatileParameters(VolatileParameters):
def __init__(self, channel_tuple: "TaborChannelTuple", ):
super().__init__(channel_tuple=channel_tuple)
def set_volatile_parameters(self, program_name: str, parameters: Mapping[str, numbers.Number]) -> None:
""" Set the values of parameters which were marked as volatile on program creation. Sets volatile parameters
in program memory and device's (adv.) sequence tables if program is current program.
If set_volatile_parameters needs to run faster, set CONFIG_MODE_PARANOIA_LEVEL to 0 which causes the device to
enter the configuration mode with paranoia level 0 (Note: paranoia level 0 does not work for the simulator)
and set device._is_coupled.
Args:
program_name: Name of program which should be changed.
parameters: Names of volatile parameters and respective values to which they should be set.
"""
waveform_to_segment_index, program = self._channel_tuple._known_programs[program_name]
modifications = program.update_volatile_parameters(parameters)
self._channel_tuple.logger.debug("parameter modifications: %r" % modifications)
if not modifications:
self._channel_tuple.logger.info(
"There are no volatile parameters to update. Either there are no volatile parameters with "
"these names,\nthe respective repetition counts already have the given values or the "
"volatile parameters were dropped during upload.")
return
if program_name == self._channel_tuple._current_program:
commands = []
for position, entry in modifications.items():
if not entry.repetition_count > 0:
raise ValueError("Repetition must be > 0")
if isinstance(position, int):
commands.append(":ASEQ:DEF {},{},{},{}".format(position + 1, entry.element_number + 1,
entry.repetition_count, entry.jump_flag))
else:
table_num, step_num = position
commands.append(":SEQ:SEL {}".format(table_num + 2))
commands.append(":SEQ:DEF {},{},{},{}".format(step_num,
waveform_to_segment_index[entry.element_id] + 1,
entry.repetition_count, entry.jump_flag))
self._channel_tuple._execute_multiple_commands_with_config_guard(commands)
# Wait until AWG is finished
_ = self._channel_tuple.device.main_instrument._visa_inst.query("*OPC?")
class TaborReadProgram(ReadProgram):
def __init__(self, channel_tuple: "TaborChannelTuple", ):
super().__init__(channel_tuple=channel_tuple)
def read_complete_program(self):
return PlottableProgram.from_read_data(self._channel_tuple.read_waveforms(),
self._channel_tuple.read_sequence_tables(),
self._channel_tuple.read_advanced_sequencer_table())
# Implementation
[docs]class TaborChannelTuple(AWGChannelTuple):
CONFIG_MODE_PARANOIA_LEVEL = None
def __init__(self, idn: int, device: TaborDevice, channels: Iterable["TaborChannel"],
marker_channels: Iterable["TaborMarkerChannel"]):
super().__init__(idn)
self._device = weakref.ref(device)
self._configuration_guard_count = 0
self._is_in_config_mode = False
self._channels = tuple(channels)
self._marker_channels = tuple(marker_channels)
# the channel and channel marker are assigned to this channel tuple
for channel in self.channels:
channel._set_channel_tuple(self)
for marker_ch in self.marker_channels:
marker_ch._set_channel_tuple(self)
# adding Features
self.add_feature(TaborProgramManagement(self))
self.add_feature(TaborVolatileParameters(self))
self._idle_segment = TaborSegment.from_sampled(voltage_to_uint16(voltage=np.zeros(192),
output_amplitude=0.5,
output_offset=0., resolution=14),
voltage_to_uint16(voltage=np.zeros(192),
output_amplitude=0.5,
output_offset=0., resolution=14),
None, None)
self._known_programs = dict() # type: Dict[str, TaborProgramMemory]
self._current_program = None
self._segment_lengths = None
self._segment_capacity = None
self._segment_hashes = None
self._segment_references = None
self._sequencer_tables = None
self._advanced_sequence_table = None
self._internal_paranoia_level = 0
self[TaborProgramManagement].clear()
self._channel_tuple_adapter: ChannelTupleAdapter
@property
def internal_paranoia_level(self) -> Optional[int]:
return self._internal_paranoia_level
@property
def logger(self):
return logging.getLogger("qupulse.tabor")
@property
def channel_tuple_adapter(self) -> ChannelTupleAdapter:
if self._channel_tuple_adapter is None:
self._channel_tuple_adapter = ChannelTupleAdapter(self)
return self._channel_tuple_adapter
def _select(self) -> None:
"""The channel tuple is selected, which means that the first channel of the channel tuple is selected"""
self.channels[0]._select()
@property
def device(self) -> TaborDevice:
"""Returns the device that the channel tuple belongs to"""
return self._device()
@property
def channels(self) -> Collection["TaborChannel"]:
"""Returns all channels of the channel tuple"""
return self._channels
@property
def marker_channels(self) -> Collection["TaborMarkerChannel"]:
"""Returns all marker channels of the channel tuple"""
return self._marker_channels
@property
@with_select
def sample_rate(self) -> TimeType:
"""Returns the sample rate that the channels of a channel tuple have"""
return TimeType.from_float(
float(self.device[SCPI].send_query(":FREQ:RAST?".format(channel=self.channels[0].idn))))
@property
def total_capacity(self) -> int:
return int(self.device.dev_properties["max_arb_mem"]) // 2
[docs] def free_program(self, name: str) -> TaborProgramMemory:
if name is None:
raise TaborException("Removing 'None' program is forbidden.")
program = self._known_programs.pop(name)
self._segment_references[program.waveform_to_segment] -= 1
if self._current_program == name:
self[TaborProgramManagement]._change_armed_program(None)
return program
@property
def _segment_reserved(self) -> np.ndarray:
return self._segment_references > 0
@property
def _free_points_in_total(self) -> int:
return self.total_capacity - np.sum(self._segment_capacity[self._segment_reserved])
@property
def _free_points_at_end(self) -> int:
reserved_index = np.flatnonzero(self._segment_reserved)
if len(reserved_index):
return self.total_capacity - np.sum(self._segment_capacity[:reserved_index[-1]])
else:
return self.total_capacity
[docs] @with_select
def read_sequence_tables(self) -> List[Tuple[np.ndarray, np.ndarray, np.ndarray]]:
device = self.device._get_readable_device(simulator=True)
old_sequence = device.send_query(":SEQ:SEL?")
sequences = []
uploaded_sequence_indices = np.arange(len(self._sequencer_tables)) + 1
for sequence in uploaded_sequence_indices:
device.send_cmd(":SEQ:SEL {}".format(sequence), paranoia_level=self.internal_paranoia_level)
table = device.read_sequencer_table()
sequences.append((table['repeats'], table['segment_no'], table['jump_flag']))
device.send_cmd(":SEQ:SEL {}".format(old_sequence), paranoia_level=self.internal_paranoia_level)
return sequences
[docs] @with_select
def read_advanced_sequencer_table(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
table = self.device._get_readable_device(simulator=True).read_advanced_sequencer_table()
return table['repeats'], table['segment_no'], table['jump_flag']
[docs] def read_complete_program(self) -> PlottableProgram:
return PlottableProgram.from_read_data(self.read_waveforms(),
self.read_sequence_tables(),
self.read_advanced_sequencer_table())
def _find_place_for_segments_in_memory(self, segments: Sequence, segment_lengths: np.ndarray) -> \
Tuple[np.ndarray, np.ndarray, np.ndarray]:
# TODO: comment was not finished
"""
1. Find known segments
2. Find empty spaces with fitting length
3. Find empty spaces with bigger length
4. Amend remaining segments
Args:
segments (Sequence):
segment_length (Sequence):
Returns:
"""
segment_hashes = np.fromiter((hash(segment) for segment in segments), count=len(segments), dtype=np.int64)
waveform_to_segment = find_positions(self._segment_hashes, segment_hashes)
# separate into known and unknown
unknown = (waveform_to_segment == -1)
known = ~unknown
known_pos_in_memory = waveform_to_segment[known]
assert len(known_pos_in_memory) == 0 or np.all(
self._segment_hashes[known_pos_in_memory] == segment_hashes[known])
new_reference_counter = self._segment_references.copy()
new_reference_counter[known_pos_in_memory] += 1
to_upload_size = np.sum(segment_lengths[unknown] + 16)
free_points_in_total = self.total_capacity - np.sum(self._segment_capacity[self._segment_references > 0])
if free_points_in_total < to_upload_size:
raise MemoryError("Not enough free memory",
free_points_in_total,
to_upload_size,
self._free_points_in_total)
to_amend = cast(np.ndarray, unknown)
to_insert = np.full(len(segments), fill_value=-1, dtype=np.int64)
reserved_indices = np.flatnonzero(new_reference_counter > 0)
first_free = reserved_indices[-1] + 1 if len(reserved_indices) else 0
free_segments = new_reference_counter[:first_free] == 0
free_segment_count = np.sum(free_segments)
# look for a free segment place with the same length
for segment_idx in np.flatnonzero(to_amend):
if free_segment_count == 0:
break
pos_of_same_length = np.logical_and(free_segments,
segment_lengths[segment_idx] == self._segment_capacity[:first_free])
idx_same_length = np.argmax(pos_of_same_length)
if pos_of_same_length[idx_same_length]:
free_segments[idx_same_length] = False
free_segment_count -= 1
to_amend[segment_idx] = False
to_insert[segment_idx] = idx_same_length
# try to find places that are larger than the segments to fit in starting with the large segments and large
# free spaces
segment_indices = np.flatnonzero(to_amend)[np.argsort(segment_lengths[to_amend])[::-1]]
capacities = self._segment_capacity[:first_free]
for segment_idx in segment_indices:
free_capacities = capacities[free_segments]
free_segments_indices = np.flatnonzero(free_segments)[np.argsort(free_capacities)[::-1]]
if len(free_segments_indices) == 0:
break
fitting_segment = np.argmax((free_capacities >= segment_lengths[segment_idx])[::-1])
fitting_segment = free_segments_indices[fitting_segment]
if self._segment_capacity[fitting_segment] >= segment_lengths[segment_idx]:
free_segments[fitting_segment] = False
to_amend[segment_idx] = False
to_insert[segment_idx] = fitting_segment
free_points_at_end = self.total_capacity - np.sum(self._segment_capacity[:first_free])
if np.sum(segment_lengths[to_amend] + 16) > free_points_at_end:
raise MemoryError("Fragmentation does not allow upload.",
np.sum(segment_lengths[to_amend] + 16),
free_points_at_end,
self._free_points_at_end)
return waveform_to_segment, to_amend, to_insert
@with_select
@with_configuration_guard
def _upload_segment(self, segment_index: int, segment: TaborSegment) -> None:
if self._segment_references[segment_index] > 0:
raise ValueError("Reference count not zero")
if segment.num_points > self._segment_capacity[segment_index]:
raise ValueError("Cannot upload segment here.")
segment_no = segment_index + 1
self.device[TaborSCPI].send_cmd(":TRAC:DEF {}, {}".format(segment_no, segment.num_points),
paranoia_level=self.internal_paranoia_level)
self._segment_lengths[segment_index] = segment.num_points
self.device[TaborSCPI].send_cmd(":TRAC:SEL {}".format(segment_no),
paranoia_level=self.internal_paranoia_level)
self.device[TaborSCPI].send_cmd(":TRAC:MODE COMB",
paranoia_level=self.internal_paranoia_level)
wf_data = segment.get_as_binary()
self.device._send_binary_data(bin_dat=wf_data)
self._segment_references[segment_index] = 1
self._segment_hashes[segment_index] = hash(segment)
@with_select
@with_configuration_guard
def _amend_segments(self, segments: List[TaborSegment]) -> np.ndarray:
new_lengths = np.asarray([s.num_points for s in segments], dtype=np.uint32)
wf_data = make_combined_wave(segments)
trac_len = len(wf_data) // 2
segment_index = len(self._segment_capacity)
first_segment_number = segment_index + 1
self.device[TaborSCPI].send_cmd(":TRAC:DEF {},{}".format(first_segment_number, trac_len),
paranoia_level=self.internal_paranoia_level)
self.device[TaborSCPI].send_cmd(":TRAC:SEL {}".format(first_segment_number),
paranoia_level=self.internal_paranoia_level)
self.device[TaborSCPI].send_cmd(":TRAC:MODE COMB",
paranoia_level=self.internal_paranoia_level)
self.device._send_binary_data(bin_dat=wf_data)
old_to_update = np.count_nonzero(self._segment_capacity != self._segment_lengths)
segment_capacity = np.concatenate((self._segment_capacity, new_lengths))
segment_lengths = np.concatenate((self._segment_lengths, new_lengths))
segment_references = np.concatenate((self._segment_references, np.ones(len(segments), dtype=int)))
segment_hashes = np.concatenate((self._segment_hashes, [hash(s) for s in segments]))
if len(segments) < old_to_update:
for i, segment in enumerate(segments):
current_segment_number = first_segment_number + i
self.device[TaborSCPI].send_cmd(":TRAC:DEF {},{}".format(current_segment_number, segment.num_points),
paranoia_level=self.internal_paranoia_level)
else:
# flush the capacity
self.device._download_segment_lengths(segment_capacity)
# update non fitting lengths
for i in np.flatnonzero(segment_capacity != segment_lengths):
self.device[SCPI].send_cmd(":TRAC:DEF {},{}".format(i + 1, segment_lengths[i]))
self._segment_capacity = segment_capacity
self._segment_lengths = segment_lengths
self._segment_hashes = segment_hashes
self._segment_references = segment_references
return segment_index + np.arange(len(segments), dtype=np.int64)
[docs] @with_select
@with_configuration_guard
def cleanup(self) -> None:
"""Discard all segments after the last which is still referenced"""
reserved_indices = np.flatnonzero(self._segment_references > 0)
old_end = len(self._segment_lengths)
new_end = reserved_indices[-1] + 1 if len(reserved_indices) else 0
self._segment_lengths = self._segment_lengths[:new_end]
self._segment_capacity = self._segment_capacity[:new_end]
self._segment_hashes = self._segment_hashes[:new_end]
self._segment_references = self._segment_references[:new_end]
try:
# send max 10 commands at once
chunk_size = 10
for chunk_start in range(new_end, old_end, chunk_size):
self.device[SCPI].send_cmd("; ".join("TRAC:DEL {}".format(i + 1)
for i in range(chunk_start, min(chunk_start + chunk_size, old_end))))
except Exception as e:
raise TaborUndefinedState("Error during cleanup. Device is in undefined state.", device=self) from e
@with_configuration_guard
def _execute_multiple_commands_with_config_guard(self, commands: List[str]) -> None:
""" Joins the given commands into one and executes it with configuration guard.
Args:
commands: Commands that should be executed.
"""
cmd_str = ";".join(commands)
self.device[TaborSCPI].send_cmd(cmd_str, paranoia_level=self.internal_paranoia_level)
def _enter_config_mode(self) -> None:
"""
Enter the configuration mode if not already in. All outputs are set to the DC offset of the device and the
sequencing is disabled. The manual states this speeds up sequence validation when uploading multiple sequences.
When entering and leaving the configuration mode the AWG outputs a small (~60 mV in 4 V mode) blip.
"""
if self._is_in_config_mode is False:
# 1. Selct channel pair
# 2. Select DC as function shape
# 3. Select build-in waveform mode
if self.device._is_coupled():
out_cmd = ":OUTP:ALL OFF"
else:
out_cmd = ""
for channel in self.channels:
out_cmd = out_cmd + ":INST:SEL {ch_id}; :OUTP OFF;".format(ch_id=channel.idn)
marker_0_cmd = ":SOUR:MARK:SEL 1;:SOUR:MARK:SOUR USER;:SOUR:MARK:STAT OFF"
marker_1_cmd = ":SOUR:MARK:SEL 2;:SOUR:MARK:SOUR USER;:SOUR:MARK:STAT OFF"
wf_mode_cmd = ":SOUR:FUNC:MODE FIX"
cmd = ";".join([marker_0_cmd, marker_1_cmd, wf_mode_cmd])
cmd = out_cmd + cmd
self.device[TaborSCPI].send_cmd(cmd, paranoia_level=self.CONFIG_MODE_PARANOIA_LEVEL)
self._is_in_config_mode = True
@with_select
def _exit_config_mode(self) -> None:
"""Leave the configuration mode. Enter advanced sequence mode and turn on all outputs"""
if self.device._is_coupled():
# Coupled -> switch all channels at once
other_channel_tuple: TaborChannelTuple
if self.channels == self.device.channel_tuples[0].channels:
other_channel_tuple = self.device.channel_tuples[1]
else:
other_channel_tuple = self.device.channel_tuples[0]
if not other_channel_tuple._is_in_config_mode:
self.device[SCPI].send_cmd(":SOUR:FUNC:MODE ASEQ")
self.device[SCPI].send_cmd(":SEQ:SEL 1")
self.device[SCPI].send_cmd(":OUTP:ALL ON")
else:
self.device[SCPI].send_cmd(":SOUR:FUNC:MODE ASEQ")
self.device[SCPI].send_cmd(":SEQ:SEL 1")
for channel in self.channels:
channel[ActivatableChannels].enable()
for marker_ch in self.marker_channels:
marker_ch[ActivatableChannels].enable()
self._is_in_config_mode = False
########################################################################################################################
# Marker Channel
########################################################################################################################
# Features
class TaborActivatableMarkerChannels(ActivatableChannels):
def __init__(self, marker_channel: "TaborMarkerChannel"):
super().__init__()
self._parent = weakref.ref(marker_channel)
@property
def enabled(self) -> bool:
"""
Returns the the state a marker channel has at the moment. A channel is either activated or deactivated
True stands for activated and false for deactivated
"""
return self._parent().device[SCPI].send_query(":MARK:STAT ?") == "ON"
@with_select
def enable(self):
"""Enables the output of a certain marker channel"""
command_string = "SOUR:MARK:SOUR USER; :SOUR:MARK:STAT ON"
command_string = command_string.format(
channel=self._parent().channel_tuple.channels[0].idn,
marker=self._parent().channel_tuple.marker_channels.index(self._parent()) + 1)
self._parent().device[SCPI].send_cmd(command_string)
@with_select
def disable(self):
"""Disable the output of a certain marker channel"""
command_string = ":SOUR:MARK:SOUR USER; :SOUR:MARK:STAT OFF"
command_string = command_string.format(
channel=self._parent().channel_tuple.channels[0].idn,
marker=self._parent().channel_tuple.marker_channels.index(self._parent()) + 1)
self._parent().device[SCPI].send_cmd(command_string)
def _select(self) -> None:
self._parent()._select()
# Implementation
class TaborMarkerChannel(AWGMarkerChannel):
def __init__(self, idn: int, device: TaborDevice):
super().__init__(idn)
self._device = weakref.ref(device)
# adding Features
self.add_feature(TaborActivatableMarkerChannels(self))
@property
def device(self) -> TaborDevice:
"""Returns the device that this marker channel belongs to"""
return self._device()
@property
def channel_tuple(self) -> TaborChannelTuple:
"""Returns the channel tuple that this marker channel belongs to"""
return self._channel_tuple()
def _set_channel_tuple(self, channel_tuple: TaborChannelTuple) -> None:
"""
The channel tuple 'channel_tuple' is assigned to this marker channel
Args:
channel_tuple (TaborChannelTuple): the channel tuple that this marker channel belongs to
"""
self._channel_tuple = weakref.ref(channel_tuple)
def _select(self) -> None:
"""
This marker channel is selected and is now the active channel marker of the device
"""
self.device.channels[int((self.idn - 1) / 2)]._select()
self.device[SCPI].send_cmd(":SOUR:MARK:SEL {marker}".format(marker=(((self.idn - 1) % 2) + 1)))
class TaborUndefinedState(TaborException):
"""
If this exception is raised the attached tabor device is in an undefined state.
It is highly recommended to call reset it.f
"""
def __init__(self, *args, device: Union[TaborDevice, TaborChannelTuple]):
super().__init__(*args)
self.device = device
def reset_device(self):
if isinstance(self.device, TaborDevice):
self.device[TaborDeviceControl].reset()
elif isinstance(self.device, TaborChannelTuple):
self.device.cleanup()
self.device[TaborProgramManagement].clear()