# %%
__author__ = "Sarah Shi"
import os
import glob
import pickle
import warnings
import numpy as np
import pandas as pd
import mc3
import matplotlib as mpl
from matplotlib import pyplot as plt
import mc3.plots as mp
import mc3.stats as ms
import mc3.utils as mu
from pykrige import OrdinaryKriging
import scipy.signal as signal
from scipy.linalg import solveh_banded
import scipy.interpolate as interpolate
# %% Core functions
[docs]
class SampleDataLoader:
"""
A class for efficiently loading and managing spectral data and associated
metadata from CSV files. This includes spectral data from multiple samples,
as well as related glass chemistry and thickness information. The class
supports loading spectral data within a specified wavenumber range from a
directory containing CSV files and loading chemistry and thickness data
from a CSV file.
Attributes:
spectrum_path (list[str]): The default list of paths where the spectral
data CSV files are located.
chemistry_thickness_path (str): The default file path to the CSV file
containing glass chemistry and thickness data.
Methods:
load_spectrum_directory(paths, wn_high, wn_low): Loads
spectral data from CSV files within a specified wavenumber
range, ensuring that the wavenumbers are in ascending order and
skipping headers if present.
load_chemistry_thickness(chemistry_thickness_path): Loads glass
chemistry and thickness data from a CSV file, setting the "Sample"
column as the index.
load_all_data(paths, chemistry_thickness_path, wn_high, wn_low):
Loads both spectral data from CSV files and chemistry thickness
data from a CSV file.
"""
def __init__(self,
spectrum_path=None,
chemistry_thickness_path=None):
"""
Initializes a SampleDataLoader object with optional default paths for
the directory containing spectra and the chemistry thickness CSV file.
"""
self.spectrum_path = spectrum_path
self.chemistry_thickness_path = chemistry_thickness_path
[docs]
def load_spectrum_directory(self, wn_high=5500, wn_low=1000):
if self.spectrum_path is None:
raise ValueError("Spectrum path is not provided.")
paths = sorted(glob.glob(os.path.join(self.spectrum_path, "*")))
dfs = []
files = []
for path in paths:
file = os.path.split(path)[1][0:-4]
try:
df = pd.read_csv(
path,
names=["Wavenumber", "Absorbance"],
dtype={"Wavenumber": float},
)
except ValueError:
df = pd.read_csv(
path,
skiprows=1,
names=["Wavenumber", "Absorbance"],
dtype={"Wavenumber": float},
)
if df["Wavenumber"].iloc[0] > df["Wavenumber"].iloc[-1]:
df = df.iloc[::-1]
if wn_low == 1000 and wn_high == 5500:
if not (df["Wavenumber"].min() <= wn_low and
df["Wavenumber"].max() >= wn_high):
warnings.warn(f"{file} data do not span the required "
f"wavenumbers of 1000-5500 cm^-1. "
f"Available range: "
f"{df['Wavenumber'].min():.2f}-"
f"{df['Wavenumber'].max():.2f} cm^-1.",
UserWarning,
stacklevel=2)
df.set_index("Wavenumber", inplace=True)
spectrum = df.loc[wn_low:wn_high]
dfs.append(spectrum)
files.append(file)
self.dfs_dict = dict(zip(files, dfs))
return self.dfs_dict
[docs]
def load_chemistry_thickness(self):
if (self.chemistry_thickness_path is None or
not os.path.exists(self.chemistry_thickness_path)):
raise ValueError("Chemistry thickness path not provided or "
"does not exist.")
chem_thickness = pd.read_csv(self.chemistry_thickness_path)
chem_thickness.set_index("Sample", inplace=True)
oxides = [
"SiO2",
"TiO2",
"Al2O3",
"Fe2O3",
"FeO",
"MnO",
"MgO",
"CaO",
"Na2O",
"K2O",
"P2O5",
]
thicknesses = [
"Thickness",
"Sigma_Thickness"
]
missing_columns = [col for col in oxides+thicknesses if col
not in chem_thickness.columns]
if missing_columns:
warnings.warn(f"Missing required columns: {missing_columns}. "
f"Column created and filled with NaN. "
f"Please correct the values.",
UserWarning,
stacklevel=2)
for col in missing_columns:
chem_thickness[col] = np.nan
chemistry = chem_thickness.loc[
:,
oxides,
].fillna(0)
thickness = chem_thickness.loc[:, thicknesses]
self.chemistry = chemistry
self.thickness = thickness
return self.chemistry, self.thickness
[docs]
def load_all_data(self, wn_high=5500, wn_low=1000):
if self.spectrum_path is None:
raise ValueError("Spectrum path is not provided.")
if self.chemistry_thickness_path is None:
raise ValueError("Chemistry thickness path is not provided.")
dfs_dict = self.load_spectrum_directory(wn_high=wn_high,
wn_low=wn_low)
chemistry, thickness = self.load_chemistry_thickness()
self.dfs_dict = dfs_dict
self.chemistry = chemistry
self.thickness = thickness
return (
self.dfs_dict,
self.chemistry,
self.thickness,
)
[docs]
class VectorLoader:
"""
A class for loading spectral data vectors, including principal components
and wavenumbers, from specified NPZ files. It preloads vectors from default
NPZ files upon instantiation.
Attributes:
base_path (str): The base directory path where NPZ files are located.
Baseline_PC_path (str): The path to the NPZ file containing the
baseline principal components.
H2Om_PC_path (str): The path to the NPZ file containing the H2Om
principal components.
wn_high (int): The upper limit of the wavenumber range to be
considered.
wn_low (int): The lower limit of the wavenumber range to be
considered.
Wavenumber (np.ndarray): The array of wavenumbers loaded from the
baseline NPZ file.
Baseline_PC (np.ndarray): The matrix of baseline principal components.
H2Om_PC (np.ndarray): The matrix of H2O-modified principal components.
Methods:
load_PC(file_name): Loads baseline principal components from NPZ.
load_wavenumber(file_name): Loads wavenumbers from NPZ.
"""
def __init__(self):
self.base_path = os.path.dirname(__file__)
self.baseline_PC_path = "BaselineAvgPC.npz"
self.H2Om_PC_path = "H2Om1635PC.npz"
self.wn_high = 2400
self.wn_low = 1250
# Load default files
self.wavenumber = self.load_wavenumber(self.baseline_PC_path)
self.baseline_PC = self.load_PC(self.baseline_PC_path)
self.H2Om_PC = self.load_PC(self.H2Om_PC_path)
[docs]
def load_PC(self, file_name):
file_path = os.path.join(self.base_path, file_name)
npz = np.load(file_path)
PC_DF = pd.DataFrame(npz["data"], columns=npz["columns"])
PC_DF = PC_DF.set_index("Wavenumber")
PC_DF = PC_DF.loc[self.wn_low: self.wn_high]
PC_matrix = PC_DF.to_numpy()
return PC_matrix
[docs]
def load_wavenumber(self, file_name):
file_path = os.path.join(self.base_path, file_name)
npz = np.load(file_path)
wavenumber_DF = pd.DataFrame(npz["data"], columns=npz["columns"])
wavenumber_DF = wavenumber_DF.set_index("Wavenumber")
wavenumber_DF = wavenumber_DF.loc[self.wn_low: self.wn_high]
wavenumber = np.array(wavenumber_DF.index)
return wavenumber
[docs]
def gauss(x, mu, sd, A=1):
"""
Return a Gaussian fit for the CO_3^{2-} doublet peaks at 1515 and
1430 cm^-1 peaks.
Parameters:
x (numeric): The wavenumbers of interest.
mu (numeric): The center of the peak.
sd (numeric): The standard deviation (or width of peak).
A (numeric, optional): The amplitude. Defaults to 1.
Returns:
G (np.ndarray): The Gaussian fit.
"""
G = A * np.exp(-((x - mu) ** 2) / (2 * sd**2))
return G
[docs]
def linear(x, m, b):
"""
Calculate a linear offset for adjusting model data.
Parameters:
x (np.ndarray): Input values.
m (float): Tilt of the linear offset.
b (float): Offset of the linear offset.
Returns:
tilt_offset (np.ndarray): Linear offset.
"""
tilt_offset = m * np.arange(0, x.size) + b
return tilt_offset
[docs]
def carbonate(P, x, PCmatrix, H2Om1635_PCmatrix, Nvectors=5):
"""
The carbonate function takes in the inputs of fitting parameters P,
wavenumbers x, principal component matrix, number of principal
component vectors of interest. The function calculates the
H2Om_{1635} peak, CO_3^{2-} Gaussian peaks, linear
offset, and baseline. The function then returns the model data.
Parameters:
P (np.ndarray): Fitting parameters including principal component
and peak weights, Gaussian parameters, linear offset slope
and intercept.
x (np.ndarray): Wavenumbers of interest.
PCmatrix (matrix): Principal components matrix.
H2Om1635_PCmatrix (matrix): Principal components matrix for the
1635 peak.
Nvectors (int, optional): Number of principal components vectors
of interest. Default is 5.
Returns:
model_data (np.ndarray): Model data for the carbonate spectra.
"""
PC_Weights = np.array([P[0:Nvectors]])
Peak_Weights = np.array([P[-5:-2]])
(
peak_G1430,
std_G1430,
G1430_amplitude,
peak_G1515,
std_G1515,
G1515_amplitude,
) = P[Nvectors:-5]
m, b = P[-2:None]
Peak_1635 = Peak_Weights @ H2Om1635_PCmatrix.T
G1515 = gauss(x, peak_G1515, std_G1515, A=G1515_amplitude)
G1430 = gauss(x, peak_G1430, std_G1430, A=G1430_amplitude)
linear_offset = linear(x, m, b)
baseline = PC_Weights @ PCmatrix.T
model_data = baseline + linear_offset + Peak_1635 + G1515 + G1430
model_data = np.array(model_data)[0, :]
return model_data
[docs]
def als_baseline(intensities, asymmetry_param=0.05, smoothness_param=5e5,
max_iters=15, conv_thresh=1e-5, verbose=False):
"""
Computes the asymmetric least squares baseline.
http://www.science.uva.nl/~hboelens/publications/draftpub/Eilers_2005.pdf
Parameters:
intensities (array-like): Input signal to baseline.
asymmetry_param (float, optional): Asymmetry parameter (p) that
controls the weights. Defaults to 0.05.
smoothness_param (float, optional): Smoothness parameter (s) that
controls the smoothness of the baseline.
max_iters (int, optional): Maximum number of iterations.
conv_thresh (float, optional): Convergence threshold. Iteration
stops when the change in the weights < threshold.
verbose (bool, optional): If True, prints the iteration number and
convergence at each iteration.
Returns:
z (np.ndarray): Baseline of the input signal.
"""
smoother = WhittakerSmoother(intensities, smoothness_param, deriv_order=2)
# Rename p to be concise.
p = asymmetry_param
# Initialize weights.
w = np.ones(intensities.shape[0])
for i in range(max_iters):
z = smoother.smooth(w)
mask = intensities > z
new_w = p * mask + (1 - p) * (~mask)
conv = np.linalg.norm(new_w - w)
if verbose:
print(i + 1, conv)
if conv < conv_thresh:
break
w = new_w
else:
print("ALS did not converge in %d iterations" % max_iters)
return z
[docs]
class WhittakerSmoother(object):
"""
Implements the Whittaker smoother for smoothing a signal.
http://www.science.uva.nl/~hboelens/publications/draftpub/Eilers_2005.pdf
Parameters:
signal (array-like): Input signal to be smoothed.
smoothness_param (float): Relative importance of smoothness of the
predicted response.
deriv_order (int, optional): Order of the derivative of the identity
matrix.
Attributes:
y (array-like): Input signal to be smoothed.
upper_bands (array-like): Upper triangular bands of the matrix used
for smoothing.
"""
def __init__(self, signal, smoothness_param, deriv_order=1):
self.y = signal
assert deriv_order > 0, "deriv_order must be an int > 0"
# Compute the fixed derivative of identity (D).
d = np.zeros(deriv_order * 2 + 1, dtype=int)
d[deriv_order] = 1
d = np.diff(d, n=deriv_order)
n = self.y.shape[0]
k = len(d)
s = float(smoothness_param)
# Here be dragons: essentially we are faking a big banded matrix D,
# doing s * D.T.dot(D) with it, then taking the upper triangular bands.
diag_sums = np.vstack([
np.pad(s * np.cumsum(d[-i:] * d[:i]), ((k - i, 0),), "constant")
for i in range(1, k + 1)])
upper_bands = np.tile(diag_sums[:, -1:], n)
upper_bands[:, :k] = diag_sums
for i, ds in enumerate(diag_sums):
upper_bands[i, -i - 1:] = ds[::-1][: i + 1]
self.upper_bands = upper_bands
def smooth(self, w):
foo = self.upper_bands.copy()
foo[-1] += w # last row is the diagonal
return solveh_banded(foo, w * self.y, overwrite_ab=True,
overwrite_b=True)
[docs]
def NIR_process(data, wn_low, wn_high, peak):
"""
The NIR_process function processes Near-IR absorbance data and returns
relevant information, such as the fit absorbance data, kriged data, peak
height, and signal to noise ratio. The function first filters the data,
then fits a baseline and subtracts it to determine the peak absorbance.
Next, the data are kriged to further reduce noise and obtain peak height.
Finally, the signal to noise ratio is calculated, and a warning is issued
if the ratio is high. The function is used three times with different
H2Om wavenumber ranges for uncertainty assessment.
Parameters:
data (pd.DataFrame): A DataFrame of absorbance data.
wn_low (int): The lower bound wavenumber for NIR H2Om or OH.
wn_high (int): The higher bound wavenumber for NIR H2Om or OH.
peak (str): The H2Om or OH peak of interest.
Returns:
peak_fit (pd.DataFrame): A DataFrame of the absorbance data in
the region of interest, median filtered data, baseline
subtracted absorbance, and the subtracted peak.
krige_out (pd.DataFrame): A DataFrame of kriged data output.
PH_krige (float): The peak height obtained after kriging.
STN (float): The signal to noise ratio.
"""
data_H2O = data.loc[wn_low:wn_high]
data_output = pd.DataFrame(
columns=["Absorbance", "Absorbance_Filt",
"Baseline_NIR", "Peak_Subtract"],
index=data_H2O.index,
)
data_output["Absorbance"] = data_H2O
data_output["Absorbance_Filt"] = signal.medfilt(data_H2O["Absorbance"], 5)
data_output["Baseline_NIR"] = als_baseline(
data_output["Absorbance_Filt"],
asymmetry_param=0.001,
smoothness_param=1e9,
max_iters=20,
conv_thresh=1e-5,
)
data_output["Peak_Subtract"] = (
data_output["Absorbance_Filt"] - data_output["Baseline_NIR"]
)
krige_wn_range = np.linspace(wn_low - 5, wn_high + 5,
wn_high - wn_low + 11)
krige_peak = OrdinaryKriging(
data_H2O.index,
np.zeros(data_output["Peak_Subtract"].shape),
data_output["Peak_Subtract"],
variogram_model="gaussian",
)
krige_abs, _ = krige_peak.execute("grid", krige_wn_range,
np.array([0.0]))
krige_out = pd.DataFrame(
{
"Absorbance": krige_abs.squeeze(),
},
index=krige_wn_range,
)
if peak == "OH": # 4500 peak
pr_low, pr_high = 4400, 4600
elif peak == "H2Om": # 5200 peak
pr_low, pr_high = 5100, 5300
else:
raise ValueError(f"Invalid peak type: {peak}")
PH_max = data_output["Peak_Subtract"].loc[pr_low:pr_high].max()
PH_krige = (
krige_out["Absorbance"].loc[pr_low:pr_high].max()
- krige_out["Absorbance"].min()
)
PH_krige_index = int(
data_output["Peak_Subtract"][
data_output["Peak_Subtract"] == PH_max
].index.to_numpy()[0]
)
PH_std = (
data_output["Peak_Subtract"]
.loc[PH_krige_index - 50: PH_krige_index + 50]
.std()
)
STN = PH_krige / PH_std
return data_output, krige_out, PH_krige, STN
[docs]
def MIR_process(data, wn_low, wn_high):
"""
The MIR_process function processes the Mid-IR H2Ot, 3550 peak and
returns the fit absorbance data, kriged data, peak height. The
function first filters the data, then fits a baseline and subtracts
it to determine the peak absorbance. Next, the data are kriged to
further reduce noise and obtain peak height. The function is used
three times with different H2Ot wavenumber ranges for uncertainty
assessment.
Parameters:
data (pd.DataFrame): A DataFrame of absorbance data.
wn_low (int): The lower bound wavenumber for MIR H2Ot, 3550.
wn_high (int): The higher bound wavenumber for MIR H2Ot, 3550.
Returns:
data_output (pd.DataFrame): A DataFrame of absorbance data,
median filtered data, baseline subtracted absorbance,
and the subtracted peak.
krige_out (pd.DataFrame): A DataFrame of kriged data output.
PH_krige (float): The peak height obtained after kriging.
"""
data_H2Ot_3550 = data.loc[wn_low:wn_high]
data_output = pd.DataFrame(
columns=["Absorbance", "Baseline_MIR",
"Peak_Subtract", "Peak_Subtract_Filt"],
index=data_H2Ot_3550.index,
)
data_output["Absorbance"] = data_H2Ot_3550["Absorbance"]
data_output["Baseline_MIR"] = als_baseline(
data_H2Ot_3550["Absorbance"],
asymmetry_param=0.001,
smoothness_param=1e11,
max_iters=20,
conv_thresh=1e-7,
)
data_output["Peak_Subtract"] = (
data_H2Ot_3550["Absorbance"] - data_output["Baseline_MIR"]
)
data_output["Peak_Subtract_Filt"] = signal.medfilt(
data_output["Peak_Subtract"], 21)
peak_wn_low, peak_wn_high = 3300, 3600
plot_output = data_output.loc[peak_wn_low:peak_wn_high]
PH_3550 = np.max(plot_output["Peak_Subtract_Filt"])
plotindex = np.argmax(plot_output["Absorbance"].index.to_numpy() > 3400)
return data_output, plot_output, PH_3550, plotindex
[docs]
def MCMC(data, uncert, indparams, log, savefile):
"""
Runs Monte Carlo-Markov Chain and outputs the best fit parameters and
standard deviations.
Parameters:
data (np.ndarray): 1D array of data points.
uncert (np.ndarray): 1D array of uncertainties on the data.
indparams (np.ndarray): 2D array of independent parameters. Each row
corresponds to one data point and each column corresponds to a
different independent parameter.
log (bool, optional): Flag indicating whether to log the output or not.
Defaults to False.
savefile (str, optional): File name to save output to. If None, output
is not saved. Defaults to None.
Returns:
mc3_output (mc3.output): The output of the Monte Carlo-Markov Chain.
"""
# Define initial values, limits, and step sizes for parameters
func = carbonate
params = np.array([1.25, 2.00, 0.25, 0.005, 0.001, 1430, 30.0, 0.01,
1510, 30.0, 0.01, 0.100, 0.020, 0.01, 5e-4, 0.50])
pmin = np.array([0.00, -3.000, -2.00, -0.600, -0.300, 1415, 22.5, 0.00,
1500, 22.50, 0.00, 0.000, -2.000, -2.00, -5e-1, -1.00])
pmax = np.array([4.00, 3.00, 2.00, 0.600, 0.300, 1445, 40.0, 3.00,
1535, 40.0, 3.00, 3.000, 2.000, 2.00, 5e-1, 3.00])
pstep = np.abs(pmin - pmax) * 0.01
# Define prior limits for parameters
prior = np.array([0.00, 0.00, 0.00, 0.00, 0.00, 0.0, 30.0, 0.0, 0.0,
30.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
priorlow = np.array([0.00, 0.00, 0.00, 0.00, 0.00, 0.0, 5.00, 0.0, 0.0,
5.00, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
priorup = np.array([0.00, 0.00, 0.00, 0.00, 0.00, 0.0, 5.00, 0.0, 0.0,
5.00, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
pnames = ["B_mean", "B_PC1", "B_PC2", "B_PC3", "B_PC4",
"G1430_peak", "G1430_std", "G1430_amp",
"G1515_peak", "G1515_std", "G1515_amp",
"H1635_mean", "H1635_PC1", "H1635_PC2", "m", "b"]
texnames = ["$\\overline{B}$", "$\\overline{B}_{PC1}$",
"$\\overline{B}_{PC2}$", "$\\overline{B}_{PC3}$",
"$\\overline{B}_{PC4}$",
"$\\mu_{1430}$", "$\\sigma_{1430}$", "$a_{1430}$",
"$\\mu_{1515}$", "$\\sigma_{1515}$", "$a_{1515}$",
"$\\overline{H_{1635}}$",
"$\\overline{H_{1635}}_{PC1}$",
"$\\overline{H_{1635}}_{PC2}$", "$m$", "$b$"]
mc3_output = mc3.sample(data=data, uncert=uncert, func=func, params=params,
indparams=indparams, pmin=pmin, pmax=pmax,
pstep=pstep, prior=prior, priorlow=priorlow,
priorup=priorup, pnames=pnames, texnames=texnames,
sampler="snooker", rms=False, nsamples=1e6,
nchains=9, ncpu=4, burnin=2e4, thinning=5,
leastsq="trf", chisqscale=False, grtest=True,
grbreak=1.01, grnmin=0.5, hsize=10,
kickoff="normal", wlike=False, plots=False,
log=log, savefile=savefile)
return mc3_output
[docs]
def create_output_dirs(base_path, export_path):
"""
Create output directories for storing resulting files.
Parameters:
base_path (str): The base path for directories.
export_path (str): The directory for exported files.
Returns:
dict: A dictionary with directory names as keys and paths as values.
"""
if isinstance(export_path, list):
if len(export_path) == 0:
warnings.warn("export_path list is empty; provide a valid path.", UserWarning)
return {}
elif len(export_path) == 1:
export_path = export_path[0].strip(" /").rstrip('/')
else:
warnings.warn(f"Please provide a single directory path as a "
f"string or a list containing one item.",
UserWarning,
stacklevel=2)
return {}
elif isinstance(export_path, str):
export_path = export_path.strip(" /").rstrip('/')
else:
warnings.warn(f"The provided 'export_path' should be a string or a "
f"list containing one item.",
UserWarning,
stacklevel=2)
return {}
output_dirs = [
"FIGURES",
"PLOTFILES",
"NPZTXTFILES",
"LOGFILES",
"BLPEAKFILES",
"PKLFILES",
"FINALDATA",
]
add_dirs = [
"TRACE",
"HISTOGRAM",
"PAIRWISE",
"MODELFIT",
]
paths = {}
for dir_name in output_dirs:
if dir_name == "FINALDATA":
full_path = os.path.join(base_path, dir_name)
else:
full_path = os.path.join(base_path, dir_name, export_path)
os.makedirs(full_path, exist_ok=True)
paths[dir_name] = full_path
plotfile_path = paths["PLOTFILES"]
for add_dir in add_dirs:
os.makedirs(os.path.join(plotfile_path, add_dir), exist_ok=True)
return paths
[docs]
def calculate_baselines(dfs_dict, export_path, ignore_NIR=False):
"""
The calculate_baselines function processes a collection of spectral data
to fit baselines to all H2O and CO2 related peaks in basaltic-andesitic
spectra. The function inputs the dictionary of DataFrames that were
created by the SampleDataLoader class and determines best-fit (and
standard deviations) baselines, peak heights, peak locations, peak
widths, and principal component vectors used to fit the spectra. These
values are exported in a csv file and figures are created for each
individual sample. Optionally, it generates visual representations of
the spectra and analysis outcomes, alongside saving detailed analysis
results in various file formats for further review.
Parameters:
dfs_dict (dict): A dictionary where keys are file identifiers and
values are DataFrames containing the spectral data for each
sample. The spectral data is expected to have columns for
wavenumbers and absorbance values.
export_path (str, None): The directory path where the output files
(CSVs, figures, logs, etc.) should be saved. If None, no
files will be saved.
ignore_NIR (bool, optional): If True, skips the Near-IR peaks
Returns:
Volatile_PH (pd.DataFrame): A DataFrame of absorbance data,
median filtered data, baseline subtracted absorbance,
and the subtracted peak.
failures (list): A list of file identifiers for which the analysis
failed, possibly due to data issues or processing errors.
"""
# Load files with PC vectors for the baseline and H2Om, 1635 peak.
vector_loader = VectorLoader()
wavenumber = vector_loader.wavenumber
PCmatrix = vector_loader.baseline_PC
H2Om1635_PCmatrix = vector_loader.H2Om_PC
Nvectors = 5
indparams = [wavenumber, PCmatrix, H2Om1635_PCmatrix, Nvectors]
if export_path is not None:
paths = create_output_dirs(os.getcwd(), export_path)
full_path = paths["FINALDATA"]
file_name = f"{export_path}_DF.csv"
else:
full_path = os.path.join(os.getcwd(), "FINALDATA")
file_name = "DF.csv"
os.makedirs(full_path, exist_ok=True)
# Create DataFrames to store peak height data:
# P_ = peak_, _BP = best parameter, #_STD = _stdev
H2O_3550_PH = pd.DataFrame(columns=["PH_3550_M", "PH_3550_STD",
"H2Ot_3550_MAX",
"BL_H2Ot_3550_MAX",
"H2Ot_3550_SAT"])
DF_Output = pd.DataFrame(columns=["PH_1635_BP", "PH_1635_STD",
"PH_1515_BP", "PH_1515_STD",
"P_1515_BP", "P_1515_STD",
"STD_1515_BP", "STD_1515_STD",
"PH_1430_BP", "PH_1430_STD",
"P_1430_BP", "P_1430_STD",
"STD_1430_BP", "STD_1430_STD"])
PC_Output = pd.DataFrame(columns=["AVG_BL_BP", "AVG_BL_STD",
"PC1_BP", "PC1_STD",
"PC2_BP", "PC2_STD",
"PC3_BP", "PC3_STD",
"PC4_BP", "PC4_STD",
"m_BP", "m_STD", "b_BP", "b_STD",
"PH_1635_PC1_BP", "PH_1635_PC1_STD",
"PH_1635_PC2_BP", "PH_1635_PC2_STD"])
NEAR_IR_PH = pd.DataFrame(columns=["PH_5200_M", "PH_5200_STD",
"PH_4500_M", "PH_4500_STD",
"STN_P5200", "ERR_5200",
"STN_P4500", "ERR_4500"])
# Initialize lists for failures and errors
failures = []
error_3550 = []
error_4500 = []
error_5200 = []
# Initialize tolerance for baseline range
tolerance = 5
# Determine best-fit baselines for all peaks with ALS (H2Om_{5200},
# OH_{4500}, H2Ot_{3550}) and PyIRoGlass mc3 (H2Om_{1635}, CO3^{2-})
for files, data in dfs_dict.items():
try:
# Ensure the data spans the required wavenumbers
if not ignore_NIR:
upper_wavenumber = 5500
else:
upper_wavenumber = 4400
if (data.index.min() > (1000 + tolerance) or
data.index.max() < (upper_wavenumber - tolerance)):
warnings.warn(f"{files} data do not span the required "
f"wavenumbers of 1000-{upper_wavenumber} cm^-1, with "
f"{tolerance} cm^-1 tolerance. "
f"Available range: {data.index.min():.2f}"
f"-{data.index.max():.2f} cm^-1.",
UserWarning,
stacklevel=2)
# Three repeat baselines for the OH_{4500}
OH_4500_results = None
H2Om_5200_results = None
if not ignore_NIR:
OH_4500_peak_ranges = [(4250, 4675), (4225, 4650), (4275, 4700)]
OH_4500_results = list(map(lambda peak_range: {
"peak_fit": (result := NIR_process(data,
peak_range[0],
peak_range[1],
"OH"))[0],
"peak_krige": result[1],
"PH_krige": result[2],
"STN": result[3]
}, OH_4500_peak_ranges))
# Three repeat baselines for the H2Om_{5200}
H2Om_5200_peak_ranges = [(4875, 5400), (4850, 5375), (4900, 5425)]
H2Om_5200_results = list(map(lambda peak_range: {
"peak_fit": (result := NIR_process(data,
peak_range[0],
peak_range[1],
"H2Om"))[0],
"peak_krige": result[1],
"PH_krige": result[2],
"STN": result[3]
}, H2Om_5200_peak_ranges))
# Kriged peak heights
PH_4500_krige = [result["PH_krige"] for result in
OH_4500_results]
PH_4500_krige_M, PH_4500_krige_STD = (
np.mean(PH_4500_krige),
np.std(PH_4500_krige),
)
PH_5200_krige = [result["PH_krige"] for result in
H2Om_5200_results]
PH_5200_krige_M, PH_5200_krige_STD = (
np.mean(PH_5200_krige),
np.std(PH_5200_krige),
)
# Calculate signal to noise ratio
STN_4500_M = np.mean([result["STN"] for result in
OH_4500_results])
STN_5200_M = np.mean([result["STN"] for result in
H2Om_5200_results])
# Consider strength of signal
error_4500 = "-" if STN_4500_M >= 4.0 else "*"
error_5200 = "-" if STN_5200_M >= 4.0 else "*"
# Save NIR peak heights
NEAR_IR_PH.loc[files] = pd.Series(
{
"PH_5200_M": PH_5200_krige_M,
"PH_5200_STD": PH_5200_krige_STD,
"PH_4500_M": PH_4500_krige_M,
"PH_4500_STD": PH_4500_krige_STD,
"STN_P5200": STN_5200_M,
"ERR_5200": error_5200,
"STN_P4500": STN_4500_M,
"ERR_4500": error_4500,
}
)
else:
NEAR_IR_PH.loc[files] = pd.Series(
{
"PH_5200_M": np.nan,
"PH_5200_STD": np.nan,
"PH_4500_M": np.nan,
"PH_4500_STD": np.nan,
"STN_P5200": np.nan,
"ERR_5200": np.nan,
"STN_P4500": np.nan,
"ERR_4500": np.nan,
}
)
# Three repeat baselines for the H2Ot_{3550}
H2Ot_3550_peak_ranges = [(1900, 4400), (2100, 4200), (2300, 4000)]
H2Ot_3550_results = list(map(lambda peak_range: {
"peak_fit": (result := MIR_process(data,
peak_range[0],
peak_range[1]))[0],
"plot_output": result[1],
"PH": result[2],
}, H2Ot_3550_peak_ranges))
PH_3550 = [result["PH"] for result in H2Ot_3550_results]
PH_3550_M, PH_3550_STD = np.mean(PH_3550), np.std(PH_3550)
# Determine maximum absorbances for H2Ot_{3550} to check for
# saturation
data_H2Ot_3550_1 = H2Ot_3550_results[0]["peak_fit"]
MAX_3550_ABS = data_H2Ot_3550_1[data_H2Ot_3550_1.index > 3550][
"Absorbance"
].iloc[0]
BL_MAX_3550_ABS = data_H2Ot_3550_1[data_H2Ot_3550_1.index > 3550][
"Baseline_MIR"
].iloc[0]
# Assign * marker to saturated samples H2Ot_{3550} absorbance > 2
error_3550 = "-" if MAX_3550_ABS < 2 else "*"
# Save H2Om_{3550} peak heights and saturation information
H2O_3550_PH.loc[files] = pd.Series(
{
"PH_3550_M": PH_3550_M,
"PH_3550_STD": PH_3550_STD,
"H2Ot_3550_MAX": MAX_3550_ABS,
"BL_H2Ot_3550_MAX": BL_MAX_3550_ABS,
"H2Ot_3550_SAT": error_3550,
}
)
# Initialize PyIRoGlass mc3 fit for H2Om_{1635} and CO3^{2-}
df_length = np.shape(wavenumber)[0]
CO2_wn_high, CO2_wn_low = 2400, 1250
spec = data.loc[CO2_wn_low:CO2_wn_high]
# Interpolate data to wavenumber spacing, to prepare for mc3
if spec.shape[0] != df_length:
interp_wn = np.linspace(spec.index[0],
spec.index[-1],
df_length)
interp_abs = interpolate.interp1d(
spec.index, spec["Absorbance"])(interp_wn)
spec = spec.reindex(index=interp_wn)
spec["Absorbance"] = interp_abs
spec_mc3 = spec["Absorbance"].to_numpy()
elif spec.shape[0] == df_length:
spec_mc3 = spec["Absorbance"].to_numpy()
# Set uncertainty
uncert = np.ones_like(spec_mc3) * 0.01
# Run PyIRoGlass mc3!!!
if export_path is not None:
warnings.filterwarnings("ignore", category=DeprecationWarning)
als_bls = {"H2Ot_3550_results": H2Ot_3550_results}
if not ignore_NIR:
als_bls["OH_4500_results"] = OH_4500_results
als_bls["H2Om_5200_results"] = H2Om_5200_results
with open(os.path.join(paths["PKLFILES"],
f"{files}.pkl"), "wb") as handle:
pickle.dump(als_bls, handle,
protocol=pickle.HIGHEST_PROTOCOL)
mc3_output = MCMC(
data=spec_mc3,
uncert=uncert,
indparams=indparams,
log=os.path.join(paths["LOGFILES"], f"{files}.log"),
savefile=os.path.join(paths["NPZTXTFILES"], f"{files}.npz"),
)
texnames = [
"$\\overline{B}$",
"$\\overline{B}_{PC1}$",
"$\\overline{B}_{PC2}$",
"$\\overline{B}_{PC3}$",
"$\\overline{B}_{PC4}$",
"$\\mu_{1430}$",
"$\\sigma_{1430}$",
"$a_{1430}$",
"$\\mu_{1515}$",
"$\\sigma_{1515}$",
"$a_{1515}$",
"$\\overline{H_{1635}}$",
"$\\overline{H_{1635}}_{PC1}$",
"$\\overline{H_{1635}}_{PC2}$",
"$m$",
"$b$",
]
posterior, _, _ = mu.burn(mc3_output)
post = mp.Posterior(posterior, texnames)
post.plot()
plt.suptitle(files)
plt.savefig(os.path.join(paths["PLOTFILES"], "PAIRWISE",
f"{files}_pairwise.pdf"),
bbox_inches="tight")
plt.close("all")
post.plot_histogram(nx=4, ny=4)
plt.suptitle(files, y=1.015)
plt.savefig(os.path.join(paths["PLOTFILES"], "HISTOGRAM",
f"{files}_histogram.pdf"),
bbox_inches="tight")
plt.close("all")
plot_trace(
mc3_output["posterior"],
title=files,
zchain=mc3_output["zchain"],
burnin=mc3_output["burnin"],
pnames=texnames,
savefile=os.path.join(paths["PLOTFILES"], "TRACE",
f"{files}_trace.pdf"),)
plt.close("all")
plot_modelfit(
spec_mc3,
uncert,
indparams[0],
mc3_output["best_model"],
title=files,
savefile=os.path.join(paths["PLOTFILES"], "MODELFIT",
f"{files}_modelfit.pdf"),)
plt.close("all")
else:
mc3_output = MCMC(
data=spec_mc3,
uncert=uncert,
indparams=indparams,
log=None,
savefile=None,
)
# Initialize plotting, create subplots of H2Om_{5200} and OH_{4500}
# baselines and peak fits
if export_path is not None:
warnings.filterwarnings("ignore", module="matplotlib\\..*")
if not ignore_NIR:
fig = plt.figure(figsize=(26, 8))
ax1 = plt.subplot2grid((2, 3), (0, 0), fig=fig)
ax2 = plt.subplot2grid((2, 3), (1, 0), fig=fig)
ax3 = plt.subplot2grid((2, 3), (0, 1), rowspan=2, fig=fig)
ax4 = plt.subplot2grid((2, 3), (0, 2), rowspan=2, fig=fig)
# Create subplot of H2Om_{5200}, OH_{4500} baselines/peak fits
plot_H2Om_OH(data, files, als_bls, ax_top=ax1, ax_bot=ax2)
# Create subplot of H2Ot_{3550} baselines/peak fits
plot_H2Ot_3550(data, files, als_bls, ax=ax3)
# Create subplot of CO_3^{2-} baselines/peak fits
plot_carbonate(data, files, mc3_output, export_path, ax=ax4)
plt.tight_layout()
plt.savefig(os.path.join(paths["FIGURES"], f"{files}.pdf"),)
plt.close("all")
else:
fig, ax = plt.subplots(1, 2, figsize=(16, 8))
ax = ax.flatten()
# Create subplot of H2Om_{5200}, OH_{4500} baselines/peak fits
plot_H2Ot_3550(data, files, als_bls, ax=ax[0])
# Create subplot of CO_3^{2-} baselines/peak fits
plot_carbonate(data, files, mc3_output, export_path, ax=ax[1])
plt.tight_layout()
plt.savefig(os.path.join(paths["FIGURES"], f"{files}.pdf"),)
plt.close("all")
except Exception as e:
failures.append(files)
print(f"{files} failed. Reason: {str(e)}")
else:
pass
# Create DataFrame of best fit parameters and standard deviations
DF_Output.loc[files] = pd.Series(
{
"PH_1635_BP": mc3_output["bestp"][-5],
"PH_1635_STD": mc3_output["stdp"][-5],
"PH_1515_BP": mc3_output["bestp"][-6],
"PH_1515_STD": mc3_output["stdp"][-6],
"P_1515_BP": mc3_output["bestp"][-8],
"P_1515_STD": mc3_output["stdp"][-8],
"STD_1515_BP": mc3_output["bestp"][-7],
"STD_1515_STD": mc3_output["stdp"][-7],
"PH_1430_BP": mc3_output["bestp"][-9],
"PH_1430_STD": mc3_output["stdp"][-9],
"P_1430_BP": mc3_output["bestp"][-11],
"P_1430_STD": mc3_output["stdp"][-11],
"STD_1430_BP": mc3_output["bestp"][-10],
"STD_1430_STD": mc3_output["stdp"][-10],
}
)
PC_Output.loc[files] = pd.Series(
{
"AVG_BL_BP": mc3_output["bestp"][0],
"AVG_BL_STD": mc3_output["stdp"][0],
"PC1_BP": mc3_output["bestp"][1],
"PC1_STD": mc3_output["stdp"][1],
"PC2_BP": mc3_output["bestp"][2],
"PC2_STD": mc3_output["stdp"][2],
"PC3_BP": mc3_output["bestp"][3],
"PC3_STD": mc3_output["stdp"][3],
"PC4_BP": mc3_output["bestp"][4],
"PC4_STD": mc3_output["stdp"][4],
"m_BP": mc3_output["bestp"][-2],
"m_STD": mc3_output["stdp"][-2],
"b_BP": mc3_output["bestp"][-1],
"b_STD": mc3_output["stdp"][-1],
"PH_1635_PC1_BP": mc3_output["bestp"][-4],
"PH_1635_PC1_STD": mc3_output["stdp"][-4],
"PH_1635_PC2_BP": mc3_output["bestp"][-3],
"PH_1635_PC2_STD": mc3_output["stdp"][-3],
}
)
Volatile_PH = pd.concat([H2O_3550_PH, DF_Output, NEAR_IR_PH, PC_Output],
axis=1)
full_file_path = os.path.join(full_path, file_name)
Volatile_PH.to_csv(full_file_path)
return Volatile_PH, failures
[docs]
def beer_lambert(molar_mass, absorbance, density, thickness, epsilon):
"""
Applies the Beer-Lambert Law to calculate concentration from given inputs.
Parameters:
molar_mass (float): The molar mass of the substance in grams per mole.
absorbance (float): The absorbance of the substance, measured in
optical density units.
density (float): The density of the substance in grams per cubic
centimeter.
thickness (float): The thickness of the sample in centimeters.
epsilon (float): The molar extinction coefficient, measured in liters
per mole per centimeter.
Returns:
pd.Series: pandas series containing the concentration in wt.%.
Notes:
The Beer-Lambert Law states that the absorbance of a substance is
proportional to its concentration. The formula for calculating
concentration from absorbance is:
concentration = (1e6*molar_mass*absorbance)/(density*thickness*epsilon)
# https://sites.fas.harvard.edu/~scphys/nsta/error_propagation.pdf
"""
concentration = pd.Series(dtype="float")
concentration = ((1e6 * molar_mass * absorbance) /
(density * thickness * epsilon))
return concentration
[docs]
def beer_lambert_error(N, molar_mass,
absorbance, sigma_absorbance,
density, sigma_density,
thickness, sigma_thickness,
epsilon, sigma_epsilon):
"""
Applies a Monte Carlo simulation to estimate the uncertainty in
concentration calculated using the Beer-Lambert Law.
Parameters:
N (int): The number of Monte Carlo samples to generate.
molar_mass (float): The molar mass of the substance in grams
per mole.
absorbance (float): The absorbance of the substance, measured
in optical density units.
sigma_absorbance (float): The uncertainty associated with the
absorbance measurement.
density (float): The density of the substance in grams per
cubic centimeter.
sigma_density (float): The uncertainty associated with the
density measurement.
thickness (float): The thickness of the sample in centimeters.
sigma_thickness (float): The uncertainty associated with the
sample thickness measurement.
epsilon (float): The molar extinction coefficient, measured
in liters per mole per centimeter.
sigma_epsilon (float): The uncertainty associated with the molar
extinction coefficient measurement.
Returns:
float: The estimated uncertainty in concentration in wt.%.
Notes:
The Beer-Lambert Law states that the absorbance of a substance is
proportional to its concentration. The formula for calculating
concentration from absorbance is:
concentration = (1e6*molar_mass*absorbance)/(density*thickness*epsilon)
This function estimates the uncertainty in concentration using a
Monte Carlo simulation. Absorbance, density, and thickness are assumed
to follow Gaussian distributions with means from the input values
and standard deviations from the input uncertainties. The absorbance
coefficient is assumed to follow a uniform distribution with a mean
given by the input value and a range from the input uncertainty.
The function returns the standard deviation of the resulting
concentration distribution.
# https://astrofrog.github.io/py4sci/_static/Practice%20Problem%20-%20Monte-Carlo%20Error%20Propagation%20-%20Sample%20Solution.html
"""
gaussian_concentration = (
1e6 * molar_mass * np.random.normal(absorbance, sigma_absorbance,
N)
) / (
np.random.normal(density, sigma_density, N)
* np.random.normal(thickness, sigma_thickness, N)
* np.random.uniform(epsilon - sigma_epsilon, epsilon + sigma_epsilon,
N)
)
concentration_std = np.std(gaussian_concentration)
return concentration_std
[docs]
def calculate_density(chemistry, T, P, model="LS"):
"""
The calculate_density function inputs the MI chemistry file and outputs
the glass density at the temperature and pressure of analysis. The mole
fraction is calculated. The total molar volume xivibari is determined from
sum of the mole fractions of each oxide * partial molar volume at room
temperature and pressure of analysis. The gram formula weight gfw is then
determined by summing the mole fractions*molar masses. The density is
finally determined by dividing gram formula weight by total molar volume.
Parameters:
chemistry (pd.DataFrame): Dataframe containing oxide weight
percentages for the glass chemistry.
T (float): temperature at which the density is calculated (in Celsius)
P (float): pressure at which the density is calculated (in bars)
model (str): Choice of density model. "LS" for Lesher and Spera (2015),
"IT" for Iacovino and Till (2019). Default is "LS".
Returns:
mol (pd.DataFrame): DataFrame containing the oxide mole fraction for
the glass chemistry
density (float): glass density at room temperature and pressure
(in kg/m^3)
"""
# Define a dictionary of molar masses for each oxide
molar_mass = {
"SiO2": 60.08,
"TiO2": 79.866,
"Al2O3": 101.96,
"Fe2O3": 159.69,
"FeO": 71.844,
"MnO": 70.9374,
"MgO": 40.3044,
"CaO": 56.0774,
"Na2O": 61.9789,
"K2O": 94.2,
"P2O5": 141.9445,
"H2O": 18.01528,
"CO2": 44.01,
}
# Convert room temperature from Celsius to Kelvin
T_K = T + 273.15
if model == "LS":
# Define a dictionary of partial molar volumes for each oxide, based on
# data compiled from Lesher and Spera (2015).
# Tref = 1400C, Pref = 10**-4 GPa (1 bar).
par_molar_vol = {
"SiO2": (26.86 - 1.89 * P / 1000),
"TiO2": (23.16 + 7.24 * (T_K - 1673) / 1000 - 2.31 * P / 1000),
"Al2O3": (37.42 - 2.26 * P / 1000),
"Fe2O3": (42.13 + 9.09 * (T_K - 1673) / 1000 - 2.53 * P / 1000),
"FeO": (13.65 + 2.92 * (T_K - 1673) / 1000 - 0.45 * P / 1000),
"MgO": (11.69 + 3.27 * (T_K - 1673) / 1000 + 0.27 * P / 1000),
"CaO": (16.53 + 3.74 * (T_K - 1673) / 1000 + 0.34 * P / 1000),
"Na2O": (28.88 + 7.68 * (T_K - 1673) / 1000 - 2.4 * P / 1000),
"K2O": (45.07 + 12.08 * (T_K - 1673) / 1000 - 6.75 * P / 1000),
"H2O": (26.27 + 9.46 * (T_K - 1673) / 1000 - 3.15 * P / 1000),
}
else:
# data compiled from Iacovino and Till (2019)
par_molar_vol = {
"SiO2": (26.86 - 1.89 * P / 1000),
"TiO2": (28.32 + 7.24 * (T_K - 1773) / 1000 - 2.31 * P / 1000),
"Al2O3": (37.42 + 2.62 * (T_K - 1773) / 1000 - 2.26 * P / 1000),
"Fe2O3": (41.5 - 2.53 * P / 1000),
"FeO": (12.68 + 3.69 * (T_K - 1723) / 1000 - 0.45 * P / 1000),
"MgO": (12.02 + 3.27 * (T_K - 1773) / 1000 + 0.27 * P / 1000),
"CaO": (16.9 + 3.74 * (T_K - 1773) / 1000 + 0.34 * P / 1000),
"Na2O": (29.65 + 7.68 * (T_K - 1773) / 1000 - 2.4 * P / 1000),
"K2O": (47.28 + 12.08 * (T_K - 1773) / 1000 - 6.75 * P / 1000),
"H2O": (22.9 + 9.5 * (T_K - 1273) / 1000 - 3.20 * P / 1000),
}
# Create an DataFrame to store oxide moles for chemistry
mol = pd.DataFrame()
# Calculate oxide moles by dividing weight percentage by molar mass
for oxide in chemistry:
mol[oxide] = chemistry[oxide] / molar_mass[oxide]
# Calculate the total moles for the MI chemistry
mol_tot = mol.sum(axis=1)
# Create empty DataFrames to store the partial molar volume, gram formula
# weight, oxide density, and the product of mole fraction and partial
# molar volume for each oxide
xivbari = pd.DataFrame()
gfw = pd.DataFrame()
for oxide in chemistry:
# If the oxide is included in the partial molar volume dictionary,
# calculate its partial molar volume and gram formula weight
if oxide in par_molar_vol:
# partial molar volume
xivbari[oxide] = mol[oxide] / mol_tot * par_molar_vol[oxide]
# gram formula weight
gfw[oxide] = mol[oxide] / mol_tot * molar_mass[oxide]
xivbari_tot = xivbari.sum(axis=1)
gfw_tot = gfw.sum(axis=1)
# Calculate density of glass, convert by *1000 for values in kg/m^3
density = 1000 * gfw_tot / xivbari_tot
return mol, density
[docs]
def calculate_epsilon(chemistry, T, P):
"""
The calculate_epsilon function computes the extinction coefficients
and their uncertainties for various molecular species in a given MI
or glass chemistry dataset.
Parameters:
chemistry (pd.DataFrame): Dataframe containing oxide weight
percentages for the glass chemistry.
T (int): temperature at which the density is calculated (in Celsius)
P (int): pressure at which the density is calculated (in bars)
Returns:
mol (pd.DataFrame): Dataframe of the mole fraction of each oxide in
the glass chemistry
density (pd.DataFrame): Dataframe of glass density at room temperature
and pressure (in kg/m^3)
"""
epsilon = pd.DataFrame(
columns=[
"Tau",
"Eta",
"epsilon_H2Ot_3550",
"sigma_epsilon_H2Ot_3550",
"epsilon_H2Om_1635",
"sigma_epsilon_H2Om_1635",
"epsilon_CO2",
"sigma_epsilon_CO2",
"epsilon_H2Om_5200",
"sigma_epsilon_H2Om_5200",
"epsilon_OH_4500",
"sigma_epsilon_OH_4500",
]
)
mol, _ = calculate_density(chemistry, T, P)
# Calculate extinction coefficient
cation_tot = (mol.sum(axis=1) +
mol["Al2O3"] + mol["Na2O"] + mol["K2O"] + mol["P2O5"])
Na_NaCa = (2 * mol["Na2O"]) / ((2 * mol["Na2O"]) + mol["CaO"])
SiAl_tot = (mol["SiO2"] + (2 * mol["Al2O3"])) / cation_tot
# Set up extinction coefficient inversion best-fit parameters and
# covariance matrices
mest_5200 = np.array([-2.29142025, 4.67552848])
covm_est_5200 = np.diag([0.01285319, 0.02764414])
mest_4500 = np.array([-1.63273006, 3.53252218])
covm_est_4500 = np.diag([0.03292054, 0.07082581])
mest_3550 = np.array([15.73722543, 71.39668681])
covm_est_3550 = np.diag([38.05316054, 77.3885357])
mest_1635 = np.array([-50.3975642, 124.2505339])
covm_est_1635 = np.diag([20.85034888, 39.38749563])
mest_CO2 = np.array([417.17390625, -318.09377591])
covm_est_CO2 = np.diag([84.84230954, 339.64346778])
# Set up matrices for calculating uncertainties on extinction coefficients
G_SiAl = np.ones((2, 1))
G_NaCa = np.ones((2, 1))
covz_error_SiAl = np.zeros((2, 2))
covz_error_NaCa = np.zeros((2, 2))
# Define calibration ranges
tau_ranges = {
"epsilon_H2Om_5200": (0.6, 0.8641748075261225),
"epsilon_OH_4500": (0.6, 0.8641748075261225),
"epsilon_H2Ot_3550": (0.5078379123837519, 0.9023427189457927),
"epsilon_H2Om_1635": (0.627337511542476, 0.86)
}
eta_range = (0.23157895182099122, 0.8406489374848108)
# Loop through and calculate for all MI or glass chemistrys.
for i in chemistry.index:
# Calculate extinction coefficients with best-fit parameters
epsilon_H2Ot_3550 = mest_3550[0] + (mest_3550[1] * SiAl_tot[i])
epsilon_H2Om_1635 = mest_1635[0] + (mest_1635[1] * SiAl_tot[i])
epsilon_CO2 = mest_CO2[0] + (mest_CO2[1] * Na_NaCa[i])
epsilon_H2Om_5200 = mest_5200[0] + (mest_5200[1] * SiAl_tot[i])
epsilon_OH_4500 = mest_4500[0] + (mest_4500[1] * SiAl_tot[i])
# Calculate extinction coefficient uncertainties
G_SiAl[1, 0] = SiAl_tot[i]
G_NaCa[1, 0] = Na_NaCa[i]
covz_error_SiAl[1, 1] = SiAl_tot[i] * 0.01 # 1 sigma
covz_error_NaCa[1, 1] = Na_NaCa[i] * 0.01
CT_int_3550 = (G_SiAl * covm_est_3550 * np.transpose(G_SiAl)) + (
mest_3550 * covz_error_SiAl * np.transpose(mest_3550)
)
CT68_3550 = (np.mean(np.diag(CT_int_3550))) ** (1 / 2)
CT_int_1635 = (G_SiAl * covm_est_1635 * np.transpose(G_SiAl)) + (
mest_1635 * covz_error_SiAl * np.transpose(mest_1635)
)
CT68_1635 = (np.mean(np.diag(CT_int_1635))) ** (1 / 2)
CT_int_CO2 = (G_NaCa * covm_est_CO2 * np.transpose(G_NaCa)) + (
mest_CO2 * covz_error_NaCa * np.transpose(mest_CO2)
)
CT68_CO2 = (np.mean(np.diag(CT_int_CO2))) ** (1 / 2)
CT_int_5200 = (G_SiAl * covm_est_5200 * np.transpose(G_SiAl)) + (
mest_5200 * covz_error_SiAl * np.transpose(mest_5200)
)
CT68_5200 = (np.mean(np.diag(CT_int_5200))) ** (1 / 2)
CT_int_4500 = (G_SiAl * covm_est_4500 * np.transpose(G_SiAl)) + (
mest_4500 * covz_error_SiAl * np.transpose(mest_4500)
)
CT68_4500 = (np.mean(np.diag(CT_int_4500))) ** (1 / 2)
# Save outputs of extinction coefficients to DataFrame epsilon
epsilon.loc[i] = pd.Series(
{
"Tau": SiAl_tot[i],
"Eta": Na_NaCa[i],
"epsilon_H2Ot_3550": epsilon_H2Ot_3550,
"sigma_epsilon_H2Ot_3550": CT68_3550,
"epsilon_H2Om_1635": epsilon_H2Om_1635,
"sigma_epsilon_H2Om_1635": CT68_1635,
"epsilon_CO2": epsilon_CO2,
"sigma_epsilon_CO2": CT68_CO2,
"epsilon_H2Om_5200": epsilon_H2Om_5200,
"sigma_epsilon_H2Om_5200": CT68_5200,
"epsilon_OH_4500": epsilon_OH_4500,
"sigma_epsilon_OH_4500": CT68_4500,
}
)
# Check whether Tau and Eta are in calibration ranges, warn if not
# Same ranges for epsilon_H2Om_5200 and epsilon_OH_4500, simplify
if not (tau_ranges["epsilon_H2Om_5200"][0]
<= SiAl_tot[i] <= tau_ranges["epsilon_H2Om_5200"][1]):
warnings.warn(f"Tau ({round(SiAl_tot[i], 4)}) for {i} "
f"is outside the calibration range for "
f"epsilon_H2Om_5200 and epsilon_OH_4500. "
f"Use caution.",
UserWarning,
stacklevel=2)
if not (tau_ranges["epsilon_H2Ot_3550"][0]
<= SiAl_tot[i] <= tau_ranges["epsilon_H2Ot_3550"][1]):
warnings.warn(f"Tau ({round(SiAl_tot[i], 4)}) for {i} "
f"is outside the calibration range for "
f"epsilon_H2Ot_3550. Use caution.",
UserWarning,
stacklevel=2)
if not (tau_ranges["epsilon_H2Om_1635"][0]
<= SiAl_tot[i] <= tau_ranges["epsilon_H2Om_1635"][1]):
warnings.warn(f"Tau ({round(SiAl_tot[i], 4)}) for {i} "
f"is outside the calibration range for "
f"epsilon_H2Om_1635. Use caution.",
UserWarning,
stacklevel=2)
if not (eta_range[0] <= Na_NaCa[i] <= eta_range[1]):
warnings.warn(f"Eta ({round(Na_NaCa[i], 4)}) for {i} "
f"is outside the calibration range for "
f"epsilon_CO2. Use caution.",
UserWarning,
stacklevel=2)
return epsilon
[docs]
def calculate_concentrations(Volatile_PH, chemistry, thickness,
export_path, N=500000, T=25, P=1,
model="LS", ignore_NIR=False):
"""
The calculate_concentrations function calculates the concentrations
and uncertainties of volatile components (H2O peak (3550 cm^-1),
molecular H2O peak (1635 cm^-1), and carbonate peaks (1515 and
1430 cm^-1) in a glass sample based on peak height data, sample
chemistry, and wafer thickness. This function uses the Beer-Lambert
law for absorbance to estimate concentrations and applies Monte Carlo
simulations to quantify uncertainties. It iteratively adjusts for the
effect of water content on glass density to improve accuracy.
Parameters:
Volatile_PH (pd.DataFrame): DataFrame with columns for peak heights
of total H2O (3550 cm^-1), molecular H2O (1635 cm^-1), and
carbonate peaks (1515 and 1430 cm^-1). Each row represents a
different sample.
chemistry (pd.DataFrame): Dataframe containing oxide weight
percentages for the glass chemistry.
thickness (pd.DataFrame): DataFrame with "Thickness" column
indicating wafer thickness in micrometers (µm) for each sample.
N (int): Number of Monte Carlo simulations to perform for uncertainty
estimation. Default is 500,000.
T (int): Temperature in Celsius at which the density is calculated.
Default is 25°C.
P (int): Pressure in bars at which the density is calculated.
Default is 1 bar.
model (str): Choice of density model. "LS" for Lesher and Spera (2015),
"IT" for Iacovino and Till (2019). Default is "LS".
ignore_NIR (bool): If True, skips calculations for near-infrared.
Returns:
concentrations_df (pd.DataFrame): DataFrame containing calculated
volatile concentrations and their uncertainties for each sample,
including columns for mean and standard deviation of H2O and CO2
species concentrations. ALso contains density ("Density" column)
and extinction coefficient ("epsilon" column) for each sample,
providing insight into the properties of the glass under analysis.
Note:
The function assumes that the input chemistry includes all relevant
oxides and that the Volatile_PH DataFrame contains peak height data for
all specified peaks. Errors in input data or missing values may affect
the accuracy of the results.
"""
# Use the create_output_dirs function to get output directories
if export_path is not None:
paths = create_output_dirs(os.getcwd(), export_path)
full_path = paths["FINALDATA"]
file_name = f"{export_path}_H2OCO2.csv"
else:
full_path = os.path.join(os.getcwd(), "FINALDATA")
file_name = "H2OCO2.csv"
os.makedirs(full_path, exist_ok=True)
full_file_path = os.path.join(full_path, file_name)
common = Volatile_PH.index.intersection(chemistry.index)
if common.empty:
raise ValueError("No sample names overlap between Volatile_PH and chemistry!")
# warn about what’s dropped
dropped_vol = set(Volatile_PH.index) - set(common)
dropped_chem = set(chemistry.index) - set(common)
if dropped_vol or dropped_chem:
print(f"Warning: Dropping {len(dropped_vol)} Volatile_PH samples and "
f"{len(dropped_chem)} chemistry samples names that do not match.")
# filter both DataFrames
Volatile_PH = Volatile_PH.loc[common]
chemistry = chemistry.loc[common]
# Define a dictionary of molar masses for each oxide
molar_mass = {
"SiO2": 60.08,
"TiO2": 79.866,
"Al2O3": 101.96,
"Fe2O3": 159.69,
"FeO": 71.844,
"MnO": 70.9374,
"MgO": 40.3044,
"CaO": 56.0774,
"Na2O": 61.9789,
"K2O": 94.2,
"P2O5": 141.9445,
"H2O": 18.01528,
"CO2": 44.01,
}
# Create DataFrames to store volatile data:
concentrations = pd.DataFrame(
columns=[
"H2Ot_MEAN",
"H2Ot_STD",
"H2Ot_3550_M",
"H2Ot_3550_STD",
"H2Ot_3550_SAT",
"H2Om_1635_BP",
"H2Om_1635_STD",
"CO2_MEAN",
"CO2_STD",
"CO2_1515_BP",
"CO2_1515_STD",
"CO2_1430_BP",
"CO2_1430_STD",
"H2Om_5200_M",
"H2Om_5200_STD",
"OH_4500_M",
"OH_4500_STD",
]
)
# Dataframe for saturated concentrations
concentrations_sat = pd.DataFrame(columns=concentrations.columns)
# Dataframe for storing glass density data
density_df = pd.DataFrame(columns=["Density"])
# Dataframe for storing saturated glass density data
density_sat_df = pd.DataFrame(columns=["Density_Sat"])
# Dataframe for storing mean volatile data
mean_vol = pd.DataFrame(columns=["H2Ot_MEAN", "H2Ot_STD",
"CO2_MEAN", "CO2_STD"])
# Dataframe for storing signal-to-noise error data
stnerror = pd.DataFrame(
columns=["PH_5200_STN", "ERR_5200", "PH_4500_STN", "ERR_4500"]
)
# Initialize density calculation with 0 wt.% H2O.
chemistry["H2O"] = 0
_, density = calculate_density(chemistry, T, P, model)
epsilon = calculate_epsilon(chemistry, T, P)
# Doing density-H2O iterations:
for jj in range(10):
H2Ot_3550_I = beer_lambert(
molar_mass["H2O"],
Volatile_PH["PH_3550_M"],
density,
thickness["Thickness"],
epsilon["epsilon_H2Ot_3550"],
)
chemistry["H2O"] = H2Ot_3550_I
_, density = calculate_density(chemistry, T, P, model)
# Doing density-H2O iterations:
for kk in Volatile_PH.index:
# Calculate volatile species concentrations
H2Ot_3550_M = beer_lambert(
molar_mass["H2O"],
Volatile_PH["PH_3550_M"][kk],
density[kk],
thickness["Thickness"][kk],
epsilon["epsilon_H2Ot_3550"][kk],
)
H2Om_1635_BP = beer_lambert(
molar_mass["H2O"],
Volatile_PH["PH_1635_BP"][kk],
density[kk],
thickness["Thickness"][kk],
epsilon["epsilon_H2Om_1635"][kk],
)
CO2_1515_BP = beer_lambert(
molar_mass["CO2"],
Volatile_PH["PH_1515_BP"][kk],
density[kk],
thickness["Thickness"][kk],
epsilon["epsilon_CO2"][kk],
)
CO2_1430_BP = beer_lambert(
molar_mass["CO2"],
Volatile_PH["PH_1430_BP"][kk],
density[kk],
thickness["Thickness"][kk],
epsilon["epsilon_CO2"][kk],
)
H2Om_5200_M = beer_lambert(
molar_mass["H2O"],
Volatile_PH["PH_5200_M"][kk],
density[kk],
thickness["Thickness"][kk],
epsilon["epsilon_H2Om_5200"][kk],
)
OH_4500_M = beer_lambert(
molar_mass["H2O"],
Volatile_PH["PH_4500_M"][kk],
density[kk],
thickness["Thickness"][kk],
epsilon["epsilon_OH_4500"][kk],
)
# Multiply by 1e4 to convert CO2 concentrations to ppm
CO2_1515_BP *= 10000
CO2_1430_BP *= 10000
# Calculate volatile species concentration uncertainties
H2Ot_3550_M_STD = beer_lambert_error(
N,
molar_mass["H2O"],
Volatile_PH["PH_3550_M"][kk],
Volatile_PH["PH_3550_STD"][kk],
density[kk],
density[kk] * 0.025,
thickness["Thickness"][kk],
thickness["Sigma_Thickness"][kk],
epsilon["epsilon_H2Ot_3550"][kk],
epsilon["sigma_epsilon_H2Ot_3550"][kk],
)
H2Om_1635_BP_STD = beer_lambert_error(
N,
molar_mass["H2O"],
Volatile_PH["PH_1635_BP"][kk],
Volatile_PH["PH_1635_STD"][kk],
density[kk],
density[kk] * 0.025,
thickness["Thickness"][kk],
thickness["Sigma_Thickness"][kk],
epsilon["epsilon_H2Om_1635"][kk],
epsilon["sigma_epsilon_H2Om_1635"][kk],
)
CO2_1515_BP_STD = beer_lambert_error(
N,
molar_mass["CO2"],
Volatile_PH["PH_1515_BP"][kk],
Volatile_PH["PH_1515_STD"][kk],
density[kk],
density[kk] * 0.025,
thickness["Thickness"][kk],
thickness["Sigma_Thickness"][kk],
epsilon["epsilon_CO2"][kk],
epsilon["sigma_epsilon_CO2"][kk],
)
CO2_1430_BP_STD = beer_lambert_error(
N,
molar_mass["CO2"],
Volatile_PH["PH_1430_BP"][kk],
Volatile_PH["PH_1430_STD"][kk],
density[kk],
density[kk] * 0.025,
thickness["Thickness"][kk],
thickness["Sigma_Thickness"][kk],
epsilon["epsilon_CO2"][kk],
epsilon["sigma_epsilon_CO2"][kk],
)
H2Om_5200_M_STD = beer_lambert_error(
N,
molar_mass["H2O"],
Volatile_PH["PH_5200_M"][kk],
Volatile_PH["PH_5200_STD"][kk],
density[kk],
density[kk] * 0.025,
thickness["Thickness"][kk],
thickness["Sigma_Thickness"][kk],
epsilon["epsilon_H2Om_5200"][kk],
epsilon["sigma_epsilon_H2Om_5200"][kk],
)
OH_4500_M_STD = beer_lambert_error(
N,
molar_mass["H2O"],
Volatile_PH["PH_4500_M"][kk],
Volatile_PH["PH_4500_STD"][kk],
density[kk],
density[kk] * 0.025,
thickness["Thickness"][kk],
thickness["Sigma_Thickness"][kk],
epsilon["epsilon_OH_4500"][kk],
epsilon["sigma_epsilon_OH_4500"][kk],
)
# Multiply by 1e4 to convert CO2 uncertainties to ppm
CO2_1515_BP_STD *= 10000
CO2_1430_BP_STD *= 10000
# Save volatile concentrations and uncertainties to DataFrame
density_df.loc[kk] = pd.Series({"Density": density[kk]})
concentrations.loc[kk] = pd.Series(
{
"H2Ot_3550_M": H2Ot_3550_M,
"H2Ot_3550_STD": H2Ot_3550_M_STD,
"H2Ot_3550_SAT": Volatile_PH["H2Ot_3550_SAT"][kk],
"H2Om_1635_BP": H2Om_1635_BP,
"H2Om_1635_STD": H2Om_1635_BP_STD,
"CO2_1515_BP": CO2_1515_BP,
"CO2_1515_STD": CO2_1515_BP_STD,
"CO2_1430_BP": CO2_1430_BP,
"CO2_1430_STD": CO2_1430_BP_STD,
"H2Om_5200_M": H2Om_5200_M,
"H2Om_5200_STD": H2Om_5200_M_STD,
"OH_4500_M": OH_4500_M,
"OH_4500_STD": OH_4500_M_STD,
}
)
# Loops through for samples to perform two operations depending on
# saturation. For unsaturated samples, take concentrations and
# uncertainties from above. For saturated samples, perform the
# Beer-Lambert calculation again, and have total H2O = H2Om + OH-
for ll in Volatile_PH.index:
if Volatile_PH["H2Ot_3550_SAT"][ll] == "-":
H2Ot_3550_M = concentrations["H2Ot_3550_M"][ll]
H2Om_1635_BP = concentrations["H2Om_1635_BP"][ll]
CO2_1515_BP = concentrations["CO2_1515_BP"][ll]
CO2_1430_BP = concentrations["CO2_1430_BP"][ll]
H2Om_5200_M = concentrations["H2Om_5200_M"][ll]
OH_4500_M = concentrations["OH_4500_M"][ll]
H2Ot_3550_M_STD = concentrations["H2Ot_3550_STD"][ll]
H2Om_1635_BP_STD = concentrations["H2Om_1635_STD"][ll]
CO2_1515_BP_STD = concentrations["CO2_1515_STD"][ll]
CO2_1430_BP_STD = concentrations["CO2_1430_STD"][ll]
H2Om_5200_M_STD = concentrations["H2Om_5200_STD"][ll]
OH_4500_M_STD = concentrations["OH_4500_STD"][ll]
density_sat = density_df["Density"][ll]
elif Volatile_PH["H2Ot_3550_SAT"][ll] == "*":
if ignore_NIR or pd.isna(Volatile_PH.loc[ll, "PH_4500_M"]):
density_sat = density_df["Density"][ll]
else:
sat_chemistry = chemistry.copy()
density_curr = density.copy()
for m in range(20):
H2Om_1635_BP = beer_lambert(
molar_mass["H2O"],
Volatile_PH["PH_1635_BP"][ll],
density_curr[ll],
thickness["Thickness"][ll],
epsilon["epsilon_H2Om_1635"][ll],
)
OH_4500_M = beer_lambert(
molar_mass["H2O"],
Volatile_PH["PH_4500_M"][ll],
density_curr[ll],
thickness["Thickness"][ll],
epsilon["epsilon_OH_4500"][ll],
)
sat_chemistry.loc[ll, "H2O"] = H2Om_1635_BP + OH_4500_M
mol_sat, density_curr = calculate_density(sat_chemistry,
T, P, model)
density_sat = density_curr[ll]
H2Ot_3550_M = beer_lambert(
molar_mass["H2O"],
Volatile_PH["PH_3550_M"][ll],
density_sat,
thickness["Thickness"][ll],
epsilon["epsilon_H2Ot_3550"][ll],
)
H2Om_1635_BP = beer_lambert(
molar_mass["H2O"],
Volatile_PH["PH_1635_BP"][ll],
density_sat,
thickness["Thickness"][ll],
epsilon["epsilon_H2Om_1635"][ll],
)
CO2_1515_BP = beer_lambert(
molar_mass["CO2"],
Volatile_PH["PH_1515_BP"][ll],
density_sat,
thickness["Thickness"][ll],
epsilon["epsilon_CO2"][ll],
)
CO2_1430_BP = beer_lambert(
molar_mass["CO2"],
Volatile_PH["PH_1430_BP"][ll],
density_sat,
thickness["Thickness"][ll],
epsilon["epsilon_CO2"][ll],
)
H2Om_5200_M = beer_lambert(
molar_mass["H2O"],
Volatile_PH["PH_5200_M"][ll],
density_sat,
thickness["Thickness"][ll],
epsilon["epsilon_H2Om_5200"][ll],
)
OH_4500_M = beer_lambert(
molar_mass["H2O"],
Volatile_PH["PH_4500_M"][ll],
density_sat,
thickness["Thickness"][ll],
epsilon["epsilon_OH_4500"][ll],
)
CO2_1515_BP *= 10000
CO2_1430_BP *= 10000
H2Ot_3550_M_STD = beer_lambert_error(
N,
molar_mass["H2O"],
Volatile_PH["PH_3550_M"][ll],
Volatile_PH["PH_3550_STD"][ll],
density_sat,
density_sat * 0.025,
thickness["Thickness"][ll],
thickness["Sigma_Thickness"][ll],
epsilon["epsilon_H2Ot_3550"][ll],
epsilon["sigma_epsilon_H2Ot_3550"][ll],
)
H2Om_1635_BP_STD = beer_lambert_error(
N,
molar_mass["H2O"],
Volatile_PH["PH_1635_BP"][ll],
Volatile_PH["PH_1635_STD"][ll],
density_sat,
density_sat * 0.025,
thickness["Thickness"][ll],
thickness["Sigma_Thickness"][ll],
epsilon["epsilon_H2Om_1635"][ll],
epsilon["sigma_epsilon_H2Om_1635"][ll],
)
CO2_1515_BP_STD = beer_lambert_error(
N,
molar_mass["CO2"],
Volatile_PH["PH_1515_BP"][ll],
Volatile_PH["PH_1515_STD"][ll],
density_sat,
density_sat * 0.025,
thickness["Thickness"][ll],
thickness["Sigma_Thickness"][ll],
epsilon["epsilon_CO2"][ll],
epsilon["sigma_epsilon_CO2"][ll],
)
CO2_1430_BP_STD = beer_lambert_error(
N,
molar_mass["CO2"],
Volatile_PH["PH_1430_BP"][ll],
Volatile_PH["PH_1430_STD"][ll],
density_sat,
density_sat * 0.025,
thickness["Thickness"][ll],
thickness["Sigma_Thickness"][ll],
epsilon["epsilon_CO2"][ll],
epsilon["sigma_epsilon_CO2"][ll],
)
H2Om_5200_M_STD = beer_lambert_error(
N,
molar_mass["H2O"],
Volatile_PH["PH_5200_M"][ll],
Volatile_PH["PH_5200_STD"][ll],
density_sat,
density_sat * 0.025,
thickness["Thickness"][ll],
thickness["Sigma_Thickness"][ll],
epsilon["epsilon_H2Om_5200"][ll],
epsilon["sigma_epsilon_H2Om_5200"][ll],
)
OH_4500_M_STD = beer_lambert_error(
N,
molar_mass["H2O"],
Volatile_PH["PH_4500_M"][ll],
Volatile_PH["PH_4500_STD"][ll],
density_sat,
density_sat * 0.025,
thickness["Thickness"][ll],
thickness["Sigma_Thickness"][ll],
epsilon["epsilon_OH_4500"][ll],
epsilon["sigma_epsilon_OH_4500"][ll],
)
CO2_1515_BP_STD *= 10000
CO2_1430_BP_STD *= 10000
density_sat_df.loc[ll] = pd.Series({"Density_Sat": density_sat})
# contains both saturated and unsaturated values despite being
# called concentrations_sat, pulls correct values from both dfs
concentrations_sat.loc[ll] = pd.Series(
{
"H2Ot_3550_M": H2Ot_3550_M,
"H2Ot_3550_SAT": Volatile_PH["H2Ot_3550_SAT"][ll],
"H2Ot_3550_STD": H2Ot_3550_M_STD,
"H2Om_1635_BP": H2Om_1635_BP,
"H2Om_1635_STD": H2Om_1635_BP_STD,
"CO2_1515_BP": CO2_1515_BP,
"CO2_1515_STD": CO2_1515_BP_STD,
"CO2_1430_BP": CO2_1430_BP,
"CO2_1430_STD": CO2_1430_BP_STD,
"H2Om_5200_M": H2Om_5200_M,
"H2Om_5200_STD": H2Om_5200_M_STD,
"OH_4500_M": OH_4500_M,
"OH_4500_STD": OH_4500_M_STD,
}
)
stnerror.loc[ll] = pd.Series(
{
"PH_5200_STN": Volatile_PH["STN_P5200"][ll],
"PH_4500_STN": Volatile_PH["STN_P4500"][ll],
"ERR_5200": Volatile_PH["ERR_5200"][ll],
"ERR_4500": Volatile_PH["ERR_4500"][ll],
}
)
# Create final spreadsheet
concentrations_df = pd.concat(
[concentrations_sat, stnerror,
density_df, density_sat_df, epsilon],
axis=1
)
# Output different values depending on saturation.
for m in concentrations.index:
if concentrations["H2Ot_3550_SAT"][m] == "*":
if not ignore_NIR: # option if using NIR data for saturated
H2O_mean = (concentrations_sat["H2Om_1635_BP"][m] +
concentrations_sat["OH_4500_M"][m])
H2O_std = (
(concentrations_sat["H2Om_1635_STD"][m] ** 2)
+ (concentrations_sat["OH_4500_STD"][m] ** 2)
) ** (1 / 2) / 2
else: # option if ignoring NIR data for saturated
H2O_mean = concentrations_sat["H2Ot_3550_M"][m]
H2O_std = concentrations_sat["H2Ot_3550_STD"][m]
elif concentrations["H2Ot_3550_SAT"][m] == "-":
H2O_mean = concentrations_sat["H2Ot_3550_M"][m]
H2O_std = concentrations_sat["H2Ot_3550_STD"][m]
mean_vol.loc[m] = pd.Series({"H2Ot_MEAN": H2O_mean,
"H2Ot_STD": H2O_std})
mean_vol["CO2_MEAN"] = (concentrations_sat["CO2_1515_BP"] +
concentrations_sat["CO2_1430_BP"]) / 2
mean_vol["CO2_STD"] = (
(concentrations_sat["CO2_1515_STD"] ** 2) +
(concentrations_sat["CO2_1430_STD"] ** 2)
) ** (1 / 2) / 2
concentrations_df["H2Ot_MEAN"] = mean_vol["H2Ot_MEAN"]
concentrations_df["H2Ot_STD"] = mean_vol["H2Ot_STD"]
concentrations_df["CO2_MEAN"] = mean_vol["CO2_MEAN"]
concentrations_df["CO2_STD"] = mean_vol["CO2_STD"]
concentrations_df.to_csv(full_file_path)
return concentrations_df
# %% Plotting Functions
[docs]
def plot_H2Om_OH(data, files, als_bls, ax_top=None, ax_bot=None):
"""
Visualizes NIR spectrum data along with baseline-subtracted and kriged
peak fits for H2Om and OH components in spectral data. The function plots
the original NIR spectrum and overlays the median-filtered peak fits and
baseline fits for H2Om (5200 cm^-1) and OH- (4500 cm^-1) peaks on the top
axis. The bottom axis displays the baseline-subtracted peaks and the
kriged peak fits, emphasizing the isolated peak shapes and their
respective signal-to-noise ratios.
Parameters:
data (pd.DataFrame): DataFrame containing spectral data with
"Absorbance" values indexed by wavenumber.
files (str): The identifier for the current sample set being
processed, used for titling the plot.
als_bls (dict): A dictionary containing results from baseline
fitting and peak analysis. Expected keys include
"H2Om_5200_results" and "OH_4500_results", each storing a
list of result dictionaries from the ALS (Asymmetric Least
Squares) processing for respective peaks.
ax_top (matplotlib.axes.Axes, optional): The top subplot axis
for plotting the NIR spectrum and peak fits. If not provided,
a new figure with subplots will be created.
ax_bot (matplotlib.axes.Axes, optional): The bottom subplot
axis for plotting baseline-subtracted peaks and kriged peak
fits. If not provided, it will be created along with `ax_top`
in a new figure.
Returns:
None: This function does not return any value. It generates a plot
visualizing the spectral data, baseline fits, and peak fits for the
provided sample.
Note:
The function is designed to work within a larger analytical framework,
where spectral preprocessing and peak fitting have been previously
conducted. It expects specific data structures from these analyses as
input. The function modifies the provided `ax_top` and `ax_bot`
axes in place if they are provided; otherwise, it creates a new
figure and axes for plotting.
"""
if ax_top is None or ax_bot is None:
fig, (ax_top, ax_bot) = plt.subplots(2, 1, figsize=(8, 8))
ax_top.set_title(files)
H2Om_5200_results = als_bls["H2Om_5200_results"]
OH_4500_results = als_bls["OH_4500_results"]
# Calculate peak heights
PH_4500_krige = [result["PH_krige"] for result in OH_4500_results]
PH_4500_krige_M, PH_4500_krige_STD = (np.mean(PH_4500_krige),
np.std(PH_4500_krige))
PH_5200_krige = [result["PH_krige"] for result in H2Om_5200_results]
PH_5200_krige_M, PH_5200_krige_STD = (np.mean(PH_5200_krige),
np.std(PH_5200_krige))
# Calculate signal to noise ratio
STN_4500_M = np.mean([result["STN"] for result in OH_4500_results])
STN_5200_M = np.mean([result["STN"] for result in H2Om_5200_results])
warnings.filterwarnings("ignore", module="matplotlib\\..*")
warnings.filterwarnings("ignore", category=UserWarning)
ax_top.plot(data.index, data["Absorbance"], "k",
linewidth=1.5, label="NIR Spectrum")
for result in H2Om_5200_results:
ax_top.plot(result["peak_fit"].index,
result["peak_fit"]["Absorbance_Filt"],
"tab:blue",
label=(r"$\mathregular{H_2O_{m, 5200}}$ Median Filtered" if
result is H2Om_5200_results[0] else "_"))
for result in OH_4500_results:
ax_top.plot(result["peak_fit"].index,
result["peak_fit"]["Absorbance_Filt"],
"tab:orange",
label=(r"$\mathregular{OH^{-}_{4500}}$ Median Filtered" if
result is OH_4500_results[0] else "_"))
for result in H2Om_5200_results:
ax_top.plot(result["peak_fit"].index,
result["peak_fit"]["Baseline_NIR"],
"lightsteelblue",
label=r"$\mathregular{H_2O_{m, 5200}}$ Baseline" if
result is H2Om_5200_results[0] else "_")
for result in OH_4500_results:
ax_top.plot(result["peak_fit"].index,
result["peak_fit"]["Baseline_NIR"],
"bisque",
label=r"$\mathregular{OH^{-}_{4500}}$ Baseline" if
result is OH_4500_results[0] else "_")
handles_top, labels_top = ax_top.get_legend_handles_labels()
filtered_handles_top = [h_t for h_t, l_t in
zip(handles_top, labels_top)
if not l_t.startswith("_")]
filtered_labels_top = [l_t for l_t in labels_top if not
l_t.startswith("_")]
ax_top.legend(filtered_handles_top, filtered_labels_top, prop={"size": 10})
ax_top.annotate(
r"$\mathregular{H_2O_{m, 5200}}$ Peak Height: "
+ f"{PH_5200_krige_M:.4f} "
+ r"± "
+ f"{PH_5200_krige_STD:.4f}, S2N={STN_5200_M:.2f}",
(0.025, 0.9),
xycoords="axes fraction",
)
ax_top.annotate(
r"$\mathregular{OH^{-}_{4500}}$ Peak Height: "
+ f"{PH_4500_krige_M:.4f} "
+ r"± "
+ f"{PH_4500_krige_STD:.4f}, S2N={STN_4500_M:.2f}",
(0.025, 0.8),
xycoords="axes fraction",
)
plotmin = np.round(np.min(data.loc[4250:5400]["Absorbance"]), decimals=1)
plotmax = np.round(np.max(data.loc[4250:5400]["Absorbance"]), decimals=1)
ax_top.set_xlim([4200, 5400])
ax_top.set_ylim([plotmin - 0.075, plotmax + 0.075])
ax_top.tick_params(axis="x", direction="in", length=5, pad=6.5)
ax_top.tick_params(axis="y", direction="in", length=5, pad=6.5)
ax_top.invert_xaxis()
warnings.filterwarnings("ignore", module="matplotlib\\..*")
warnings.filterwarnings("ignore", category=UserWarning)
for result in H2Om_5200_results:
baseline_subtracted = (result["peak_fit"]["Peak_Subtract"] -
np.min(result["peak_krige"]["Absorbance"]))
ax_bot.plot(result["peak_fit"].index, baseline_subtracted, "k",
label=(r"$\mathregular{H_2O_{m,5200}}$ Baseline Subtracted"
if result is H2Om_5200_results[0] else "_"))
for result in OH_4500_results:
baseline_subtracted = (result["peak_fit"]["Peak_Subtract"] -
np.min(result["peak_krige"]["Absorbance"]))
ax_bot.plot(result["peak_fit"].index, baseline_subtracted, "k",
label=(r"$\mathregular{OH^{-}_{4500}}$ Baseline Subtracted"
if result is OH_4500_results[0] else "_"))
for result in H2Om_5200_results:
kriged_peak = (result["peak_krige"]["Absorbance"] -
np.min(result["peak_krige"]["Absorbance"]))
ax_bot.plot(result["peak_krige"].index, kriged_peak, "tab:blue",
label=(r"$\mathregular{H_2O_{m,5200}}$ Kriged Peak"
if result is H2Om_5200_results[0] else "_"))
for result in OH_4500_results:
kriged_peak = (result["peak_krige"]["Absorbance"] -
np.min(result["peak_krige"]["Absorbance"]))
ax_bot.plot(result["peak_krige"].index, kriged_peak, "tab:orange",
label=(r"$\mathregular{OH^{-}_{4500}}$ Kriged Peak"
if result is OH_4500_results[0] else "_"))
# Handling legend entries to avoid duplicates
handles_bottom, labels_bottom = ax_bot.get_legend_handles_labels()
filtered_handles_bottom = [h_b for h_b, l_b in
zip(handles_bottom, labels_bottom)
if not l_b.startswith("_")]
filtered_labels_bottom = [l_b for l_b in labels_bottom if not
l_b.startswith("_")]
ax_bot.legend(filtered_handles_bottom, filtered_labels_bottom,
prop={"size": 10})
ax_bot.set_xlim([4200, 5400])
plotmax = np.round(
np.max(OH_4500_results[0]["peak_fit"]["Peak_Subtract"]), decimals=1
)
ax_bot.set_ylim([0, plotmax + 0.05])
ax_bot.tick_params(axis="x", direction="in", length=5, pad=6.5)
ax_bot.tick_params(axis="y", direction="in", length=5, pad=6.5)
ax_bot.invert_xaxis()
[docs]
def plot_H2Ot_3550(data, files, als_bls, ax=None):
"""
Plots Mid Infrared (MIR) spectral data along with the baseline and
filtered peak for the total water (H2Ot) peak at 3550 cm^-1. This
function visualizes the original MIR spectrum, the baseline fit, and
the baseline-subtracted and filtered peak for H2Ot, highlighting the
isolated peak shape and its characteristics.
Parameters:
data (pd.DataFrame): DataFrame containing spectral data with
"Absorbance" values indexed by wavenumber. Expected to cover
the MIR range relevant for the H2Ot peak analysis.
files (str): Identifier for the current sample set being processed,
used for titling the plot.
als_bls (dict): Dictionary containing results from baseline fitting
and peak analysis. Expected to have a key "H2Ot_3550_results"
which stores a list of dictionaries with results from the
processing for the H2Ot peak.
ax (matplotlib.axes.Axes, optional): Matplotlib axis object where
the plot will be drawn. If None, a new figure and axis will be
created.
Returns:
None: This function does not return any value. It generates a plot
visualizing the MIR spectral data, baseline fit, and filtered
peak fit for the H2Ot peak at 3550 cm^-1.
Note:
The function is designed to work as part of a larger spectroscopic
analysis workflow, where spectral data preprocessing, baseline fitting,
and peak analysis have been previously conducted. It expects specific
data structures from these analyses as input. If "ax" is not provided,
the function creates a new figure and axis for plotting, which might
not be ideal for integrating this plot into multi-panel figures or
more complex visual layouts.
"""
if ax is None:
fig, ax = plt.subplots(figsize=(8, 8))
H2Ot_3550_results = als_bls["H2Ot_3550_results"]
# Calculate peak heights
PH_3550 = [result["PH"] for result in H2Ot_3550_results]
PH_3550_M, PH_3550_STD = np.mean(PH_3550), np.std(PH_3550)
ax.plot(data.index, data["Absorbance"], "k")
ax.plot(
H2Ot_3550_results[0]["peak_fit"]["Absorbance"].index,
H2Ot_3550_results[0]["peak_fit"]["Baseline_MIR"],
"silver",
label=r"$\mathregular{H_2O_{t, 3550}}$ Baseline",
)
ax.plot(
H2Ot_3550_results[1]["peak_fit"]["Absorbance"].index,
H2Ot_3550_results[1]["peak_fit"]["Baseline_MIR"],
"silver",
)
ax.plot(
H2Ot_3550_results[2]["peak_fit"]["Absorbance"].index,
H2Ot_3550_results[2]["peak_fit"]["Baseline_MIR"],
"silver",
)
ax.plot(
H2Ot_3550_results[0]["plot_output"].index,
(
H2Ot_3550_results[0]["plot_output"]["Peak_Subtract_Filt"]
+ H2Ot_3550_results[0]["plot_output"]["Baseline_MIR"]
),
"r",
linewidth=2,
)
ax.plot(
H2Ot_3550_results[1]["plot_output"].index,
(
H2Ot_3550_results[1]["plot_output"]["Peak_Subtract_Filt"]
+ H2Ot_3550_results[1]["plot_output"]["Baseline_MIR"]
),
"r",
linewidth=2,
)
ax.plot(
H2Ot_3550_results[2]["plot_output"].index,
(
H2Ot_3550_results[2]["plot_output"]["Peak_Subtract_Filt"]
+ H2Ot_3550_results[2]["plot_output"]["Baseline_MIR"]
),
"r",
linewidth=2,
)
ax.set_title(files)
ax.annotate(
r"$\mathregular{H_2O_{t, 3550}}$ Peak Height: "
+ f"{PH_3550_M:.4f} ± {PH_3550_STD:.4f}",
(0.025, 0.95),
xycoords="axes fraction",
)
ax.set_xlabel(r"Wavenumber $(\mathregular{cm^{-1}})$")
ax.set_xlim([1250, 4000])
ax.set_ylabel("Absorbance")
plotmax = np.round(np.max(data.loc[1250:4000]["Absorbance"].to_numpy()),
decimals=0)
plotmin = np.round(np.min(data.loc[1250:4000]["Absorbance"].to_numpy()),
decimals=0)
ax.set_ylim([plotmin - 0.25, plotmax + 0.5])
ax.tick_params(axis="x", direction="in", length=5, pad=6.5)
ax.tick_params(axis="y", direction="in", length=5, pad=6.5)
ax.legend(loc="upper right", prop={"size": 10})
ax.invert_xaxis()
[docs]
def derive_carbonate(data, files, mc3_output, export_path):
"""
Derives and saves carbonate region baseline and peak fits from FTIR
spectral data using the PyIRoGlass model outputs. This function
interpolates the spectral data to match the wavenumber spacing
required by the model, calculates the baseline and peak fits for
carbonate at 1430 and 1515 cm^-1, and water at 1635 cm^-1. It also
generates an ensemble of baseline fits from the posterior distributions
to estimate uncertainties. The results, including best fits and
baselines, can optionally be saved to CSV files.
Parameters:
data (pd.DataFrame): DataFrame containing FTIR spectral data with
"Absorbance" values indexed by wavenumber.
file (str): Spectrum sample name.
mc3_output (dict): Dictionary containing results from PyIRoGlass
fitting, including best fit parameters, posterior distributions,
and other model outputs relevant for calculating peak fits.
savefile (bool): If True, the function will save the best fits and
baselines data to CSV files named according to the "file"
parameter. If False, no files will be saved.
Returns:
bestfits (pd.DataFrame): DataFrame containing the original spectral
data, the complete spectrum fit from PyIRoGlass, individual peak
fits for carbonate and water, and the calculated baseline.
Indexed by wavenumber.
baselines (pd.DataFrame): DataFrame containing an ensemble of
baseline fits derived from the posterior distribution to represent
uncertainty in the baseline estimation. Indexed by wavenumber.
"""
vector_loader = VectorLoader()
wavenumber = vector_loader.wavenumber
PCmatrix = vector_loader.baseline_PC
H2Om1635_PCmatrix = vector_loader.H2Om_PC
Nvectors = 5
df_length = np.shape(wavenumber)[0]
CO2_wn_high, CO2_wn_low = 2400, 1250
spec = data.loc[CO2_wn_low:CO2_wn_high]
# Interpolate data to wavenumber spacing, to prepare for mc3
if spec.shape[0] != df_length:
interp_wn = np.linspace(spec.index[0],
spec.index[-1],
df_length)
interp_abs = interpolate.interp1d(
spec.index, spec["Absorbance"])(interp_wn)
spec = spec.reindex(index=interp_wn)
spec["Absorbance"] = interp_abs
spec_mc3 = spec["Absorbance"].to_numpy()
elif spec.shape[0] == df_length:
spec_mc3 = spec["Absorbance"].to_numpy()
Spectrum_Solve_BP = carbonate(
mc3_output["meanp"], wavenumber, PCmatrix, H2Om1635_PCmatrix, Nvectors
)
Baseline_Solve_BP = (
np.asarray(mc3_output["bestp"][0:Nvectors] @ PCmatrix.T).ravel() +
linear(wavenumber, mc3_output["bestp"][14], mc3_output["bestp"][15])
)
H1635_BP = np.asarray(mc3_output["bestp"][11:14] @
H2Om1635_PCmatrix.T).ravel()
CO2P1430_BP = gauss(
wavenumber,
mc3_output["bestp"][5],
mc3_output["bestp"][6],
A=mc3_output["bestp"][7],
)
CO2P1515_BP = gauss(
wavenumber,
mc3_output["bestp"][8],
mc3_output["bestp"][9],
A=mc3_output["bestp"][10],
)
H1635_SOLVE = H1635_BP + Baseline_Solve_BP
CO2P1515_SOLVE = CO2P1515_BP + Baseline_Solve_BP
CO2P1430_SOLVE = CO2P1430_BP + Baseline_Solve_BP
posterior = mc3_output["posterior"]
mask = mc3_output["zmask"]
masked_posterior = posterior[mask]
samplingerror = masked_posterior[:, 0:5]
samplingerror = samplingerror[
0: np.shape(masked_posterior[:, :])[0]: int(
np.shape(masked_posterior[:, :])[0] / 100
),
:,
]
lineerror = masked_posterior[:, -2:None]
lineerror = lineerror[
0: np.shape(masked_posterior[:, :])[0]: int(
np.shape(masked_posterior[:, :])[0] / 100
),
:,
]
Baseline_Array = np.array(samplingerror @ PCmatrix[:, :].T)
for i in range(np.shape(Baseline_Array)[0]):
linearray = linear(wavenumber, lineerror[i, 0], lineerror[i, 1])
Baseline_Array[i, :] += linearray
best_fits_data = {
"Wavenumber": wavenumber,
"Spectrum": spec_mc3,
"Spectrum_Fit": Spectrum_Solve_BP,
"Baseline": Baseline_Solve_BP,
"H2Om_1635": H1635_SOLVE,
"CO2_1515": CO2P1515_SOLVE,
"CO2_1430": CO2P1430_SOLVE,
}
bestfits = pd.DataFrame(best_fits_data)
bestfits.set_index("Wavenumber", inplace=True)
baselines = pd.DataFrame(
Baseline_Array.T,
index=wavenumber,
columns=[f"Baseline_{i}" for i in range(Baseline_Array.shape[0])],
)
baselines.index.name = "Wavenumber"
if export_path is not None:
bppath = os.path.join(os.getcwd(), "BLPEAKFILES", export_path)
os.makedirs(bppath, exist_ok=True)
bestfits.to_csv(os.path.join(bppath, f"{files}_bestfits.csv"))
baselines.to_csv(os.path.join(bppath, f"{files}_baselines.csv"))
return bestfits, baselines
[docs]
def plot_carbonate(data, files, mc3_output, export_path, ax=None):
"""
Plots the FTIR spectrum along with baseline fits, and model fits for
carbonate and water peaks. This function visualizes the original FTIR
spectrum, the baseline determined by PCA, and model fits for CO_3^{2-}
doublet, and the H2Om_1635 peak, generated from PyIRoGlass fitting.
Parameters:
data (pd.DataFrame): DataFrame containing FTIR spectral data with
"Absorbance" values indexed by wavenumber. Expected to cover
the range relevant for carbonate and water peak analysis.
files (str): Identifier for the current sample set being processed,
used for titling the plot.
mc3_output (dict): Dictionary containing results from PyIRoGlass
fitting, including best fit parameters, standard deviations,
posterior distributions, and other model outputs.
ax (matplotlib.axes.Axes, optional): Matplotlib axis object where the
plot will be drawn. If None, a new figure and axis will be created.
Returns:
None: This function does not return any value. It generates a plot
visualizing the FTIR spectral data, baseline fit, and model fits
for carbonate and water peaks.
Note:
The function is designed to work as part of a larger spectroscopic
analysis workflow, where spectral data preprocessing, baseline fitting,
and peak modeling have been previously conducted. It expects specific
data structures from these analyses as input. If "ax" is not provided,
the function creates a new figure and axis for plotting, which might
not be ideal for integrating this plot into multi-panel figures or
more complex visual layouts.
"""
if ax is None:
fig, ax = plt.subplots(figsize=(8, 8))
ax.set_title(files)
bestfits, baselines = derive_carbonate(data,
files,
mc3_output,
export_path)
ax.plot(baselines.index, baselines.to_numpy(), "dimgray", linewidth=0.25)
ax.plot(
bestfits.index,
bestfits.Spectrum,
"tab:blue",
linewidth=2.5,
label="FTIR Spectrum",
)
ax.plot(
bestfits.index,
bestfits.H2Om_1635,
"tab:orange",
linewidth=1.5,
label=r"$\mathregular{H_2O_{m, 1635}}$",
)
ax.plot(
bestfits.index,
bestfits.CO2_1515,
"tab:green",
linewidth=2.5,
label=r"$\mathregular{CO_{3, 1515}^{2-}}$",
)
ax.plot(
bestfits.index,
bestfits.CO2_1430,
"tab:red",
linewidth=2.5,
label=r"$\mathregular{CO_{3, 1430}^{2-}}$",
)
ax.plot(
bestfits.index,
bestfits.Spectrum_Fit,
"tab:purple",
linewidth=1.5,
label="PyIRoGlass Fit",
)
ax.plot(bestfits.index, bestfits.Baseline,
"k", linewidth=1.5, label="Baseline")
ax.annotate(
r"$\mathregular{H_2O_{m, 1635}}$ Peak Height: "
+ f"{mc3_output['bestp'][11]:.3f} ± {mc3_output['stdp'][11]:.3f}",
(0.025, 0.95),
xycoords="axes fraction",
)
ax.annotate(
r"$\mathregular{CO_{3, 1515}^{2-}}$ Peak Height: "
+ f"{mc3_output['bestp'][10]:.3f} ± {mc3_output['stdp'][10]:.3f}",
(0.025, 0.90),
xycoords="axes fraction",
)
ax.annotate(
r"$\mathregular{CO_{3, 1430}^{2-}}$ Peak Height: "
+ f"{mc3_output['bestp'][7]:.3f} ± {mc3_output['stdp'][7]:.3f}",
(0.025, 0.85),
xycoords="axes fraction",
)
ax.set_xlim([1250, 2400])
ax.set_xlabel(r"Wavenumber $(\mathregular{cm^{-1}})$")
ax.set_ylabel("Absorbance")
ax.legend(loc="upper right", prop={"size": 10})
ax.tick_params(axis="x", direction="in", length=5, pad=6.5)
ax.tick_params(axis="y", direction="in", length=5, pad=6.5)
ax.invert_xaxis()
[docs]
def plot_trace(posterior, title, zchain=None, pnames=None, thinning=50,
burnin=0, fignum=1000, savefile=None, fmt=".", ms=2.5, fs=12):
"""
Plot parameter trace MCMC sampling.
Parameters:
posterior (2D np.ndarray): MCMC posterior sampling with dimension:
[nsamples, npars].
zchain (1D np.ndarray): Chain index for each posterior sample.
pnames (str): Label names for parameters.
thinning (int): Thinning factor for plotting (plot every thinning-th
value).
burnin (int): Thinned burn-in number of iteration (only used when
zchain is not None).
fignum (int): The figure number.
savefile (bool): Name of file to save the plot if not none
fmt (string): The format string for the line and marker.
ms (float): Marker size.
fs (float): Fontsize of texts.
Returns:
axes (1D list of matplotlib.axes.Axes): List of axes containing
the marginal posterior distributions.
"""
# Get indices for samples considered in final analysis:
if zchain is not None:
nchains = np.amax(zchain) + 1
good = np.zeros(len(zchain), bool)
for c in range(nchains):
good[np.where(zchain == c)[0][burnin:]] = True
# Values accepted for posterior stats:
posterior = posterior[good]
zchain = zchain[good]
# Sort the posterior by chain:
zsort = np.lexsort([zchain])
posterior = posterior[zsort]
zchain = zchain[zsort]
# Get location for chains separations:
xsep = np.where(np.ediff1d(zchain[0::thinning]))[0]
# Get number of parameters and length of chain:
nsamples, npars = np.shape(posterior)
# Number of samples (thinned):
xmax = len(posterior[0::thinning])
# Set default parameter names:
if pnames is None:
pnames = mu.default_parnames(npars)
npanels = 16 # Max number of panels per page
npages = int(1 + (npars - 1) / npanels)
# Make the trace plot:
axes = []
ipar = 0
for page in range(npages):
fig = plt.figure(fignum + page, figsize=(8.5, 11.0))
plt.clf()
plt.subplots_adjust(left=0.15, right=0.95,
bottom=0.05, top=0.95, hspace=0.15)
while ipar < npars:
ax = plt.subplot(npanels, 1, ipar % npanels + 1)
axes.append(ax)
ax.plot(posterior[0::thinning, ipar],
fmt, ms=ms, c="#46A4F5", rasterized=True)
yran = ax.get_ylim()
if zchain is not None:
ax.vlines(xsep, yran[0], yran[1], "0.5")
# Y-axis adjustments:
ax.set_ylim(yran)
ax.locator_params(axis="y", nbins=5, tight=True)
ax.tick_params(labelsize=fs - 1, direction="in",
top=True, right=True)
ax.set_ylabel(pnames[ipar], size=fs, multialignment="center")
# X-axis adjustments:
ax.set_xlim(0, xmax)
ax.get_xaxis().set_visible(False)
ipar += 1
if ipar % npanels == 0:
break
ax.set_xlabel("Thinned MCMC Sample", size=fs)
ax.get_xaxis().set_visible(True)
if savefile is not None:
if npages > 1:
sf = os.path.splitext(savefile)
try:
bbox = fig.get_tightbbox(fig._cachedRenderer).padded(0.1)
bbox_points = bbox.get_points()
bbox_points[:, 0] = 0.0, 8.5
bbox.set_points(bbox_points)
except Exception:
ylow = 9.479 - 0.862 * np.amin(
[npanels - 1, npars - npanels * page - 1]
)
bbox = mpl.transforms.Bbox([[0.0, ylow], [8.5, 11]])
fig.savefig(f"{sf[0]}_page{page:02d}{sf[1]}", bbox_inches=bbox)
else:
fig.suptitle(title, fontsize=fs + 1)
plt.ioff()
fig.savefig(savefile, bbox_inches="tight")
return axes
[docs]
def plot_modelfit(data, uncert, indparams, model, title, nbins=75,
fignum=1400, savefile=None):
"""
Plot the binned dataset with given uncertainties and model curves as
a function of indparams. In a lower panel, plot the residuals between
the data and model.
Parameters:
data (1D np.ndarray): Input data set.
uncert (1D np.ndarray): One-sigma uncertainties of the data points.
indparams (1D np.ndarray): Independent variable (X axis) of the
data points.
model (1D np.ndarray): Model of data.
nbins (int): Number of bins in the output plot.
fignum (int): Figure number.
savefile (bool): Name of file to save the plot, if not none.
Returns:
ax (matplotlib.axes.Axes): Axes instance containing the marginal
posterior distributions.
"""
# Bin down array:
binsize = int((np.size(data) - 1) / nbins + 1)
binindp = ms.bin_array(indparams, binsize)
binmodel = ms.bin_array(model, binsize)
bindata, binuncert = ms.bin_array(data, binsize, uncert)
fs = 12
plt.figure(fignum, figsize=(8, 6))
plt.clf()
# Residuals:
rax = plt.axes([0.15, 0.1, 0.8, 0.2])
rax.errorbar(binindp, bindata - binmodel, binuncert, fmt="ko", ms=4)
rax.plot([indparams[0], indparams[-1]], [0, 0], "k:", lw=1.5)
rax.tick_params(labelsize=fs - 1, direction="in", top=True, right=True)
rax.set_xlabel("Wavenumber (cm^-1)", fontsize=fs)
rax.set_ylabel("Residual", fontsize=fs)
rax.invert_xaxis()
# Data and Model:
ax = plt.axes([0.15, 0.35, 0.8, 0.575])
ax.errorbar(binindp, bindata, binuncert,
fmt="ko", ms=4, label="Binned data")
ax.plot(indparams, data, "tab:blue", lw=3, label="Data")
ax.plot(indparams, model, "tab:purple",
linestyle="-.", lw=3, label="Best Fit")
ax.set_xticklabels([])
ax.invert_xaxis()
ax.tick_params(labelsize=fs - 1, direction="in", top=True, right=True)
ax.set_ylabel("Absorbance", fontsize=fs)
ax.legend(loc="best")
if savefile is not None:
plt.suptitle(title, fontsize=fs + 1)
plt.ioff()
plt.savefig(savefile, bbox_inches="tight")
return ax, rax