Skip to content

Data processor logger #526

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion qiskit_experiments/data_processing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
:toctree: ../stubs/

DataProcessor
DataLogger
DataAction
TrainableDataAction

Expand Down Expand Up @@ -54,4 +55,4 @@
BasisExpectationValue,
)

from .data_processor import DataProcessor
from .data_processor import DataProcessor, DataLogger
148 changes: 83 additions & 65 deletions qiskit_experiments/data_processing/data_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

"""Actions done on the data to bring it in a usable form."""

from typing import Any, Dict, List, Set, Tuple, Union
from typing import Any, Dict, List, Tuple, Union, Callable, Optional

from qiskit_experiments.data_processing.data_action import DataAction, TrainableDataAction
from qiskit_experiments.data_processing.exceptions import DataProcessorError
Expand Down Expand Up @@ -69,84 +69,42 @@ def is_trained(self) -> bool:

return True

def __call__(self, data: Union[Dict, List[Dict]], **options) -> Tuple[Any, Any]:
def __call__(
self,
data: Union[Dict, List[Dict]],
up_to_index: Optional[int] = None,
callback: Optional[Callable[[int, str, Any, Any], None]] = None,
) -> Tuple[Any, Any]:
"""
Call self on the given datum. This method sequentially calls the stored data actions
on the datum.

Args:
data: The data, typically from ExperimentData.data(...), that needs to be processed.
This dict or list of dicts also contains the metadata of each experiment.
options: Run-time options given as keyword arguments that will be passed to the nodes.

Returns:
processed data: The data processed by the data processor.
"""
return self._call_internal(data, **options)

def call_with_history(
self, data: Union[Dict, List[Dict]], history_nodes: Set = None
) -> Tuple[Any, Any, List]:
"""
Call self on the given datum. This method sequentially calls the stored data actions
on the datum and also returns the history of the processed data.

Args:
data: The data, typically from ExperimentData.data(...), that needs to be processed.
This dict or list of dicts also contains the metadata of each experiment.
history_nodes: The nodes, specified by index in the data processing chain, to
include in the history. If None is given then all nodes will be included
in the history.

Returns:
processed data: The datum processed by the data processor.
history: The datum processed at each node of the data processor.
"""
return self._call_internal(data, True, history_nodes)

def _call_internal(
self,
data: Union[Dict, List[Dict]],
with_history: bool = False,
history_nodes: Set = None,
call_up_to_node: int = None,
) -> Union[Tuple[Any, Any], Tuple[Any, Any, List]]:
"""Process the data with or without storing the history of the computation.

Args:
data: The data, typically from ExperimentData.data(...), that needs to be processed.
This dict or list of dicts also contains the metadata of each experiment.
with_history: if True the history is returned otherwise it is not.
history_nodes: The nodes, specified by index in the data processing chain, to
include in the history. If None is given then all nodes will be included
in the history.
call_up_to_node: The data processor will use each node in the processing chain
This dict or list of dicts also contains the metadata of each experiment.
up_to_index: The data processor will use each node in the processing chain
up to the node indexed by call_up_to_node. If this variable is not specified
then all nodes in the data processing chain will be called.
callback: Arbitrary python callable that is called after each node execution.
Processor passes (index of node, name of node, nominal values, standard errors)
to the callback. This can be used to log the history of intermediate data.
See :class:`qiskit_experiments.data_processing.data_processor.DataLogandger`
for the preset logger.

Returns:
datum_ and history if with_history is True or datum_ if with_history is False.
A tuple of (nominal values, standard errors) processed by the processor.
"""
if call_up_to_node is None:
call_up_to_node = len(self._nodes)
if up_to_index is None:
up_to_index = len(self._nodes)

datum_, error_ = self._data_extraction(data), None
for index, node in enumerate(self._nodes[:up_to_index]):
datum_, error_ = node(datum_, error_)

history = []
for index, node in enumerate(self._nodes):

if index < call_up_to_node:
datum_, error_ = node(datum_, error_)

if with_history and (
history_nodes is None or (history_nodes and index in history_nodes)
):
history.append((node.__class__.__name__, datum_, error_, index))
if callback:
callback(index, node.__class__.__name__, datum_, error_)

if with_history:
return datum_, error_, history
else:
return datum_, error_
return datum_, error_

