Source code for qupulse._program.transformation
from typing import Mapping, Set, Tuple, Sequence
from abc import abstractmethod
from numbers import Real
import numpy as np
from qupulse import ChannelID
from qupulse.comparable import Comparable
from qupulse.utils.types import SingletonABCMeta
[docs]class Transformation(Comparable):
_identity_singleton = None
"""Transforms numeric time-voltage values for multiple channels to other time-voltage values. The number and names
of input and output channels might differ."""
@abstractmethod
def __call__(self, time: np.ndarray, data: Mapping[ChannelID, np.ndarray]) -> Mapping[ChannelID, np.ndarray]:
"""Apply transformation to data
Args:
time:
data:
Returns:
transformed: A DataFrame that has been transformed with index == output_channels
"""
[docs] @abstractmethod
def get_output_channels(self, input_channels: Set[ChannelID]) -> Set[ChannelID]:
"""Return the channel identifiers"""
[docs] @abstractmethod
def get_input_channels(self, output_channels: Set[ChannelID]) -> Set[ChannelID]:
"""Channels that are required for getting data for the requested output channel"""
[docs] def chain(self, next_transformation: 'Transformation') -> 'Transformation':
if next_transformation is IdentityTransformation():
return self
else:
return chain_transformations(self, next_transformation)
[docs]class IdentityTransformation(Transformation, metaclass=SingletonABCMeta):
def __call__(self, time: np.ndarray, data: Mapping[ChannelID, np.ndarray]) -> Mapping[ChannelID, np.ndarray]:
return data
[docs] def get_output_channels(self, input_channels: Set[ChannelID]) -> Set[ChannelID]:
return input_channels
@property
def compare_key(self) -> None:
return None
[docs] def get_input_channels(self, output_channels: Set[ChannelID]) -> Set[ChannelID]:
return output_channels
[docs] def chain(self, next_transformation: Transformation) -> Transformation:
return next_transformation
def __repr__(self):
return 'IdentityTransformation()'
[docs]class ChainedTransformation(Transformation):
def __init__(self, *transformations: Transformation):
self._transformations = transformations
@property
def transformations(self) -> Tuple[Transformation, ...]:
return self._transformations
[docs] def get_output_channels(self, input_channels: Set[ChannelID]) -> Set[ChannelID]:
for transformation in self._transformations:
input_channels = transformation.get_output_channels(input_channels)
return input_channels
[docs] def get_input_channels(self, output_channels: Set[ChannelID]) -> Set[ChannelID]:
for transformation in reversed(self._transformations):
output_channels = transformation.get_input_channels(output_channels)
return output_channels
def __call__(self, time: np.ndarray, data: Mapping[ChannelID, np.ndarray]) -> Mapping[ChannelID, np.ndarray]:
for transformation in self._transformations:
data = transformation(time, data)
return data
@property
def compare_key(self) -> Tuple[Transformation, ...]:
return self._transformations
[docs] def chain(self, next_transformation) -> 'ChainedTransformation':
return chain_transformations(*self.transformations, next_transformation)
def __repr__(self):
return 'ChainedTransformation%r' % (self._transformations,)
[docs]class LinearTransformation(Transformation):
[docs] def __init__(self,
transformation_matrix: np.ndarray,
input_channels: Sequence[ChannelID],
output_channels: Sequence[ChannelID]):
"""
Args:
transformation_matrix: Matrix describing the transformation with shape (output_channels, input_channels)
input_channels: Channel ids of the columns
output_channels: Channel ids of the rows
"""
transformation_matrix = np.asarray(transformation_matrix)
if transformation_matrix.shape != (len(output_channels), len(input_channels)):
raise ValueError('Shape of transformation matrix does not match to the given channels')
output_sorter = np.argsort(output_channels)
transformation_matrix = transformation_matrix[output_sorter, :]
input_sorter = np.argsort(input_channels)
transformation_matrix = transformation_matrix[:, input_sorter]
self._matrix = transformation_matrix
self._input_channels = tuple(sorted(input_channels))
self._output_channels = tuple(sorted(output_channels))
def __call__(self, time: np.ndarray, data: Mapping[ChannelID, np.ndarray]) -> Mapping[ChannelID, np.ndarray]:
data_out = {forwarded_channel: data[forwarded_channel]
for forwarded_channel in set(data.keys()).difference(self._input_channels)}
if len(data_out) == len(data):
# only forwarded data
return data_out
try:
data_in = np.stack(data[in_channel] for in_channel in self._input_channels)
except KeyError as error:
raise KeyError('Invalid input channels', set(data.keys()), set(self._input_channels)) from error
transformed_data = self._matrix @ data_in
for idx, out_channel in enumerate(self._output_channels):
data_out[out_channel] = transformed_data[idx, :]
return data_out
[docs] def get_output_channels(self, input_channels: Set[ChannelID]) -> Set[ChannelID]:
if not input_channels.issuperset(self._input_channels):
raise KeyError('Invalid input channels', input_channels, set(self._input_channels))
return input_channels.difference(self._input_channels).union(self._output_channels)
[docs] def get_input_channels(self, output_channels: Set[ChannelID]) -> Set[ChannelID]:
forwarded = output_channels.difference(self._output_channels)
if not forwarded.isdisjoint(self._input_channels):
raise KeyError('Is input channel', forwarded.intersection(self._input_channels))
elif output_channels.isdisjoint(self._output_channels):
return output_channels
else:
return forwarded.union(self._input_channels)
@property
def compare_key(self) -> Tuple[Tuple[ChannelID], Tuple[ChannelID], bytes]:
return self._input_channels, self._output_channels, self._matrix.tobytes()
def __repr__(self):
return ('LinearTransformation('
'transformation_matrix={transformation_matrix},'
'input_channels={input_channels},'
'output_channels={output_channels})').format(transformation_matrix=self._matrix.tolist(),
input_channels=self._input_channels,
output_channels=self._output_channels)
[docs]class OffsetTransformation(Transformation):
[docs] def __init__(self, offsets: Mapping[ChannelID, Real]):
"""Adds an offset to each channel specified in offsets.
Channels not in offsets are forewarded
Args:
offsets: Channel -> offset mapping
"""
self._offsets = dict(offsets.items())
def __call__(self, time: np.ndarray, data: Mapping[ChannelID, np.ndarray]) -> Mapping[ChannelID, np.ndarray]:
return {channel: channel_values + self._offsets[channel] if channel in self._offsets else channel_values
for channel, channel_values in data.items()}
@property
def compare_key(self) -> frozenset:
return frozenset(self._offsets.items())
def __repr__(self):
return 'OffsetTransformation(%r)' % self._offsets
[docs]class ScalingTransformation(Transformation):
def __init__(self, factors: Mapping[ChannelID, Real]):
self._factors = dict(factors.items())
def __call__(self, time: np.ndarray, data: Mapping[ChannelID, np.ndarray]) -> Mapping[ChannelID, np.ndarray]:
return {channel: channel_values * self._factors[channel] if channel in self._factors else channel_values
for channel, channel_values in data.items()}
@property
def compare_key(self) -> frozenset:
return frozenset(self._factors.items())
def __repr__(self):
return 'ScalingTransformation(%r)' % self._factors
try:
import pandas
def linear_transformation_from_pandas(transformation: pandas.DataFrame) -> LinearTransformation:
""" Creates a LinearTransformation object out of a pandas data frame.
Args:
transformation (pandas.DataFrame): The pandas.DataFrame object out of which a LinearTransformation will be formed.
Returns:
the created LinearTransformation instance
"""
return LinearTransformation(transformation.values, transformation.columns, transformation.index)
LinearTransformation.from_pandas = linear_transformation_from_pandas
except ImportError:
pass
[docs]class ParallelConstantChannelTransformation(Transformation):
[docs] def __init__(self, channels: Mapping[ChannelID, Real]):
"""Set channel values to given values regardless their former existence
Args:
channels: Channels present in this map are set to the given value.
"""
self._channels = {channel: float(value)
for channel, value in channels.items()}
def __call__(self, time: np.ndarray, data: Mapping[ChannelID, np.ndarray]) -> Mapping[ChannelID, np.ndarray]:
overwritten = {channel: np.full_like(time, fill_value=value, dtype=float)
for channel, value in self._channels.items()}
return {**data, **overwritten}
@property
def compare_key(self) -> Tuple[Tuple[ChannelID, float], ...]:
return tuple(sorted(self._channels.items()))
[docs] def get_input_channels(self, output_channels: Set[ChannelID]) -> Set[ChannelID]:
return output_channels - self._channels.keys()
[docs] def get_output_channels(self, input_channels: Set[ChannelID]) -> Set[ChannelID]:
return input_channels | self._channels.keys()
def __repr__(self):
return 'ParallelConstantChannelTransformation(%r)' % self._channels
[docs]def chain_transformations(*transformations: Transformation) -> Transformation:
parsed_transformations = []
for transformation in transformations:
if transformation is IdentityTransformation() or transformation is None:
pass
elif isinstance(transformation, ChainedTransformation):
parsed_transformations.extend(transformation.transformations)
else:
parsed_transformations.append(transformation)
if len(parsed_transformations) == 0:
return IdentityTransformation()
elif len(parsed_transformations) == 1:
return parsed_transformations[0]
else:
return ChainedTransformation(*parsed_transformations)