From 707f21d6cd61df4e2852421791a104ca704c4836 Mon Sep 17 00:00:00 2001 From: knzwnao Date: Sat, 13 Nov 2021 05:49:27 +0900 Subject: [PATCH] add data logger --- .../data_processing/__init__.py | 3 +- .../data_processing/data_processor.py | 148 ++++++++++-------- test/data_processing/test_data_processing.py | 30 ++-- 3 files changed, 106 insertions(+), 75 deletions(-) diff --git a/qiskit_experiments/data_processing/__init__.py b/qiskit_experiments/data_processing/__init__.py index adbbbd716d..d3a81a5249 100644 --- a/qiskit_experiments/data_processing/__init__.py +++ b/qiskit_experiments/data_processing/__init__.py @@ -27,6 +27,7 @@ :toctree: ../stubs/ DataProcessor + DataLogger DataAction TrainableDataAction @@ -54,4 +55,4 @@ BasisExpectationValue, ) -from .data_processor import DataProcessor +from .data_processor import DataProcessor, DataLogger diff --git a/qiskit_experiments/data_processing/data_processor.py b/qiskit_experiments/data_processing/data_processor.py index 4cb45d71ad..6ccf2133ca 100644 --- a/qiskit_experiments/data_processing/data_processor.py +++ b/qiskit_experiments/data_processing/data_processor.py @@ -12,7 +12,7 @@ """Actions done on the data to bring it in a usable form.""" -from typing import Any, Dict, List, Set, Tuple, Union +from typing import Any, Dict, List, Tuple, Union, Callable, Optional from qiskit_experiments.data_processing.data_action import DataAction, TrainableDataAction from qiskit_experiments.data_processing.exceptions import DataProcessorError @@ -69,84 +69,42 @@ def is_trained(self) -> bool: return True - def __call__(self, data: Union[Dict, List[Dict]], **options) -> Tuple[Any, Any]: + def __call__( + self, + data: Union[Dict, List[Dict]], + up_to_index: Optional[int] = None, + callback: Optional[Callable[[int, str, Any, Any], None]] = None, + ) -> Tuple[Any, Any]: """ Call self on the given datum. This method sequentially calls the stored data actions on the datum. Args: data: The data, typically from ExperimentData.data(...), that needs to be processed. - This dict or list of dicts also contains the metadata of each experiment. - options: Run-time options given as keyword arguments that will be passed to the nodes. - - Returns: - processed data: The data processed by the data processor. - """ - return self._call_internal(data, **options) - - def call_with_history( - self, data: Union[Dict, List[Dict]], history_nodes: Set = None - ) -> Tuple[Any, Any, List]: - """ - Call self on the given datum. This method sequentially calls the stored data actions - on the datum and also returns the history of the processed data. - - Args: - data: The data, typically from ExperimentData.data(...), that needs to be processed. - This dict or list of dicts also contains the metadata of each experiment. - history_nodes: The nodes, specified by index in the data processing chain, to - include in the history. If None is given then all nodes will be included - in the history. - - Returns: - processed data: The datum processed by the data processor. - history: The datum processed at each node of the data processor. - """ - return self._call_internal(data, True, history_nodes) - - def _call_internal( - self, - data: Union[Dict, List[Dict]], - with_history: bool = False, - history_nodes: Set = None, - call_up_to_node: int = None, - ) -> Union[Tuple[Any, Any], Tuple[Any, Any, List]]: - """Process the data with or without storing the history of the computation. - - Args: - data: The data, typically from ExperimentData.data(...), that needs to be processed. - This dict or list of dicts also contains the metadata of each experiment. - with_history: if True the history is returned otherwise it is not. - history_nodes: The nodes, specified by index in the data processing chain, to - include in the history. If None is given then all nodes will be included - in the history. - call_up_to_node: The data processor will use each node in the processing chain + This dict or list of dicts also contains the metadata of each experiment. + up_to_index: The data processor will use each node in the processing chain up to the node indexed by call_up_to_node. If this variable is not specified then all nodes in the data processing chain will be called. + callback: Arbitrary python callable that is called after each node execution. + Processor passes (index of node, name of node, nominal values, standard errors) + to the callback. This can be used to log the history of intermediate data. + See :class:`qiskit_experiments.data_processing.data_processor.DataLogandger` + for the preset logger. Returns: - datum_ and history if with_history is True or datum_ if with_history is False. + A tuple of (nominal values, standard errors) processed by the processor. """ - if call_up_to_node is None: - call_up_to_node = len(self._nodes) + if up_to_index is None: + up_to_index = len(self._nodes) datum_, error_ = self._data_extraction(data), None + for index, node in enumerate(self._nodes[:up_to_index]): + datum_, error_ = node(datum_, error_) - history = [] - for index, node in enumerate(self._nodes): - - if index < call_up_to_node: - datum_, error_ = node(datum_, error_) - - if with_history and ( - history_nodes is None or (history_nodes and index in history_nodes) - ): - history.append((node.__class__.__name__, datum_, error_, index)) + if callback: + callback(index, node.__class__.__name__, datum_, error_) - if with_history: - return datum_, error_, history - else: - return datum_, error_ + return datum_, error_ def train(self, data: List[Dict[str, Any]]): """Train the nodes of the data processor. @@ -159,7 +117,7 @@ def train(self, data: List[Dict[str, Any]]): if isinstance(node, TrainableDataAction): if not node.is_trained: # Process the data up to the untrained node. - node.train(self._call_internal(data, call_up_to_node=index)[0]) + node.train(self.__call__(data, up_to_index=index)[0]) def _data_extraction(self, data: Union[Dict, List[Dict]]) -> List: """Extracts the data on which to run the nodes. @@ -204,3 +162,63 @@ def __repr__(self): names = ", ".join(node.__class__.__name__ for node in self._nodes) return f"{self.__class__.__name__}(input_key={self._input_key}, nodes=[{names}])" + + +class DataLogger: + """Data processor logger. + + This class implements the :meth:``__call__`` method so that it can be used as a callback. + Once this instance is called with data in the data processor, + this records intermediate data generated by a specific processor node. + That can be accessed via :meth:`data` method after the processor is executed. + """ + + def __init__(self, history_nodes: Optional[List[int]] = None): + """Create new data logger. + + Args: + history_nodes: List of node index to record data. + """ + self._history = list() + self._history_nodes = history_nodes + + def __call__(self, index: int, name: str, nominals: Any, stdevs: Any): + """Record data. This is invoked by the data processor. + + Args: + index: Position of processing node in the entire processing chain. + name: Name of node. + nominals: Nominal values. + stdevs: Standard errors. + """ + if self._history_nodes is None or index in self._history_nodes: + self._history.append((name, nominals, stdevs, index)) + + def clear(self): + """Clear previous data.""" + self._history.clear() + + def data( + self, index: Optional[Union[str, int]] = None + ) -> Union[Tuple[Any, Any], List[Tuple[Any, Any]]]: + """Get intermediate data. + + Args: + index: Index of target data, either node index or node name. + Return all data if not specified. + + Returns: + A tuple of (nominal values, standard errors) or list of it. + + Raises: + DataProcessorError: When index is not found. + """ + if index is None: + return self._history + + colum = 0 if isinstance(index, str) else 3 + for data in self._history: + if data[colum] == index: + return data[1], data[2] + + raise DataProcessorError(f"Index {index} is not found.") diff --git a/test/data_processing/test_data_processing.py b/test/data_processing/test_data_processing.py index a4771a159a..efd4a9a454 100644 --- a/test/data_processing/test_data_processing.py +++ b/test/data_processing/test_data_processing.py @@ -21,7 +21,7 @@ from qiskit.result import Result from qiskit_experiments.framework import ExperimentData -from qiskit_experiments.data_processing.data_processor import DataProcessor +from qiskit_experiments.data_processing.data_processor import DataProcessor, DataLogger from qiskit_experiments.data_processing.exceptions import DataProcessorError from qiskit_experiments.data_processing.nodes import ( AverageData, @@ -84,9 +84,13 @@ def test_empty_processor(self): self.assertEqual(datum, [{"00": 4, "10": 6}]) self.assertIsNone(error) - datum, error, history = data_processor.call_with_history(self.exp_data_lvl2.data(0)) + history = DataLogger() + datum, error = data_processor( + data=self.exp_data_lvl2.data(0), + callback=history, + ) self.assertEqual(datum, [{"00": 4, "10": 6}]) - self.assertEqual(history, []) + self.assertEqual(history.data(), []) def test_to_real(self): """Test scaling and conversion to real part.""" @@ -117,13 +121,17 @@ def test_to_real(self): self.assertIsNone(error) # Test that we can call with history. - new_data, error, history = processor.call_with_history(exp_data.data(0)) + history = DataLogger() + new_data, error = processor( + data=exp_data.data(0), + callback=history, + ) self.assertEqual(exp_data.data(0), expected_old) self.assertTrue(np.allclose(new_data, expected_new)) - self.assertEqual(history[0][0], "ToReal") - self.assertTrue(np.allclose(history[0][1], expected_new)) + self.assertEqual(history.data()[0][0], "ToReal") + self.assertTrue(np.allclose(history.data()[0][1], expected_new)) # Test to real on more than one datum new_data, error = processor(exp_data.data()) @@ -172,12 +180,16 @@ def test_to_imag(self): self.assertIsNone(error) # Test that we can call with history. - new_data, error, history = processor.call_with_history(exp_data.data(0)) + history = DataLogger() + new_data, error = processor( + data=exp_data.data(0), + callback=history, + ) self.assertEqual(exp_data.data(0), expected_old) self.assertTrue(np.allclose(new_data, expected_new)) - self.assertEqual(history[0][0], "ToImag") - self.assertTrue(np.allclose(history[0][1], expected_new)) + self.assertEqual(history.data()[0][0], "ToImag") + self.assertTrue(np.allclose(history.data()[0][1], expected_new)) # Test to imaginary on more than one datum new_data, error = processor(exp_data.data())