Source code for fiesta.train.AfterglowData

"""Method to train the surrogate models"""
import os
from xmlrpc.client import Boolean
import numpy as np
import ast
import h5py
import tqdm
from multiprocessing import Pool, Value

from fiesta.constants import days_to_seconds

try:
    import afterglowpy as grb
except ImportError:
    pass


[docs] class AfterglowData: def __init__(self, outfile: str, n_training: int, n_val: int, n_test: int, parameter_distributions: dict = None, jet_type: int = -1, tmin: float = 1., tmax: float = 1000., n_times: int = 100, use_log_spacing: bool = True, numin: float = 1e9, numax: float = 2.5e18, n_nu: int = 256, fixed_parameters: dict = None) -> None: if fixed_parameters is None: fixed_parameters = {} outdir = os.path.dirname(outfile) if not os.path.exists(outdir): os.makedirs(outdir) self.outfile = outfile self.n_training = n_training self.n_val = n_val self.n_test = n_test if os.path.exists(self.outfile): self._read_file() else: self.jet_type = jet_type if self.jet_type not in [-1, 0, 2]: raise ValueError(f"Jet type {jet_type} is not supported. Supported jet types are: [-1, 0, 2]") self.initialize_times(tmin, tmax, n_times, use_log_spacing) # create time array self.initialize_nus(numin, numax, n_nu) # create frequency array self.parameter_names = list(parameter_distributions.keys()) self.parameter_distributions = parameter_distributions self._initialize_file() # initialize the h5 file the data is later written to self.n_training_exists, self.n_val_exists, self.n_test_exists = 0, 0, 0 print(f"Initialized fiesta.train.AfterglowData \nJet type: {self.jet_type} \nParameters: {self.parameter_names} \nTimes: {self.times[0]} {self.times[-1]} {len(self.times)} \nNus: {self.nus[0]:.3e} {self.nus[-1]:.3e} {len(self.nus)} \nparameter_distributions: {self.parameter_distributions}\nExisting train, val, test: {self.n_training_exists}, {self.n_val_exists}, {self.n_test_exists} \n \n \n") self.fixed_parameters = fixed_parameters self.get_raw_data(self.n_training, "train") # create new data and save it to file self.get_raw_data(self.n_val, "val") self.get_raw_data(self.n_test, "test")
[docs] def initialize_times(self, tmin, tmax, n_times, use_log_spacing: bool = True): if use_log_spacing: times = np.logspace(np.log10(tmin), np.log10(tmax), num=n_times) else: times = np.linspace(tmin, tmax, num=n_times) self.times = times
[docs] def initialize_nus(self, numin: float, numax: float, n_nu: int): self.nus = np.logspace(np.log10(numin), np.log10(numax), n_nu)
def _initialize_file(self,): with h5py.File(self.outfile, "w") as f: f.create_dataset("times", data = self.times) f.create_dataset("nus", data = self.nus) f.create_dataset("parameter_names", data = self.parameter_names) f.create_dataset("parameter_distributions", data = str(self.parameter_distributions)) f.create_dataset("jet_type", data = self.jet_type) f.create_group("train"); f.create_group("val"); f.create_group("test"); f.create_group("special_train")
[docs] def get_raw_data(self, n: int, group: str): if group == "train": training = True else: training = False nchunks, rest = divmod(n, self.chunk_size) # create raw data in chunks of chunk_size for chunk in tqdm.tqdm([*(nchunks*[self.chunk_size]), rest], desc = f"Calculating {nchunks+1} chunks of {group} data...", leave = True): if chunk ==0: continue X, y = self.create_raw_data(chunk, training) X, y = self.fix_nans(X, y) self._save_to_file(X, y, group)
[docs] def fix_nans(self,X,y): # fixes any nans that remain from create_raw_data problematic = np.unique(np.where(np.isnan(y))[0]) n = len(problematic) while n>0: if n> 0.1*len(X): print(f"Warning: Found many nans for the parameter samples, in total {n} out of {len(X)} samples.") X_replacement, y_replacement = self.create_raw_data(n) X[problematic] = X_replacement y[problematic] = y_replacement problematic = np.unique(np.where(np.isnan(y))[0]) n = len(problematic) return X, y
def _read_file(self,): with h5py.File(self.outfile, "r") as f: self.jet_type = f["jet_type"][()] self.times = f["times"][:] self.nus = f["nus"][:] self.parameter_names = f["parameter_names"][:].astype(str).tolist() self.parameter_distributions = ast.literal_eval(f["parameter_distributions"][()].decode('utf-8')) try: self.n_training_exists = (f["train"]["X"].shape)[0] except KeyError: self.n_training_exists = 0 try: self.n_val_exists = (f["val"]["X"].shape)[0] except KeyError: self.n_val_exists = 0 try: self.n_test_exists = (f["test"]["X"].shape)[0] except KeyError: self.n_test_exists = 0
[docs] def create_raw_data(self, n: int, training: bool = True): """ Create draws X in the parameter space and run the afterglow model on it. """ # Create training data X_raw = np.empty((n, len(self.parameter_names))) if training: for j, key in enumerate(self.parameter_names): a, b, distribution = self.parameter_distributions[key] # FIXME if distribution == "uniform": X_raw[:,j] = np.random.uniform(a, b, size = n) elif distribution == "loguniform": X_raw[:,j] = np.exp(np.random.uniform(np.log(a), np.log(b), size = n)) else: for j, key in enumerate(self.parameter_distributions.keys()): a, b, _ = self.parameter_distributions[key] X_raw[:, j] = np.random.uniform(a, b, size = n) # Ensure that epsilon_e + epsilon_B < 1 epsilon_e_ind = self.parameter_names.index("log10_epsilon_e") epsilon_B_ind = self.parameter_names.index("log10_epsilon_B") epsilon_tot = (10**(X_raw[:, epsilon_e_ind]) + 10**(X_raw[:, epsilon_B_ind])) mask = epsilon_tot>=1 X_raw[mask, epsilon_B_ind] += np.log10(0.99/epsilon_tot[mask]) X_raw[mask, epsilon_e_ind] += np.log10(0.99/epsilon_tot[mask]) # Ensure that thetaWing is smaller than pi/2 if self.jet_type !=-1: alphaw_ind = self.parameter_names.index("alphaWing") thetac_ind = self.parameter_names.index("thetaCore") mask = X_raw[:, alphaw_ind]*X_raw[:, thetac_ind] >= np.pi/2 X_raw[mask, alphaw_ind] = np.pi/2 * 1/X_raw[mask, thetac_ind] X, y = self.run_afterglow_model(X_raw) return X, y
[docs] def create_special_data(self, X_raw, label:str, comment: str = None): """Create special training data with pre-specified parameters X. These will be stored in the 'special_train' hdf5 group.""" # Ensure that epsilon_e + epsilon_B < 1 epsilon_e_ind = self.parameter_names.index("log10_epsilon_e") epsilon_B_ind = self.parameter_names.index("log10_epsilon_B") epsilon_tot = (10**(X_raw[:, epsilon_e_ind]) + 10**(X_raw[:, epsilon_B_ind])) mask = epsilon_tot>=1 X_raw[mask, epsilon_B_ind] += np.log10(0.99/epsilon_tot[mask]) X_raw[mask, epsilon_e_ind] += np.log10(0.99/epsilon_tot[mask]) # Ensure that thetaWing is smaller than pi/2 if self.jet_type !=-1: alphaw_ind = self.parameter_names.index("alphaWing") thetac_ind = self.parameter_names.index("thetaCore") mask = X_raw[:, alphaw_ind]*X_raw[:, thetac_ind] >= np.pi/2 X_raw[mask, alphaw_ind] = np.pi/2 * 1/X_raw[mask, thetac_ind] X, y = self.run_afterglow_model(X_raw) X, y = self.fix_nans(X,y) self._save_to_file(X, y, "special_train", label = label, comment= comment)
[docs] def run_afterglow_model(self, X): raise NotImplementedError
def _save_to_file(self, X, y, group: str, label: str = None, comment: str = None): with h5py.File(self.outfile, "a") as f: if "y" in f[group]: # checks if the dataset already exists Xset = f[group]["X"] Xset.resize(Xset.shape[0]+X.shape[0], axis = 0) Xset[-X.shape[0]:] = X yset = f[group]["y"] yset.resize(yset.shape[0]+y.shape[0], axis = 0) yset[-y.shape[0]:] = y elif label is not None: # or if we have special training data if label in f["special_train"]: Xset = f["special_train"][label]["X"] Xset.resize(Xset.shape[0]+X.shape[0], axis = 0) Xset[-X.shape[0]:] = X yset = f[group][label]["y"] yset.resize(yset.shape[0]+y.shape[0], axis = 0) yset[-y.shape[0]:] = y else: f["special_train"].create_group(label) if comment is not None: f["special_train"][label].attrs["comment"] = comment f["special_train"][label].create_dataset("X", data = X, maxshape=(None, len(self.parameter_names)), chunks = (self.chunk_size, len(self.parameter_names))) f["special_train"][label].create_dataset("y", data = y, maxshape=(None, len(self.nus), len(self.times)), chunks = (self.chunk_size, len(self.times), len(self.nus))) else: # or if we need to create a new data set f[group].create_dataset("X", data = X, maxshape=(None, len(self.parameter_names)), chunks = (self.chunk_size, len(self.parameter_names))) f[group].create_dataset("y", data = y, maxshape=(None, len(self.nus), len(self.times)), chunks = (self.chunk_size, len(self.nus), len(self.times)))
[docs] class AfterglowpyData(AfterglowData): def __init__(self, n_pool: int, *args, **kwargs): self.n_pool = n_pool self.chunk_size = 1000 super().__init__(*args, **kwargs)
[docs] def run_afterglow_model(self, X): """Uses multiprocessing to run afterglowpy on the supplied parameters in X.""" y = np.empty((len(X), len(self.nus), len(self.times))) afgpy = RunAfterglowpy(self.jet_type, self.times, self.nus, X, self.parameter_names, self.fixed_parameters) pool = Pool(processes=self.n_pool) jobs = [pool.apply_async(func=afgpy, args=(argument,)) for argument in range(len(X))] pool.close() for Idx, job in enumerate(tqdm.tqdm(jobs, desc = f"Computing {len(X)} afterglowpy calculations.", leave = False)): try: idx, out = job.get() y[idx] = out except Exception: y[Idx] = np.full(len(self.nus), len(self.times), np.nan) return X, y
[docs] class PyblastafterglowData(AfterglowData): def __init__(self, path_to_exec: str, pbag_kwargs: dict = None, rank: int = 0, *args, **kwargs): self.chunk_size = 10 self.rank = rank self.path_to_exec = path_to_exec self.pbag_kwargs = pbag_kwargs super().__init__(*args, **kwargs)
[docs] def run_afterglow_model(self, X): """Should be run in parallel with different mpi processes to run pyblastafterglow on the parameters in the array X.""" y = np.empty((len(X), len(self.nus), len(self.times))) pbag = RunPyblastafterglow(self.jet_type, self.times, self.nus, X, self.parameter_names, self.fixed_parameters, rank=self.rank, path_to_exec=self.path_to_exec, **self.pbag_kwargs) for j in range(len(X)): try: idx, out = pbag(j) y[idx] = out except Exception: try: # increase blast wave evolution time grid if there is an error old_ntb = pbag.ntb pbag.ntb = 3000 idx, out = pbag(j) y[idx] = out pbag.ntb = old_ntb except Exception: y[j] = np.full(len(self.nus), len(self.times), np.nan) return X, y
[docs] def supplement_time(self,t_supp): """ WARNING: NOT READY TO BE USED""" self.times = t_supp for group in ["train", "val", "test"]: with h5py.File(self.outfile) as f: if "y" not in f[group].keys(): continue if f[group]["y"].shape[1]>f["times"].shape[0] * f["nus"].shape[0]: continue X = f[group]["X"][:] _, y_new = self.run_afterglow_model(X) y_new = y_new.reshape(-1, len(self.nus), len(self.times)) with h5py.File(self.outfile, "r+") as f: y_old = f[group]["y"][:] y_old = y_old.reshape(-1, f["nus"].shape[0], f["times"].shape[0]) y = np.concatenate((y_new, y_old), axis=-1) new_time_shape = len(self.times) + f["times"].shape[0] y = y.reshape(-1, len(self.nus), new_time_shape) del f[group]["y"] f[group].create_dataset("y", data=y, maxshape=(None, len(self.nus), new_time_shape), chunks = (self.chunk_size, len(self.nus), new_time_shape) ) with h5py.File(self.outfile,"r+") as f: t_old = f["times"][:] del f["times"] time = np.concatenate((t_supp, t_old)) f.create_dataset("times", data=time)
[docs] class RunAfterglowpy: def __init__(self, jet_type, times, nus, X, parameter_names, fixed_parameters=None): if fixed_parameters is None: fixed_parameters = {} self.jet_type = jet_type self.times = times self._times_afterglowpy = self.times * days_to_seconds # afterglowpy takes seconds as input self.nus = nus self.X = X self.parameter_names = parameter_names self.fixed_parameters = fixed_parameters def _call_afterglowpy(self, params_dict: dict[str, float]): """ Call afterglowpy to generate a single flux density output, for a given set of parameters. Note that the parameters_dict should contain all the parameters that the model requires, as well as the nu value. The output will be a set of mJys. Args: Float[Array, "n_times"]: The flux density in mJys at the given times. """ # Preprocess the params_dict into the format that afterglowpy expects, which is usually called Z Z = {} Z["jetType"] = params_dict.get("jetType", self.jet_type) Z["specType"] = params_dict.get("specType", 0) Z["z"] = params_dict.get("redshift", 0.0) Z["xi_N"] = params_dict.get("xi_N", 1.0) Z["counterjet"] = True Z["E0"] = 10 ** params_dict["log10_E0"] Z["n0"] = 10 ** params_dict["log10_n0"] Z["p"] = params_dict["p"] Z["epsilon_e"] = 10 ** params_dict["log10_epsilon_e"] Z["epsilon_B"] = 10 ** params_dict["log10_epsilon_B"] Z["d_L"] = 3.086e19 # fix at 10 pc, so that AB magnitude equals absolute magnitude if "inclination_EM" in list(params_dict.keys()): Z["thetaObs"] = params_dict["inclination_EM"] else: Z["thetaObs"] = params_dict["thetaObs"] if self.jet_type == -1: Z["thetaCore"] = params_dict["thetaCore"] elif self.jet_type == 0: Z["thetaCore"] = params_dict["thetaCore"] Z["thetaWing"] = params_dict["thetaCore"]*params_dict["alphaWing"] elif self.jet_type == 4: Z["thetaCore"] = params_dict["thetaCore"] Z["thetaWing"] = params_dict["thetaCore"]*params_dict["alphaWing"] Z["b"] = params_dict["b"] else: raise ValueError(f"Provided jet type {self.jet_type} invalid.") # Afterglowpy returns flux in mJys tt, nunu = np.meshgrid(self._times_afterglowpy, self.nus) mJys = grb.fluxDensity(tt, nunu, **Z) return mJys def __call__(self, idx): param_dict = dict(zip(self.parameter_names, self.X[idx], strict=True)) param_dict.update(self.fixed_parameters) mJys = self._call_afterglowpy(param_dict) return idx, np.log10(mJys)
[docs] class RunPyblastafterglow: def __init__(self, jet_type: int, times, nus, X, parameter_names, fixed_parameters=None, rank = 0, path_to_exec: str="./pba.out", grb_resolution: int=12, ntb: int=1000, tb0: float=1e1, tb1: float=1e11, rtol: float=1e-1, loglevel: str="err", ): jet_conversion = {"-1": "tophat", "0": "gaussian"} self.jet_type = jet_conversion[str(jet_type)] times_seconds = times * days_to_seconds # pyblastafterglow takes seconds as input # preparing the pyblastafterglow string argument for time array is_log_uniform = np.allclose(np.diff(np.log(times_seconds)), np.log(times_seconds[1])-np.log(times_seconds[0]), atol=0.01) if is_log_uniform: log_dt = np.log(times_seconds[1])-np.log(times_seconds[0]) self.lc_times = f'array logspace {times_seconds[0]:e} {np.exp(log_dt)*times_seconds[-1]:e} {len(times_seconds)}' # pyblastafterglow only takes this string format else: raise ValueError("Time array must be loguniform.") # preparing the pyblastafterglow string argument for frequency array log_dnu = np.log(nus[1]/nus[0]) self.lc_freqs = f'array logspace {nus[0]:e} {np.exp(log_dnu)*nus[-1]:e} {len(nus)}' # pyblastafterglow only takes this string format self.X = X self.parameter_names = parameter_names if fixed_parameters is None: fixed_parameters = {} self.fixed_parameters = fixed_parameters self.rank = rank self.path_to_exec = path_to_exec self.grb_resolution = grb_resolution self.ntb = ntb self.tb0 = tb0 self.tb1 = tb1 self.rtol = rtol self.loglevel = loglevel def _call_pyblastafterglow(self, params_dict: dict[str, float]): """ Run pyblastafterglow to generate a single flux density output, for a given set of parameters. Note that the parameters_dict should contain all the parameters that the model requires. The output will be a set of mJys. Args: Float[Array, "n_times"]: The flux density in mJys at the given times. """ try: from PyBlastAfterglowMag.wrappers import run_grb except ImportError: raise ImportError("PyBlastAfterglowMag is not installed. Please install it from source") # Define jet structure (analytic; gaussian) -- 3 free parameters struct = dict( struct= self.jet_type, # type of the structure tophat or gaussian Eiso_c=np.power(10, params_dict["log10_E0"]), # isotropic equivalent energy of the burst Gamma0c=params_dict["Gamma0"], # lorentz factor of the core of the jet M0c=-1., # mass of the ejecta (if -1 -- inferr from Eiso_c and Gamma0c) n_layers_a=self.grb_resolution # resolution of the jet (number of individual blastwaves) ) if self.jet_type == "tophat": struct["theta_c"] = params_dict['thetaCore'] # half-opening angle of the winds of the jet elif self.jet_type == "gaussian": struct["theta_c"] = params_dict['thetaCore'] # half-opening angle of the winds of the jet struct["theta_w"] = params_dict["thetaCore"] * params_dict["alphaWing"] else: raise ValueError(f"Provided jet type {self.jet_type} invalid.") # set model parameters P = dict( # main model parameters; Uniform ISM -- 2 free parameters main=dict( d_l= 3.086e19, # luminocity distance to the source [cm], fix at 10 pc, so that AB magnitude equals absolute magnitude z = params_dict.get("redshift", 0.0), # redshift of the source (used in Doppler shifring and EBL table) n_ism=np.power(10, params_dict["log10_n0"]), # ISM density [cm^-3] (assuming uniform) theta_obs= params_dict["inclination_EM"], # observer angle [rad] (from pol to jet axis) lc_freqs= self.lc_freqs, # frequencies for light curve calculation lc_times= self.lc_times, # times for light curve calculation tb0=self.tb0, tb1=self.tb1, ntb=self.ntb, # burster frame time grid boundary, resolution, for the simulation ), # ejecta parameters; FS only -- 3 free parameters grb=dict( structure=struct, # structure of the ejecta eps_e_fs=np.power(10, params_dict["log10_epsilon_e"]), # microphysics - FS - frac. energy in electrons eps_b_fs=np.power(10, params_dict["log10_epsilon_B"]), # microphysics - FS - frac. energy in magnetic fields p_fs= params_dict["p"], # microphysics - FS - slope of the injection electron spectrum do_lc='yes', # task - compute light curves rtol_theta = self.rtol, method_limit_spread=None, # save_spec='yes' # save comoving spectra # method_synchrotron_fs = 'Joh06', # method_ne_fs = 'usenprime', # method_ele_fs = 'analytic', # method_comp_mode = 'observFlux' ) ) pba_run = run_grb(working_dir= os.getcwd() + f'/tmp_{self.rank}/', # directory to save/load from simulation data P=P, # all parameters run=True, # run code itself (if False, it will try to load results) path_to_cpp=self.path_to_exec, # absolute path to the C++ executable of the code loglevel=self.loglevel, # logging level of the code (info or err) process_skymaps=False # process unstractured sky maps. Only useed if `do_skymap = yes` ) mJys = pba_run.GRB.get_lc() return mJys def __call__(self, idx): param_dict = dict(zip(self.parameter_names, self.X[idx], strict=True)) param_dict.update(self.fixed_parameters) mJys = self._call_pyblastafterglow(param_dict) return idx, np.log10(mJys)
[docs] class BlastwaveData(AfterglowData): def __init__(self, *args, n_pool=None, **kwargs): if n_pool is not None: import warnings warnings.warn("n_pool is ignored; blastwave uses rayon for parallelism", stacklevel=2) self.chunk_size = 100 super().__init__(*args, **kwargs) def _sample_parameters(self, n, training): """Sample parameters and enforce the forward-shock energy constraint.""" X_raw = np.empty((n, len(self.parameter_names))) if training: for j, key in enumerate(self.parameter_names): a, b, distribution = self.parameter_distributions[key] if distribution == "uniform": X_raw[:, j] = np.random.uniform(a, b, size=n) elif distribution == "loguniform": X_raw[:, j] = np.exp(np.random.uniform(np.log(a), np.log(b), size=n)) else: for j, key in enumerate(self.parameter_distributions.keys()): a, b, _ = self.parameter_distributions[key] X_raw[:, j] = np.random.uniform(a, b, size=n) # Ensure that epsilon_e + epsilon_B < 1 epsilon_e_ind = self.parameter_names.index("log10_epsilon_e") epsilon_B_ind = self.parameter_names.index("log10_epsilon_B") epsilon_tot = (10**(X_raw[:, epsilon_e_ind]) + 10**(X_raw[:, epsilon_B_ind])) mask = epsilon_tot >= 1 X_raw[mask, epsilon_B_ind] += np.log10(0.99 / epsilon_tot[mask]) X_raw[mask, epsilon_e_ind] += np.log10(0.99 / epsilon_tot[mask]) return X_raw
[docs] def create_raw_data(self, n: int, training: bool = True): """Create draws X in the parameter space and run the blastwave model on it.""" X_raw = self._sample_parameters(n, training) X, y = self.run_afterglow_model(X_raw) return X, y
[docs] def run_afterglow_model(self, X): """Run blastwave model on the supplied parameters in X. No multiprocessing.Pool needed — the Rust extension uses rayon for automatic parallelism within each FluxDensity call (releases the GIL via py.allow_threads). """ y = np.empty((len(X), len(self.nus), len(self.times))) runner = RunBlastwave(self.times, self.nus, X, self.parameter_names, self.fixed_parameters) for idx in tqdm.tqdm(range(len(X)), desc=f"Computing {len(X)} blastwave calculations.", leave=False): try: i, out = runner(idx) y[i] = out except Exception as e: print(f"[WARNING] Blastwave call failed for sample {idx}: {e}") y[idx] = np.full((len(self.nus), len(self.times)), np.nan) return X, y
[docs] class RunBlastwave: def __init__(self, times, nus, X, parameter_names, fixed_parameters=None, ncells=33, spread_mode="ode"): if fixed_parameters is None: fixed_parameters = {} self.times = np.array(times) self._times_seconds = self.times * days_to_seconds self.nus = np.array(nus) self.X = np.array(X) self.parameter_names = list(parameter_names) self.fixed_parameters = dict(fixed_parameters) self.ncells = ncells if spread_mode not in ("ode", "pde", "none"): raise ValueError(f"Invalid spread_mode '{spread_mode}'. Must be 'ode', 'pde', or 'none'.") self.spread_mode = spread_mode # Pre-compute the meshgrid once tt, nunu = np.meshgrid(self._times_seconds, self.nus) self._tt_flat = tt.flatten() self._nunu_flat = nunu.flatten() self._tmax = float(np.max(self._times_seconds)) * 2 try: import blastwave as _bw self._bw = _bw except ImportError as err: raise ImportError( "blastwave is not installed. Install it from " "https://github.com/nuclear-multimessenger-astronomy/blastwave" ) from err def _call_blastwave(self, params_dict): """ Call blastwave model to generate a single flux density output. Uses the Jet API directly with tuned settings for data generation. Returns flux density in mJy as a 2D array (n_nu, n_times). """ bw = self._bw theta_c = params_dict["thetaCore"] Eiso = 10 ** params_dict["log10_E0"] lf = 10 ** params_dict["log10_lf"] n0 = 10 ** params_dict["log10_n0"] p = params_dict["p"] eps_e = 10 ** params_dict["log10_epsilon_e"] eps_b = 10 ** params_dict["log10_epsilon_B"] theta_v = params_dict["inclination_EM"] P = dict( Eiso=Eiso, lf=lf, theta_c=theta_c, n0=n0, A=0, eps_e=eps_e, eps_b=eps_b, p=p, theta_v=theta_v, d=9.99e-6, z=0.0, ) # Solve dynamics with reduced cell count and ODE spreading spread_kwargs = {} if self.spread_mode == "ode": spread_kwargs["spread_mode"] = "ode" elif self.spread_mode == "pde": spread_kwargs["spread"] = True else: spread_kwargs["spread"] = False jet = bw.Jet( bw.Gaussian(theta_c, Eiso, lf0=lf), 0.0, n0, tmin=10.0, tmax=self._tmax, grid=bw.ForwardJetRes(theta_c, self.ncells), tail=True, cal_level=1, rtol=1e-6, eps_e=eps_e, eps_b=eps_b, p_fwd=p, **spread_kwargs, ) mJys = jet.FluxDensity( self._tt_flat, self._nunu_flat, P, force_return=True, flux_method="forward", ) mJys = mJys.reshape(len(self.nus), len(self._times_seconds)) return mJys def __call__(self, idx): param_dict = dict(zip(self.parameter_names, self.X[idx], strict=True)) param_dict.update(self.fixed_parameters) mJys = self._call_blastwave(param_dict) mJys = np.clip(mJys, 1e-300, None) return idx, np.log10(mJys)
[docs] class BlastwaveRSData(BlastwaveData): """BlastwaveData variant with reverse shock enabled. Following Japelj+ 2014 (1402.3701), the RS microphysics are tied to the FS values via a magnetization ratio RB:: eps_e_rs = eps_e_f eps_b_rs = RB * eps_b_f p_rs = p_f Extra sampled parameter: log10_RB, log10_duration. sigma is kept fixed at 0.0 (unmagnetized ejecta). """
[docs] def create_raw_data(self, n: int, training: bool = True): """Sample parameters, enforce FS and RS energy constraints, then run model.""" X_raw = self._sample_parameters(n, training) # Ensure that eps_b_rs = RB * eps_b_f < 1 and eps_e + eps_b_rs < 1 (reverse shock) epsilon_e_ind = self.parameter_names.index("log10_epsilon_e") epsilon_B_ind = self.parameter_names.index("log10_epsilon_B") RB_ind = self.parameter_names.index("log10_RB") eps_e = 10**(X_raw[:, epsilon_e_ind]) eps_b_f = 10**(X_raw[:, epsilon_B_ind]) # Cap RB so that RB * eps_b_f < min(1 - eps_e, 0.99) RB_max = np.clip((0.99 - eps_e) / eps_b_f, 1.0, None) log10_RB_max = np.log10(RB_max) mask_rs = X_raw[:, RB_ind] > log10_RB_max X_raw[mask_rs, RB_ind] = log10_RB_max[mask_rs] X, y = self.run_afterglow_model(X_raw) return X, y
[docs] def run_afterglow_model(self, X): y = np.empty((len(X), len(self.nus), len(self.times))) runner = RunBlastwaveRS(self.times, self.nus, X, self.parameter_names, self.fixed_parameters) for idx in tqdm.tqdm(range(len(X)), desc=f"Computing {len(X)} blastwave+RS calculations.", leave=False): try: i, out = runner(idx) y[i] = out except Exception as e: print(f"[WARNING] Blastwave+RS call failed for sample {idx}: {e}") y[idx] = np.full((len(self.nus), len(self.times)), np.nan) return X, y
[docs] class RunBlastwaveRS: """Like RunBlastwave but with reverse shock enabled. Following Japelj+ 2014: RS microphysics derived from FS values via magnetization ratio RB = eps_b_rs / eps_b_f, with eps_e_rs = eps_e_f and p_rs = p_f. """ def __init__(self, times, nus, X, parameter_names, fixed_parameters=None, ncells=33, spread_mode="ode"): if fixed_parameters is None: fixed_parameters = {} self.times = np.array(times) self._times_seconds = self.times * days_to_seconds self.nus = np.array(nus) self.X = np.array(X) self.parameter_names = list(parameter_names) self.fixed_parameters = dict(fixed_parameters) self.ncells = ncells if spread_mode not in ("ode", "pde", "none"): raise ValueError(f"Invalid spread_mode '{spread_mode}'. Must be 'ode', 'pde', or 'none'.") self.spread_mode = spread_mode tt, nunu = np.meshgrid(self._times_seconds, self.nus) self._tt_flat = tt.flatten() self._nunu_flat = nunu.flatten() self._tmax = float(np.max(self._times_seconds)) * 2 try: import blastwave as _bw self._bw = _bw except ImportError as err: raise ImportError( "blastwave is not installed. Install it from " "https://github.com/nuclear-multimessenger-astronomy/blastwave" ) from err def _call_blastwave(self, params_dict): bw = self._bw theta_c = params_dict["thetaCore"] Eiso = 10 ** params_dict["log10_E0"] lf = 10 ** params_dict["log10_lf"] n0 = 10 ** params_dict["log10_n0"] p = params_dict["p"] eps_e = 10 ** params_dict["log10_epsilon_e"] eps_b = 10 ** params_dict["log10_epsilon_B"] theta_v = params_dict["inclination_EM"] # Reverse shock parameters derived from FS (Japelj+ 2014) RB = 10 ** params_dict["log10_RB"] eps_e_rs = eps_e # same as forward shock eps_b_rs = RB * eps_b # magnetization ratio p_rs = p # same as forward shock duration = 10 ** params_dict["log10_duration"] sigma = params_dict.get("sigma", 0.0) P = dict( Eiso=Eiso, lf=lf, theta_c=theta_c, n0=n0, A=0, eps_e=eps_e, eps_b=eps_b, p=p, theta_v=theta_v, d=9.99e-6, z=0.0, ) spread_kwargs = {} if self.spread_mode == "ode": spread_kwargs["spread_mode"] = "ode" elif self.spread_mode == "pde": spread_kwargs["spread"] = True else: spread_kwargs["spread"] = False jet = bw.Jet( bw.Gaussian(theta_c, Eiso, lf0=lf), 0.0, n0, tmin=10.0, tmax=self._tmax, grid=bw.ForwardJetRes(theta_c, self.ncells), tail=True, cal_level=1, rtol=1e-6, eps_e=eps_e, eps_b=eps_b, p_fwd=p, include_reverse_shock=True, sigma=sigma, eps_e_rs=eps_e_rs, eps_b_rs=eps_b_rs, p_rs=p_rs, duration=duration, **spread_kwargs, ) mJys = jet.FluxDensity( self._tt_flat, self._nunu_flat, P, force_return=True, ) mJys = mJys.reshape(len(self.nus), len(self._times_seconds)) return mJys def __call__(self, idx): param_dict = dict(zip(self.parameter_names, self.X[idx], strict=True)) param_dict.update(self.fixed_parameters) mJys = self._call_blastwave(param_dict) mJys = np.clip(mJys, 1e-300, None) return idx, np.log10(mJys)
# Backward-compatibility aliases JetsimpyData = BlastwaveData RunJetsimpy = RunBlastwave