def train(self, data: List[Dict[str, Any]]):
"""Train the nodes of the data processor.
Expand All @@ -159,7 +117,7 @@ def train(self, data: List[Dict[str, Any]]):
if isinstance(node, TrainableDataAction):
if not node.is_trained:
# Process the data up to the untrained node.
node.train(self._call_internal(data, call_up_to_node=index)[0])
node.train(self.__call__(data, up_to_index=index)[0])

def _data_extraction(self, data: Union[Dict, List[Dict]]) -> List:
"""Extracts the data on which to run the nodes.
Expand Down Expand Up @@ -204,3 +162,63 @@ def __repr__(self):
names = ", ".join(node.__class__.__name__ for node in self._nodes)

return f"{self.__class__.__name__}(input_key={self._input_key}, nodes=[{names}])"


class DataLogger:
"""Data processor logger.

This class implements the :meth:``__call__`` method so that it can be used as a callback.
Once this instance is called with data in the data processor,
this records intermediate data generated by a specific processor node.
That can be accessed via :meth:`data` method after the processor is executed.
"""

def __init__(self, history_nodes: Optional[List[int]] = None):
"""Create new data logger.

Args:
history_nodes: List of node index to record data.
"""
self._history = list()
self._history_nodes = history_nodes

def __call__(self, index: int, name: str, nominals: Any, stdevs: Any):
"""Record data. This is invoked by the data processor.

Args:
index: Position of processing node in the entire processing chain.
name: Name of node.
nominals: Nominal values.
stdevs: Standard errors.
"""
if self._history_nodes is None or index in self._history_nodes:
self._history.append((name, nominals, stdevs, index))

def clear(self):
"""Clear previous data."""
self._history.clear()

def data(
self, index: Optional[Union[str, int]] = None
) -> Union[Tuple[Any, Any], List[Tuple[Any, Any]]]:
"""Get intermediate data.

Args:
index: Index of target data, either node index or node name.
Return all data if not specified.

Returns:
A tuple of (nominal values, standard errors) or list of it.

Raises:
DataProcessorError: When index is not found.
"""
if index is None:
return self._history

colum = 0 if isinstance(index, str) else 3
for data in self._history:
if data[colum] == index:
return data[1], data[2]

raise DataProcessorError(f"Index {index} is not found.")
30 changes: 21 additions & 9 deletions test/data_processing/test_data_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from qiskit.result import Result

from qiskit_experiments.framework import ExperimentData
from qiskit_experiments.data_processing.data_processor import DataProcessor
from qiskit_experiments.data_processing.data_processor import DataProcessor, DataLogger
from qiskit_experiments.data_processing.exceptions import DataProcessorError
from qiskit_experiments.data_processing.nodes import (
AverageData,
Expand Down Expand Up @@ -84,9 +84,13 @@ def test_empty_processor(self):
self.assertEqual(datum, [{"00": 4, "10": 6}])
self.assertIsNone(error)

datum, error, history = data_processor.call_with_history(self.exp_data_lvl2.data(0))
history = DataLogger()
datum, error = data_processor(
data=self.exp_data_lvl2.data(0),
callback=history,
)
self.assertEqual(datum, [{"00": 4, "10": 6}])
self.assertEqual(history, [])
self.assertEqual(history.data(), [])

def test_to_real(self):
"""Test scaling and conversion to real part."""
Expand Down Expand Up @@ -117,13 +121,17 @@ def test_to_real(self):
self.assertIsNone(error)

# Test that we can call with history.
new_data, error, history = processor.call_with_history(exp_data.data(0))
history = DataLogger()
new_data, error = processor(
data=exp_data.data(0),
callback=history,
)

self.assertEqual(exp_data.data(0), expected_old)
self.assertTrue(np.allclose(new_data, expected_new))

self.assertEqual(history[0][0], "ToReal")
self.assertTrue(np.allclose(history[0][1], expected_new))
self.assertEqual(history.data()[0][0], "ToReal")
self.assertTrue(np.allclose(history.data()[0][1], expected_new))

# Test to real on more than one datum
new_data, error = processor(exp_data.data())
Expand Down Expand Up @@ -172,12 +180,16 @@ def test_to_imag(self):
self.assertIsNone(error)

# Test that we can call with history.
new_data, error, history = processor.call_with_history(exp_data.data(0))
history = DataLogger()
new_data, error = processor(
data=exp_data.data(0),
callback=history,
)
self.assertEqual(exp_data.data(0), expected_old)
self.assertTrue(np.allclose(new_data, expected_new))

self.assertEqual(history[0][0], "ToImag")
self.assertTrue(np.allclose(history[0][1], expected_new))
self.assertEqual(history.data()[0][0], "ToImag")
self.assertTrue(np.allclose(history.data()[0][1], expected_new))

# Test to imaginary on more than one datum
new_data, error = processor(exp_data.data())
Expand Down