Source code for qupulse.program.transformation

from typing import Any, Mapping, Set, Tuple, Sequence, AbstractSet, Union, TYPE_CHECKING, Hashable
from abc import abstractmethod
from numbers import Real

import numpy as np

from qupulse import ChannelID
from qupulse.comparable import Comparable
from qupulse.utils.types import SingletonABCMeta, frozendict
from qupulse.expressions import ExpressionScalar


_TrafoValue = Union[Real, ExpressionScalar]


__all__ = ['Transformation', 'IdentityTransformation', 'LinearTransformation', 'ScalingTransformation',
           'OffsetTransformation', 'ParallelChannelTransformation', 'ChainedTransformation',
           'chain_transformations']


[docs]class Transformation(Comparable): _identity_singleton = None """Transforms numeric time-voltage values for multiple channels to other time-voltage values. The number and names of input and output channels might differ.""" @abstractmethod def __call__(self, time: Union[np.ndarray, float], data: Mapping[ChannelID, Union[np.ndarray, float]]) -> Mapping[ChannelID, Union[np.ndarray, float]]: """Apply transformation to data Args: time: data: Returns: transformed: A DataFrame that has been transformed with index == output_channels """
[docs] @abstractmethod def get_output_channels(self, input_channels: AbstractSet[ChannelID]) -> AbstractSet[ChannelID]: """Return the channel identifiers"""
[docs] @abstractmethod def get_input_channels(self, output_channels: AbstractSet[ChannelID]) -> AbstractSet[ChannelID]: """Channels that are required for getting data for the requested output channel"""
[docs] def chain(self, next_transformation: 'Transformation') -> 'Transformation': if next_transformation is IdentityTransformation(): return self else: return chain_transformations(self, next_transformation)
[docs] def is_constant_invariant(self): """Signals if the transformation always maps constants to constants.""" return False
[docs] def get_constant_output_channels(self, input_channels: AbstractSet[ChannelID]) -> AbstractSet[ChannelID]: return frozenset()
[docs]class IdentityTransformation(Transformation, metaclass=SingletonABCMeta): def __call__(self, time: Union[np.ndarray, float], data: Mapping[ChannelID, Union[np.ndarray, float]]) -> Mapping[ChannelID, Union[np.ndarray, float]]: return data
[docs] def get_output_channels(self, input_channels: AbstractSet[ChannelID]) -> AbstractSet[ChannelID]: return input_channels
@property def compare_key(self) -> None: return None
[docs] def get_input_channels(self, output_channels: AbstractSet[ChannelID]) -> AbstractSet[ChannelID]: return output_channels
[docs] def chain(self, next_transformation: Transformation) -> Transformation: return next_transformation
def __repr__(self): return 'IdentityTransformation()'
[docs] def is_constant_invariant(self): """Signals if the transformation always maps constants to constants.""" return True
[docs] def get_constant_output_channels(self, input_channels: AbstractSet[ChannelID]) -> AbstractSet[ChannelID]: return input_channels
[docs]class ChainedTransformation(Transformation): def __init__(self, *transformations: Transformation): self._transformations = transformations @property def transformations(self) -> Tuple[Transformation, ...]: return self._transformations
[docs] def get_output_channels(self, input_channels: AbstractSet[ChannelID]) -> AbstractSet[ChannelID]: for transformation in self._transformations: input_channels = transformation.get_output_channels(input_channels) return input_channels
[docs] def get_input_channels(self, output_channels: AbstractSet[ChannelID]) -> AbstractSet[ChannelID]: for transformation in reversed(self._transformations): output_channels = transformation.get_input_channels(output_channels) return output_channels
def __call__(self, time: Union[np.ndarray, float], data: Mapping[ChannelID, Union[np.ndarray, float]]) -> Mapping[ChannelID, Union[np.ndarray, float]]: for transformation in self._transformations: data = transformation(time, data) return data @property def compare_key(self) -> Tuple[Transformation, ...]: return self._transformations
[docs] def chain(self, next_transformation) -> Transformation: return chain_transformations(*self.transformations, next_transformation)
def __repr__(self): return f'{type(self).__name__}{self._transformations!r}'
[docs] def is_constant_invariant(self): """Signals if the transformation always maps constants to constants.""" return all(trafo.is_constant_invariant() for trafo in self._transformations)
[docs] def get_constant_output_channels(self, input_channels: AbstractSet[ChannelID]) -> AbstractSet[ChannelID]: for trafo in self._transformations: input_channels = trafo.get_constant_output_channels(input_channels) return input_channels
[docs]class LinearTransformation(Transformation):
[docs] def __init__(self, transformation_matrix: np.ndarray, input_channels: Sequence[ChannelID], output_channels: Sequence[ChannelID]): """ Args: transformation_matrix: Matrix describing the transformation with shape (output_channels, input_channels) input_channels: Channel ids of the columns output_channels: Channel ids of the rows """ transformation_matrix = np.asarray(transformation_matrix) if transformation_matrix.shape != (len(output_channels), len(input_channels)): raise ValueError('Shape of transformation matrix does not match to the given channels') output_sorter = np.argsort(output_channels) transformation_matrix = transformation_matrix[output_sorter, :] input_sorter = np.argsort(input_channels) transformation_matrix = transformation_matrix[:, input_sorter] self._matrix = transformation_matrix self._input_channels = tuple(sorted(input_channels)) self._output_channels = tuple(sorted(output_channels)) self._input_channels_set = frozenset(self._input_channels) self._output_channels_set = frozenset(self._output_channels)
def __call__(self, time: Union[np.ndarray, float], data: Mapping[ChannelID, Union[np.ndarray, float]]) -> Mapping[ChannelID, Union[np.ndarray, float]]: data_out = {forwarded_channel: data[forwarded_channel] for forwarded_channel in set(data.keys()).difference(self._input_channels)} if len(data_out) == len(data): # only forwarded data return data_out try: data_in = np.stack([data[in_channel] for in_channel in self._input_channels]) except KeyError as error: raise KeyError('Invalid input channels', set(data.keys()), set(self._input_channels)) from error transformed_data = self._matrix @ data_in for idx, out_channel in enumerate(self._output_channels): data_out[out_channel] = transformed_data[idx, ...] return data_out
[docs] def get_output_channels(self, input_channels: AbstractSet[ChannelID]) -> AbstractSet[ChannelID]: if not input_channels >= self._input_channels_set: # input_channels is not a superset of the required input channels raise KeyError('Invalid input channels', input_channels, self._input_channels_set) return (input_channels - self._input_channels_set) | self._output_channels_set
[docs] def get_input_channels(self, output_channels: AbstractSet[ChannelID]) -> AbstractSet[ChannelID]: forwarded = output_channels - self._output_channels_set if not forwarded.isdisjoint(self._input_channels): raise KeyError('Is input channel', forwarded & self._input_channels_set) elif output_channels.isdisjoint(self._output_channels): return output_channels else: return forwarded | self._input_channels_set
@property def compare_key(self) -> Tuple[Tuple[ChannelID], Tuple[ChannelID], bytes]: return self._input_channels, self._output_channels, self._matrix.tobytes() def __repr__(self): return ('LinearTransformation(' 'transformation_matrix={transformation_matrix},' 'input_channels={input_channels},' 'output_channels={output_channels})').format(transformation_matrix=self._matrix.tolist(), input_channels=self._input_channels, output_channels=self._output_channels)
[docs] def is_constant_invariant(self): """Signals if the transformation always maps constants to constants.""" return True
[docs] def get_constant_output_channels(self, input_channels: AbstractSet[ChannelID]) -> AbstractSet[ChannelID]: return input_channels
[docs]class OffsetTransformation(Transformation):
[docs] def __init__(self, offsets: Mapping[ChannelID, _TrafoValue]): """Adds an offset to each channel specified in offsets. Channels not in offsets are forewarded Args: offsets: Channel -> offset mapping """ self._offsets = frozendict(offsets) assert _are_valid_transformation_expressions(self._offsets), f"Not valid transformation expressions: {self._offsets}"
def __call__(self, time: Union[np.ndarray, float], data: Mapping[ChannelID, Union[np.ndarray, float]]) -> Mapping[ChannelID, Union[np.ndarray, float]]: offsets = _instantiate_expression_dict(time, self._offsets) return {channel: channel_values + offsets[channel] if channel in offsets else channel_values for channel, channel_values in data.items()}
[docs] def get_input_channels(self, output_channels: AbstractSet[ChannelID]) -> AbstractSet[ChannelID]: return output_channels
[docs] def get_output_channels(self, input_channels: AbstractSet[ChannelID]) -> AbstractSet[ChannelID]: return input_channels
@property def compare_key(self) -> Hashable: return self._offsets def __repr__(self): return f'{type(self).__name__}({dict(self._offsets)!r})'
[docs] def is_constant_invariant(self): """Signals if the transformation always maps constants to constants.""" return not _has_time_dependent_values(self._offsets)
[docs] def get_constant_output_channels(self, input_channels: AbstractSet[ChannelID]) -> AbstractSet[ChannelID]: return _get_constant_output_channels(self._offsets, input_channels)
[docs]class ScalingTransformation(Transformation): def __init__(self, factors: Mapping[ChannelID, _TrafoValue]): self._factors = frozendict(factors) assert _are_valid_transformation_expressions(self._factors), f"Not valid transformation expressions: {self._factors}" def __call__(self, time: Union[np.ndarray, float], data: Mapping[ChannelID, Union[np.ndarray, float]]) -> Mapping[ChannelID, Union[np.ndarray, float]]: factors = _instantiate_expression_dict(time, self._factors) return {channel: channel_values * factors[channel] if channel in factors else channel_values for channel, channel_values in data.items()}
[docs] def get_input_channels(self, output_channels: AbstractSet[ChannelID]) -> AbstractSet[ChannelID]: return output_channels
[docs] def get_output_channels(self, input_channels: AbstractSet[ChannelID]) -> AbstractSet[ChannelID]: return input_channels
@property def compare_key(self) -> Hashable: return self._factors def __repr__(self): return f'{type(self).__name__}({dict(self._factors)!r})'
[docs] def is_constant_invariant(self): """Signals if the transformation always maps constants to constants.""" return not _has_time_dependent_values(self._factors)
[docs] def get_constant_output_channels(self, input_channels: AbstractSet[ChannelID]) -> AbstractSet[ChannelID]: return _get_constant_output_channels(self._factors, input_channels)
try: if TYPE_CHECKING: import pandas PandasDataFrameType = pandas.DataFrame else: PandasDataFrameType = Any def linear_transformation_from_pandas(transformation: PandasDataFrameType) -> LinearTransformation: """ Creates a LinearTransformation object out of a pandas data frame. Args: transformation (pandas.DataFrame): The pandas.DataFrame object out of which a LinearTransformation will be formed. Returns: the created LinearTransformation instance """ return LinearTransformation(transformation.values, transformation.columns, transformation.index) LinearTransformation.from_pandas = linear_transformation_from_pandas except ImportError: pass
[docs]class ParallelChannelTransformation(Transformation):
[docs] def __init__(self, channels: Mapping[ChannelID, _TrafoValue]): """Set channel values to given values regardless their former existence. The values can be time dependent expressions. Args: channels: Channels present in this map are set to the given value. """ self._channels: Mapping[ChannelID, _TrafoValue] = frozendict(channels.items()) assert _are_valid_transformation_expressions(self._channels), f"Not valid transformation expressions: {self._channels}"
def __call__(self, time: Union[np.ndarray, float], data: Mapping[ChannelID, Union[np.ndarray, float]]) -> Mapping[ChannelID, Union[np.ndarray, float]]: overwritten = self._instantiated_values(time) return {**data, **overwritten} def _instantiated_values(self, time): scope = {'t': time} return {channel: value.evaluate_in_scope(scope) if hasattr(value, 'evaluate_in_scope') else np.full_like(time, fill_value=value, dtype=float) for channel, value in self._channels.items()} @property def compare_key(self) -> Hashable: return self._channels
[docs] def get_input_channels(self, output_channels: AbstractSet[ChannelID]) -> AbstractSet[ChannelID]: return output_channels - self._channels.keys()
[docs] def get_output_channels(self, input_channels: AbstractSet[ChannelID]) -> AbstractSet[ChannelID]: return input_channels | self._channels.keys()
def __repr__(self): return f'{type(self).__name__}({dict(self._channels)!r})'
[docs] def is_constant_invariant(self): """Signals if the transformation always maps constants to constants.""" return not _has_time_dependent_values(self._channels)
[docs] def get_constant_output_channels(self, input_channels: AbstractSet[ChannelID]) -> AbstractSet[ChannelID]: output_channels = set(input_channels) for ch, value in self._channels.items(): if hasattr(value, 'variables'): output_channels.discard(ch) else: output_channels.add(ch) return output_channels
[docs]def chain_transformations(*transformations: Transformation) -> Transformation: parsed_transformations = [] for transformation in transformations: if transformation is IdentityTransformation() or transformation is None: pass elif isinstance(transformation, ChainedTransformation): parsed_transformations.extend(transformation.transformations) else: parsed_transformations.append(transformation) if len(parsed_transformations) == 0: return IdentityTransformation() elif len(parsed_transformations) == 1: return parsed_transformations[0] else: return ChainedTransformation(*parsed_transformations)
def _instantiate_expression_dict(time, expressions: Mapping[str, _TrafoValue]) -> Mapping[str, Union[Real, np.ndarray]]: scope = {'t': time} modified_expressions = {} for name, value in expressions.items(): if hasattr(value, 'evaluate_in_scope'): modified_expressions[name] = value.evaluate_in_scope(scope) if modified_expressions: return {**expressions, **modified_expressions} else: return expressions def _has_time_dependent_values(expressions: Mapping[ChannelID, _TrafoValue]) -> bool: return any(hasattr(value, 'variables') for value in expressions.values()) def _get_constant_output_channels(expressions: Mapping[ChannelID, _TrafoValue], constant_input_channels: AbstractSet[ChannelID]) -> AbstractSet[ChannelID]: return {ch for ch in constant_input_channels if not hasattr(expressions.get(ch, None), 'variables')} def _are_valid_transformation_expressions(expressions: Mapping[ChannelID, _TrafoValue]) -> bool: return all(expr.variables == ('t',) for expr in expressions.values() if hasattr(expr, 'variables'))