Source code for qupulse._program.transformation

from typing import Mapping, Set, Tuple, Sequence
from abc import abstractmethod
from numbers import Real

import numpy as np

from qupulse import ChannelID
from qupulse.comparable import Comparable
from qupulse.utils.types import SingletonABCMeta


class Transformation(Comparable):
    """Transforms numeric time-voltage values for multiple channels to other time-voltage values.

    The number and names of input and output channels might differ.
    """

    # NOTE: the docstring has to be the first statement of the class body, otherwise it is
    # just a discarded string expression and ``Transformation.__doc__`` stays empty.
    _identity_singleton = None

    @abstractmethod
    def __call__(self, time: np.ndarray,
                 data: Mapping[ChannelID, np.ndarray]) -> Mapping[ChannelID, np.ndarray]:
        """Apply transformation to data

        Args:
            time: Time values the entries of ``data`` refer to.
            data: Values per input channel, keyed by channel identifier.

        Returns:
            transformed: Mapping from output channel identifier to the transformed values.
        """

    @abstractmethod
    def get_output_channels(self, input_channels: Set[ChannelID]) -> Set[ChannelID]:
        """Return the channel identifiers that are present after applying the transformation to
        data that contains ``input_channels``."""

    @abstractmethod
    def get_input_channels(self, output_channels: Set[ChannelID]) -> Set[ChannelID]:
        """Channels that are required for getting data for the requested output channel"""

    def chain(self, next_transformation: 'Transformation') -> 'Transformation':
        """Return a transformation that applies ``self`` first and ``next_transformation`` second.

        Chaining with the identity transformation returns ``self`` unchanged.
        """
        if next_transformation is IdentityTransformation():
            return self
        else:
            return chain_transformations(self, next_transformation)
class IdentityTransformation(Transformation, metaclass=SingletonABCMeta):
    """Transformation that hands data and channel sets through unchanged."""

    @property
    def compare_key(self) -> None:
        """There is no state to compare, so the key is always ``None``."""
        return None

    def get_input_channels(self, output_channels: Set[ChannelID]) -> Set[ChannelID]:
        """Every requested output channel is required as input channel."""
        return output_channels

    def get_output_channels(self, input_channels: Set[ChannelID]) -> Set[ChannelID]:
        """Every input channel is available as output channel."""
        return input_channels

    def __call__(self, time: np.ndarray,
                 data: Mapping[ChannelID, np.ndarray]) -> Mapping[ChannelID, np.ndarray]:
        """Return ``data`` itself (no copy is made)."""
        return data

    def chain(self, next_transformation: Transformation) -> Transformation:
        """Chaining anything to the identity yields ``next_transformation`` itself."""
        return next_transformation
class ChainedTransformation(Transformation):
    """Applies a fixed sequence of transformations one after another."""

    def __init__(self, *transformations: Transformation):
        # stored as the tuple of positional arguments, in application order
        self._transformations = transformations

    @property
    def transformations(self) -> Tuple[Transformation, ...]:
        """The chained transformations in the order they are applied."""
        return self._transformations

    @property
    def compare_key(self) -> Tuple[Transformation, ...]:
        """Two chains compare via their tuple of transformations."""
        return self._transformations

    def __call__(self, time: np.ndarray,
                 data: Mapping[ChannelID, np.ndarray]) -> Mapping[ChannelID, np.ndarray]:
        """Feed ``data`` through every transformation, first to last."""
        current = data
        for step in self._transformations:
            current = step(time, current)
        return current

    def get_output_channels(self, input_channels: Set[ChannelID]) -> Set[ChannelID]:
        """Propagate the channel set forward through the chain."""
        channels = input_channels
        for step in self._transformations:
            channels = step.get_output_channels(channels)
        return channels

    def get_input_channels(self, output_channels: Set[ChannelID]) -> Set[ChannelID]:
        """Propagate the requested channel set backward through the chain."""
        channels = output_channels
        # walk from the last transformation to the first
        for step in self._transformations[::-1]:
            channels = step.get_input_channels(channels)
        return channels

    def chain(self, next_transformation) -> 'ChainedTransformation':
        """Append ``next_transformation`` to the members of this chain."""
        members = (*self.transformations, next_transformation)
        return chain_transformations(*members)
