Source code for lr_reduction.output

"""
Write R(q) output
"""

import json

import numpy as np
from plot_publisher import plot1d

from lr_reduction.stitching import (
    OverlapScalingFactor,
    ReducedData,
    StitchingConfiguration,
    StitchingType,
    scaling_factor_critical_edge,
)

from . import __version__ as version


[docs] class RunCollection: """ A collection of runs to assemble into a single R(Q) """ def __init__(self, average_overlap=False, stitching_configuration: StitchingConfiguration | None = None): self.collection = [] self.stitching_configuration = stitching_configuration or StitchingConfiguration(StitchingType.NONE) self.stitching_reflectivity_scale_factors = [] self.average_overlap = average_overlap self.qz_all = [] self.refl_all = [] self.d_refl_all = [] self.d_qz_all = []
[docs] def add(self, q, r, dr, meta_data, dq=None): """ Add a partial R(q) to the collection Parameters ---------- q : array Q values r : array R values dr : array Error in R values meta_data : dict Meta data for the run dq : array, optional Q resolution """ if dq is None: resolution = meta_data["dq_over_q"] dq = resolution * q self.collection.append(dict(q=q, r=r, dr=dr, dq=dq, info=meta_data)) self.stitching_reflectivity_scale_factors.append(1.0)
[docs] def calculate_scale_factors(self): """ Calculate scale factors for each run in the collection """ if (self.stitching_configuration.type == StitchingType.NONE or self.stitching_configuration.type == StitchingType.ABSOLUTE_NORMALIZATION): return elif self.stitching_configuration.type == StitchingType.AUTOMATIC_AVERAGE: # Convert collection to type used in scaling factor calculation # Sorted by increasing q with references to original indices sorted_indices, sorted_collection_reduced_data = zip( *sorted( ((i, ReducedData(run['q'], run['r'], run['dr'])) for i, run in enumerate(self.collection)), key=lambda t: t[1].q[0] ) ) # Track a cumulative scale factor to properly scale subsequent runs cumulative_scale_factor = 1.0 # Apply critical edge scaling if enabled if self.stitching_configuration.normalize_first_angle: ce = scaling_factor_critical_edge(self.stitching_configuration.scale_factor_qmin, self.stitching_configuration.scale_factor_qmax, sorted_collection_reduced_data) self.stitching_reflectivity_scale_factors[sorted_indices[0]] = ce cumulative_scale_factor = ce # Apply scaling for remaining runs if len(sorted_collection_reduced_data) > 1: for i in range(1, len(sorted_indices)): overlap_sf_calculator = OverlapScalingFactor( left_data=sorted_collection_reduced_data[i - 1], right_data=sorted_collection_reduced_data[i] ) sf = overlap_sf_calculator.get_scaling_factor() cumulative_scale_factor *= sf self.stitching_reflectivity_scale_factors[sorted_indices[i]] = cumulative_scale_factor
[docs] def merge(self): """ Merge the collection of runs """ qz_all = [] refl_all = [] d_refl_all = [] d_qz_all = [] for idx, item in enumerate(self.collection): for i in range(len(item["q"])): qz_all.append(item["q"][i]) refl_all.append(item["r"][i] * self.stitching_reflectivity_scale_factors[idx]) d_refl_all.append(item["dr"][i] * self.stitching_reflectivity_scale_factors[idx]) d_qz_all.append(item["dq"][i]) qz_all = np.asarray(qz_all) refl_all = np.asarray(refl_all) d_refl_all = np.asarray(d_refl_all) d_qz_all = np.asarray(d_qz_all) idx = np.argsort(qz_all) self.qz_all = np.take_along_axis(qz_all, idx, axis=None) self.refl_all = np.take_along_axis(refl_all, idx, axis=None) self.d_refl_all = np.take_along_axis(d_refl_all, idx, axis=None) self.d_qz_all = np.take_along_axis(d_qz_all, idx, axis=None) if self.average_overlap: # New full list of points qz_all = [] refl_all = [] d_refl_all = [] d_qz_all = [] # Average information for groups of points qz = self.qz_all[0] total = self.refl_all[0] err2 = self.d_refl_all[0] ** 2 dq = self.d_qz_all[0] npts = 1.0 for i in range(1, len(self.qz_all)): if (self.qz_all[i] - qz) / qz > 0.000001: # Store the previous point qz_all.append(qz) refl_all.append(total / npts) d_refl_all.append(np.sqrt(err2) / npts) d_qz_all.append(dq) # Start a new point qz = self.qz_all[i] total = self.refl_all[i] err2 = self.d_refl_all[i] ** 2 dq = self.d_qz_all[i] npts = 1.0 else: total += self.refl_all[i] err2 += self.d_refl_all[i] ** 2 npts += 1.0 # Store the last point qz_all.append(qz) refl_all.append(total / npts) d_refl_all.append(np.sqrt(err2) / npts) d_qz_all.append(dq) self.qz_all = np.asarray(qz_all) self.refl_all = np.asarray(refl_all) self.d_refl_all = np.asarray(d_refl_all) self.d_qz_all = np.asarray(d_qz_all)
[docs] def save_ascii(self, file_path, meta_as_json=False): """ Save R(Q) in ASCII format. This function merges the data before saving. It writes metadata and R(Q) data to the specified file in ASCII format. The metadata includes experiment details, reduction version, run title, start time, reduction time, and other optional parameters. The R(Q) data includes Q, R, dR, and dQ values. Parameters ---------- file_path : str The path to the file where the ASCII data will be saved. meta_as_json : bool, optional If True, metadata will be written in JSON format. Default is False. """ self.calculate_scale_factors() self.merge() with open(file_path, "w") as fd: # Write meta data initial_entry_written = False for i, item in enumerate(self.collection): _meta = item["info"] if not initial_entry_written: fd.write("# Experiment %s Run %s\n" % (_meta["experiment"], _meta["run_number"])) fd.write("# Reduction %s\n" % version) fd.write("# Run title: %s\n" % _meta["run_title"]) fd.write("# Run start time: %s\n" % _meta["start_time"]) fd.write("# Reduction time: %s\n" % _meta["time"]) if "q_summing" in _meta: fd.write("# Q summing: %s\n" % _meta["q_summing"]) if "tof_weighted" in _meta: fd.write("# TOF weighted: %s\n" % _meta["tof_weighted"]) if "bck_in_q" in _meta: fd.write("# Bck in Q: %s\n" % _meta["bck_in_q"]) if "theta_offset" in _meta: fd.write("# Theta offset: %s\n" % _meta["theta_offset"]) fd.write("# Stitching type: %s\n" % self.stitching_configuration.type.value) if self.stitching_configuration.type != StitchingType.NONE: fd.write("# Scale factor q min: %s\n" % self.stitching_configuration.scale_factor_qmin) fd.write("# Scale factor q max: %s\n" % self.stitching_configuration.scale_factor_qmax) if meta_as_json: fd.write("# Meta:%s\n" % json.dumps(_meta)) fd.write("# DataRun NormRun TwoTheta(deg) LambdaMin(A) ") fd.write("LambdaMax(A) Qmin(1/A) Qmax(1/A) SF_A SF_B SF\n") fd.write("") if "scaling_factors" in _meta: a = _meta["scaling_factors"]["a"] b = _meta["scaling_factors"]["b"] else: a = 1 b = 0 two_theta = _meta["theta"] * 360 / np.pi value_list = ( _meta["run_number"], _meta["norm_run"], two_theta, _meta["wl_min"], _meta["wl_max"], _meta["q_min"], _meta["q_max"], a, b, self.stitching_reflectivity_scale_factors[i], ) fd.write("# %-9s %-9s %-14.6g %-14.6g %-12.6g %-12.6s %-12.6s %-12.6s %-12.6s %-12.6s\n" % value_list) initial_entry_written = True # Write R(q) fd.write("# %-21s %-21s %-21s %-21s\n" % ("Q [1/Angstrom]", "R", "dR", "dQ [FWHM]")) fd.writelines( "%20.16f %20.16f %20.16f %20.16f\n" % (self.qz_all[i], self.refl_all[i], self.d_refl_all[i], self.d_qz_all[i]) for i in range(len(self.qz_all)) )
[docs] def add_from_file(self, file_path): """ Read a partial result file and add it to the collection Parameters ---------- file_path : str The path to the file to be read """ _q, _r, _dr, _dq, _meta = read_file(file_path) self.add(_q, _r, _dr, _meta, dq=_dq)
[docs] def plot(self): """ Plot the combined reflectivity curve for a collection of runs Returns ------- str HTML div containing the combined reflectivity curve plot """ refl_curves = [] run_names = [] for i, item in enumerate(self.collection): refl_curves.append([item["q"], item["r"] * self.stitching_reflectivity_scale_factors[i], item["dr"] * self.stitching_reflectivity_scale_factors[i], item["dq"]]) run_names.append(f"Run: {item['info']['run_number']} " f"SF: {self.stitching_reflectivity_scale_factors[i]:.3f}") # run_number parameter is only used when publish=True return plot1d(run_number="dummy_run", data_list=refl_curves, data_names=run_names, instrument='REF_L', x_title=u"Q (1/A)", x_log=True, y_title="Reflectivity", y_log=True, show_dx=False, publish=False)
[docs] def read_file(file_path): """ Read a data file and extract meta data Parameters ---------- file_path : str The path to the file to be read """ _meta = dict() with open(file_path, "r") as fd: for line in fd: if line.startswith("# Meta:"): _meta = json.loads(line[len("# Meta:") : -1]) try: _q, _r, _dr, _dq = np.loadtxt(file_path).T except: # noqa: E722 print("Could not read file. It may have no points") _q = _r = _dr = _dq = [] return _q, _r, _dr, _dq, _meta