From 149f1eb634f4f33abfb9df725b3b79e4635b4ab0 Mon Sep 17 00:00:00 2001 From: ddddddanni Date: Thu, 15 May 2025 18:37:11 -0700 Subject: [PATCH 1/5] Add run_sweep_with_readout_benchmarking method --- .../cirq/contrib/shuffle_circuits/__init__.py | 1 + ...ffle_circuits_with_readout_benchmarking.py | 214 ++++++++++-- ...circuits_with_readout_benchmarking_test.py | 320 +++++++++++++----- 3 files changed, 425 insertions(+), 110 deletions(-) diff --git a/cirq-core/cirq/contrib/shuffle_circuits/__init__.py b/cirq-core/cirq/contrib/shuffle_circuits/__init__.py index 7f97f8834a4..ee9b9777d67 100644 --- a/cirq-core/cirq/contrib/shuffle_circuits/__init__.py +++ b/cirq-core/cirq/contrib/shuffle_circuits/__init__.py @@ -15,4 +15,5 @@ from cirq.contrib.shuffle_circuits.shuffle_circuits_with_readout_benchmarking import ( run_shuffled_with_readout_benchmarking as run_shuffled_with_readout_benchmarking, + run_sweep_with_readout_benchmarking as run_sweep_with_readout_benchmarking, ) diff --git a/cirq-core/cirq/contrib/shuffle_circuits/shuffle_circuits_with_readout_benchmarking.py b/cirq-core/cirq/contrib/shuffle_circuits/shuffle_circuits_with_readout_benchmarking.py index 58b50feb7f5..f8dac4ad4bc 100644 --- a/cirq-core/cirq/contrib/shuffle_circuits/shuffle_circuits_with_readout_benchmarking.py +++ b/cirq-core/cirq/contrib/shuffle_circuits/shuffle_circuits_with_readout_benchmarking.py @@ -16,8 +16,9 @@ from typing import Dict, List, Optional, Tuple, Union import numpy as np +import sympy -from cirq import circuits, ops, protocols, work +from cirq import circuits, ops, protocols, work, study from cirq.experiments import SingleQubitReadoutCalibrationResult from cirq.study import ResultDict @@ -59,6 +60,24 @@ def _validate_input( raise ValueError("Must provide non-zero readout_repetitions for readout calibration.") +def _validate_input_with_sweep( + input_circuits: list[circuits.Circuit], + sweep_params: list[study.Sweepable], + circuit_repetitions: Union[int, list[int]], + rng_or_seed: Union[np.random.Generator, int], + num_random_bitstrings: int, + readout_repetitions: int, +): + """Validates the input for the run_sweep_with_readout_benchmarking function.""" + # if len(input_circuits) != len(sweep_params): + # raise ValueError( + # "Number of input parameterized circuits must match the number of sweep parameters." + # ) + return _validate_input( + input_circuits, circuit_repetitions, rng_or_seed, num_random_bitstrings, readout_repetitions + ) + + def _generate_readout_calibration_circuits( qubits: list[ops.Qid], rng: np.random.Generator, num_random_bitstrings: int ) -> tuple[list[circuits.Circuit], np.ndarray]: @@ -78,6 +97,82 @@ def _generate_readout_calibration_circuits( return readout_calibration_circuits, random_bitstrings +def _generate_parameterized_readout_calibration_circuit_with_sweep( + qubits: list[ops.Qid], rng: np.random.Generator, num_random_bitstrings: int +) -> tuple[circuits.Circuit, study.Sweepable, np.ndarray]: + """Generates a parameterized readout calibration circuit, sweep parameters, + and the random bitstrings.""" + random_bitstrings = rng.integers(0, 2, size=(num_random_bitstrings, len(qubits))) + + exp_symbols = [sympy.Symbol(f'exp_{qubit}') for qubit in qubits] + parameterized_readout_calibration_circuit = circuits.Circuit( + [ops.X(qubit) ** exp for exp, qubit in zip(exp_symbols, qubits)], ops.M(*qubits, key="m") + ) + sweep_params = [] + for bitstr in random_bitstrings: + sweep_params.append({exp: bit for exp, bit in zip(exp_symbols, bitstr)}) + + return parameterized_readout_calibration_circuit, sweep_params, random_bitstrings + + +def _generate_all_readout_calibration_circuits( + rng: np.random.Generator, + num_random_bitstrings: int, + qubits_to_measure: List[List[ops.Qid]], + is_sweep: bool, +) -> Tuple[List[circuits.Circuit], List[List[List[int]]], List[List[study.Sweepable]]]: + """Generates all readout calibration circuits and random bitstrings.""" + all_readout_calibration_circuits = [] + all_random_bitstrings = [] + all_readout_sweep_params = [] + + if num_random_bitstrings <= 0: + return all_readout_calibration_circuits, all_random_bitstrings, all_readout_sweep_params + + if not is_sweep: + for qubit_group in qubits_to_measure: + readout_calibration_circuits, random_bitstrings = ( + _generate_readout_calibration_circuits(qubit_group, rng, num_random_bitstrings) + ) + all_readout_calibration_circuits.extend(readout_calibration_circuits) + all_random_bitstrings.append(random_bitstrings) + else: + for qubit_group in qubits_to_measure: + (parameterized_readout_calibration_circuit, readout_sweep_params, random_bitstrings) = ( + _generate_parameterized_readout_calibration_circuit_with_sweep( + qubit_group, rng, num_random_bitstrings + ) + ) + all_readout_calibration_circuits.append(parameterized_readout_calibration_circuit) + all_readout_sweep_params.append([readout_sweep_params]) + all_random_bitstrings.append(random_bitstrings) + + return all_readout_calibration_circuits, all_random_bitstrings, all_readout_sweep_params + + +def _determine_qubits_to_measure( + input_circuits: list[circuits.Circuit], + qubits: Optional[Union[List[ops.Qid], List[List[ops.Qid]]]], +) -> List[List[ops.Qid]]: + """Determine the qubits to measure based on the input circuits and provided qubits.""" + # If input qubits is None, extract qubits from input circuits + qubits_to_measure: List[List[ops.Qid]] = [] + if qubits is None: + qubits_set: set[ops.Qid] = set() + for circuit in input_circuits: + qubits_set.update(circuit.all_qubits()) + qubits_to_measure = [sorted(qubits_set)] + + elif isinstance(qubits[0], ops.Qid): + qubits_to_measure = [qubits] # type: ignore + else: + qubits_to_measure = qubits # type: ignore + return qubits_to_measure + + +# def _get_rng(rng_or_seed: Union[np.random.Generator, int]) -> np.random.Generator: + + def _shuffle_circuits( all_circuits: list[circuits.Circuit], all_repetitions: list[int], rng: np.random.Generator ) -> tuple[list[circuits.Circuit], list[int], np.ndarray]: @@ -185,35 +280,21 @@ def run_shuffled_with_readout_benchmarking( input_circuits, circuit_repetitions, rng_or_seed, num_random_bitstrings, readout_repetitions ) - # If input qubits is None, extract qubits from input circuits - qubits_to_measure: List[List[ops.Qid]] = [] - if qubits is None: - qubits_set: set[ops.Qid] = set() - for circuit in input_circuits: - qubits_set.update(circuit.all_qubits()) - qubits_to_measure = [sorted(qubits_set)] - elif isinstance(qubits[0], ops.Qid): - qubits_to_measure = [qubits] # type: ignore - else: - qubits_to_measure = qubits # type: ignore + qubits_to_measure = _determine_qubits_to_measure(input_circuits, qubits) # Generate the readout calibration circuits if num_random_bitstrings>0 # Else all_readout_calibration_circuits and all_random_bitstrings are empty - all_readout_calibration_circuits = [] - all_random_bitstrings = [] - rng = ( rng_or_seed if isinstance(rng_or_seed, np.random.Generator) else np.random.default_rng(rng_or_seed) ) - if num_random_bitstrings > 0: - for qubit_group in qubits_to_measure: - readout_calibration_circuits, random_bitstrings = ( - _generate_readout_calibration_circuits(qubit_group, rng, num_random_bitstrings) - ) - all_readout_calibration_circuits.extend(readout_calibration_circuits) - all_random_bitstrings.append(random_bitstrings) + + all_readout_calibration_circuits, all_random_bitstrings, _ = ( + _generate_all_readout_calibration_circuits( + rng, num_random_bitstrings, qubits_to_measure, False + ) + ) # Shuffle the circuits if isinstance(circuit_repetitions, int): @@ -248,3 +329,92 @@ def run_shuffled_with_readout_benchmarking( start_idx = end_idx return unshuffled_input_circuits_measiurements, readout_calibration_results + + +def run_sweep_with_readout_benchmarking( + input_circuits: list[circuits.Circuit], + sweep_params: list[study.Sweepable], + sampler: work.Sampler, + circuit_repetitions: Union[int, list[int]], + rng_or_seed: Union[np.random.Generator, int], + num_random_bitstrings: int = 100, + readout_repetitions: int = 1000, + qubits: Optional[Union[List[ops.Qid], List[List[ops.Qid]]]] = None, +) -> tuple[list[list[ResultDict]], Dict[Tuple[ops.Qid, ...], SingleQubitReadoutCalibrationResult]]: + """Run the sweep circuits with readout error benchmarking (no shuffling). + Args: + input_circuits: The circuits to run. + sweep_params: The sweep parameters for the input circuits. + sampler: The sampler to use. + circuit_repetitions: The repetitions for `circuits`. + rng_or_seed: A random number generator used to generate readout circuits. + Or an integer seed. + num_random_bitstrings: The number of random bitstrings for measuring readout. + If set to 0, no readout calibration circuits are generated. + readout_repetitions: The number of repetitions for each readout bitstring. + qubits: The qubits to benchmark readout errors. If None, all qubits in the + input_circuits are used. Can be a list of qubits or a list of tuples + of qubits. + Returns: + A tuple containing: + - A list of lists of dictionaries with the measurement results. + - A dictionary mapping each tuple of qubits to a SingleQubitReadoutCalibrationResult. + """ + + _validate_input_with_sweep( + input_circuits, + sweep_params, + circuit_repetitions, + rng_or_seed, + num_random_bitstrings, + readout_repetitions, + ) + + qubits_to_measure = _determine_qubits_to_measure(input_circuits, qubits) + + # Generate the readout calibration circuits (parameterized circuits) and sweep params + # if num_random_bitstrings>0 + # Else all_readout_calibration_circuits and all_random_bitstrings are empty + rng = ( + rng_or_seed + if isinstance(rng_or_seed, np.random.Generator) + else np.random.default_rng(rng_or_seed) + ) + + all_readout_calibration_circuits, all_random_bitstrings, all_readout_sweep_params = ( + _generate_all_readout_calibration_circuits( + rng, num_random_bitstrings, qubits_to_measure, True + ) + ) + + if isinstance(circuit_repetitions, int): + circuit_repetitions = [circuit_repetitions] * len(input_circuits) + all_repetitions = circuit_repetitions + [readout_repetitions] * len( + all_readout_calibration_circuits + ) + + # Run the sweep circuits and measure + results = sampler.run_batch( + input_circuits + all_readout_calibration_circuits, + sweep_params + all_readout_sweep_params, + repetitions=all_repetitions, + ) + + timestamp = time.time() + + input_circuits_measiurements = results[: len(input_circuits)] + readout_measurements = results[len(input_circuits) :] + + # Analyze results + readout_calibration_results = {} + i = 0 + for qubit_group, random_bitstrings in zip(qubits_to_measure, all_random_bitstrings): + group_measurements = readout_measurements[i] + i += 1 + + calibration_result = _analyze_readout_results( + group_measurements, random_bitstrings, readout_repetitions, qubit_group, timestamp + ) + readout_calibration_results[tuple(qubit_group)] = calibration_result + + return input_circuits_measiurements, readout_calibration_results diff --git a/cirq-core/cirq/contrib/shuffle_circuits/shuffle_circuits_with_readout_benchmarking_test.py b/cirq-core/cirq/contrib/shuffle_circuits/shuffle_circuits_with_readout_benchmarking_test.py index e73a77a2ccc..77b269019e5 100644 --- a/cirq-core/cirq/contrib/shuffle_circuits/shuffle_circuits_with_readout_benchmarking_test.py +++ b/cirq-core/cirq/contrib/shuffle_circuits/shuffle_circuits_with_readout_benchmarking_test.py @@ -15,6 +15,7 @@ import numpy as np import pytest +import sympy import cirq from cirq.experiments import ( @@ -47,32 +48,85 @@ def _create_test_circuits(qubits: list[cirq.Qid], n_circuits: int) -> list[cirq. return input_circuits -def test_shuffled_circuits_with_readout_benchmarking_errors_no_noise(): - """Test shuffled circuits with readout benchmarking with no noise from sampler.""" - qubits = cirq.LineQubit.range(5) +def _create_test_circuits_with_sweep(qubits: list[cirq.Qid], n_circuits: int) -> list[cirq.Circuit]: + """Helper function to generate sweep circuits for testing.""" + if len(qubits) < 2: + raise ValueError( + "Need at least two qubits to generate two-qubit circuits." + ) # pragma: no cover + theta_symbol = sympy.Symbol('theta') + phi_symbol = sympy.Symbol('phi') + + two_qubit_gates = [cirq.ISWAP, cirq.CNOT] - # Generate random input circuits - input_circuits = _create_test_circuits(qubits, 3) + input_circuits = [] + sweep_params = [] + qubit_pairs = list(itertools.combinations(qubits, 2)) + num_pairs = len(qubit_pairs) + for i in range(n_circuits): + gate = two_qubit_gates[i % len(two_qubit_gates)] + q0, q1 = qubit_pairs[i % num_pairs] + circuits = rqcg.generate_library_of_2q_circuits( + n_library_circuits=5, two_qubit_gate=gate, q0=q0, q1=q1 + ) + for circuit in circuits: + circuit += cirq.Circuit(cirq.X(q0) ** theta_symbol, cirq.Y(q1) ** phi_symbol) + circuit.append(cirq.measure(*qubits, key="m")) + sweep_params.append({'theta': 0, 'phi': 1}) + input_circuits.extend(circuits) + print(len(input_circuits)) + print(len(sweep_params)) + return input_circuits, sweep_params + + +@pytest.mark.parametrize("mode", ["shuffled", "sweep"]) +def test_circuits_with_readout_benchmarking_errors_no_noise(mode: str): + """Test shuffled/sweep circuits with readout benchmarking with no noise from sampler.""" + qubits = cirq.LineQubit.range(5) sampler = cirq.Simulator() circuit_repetitions = 1 # allow passing a seed rng = 123 readout_repetitions = 1000 + num_random_bitstrings = 100 + + readout_calibration_results = {} + + if mode == "shuffled": + input_circuits = _create_test_circuits(qubits, 3) + + measurements, readout_calibration_results = ( + cirq.contrib.shuffle_circuits.run_shuffled_with_readout_benchmarking( + input_circuits, + sampler, + circuit_repetitions, + rng, + num_random_bitstrings=num_random_bitstrings, + readout_repetitions=readout_repetitions, + ) + ) - measurements, readout_calibration_results = ( - cirq.contrib.shuffle_circuits.run_shuffled_with_readout_benchmarking( - input_circuits, - sampler, - circuit_repetitions, - rng, - num_random_bitstrings=100, - readout_repetitions=readout_repetitions, + for measurement in measurements: + assert isinstance(measurement, ResultDict) + elif mode == "sweep": + input_circuits, sweep_params = _create_test_circuits_with_sweep(qubits, 3) + + measurements, readout_calibration_results = ( + cirq.contrib.shuffle_circuits.run_sweep_with_readout_benchmarking( + input_circuits, + sweep_params, + sampler, + circuit_repetitions, + rng, + num_random_bitstrings=num_random_bitstrings, + readout_repetitions=readout_repetitions, + ) ) - ) - for measurement in measurements: - assert isinstance(measurement, ResultDict) + for measurement in measurements: + for single_sweep_measurement in measurement: + assert isinstance(single_sweep_measurement, ResultDict) for qlist, readout_calibration_result in readout_calibration_results.items(): assert isinstance(qlist, tuple) @@ -85,31 +139,52 @@ def test_shuffled_circuits_with_readout_benchmarking_errors_no_noise(): assert isinstance(readout_calibration_result.timestamp, float) -def test_shuffled_circuits_with_readout_benchmarking_errors_with_noise(): - """Test shuffled circuits with readout benchmarking with noise from sampler.""" +@pytest.mark.parametrize("mode", ["shuffled", "sweep"]) +def test_circuits_with_readout_benchmarking_errors_with_noise(mode: str): + """Test shuffled/sweep circuits with readout benchmarking with noise from sampler.""" qubits = cirq.LineQubit.range(6) - - # Generate random input circuits - input_circuits = _create_test_circuits(qubits, 6) - sampler = NoisySingleQubitReadoutSampler(p0=0.1, p1=0.2, seed=1234) circuit_repetitions = 1 rng = np.random.default_rng() readout_repetitions = 1000 + num_random_bitstrings = 100 + + readout_calibration_results = {} + + if mode == "shuffled": + input_circuits = _create_test_circuits(qubits, 6) + + measurements, readout_calibration_results = ( + cirq.contrib.shuffle_circuits.run_shuffled_with_readout_benchmarking( + input_circuits, + sampler, + circuit_repetitions, + rng, + num_random_bitstrings=num_random_bitstrings, + readout_repetitions=readout_repetitions, + ) + ) - measurements, readout_calibration_results = ( - cirq.contrib.shuffle_circuits.run_shuffled_with_readout_benchmarking( - input_circuits, - sampler, - circuit_repetitions, - rng, - num_random_bitstrings=100, - readout_repetitions=readout_repetitions, + for measurement in measurements: + assert isinstance(measurement, ResultDict) + elif mode == "sweep": + input_circuits, sweep_params = _create_test_circuits_with_sweep(qubits, 6) + + measurements, readout_calibration_results = ( + cirq.contrib.shuffle_circuits.run_sweep_with_readout_benchmarking( + input_circuits, + sweep_params, + sampler, + circuit_repetitions, + rng, + num_random_bitstrings=num_random_bitstrings, + readout_repetitions=readout_repetitions, + ) ) - ) - for measurement in measurements: - assert isinstance(measurement, ResultDict) + for measurement in measurements: + for single_sweep_measurement in measurement: + assert isinstance(single_sweep_measurement, ResultDict) for qlist, readout_calibration_result in readout_calibration_results.items(): assert isinstance(qlist, tuple) @@ -124,33 +199,54 @@ def test_shuffled_circuits_with_readout_benchmarking_errors_with_noise(): assert isinstance(readout_calibration_result.timestamp, float) -def test_shuffled_circuits_with_readout_benchmarking_errors_with_noise_and_input_qubits(): - """Test shuffled circuits with readout benchmarking with noise from sampler and input qubits.""" +@pytest.mark.parametrize("mode", ["shuffled", "sweep"]) +def test_circuits_with_readout_benchmarking_errors_with_noise_and_input_qubits(mode: str): + """Test shuffled/sweep circuits with readout benchmarking with noise from sampler and input qubits.""" qubits = cirq.LineQubit.range(6) readout_qubits = qubits[:4] - # Generate random input circuits - input_circuits = _create_test_circuits(qubits, 6) - sampler = NoisySingleQubitReadoutSampler(p0=0.1, p1=0.3, seed=1234) circuit_repetitions = 1 rng = np.random.default_rng() readout_repetitions = 1000 - - measurements, readout_calibration_results = ( - cirq.contrib.shuffle_circuits.run_shuffled_with_readout_benchmarking( - input_circuits, - sampler, - circuit_repetitions, - rng, - num_random_bitstrings=100, - readout_repetitions=readout_repetitions, - qubits=readout_qubits, + num_random_bitstrings = 100 + + readout_calibration_results = {} + + if mode == "shuffled": + input_circuits = _create_test_circuits(qubits, 6) + + measurements, readout_calibration_results = ( + cirq.contrib.shuffle_circuits.run_shuffled_with_readout_benchmarking( + input_circuits, + sampler, + circuit_repetitions, + rng, + num_random_bitstrings=num_random_bitstrings, + readout_repetitions=readout_repetitions, + qubits=readout_qubits, + ) ) - ) - - for measurement in measurements: - assert isinstance(measurement, ResultDict) + for measurement in measurements: + assert isinstance(measurement, ResultDict) + + elif mode == "sweep": + input_circuits, sweep_params = _create_test_circuits_with_sweep(qubits, 6) + measurements, readout_calibration_results = ( + cirq.contrib.shuffle_circuits.run_sweep_with_readout_benchmarking( + input_circuits, + sweep_params, + sampler, + circuit_repetitions, + rng, + num_random_bitstrings=num_random_bitstrings, + readout_repetitions=readout_repetitions, + qubits=readout_qubits, + ) + ) + for measurement in measurements: + for single_sweep_measurement in measurement: + assert isinstance(single_sweep_measurement, ResultDict) for qlist, readout_calibration_result in readout_calibration_results.items(): assert isinstance(qlist, tuple) @@ -165,35 +261,58 @@ def test_shuffled_circuits_with_readout_benchmarking_errors_with_noise_and_input assert isinstance(readout_calibration_result.timestamp, float) -def test_shuffled_circuits_with_readout_benchmarking_errors_with_noise_and_lists_input_qubits(): - """Test shuffled circuits with readout benchmarking with noise from sampler and input qubits.""" +@pytest.mark.parametrize("mode", ["shuffled", "sweep"]) +def test_circuits_with_readout_benchmarking_errors_with_noise_and_lists_input_qubits(mode: str): + """Test shuffled/sweep circuits with readout benchmarking with noise from sampler and input qubits.""" qubits_1 = cirq.LineQubit.range(3) qubits_2 = cirq.LineQubit.range(4) - readout_qubits = [qubits_1, qubits_2] - # Generate random input circuits and append measurements - input_circuits = _create_test_circuits(qubits_1, 6) + _create_test_circuits(qubits_2, 4) - sampler = NoisySingleQubitReadoutSampler(p0=0.1, p1=0.3, seed=1234) circuit_repetitions = 1 rng = np.random.default_rng() readout_repetitions = 1000 - measurements, readout_calibration_results = ( - cirq.contrib.shuffle_circuits.run_shuffled_with_readout_benchmarking( - input_circuits, - sampler, - circuit_repetitions, - rng, - num_random_bitstrings=100, - readout_repetitions=readout_repetitions, - qubits=readout_qubits, + if mode == "shuffled": + input_circuits = _create_test_circuits(qubits_1, 6) + _create_test_circuits(qubits_2, 4) + + measurements, readout_calibration_results = ( + cirq.contrib.shuffle_circuits.run_shuffled_with_readout_benchmarking( + input_circuits, + sampler, + circuit_repetitions, + rng, + num_random_bitstrings=100, + readout_repetitions=readout_repetitions, + qubits=readout_qubits, + ) + ) + + for measurement in measurements: + assert isinstance(measurement, ResultDict) + + elif mode == "sweep": + input_circuits, sweep_params = _create_test_circuits_with_sweep(qubits_1, 6) + additional_circuits, additional_sweep_params = _create_test_circuits_with_sweep(qubits_2, 4) + input_circuits += additional_circuits + sweep_params += additional_sweep_params + + measurements, readout_calibration_results = ( + cirq.contrib.shuffle_circuits.run_sweep_with_readout_benchmarking( + input_circuits, + sweep_params, + sampler, + circuit_repetitions, + rng, + num_random_bitstrings=100, + readout_repetitions=readout_repetitions, + qubits=readout_qubits, + ) ) - ) - for measurement in measurements: - assert isinstance(measurement, ResultDict) + for measurement in measurements: + for single_sweep_measurement in measurement: + assert isinstance(single_sweep_measurement, ResultDict) for qlist, readout_calibration_result in readout_calibration_results.items(): assert isinstance(qlist, tuple) @@ -208,35 +327,60 @@ def test_shuffled_circuits_with_readout_benchmarking_errors_with_noise_and_lists assert isinstance(readout_calibration_result.timestamp, float) -def test_can_handle_zero_random_bitstring(): - """Test shuffled circuits without readout benchmarking.""" +@pytest.mark.parametrize("mode", ["shuffled", "sweep"]) +def test_can_handle_zero_random_bitstring(mode: str): + """Test shuffled/sweep circuits without readout benchmarking.""" qubits_1 = cirq.LineQubit.range(3) qubits_2 = cirq.LineQubit.range(4) - readout_qubits = [qubits_1, qubits_2] - # Generate random input circuits and append measurements - input_circuits = _create_test_circuits(qubits_1, 6) + _create_test_circuits(qubits_2, 4) - sampler = NoisySingleQubitReadoutSampler(p0=0.1, p1=0.3, seed=1234) circuit_repetitions = 1 rng = np.random.default_rng() readout_repetitions = 1000 + readout_calibration_results = {} + + if mode == "shuffled": + input_circuits = _create_test_circuits(qubits_1, 6) + _create_test_circuits(qubits_2, 4) + + measurements, readout_calibration_results = ( + cirq.contrib.shuffle_circuits.run_shuffled_with_readout_benchmarking( + input_circuits, + sampler, + circuit_repetitions, + rng, + num_random_bitstrings=0, + readout_repetitions=readout_repetitions, + qubits=readout_qubits, + ) + ) - measurements, readout_calibration_results = ( - cirq.contrib.shuffle_circuits.run_shuffled_with_readout_benchmarking( - input_circuits, - sampler, - circuit_repetitions, - rng, - num_random_bitstrings=0, - readout_repetitions=readout_repetitions, - qubits=readout_qubits, + for measurement in measurements: + assert isinstance(measurement, ResultDict) + + elif mode == "sweep": + input_circuits, sweep_params = _create_test_circuits_with_sweep(qubits_1, 6) + additional_circuits, additional_sweep_params = _create_test_circuits_with_sweep(qubits_2, 4) + input_circuits += additional_circuits + sweep_params += additional_sweep_params + + measurements, readout_calibration_results = ( + cirq.contrib.shuffle_circuits.run_sweep_with_readout_benchmarking( + input_circuits, + sweep_params, + sampler, + circuit_repetitions, + rng, + num_random_bitstrings=0, + readout_repetitions=readout_repetitions, + qubits=readout_qubits, + ) ) - ) - for measurement in measurements: - assert isinstance(measurement, ResultDict) + for measurement in measurements: + for single_sweep_measurement in measurement: + assert isinstance(single_sweep_measurement, ResultDict) + # Check that the readout_calibration_results is empty assert len(readout_calibration_results.items()) == 0 From 8a535ef6b4ebd2cbdcd64f395238dc6453f5ee58 Mon Sep 17 00:00:00 2001 From: ddddddanni Date: Mon, 19 May 2025 17:02:03 -0700 Subject: [PATCH 2/5] Allow the measure_pauli_strings function to use sweep --- ...ing_measurement_with_readout_mitigation.py | 148 +++++++++++++---- ...easurement_with_readout_mitigation_test.py | 151 +++++++++++------- 2 files changed, 211 insertions(+), 88 deletions(-) diff --git a/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation.py b/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation.py index c8232420b71..34d0217906e 100644 --- a/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation.py +++ b/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation.py @@ -18,9 +18,13 @@ import attrs import numpy as np +import sympy -from cirq import circuits, ops, work -from cirq.contrib.shuffle_circuits import run_shuffled_with_readout_benchmarking +from cirq import circuits, ops, work, study +from cirq.contrib.shuffle_circuits import ( + run_shuffled_with_readout_benchmarking, + run_sweep_with_readout_benchmarking, +) from cirq.experiments import SingleQubitReadoutCalibrationResult from cirq.experiments.readout_confusion_matrix import TensoredConfusionMatrices from cirq.study import ResultDict @@ -227,6 +231,74 @@ def _pauli_strings_to_basis_change_ops( return operations +def _pauli_strings_to_basis_change_with_sweep( + pauli_strings: list[ops.PauliString], qid_list: list[ops.Qid] +) -> list[Dict[str, float]]: + params_dict = {} + + for qid, qubit in enumerate(qid_list): + params_dict[f"phi{qid}"] = 1 + params_dict[f"theta{qid}"] = 0 + for pauli_str in pauli_strings: + pauli_op = pauli_str.get(qubit, default=ops.I) + if pauli_op == ops.X: + params_dict[f"phi{qid}"] = 0 + params_dict[f"theta{qid}"] = 1 / 2 + break + elif pauli_op == ops.Y: + params_dict[f"phi{qid}"] = 1 + params_dict[f"theta{qid}"] = 1 / 2 + break + return params_dict + + +def _generate_basis_change_circuits( + normalized_circuits_to_pauli: Dict[circuits.FrozenCircuit, list[list[ops.PauliString]]], +): + pauli_measurement_circuits = list[circuits.Circuit]() + + for input_circuit, pauli_string_groups in normalized_circuits_to_pauli.items(): + qid_list = list(sorted(input_circuit.all_qubits())) + basis_change_circuits = [] + input_circuit_unfrozen = input_circuit.unfreeze() + for pauli_strings in pauli_string_groups: + basis_change_circuit = ( + input_circuit_unfrozen + + _pauli_strings_to_basis_change_ops(pauli_strings, qid_list) + + ops.measure(*qid_list, key="m") + ) + basis_change_circuits.append(basis_change_circuit) + pauli_measurement_circuits.extend(basis_change_circuits) + + return pauli_measurement_circuits + + +def _generate_basis_change_circuits_with_sweep( + normalized_circuits_to_pauli: Dict[circuits.FrozenCircuit, list[list[ops.PauliString]]], +) -> Tuple[list[circuits.Circuit], list[study.Sweepable]]: + parameterized_circuits = list[circuits.Circuit]() + sweep_params = list[study.Sweepable]() + for input_circuit, pauli_string_groups in normalized_circuits_to_pauli.items(): + qid_list = list(sorted(input_circuit.all_qubits())) + phi_symbols = sympy.symbols(f"phi:{len(qid_list)}") + theta_symbols = sympy.symbols(f"theta:{len(qid_list)}") + + parameterized_circuit = input_circuit.unfreeze() + circuits.Circuit( + [ + ops.PhasedXPowGate(phase_exponent=(a - 1) / 2, exponent=b)(qubit) + for a, b, qubit in zip(phi_symbols, theta_symbols, qid_list) + ], + ops.M(*qid_list, key="m"), + ) + sweep_param = [] + for pauli_strings in pauli_string_groups: + sweep_param.append(_pauli_strings_to_basis_change_with_sweep(pauli_strings, qid_list)) + sweep_params.append(sweep_param) + parameterized_circuits.append(parameterized_circuit) + + return parameterized_circuits, sweep_params + + def _build_one_qubit_confusion_matrix(e0: float, e1: float) -> np.ndarray: """Builds a 2x2 confusion matrix for a single qubit. @@ -371,6 +443,7 @@ def measure_pauli_strings( readout_repetitions: int, num_random_bitstrings: int, rng_or_seed: Union[np.random.Generator, int], + use_sweep: bool = False, ) -> List[CircuitToPauliStringsMeasurementResult]: """Measures expectation values of Pauli strings on given circuits with/without readout error mitigation. @@ -400,6 +473,8 @@ def measure_pauli_strings( num_random_bitstrings: The number of random bitstrings to use in readout benchmarking. rng_or_seed: A random number generator or seed for the readout benchmarking. + use_sweep: If True, the function will use parameterized circuits and sweep + parameters. Returns: A list of CircuitToPauliStringsMeasurementResult objects, where each object contains: @@ -429,45 +504,62 @@ def measure_pauli_strings( # Build the basis-change circuits for each Pauli string group pauli_measurement_circuits = list[circuits.Circuit]() - for input_circuit, pauli_string_groups in normalized_circuits_to_pauli.items(): - qid_list = list(sorted(input_circuit.all_qubits())) - basis_change_circuits = [] - input_circuit_unfrozen = input_circuit.unfreeze() - for pauli_strings in pauli_string_groups: - basis_change_circuit = ( - input_circuit_unfrozen - + _pauli_strings_to_basis_change_ops(pauli_strings, qid_list) - + ops.measure(*qid_list, key="m") - ) - basis_change_circuits.append(basis_change_circuit) - pauli_measurement_circuits.extend(basis_change_circuits) + sweep_params = list[study.Sweepable]() + circuits_results = list[list[ResultDict]]() + calibration_results = list[Dict[Tuple[ops.Qid, ...], SingleQubitReadoutCalibrationResult]]() - # Run shuffled benchmarking for readout calibration - circuits_results, calibration_results = run_shuffled_with_readout_benchmarking( - input_circuits=pauli_measurement_circuits, - sampler=sampler, - circuit_repetitions=pauli_repetitions, - rng_or_seed=rng_or_seed, - qubits=[list(qubits) for qubits in qubits_list], - num_random_bitstrings=num_random_bitstrings, - readout_repetitions=readout_repetitions, - ) + if use_sweep: + pauli_measurement_circuits, sweep_params = _generate_basis_change_circuits_with_sweep( + normalized_circuits_to_pauli + ) + + # Run benchmarking using sweep for readout calibration + circuits_results, calibration_results = run_sweep_with_readout_benchmarking( + input_circuits=pauli_measurement_circuits, + sweep_params=sweep_params, + sampler=sampler, + circuit_repetitions=pauli_repetitions, + rng_or_seed=rng_or_seed, + qubits=[list(qubits) for qubits in qubits_list], + num_random_bitstrings=num_random_bitstrings, + readout_repetitions=readout_repetitions, + ) + + else: + pauli_measurement_circuits = _generate_basis_change_circuits(normalized_circuits_to_pauli) + + # Run shuffled benchmarking for readout calibration + circuits_results, calibration_results = run_shuffled_with_readout_benchmarking( + input_circuits=pauli_measurement_circuits, + sampler=sampler, + circuit_repetitions=pauli_repetitions, + rng_or_seed=rng_or_seed, + qubits=[list(qubits) for qubits in qubits_list], + num_random_bitstrings=num_random_bitstrings, + readout_repetitions=readout_repetitions, + ) # Process the results to calculate expectation values results: List[CircuitToPauliStringsMeasurementResult] = [] circuit_result_index = 0 - for input_circuit, pauli_string_groups in normalized_circuits_to_pauli.items(): + for i, (input_circuit, pauli_string_groups) in enumerate(normalized_circuits_to_pauli.items()): qubits_in_circuit = tuple(sorted(input_circuit.all_qubits())) disable_readout_mitigation = False if num_random_bitstrings != 0 else True + circuits_results_for_group = [] + if use_sweep: + circuits_results_for_group = circuits_results[i] + else: + circuits_results_for_group = circuits_results[ + circuit_result_index : circuit_result_index + len(pauli_string_groups) + ] + pauli_measurement_results = _process_pauli_measurement_results( list(qubits_in_circuit), pauli_string_groups, - circuits_results[ - circuit_result_index : circuit_result_index + len(pauli_string_groups) - ], + circuits_results_for_group, calibration_results, pauli_repetitions, time.time(), diff --git a/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation_test.py b/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation_test.py index d0e7cdf4b51..c683ad21452 100644 --- a/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation_test.py +++ b/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation_test.py @@ -18,6 +18,7 @@ import numpy as np import pytest +import sympy import cirq from cirq.contrib.paulistring import measure_pauli_strings @@ -33,6 +34,26 @@ def _create_ghz(number_of_qubits: int, qubits: Sequence[cirq.Qid]) -> cirq.Circu return ghz_circuit +def _create_sweep_circuit( + number_of_qubits: int, qubits: Sequence[cirq.Qid], number_of_sweeps: int +) -> tuple[cirq.Circuit, cirq.Sweepable]: + sweep_params = [] + + theta_symbols = sympy.symbols(f'theta:{number_of_qubits}') + single_qubit_gates = [cirq.X, cirq.Y, cirq.Z, cirq.H, cirq.I] + + circuit = _create_ghz(number_of_qubits, qubits) + for _ in range(number_of_sweeps): + for i in range(number_of_qubits): + gate = random.choice(single_qubit_gates) + circuit += gate(qubits[i]) ** theta_symbols[i] + sweep_params += [ + {theta_symbols[i]: random.uniform(0, 2 * np.pi) for i in range(number_of_qubits)} + ] + + return circuit, sweep_params + + def _generate_random_pauli_string( qubits: Sequence[cirq.Qid], enable_coeff: bool = False, allow_pauli_i: bool = True ): @@ -102,55 +123,57 @@ def _ideal_expectation_based_on_pauli_string( ) -def test_pauli_string_measurement_errors_no_noise() -> None: - """Test that the mitigated expectation is close to the ideal expectation - based on the Pauli string""" - - qubits = cirq.LineQubit.range(5) - circuit = cirq.FrozenCircuit(_create_ghz(5, qubits)) - sampler = cirq.Simulator() - - circuits_to_pauli: Dict[cirq.FrozenCircuit, list[cirq.PauliString]] = {} - circuits_to_pauli[circuit] = [_generate_random_pauli_string(qubits) for _ in range(3)] - - circuits_with_pauli_expectations = measure_pauli_strings( - circuits_to_pauli, sampler, 1000, 1000, 1000, 1000 - ) - - for circuit_with_pauli_expectations in circuits_with_pauli_expectations: - assert isinstance(circuit_with_pauli_expectations.circuit, cirq.FrozenCircuit) - - expected_val_simulation = sampler.simulate( - circuit_with_pauli_expectations.circuit.unfreeze() - ) - final_state_vector = expected_val_simulation.final_state_vector - - for pauli_string_measurement_results in circuit_with_pauli_expectations.results: - # Since there is no noise, the mitigated and unmitigated expectations should be the same - assert np.isclose( - pauli_string_measurement_results.mitigated_expectation, - pauli_string_measurement_results.unmitigated_expectation, - ) - assert np.isclose( - pauli_string_measurement_results.mitigated_expectation, - _ideal_expectation_based_on_pauli_string( - pauli_string_measurement_results.pauli_string, final_state_vector - ), - atol=4 * pauli_string_measurement_results.mitigated_stddev, - ) - assert isinstance( - pauli_string_measurement_results.calibration_result, - SingleQubitReadoutCalibrationResult, - ) - assert pauli_string_measurement_results.calibration_result.zero_state_errors == { - q: 0 for q in pauli_string_measurement_results.pauli_string.qubits - } - assert pauli_string_measurement_results.calibration_result.one_state_errors == { - q: 0 for q in pauli_string_measurement_results.pauli_string.qubits - } - - -def test_pauli_string_measurement_errors_with_coefficient_no_noise() -> None: +# @pytest.mark.parametrize("use_sweep", [True, False]) +# def test_pauli_string_measurement_errors_no_noise(use_sweep: bool) -> None: +# """Test that the mitigated expectation is close to the ideal expectation +# based on the Pauli string""" + +# qubits = cirq.LineQubit.range(10) +# circuit = cirq.FrozenCircuit(_create_ghz(10, qubits)) +# sampler = cirq.Simulator() + +# circuits_to_pauli: Dict[cirq.FrozenCircuit, list[cirq.PauliString]] = {} +# circuits_to_pauli[circuit] = [_generate_random_pauli_string(qubits) for _ in range(3)] + +# circuits_with_pauli_expectations = measure_pauli_strings( +# circuits_to_pauli, sampler, 100, 100, 100, 100, use_sweep +# ) + +# for circuit_with_pauli_expectations in circuits_with_pauli_expectations: +# assert isinstance(circuit_with_pauli_expectations.circuit, cirq.FrozenCircuit) + +# expected_val_simulation = sampler.simulate( +# circuit_with_pauli_expectations.circuit.unfreeze() +# ) +# final_state_vector = expected_val_simulation.final_state_vector + +# for pauli_string_measurement_results in circuit_with_pauli_expectations.results: +# # Since there is no noise, the mitigated and unmitigated expectations should be the same +# assert np.isclose( +# pauli_string_measurement_results.mitigated_expectation, +# pauli_string_measurement_results.unmitigated_expectation, +# ) +# assert np.isclose( +# pauli_string_measurement_results.mitigated_expectation, +# _ideal_expectation_based_on_pauli_string( +# pauli_string_measurement_results.pauli_string, final_state_vector +# ), +# atol=4 * pauli_string_measurement_results.mitigated_stddev, +# ) +# assert isinstance( +# pauli_string_measurement_results.calibration_result, +# SingleQubitReadoutCalibrationResult, +# ) +# assert pauli_string_measurement_results.calibration_result.zero_state_errors == { +# q: 0 for q in pauli_string_measurement_results.pauli_string.qubits +# } +# assert pauli_string_measurement_results.calibration_result.one_state_errors == { +# q: 0 for q in pauli_string_measurement_results.pauli_string.qubits +# } + + +@pytest.mark.parametrize("use_sweep", [True]) +def test_pauli_string_measurement_errors_with_coefficient_no_noise(use_sweep: bool) -> None: """Test that the mitigated expectation is close to the ideal expectation based on the Pauli string""" @@ -162,7 +185,7 @@ def test_pauli_string_measurement_errors_with_coefficient_no_noise() -> None: circuits_to_pauli[circuit] = [_generate_random_pauli_string(qubits, True) for _ in range(3)] circuits_with_pauli_expectations = measure_pauli_strings( - circuits_to_pauli, sampler, 1000, 1000, 1000, 1000 + circuits_to_pauli, sampler, 1000, 1000, 1000, 1000, use_sweep ) for circuit_with_pauli_expectations in circuits_with_pauli_expectations: @@ -198,7 +221,8 @@ def test_pauli_string_measurement_errors_with_coefficient_no_noise() -> None: } -def test_group_pauli_string_measurement_errors_no_noise_with_coefficient() -> None: +@pytest.mark.parametrize("use_sweep", [True, False]) +def test_group_pauli_string_measurement_errors_no_noise_with_coefficient(use_sweep: bool) -> None: """Test that the mitigated expectation is close to the ideal expectation based on the group of Pauli strings""" @@ -216,7 +240,7 @@ def test_group_pauli_string_measurement_errors_no_noise_with_coefficient() -> No circuits_to_pauli[circuit].append([cirq.PauliString({q: cirq.X for q in qubits})]) circuits_with_pauli_expectations = measure_pauli_strings( - circuits_to_pauli, sampler, 100, 100, 100, 100 + circuits_to_pauli, sampler, 1000, 1000, 1000, 1000, use_sweep ) for circuit_with_pauli_expectations in circuits_with_pauli_expectations: @@ -252,7 +276,8 @@ def test_group_pauli_string_measurement_errors_no_noise_with_coefficient() -> No } -def test_pauli_string_measurement_errors_with_noise() -> None: +@pytest.mark.parametrize("use_sweep", [True, False]) +def test_pauli_string_measurement_errors_with_noise(use_sweep: bool) -> None: """Test that the mitigated expectation is close to the ideal expectation based on the Pauli string""" qubits = cirq.LineQubit.range(7) @@ -264,7 +289,7 @@ def test_pauli_string_measurement_errors_with_noise() -> None: circuits_to_pauli[circuit] = [_generate_random_pauli_string(qubits) for _ in range(3)] circuits_with_pauli_expectations = measure_pauli_strings( - circuits_to_pauli, sampler, 1000, 1000, 1000, np.random.default_rng() + circuits_to_pauli, sampler, 1000, 1000, 1000, np.random.default_rng(), use_sweep ) for circuit_with_pauli_expectations in circuits_with_pauli_expectations: @@ -299,7 +324,8 @@ def test_pauli_string_measurement_errors_with_noise() -> None: assert 0.0045 < error < 0.0055 -def test_group_pauli_string_measurement_errors_with_noise() -> None: +@pytest.mark.parametrize("use_sweep", [True, False]) +def test_group_pauli_string_measurement_errors_with_noise(use_sweep: bool) -> None: """Test that the mitigated expectation is close to the ideal expectation based on the group Pauli strings""" qubits = cirq.LineQubit.range(7) @@ -350,7 +376,8 @@ def test_group_pauli_string_measurement_errors_with_noise() -> None: assert 0.0045 < error < 0.0055 -def test_many_circuits_input_measurement_with_noise() -> None: +@pytest.mark.parametrize("use_sweep", [True, False]) +def test_many_circuits_input_measurement_with_noise(use_sweep: bool) -> None: """Test that the mitigated expectation is close to the ideal expectation based on the Pauli string for multiple circuits""" qubits_1 = cirq.LineQubit.range(3) @@ -409,7 +436,8 @@ def test_many_circuits_input_measurement_with_noise() -> None: assert 0.0045 < error < 0.0055 -def test_allow_measurement_without_readout_mitigation() -> None: +@pytest.mark.parametrize("use_sweep", [True, False]) +def test_allow_measurement_without_readout_mitigation(use_sweep: bool) -> None: """Test that the function allows to measure without error mitigation""" qubits = cirq.LineQubit.range(7) circuit = cirq.FrozenCircuit(_create_ghz(7, qubits)) @@ -439,7 +467,8 @@ def test_allow_measurement_without_readout_mitigation() -> None: assert pauli_string_measurement_results.calibration_result is None -def test_allow_group_pauli_measurement_without_readout_mitigation() -> None: +@pytest.mark.parametrize("use_sweep", [True, False]) +def test_allow_group_pauli_measurement_without_readout_mitigation(use_sweep: bool) -> None: """Test that the function allows to measure without error mitigation""" qubits = cirq.LineQubit.range(7) circuit = cirq.FrozenCircuit(_create_ghz(7, qubits)) @@ -469,7 +498,8 @@ def test_allow_group_pauli_measurement_without_readout_mitigation() -> None: assert pauli_string_measurement_results.calibration_result is None -def test_many_circuits_with_coefficient() -> None: +@pytest.mark.parametrize("use_sweep", [True, False]) +def test_many_circuits_with_coefficient(use_sweep: bool) -> None: """Test that the mitigated expectation is close to the ideal expectation based on the Pauli string for multiple circuits""" qubits_1 = cirq.LineQubit.range(3) @@ -528,7 +558,8 @@ def test_many_circuits_with_coefficient() -> None: assert 0.0045 < error < 0.0055 -def test_many_group_pauli_in_circuits_with_coefficient() -> None: +@pytest.mark.parametrize("use_sweep", [True, False]) +def test_many_group_pauli_in_circuits_with_coefficient(use_sweep: bool) -> None: """Test that the mitigated expectation is close to the ideal expectation based on the Pauli string for multiple circuits""" qubits_1 = cirq.LineQubit.range(3) From 8157b579f05ca014deb57b43180f808f2949fd14 Mon Sep 17 00:00:00 2001 From: ddddddanni Date: Wed, 21 May 2025 21:46:33 -0700 Subject: [PATCH 3/5] format --- ...ing_measurement_with_readout_mitigation.py | 34 ++- ...easurement_with_readout_mitigation_test.py | 212 ++++++++------ ...ffle_circuits_with_readout_benchmarking.py | 45 ++- ...circuits_with_readout_benchmarking_test.py | 261 +++++++++--------- 4 files changed, 303 insertions(+), 249 deletions(-) diff --git a/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation.py b/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation.py index 34d0217906e..df4784cb4c5 100644 --- a/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation.py +++ b/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation.py @@ -20,7 +20,7 @@ import numpy as np import sympy -from cirq import circuits, ops, work, study +from cirq import circuits, ops, study, work from cirq.contrib.shuffle_circuits import ( run_shuffled_with_readout_benchmarking, run_sweep_with_readout_benchmarking, @@ -199,7 +199,7 @@ def _normalize_input_paulis( circuits_to_pauli: Union[ Dict[circuits.FrozenCircuit, list[ops.PauliString]], Dict[circuits.FrozenCircuit, list[list[ops.PauliString]]], - ], + ] ) -> Dict[circuits.FrozenCircuit, list[list[ops.PauliString]]]: first_value = next(iter(circuits_to_pauli.values())) if ( @@ -234,27 +234,29 @@ def _pauli_strings_to_basis_change_ops( def _pauli_strings_to_basis_change_with_sweep( pauli_strings: list[ops.PauliString], qid_list: list[ops.Qid] ) -> list[Dict[str, float]]: + """Decide single-qubit rotation sweep parameters for basis change.""" params_dict = {} for qid, qubit in enumerate(qid_list): - params_dict[f"phi{qid}"] = 1 - params_dict[f"theta{qid}"] = 0 + params_dict[f"phi{qid}"] = 1.0 + params_dict[f"theta{qid}"] = 0.0 for pauli_str in pauli_strings: pauli_op = pauli_str.get(qubit, default=ops.I) if pauli_op == ops.X: - params_dict[f"phi{qid}"] = 0 + params_dict[f"phi{qid}"] = 0.0 params_dict[f"theta{qid}"] = 1 / 2 break elif pauli_op == ops.Y: - params_dict[f"phi{qid}"] = 1 + params_dict[f"phi{qid}"] = 1.0 params_dict[f"theta{qid}"] = 1 / 2 break return params_dict def _generate_basis_change_circuits( - normalized_circuits_to_pauli: Dict[circuits.FrozenCircuit, list[list[ops.PauliString]]], -): + normalized_circuits_to_pauli: Dict[circuits.FrozenCircuit, list[list[ops.PauliString]]] +) -> list[circuits.Circuit]: + """Generates basis change circuits for each group of Pauli strings.""" pauli_measurement_circuits = list[circuits.Circuit]() for input_circuit, pauli_string_groups in normalized_circuits_to_pauli.items(): @@ -274,8 +276,9 @@ def _generate_basis_change_circuits( def _generate_basis_change_circuits_with_sweep( - normalized_circuits_to_pauli: Dict[circuits.FrozenCircuit, list[list[ops.PauliString]]], + normalized_circuits_to_pauli: Dict[circuits.FrozenCircuit, list[list[ops.PauliString]]] ) -> Tuple[list[circuits.Circuit], list[study.Sweepable]]: + """Generates basis change circuits for each group of Pauli strings with sweep.""" parameterized_circuits = list[circuits.Circuit]() sweep_params = list[study.Sweepable]() for input_circuit, pauli_string_groups in normalized_circuits_to_pauli.items(): @@ -452,7 +455,8 @@ def measure_pauli_strings( For each circuit and its associated list of QWC pauli string group, it: 1. Constructs circuits to measure the Pauli string expectation value by adding basis change moments and measurement operations. - 2. Runs shuffled readout benchmarking on these circuits to calibrate readout errors. + 2. If `num_random_bitstrings` is greater than zero, performing readout + benchmarking (shuffled or sweep-based) to calibrate readout errors. 3. Mitigates readout errors using the calibrated confusion matrices. 4. Calculates and returns both error-mitigated and unmitigated expectation values for each Pauli string. @@ -473,8 +477,8 @@ def measure_pauli_strings( num_random_bitstrings: The number of random bitstrings to use in readout benchmarking. rng_or_seed: A random number generator or seed for the readout benchmarking. - use_sweep: If True, the function will use parameterized circuits and sweep - parameters. + use_sweep: If True, uses parameterized circuits and sweeps parameters + for both Pauli measurements and readout benchmarking. Defaults to False.. Returns: A list of CircuitToPauliStringsMeasurementResult objects, where each object contains: @@ -505,8 +509,8 @@ def measure_pauli_strings( # Build the basis-change circuits for each Pauli string group pauli_measurement_circuits = list[circuits.Circuit]() sweep_params = list[study.Sweepable]() - circuits_results = list[list[ResultDict]]() - calibration_results = list[Dict[Tuple[ops.Qid, ...], SingleQubitReadoutCalibrationResult]]() + circuits_results: Union[list[ResultDict], list[list[ResultDict]]] = [] + # calibration_results = list[Dict[Tuple[ops.Qid, ...], SingleQubitReadoutCalibrationResult]]() if use_sweep: pauli_measurement_circuits, sweep_params = _generate_basis_change_circuits_with_sweep( @@ -555,6 +559,7 @@ def measure_pauli_strings( circuits_results_for_group = circuits_results[ circuit_result_index : circuit_result_index + len(pauli_string_groups) ] + circuit_result_index += len(pauli_string_groups) pauli_measurement_results = _process_pauli_measurement_results( list(qubits_in_circuit), @@ -571,5 +576,4 @@ def measure_pauli_strings( ) ) - circuit_result_index += len(pauli_string_groups) return results diff --git a/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation_test.py b/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation_test.py index c683ad21452..95fa202b51e 100644 --- a/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation_test.py +++ b/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation_test.py @@ -123,53 +123,53 @@ def _ideal_expectation_based_on_pauli_string( ) -# @pytest.mark.parametrize("use_sweep", [True, False]) -# def test_pauli_string_measurement_errors_no_noise(use_sweep: bool) -> None: -# """Test that the mitigated expectation is close to the ideal expectation -# based on the Pauli string""" - -# qubits = cirq.LineQubit.range(10) -# circuit = cirq.FrozenCircuit(_create_ghz(10, qubits)) -# sampler = cirq.Simulator() - -# circuits_to_pauli: Dict[cirq.FrozenCircuit, list[cirq.PauliString]] = {} -# circuits_to_pauli[circuit] = [_generate_random_pauli_string(qubits) for _ in range(3)] - -# circuits_with_pauli_expectations = measure_pauli_strings( -# circuits_to_pauli, sampler, 100, 100, 100, 100, use_sweep -# ) - -# for circuit_with_pauli_expectations in circuits_with_pauli_expectations: -# assert isinstance(circuit_with_pauli_expectations.circuit, cirq.FrozenCircuit) - -# expected_val_simulation = sampler.simulate( -# circuit_with_pauli_expectations.circuit.unfreeze() -# ) -# final_state_vector = expected_val_simulation.final_state_vector - -# for pauli_string_measurement_results in circuit_with_pauli_expectations.results: -# # Since there is no noise, the mitigated and unmitigated expectations should be the same -# assert np.isclose( -# pauli_string_measurement_results.mitigated_expectation, -# pauli_string_measurement_results.unmitigated_expectation, -# ) -# assert np.isclose( -# pauli_string_measurement_results.mitigated_expectation, -# _ideal_expectation_based_on_pauli_string( -# pauli_string_measurement_results.pauli_string, final_state_vector -# ), -# atol=4 * pauli_string_measurement_results.mitigated_stddev, -# ) -# assert isinstance( -# pauli_string_measurement_results.calibration_result, -# SingleQubitReadoutCalibrationResult, -# ) -# assert pauli_string_measurement_results.calibration_result.zero_state_errors == { -# q: 0 for q in pauli_string_measurement_results.pauli_string.qubits -# } -# assert pauli_string_measurement_results.calibration_result.one_state_errors == { -# q: 0 for q in pauli_string_measurement_results.pauli_string.qubits -# } +@pytest.mark.parametrize("use_sweep", [True, False]) +def test_pauli_string_measurement_errors_no_noise(use_sweep: bool) -> None: + """Test that the mitigated expectation is close to the ideal expectation + based on the Pauli string""" + + qubits = cirq.LineQubit.range(5) + circuit = cirq.FrozenCircuit(_create_ghz(5, qubits)) + sampler = cirq.Simulator() + + circuits_to_pauli: Dict[cirq.FrozenCircuit, list[cirq.PauliString]] = {} + circuits_to_pauli[circuit] = [_generate_random_pauli_string(qubits) for _ in range(3)] + + circuits_with_pauli_expectations = measure_pauli_strings( + circuits_to_pauli, sampler, 1000, 1000, 1000, 1000, use_sweep + ) + + for circuit_with_pauli_expectations in circuits_with_pauli_expectations: + assert isinstance(circuit_with_pauli_expectations.circuit, cirq.FrozenCircuit) + + expected_val_simulation = sampler.simulate( + circuit_with_pauli_expectations.circuit.unfreeze() + ) + final_state_vector = expected_val_simulation.final_state_vector + + for pauli_string_measurement_results in circuit_with_pauli_expectations.results: + # Since there is no noise, the mitigated and unmitigated expectations should be the same + assert np.isclose( + pauli_string_measurement_results.mitigated_expectation, + pauli_string_measurement_results.unmitigated_expectation, + ) + assert np.isclose( + pauli_string_measurement_results.mitigated_expectation, + _ideal_expectation_based_on_pauli_string( + pauli_string_measurement_results.pauli_string, final_state_vector + ), + atol=4 * pauli_string_measurement_results.mitigated_stddev, + ) + assert isinstance( + pauli_string_measurement_results.calibration_result, + SingleQubitReadoutCalibrationResult, + ) + assert pauli_string_measurement_results.calibration_result.zero_state_errors == { + q: 0 for q in pauli_string_measurement_results.pauli_string.qubits + } + assert pauli_string_measurement_results.calibration_result.one_state_errors == { + q: 0 for q in pauli_string_measurement_results.pauli_string.qubits + } @pytest.mark.parametrize("use_sweep", [True]) @@ -240,7 +240,7 @@ def test_group_pauli_string_measurement_errors_no_noise_with_coefficient(use_swe circuits_to_pauli[circuit].append([cirq.PauliString({q: cirq.X for q in qubits})]) circuits_with_pauli_expectations = measure_pauli_strings( - circuits_to_pauli, sampler, 1000, 1000, 1000, 1000, use_sweep + circuits_to_pauli, sampler, 500, 500, 500, 500, use_sweep ) for circuit_with_pauli_expectations in circuits_with_pauli_expectations: @@ -341,7 +341,7 @@ def test_group_pauli_string_measurement_errors_with_noise(use_sweep: bool) -> No ] circuits_with_pauli_expectations = measure_pauli_strings( - circuits_to_pauli, sampler, 800, 1000, 800, np.random.default_rng() + circuits_to_pauli, sampler, 800, 1000, 800, np.random.default_rng(), use_sweep ) for circuit_with_pauli_expectations in circuits_with_pauli_expectations: @@ -403,7 +403,7 @@ def test_many_circuits_input_measurement_with_noise(use_sweep: bool) -> None: simulator = cirq.Simulator() circuits_with_pauli_expectations = measure_pauli_strings( - circuits_to_pauli, sampler, 1000, 1000, 1000, np.random.default_rng() + circuits_to_pauli, sampler, 1000, 1000, 1000, np.random.default_rng(), use_sweep ) for circuit_with_pauli_expectations in circuits_with_pauli_expectations: @@ -451,7 +451,7 @@ def test_allow_measurement_without_readout_mitigation(use_sweep: bool) -> None: ] circuits_with_pauli_expectations = measure_pauli_strings( - circuits_to_pauli, sampler, 1000, 1000, 0, np.random.default_rng() + circuits_to_pauli, sampler, 1000, 1000, 0, np.random.default_rng(), use_sweep ) for circuit_with_pauli_expectations in circuits_with_pauli_expectations: @@ -482,7 +482,7 @@ def test_allow_group_pauli_measurement_without_readout_mitigation(use_sweep: boo ] circuits_with_pauli_expectations = measure_pauli_strings( - circuits_to_pauli, sampler, 100, 100, 0, np.random.default_rng() + circuits_to_pauli, sampler, 100, 100, 0, np.random.default_rng(), use_sweep ) for circuit_with_pauli_expectations in circuits_with_pauli_expectations: @@ -525,7 +525,7 @@ def test_many_circuits_with_coefficient(use_sweep: bool) -> None: simulator = cirq.Simulator() circuits_with_pauli_expectations = measure_pauli_strings( - circuits_to_pauli, sampler, 1000, 1000, 1000, np.random.default_rng() + circuits_to_pauli, sampler, 1000, 1000, 1000, np.random.default_rng(), use_sweep ) for circuit_with_pauli_expectations in circuits_with_pauli_expectations: @@ -597,7 +597,7 @@ def test_many_group_pauli_in_circuits_with_coefficient(use_sweep: bool) -> None: simulator = cirq.Simulator() circuits_with_pauli_expectations = measure_pauli_strings( - circuits_to_pauli, sampler, 1000, 1000, 1000, np.random.default_rng() + circuits_to_pauli, sampler, 1000, 1000, 1000, np.random.default_rng(), use_sweep ) for circuit_with_pauli_expectations in circuits_with_pauli_expectations: @@ -630,7 +630,8 @@ def test_many_group_pauli_in_circuits_with_coefficient(use_sweep: bool) -> None: assert 0.0045 < error < 0.0055 -def test_coefficient_not_real_number() -> None: +@pytest.mark.parametrize("use_sweep", [True, False]) +def test_coefficient_not_real_number(use_sweep: bool) -> None: """Test that the coefficient of input pauli string is not real. Should return error in this case""" qubits_1 = cirq.LineQubit.range(3) @@ -650,11 +651,18 @@ def test_coefficient_not_real_number() -> None: "non-Hermitian PauliString. Coefficient must be real.", ): measure_pauli_strings( - circuits_to_pauli, cirq.Simulator(), 1000, 1000, 1000, np.random.default_rng() + circuits_to_pauli, + cirq.Simulator(), + 1000, + 1000, + 1000, + np.random.default_rng(), + use_sweep, ) -def test_empty_input_circuits_to_pauli_mapping() -> None: +@pytest.mark.parametrize("use_sweep", [True, False]) +def test_empty_input_circuits_to_pauli_mapping(use_sweep: bool) -> None: """Test that the input circuits are empty.""" with pytest.raises(ValueError, match="Input circuits must not be empty."): @@ -665,10 +673,12 @@ def test_empty_input_circuits_to_pauli_mapping() -> None: 1000, 1000, np.random.default_rng(), + use_sweep, ) -def test_invalid_input_circuit_type() -> None: +@pytest.mark.parametrize("use_sweep", [True, False]) +def test_invalid_input_circuit_type(use_sweep: bool) -> None: """Test that the input circuit type is not frozen circuit""" qubits = cirq.LineQubit.range(5) @@ -684,10 +694,12 @@ def test_invalid_input_circuit_type() -> None: 1000, 1000, np.random.default_rng(), + use_sweep, ) -def test_invalid_input_pauli_string_type() -> None: +@pytest.mark.parametrize("use_sweep", [True, False]) +def test_invalid_input_pauli_string_type(use_sweep: bool) -> None: """Test input circuit is not mapping to a paulistring""" qubits_1 = cirq.LineQubit.range(5) qubits_2 = [ @@ -717,10 +729,12 @@ def test_invalid_input_pauli_string_type() -> None: 1000, 1000, np.random.default_rng(), + use_sweep, ) -def test_all_pauli_strings_are_pauli_i() -> None: +@pytest.mark.parametrize("use_sweep", [True, False]) +def test_all_pauli_strings_are_pauli_i(use_sweep: bool) -> None: """Test that all input pauli are pauli I""" qubits_1 = cirq.LineQubit.range(5) qubits_2 = [ @@ -748,11 +762,18 @@ def test_all_pauli_strings_are_pauli_i() -> None: "valid input Pauli strings.", ): measure_pauli_strings( - circuits_to_pauli, cirq.Simulator(), 1000, 1000, 1000, np.random.default_rng() + circuits_to_pauli, + cirq.Simulator(), + 1000, + 1000, + 1000, + np.random.default_rng(), + use_sweep, ) -def test_zero_pauli_repetitions() -> None: +@pytest.mark.parametrize("use_sweep", [True, False]) +def test_zero_pauli_repetitions(use_sweep: bool) -> None: """Test that the pauli repetitions are zero.""" qubits = cirq.LineQubit.range(5) @@ -762,11 +783,12 @@ def test_zero_pauli_repetitions() -> None: circuits_to_pauli[circuit] = [cirq.PauliString({q: cirq.X for q in qubits})] with pytest.raises(ValueError, match="Must provide non-zero pauli_repetitions."): measure_pauli_strings( - circuits_to_pauli, cirq.Simulator(), 0, 1000, 1000, np.random.default_rng() + circuits_to_pauli, cirq.Simulator(), 0, 1000, 1000, np.random.default_rng(), use_sweep ) -def test_negative_num_random_bitstrings() -> None: +@pytest.mark.parametrize("use_sweep", [True, False]) +def test_negative_num_random_bitstrings(use_sweep: bool) -> None: """Test that the number of random bitstrings is smaller than zero.""" qubits = cirq.LineQubit.range(5) @@ -776,11 +798,12 @@ def test_negative_num_random_bitstrings() -> None: circuits_to_pauli[circuit] = [cirq.PauliString({q: cirq.X for q in qubits})] with pytest.raises(ValueError, match="Must provide zero or more num_random_bitstrings."): measure_pauli_strings( - circuits_to_pauli, cirq.Simulator(), 1000, 1000, -1, np.random.default_rng() + circuits_to_pauli, cirq.Simulator(), 1000, 1000, -1, np.random.default_rng(), use_sweep ) -def test_zero_readout_repetitions() -> None: +@pytest.mark.parametrize("use_sweep", [True, False]) +def test_zero_readout_repetitions(use_sweep: bool) -> None: """Test that the readout repetitions is zero.""" qubits = cirq.LineQubit.range(5) @@ -792,11 +815,12 @@ def test_zero_readout_repetitions() -> None: ValueError, match="Must provide non-zero readout_repetitions for readout" + " calibration." ): measure_pauli_strings( - circuits_to_pauli, cirq.Simulator(), 1000, 0, 1000, np.random.default_rng() + circuits_to_pauli, cirq.Simulator(), 1000, 0, 1000, np.random.default_rng(), use_sweep ) -def test_rng_type_mismatch() -> None: +@pytest.mark.parametrize("use_sweep", [True, False]) +def test_rng_type_mismatch(use_sweep: bool) -> None: """Test that the rng is not a numpy random generator or a seed.""" qubits = cirq.LineQubit.range(5) @@ -806,11 +830,18 @@ def test_rng_type_mismatch() -> None: circuits_to_pauli[circuit] = [cirq.PauliString({q: cirq.X for q in qubits})] with pytest.raises(ValueError, match="Must provide a numpy random generator or a seed"): measure_pauli_strings( - circuits_to_pauli, cirq.Simulator(), 1000, 1000, 1000, "test" # type: ignore[arg-type] + circuits_to_pauli, + cirq.Simulator(), + 1000, + 1000, + 1000, + "test", # type: ignore[arg-type] + use_sweep, ) -def test_pauli_type_mismatch() -> None: +@pytest.mark.parametrize("use_sweep", [True, False]) +def test_pauli_type_mismatch(use_sweep: bool) -> None: """Test that the input paulis are not a sequence of PauliStrings.""" qubits = cirq.LineQubit.range(5) @@ -824,11 +855,18 @@ def test_pauli_type_mismatch() -> None: " ops.PauliStrings. Got instead.", ): measure_pauli_strings( - circuits_to_pauli, cirq.Simulator(), 1000, 1000, 1000, "test" # type: ignore[arg-type] + circuits_to_pauli, + cirq.Simulator(), + 1000, + 1000, + 1000, + "test", # type: ignore[arg-type] + use_sweep, ) -def test_group_paulis_are_not_qwc() -> None: +@pytest.mark.parametrize("use_sweep", [True, False]) +def test_group_paulis_are_not_qwc(use_sweep: bool) -> None: """Test that the group paulis are not qwc.""" qubits = cirq.LineQubit.range(5) @@ -844,11 +882,18 @@ def test_group_paulis_are_not_qwc() -> None: match="The group of Pauli strings are not " "Qubit-Wise Commuting with each other.", ): measure_pauli_strings( - circuits_to_pauli, cirq.Simulator(), 1000, 1000, 1000, np.random.default_rng() + circuits_to_pauli, + cirq.Simulator(), + 1000, + 1000, + 1000, + np.random.default_rng(), + use_sweep, ) -def test_empty_group_paulis_not_allowed() -> None: +@pytest.mark.parametrize("use_sweep", [True, False]) +def test_empty_group_paulis_not_allowed(use_sweep: bool) -> None: """Test that the group paulis are empty""" qubits = cirq.LineQubit.range(5) @@ -858,11 +903,18 @@ def test_empty_group_paulis_not_allowed() -> None: circuits_to_pauli[circuit] = [[]] # type: ignore with pytest.raises(ValueError, match="Empty group of Pauli strings is not allowed"): measure_pauli_strings( - circuits_to_pauli, cirq.Simulator(), 1000, 1000, 1000, np.random.default_rng() + circuits_to_pauli, + cirq.Simulator(), + 1000, + 1000, + 1000, + np.random.default_rng(), + use_sweep, ) -def test_group_paulis_type_mismatch() -> None: +@pytest.mark.parametrize("use_sweep", [True, False]) +def test_group_paulis_type_mismatch(use_sweep: bool) -> None: """Test that the group paulis type is not correct""" qubits_1 = cirq.LineQubit.range(3) qubits_2 = [ @@ -894,5 +946,11 @@ def test_group_paulis_type_mismatch() -> None: "but found .", ): measure_pauli_strings( - circuits_to_pauli, cirq.Simulator(), 1000, 1000, 1000, np.random.default_rng() + circuits_to_pauli, + cirq.Simulator(), + 1000, + 1000, + 1000, + np.random.default_rng(), + use_sweep, ) diff --git a/cirq-core/cirq/contrib/shuffle_circuits/shuffle_circuits_with_readout_benchmarking.py b/cirq-core/cirq/contrib/shuffle_circuits/shuffle_circuits_with_readout_benchmarking.py index f8dac4ad4bc..c8bb901d346 100644 --- a/cirq-core/cirq/contrib/shuffle_circuits/shuffle_circuits_with_readout_benchmarking.py +++ b/cirq-core/cirq/contrib/shuffle_circuits/shuffle_circuits_with_readout_benchmarking.py @@ -13,18 +13,18 @@ # limitations under the License. """Tools for running circuits in a shuffled order with readout error benchmarking.""" import time -from typing import Dict, List, Optional, Tuple, Union +from typing import Dict, List, Optional, Sequence, Tuple, Union import numpy as np import sympy -from cirq import circuits, ops, protocols, work, study +from cirq import circuits, ops, protocols, study, work from cirq.experiments import SingleQubitReadoutCalibrationResult from cirq.study import ResultDict def _validate_input( - input_circuits: list[circuits.Circuit], + input_circuits: Sequence[circuits.Circuit], circuit_repetitions: Union[int, list[int]], rng_or_seed: Union[np.random.Generator, int], num_random_bitstrings: int, @@ -61,18 +61,16 @@ def _validate_input( def _validate_input_with_sweep( - input_circuits: list[circuits.Circuit], - sweep_params: list[study.Sweepable], + input_circuits: Sequence[circuits.Circuit], + sweep_params: Sequence[study.Sweepable], circuit_repetitions: Union[int, list[int]], rng_or_seed: Union[np.random.Generator, int], num_random_bitstrings: int, readout_repetitions: int, ): """Validates the input for the run_sweep_with_readout_benchmarking function.""" - # if len(input_circuits) != len(sweep_params): - # raise ValueError( - # "Number of input parameterized circuits must match the number of sweep parameters." - # ) + if not sweep_params: + raise ValueError("Sweep parameters must not be empty.") return _validate_input( input_circuits, circuit_repetitions, rng_or_seed, num_random_bitstrings, readout_repetitions ) @@ -120,11 +118,11 @@ def _generate_all_readout_calibration_circuits( num_random_bitstrings: int, qubits_to_measure: List[List[ops.Qid]], is_sweep: bool, -) -> Tuple[List[circuits.Circuit], List[List[List[int]]], List[List[study.Sweepable]]]: +) -> Tuple[List[circuits.Circuit], List[np.ndarray], List[study.Sweepable]]: """Generates all readout calibration circuits and random bitstrings.""" - all_readout_calibration_circuits = [] - all_random_bitstrings = [] - all_readout_sweep_params = [] + all_readout_calibration_circuits: list[circuits.Circuit] = [] + all_random_bitstrings: list[np.ndarray] = [] + all_readout_sweep_params: list[study.Sweepable] = [] if num_random_bitstrings <= 0: return all_readout_calibration_circuits, all_random_bitstrings, all_readout_sweep_params @@ -151,8 +149,8 @@ def _generate_all_readout_calibration_circuits( def _determine_qubits_to_measure( - input_circuits: list[circuits.Circuit], - qubits: Optional[Union[List[ops.Qid], List[List[ops.Qid]]]], + input_circuits: Sequence[circuits.Circuit], + qubits: Optional[Union[Sequence[ops.Qid], Sequence[Sequence[ops.Qid]]]], ) -> List[List[ops.Qid]]: """Determine the qubits to measure based on the input circuits and provided qubits.""" # If input qubits is None, extract qubits from input circuits @@ -170,9 +168,6 @@ def _determine_qubits_to_measure( return qubits_to_measure -# def _get_rng(rng_or_seed: Union[np.random.Generator, int]) -> np.random.Generator: - - def _shuffle_circuits( all_circuits: list[circuits.Circuit], all_repetitions: list[int], rng: np.random.Generator ) -> tuple[list[circuits.Circuit], list[int], np.ndarray]: @@ -186,7 +181,7 @@ def _shuffle_circuits( def _analyze_readout_results( - unshuffled_readout_measurements: list[ResultDict], + unshuffled_readout_measurements: Union[Sequence[ResultDict], Sequence[study.Result]], random_bitstrings: np.ndarray, readout_repetitions: int, qubits: list[ops.Qid], @@ -252,7 +247,7 @@ def run_shuffled_with_readout_benchmarking( rng_or_seed: Union[np.random.Generator, int], num_random_bitstrings: int = 100, readout_repetitions: int = 1000, - qubits: Optional[Union[List[ops.Qid], List[List[ops.Qid]]]] = None, + qubits: Optional[Union[Sequence[ops.Qid], Sequence[Sequence[ops.Qid]]]] = None, ) -> tuple[list[ResultDict], Dict[Tuple[ops.Qid, ...], SingleQubitReadoutCalibrationResult]]: """Run the circuits in a shuffled order with readout error benchmarking. @@ -333,14 +328,16 @@ def run_shuffled_with_readout_benchmarking( def run_sweep_with_readout_benchmarking( input_circuits: list[circuits.Circuit], - sweep_params: list[study.Sweepable], + sweep_params: Sequence[study.Sweepable], sampler: work.Sampler, circuit_repetitions: Union[int, list[int]], rng_or_seed: Union[np.random.Generator, int], num_random_bitstrings: int = 100, readout_repetitions: int = 1000, - qubits: Optional[Union[List[ops.Qid], List[List[ops.Qid]]]] = None, -) -> tuple[list[list[ResultDict]], Dict[Tuple[ops.Qid, ...], SingleQubitReadoutCalibrationResult]]: + qubits: Optional[Union[Sequence[ops.Qid], Sequence[Sequence[ops.Qid]]]] = None, +) -> tuple[ + list[list[study.Result]], Dict[Tuple[ops.Qid, ...], SingleQubitReadoutCalibrationResult] +]: """Run the sweep circuits with readout error benchmarking (no shuffling). Args: input_circuits: The circuits to run. @@ -396,7 +393,7 @@ def run_sweep_with_readout_benchmarking( # Run the sweep circuits and measure results = sampler.run_batch( input_circuits + all_readout_calibration_circuits, - sweep_params + all_readout_sweep_params, + list(sweep_params) + all_readout_sweep_params, repetitions=all_repetitions, ) diff --git a/cirq-core/cirq/contrib/shuffle_circuits/shuffle_circuits_with_readout_benchmarking_test.py b/cirq-core/cirq/contrib/shuffle_circuits/shuffle_circuits_with_readout_benchmarking_test.py index 77b269019e5..bfe998f66a7 100644 --- a/cirq-core/cirq/contrib/shuffle_circuits/shuffle_circuits_with_readout_benchmarking_test.py +++ b/cirq-core/cirq/contrib/shuffle_circuits/shuffle_circuits_with_readout_benchmarking_test.py @@ -12,12 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. import itertools +from typing import Sequence, Union import numpy as np import pytest import sympy import cirq +from cirq.contrib.shuffle_circuits import ( + run_shuffled_with_readout_benchmarking, + run_sweep_with_readout_benchmarking, +) from cirq.experiments import ( random_quantum_circuit_generation as rqcg, SingleQubitReadoutCalibrationResult, @@ -26,7 +31,7 @@ from cirq.study import ResultDict -def _create_test_circuits(qubits: list[cirq.Qid], n_circuits: int) -> list[cirq.Circuit]: +def _create_test_circuits(qubits: Sequence[cirq.Qid], n_circuits: int) -> list[cirq.Circuit]: """Helper function to generate circuits for testing.""" if len(qubits) < 2: raise ValueError( @@ -48,7 +53,9 @@ def _create_test_circuits(qubits: list[cirq.Qid], n_circuits: int) -> list[cirq. return input_circuits -def _create_test_circuits_with_sweep(qubits: list[cirq.Qid], n_circuits: int) -> list[cirq.Circuit]: +def _create_test_circuits_with_sweep( + qubits: Sequence[cirq.Qid], n_circuits: int +) -> tuple[list[cirq.Circuit], list[cirq.ParamResolver]]: """Helper function to generate sweep circuits for testing.""" if len(qubits) < 2: raise ValueError( @@ -60,7 +67,7 @@ def _create_test_circuits_with_sweep(qubits: list[cirq.Qid], n_circuits: int) -> two_qubit_gates = [cirq.ISWAP, cirq.CNOT] input_circuits = [] - sweep_params = [] + sweep_params: list[cirq.ParamResolver] = [] qubit_pairs = list(itertools.combinations(qubits, 2)) num_pairs = len(qubit_pairs) for i in range(n_circuits): @@ -72,10 +79,9 @@ def _create_test_circuits_with_sweep(qubits: list[cirq.Qid], n_circuits: int) -> for circuit in circuits: circuit += cirq.Circuit(cirq.X(q0) ** theta_symbol, cirq.Y(q1) ** phi_symbol) circuit.append(cirq.measure(*qubits, key="m")) - sweep_params.append({'theta': 0, 'phi': 1}) + sweep_params.append(cirq.ParamResolver({'theta': 0, 'phi': 1})) input_circuits.extend(circuits) - print(len(input_circuits)) - print(len(sweep_params)) + return input_circuits, sweep_params @@ -91,41 +97,35 @@ def test_circuits_with_readout_benchmarking_errors_no_noise(mode: str): readout_repetitions = 1000 num_random_bitstrings = 100 - readout_calibration_results = {} - if mode == "shuffled": input_circuits = _create_test_circuits(qubits, 3) - measurements, readout_calibration_results = ( - cirq.contrib.shuffle_circuits.run_shuffled_with_readout_benchmarking( - input_circuits, - sampler, - circuit_repetitions, - rng, - num_random_bitstrings=num_random_bitstrings, - readout_repetitions=readout_repetitions, - ) + sweep_measurements, readout_calibration_results = run_shuffled_with_readout_benchmarking( + input_circuits, + sampler, + circuit_repetitions, + rng, + num_random_bitstrings=num_random_bitstrings, + readout_repetitions=readout_repetitions, ) - for measurement in measurements: + for measurement in sweep_measurements: assert isinstance(measurement, ResultDict) elif mode == "sweep": input_circuits, sweep_params = _create_test_circuits_with_sweep(qubits, 3) - measurements, readout_calibration_results = ( - cirq.contrib.shuffle_circuits.run_sweep_with_readout_benchmarking( - input_circuits, - sweep_params, - sampler, - circuit_repetitions, - rng, - num_random_bitstrings=num_random_bitstrings, - readout_repetitions=readout_repetitions, - ) + measurements, readout_calibration_results = run_sweep_with_readout_benchmarking( + input_circuits, + sweep_params, + sampler, + circuit_repetitions, + rng, + num_random_bitstrings=num_random_bitstrings, + readout_repetitions=readout_repetitions, ) - for measurement in measurements: - for single_sweep_measurement in measurement: + for measurement_group in measurements: # Treat as a list of lists + for single_sweep_measurement in measurement_group: assert isinstance(single_sweep_measurement, ResultDict) for qlist, readout_calibration_result in readout_calibration_results.items(): @@ -149,20 +149,16 @@ def test_circuits_with_readout_benchmarking_errors_with_noise(mode: str): readout_repetitions = 1000 num_random_bitstrings = 100 - readout_calibration_results = {} - if mode == "shuffled": input_circuits = _create_test_circuits(qubits, 6) - measurements, readout_calibration_results = ( - cirq.contrib.shuffle_circuits.run_shuffled_with_readout_benchmarking( - input_circuits, - sampler, - circuit_repetitions, - rng, - num_random_bitstrings=num_random_bitstrings, - readout_repetitions=readout_repetitions, - ) + measurements, readout_calibration_results = run_shuffled_with_readout_benchmarking( + input_circuits, + sampler, + circuit_repetitions, + rng, + num_random_bitstrings=num_random_bitstrings, + readout_repetitions=readout_repetitions, ) for measurement in measurements: @@ -170,20 +166,18 @@ def test_circuits_with_readout_benchmarking_errors_with_noise(mode: str): elif mode == "sweep": input_circuits, sweep_params = _create_test_circuits_with_sweep(qubits, 6) - measurements, readout_calibration_results = ( - cirq.contrib.shuffle_circuits.run_sweep_with_readout_benchmarking( - input_circuits, - sweep_params, - sampler, - circuit_repetitions, - rng, - num_random_bitstrings=num_random_bitstrings, - readout_repetitions=readout_repetitions, - ) + sweep_measurements, readout_calibration_results = run_sweep_with_readout_benchmarking( + input_circuits, + sweep_params, + sampler, + circuit_repetitions, + rng, + num_random_bitstrings=num_random_bitstrings, + readout_repetitions=readout_repetitions, ) - for measurement in measurements: - for single_sweep_measurement in measurement: + for measurement_group in sweep_measurements: # Treat as a list of lists + for single_sweep_measurement in measurement_group: assert isinstance(single_sweep_measurement, ResultDict) for qlist, readout_calibration_result in readout_calibration_results.items(): @@ -211,41 +205,35 @@ def test_circuits_with_readout_benchmarking_errors_with_noise_and_input_qubits(m readout_repetitions = 1000 num_random_bitstrings = 100 - readout_calibration_results = {} - if mode == "shuffled": input_circuits = _create_test_circuits(qubits, 6) - measurements, readout_calibration_results = ( - cirq.contrib.shuffle_circuits.run_shuffled_with_readout_benchmarking( - input_circuits, - sampler, - circuit_repetitions, - rng, - num_random_bitstrings=num_random_bitstrings, - readout_repetitions=readout_repetitions, - qubits=readout_qubits, - ) + measurements, readout_calibration_results = run_shuffled_with_readout_benchmarking( + input_circuits, + sampler, + circuit_repetitions, + rng, + num_random_bitstrings=num_random_bitstrings, + readout_repetitions=readout_repetitions, + qubits=readout_qubits, ) for measurement in measurements: assert isinstance(measurement, ResultDict) elif mode == "sweep": input_circuits, sweep_params = _create_test_circuits_with_sweep(qubits, 6) - measurements, readout_calibration_results = ( - cirq.contrib.shuffle_circuits.run_sweep_with_readout_benchmarking( - input_circuits, - sweep_params, - sampler, - circuit_repetitions, - rng, - num_random_bitstrings=num_random_bitstrings, - readout_repetitions=readout_repetitions, - qubits=readout_qubits, - ) + sweep_measurements, readout_calibration_results = run_sweep_with_readout_benchmarking( + input_circuits, + sweep_params, + sampler, + circuit_repetitions, + rng, + num_random_bitstrings=num_random_bitstrings, + readout_repetitions=readout_repetitions, + qubits=readout_qubits, ) - for measurement in measurements: - for single_sweep_measurement in measurement: + for measurement_group in sweep_measurements: # Treat as a list of lists + for single_sweep_measurement in measurement_group: assert isinstance(single_sweep_measurement, ResultDict) for qlist, readout_calibration_result in readout_calibration_results.items(): @@ -276,16 +264,14 @@ def test_circuits_with_readout_benchmarking_errors_with_noise_and_lists_input_qu if mode == "shuffled": input_circuits = _create_test_circuits(qubits_1, 6) + _create_test_circuits(qubits_2, 4) - measurements, readout_calibration_results = ( - cirq.contrib.shuffle_circuits.run_shuffled_with_readout_benchmarking( - input_circuits, - sampler, - circuit_repetitions, - rng, - num_random_bitstrings=100, - readout_repetitions=readout_repetitions, - qubits=readout_qubits, - ) + measurements, readout_calibration_results = run_shuffled_with_readout_benchmarking( + input_circuits, + sampler, + circuit_repetitions, + rng, + num_random_bitstrings=100, + readout_repetitions=readout_repetitions, + qubits=readout_qubits, ) for measurement in measurements: @@ -297,21 +283,19 @@ def test_circuits_with_readout_benchmarking_errors_with_noise_and_lists_input_qu input_circuits += additional_circuits sweep_params += additional_sweep_params - measurements, readout_calibration_results = ( - cirq.contrib.shuffle_circuits.run_sweep_with_readout_benchmarking( - input_circuits, - sweep_params, - sampler, - circuit_repetitions, - rng, - num_random_bitstrings=100, - readout_repetitions=readout_repetitions, - qubits=readout_qubits, - ) + sweep_measurements, readout_calibration_results = run_sweep_with_readout_benchmarking( + input_circuits, + sweep_params, + sampler, + circuit_repetitions, + rng, + num_random_bitstrings=100, + readout_repetitions=readout_repetitions, + qubits=readout_qubits, ) - for measurement in measurements: - for single_sweep_measurement in measurement: + for measurement_group in sweep_measurements: # Treat as a list of lists + for single_sweep_measurement in measurement_group: assert isinstance(single_sweep_measurement, ResultDict) for qlist, readout_calibration_result in readout_calibration_results.items(): @@ -338,21 +322,18 @@ def test_can_handle_zero_random_bitstring(mode: str): circuit_repetitions = 1 rng = np.random.default_rng() readout_repetitions = 1000 - readout_calibration_results = {} if mode == "shuffled": input_circuits = _create_test_circuits(qubits_1, 6) + _create_test_circuits(qubits_2, 4) - measurements, readout_calibration_results = ( - cirq.contrib.shuffle_circuits.run_shuffled_with_readout_benchmarking( - input_circuits, - sampler, - circuit_repetitions, - rng, - num_random_bitstrings=0, - readout_repetitions=readout_repetitions, - qubits=readout_qubits, - ) + measurements, readout_calibration_results = run_shuffled_with_readout_benchmarking( + input_circuits, + sampler, + circuit_repetitions, + rng, + num_random_bitstrings=0, + readout_repetitions=readout_repetitions, + qubits=readout_qubits, ) for measurement in measurements: @@ -364,21 +345,19 @@ def test_can_handle_zero_random_bitstring(mode: str): input_circuits += additional_circuits sweep_params += additional_sweep_params - measurements, readout_calibration_results = ( - cirq.contrib.shuffle_circuits.run_sweep_with_readout_benchmarking( - input_circuits, - sweep_params, - sampler, - circuit_repetitions, - rng, - num_random_bitstrings=0, - readout_repetitions=readout_repetitions, - qubits=readout_qubits, - ) + sweep_measurements, readout_calibration_results = run_sweep_with_readout_benchmarking( + input_circuits, + sweep_params, + sampler, + circuit_repetitions, + rng, + num_random_bitstrings=0, + readout_repetitions=readout_repetitions, + qubits=readout_qubits, ) - for measurement in measurements: - for single_sweep_measurement in measurement: + for sweep_measurement in sweep_measurements: + for single_sweep_measurement in sweep_measurement: assert isinstance(single_sweep_measurement, ResultDict) # Check that the readout_calibration_results is empty @@ -388,7 +367,7 @@ def test_can_handle_zero_random_bitstring(mode: str): def test_empty_input_circuits(): """Test that the input circuits are empty.""" with pytest.raises(ValueError, match="Input circuits must not be empty."): - cirq.contrib.shuffle_circuits.run_shuffled_with_readout_benchmarking( + run_shuffled_with_readout_benchmarking( [], cirq.ZerosSampler(), circuit_repetitions=10, @@ -402,7 +381,7 @@ def test_non_circuit_input(): """Test that the input circuits are not of type cirq.Circuit.""" q = cirq.LineQubit(0) with pytest.raises(ValueError, match="Input circuits must be of type cirq.Circuit."): - cirq.contrib.shuffle_circuits.run_shuffled_with_readout_benchmarking( + run_shuffled_with_readout_benchmarking( [q], cirq.ZerosSampler(), circuit_repetitions=10, @@ -417,7 +396,7 @@ def test_no_measurements(): q = cirq.LineQubit(0) circuit = cirq.Circuit(cirq.H(q)) with pytest.raises(ValueError, match="Input circuits must have measurements."): - cirq.contrib.shuffle_circuits.run_shuffled_with_readout_benchmarking( + run_shuffled_with_readout_benchmarking( [circuit], cirq.ZerosSampler(), circuit_repetitions=10, @@ -432,7 +411,7 @@ def test_zero_circuit_repetitions(): q = cirq.LineQubit(0) circuit = cirq.Circuit(cirq.H(q), cirq.measure(q)) with pytest.raises(ValueError, match="Must provide non-zero circuit_repetitions."): - cirq.contrib.shuffle_circuits.run_shuffled_with_readout_benchmarking( + run_shuffled_with_readout_benchmarking( [circuit], cirq.ZerosSampler(), circuit_repetitions=0, @@ -450,7 +429,7 @@ def test_mismatch_circuit_repetitions(): ValueError, match="Number of circuit_repetitions must match the number of" + " input circuits.", ): - cirq.contrib.shuffle_circuits.run_shuffled_with_readout_benchmarking( + run_shuffled_with_readout_benchmarking( [circuit], cirq.ZerosSampler(), circuit_repetitions=[10, 20], @@ -465,7 +444,7 @@ def test_zero_num_random_bitstrings(): q = cirq.LineQubit(0) circuit = cirq.Circuit(cirq.H(q), cirq.measure(q)) with pytest.raises(ValueError, match="Must provide zero or more num_random_bitstrings."): - cirq.contrib.shuffle_circuits.run_shuffled_with_readout_benchmarking( + run_shuffled_with_readout_benchmarking( [circuit], cirq.ZerosSampler(), circuit_repetitions=10, @@ -482,7 +461,7 @@ def test_zero_readout_repetitions(): with pytest.raises( ValueError, match="Must provide non-zero readout_repetitions for readout" + " calibration." ): - cirq.contrib.shuffle_circuits.run_shuffled_with_readout_benchmarking( + run_shuffled_with_readout_benchmarking( [circuit], cirq.ZerosSampler(), circuit_repetitions=10, @@ -497,7 +476,7 @@ def test_rng_type_mismatch(): q = cirq.LineQubit(0) circuit = cirq.Circuit(cirq.H(q), cirq.measure(q)) with pytest.raises(ValueError, match="Must provide a numpy random generator or a seed"): - cirq.contrib.shuffle_circuits.run_shuffled_with_readout_benchmarking( + run_shuffled_with_readout_benchmarking( [circuit], cirq.ZerosSampler(), circuit_repetitions=10, @@ -505,3 +484,19 @@ def test_rng_type_mismatch(): num_random_bitstrings=5, readout_repetitions=100, ) + + +def test_empty_sweep_params(): + """Test that the sweep params are empty.""" + q = cirq.LineQubit(5) + circuit = cirq.Circuit(cirq.H(q)) + with pytest.raises(ValueError, match="Sweep parameters must not be empty."): + run_sweep_with_readout_benchmarking( + [circuit], + [], + cirq.ZerosSampler(), + circuit_repetitions=10, + rng_or_seed=np.random.default_rng(456), + num_random_bitstrings=5, + readout_repetitions=100, + ) From f4a8f6bab65fec83f9b58e4db5cf954b3e537cf1 Mon Sep 17 00:00:00 2001 From: ddddddanni Date: Wed, 21 May 2025 22:13:50 -0700 Subject: [PATCH 4/5] Remove unused method --- ...ing_measurement_with_readout_mitigation.py | 20 +++++++++--------- ...easurement_with_readout_mitigation_test.py | 21 ------------------- ...ffle_circuits_with_readout_benchmarking.py | 3 +-- ...circuits_with_readout_benchmarking_test.py | 2 +- 4 files changed, 12 insertions(+), 34 deletions(-) diff --git a/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation.py b/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation.py index 4756916f678..f434df4e108 100644 --- a/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation.py +++ b/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation.py @@ -18,7 +18,7 @@ import itertools import time -from typing import cast, Sequence, TYPE_CHECKING +from typing import cast, Sequence, Union, TYPE_CHECKING import attrs import numpy as np @@ -204,10 +204,10 @@ def _validate_input( def _normalize_input_paulis( circuits_to_pauli: Union[ - Dict[circuits.FrozenCircuit, list[ops.PauliString]], - Dict[circuits.FrozenCircuit, list[list[ops.PauliString]]], - ] -) -> Dict[circuits.FrozenCircuit, list[list[ops.PauliString]]]: + dict[circuits.FrozenCircuit, list[ops.PauliString]], + dict[circuits.FrozenCircuit, list[list[ops.PauliString]]], + ], +) -> dict[circuits.FrozenCircuit, list[list[ops.PauliString]]]: first_value = next(iter(circuits_to_pauli.values())) if ( first_value @@ -240,7 +240,7 @@ def _pauli_strings_to_basis_change_ops( def _pauli_strings_to_basis_change_with_sweep( pauli_strings: list[ops.PauliString], qid_list: list[ops.Qid] -) -> list[Dict[str, float]]: +) -> list[dict[str, float]]: """Decide single-qubit rotation sweep parameters for basis change.""" params_dict = {} @@ -261,7 +261,7 @@ def _pauli_strings_to_basis_change_with_sweep( def _generate_basis_change_circuits( - normalized_circuits_to_pauli: Dict[circuits.FrozenCircuit, list[list[ops.PauliString]]] + normalized_circuits_to_pauli: dict[circuits.FrozenCircuit, list[list[ops.PauliString]]], ) -> list[circuits.Circuit]: """Generates basis change circuits for each group of Pauli strings.""" pauli_measurement_circuits = list[circuits.Circuit]() @@ -283,8 +283,8 @@ def _generate_basis_change_circuits( def _generate_basis_change_circuits_with_sweep( - normalized_circuits_to_pauli: Dict[circuits.FrozenCircuit, list[list[ops.PauliString]]] -) -> Tuple[list[circuits.Circuit], list[study.Sweepable]]: + normalized_circuits_to_pauli: dict[circuits.FrozenCircuit, list[list[ops.PauliString]]], +) -> tuple[list[circuits.Circuit], list[study.Sweepable]]: """Generates basis change circuits for each group of Pauli strings with sweep.""" parameterized_circuits = list[circuits.Circuit]() sweep_params = list[study.Sweepable]() @@ -454,7 +454,7 @@ def measure_pauli_strings( num_random_bitstrings: int, rng_or_seed: Union[np.random.Generator, int], use_sweep: bool = False, -) -> List[CircuitToPauliStringsMeasurementResult]: +) -> list[CircuitToPauliStringsMeasurementResult]: """Measures expectation values of Pauli strings on given circuits with/without readout error mitigation. diff --git a/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation_test.py b/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation_test.py index 4bd23e8b069..967a9afb4b5 100644 --- a/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation_test.py +++ b/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation_test.py @@ -20,7 +20,6 @@ import numpy as np import pytest -import sympy import cirq from cirq.contrib.paulistring import measure_pauli_strings @@ -36,26 +35,6 @@ def _create_ghz(number_of_qubits: int, qubits: Sequence[cirq.Qid]) -> cirq.Circu return ghz_circuit -def _create_sweep_circuit( - number_of_qubits: int, qubits: Sequence[cirq.Qid], number_of_sweeps: int -) -> tuple[cirq.Circuit, cirq.Sweepable]: - sweep_params = [] - - theta_symbols = sympy.symbols(f'theta:{number_of_qubits}') - single_qubit_gates = [cirq.X, cirq.Y, cirq.Z, cirq.H, cirq.I] - - circuit = _create_ghz(number_of_qubits, qubits) - for _ in range(number_of_sweeps): - for i in range(number_of_qubits): - gate = random.choice(single_qubit_gates) - circuit += gate(qubits[i]) ** theta_symbols[i] - sweep_params += [ - {theta_symbols[i]: random.uniform(0, 2 * np.pi) for i in range(number_of_qubits)} - ] - - return circuit, sweep_params - - def _generate_random_pauli_string( qubits: Sequence[cirq.Qid], enable_coeff: bool = False, allow_pauli_i: bool = True ): diff --git a/cirq-core/cirq/contrib/shuffle_circuits/shuffle_circuits_with_readout_benchmarking.py b/cirq-core/cirq/contrib/shuffle_circuits/shuffle_circuits_with_readout_benchmarking.py index 86933a6f04f..91ba6f278fe 100644 --- a/cirq-core/cirq/contrib/shuffle_circuits/shuffle_circuits_with_readout_benchmarking.py +++ b/cirq-core/cirq/contrib/shuffle_circuits/shuffle_circuits_with_readout_benchmarking.py @@ -17,7 +17,7 @@ from __future__ import annotations import time -from typing import Dict, List, Optional, Sequence, Tuple, Union +from typing import Dict, List, Optional, Sequence, Tuple, TYPE_CHECKING, Union import numpy as np import sympy @@ -255,7 +255,6 @@ def run_shuffled_with_readout_benchmarking( readout_repetitions: int = 1000, qubits: Optional[Union[Sequence[ops.Qid], Sequence[Sequence[ops.Qid]]]] = None, ) -> tuple[list[ResultDict], Dict[Tuple[ops.Qid, ...], SingleQubitReadoutCalibrationResult]]: - """Run the circuits in a shuffled order with readout error benchmarking. Args: diff --git a/cirq-core/cirq/contrib/shuffle_circuits/shuffle_circuits_with_readout_benchmarking_test.py b/cirq-core/cirq/contrib/shuffle_circuits/shuffle_circuits_with_readout_benchmarking_test.py index 2c9297c0552..95184220547 100644 --- a/cirq-core/cirq/contrib/shuffle_circuits/shuffle_circuits_with_readout_benchmarking_test.py +++ b/cirq-core/cirq/contrib/shuffle_circuits/shuffle_circuits_with_readout_benchmarking_test.py @@ -15,7 +15,7 @@ from __future__ import annotations import itertools -from typing import Sequence, Union +from typing import Sequence import numpy as np import pytest From 1c46aae255c2c39ebd8ab887588697097dc415c0 Mon Sep 17 00:00:00 2001 From: ddddddanni Date: Thu, 22 May 2025 17:24:37 -0700 Subject: [PATCH 5/5] Update pauli_repetitions, readout_repetitions, and num_random_bitstrings to be all 1000 to avoid test flakiness --- ...uli_string_measurement_with_readout_mitigation.py | 10 +++++----- ...tring_measurement_with_readout_mitigation_test.py | 12 ++++++------ .../shuffle_circuits_with_readout_benchmarking.py | 8 ++++---- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation.py b/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation.py index f434df4e108..abb34e8b221 100644 --- a/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation.py +++ b/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation.py @@ -18,7 +18,7 @@ import itertools import time -from typing import cast, Sequence, Union, TYPE_CHECKING +from typing import cast, Sequence, TYPE_CHECKING, Union import attrs import numpy as np @@ -240,7 +240,7 @@ def _pauli_strings_to_basis_change_ops( def _pauli_strings_to_basis_change_with_sweep( pauli_strings: list[ops.PauliString], qid_list: list[ops.Qid] -) -> list[dict[str, float]]: +) -> dict[str, float]: """Decide single-qubit rotation sweep parameters for basis change.""" params_dict = {} @@ -357,7 +357,7 @@ def _build_many_one_qubits_empty_confusion_matrix(qubits_length: int) -> list[np def _process_pauli_measurement_results( qubits: list[ops.Qid], pauli_string_groups: list[list[ops.PauliString]], - circuit_results: list[ResultDict], + circuit_results: Sequence[ResultDict], calibration_results: dict[tuple[ops.Qid, ...], SingleQubitReadoutCalibrationResult], pauli_repetitions: int, timestamp: float, @@ -516,7 +516,7 @@ def measure_pauli_strings( # Build the basis-change circuits for each Pauli string group pauli_measurement_circuits = list[circuits.Circuit]() sweep_params = list[study.Sweepable]() - circuits_results: Union[list[ResultDict], list[list[ResultDict]]] = [] + circuits_results: Union[Sequence[ResultDict], Sequence[Sequence[study.Result]]] = [] # calibration_results = list[Dict[Tuple[ops.Qid, ...], SingleQubitReadoutCalibrationResult]]() if use_sweep: @@ -559,7 +559,7 @@ def measure_pauli_strings( disable_readout_mitigation = False if num_random_bitstrings != 0 else True - circuits_results_for_group = [] + circuits_results_for_group: Union[ResultDict, Sequence[study.Result]] = [] if use_sweep: circuits_results_for_group = circuits_results[i] else: diff --git a/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation_test.py b/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation_test.py index 967a9afb4b5..8b8b1337be9 100644 --- a/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation_test.py +++ b/cirq-core/cirq/contrib/paulistring/pauli_string_measurement_with_readout_mitigation_test.py @@ -214,14 +214,14 @@ def test_group_pauli_string_measurement_errors_no_noise_with_coefficient(use_swe circuits_to_pauli: dict[cirq.FrozenCircuit, list[list[cirq.PauliString]]] = {} circuits_to_pauli[circuit] = [ _generate_qwc_paulis( - _generate_random_pauli_string(qubits, enable_coeff=True, allow_pauli_i=False), 100, True + _generate_random_pauli_string(qubits, enable_coeff=True, allow_pauli_i=False), 10, True ) for _ in range(3) ] circuits_to_pauli[circuit].append([cirq.PauliString({q: cirq.X for q in qubits})]) circuits_with_pauli_expectations = measure_pauli_strings( - circuits_to_pauli, sampler, 500, 500, 500, 500, use_sweep + circuits_to_pauli, sampler, 1000, 1000, 1000, 500, use_sweep ) for circuit_with_pauli_expectations in circuits_with_pauli_expectations: @@ -322,7 +322,7 @@ def test_group_pauli_string_measurement_errors_with_noise(use_sweep: bool) -> No ] circuits_with_pauli_expectations = measure_pauli_strings( - circuits_to_pauli, sampler, 800, 1000, 800, np.random.default_rng(), use_sweep + circuits_to_pauli, sampler, 1000, 1000, 1000, np.random.default_rng(), use_sweep ) for circuit_with_pauli_expectations in circuits_with_pauli_expectations: @@ -463,7 +463,7 @@ def test_allow_group_pauli_measurement_without_readout_mitigation(use_sweep: boo ] circuits_with_pauli_expectations = measure_pauli_strings( - circuits_to_pauli, sampler, 100, 100, 0, np.random.default_rng(), use_sweep + circuits_to_pauli, sampler, 1000, 1000, 0, np.random.default_rng(), use_sweep ) for circuit_with_pauli_expectations in circuits_with_pauli_expectations: @@ -836,12 +836,12 @@ def test_pauli_type_mismatch(use_sweep: bool) -> None: " ops.PauliStrings. Got instead.", ): measure_pauli_strings( - circuits_to_pauli, + circuits_to_pauli, # type: ignore[arg-type] cirq.Simulator(), 1000, 1000, 1000, - "test", # type: ignore[arg-type] + 1, use_sweep, ) diff --git a/cirq-core/cirq/contrib/shuffle_circuits/shuffle_circuits_with_readout_benchmarking.py b/cirq-core/cirq/contrib/shuffle_circuits/shuffle_circuits_with_readout_benchmarking.py index 91ba6f278fe..0ad0027be8b 100644 --- a/cirq-core/cirq/contrib/shuffle_circuits/shuffle_circuits_with_readout_benchmarking.py +++ b/cirq-core/cirq/contrib/shuffle_circuits/shuffle_circuits_with_readout_benchmarking.py @@ -254,7 +254,7 @@ def run_shuffled_with_readout_benchmarking( num_random_bitstrings: int = 100, readout_repetitions: int = 1000, qubits: Optional[Union[Sequence[ops.Qid], Sequence[Sequence[ops.Qid]]]] = None, -) -> tuple[list[ResultDict], Dict[Tuple[ops.Qid, ...], SingleQubitReadoutCalibrationResult]]: +) -> tuple[Sequence[ResultDict], Dict[Tuple[ops.Qid, ...], SingleQubitReadoutCalibrationResult]]: """Run the circuits in a shuffled order with readout error benchmarking. Args: @@ -342,7 +342,7 @@ def run_sweep_with_readout_benchmarking( readout_repetitions: int = 1000, qubits: Optional[Union[Sequence[ops.Qid], Sequence[Sequence[ops.Qid]]]] = None, ) -> tuple[ - list[list[study.Result]], Dict[Tuple[ops.Qid, ...], SingleQubitReadoutCalibrationResult] + Sequence[Sequence[study.Result]], Dict[Tuple[ops.Qid, ...], SingleQubitReadoutCalibrationResult] ]: """Run the sweep circuits with readout error benchmarking (no shuffling). Args: @@ -405,7 +405,7 @@ def run_sweep_with_readout_benchmarking( timestamp = time.time() - input_circuits_measiurements = results[: len(input_circuits)] + input_circuits_measurement = results[: len(input_circuits)] readout_measurements = results[len(input_circuits) :] # Analyze results @@ -420,4 +420,4 @@ def run_sweep_with_readout_benchmarking( ) readout_calibration_results[tuple(qubit_group)] = calibration_result - return input_circuits_measiurements, readout_calibration_results + return input_circuits_measurement, readout_calibration_results