class LinearTransformation(Transformation):
    """Linear combination of input channels into output channels.

    Channels in the data that are not input channels of the matrix are forwarded unchanged.
    """

    def __init__(self,
                 transformation_matrix: np.ndarray,
                 input_channels: Sequence[ChannelID],
                 output_channels: Sequence[ChannelID]):
        """
        Args:
            transformation_matrix: columns are input and index are output channels
            input_channels: Channel identifier of each matrix column.
            output_channels: Channel identifier of each matrix row.
        """
        assert transformation_matrix.shape == (len(output_channels), len(input_channels))

        # Bring rows and columns into sorted channel order so that the stored matrix matches
        # the sorted channel tuples below.
        output_sorter = np.argsort(output_channels)
        transformation_matrix = transformation_matrix[output_sorter, :]

        input_sorter = np.argsort(input_channels)
        transformation_matrix = transformation_matrix[:, input_sorter]

        self._matrix = transformation_matrix
        self._input_channels = tuple(sorted(input_channels))
        self._output_channels = tuple(sorted(output_channels))

    def __call__(self, time: np.ndarray,
                 data: Mapping[ChannelID, np.ndarray]) -> Mapping[ChannelID, np.ndarray]:
        """Apply the matrix to the input channels found in ``data``.

        Args:
            time: Not used by this transformation.
            data: Values per channel. Must contain either none or all of the input channels.

        Returns:
            Mapping with all forwarded channels plus one entry per output channel.

        Raises:
            KeyError: If ``data`` contains some but not all of the input channels.
        """
        data_out = {forwarded_channel: data[forwarded_channel]
                    for forwarded_channel in set(data.keys()).difference(self._input_channels)}

        if len(data_out) == len(data):
            # only forwarded data
            return data_out

        try:
            rows = [data[in_channel] for in_channel in self._input_channels]
        except KeyError as error:
            raise KeyError('Invalid input channels', set(data.keys()), set(self._input_channels)) from error

        # np.stack requires a real sequence; passing a generator is deprecated since NumPy 1.16
        # and rejected by current releases.
        data_in = np.stack(rows)

        transformed_data = self._matrix @ data_in

        for idx, out_channel in enumerate(self._output_channels):
            data_out[out_channel] = transformed_data[idx, :]

        return data_out

    def get_output_channels(self, input_channels: Set[ChannelID]) -> Set[ChannelID]:
        """Return the forwarded channels together with the output channels of the matrix.

        Raises:
            KeyError: If ``input_channels`` does not contain all input channels of the matrix.
        """
        if not input_channels.issuperset(self._input_channels):
            raise KeyError('Invalid input channels', input_channels, set(self._input_channels))

        return input_channels.difference(self._input_channels).union(self._output_channels)

    def get_input_channels(self, output_channels: Set[ChannelID]) -> Set[ChannelID]:
        """Return the channels required to produce ``output_channels``.

        If none of the matrix output channels is requested, ``output_channels`` is returned
        unchanged. Otherwise the result is the forwarded channels plus all matrix input channels.

        Raises:
            KeyError: If a requested channel that is not a matrix output is a matrix input.
        """
        forwarded = output_channels.difference(self._output_channels)
        if not forwarded.isdisjoint(self._input_channels):
            raise KeyError('Is input channel', forwarded.intersection(self._input_channels))
        elif output_channels.isdisjoint(self._output_channels):
            return output_channels
        else:
            return forwarded.union(self._input_channels)

    @property
    def compare_key(self) -> Tuple[Tuple[ChannelID, ...], Tuple[ChannelID, ...], bytes]:
        """Sorted input channels, sorted output channels and the raw bytes of the sorted matrix."""
        return self._input_channels, self._output_channels, self._matrix.tobytes()
try:
    import pandas
except ImportError:
    # pandas is an optional dependency; without it there is simply no ``from_pandas``
    pass
else:
    def linear_transformation_from_pandas(transformation: pandas.DataFrame) -> LinearTransformation:
        """ Creates a LinearTransformation object out of a pandas data frame.

        The frame's values are used as matrix, its columns as input channels and its index as
        output channels.

        Args:
            transformation (pandas.DataFrame): The pandas.DataFrame object out of which a LinearTransformation will be formed.

        Returns:
            the created LinearTransformation instance
        """
        return LinearTransformation(transformation.values, transformation.columns, transformation.index)

    # staticmethod keeps ``LinearTransformation.from_pandas(df)`` working as before and also
    # prevents an instance from being bound as ``transformation`` on ``instance.from_pandas(df)``.
    LinearTransformation.from_pandas = staticmethod(linear_transformation_from_pandas)
class ParallelConstantChannelTransformation(Transformation):
    """Adds or overwrites channels with constant values."""

    def __init__(self, channels: Mapping[ChannelID, Real]):
        """Set channel values to given values regardless their former existence

        Args:
            channels: Constant value per channel; every value is converted to ``float``.
        """
        constants = {}
        for name, constant in channels.items():
            constants[name] = float(constant)
        self._channels = constants

    @property
    def compare_key(self) -> Tuple[Tuple[ChannelID, float], ...]:
        """The ``(channel, value)`` pairs in sorted order."""
        return tuple(sorted(self._channels.items()))

    def __call__(self, time: np.ndarray,
                 data: Mapping[ChannelID, np.ndarray]) -> Mapping[ChannelID, np.ndarray]:
        """Return a new dict with ``data`` plus one constant float array, shaped like ``time``,
        for each configured channel. Configured channels replace entries of the same name."""
        result = dict(data)
        for name, constant in self._channels.items():
            result[name] = np.full_like(time, fill_value=constant, dtype=float)
        return result

    def get_output_channels(self, input_channels: Set[ChannelID]) -> Set[ChannelID]:
        """Input channels plus all constant channels."""
        return set(input_channels).union(self._channels)

    def get_input_channels(self, output_channels: Set[ChannelID]) -> Set[ChannelID]:
        """Requested channels minus the constant ones, which need no input."""
        return set(output_channels).difference(self._channels)
def chain_transformations(*transformations: Transformation) -> Transformation:
    """Combine the given transformations into a single one.

    Identity transformations are dropped and chained transformations are flattened into their
    members. The result is the identity transformation if nothing remains, the single remaining
    transformation if there is exactly one, and a ChainedTransformation otherwise.
    """
    flattened = []
    for candidate in transformations:
        if candidate is IdentityTransformation():
            # contributes nothing to the chain
            continue
        if isinstance(candidate, ChainedTransformation):
            flattened += candidate.transformations
        else:
            flattened.append(candidate)

    if not flattened:
        return IdentityTransformation()
    if len(flattened) == 1:
        return flattened[0]
    return ChainedTransformation(*flattened)