import hydra
import numpy as np
from hydra.core.hydra_config import HydraConfig
from pathlib import Path
from qutip import Qobj, fock, ket2dm
from typing import Any, Dict, Optional, Tuple
from loguru import logger
from omegaconf import DictConfig
from mqed.Lindblad.quantum_dynamics import LindbladDynamics, NonHermitianSchDynamics, SimulationConfig
from mqed.Lindblad.quantum_operator import ipr_callable, position_square_operator, position_operator, site_population_operator, x_shift_conditional_callable, x_shift2_conditional_callable
from mqed.utils.dgf_data import load_gf_h5
from mqed.utils.logging_utils import setup_loggers_hydra_aware
from mqed.utils.hydra_local import prepare_hydra_config_path
from mqed.utils.save_hdf5 import save_dx_h5
[docs]
def resolve_initial_site_index(initial_state_cfg: DictConfig, nmol: int) -> int:
"""Return the 1-based reference site index used by observables/state defaults."""
candidate = initial_state_cfg.get("center_site", None)
if candidate is None:
candidate = initial_state_cfg.get("site_index", None)
if candidate is None:
candidate = nmol // 2 + 1
site_index = int(candidate)
if not 1 <= site_index <= nmol:
raise ValueError(
f"initial_state.site_index/center_site must be in [1, {nmol}], got {site_index}."
)
return site_index
[docs]
def build_initial_ket(initial_state_cfg: DictConfig, *, nmol: int) -> Tuple[Qobj, int]:
"""Build initial ket for single-site or Gaussian wave packet excitations."""
dim = nmol + 1
state_type = str(initial_state_cfg.get("type", "single_site")).lower()
reference_site = resolve_initial_site_index(initial_state_cfg, nmol)
if state_type in {"single_site", "site"}:
logger.info(f"Building single-site initial state localized at site: {reference_site}")
return fock(dim, reference_site), reference_site
if state_type == "gaussian":
logger.info("Building Gaussian initial state with parameters:k_parallel:{}, sigma_sites:{}".format(
initial_state_cfg.get("k_parallel", 0.0),
initial_state_cfg.get("sigma_sites", 1.0)
))
sigma_sites = float(initial_state_cfg.get("sigma_sites", 1.0))
if sigma_sites <= 0.0:
raise ValueError(
f"initial_state.sigma_sites must be positive for gaussian state, got {sigma_sites}."
)
k_parallel = float(initial_state_cfg.get("k_parallel", 0.0))
center_candidate = initial_state_cfg.get("center_site", None)
center_site = reference_site if center_candidate is None else int(center_candidate)
if not 1 <= center_site <= nmol:
raise ValueError(
f"initial_state.center_site must be in [1, {nmol}], got {center_site}."
)
site_positions = np.arange(1, nmol + 1, dtype=float)
displacement = site_positions - float(center_site)
envelope = np.exp(-0.5 * (displacement / sigma_sites) ** 2)
phase = np.exp(1j * k_parallel * displacement)
amplitudes = envelope * phase
norm = np.linalg.norm(amplitudes)
if norm <= 0.0:
raise ValueError("Gaussian initial state has zero norm; check sigma_sites.")
data = np.zeros(dim, dtype=np.complex128)
data[1:] = amplitudes / norm
return Qobj(data, dims=[[dim], [1]]), center_site
raise ValueError(
f"Unsupported initial_state.type '{state_type}'. Use 'single_site' or 'gaussian'."
)
[docs]
def build_initial_state(
initial_state_cfg: DictConfig,
*,
method: str,
nmol: int,
) -> Tuple[Qobj, int]:
"""Build method-compatible initial state and the observable reference site."""
ket, reference_site = build_initial_ket(initial_state_cfg, nmol=nmol)
if method == "Lindblad":
return ket2dm(ket), reference_site
if method == "NonHermitian":
return ket, reference_site
raise ValueError(f"Unknown solver.method = {method}")
[docs]
def build_observable(item: Dict[str, Any], *, dim: int, d_nm: float, Nmol: int, init_site: int) -> Tuple[str, object]:
"""
Turn one YAML 'observables' item into (key, obj_or_callable) for QuTiP.
- If 'kind' == 'operator' -> return Qobj
- If 'kind' == 'callable' -> return f(t, state) callable
"""
name = str(item["name"])
kind = str(item.get("kind", "operator"))
params = dict(item.get("params", {}) or {})
if name in {"X_shift_cond", "X_shift2_cond", "IPR_site"} and kind != "callable":
logger.warning(f"{name} is usually used with kind='callable' (got {kind!r})")
if name == "X_shift":
logger.info("Adding observable: position operator (X_shift)")
return name, position_operator(dim, d_nm, Nmol, init_site)
if name == "X_shift2":
logger.info("Adding observable: second moment of position operator (X_shift2)")
return name, position_square_operator(dim, d_nm, Nmol, init_site)
if name == "pop_site":
logger.info("Adding observable: population operator at specified site")
site = int(params.get("site", 1))
return f"pop_site_{site}", site_population_operator(dim, site) # label includes site
if name == "IPR_site":
# bind Nmol (required); any extra params are ignored safely
logger.info("Adding observable: Inverse Participation Ratio (requires Nmol param)")
Nmol_local = int(params.get("Nmol", Nmol))
return name, (lambda t, st: ipr_callable(t, st, Nmol=Nmol_local))
if name == "X_shift_cond":
logger.info("Adding observable: conditional position expectation (X_shift_cond)")
Nmol_local = int(params.get("Nmol", Nmol))
X_shift_op = position_operator(dim, d_nm, Nmol, init_site) # fixed operator, normalization in callable
return name, (
lambda t, st, X=X_shift_op, N=Nmol_local:
x_shift_conditional_callable(t, st, X_shift=X, Nmol=N)
)
if name == "X_shift2_cond":
logger.info("Adding observable: conditional second moment of position operator (X_shift2_cond)")
Nmol_local = int(params.get("Nmol", Nmol))
X_shift2_op = position_square_operator(dim, d_nm, Nmol, init_site)
return name, (
lambda t, st, X2=X_shift2_op, N=Nmol_local:
x_shift2_conditional_callable(t, st, X_shift2=X2, Nmol=N)
)
logger.error(f"Unknown observable name: {name!r}")
raise ValueError(f"Unknown observable name: {name!r}")
def _build_observables(
obs_cfg,
*,
dim: int,
d_nm: float,
Nmol: int,
init_site: int,
):
e_ops = {}
compute_root_msd = False
for item in obs_cfg:
name = str(item.get("name", ""))
if name == "root_MSD":
compute_root_msd = bool(item.get("enabled", True))
logger.info(f"root_MSD output enabled: {compute_root_msd}")
continue
key, obj = build_observable(
item,
dim=dim,
d_nm=d_nm,
Nmol=Nmol,
init_site=init_site,
)
e_ops[key] = obj
return e_ops, compute_root_msd
def app_run(cfg: DictConfig, output_dir: Optional[Path] = None):
if output_dir is None:
try:
output_dir = Path(HydraConfig.get().runtime.output_dir)
except Exception:
output_dir = Path.cwd()
output_dir.mkdir(parents=True, exist_ok=True)
setup_loggers_hydra_aware()
logger.info("--- Starting quantum dynamics simulation---")
# 1) Load Green's function slice at the emitter energy
data = load_gf_h5(cfg.greens.h5_path) # {"G_total","G_vac","energy_eV","Rx_nm","zD","zA"}
G_slice = data["G_total"] # (M,N,3,3)
E_eV = data["energy_eV"] # (M,)
Rx_nm = data["Rx_nm"]
logger.info(f"Dipole heights (nm): zD={data['zD']} zA={data['zA']}")
logger.info(f"inter-site distance (nm): {cfg.simulation.d_nm}")
G_slice = G_slice[0] # use the first energy slice for now
# 2) Build SimulationConfig (unify with your abstractions)
# breakpoint()
tlist = np.arange(cfg.simulation.t_ps.start, cfg.simulation.t_ps.stop, cfg.simulation.t_ps.output_step)
sim_cfg = SimulationConfig(
tlist=tlist,
emitter_frequency=E_eV[0],
Nmol=cfg.simulation.Nmol,
Rx_nm=Rx_nm,
d_nm=cfg.simulation.d_nm,
mu_D_debye=cfg.simulation.mu_D_debye,
mu_A_debye=cfg.simulation.mu_A_debye,
theta_deg=cfg.simulation.theta_deg,
phi_deg=cfg.simulation.phi_deg,
disorder_sigma_phi_deg=cfg.simulation.get('disorder_sigma_phi_deg', None),
mode=cfg.simulation.mode,
coupling_limit=cfg.simulation.get('coupling_limit', None),
)
# 3) Choose dynamics backend
method = cfg.solver.method
if method == 'Lindblad':
logger.info("Using Lindblad master equation solver.")
dyn = LindbladDynamics(sim_cfg, G_slice)
elif method == 'NonHermitian':
logger.info("Using Non-Hermitian Schrodinger Equation solver.")
dyn = NonHermitianSchDynamics(sim_cfg, G_slice)
else:
logger.error(f"Unknown solver.method = {method}")
raise ValueError(f"Unknown solver.method = {method}")
rho_or_psi, init_site = build_initial_state(
cfg.initial_state,
method=method,
nmol=sim_cfg.Nmol,
)
obs_cfg = getattr(cfg, "observables", []) or []
e_ops, compute_root_msd = _build_observables(
obs_cfg,
dim=sim_cfg.Nmol + 1,
d_nm=sim_cfg.d_nm,
Nmol=sim_cfg.Nmol,
init_site=init_site,
)
# 5) Evolve
result = dyn.evolve(rho_or_psi, e_ops=e_ops, options=None)
#calculate dx from expectations
# breakpoint()
# ex1 = np.asarray(result.expectations["X_shift"])
# ex2 = np.asarray(result.expectations["X_shift2"])
# dx = np.sqrt(np.maximum(0.0, ex2 - ex1**2))
dx = None
dx_std = None
if compute_root_msd:
# Prefer conditional moments if they exist (best for NHSE / fair comparisons)
if "X_shift_cond" in result.expectations and "X_shift2_cond" in result.expectations:
logger.info("Computing root_MSD from conditional moments: X_shift_cond, X_shift2_cond")
x = np.asarray(result.expectations["X_shift_cond"])
x2 = np.asarray(result.expectations["X_shift2_cond"])
# Fallback to raw moments
elif "X_shift" in result.expectations and "X_shift2" in result.expectations:
logger.info("Computing root_MSD from raw moments: X_shift, X_shift2")
x = np.asarray(result.expectations["X_shift"])
x2 = np.asarray(result.expectations["X_shift2"])
else:
raise ValueError(
"root_MSD requested, but need either "
"('X_shift_cond','X_shift2_cond') or ('X_shift','X_shift2')."
)
# MSD = <(x-x0)^2>, which is simply x2 (the second moment of
# displacement from the initial site). Note: x2 - x**2 would give
# the *variance* of displacement, not the MSD.
dx = np.sqrt(np.maximum(0.0, x2))
dx_std = np.zeros_like(dx)
# 6) Save to HDF5
outfile = output_dir / cfg.output.filename
# states = getattr(result, 'states', None)
logger.info(f"Saving results to {outfile.absolute()}")
save_legacy_aliases = bool(cfg.output.get("save_legacy_aliases", True))
mode = str(cfg.simulation.get("mode", "stationary"))
n_realizations = 1
if mode == "disorder" and hasattr(cfg, "disorder"):
n_realizations = int(cfg.disorder.get("n_realizations", 1))
expectations_to_save: Dict[str, np.ndarray] = {
key: np.asarray(val) for key, val in result.expectations.items()
}
if "X_shift" in expectations_to_save:
expectations_to_save["position_mean"] = expectations_to_save["X_shift"]
if "X_shift2" in expectations_to_save:
x2 = expectations_to_save["X_shift2"]
expectations_to_save["x2_mean"] = x2
x = expectations_to_save.get("X_shift")
# MSD = <(x-x0)^2> = x2 (second moment of displacement).
# Previously this was x2 - x**2, which is the *variance*, not MSD.
expectations_to_save["msd_mean"] = np.maximum(0.0, x2)
if x is not None:
# Also store the variance for reference (not MSD).
expectations_to_save["variance_mean"] = np.maximum(0.0, x2 - x**2)
if not save_legacy_aliases:
expectations_to_save.pop("X_shift", None)
expectations_to_save.pop("X_shift2", None)
save_dx_h5(
outfile=outfile,
t_ps=result.tlist,
dx_mean_nm=dx,
dx_std_nm=dx_std,
method=method,
mode=mode,
n_realizations=n_realizations,
expectations=expectations_to_save,
extra_attrs={
"root_MSD_enabled": compute_root_msd,
"save_legacy_aliases": save_legacy_aliases,
},
)
logger.success(f"Simulation complete. Output saved to: {outfile.absolute()}")
HYDRA_CONFIG_PATH: str = prepare_hydra_config_path("Lindblad", __file__)
@hydra.main(config_path=HYDRA_CONFIG_PATH, config_name="quantum_dynamics", version_base=None)
def mqed_lindblad(cfg: Optional[DictConfig] = None):
if cfg is None:
raise ValueError("Hydra did not provide configuration.")
cfg.solver.method = "Lindblad"
app_run(cfg)
@hydra.main(config_path=HYDRA_CONFIG_PATH, config_name="quantum_dynamics_nhse", version_base=None)
def mqed_nhse(cfg: Optional[DictConfig] = None):
if cfg is None:
raise ValueError("Hydra did not provide configuration.")
cfg.solver.method = "NonHermitian"
app_run(cfg)
if __name__ == "__main__":
mqed_nhse()