Source code for landscape_tools.barren_plateaus

"""Module meant to study barren plateaus"""

import os
from collections import defaultdict
from itertools import repeat
from typing import (
    Any,
    Callable,
    Literal,
    Optional,
    Protocol,
    Sequence,
    TypedDict,
    TypeVar,
)

import matplotlib.pyplot as plt
import numpy as np
from matplotlib.lines import Line2D
from numpy.typing import ArrayLike
from qiskit import QuantumCircuit
from qiskit.quantum_info import Pauli
from tqdm.auto import tqdm

T = TypeVar("T")

from dataclasses import dataclass

PaddingType = Literal["identity", "I", "linear_half", "log"]


[docs] @dataclass class SamplingConfig: bootstrap_B: int = 500 """Number of bootstrap resamples used to estimate the uncertainty of the variance estimate.""" rel_err_target: float = 0.05 """Target relative error for the variance estimate.""" N_min: int = 900 """Minimum number of samples collected before checking convergence.""" N_batch: int = 400 """Number of new samples added at each adaptive sampling step.""" N_max: int = 20000 """Maximum number of samples allowed.""" safety_factor: float = 1.2 """Multiplicative safety factor applied to the estimated required sample count.""" max_batch: int = 10000 """Maximum batch size used during adaptive sampling."""
[docs] @dataclass class ExecutionConfig: n_jobs: int = -1 """Number of parallel jobs used for sampling. A value of -1 usually means that all available cores are used.""" verbose: bool = True """Whether to print progress information during the analysis."""
[docs] @dataclass class ExperimentConfig: analysis_type: str """Type of scaling analysis to run. This selects how qubit counts, layer counts, observables, and padding strategies are combined.""" N_qubits: Sequence[int] """List of system sizes to test.""" N_layers: Sequence[int] """List of circuit depths to test.""" ansatz_name: str """Name of the ansatz used to generate circuits.""" observables_list: Sequence[Pauli | str] | None = None """Observables used in the cost function. Entries may be Qiskit Pauli objects or Pauli strings.""" initial_Pauli_string: Pauli | str | None = None """Initial Pauli string used when observables are generated by padding.""" padding_types: Sequence[PaddingType] | None = None """Padding strategies used to grow the initial Pauli string with the number of qubits."""
[docs] def extend_with_last( lst: Sequence[T], target_len: int, ) -> list[T]: """Extend a sequence to a target length by repeating its last element. Args: lst: Input sequence to extend. Must not be empty. target_len: Desired minimum length of the output list. If target_len is smaller than len(lst), the original list is returned unchanged. Returns: A list extended to ``target_len`` by repeating the last element. """ if len(lst) == 0: raise ValueError("List must not be empty") return list(lst) + list(repeat(lst[-1], target_len - len(lst)))
[docs] def pad_pauli_strings_growth( pauli: Pauli | str, target_n: int, growth: PaddingType = "linear_half", log_base: float = 2, ) -> Pauli: """Pad a Pauli string on the left to reach a target number of qubits. Args: pauli: Input Pauli string. (for instance ``Pauli("IIYY")``) target_n: Desired total number of qubits. growth: Padding strategy. Available options are: - "identity" or "I": Pad with identity operators only. - "linear_half": Pad with X, Y or Z operators such that the total number of X/Y/Z operators is approximately ``target_n / 2``. - "log": Pad with X, Y or Z operators such that the total number of X/Y/Z operators is approximately ``log(target_n)``. log_base: Base of the logarithm used when ``growth="log"``. Returns: Padded Pauli string of size ``target_n``. """ def round_value(x: float): return int(np.ceil(x)) def target_number_of_active_paulis(n: int): if growth == "linear_half": return round_value(n / 2) elif growth == "log": return 1 if n <= 1 else round_value(np.log(n) / np.log(log_base)) else: raise ValueError("growth must be 'identity', 'I', 'linear_half', or 'log'") label = pauli.to_label() if isinstance(pauli, Pauli) else pauli n_current = len(label) if n_current > target_n: raise ValueError(f"Pauli {label} has length {n_current} > target_n={target_n}") n_padding = target_n - n_current if growth in ["identity", "I"]: padding = "I" * n_padding padded_label = padding + label return Pauli(padded_label) if "Z" in label: pad_char = "Z" elif "Y" in label: pad_char = "Y" elif "X" in label: pad_char = "X" else: pad_char = "Y" current_count = label.count(pad_char) target_count = target_number_of_active_paulis(target_n) n_active_to_add = max(0, target_count - current_count) n_active_to_add = min(n_active_to_add, n_padding) n_identity_to_add = n_padding - n_active_to_add padding = "I" * n_identity_to_add + pad_char * n_active_to_add padded_label = padding + label return Pauli(padded_label)
[docs] def bootstrap_var_diagnostic_1d( var: float, L_samples: np.ndarray, B: int = 500, rng: np.random.Generator | None = None, rel_err_warn: float = 0.05, rel_err_fail: float = 0.10, ) -> dict[str, Any]: """Perform a bootstrap diagnostic for a single variance estimate. Args: var: Reference estimate of the variance of the loss. L_samples: Array of loss values evaluated for different parameter samples. Can have shape ``(N_samples,)`` or ``(N_samples, 1)``. B: Number of bootstrap resamples. Default is 500. rng: Random number generator for reproducibility. If ``None``, a default generator is used. rel_err_warn: Relative error threshold for issuing a warning. rel_err_fail: Relative error threshold for declaring failure. Returns: Dictionary containing diagnostic quantities, including: - bootstrap estimates of the standard deviation - relative error metrics - diagnostic flags based on thresholds """ if rng is None: rng = np.random.default_rng() assert rng is not None L_samples = np.asarray(L_samples, dtype=float) if L_samples.ndim == 2: if L_samples.shape[1] != 1: raise ValueError( f"L_samples has shape {L_samples.shape}, but this diagnostic " "expects a single Pauli observable, i.e. shape (N_samples,) " "or (N_samples, 1)." ) L_samples = L_samples[:, 0] elif L_samples.ndim != 1: raise ValueError( f"L_samples must be 1D or shape (N_samples, 1), got shape {L_samples.shape}" ) N_samples = len(L_samples) score = float(var) boot_scores = np.empty(B) for b in range(B): idx = rng.integers(0, N_samples, size=N_samples) Lb = L_samples[idx] boot_scores[b] = np.var(Lb, ddof=1) score_se = np.std(boot_scores, ddof=1) score_rel_err = score_se / max(abs(score), 1e-30) if score_rel_err > rel_err_fail: advice = "/!\\ Increase N_samples." elif score_rel_err > rel_err_warn: advice = "/!\\ OK for trends; increase N_samples for reliability." elif score_rel_err < 0.01: advice = "Very stable estimate; N_samples may be reducible if runtime matters." else: advice = "N_samples in right zone." return { "var": score, "var_se": score_se, "rel_err": score_rel_err, "boot_scores": boot_scores, "advice": advice, "passed": score_rel_err <= rel_err_warn, "N_samples": N_samples, }
[docs] def adaptive_sampling_var( sample_function: Callable[[], Any], N_min: int = 50, N_batch: int = 50, N_max: int = 2000, B: int = 500, rel_err_target: float = 0.05, observable_index: int = 0, rng: np.random.Generator | None = None, verbose: bool = True, safety_factor: float = 1.2, max_batch: int | None = None, abs_err_target: float | None = None, n_jobs: int = -1, ) -> tuple[np.ndarray, np.ndarray, dict[str, Any] | None]: r"""Perform adaptive sampling to estimate the loss variance with bootstrap-based error control. This method dynamically adjusts the number of samples using the expected `1 / \sqrt(N)` scaling of the bootstrap relative error. Args: sample_function: Callable that returns a single sample N_min: Initial number of samples. Default is 50. N_batch: Minimum number of additional samples added per iteration. In this adaptive scheme, the effective batch size may vary. N_max: Maximum allowed number of samples. Default is 2000. B: Number of bootstrap resamples. Default is 500. rel_err_target: Target relative bootstrap error. Default is 0.05. observable_index: Index of the observable used for the stopping criterion. rng: Random number generator for reproducibility. If ``None``, a default generator is used. verbose: Whether to print progress information. Default is True. safety_factor: Multiplicative safety factor applied to the estimated required sample size. Typical values range from 1.1 to 1.5. max_batch: Maximum number of samples added in a single iteration. If ``None``, no explicit cap is applied apart from ``N_max``. abs_err_target: Optional absolute bootstrap error target. If provided, the algorithm stops when either: - ``relative_error <= rel_err_target``, or - ``absolute_error <= abs_err_target``. This is particularly useful when the standard deviation is very small and relative error becomes overly strict. n_jobs: Number of parallel jobs used when drawing new samples. Passed to joblib.Parallel. A value of -1 usually uses all available cores. Returns: A the three following elements: an array containing all collected samples; the variance computed over all samples; and a dictionary containing bootstrap diagnostic information for the selected observable. """ if rng is None: rng = np.random.default_rng() L_samples = [] diagnostic = None var: Optional[np.ndarray] = None while True: # -------------------- Number of samples to add -------------------- n_current = len(L_samples) if n_current == 0: n_to_add = N_min else: assert diagnostic is not None rel_err = diagnostic["rel_err"] # Estimate required total N using rel_err ~ 1 / sqrt(N) if rel_err > 0 and np.isfinite(rel_err): N_required = int( np.ceil(n_current * (rel_err / rel_err_target) ** 2 * safety_factor) ) else: N_required = n_current + N_batch # At least add N_batch samples n_to_add = max(N_required - n_current, N_batch) # Optional cap on one adaptive jump if max_batch is not None: n_to_add = min(n_to_add, max_batch) # Do not exceed N_max if n_current + n_to_add > N_max: n_to_add = N_max - n_current if n_to_add <= 0: break if verbose and n_current > 0: print( f"[Adaptive sampling] " f"Adding {n_to_add} samples " f"(N: {n_current} -> {n_current + n_to_add})" ) # -------------------- Add new samples -------------------- from joblib import Parallel, delayed results = Parallel(n_jobs=n_jobs)( delayed(sample_function)() for _ in tqdm(range(n_to_add), desc="Adding samples", leave=False) ) L_samples.extend(results) L_samples_array = np.asarray(L_samples) # -------------------- Compute current var -------------------- var = np.var(L_samples_array, axis=0, ddof=1) assert var is not None # -------------------- Pick observable for diagnostic -------------------- var_obs: float if L_samples_array.ndim == 1: L_samples_obs = L_samples_array var_obs = float(var) else: L_samples_obs = L_samples_array[:, observable_index] var_obs = var[observable_index] # -------------------- Bootstrap diagnostic -------------------- diagnostic = bootstrap_var_diagnostic_1d( var_obs, L_samples_obs, B=B, rng=rng, rel_err_warn=rel_err_target, rel_err_fail=2 * rel_err_target, ) rel_err = diagnostic["rel_err"] # Absolute bootstrap error, if available or inferable abs_err = diagnostic.get("abs_err", None) if abs_err is None: abs_err = rel_err * abs(var_obs) if verbose: msg = ( f"[Adaptive sampling] " f"N = {len(L_samples)} | " f"relative error = {100 * rel_err:.2f}%" ) if abs_err_target is not None: msg += f" | absolute error = {abs_err:.3e}" print(msg) # -------------------- Stopping criteria -------------------- rel_criterion_reached = rel_err <= rel_err_target abs_criterion_reached = abs_err_target is not None and abs_err <= abs_err_target if rel_criterion_reached or abs_criterion_reached: if verbose: if rel_criterion_reached: reason = "relative error target reached" else: reason = "absolute error target reached" print( f"[Adaptive sampling] Target reached with " f"N_samples = {len(L_samples)} " f"({reason})" ) break if len(L_samples) >= N_max: if verbose: print( f"[Adaptive sampling] Maximum reached: " f"N_samples = {len(L_samples)} | " f"relative error = {100 * rel_err:.2f}%" ) break assert var is not None return np.asarray(L_samples), var, diagnostic
[docs] class Result(TypedDict): nq: int var: float | np.ndarray obs: Optional[Pauli]
[docs] class LayerResult(Result): lay: int
[docs] def plot_layerwise_qubits( results: dict[int, list[Result]], N_layers: Sequence[int], make_param_text: str, ) -> None: """Plot the variance of loss values as a function of the number of qubits for different circuit depths. Args: results: Mapping from circuit depth to computed statistics. Each entry must contain dictionaries with at least: - "nq": number of qubits - "var": variance value - "obs": observable identifier N_layers: Sequence of circuit depths to display. make_param_text: Parameter summary text displayed on the figure. """ param_text = make_param_text layer_colors = plt.cm.tab10( # pyright: ignore[reportAttributeAccessIssue] np.linspace(0, 1, max(len(N_layers), 1)) ) layer_to_color = { lay: layer_colors[i % len(layer_colors)] for i, lay in enumerate(N_layers) } plt.figure(figsize=(12, 7)) for lay in N_layers: if lay not in results: continue pts = sorted(results[lay], key=lambda d: d["nq"]) xs = [d["nq"] for d in pts] ys = [d["var"] for d in pts] plt.semilogy( xs, ys, marker="o", color=layer_to_color[lay], linewidth=2, markersize=5, label=None, ) plt.xlabel(r"Number of qubits $n_q$") plt.ylabel(r"$\mathrm{Var}_{\theta}(L)$") plt.title("Loss landscape concentration with increasing system size") plt.grid(True, alpha=0.4) handles_L = [ Line2D( [0], [0], color=layer_to_color[lay], lw=2, label=rf"$L={lay}$", ) for lay in N_layers ] plt.legend( handles=handles_L, title="Depth", loc="best", fontsize=11, handlelength=4, ) plt.text( 0.02, 0.02, param_text, transform=plt.gca().transAxes, ha="left", va="bottom", fontsize=11, bbox=dict( boxstyle="round", facecolor="white", edgecolor="gray", alpha=0.55, ), ) plt.tight_layout() plt.savefig("figures/layerwise_qubits.pdf") plt.show()
[docs] def plot_layerwise_qubits_padding( results: dict[tuple[int, Optional[PaddingType]], list[Result]], N_layers: Sequence[int], padding_types: Optional[Sequence[PaddingType]], padding_latex: dict[str, str], make_param_text: str, ) -> None: """Plot the variance of loss values as a function of the number of qubits for different circuit depths and padding strategies. Args: results: Mapping indexed by ``(depth, padding_type)`` tuples. Each entry must contain dictionaries with at least: - "nq": number of qubits - "var": variance value - "obs": observable identifier N_layers: Sequence of circuit depths to display. padding_types: Sequence of padding strategies to include in the plot. padding_latex: Mapping from padding strategy identifiers to LaTeX labels used in the legend. make_param_text: Parameter summary text displayed on the figure. """ param_text = make_param_text markers = ["o", "s", "^", "D", "v", "P", "X"] linestyles = ["-", "--", "-.", ":"] layer_colors = plt.cm.tab10( # pyright: ignore[reportAttributeAccessIssue] np.linspace(0, 1, max(len(N_layers), 1)) ) layer_to_color = { lay: layer_colors[i % len(layer_colors)] for i, lay in enumerate(N_layers) } padding_to_marker = { pad: markers[i % len(markers)] for i, pad in enumerate(padding_types or [None]) } padding_to_linestyle = { pad: linestyles[i % len(linestyles)] for i, pad in enumerate(padding_types or [None]) } plt.figure(figsize=(12, 7)) for lay in N_layers: for pad in padding_types or [None]: key = (lay, pad) if key not in results: continue pts = sorted(results[key], key=lambda d: d["nq"]) xs = [d["nq"] for d in pts] ys = [d["var"] for d in pts] plt.semilogy( xs, ys, linestyle=padding_to_linestyle[pad], marker=padding_to_marker[pad], color=layer_to_color[lay], linewidth=2, markersize=5, label=None, ) plt.xlabel(r"Number of qubits $n_q$") plt.ylabel(r"$\mathrm{Var}_{\theta}(L)$") plt.title( rf"Loss landscape concentration with increasing system size for different padding types" ) plt.grid(True, alpha=0.4) handles_L = [ Line2D([0], [0], color=layer_to_color[lay], lw=2, label=rf"$L={lay}$") for lay in N_layers ] handles_padding = ( [] if padding_types is None else [ Line2D( [0], [0], color="black", linestyle=padding_to_linestyle[pad], marker=padding_to_marker[pad], lw=2, label=padding_latex.get(pad, str(pad)), ) for pad in padding_types ] ) plt.legend( handles=handles_L + handles_padding, title="Depth & Padding type", loc="best", fontsize=11, handlelength=4, ) plt.text( 0.02, 0.02, param_text, transform=plt.gca().transAxes, ha="left", va="bottom", fontsize=11, bbox=dict(boxstyle="round", facecolor="white", edgecolor="gray", alpha=0.55), ) plt.tight_layout() os.makedirs("figures", exist_ok=True) plt.savefig("figures/layerwise_qubits_padding.pdf") plt.show()
[docs] def plot_joint_scaling_padding( tracked: ArrayLike, A_ext: Sequence[int], B_ext: Sequence[int], N_qubits: Sequence[int], N_layers: Sequence[int], padding_type: str | None, padding_latex: dict[str, str], Ansatz: str, rel_err_target: float, ) -> None: """ Plot the variance of loss values as a joint function of system size and circuit depth for a given padding strategy. Args: tracked: Sequence of tracked variance values. A_ext: Extended sequence associated with the primary scan axis. B_ext: Extended sequence associated with the secondary grouping axis. N_qubits: Sequence of qubit counts considered in the scan. N_layers: Sequence of circuit depths considered in the scan. padding_type: Identifier of the padding strategy used. padding_latex: Mapping from padding strategy identifiers to LaTeX labels. Ansatz: Name of the variational ansatz displayed in the figure. rel_err_target: Target relative error displayed in the figure annotations. """ if len(N_qubits) >= len(N_layers): x_axis = A_ext secondary = B_ext x_label = r"Number of qubits $n_q$" sec_label = "L" else: x_axis = B_ext secondary = A_ext x_label = r"Number of layers $L$" sec_label = "n" tracked = np.array(tracked) padding_label = padding_latex.get( padding_type, rf"Padding: {padding_type}", ) if not padding_label.startswith("Padding"): padding_label = rf"padding: {padding_label}" param_text = ( f"{Ansatz}\n" f"{padding_label}\n" f"$\\epsilon_{{rel}} = {rel_err_target:.0%}$".replace("%", "\\%") ) groups_j = defaultdict(list) for x, s, y in zip(x_axis, secondary, tracked): groups_j[s].append((x, y)) plt.figure(figsize=(12, 7)) all_pts = sorted(zip(x_axis, tracked)) xs_all, ys_all = zip(*all_pts) plt.plot( xs_all, ys_all, "-", color="black", linewidth=1.5, alpha=0.6, ) for s_val, pts in sorted(groups_j.items()): pts = sorted(pts) xs, ys = zip(*pts) plt.plot( xs, ys, "-o", linewidth=2, markersize=5, label=rf"${sec_label} = {s_val}$" ) plt.xlabel(x_label) plt.ylabel(r"$\mathrm{Var}_{\theta}(L)$") plt.title("Loss landscape concentration with increasing system size") plt.grid(True, alpha=0.4) plt.legend() plt.text( 0.02, 0.02, param_text, transform=plt.gca().transAxes, ha="left", va="bottom", fontsize=11, bbox=dict(boxstyle="round", facecolor="white", edgecolor="gray", alpha=0.85), ) plt.yscale("log") plt.tight_layout() plt.savefig("figures/joint_scaling_padding.pdf") plt.show()
[docs] class AnalysisResult(TypedDict): tracked: np.ndarray metadata: list[Result] N_qubits_extended: list[int] N_layers_extended: list[int] padding_type: PaddingType | None
[docs] class CostFunctionContext(TypedDict): n_qubits: int depth: int Ansatz: str circuits: QuantumCircuit observables: Optional[Pauli]
[docs] class CostFunctionBuilder(Protocol): def __call__( self, context: CostFunctionContext, *args: Any, **kwargs: Any ) -> Callable[[list[float]], float]: ...
[docs] class VarianceNormalisation(Protocol): def __call__( self, n_qubits: int, depth: int, Ansatz: str, observable: Pauli | None, context: CostFunctionContext, raw_var: np.ndarray, L_samples: np.ndarray, ) -> float: ...
[docs] def barren_plateaus_analysis( experiment: ExperimentConfig, cost_function_builder: CostFunctionBuilder, generate_params: Callable[[int, int], list[float]], generate_circuits: Callable[[int, int, str], QuantumCircuit], sampling: SamplingConfig | None = None, execution: ExecutionConfig | None = None, variance_normalization: VarianceNormalisation | None = None, **cost_kwargs: Any, ) -> AnalysisResult | dict[int, list[Result]]: """Run a barren plateau scaling analysis for a given experiment configuration. The function builds the requested circuits and cost functions, samples loss values, estimates their variance, and stores the results for the selected analysis type. Optional sampling and execution configurations control the adaptive sampling procedure and parallel execution. Extra keyword arguments are passed to the cost function builder. Args: experiment: Main experiment configuration. cost_function_builder: Function used to build the cost function for each experiment setting. generate_params: Function used to generate random circuit parameters. generate_circuits: Function used to generate circuits for the selected ansatz and settings. sampling: Sampling configuration. If None, default sampling settings are used. execution: Execution configuration. If None, default execution settings are used. variance_normalization: Optional function used to normalize the estimated variance before storing or plotting it. **cost_kwargs: Additional keyword arguments passed to the cost function builder. Returns: Results produced by the selected analysis type. The returned objects contain variance estimates and metadata such as qubit count, layer count and observable. """ # -------------------- Validation -------------------- if sampling is None: sampling = SamplingConfig() if execution is None: execution = ExecutionConfig() valid_analysis_types = { "layerwise_qubits", "layerwise_qubits_padding", "joint_scaling_padding", } if experiment.analysis_type not in valid_analysis_types: raise ValueError( f"Unknown analysis_type='{experiment.analysis_type}'. " f"Expected one of {valid_analysis_types}." ) if experiment.analysis_type == "layerwise_qubits": if experiment.observables_list is not None and len(experiment.N_qubits) != len( experiment.observables_list ): raise ValueError( "For analysis_type='layerwise_qubits', " "N_qubits and observables_list must have the same length." ) if experiment.analysis_type == "layerwise_qubits_padding": if experiment.padding_types is None and experiment.initial_Pauli_string is None: pass elif ( experiment.padding_types is None or experiment.initial_Pauli_string is None ): raise ValueError( "analysis_type='layerwise_qubits_padding' requires " "padding_types and initial_Pauli_string to either " "both be None or both be defined." ) if experiment.analysis_type == "joint_scaling_padding": if experiment.padding_types is None and experiment.initial_Pauli_string is None: pass elif ( experiment.padding_types is None or experiment.initial_Pauli_string is None ): raise ValueError( "analysis_type='joint_scaling_padding' requires " "padding_types and initial_Pauli_string to either " "both be None or both be defined." ) # -------------------- Helpers -------------------- def run_single_point( nq: int, lay: int, obs_list_builder: Optional[Callable[[int], Pauli]] | None, extra_print: Optional[str] = None, ) -> LayerResult: print("--------------------") print(f"Number of qubits: {nq}") print(f"Number of layers: {lay}") if extra_print is not None: print(extra_print) n_qubits = nq depth = lay circuit = generate_circuits(n_qubits, depth, experiment.ansatz_name) observable = obs_list_builder(nq) if obs_list_builder is not None else None if observable is not None: print(f"Pauli string: {observable}") context: CostFunctionContext = { "n_qubits": n_qubits, "depth": depth, "Ansatz": experiment.ansatz_name, "circuits": circuit, "observables": observable, } cost_function_expv = cost_function_builder( context=context, **cost_kwargs, ) def sample_once(): theta = generate_params(n_qubits, depth) return cost_function_expv(theta) L_samples, raw_var, _ = adaptive_sampling_var( sample_function=sample_once, N_min=sampling.N_min, N_batch=sampling.N_batch, N_max=sampling.N_max, B=sampling.bootstrap_B, rel_err_target=sampling.rel_err_target, observable_index=0, safety_factor=sampling.safety_factor, max_batch=sampling.max_batch, verbose=execution.verbose, n_jobs=execution.n_jobs, ) # -------------------- Optional variance normalization -------------------- variance_scale = None if variance_normalization is not None: variance_scale = float( variance_normalization( n_qubits=n_qubits, depth=depth, Ansatz=experiment.ansatz_name, observable=observable, context=context, raw_var=raw_var, L_samples=L_samples, ) ) if variance_scale <= 0: raise ValueError( f"variance_normalization must return a positive scale, " f"got {variance_scale}." ) var = raw_var if variance_scale is None else raw_var / variance_scale print(f"var = {raw_var}") if variance_scale is not None: print(f"variance scale = {variance_scale}") print(f"normalized var = {var}") return { "nq": nq, "lay": lay, "var": var, "obs": observable, } use_state_vector = cost_kwargs.get("use_state_vector", None) shots = cost_kwargs.get("shots", None) lines = [ f"{experiment.ansatz_name}", ] if use_state_vector is not None: lines.append(f"state_vector = {use_state_vector}") if use_state_vector is False and shots is not None: lines.append(f"shots = {shots:.0e}") lines.append( f"$\\epsilon_{{rel}} = {sampling.rel_err_target:.0%}$".replace("%", "\\%") ) param_text = "\n".join(lines) padding_latex = { "identity": r"$I^{n_q}$", "I": r"$I^{n_q}$", "linear_half": r"$n_q/2$", "log": r"$\log(n_q)$", } # -------------------- Analysis 1: layerwise_qubits -------------------- if experiment.analysis_type == "layerwise_qubits": results = defaultdict(list) for lay in experiment.N_layers: if experiment.observables_list is None: iterable = [(nq, None) for nq in experiment.N_qubits] else: iterable = zip( experiment.N_qubits, experiment.observables_list, ) for nq, Pauli_string in iterable: if Pauli_string is None: obs_builder_1 = None else: def obs_builder_1(target_n: int): if isinstance(Pauli_string, Pauli): label = Pauli_string.to_label() pauli = Pauli_string else: label = Pauli_string pauli = Pauli(Pauli_string) if len(label) != target_n: raise ValueError( f"Pauli string size mismatch: got '{label}' " f"(len={len(label)}), expected {target_n}" ) return pauli point = run_single_point( nq=nq, lay=lay, obs_list_builder=obs_builder_1, ) results[lay].append(point) plot_layerwise_qubits( results=results, N_layers=experiment.N_layers, make_param_text=param_text, ) return results # -------------------- Analysis 2: layerwise_qubits_padding -------------------- if experiment.analysis_type == "layerwise_qubits_padding": results = defaultdict(list) if experiment.padding_types is None and experiment.initial_Pauli_string is None: padding_iter = [None] elif ( experiment.padding_types is None or experiment.initial_Pauli_string is None ): raise ValueError( "padding_types and initial_Pauli_string must either " "both be None or both be defined." ) else: padding_iter = experiment.padding_types for lay in experiment.N_layers: for pad in padding_iter: for nq in experiment.N_qubits: if pad is None: layerwise_obs_builder = None extra_print = None else: def layerwise_obs_builder( target_n: int, pad: PaddingType = pad, ): assert experiment.initial_Pauli_string is not None return pad_pauli_strings_growth( experiment.initial_Pauli_string, target_n=target_n, growth=pad, ) extra_print = f"Padding type: {pad}" point = run_single_point( nq=nq, lay=lay, obs_list_builder=layerwise_obs_builder, extra_print=extra_print, ) results[(lay, pad)].append(point) plot_layerwise_qubits_padding( results=results, N_layers=experiment.N_layers, padding_types=experiment.padding_types, padding_latex=padding_latex, make_param_text=param_text, ) return results # -------------------- Analysis 3: joint_scaling_padding -------------------- if experiment.analysis_type == "joint_scaling_padding": tracked = [] metadata = [] L = max(len(experiment.N_qubits), len(experiment.N_layers)) A_ext = extend_with_last(experiment.N_qubits, L) B_ext = extend_with_last(experiment.N_layers, L) if experiment.padding_types is None and experiment.initial_Pauli_string is None: obs_builder = None padding_type = None extra_print = None elif ( experiment.padding_types is None or experiment.initial_Pauli_string is None ): raise ValueError( "padding_types and initial_Pauli_string must either " "both be None or both be defined." ) else: padding_type = experiment.padding_types[0] def obs_builder(target_n: int): assert experiment.initial_Pauli_string is not None return pad_pauli_strings_growth( experiment.initial_Pauli_string, target_n=target_n, growth=padding_type, ) extra_print = f"Padding type: {padding_type}" for nq, lay in zip(A_ext, B_ext): point = run_single_point( nq=nq, lay=lay, obs_list_builder=obs_builder, extra_print=extra_print, ) tracked.append(point["var"]) metadata.append(point) joint_scaling_results: AnalysisResult = { "tracked": np.array(tracked), "metadata": metadata, "N_qubits_extended": A_ext, "N_layers_extended": B_ext, "padding_type": padding_type, } plot_joint_scaling_padding( tracked=np.array(tracked), A_ext=A_ext, B_ext=B_ext, N_qubits=experiment.N_qubits, N_layers=experiment.N_layers, padding_type=padding_type, padding_latex=padding_latex, Ansatz=experiment.ansatz_name, rel_err_target=sampling.rel_err_target, ) return joint_scaling_results return {}