Source code for qupulse._program.transformation
from typing import Mapping, Set, Tuple, Sequence
from abc import abstractmethod
from numbers import Real
import numpy as np
from qupulse import ChannelID
from qupulse.comparable import Comparable
from qupulse.utils.types import SingletonABCMeta
[docs]class Transformation(Comparable):
_identity_singleton = None
"""Transforms numeric time-voltage values for multiple channels to other time-voltage values. The number and names
of input and output channels might differ."""
@abstractmethod
def __call__(self, time: np.ndarray, data: Mapping[ChannelID, np.ndarray]) -> Mapping[ChannelID, np.ndarray]:
"""Apply transformation to data
Args:
time:
data:
Returns:
transformed: A DataFrame that has been transformed with index == output_channels
"""
[docs] @abstractmethod
def get_output_channels(self, input_channels: Set[ChannelID]) -> Set[ChannelID]:
"""Return the channel identifiers"""
[docs] @abstractmethod
def get_input_channels(self, output_channels: Set[ChannelID]) -> Set[ChannelID]:
"""Channels that are required for getting data for the requested output channel"""
[docs] def chain(self, next_transformation: 'Transformation') -> 'Transformation':
if next_transformation is IdentityTransformation():
return self
else:
return chain_transformations(self, next_transformation)
[docs]class IdentityTransformation(Transformation, metaclass=SingletonABCMeta):
def __call__(self, time: np.ndarray, data: Mapping[ChannelID, np.ndarray]) -> Mapping[ChannelID, np.ndarray]:
return data
[docs] def get_output_channels(self, input_channels: Set[ChannelID]) -> Set[ChannelID]:
return input_channels
@property
def compare_key(self) -> None:
return None
[docs] def get_input_channels(self, output_channels: Set[ChannelID]) -> Set[ChannelID]:
return output_channels
[docs] def chain(self, next_transformation: Transformation) -> Transformation:
return next_transformation
[docs]class ChainedTransformation(Transformation):
def __init__(self, *transformations: Transformation):
self._transformations = transformations
@property
def transformations(self) -> Tuple[Transformation, ...]:
return self._transformations
[docs] def get_output_channels(self, input_channels: Set[ChannelID]) -> Set[ChannelID]:
for transformation in self._transformations:
input_channels = transformation.get_output_channels(input_channels)
return input_channels
[docs] def get_input_channels(self, output_channels: Set[ChannelID]) -> Set[ChannelID]:
for transformation in reversed(self._transformations):
output_channels = transformation.get_input_channels(output_channels)
return output_channels
def __call__(self, time: np.ndarray, data: Mapping[ChannelID, np.ndarray]) -> Mapping[ChannelID, np.ndarray]:
for transformation in self._transformations:
data = transformation(time, data)
return data
@property
def compare_key(self) -> Tuple[Transformation, ...]:
return self._transformations
[docs] def chain(self, next_transformation) -> 'ChainedTransformation':
return chain_transformations(*self.transformations, next_transformation)
[docs]class LinearTransformation(Transformation):
[docs] def __init__(self,
transformation_matrix: np.ndarray,
input_channels: Sequence[ChannelID],
output_channels: Sequence[ChannelID]):
"""
Args:
transformation_matrix: columns are input and index are output channels
"""
assert transformation_matrix.shape == (len(output_channels), len(input_channels))
output_sorter = np.argsort(output_channels)
transformation_matrix = transformation_matrix[output_sorter, :]
input_sorter = np.argsort(input_channels)
transformation_matrix = transformation_matrix[:, input_sorter]
self._matrix = transformation_matrix
self._input_channels = tuple(sorted(input_channels))
self._output_channels = tuple(sorted(output_channels))
def __call__(self, time: np.ndarray, data: Mapping[ChannelID, np.ndarray]) -> Mapping[ChannelID, np.ndarray]:
data_out = {forwarded_channel: data[forwarded_channel]
for forwarded_channel in set(data.keys()).difference(self._input_channels)}
if len(data_out) == len(data):
# only forwarded data
return data_out
try:
data_in = np.stack(data[in_channel] for in_channel in self._input_channels)
except KeyError as error:
raise KeyError('Invalid input channels', set(data.keys()), set(self._input_channels)) from error
transformed_data = self._matrix @ data_in
for idx, out_channel in enumerate(self._output_channels):
data_out[out_channel] = transformed_data[idx, :]
return data_out
[docs] def get_output_channels(self, input_channels: Set[ChannelID]) -> Set[ChannelID]:
if not input_channels.issuperset(self._input_channels):
raise KeyError('Invalid input channels', input_channels, set(self._input_channels))
return input_channels.difference(self._input_channels).union(self._output_channels)
[docs] def get_input_channels(self, output_channels: Set[ChannelID]) -> Set[ChannelID]:
forwarded = output_channels.difference(self._output_channels)
if not forwarded.isdisjoint(self._input_channels):
raise KeyError('Is input channel', forwarded.intersection(self._input_channels))
elif output_channels.isdisjoint(self._output_channels):
return output_channels
else:
return forwarded.union(self._input_channels)
@property
def compare_key(self) -> Tuple[Tuple[ChannelID], Tuple[ChannelID], bytes]:
return self._input_channels, self._output_channels, self._matrix.tobytes()
try:
import pandas
def linear_transformation_from_pandas(transformation: pandas.DataFrame) -> LinearTransformation:
""" Creates a LinearTransformation object out of a pandas data frame.
Args:
transformation (pandas.DataFrame): The pandas.DataFrame object out of which a LinearTransformation will be formed.
Returns:
the created LinearTransformation instance
"""
return LinearTransformation(transformation.values, transformation.columns, transformation.index)
LinearTransformation.from_pandas = linear_transformation_from_pandas
except ImportError:
pass
[docs]class ParallelConstantChannelTransformation(Transformation):
[docs] def __init__(self, channels: Mapping[ChannelID, Real]):
"""Set channel values to given values regardless their former existence
Args:
channels:
"""
self._channels = {channel: float(value)
for channel, value in channels.items()}
def __call__(self, time: np.ndarray, data: Mapping[ChannelID, np.ndarray]) -> Mapping[ChannelID, np.ndarray]:
overwritten = {channel: np.full_like(time, fill_value=value, dtype=float)
for channel, value in self._channels.items()}
return {**data, **overwritten}
@property
def compare_key(self) -> Tuple[Tuple[ChannelID, float], ...]:
return tuple(sorted(self._channels.items()))
[docs] def get_input_channels(self, output_channels: Set[ChannelID]) -> Set[ChannelID]:
return output_channels - self._channels.keys()
[docs] def get_output_channels(self, input_channels: Set[ChannelID]) -> Set[ChannelID]:
return input_channels | self._channels.keys()
[docs]def chain_transformations(*transformations: Transformation) -> Transformation:
parsed_transformations = []
for transformation in transformations:
if transformation is IdentityTransformation():
pass
elif isinstance(transformation, ChainedTransformation):
parsed_transformations.extend(transformation.transformations)
else:
parsed_transformations.append(transformation)
if len(parsed_transformations) == 0:
return IdentityTransformation()
elif len(parsed_transformations) == 1:
return parsed_transformations[0]
else:
return ChainedTransformation(*parsed_transformations)