"""Module meant to study barren plateaus"""
import os
from collections import defaultdict
from itertools import repeat
from typing import (
Any,
Callable,
Literal,
Optional,
Protocol,
Sequence,
TypedDict,
TypeVar,
)
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.lines import Line2D
from numpy.typing import ArrayLike
from qiskit import QuantumCircuit
from qiskit.quantum_info import Pauli
from tqdm.auto import tqdm
T = TypeVar("T")
from dataclasses import dataclass
PaddingType = Literal["identity", "I", "linear_half", "log"]
[docs]
@dataclass
class SamplingConfig:
bootstrap_B: int = 500
"""Number of bootstrap resamples used to estimate the uncertainty of the variance estimate."""
rel_err_target: float = 0.05
"""Target relative error for the variance estimate."""
N_min: int = 900
"""Minimum number of samples collected before checking convergence."""
N_batch: int = 400
"""Number of new samples added at each adaptive sampling step."""
N_max: int = 20000
"""Maximum number of samples allowed."""
safety_factor: float = 1.2
"""Multiplicative safety factor applied to the estimated required sample count."""
max_batch: int = 10000
"""Maximum batch size used during adaptive sampling."""
[docs]
@dataclass
class ExecutionConfig:
n_jobs: int = -1
"""Number of parallel jobs used for sampling. A value of -1 usually means that all available cores are used."""
verbose: bool = True
"""Whether to print progress information during the analysis."""
[docs]
@dataclass
class ExperimentConfig:
analysis_type: str
"""Type of scaling analysis to run. This selects how qubit
counts, layer counts, observables, and padding strategies are combined."""
N_qubits: Sequence[int]
"""List of system sizes to test."""
N_layers: Sequence[int]
"""List of circuit depths to test."""
ansatz_name: str
"""Name of the ansatz used to generate circuits."""
observables_list: Sequence[Pauli | str] | None = None
"""Observables used in the cost function. Entries may be Qiskit Pauli
objects or Pauli strings."""
initial_Pauli_string: Pauli | str | None = None
"""Initial Pauli string used when observables are
generated by padding."""
padding_types: Sequence[PaddingType] | None = None
"""Padding strategies used to grow the initial Pauli string
with the number of qubits."""
[docs]
def extend_with_last(
lst: Sequence[T],
target_len: int,
) -> list[T]:
"""Extend a sequence to a target length by repeating its last element.
Args:
lst: Input sequence to extend. Must not be empty.
target_len: Desired minimum length of the output list. If target_len is
smaller than len(lst), the original list is returned unchanged.
Returns:
A list extended to ``target_len`` by repeating the last element.
"""
if len(lst) == 0:
raise ValueError("List must not be empty")
return list(lst) + list(repeat(lst[-1], target_len - len(lst)))
[docs]
def pad_pauli_strings_growth(
pauli: Pauli | str,
target_n: int,
growth: PaddingType = "linear_half",
log_base: float = 2,
) -> Pauli:
"""Pad a Pauli string on the left to reach a target number of qubits.
Args:
pauli: Input Pauli string. (for instance ``Pauli("IIYY")``)
target_n: Desired total number of qubits.
growth: Padding strategy. Available options are:
- "identity" or "I": Pad with identity operators only.
- "linear_half": Pad with X, Y or Z operators such that the total
number of X/Y/Z operators is approximately ``target_n / 2``.
- "log": Pad with X, Y or Z operators such that the total number
of X/Y/Z operators is approximately ``log(target_n)``.
log_base: Base of the logarithm used when ``growth="log"``.
Returns:
Padded Pauli string of size ``target_n``.
"""
def round_value(x: float):
return int(np.ceil(x))
def target_number_of_active_paulis(n: int):
if growth == "linear_half":
return round_value(n / 2)
elif growth == "log":
return 1 if n <= 1 else round_value(np.log(n) / np.log(log_base))
else:
raise ValueError("growth must be 'identity', 'I', 'linear_half', or 'log'")
label = pauli.to_label() if isinstance(pauli, Pauli) else pauli
n_current = len(label)
if n_current > target_n:
raise ValueError(f"Pauli {label} has length {n_current} > target_n={target_n}")
n_padding = target_n - n_current
if growth in ["identity", "I"]:
padding = "I" * n_padding
padded_label = padding + label
return Pauli(padded_label)
if "Z" in label:
pad_char = "Z"
elif "Y" in label:
pad_char = "Y"
elif "X" in label:
pad_char = "X"
else:
pad_char = "Y"
current_count = label.count(pad_char)
target_count = target_number_of_active_paulis(target_n)
n_active_to_add = max(0, target_count - current_count)
n_active_to_add = min(n_active_to_add, n_padding)
n_identity_to_add = n_padding - n_active_to_add
padding = "I" * n_identity_to_add + pad_char * n_active_to_add
padded_label = padding + label
return Pauli(padded_label)
[docs]
def bootstrap_var_diagnostic_1d(
var: float,
L_samples: np.ndarray,
B: int = 500,
rng: np.random.Generator | None = None,
rel_err_warn: float = 0.05,
rel_err_fail: float = 0.10,
) -> dict[str, Any]:
"""Perform a bootstrap diagnostic for a single variance estimate.
Args:
var: Reference estimate of the variance of the loss.
L_samples: Array of loss values evaluated for different parameter
samples. Can have shape ``(N_samples,)`` or ``(N_samples, 1)``.
B: Number of bootstrap resamples. Default is 500.
rng: Random number generator for reproducibility. If ``None``,
a default generator is used.
rel_err_warn: Relative error threshold for issuing a warning.
rel_err_fail: Relative error threshold for declaring failure.
Returns:
Dictionary containing diagnostic quantities, including:
- bootstrap estimates of the standard deviation
- relative error metrics
- diagnostic flags based on thresholds
"""
if rng is None:
rng = np.random.default_rng()
assert rng is not None
L_samples = np.asarray(L_samples, dtype=float)
if L_samples.ndim == 2:
if L_samples.shape[1] != 1:
raise ValueError(
f"L_samples has shape {L_samples.shape}, but this diagnostic "
"expects a single Pauli observable, i.e. shape (N_samples,) "
"or (N_samples, 1)."
)
L_samples = L_samples[:, 0]
elif L_samples.ndim != 1:
raise ValueError(
f"L_samples must be 1D or shape (N_samples, 1), got shape {L_samples.shape}"
)
N_samples = len(L_samples)
score = float(var)
boot_scores = np.empty(B)
for b in range(B):
idx = rng.integers(0, N_samples, size=N_samples)
Lb = L_samples[idx]
boot_scores[b] = np.var(Lb, ddof=1)
score_se = np.std(boot_scores, ddof=1)
score_rel_err = score_se / max(abs(score), 1e-30)
if score_rel_err > rel_err_fail:
advice = "/!\\ Increase N_samples."
elif score_rel_err > rel_err_warn:
advice = "/!\\ OK for trends; increase N_samples for reliability."
elif score_rel_err < 0.01:
advice = "Very stable estimate; N_samples may be reducible if runtime matters."
else:
advice = "N_samples in right zone."
return {
"var": score,
"var_se": score_se,
"rel_err": score_rel_err,
"boot_scores": boot_scores,
"advice": advice,
"passed": score_rel_err <= rel_err_warn,
"N_samples": N_samples,
}
[docs]
def adaptive_sampling_var(
sample_function: Callable[[], Any],
N_min: int = 50,
N_batch: int = 50,
N_max: int = 2000,
B: int = 500,
rel_err_target: float = 0.05,
observable_index: int = 0,
rng: np.random.Generator | None = None,
verbose: bool = True,
safety_factor: float = 1.2,
max_batch: int | None = None,
abs_err_target: float | None = None,
n_jobs: int = -1,
) -> tuple[np.ndarray, np.ndarray, dict[str, Any] | None]:
r"""Perform adaptive sampling to estimate the loss variance with
bootstrap-based error control.
This method dynamically adjusts the number of samples using the expected
`1 / \sqrt(N)` scaling of the bootstrap relative error.
Args:
sample_function: Callable that returns a single sample
N_min: Initial number of samples. Default is 50.
N_batch: Minimum number of additional samples added per iteration.
In this adaptive scheme, the effective batch size may vary.
N_max: Maximum allowed number of samples. Default is 2000.
B: Number of bootstrap resamples. Default is 500.
rel_err_target: Target relative bootstrap error. Default is 0.05.
observable_index: Index of the observable used for the stopping criterion.
rng: Random number generator for reproducibility. If ``None``,
a default generator is used.
verbose: Whether to print progress information. Default is True.
safety_factor: Multiplicative safety factor applied to the estimated
required sample size. Typical values range from 1.1 to 1.5.
max_batch: Maximum number of samples added in a single iteration.
If ``None``, no explicit cap is applied apart from ``N_max``.
abs_err_target: Optional absolute bootstrap error target.
If provided, the algorithm stops when either:
- ``relative_error <= rel_err_target``, or
- ``absolute_error <= abs_err_target``.
This is particularly useful when the standard deviation is very
small and relative error becomes overly strict.
n_jobs: Number of parallel jobs used when drawing new samples.
Passed to joblib.Parallel. A value of -1 usually uses all available cores.
Returns:
A the three following elements: an array containing all collected
samples; the variance computed over all samples; and a dictionary
containing bootstrap diagnostic information for the selected observable.
"""
if rng is None:
rng = np.random.default_rng()
L_samples = []
diagnostic = None
var: Optional[np.ndarray] = None
while True:
# -------------------- Number of samples to add --------------------
n_current = len(L_samples)
if n_current == 0:
n_to_add = N_min
else:
assert diagnostic is not None
rel_err = diagnostic["rel_err"]
# Estimate required total N using rel_err ~ 1 / sqrt(N)
if rel_err > 0 and np.isfinite(rel_err):
N_required = int(
np.ceil(n_current * (rel_err / rel_err_target) ** 2 * safety_factor)
)
else:
N_required = n_current + N_batch
# At least add N_batch samples
n_to_add = max(N_required - n_current, N_batch)
# Optional cap on one adaptive jump
if max_batch is not None:
n_to_add = min(n_to_add, max_batch)
# Do not exceed N_max
if n_current + n_to_add > N_max:
n_to_add = N_max - n_current
if n_to_add <= 0:
break
if verbose and n_current > 0:
print(
f"[Adaptive sampling] "
f"Adding {n_to_add} samples "
f"(N: {n_current} -> {n_current + n_to_add})"
)
# -------------------- Add new samples --------------------
from joblib import Parallel, delayed
results = Parallel(n_jobs=n_jobs)(
delayed(sample_function)()
for _ in tqdm(range(n_to_add), desc="Adding samples", leave=False)
)
L_samples.extend(results)
L_samples_array = np.asarray(L_samples)
# -------------------- Compute current var --------------------
var = np.var(L_samples_array, axis=0, ddof=1)
assert var is not None
# -------------------- Pick observable for diagnostic --------------------
var_obs: float
if L_samples_array.ndim == 1:
L_samples_obs = L_samples_array
var_obs = float(var)
else:
L_samples_obs = L_samples_array[:, observable_index]
var_obs = var[observable_index]
# -------------------- Bootstrap diagnostic --------------------
diagnostic = bootstrap_var_diagnostic_1d(
var_obs,
L_samples_obs,
B=B,
rng=rng,
rel_err_warn=rel_err_target,
rel_err_fail=2 * rel_err_target,
)
rel_err = diagnostic["rel_err"]
# Absolute bootstrap error, if available or inferable
abs_err = diagnostic.get("abs_err", None)
if abs_err is None:
abs_err = rel_err * abs(var_obs)
if verbose:
msg = (
f"[Adaptive sampling] "
f"N = {len(L_samples)} | "
f"relative error = {100 * rel_err:.2f}%"
)
if abs_err_target is not None:
msg += f" | absolute error = {abs_err:.3e}"
print(msg)
# -------------------- Stopping criteria --------------------
rel_criterion_reached = rel_err <= rel_err_target
abs_criterion_reached = abs_err_target is not None and abs_err <= abs_err_target
if rel_criterion_reached or abs_criterion_reached:
if verbose:
if rel_criterion_reached:
reason = "relative error target reached"
else:
reason = "absolute error target reached"
print(
f"[Adaptive sampling] Target reached with "
f"N_samples = {len(L_samples)} "
f"({reason})"
)
break
if len(L_samples) >= N_max:
if verbose:
print(
f"[Adaptive sampling] Maximum reached: "
f"N_samples = {len(L_samples)} | "
f"relative error = {100 * rel_err:.2f}%"
)
break
assert var is not None
return np.asarray(L_samples), var, diagnostic
[docs]
class Result(TypedDict):
nq: int
var: float | np.ndarray
obs: Optional[Pauli]
[docs]
class LayerResult(Result):
lay: int
[docs]
def plot_layerwise_qubits(
results: dict[int, list[Result]],
N_layers: Sequence[int],
make_param_text: str,
) -> None:
"""Plot the variance of loss values as a function of the number of qubits
for different circuit depths.
Args:
results: Mapping from circuit depth to computed statistics.
Each entry must contain dictionaries with at least:
- "nq": number of qubits
- "var": variance value
- "obs": observable identifier
N_layers: Sequence of circuit depths to display.
make_param_text: Parameter summary text displayed
on the figure.
"""
param_text = make_param_text
layer_colors = plt.cm.tab10( # pyright: ignore[reportAttributeAccessIssue]
np.linspace(0, 1, max(len(N_layers), 1))
)
layer_to_color = {
lay: layer_colors[i % len(layer_colors)] for i, lay in enumerate(N_layers)
}
plt.figure(figsize=(12, 7))
for lay in N_layers:
if lay not in results:
continue
pts = sorted(results[lay], key=lambda d: d["nq"])
xs = [d["nq"] for d in pts]
ys = [d["var"] for d in pts]
plt.semilogy(
xs,
ys,
marker="o",
color=layer_to_color[lay],
linewidth=2,
markersize=5,
label=None,
)
plt.xlabel(r"Number of qubits $n_q$")
plt.ylabel(r"$\mathrm{Var}_{\theta}(L)$")
plt.title("Loss landscape concentration with increasing system size")
plt.grid(True, alpha=0.4)
handles_L = [
Line2D(
[0],
[0],
color=layer_to_color[lay],
lw=2,
label=rf"$L={lay}$",
)
for lay in N_layers
]
plt.legend(
handles=handles_L,
title="Depth",
loc="best",
fontsize=11,
handlelength=4,
)
plt.text(
0.02,
0.02,
param_text,
transform=plt.gca().transAxes,
ha="left",
va="bottom",
fontsize=11,
bbox=dict(
boxstyle="round",
facecolor="white",
edgecolor="gray",
alpha=0.55,
),
)
plt.tight_layout()
plt.savefig("figures/layerwise_qubits.pdf")
plt.show()
[docs]
def plot_layerwise_qubits_padding(
results: dict[tuple[int, Optional[PaddingType]], list[Result]],
N_layers: Sequence[int],
padding_types: Optional[Sequence[PaddingType]],
padding_latex: dict[str, str],
make_param_text: str,
) -> None:
"""Plot the variance of loss values as a function of the number of qubits
for different circuit depths and padding strategies.
Args:
results: Mapping indexed by ``(depth, padding_type)`` tuples.
Each entry must contain dictionaries with at least:
- "nq": number of qubits
- "var": variance value
- "obs": observable identifier
N_layers: Sequence of circuit depths to display.
padding_types: Sequence of padding strategies to include in the plot.
padding_latex: Mapping from padding strategy identifiers to LaTeX labels
used in the legend.
make_param_text: Parameter summary text displayed
on the figure.
"""
param_text = make_param_text
markers = ["o", "s", "^", "D", "v", "P", "X"]
linestyles = ["-", "--", "-.", ":"]
layer_colors = plt.cm.tab10( # pyright: ignore[reportAttributeAccessIssue]
np.linspace(0, 1, max(len(N_layers), 1))
)
layer_to_color = {
lay: layer_colors[i % len(layer_colors)] for i, lay in enumerate(N_layers)
}
padding_to_marker = {
pad: markers[i % len(markers)] for i, pad in enumerate(padding_types or [None])
}
padding_to_linestyle = {
pad: linestyles[i % len(linestyles)]
for i, pad in enumerate(padding_types or [None])
}
plt.figure(figsize=(12, 7))
for lay in N_layers:
for pad in padding_types or [None]:
key = (lay, pad)
if key not in results:
continue
pts = sorted(results[key], key=lambda d: d["nq"])
xs = [d["nq"] for d in pts]
ys = [d["var"] for d in pts]
plt.semilogy(
xs,
ys,
linestyle=padding_to_linestyle[pad],
marker=padding_to_marker[pad],
color=layer_to_color[lay],
linewidth=2,
markersize=5,
label=None,
)
plt.xlabel(r"Number of qubits $n_q$")
plt.ylabel(r"$\mathrm{Var}_{\theta}(L)$")
plt.title(
rf"Loss landscape concentration with increasing system size for different padding types"
)
plt.grid(True, alpha=0.4)
handles_L = [
Line2D([0], [0], color=layer_to_color[lay], lw=2, label=rf"$L={lay}$")
for lay in N_layers
]
handles_padding = (
[]
if padding_types is None
else [
Line2D(
[0],
[0],
color="black",
linestyle=padding_to_linestyle[pad],
marker=padding_to_marker[pad],
lw=2,
label=padding_latex.get(pad, str(pad)),
)
for pad in padding_types
]
)
plt.legend(
handles=handles_L + handles_padding,
title="Depth & Padding type",
loc="best",
fontsize=11,
handlelength=4,
)
plt.text(
0.02,
0.02,
param_text,
transform=plt.gca().transAxes,
ha="left",
va="bottom",
fontsize=11,
bbox=dict(boxstyle="round", facecolor="white", edgecolor="gray", alpha=0.55),
)
plt.tight_layout()
os.makedirs("figures", exist_ok=True)
plt.savefig("figures/layerwise_qubits_padding.pdf")
plt.show()
[docs]
def plot_joint_scaling_padding(
tracked: ArrayLike,
A_ext: Sequence[int],
B_ext: Sequence[int],
N_qubits: Sequence[int],
N_layers: Sequence[int],
padding_type: str | None,
padding_latex: dict[str, str],
Ansatz: str,
rel_err_target: float,
) -> None:
"""
Plot the variance of loss values as a joint
function of system size and circuit depth for a given padding strategy.
Args:
tracked: Sequence of tracked variance values.
A_ext: Extended sequence associated with the primary scan axis.
B_ext: Extended sequence associated with the secondary grouping axis.
N_qubits: Sequence of qubit counts considered in the scan.
N_layers: Sequence of circuit depths considered in the scan.
padding_type: Identifier of the padding strategy used.
padding_latex: Mapping from padding strategy identifiers to LaTeX labels.
Ansatz: Name of the variational ansatz displayed in the figure.
rel_err_target: Target relative error displayed in the figure annotations.
"""
if len(N_qubits) >= len(N_layers):
x_axis = A_ext
secondary = B_ext
x_label = r"Number of qubits $n_q$"
sec_label = "L"
else:
x_axis = B_ext
secondary = A_ext
x_label = r"Number of layers $L$"
sec_label = "n"
tracked = np.array(tracked)
padding_label = padding_latex.get(
padding_type,
rf"Padding: {padding_type}",
)
if not padding_label.startswith("Padding"):
padding_label = rf"padding: {padding_label}"
param_text = (
f"{Ansatz}\n"
f"{padding_label}\n"
f"$\\epsilon_{{rel}} = {rel_err_target:.0%}$".replace("%", "\\%")
)
groups_j = defaultdict(list)
for x, s, y in zip(x_axis, secondary, tracked):
groups_j[s].append((x, y))
plt.figure(figsize=(12, 7))
all_pts = sorted(zip(x_axis, tracked))
xs_all, ys_all = zip(*all_pts)
plt.plot(
xs_all,
ys_all,
"-",
color="black",
linewidth=1.5,
alpha=0.6,
)
for s_val, pts in sorted(groups_j.items()):
pts = sorted(pts)
xs, ys = zip(*pts)
plt.plot(
xs, ys, "-o", linewidth=2, markersize=5, label=rf"${sec_label} = {s_val}$"
)
plt.xlabel(x_label)
plt.ylabel(r"$\mathrm{Var}_{\theta}(L)$")
plt.title("Loss landscape concentration with increasing system size")
plt.grid(True, alpha=0.4)
plt.legend()
plt.text(
0.02,
0.02,
param_text,
transform=plt.gca().transAxes,
ha="left",
va="bottom",
fontsize=11,
bbox=dict(boxstyle="round", facecolor="white", edgecolor="gray", alpha=0.85),
)
plt.yscale("log")
plt.tight_layout()
plt.savefig("figures/joint_scaling_padding.pdf")
plt.show()
[docs]
class AnalysisResult(TypedDict):
tracked: np.ndarray
metadata: list[Result]
N_qubits_extended: list[int]
N_layers_extended: list[int]
padding_type: PaddingType | None
[docs]
class CostFunctionContext(TypedDict):
n_qubits: int
depth: int
Ansatz: str
circuits: QuantumCircuit
observables: Optional[Pauli]
[docs]
class CostFunctionBuilder(Protocol):
def __call__(
self, context: CostFunctionContext, *args: Any, **kwargs: Any
) -> Callable[[list[float]], float]: ...
[docs]
class VarianceNormalisation(Protocol):
def __call__(
self,
n_qubits: int,
depth: int,
Ansatz: str,
observable: Pauli | None,
context: CostFunctionContext,
raw_var: np.ndarray,
L_samples: np.ndarray,
) -> float: ...
[docs]
def barren_plateaus_analysis(
experiment: ExperimentConfig,
cost_function_builder: CostFunctionBuilder,
generate_params: Callable[[int, int], list[float]],
generate_circuits: Callable[[int, int, str], QuantumCircuit],
sampling: SamplingConfig | None = None,
execution: ExecutionConfig | None = None,
variance_normalization: VarianceNormalisation | None = None,
**cost_kwargs: Any,
) -> AnalysisResult | dict[int, list[Result]]:
"""Run a barren plateau scaling analysis for a given experiment configuration.
The function builds the requested circuits and cost functions, samples loss
values, estimates their variance, and stores the results for the selected
analysis type. Optional sampling and execution configurations control the
adaptive sampling procedure and parallel execution. Extra keyword arguments are
passed to the cost function builder.
Args:
experiment: Main experiment configuration.
cost_function_builder: Function used to build the cost function for each
experiment setting.
generate_params: Function used to generate random circuit parameters.
generate_circuits: Function used to generate circuits for the selected
ansatz and settings.
sampling: Sampling configuration. If None, default sampling settings are
used.
execution: Execution configuration. If None, default execution settings
are used.
variance_normalization: Optional function used to normalize the
estimated variance before storing or plotting it.
**cost_kwargs: Additional keyword arguments passed to the cost function
builder.
Returns:
Results produced by the selected analysis type. The returned objects contain
variance estimates and metadata such as qubit count, layer count and observable.
"""
# -------------------- Validation --------------------
if sampling is None:
sampling = SamplingConfig()
if execution is None:
execution = ExecutionConfig()
valid_analysis_types = {
"layerwise_qubits",
"layerwise_qubits_padding",
"joint_scaling_padding",
}
if experiment.analysis_type not in valid_analysis_types:
raise ValueError(
f"Unknown analysis_type='{experiment.analysis_type}'. "
f"Expected one of {valid_analysis_types}."
)
if experiment.analysis_type == "layerwise_qubits":
if experiment.observables_list is not None and len(experiment.N_qubits) != len(
experiment.observables_list
):
raise ValueError(
"For analysis_type='layerwise_qubits', "
"N_qubits and observables_list must have the same length."
)
if experiment.analysis_type == "layerwise_qubits_padding":
if experiment.padding_types is None and experiment.initial_Pauli_string is None:
pass
elif (
experiment.padding_types is None or experiment.initial_Pauli_string is None
):
raise ValueError(
"analysis_type='layerwise_qubits_padding' requires "
"padding_types and initial_Pauli_string to either "
"both be None or both be defined."
)
if experiment.analysis_type == "joint_scaling_padding":
if experiment.padding_types is None and experiment.initial_Pauli_string is None:
pass
elif (
experiment.padding_types is None or experiment.initial_Pauli_string is None
):
raise ValueError(
"analysis_type='joint_scaling_padding' requires "
"padding_types and initial_Pauli_string to either "
"both be None or both be defined."
)
# -------------------- Helpers --------------------
def run_single_point(
nq: int,
lay: int,
obs_list_builder: Optional[Callable[[int], Pauli]] | None,
extra_print: Optional[str] = None,
) -> LayerResult:
print("--------------------")
print(f"Number of qubits: {nq}")
print(f"Number of layers: {lay}")
if extra_print is not None:
print(extra_print)
n_qubits = nq
depth = lay
circuit = generate_circuits(n_qubits, depth, experiment.ansatz_name)
observable = obs_list_builder(nq) if obs_list_builder is not None else None
if observable is not None:
print(f"Pauli string: {observable}")
context: CostFunctionContext = {
"n_qubits": n_qubits,
"depth": depth,
"Ansatz": experiment.ansatz_name,
"circuits": circuit,
"observables": observable,
}
cost_function_expv = cost_function_builder(
context=context,
**cost_kwargs,
)
def sample_once():
theta = generate_params(n_qubits, depth)
return cost_function_expv(theta)
L_samples, raw_var, _ = adaptive_sampling_var(
sample_function=sample_once,
N_min=sampling.N_min,
N_batch=sampling.N_batch,
N_max=sampling.N_max,
B=sampling.bootstrap_B,
rel_err_target=sampling.rel_err_target,
observable_index=0,
safety_factor=sampling.safety_factor,
max_batch=sampling.max_batch,
verbose=execution.verbose,
n_jobs=execution.n_jobs,
)
# -------------------- Optional variance normalization --------------------
variance_scale = None
if variance_normalization is not None:
variance_scale = float(
variance_normalization(
n_qubits=n_qubits,
depth=depth,
Ansatz=experiment.ansatz_name,
observable=observable,
context=context,
raw_var=raw_var,
L_samples=L_samples,
)
)
if variance_scale <= 0:
raise ValueError(
f"variance_normalization must return a positive scale, "
f"got {variance_scale}."
)
var = raw_var if variance_scale is None else raw_var / variance_scale
print(f"var = {raw_var}")
if variance_scale is not None:
print(f"variance scale = {variance_scale}")
print(f"normalized var = {var}")
return {
"nq": nq,
"lay": lay,
"var": var,
"obs": observable,
}
use_state_vector = cost_kwargs.get("use_state_vector", None)
shots = cost_kwargs.get("shots", None)
lines = [
f"{experiment.ansatz_name}",
]
if use_state_vector is not None:
lines.append(f"state_vector = {use_state_vector}")
if use_state_vector is False and shots is not None:
lines.append(f"shots = {shots:.0e}")
lines.append(
f"$\\epsilon_{{rel}} = {sampling.rel_err_target:.0%}$".replace("%", "\\%")
)
param_text = "\n".join(lines)
padding_latex = {
"identity": r"$I^{n_q}$",
"I": r"$I^{n_q}$",
"linear_half": r"$n_q/2$",
"log": r"$\log(n_q)$",
}
# -------------------- Analysis 1: layerwise_qubits --------------------
if experiment.analysis_type == "layerwise_qubits":
results = defaultdict(list)
for lay in experiment.N_layers:
if experiment.observables_list is None:
iterable = [(nq, None) for nq in experiment.N_qubits]
else:
iterable = zip(
experiment.N_qubits,
experiment.observables_list,
)
for nq, Pauli_string in iterable:
if Pauli_string is None:
obs_builder_1 = None
else:
def obs_builder_1(target_n: int):
if isinstance(Pauli_string, Pauli):
label = Pauli_string.to_label()
pauli = Pauli_string
else:
label = Pauli_string
pauli = Pauli(Pauli_string)
if len(label) != target_n:
raise ValueError(
f"Pauli string size mismatch: got '{label}' "
f"(len={len(label)}), expected {target_n}"
)
return pauli
point = run_single_point(
nq=nq,
lay=lay,
obs_list_builder=obs_builder_1,
)
results[lay].append(point)
plot_layerwise_qubits(
results=results,
N_layers=experiment.N_layers,
make_param_text=param_text,
)
return results
# -------------------- Analysis 2: layerwise_qubits_padding --------------------
if experiment.analysis_type == "layerwise_qubits_padding":
results = defaultdict(list)
if experiment.padding_types is None and experiment.initial_Pauli_string is None:
padding_iter = [None]
elif (
experiment.padding_types is None or experiment.initial_Pauli_string is None
):
raise ValueError(
"padding_types and initial_Pauli_string must either "
"both be None or both be defined."
)
else:
padding_iter = experiment.padding_types
for lay in experiment.N_layers:
for pad in padding_iter:
for nq in experiment.N_qubits:
if pad is None:
layerwise_obs_builder = None
extra_print = None
else:
def layerwise_obs_builder(
target_n: int,
pad: PaddingType = pad,
):
assert experiment.initial_Pauli_string is not None
return pad_pauli_strings_growth(
experiment.initial_Pauli_string,
target_n=target_n,
growth=pad,
)
extra_print = f"Padding type: {pad}"
point = run_single_point(
nq=nq,
lay=lay,
obs_list_builder=layerwise_obs_builder,
extra_print=extra_print,
)
results[(lay, pad)].append(point)
plot_layerwise_qubits_padding(
results=results,
N_layers=experiment.N_layers,
padding_types=experiment.padding_types,
padding_latex=padding_latex,
make_param_text=param_text,
)
return results
# -------------------- Analysis 3: joint_scaling_padding --------------------
if experiment.analysis_type == "joint_scaling_padding":
tracked = []
metadata = []
L = max(len(experiment.N_qubits), len(experiment.N_layers))
A_ext = extend_with_last(experiment.N_qubits, L)
B_ext = extend_with_last(experiment.N_layers, L)
if experiment.padding_types is None and experiment.initial_Pauli_string is None:
obs_builder = None
padding_type = None
extra_print = None
elif (
experiment.padding_types is None or experiment.initial_Pauli_string is None
):
raise ValueError(
"padding_types and initial_Pauli_string must either "
"both be None or both be defined."
)
else:
padding_type = experiment.padding_types[0]
def obs_builder(target_n: int):
assert experiment.initial_Pauli_string is not None
return pad_pauli_strings_growth(
experiment.initial_Pauli_string,
target_n=target_n,
growth=padding_type,
)
extra_print = f"Padding type: {padding_type}"
for nq, lay in zip(A_ext, B_ext):
point = run_single_point(
nq=nq,
lay=lay,
obs_list_builder=obs_builder,
extra_print=extra_print,
)
tracked.append(point["var"])
metadata.append(point)
joint_scaling_results: AnalysisResult = {
"tracked": np.array(tracked),
"metadata": metadata,
"N_qubits_extended": A_ext,
"N_layers_extended": B_ext,
"padding_type": padding_type,
}
plot_joint_scaling_padding(
tracked=np.array(tracked),
A_ext=A_ext,
B_ext=B_ext,
N_qubits=experiment.N_qubits,
N_layers=experiment.N_layers,
padding_type=padding_type,
padding_latex=padding_latex,
Ansatz=experiment.ansatz_name,
rel_err_target=sampling.rel_err_target,
)
return joint_scaling_results
return {}