Source code for tavi.library.storage.loader.ornl_spice_loader
"""ORNL Spice format loader."""
import logging
import warnings
import xml.etree.ElementTree as ET
from typing import Any
import numpy as np
from tavi.backend.classification.rule_based_classifier import RuleBasedClassifier
from tavi.backend.classification.rule_set.ornl_spice_rule_set import ORNLSpiceRuleSet
from tavi.library.data.enum.raw_scan_type import RawScanType
from tavi.library.data.scan import UUID, Provenance, RawScan, Scan, ScanData, ScanMetadata, TaviMetadata
from tavi.library.storage.interface.file_store_interface import FileStoreInterface
from tavi.library.storage.loader.interface.base import AbstractLoader
logger = logging.getLogger(__name__)
[docs]
class ORNLSpiceLoader(AbstractLoader):
"""Loader for ORNL Spice format scan files."""
def __init__(self, filestore: FileStoreInterface) -> None:
    """Set up the loader together with its classifier and rule set.

    Args:
        filestore: File store passed to the base loader and to the
            rule-based classifier.
    """
    super().__init__(filestore)
    classifier = RuleBasedClassifier(filestore)
    self.classifier = classifier
    rule_set = ORNLSpiceRuleSet()
    self.classification_rules = rule_set
[docs]
def load(self, file_path: str) -> Scan:
    """Load one scan file and assemble its parsed parts into a scan.

    Args:
        file_path: Path of the scan file to load.

    Returns:
        Whatever ``adapt_scan_data`` builds from the uuid, scan values,
        metadata, TAVI metadata and provenance of the file.
    """
    scan_parts = {
        "uuid": self.generate_uuid(file_path),
        "values": self.parse_scan_values(file_path),
        "meta": self.parse_metadata(file_path),
        "tavi_meta": self.parse_tavi_metadata(file_path),
        "prov": self.create_provenance(file_path),
    }
    scan_metadata = scan_parts["meta"]
    # The metadata names the UB configuration file; its parsed content is
    # merged into the metadata's data entry.
    external = self.parse_external_metadata(file_path, scan_metadata.ubconf)
    scan_metadata.data.update(external)
    return self.adapt_scan_data(**scan_parts)
[docs]
def get_scan_type(self) -> RawScanType:
    """Report the raw scan type handled by this loader (ORNLSpice)."""
    scan_type = RawScanType.ORNLSpice
    return scan_type
[docs]
def get_score(self, file_path: str) -> float:
    """Score a file with the classifier and this loader's rule set.

    Args:
        file_path: Path of the file to score.

    Returns:
        The value returned by the classifier's ``get_score``.
    """
    classifier = self.classifier
    # Hand the loader's current file store to the classifier before scoring.
    classifier.set_filestore(self.filestore)
    score = classifier.get_score(file_path, self.classification_rules)
    return score
[docs]
def parse_metadata(self, file_path: str) -> ScanMetadata:
    """Parse the ``#``-marked header and footer lines of a scan file.

    Lines before ``# col_headers =`` and, when present, lines from
    ``# Sum of Counts =`` onwards are parsed as metadata. Lines between the
    column-name line and ``# Sum of Counts =`` are kept as error messages.

    Args:
        file_path: Path of the scan file, read through ``self.filestore``.

    Returns:
        ScanMetadata whose ``data`` holds the parsed ``key = value`` entries,
        an ``end_time`` entry when a completion line is found, an ``errors``
        list and an ``others`` list with the lines that matched nothing.

    Raises:
        ValueError: If the file has no ``# col_headers =`` line.
    """
    text = self.filestore.read_text_file(file_path=file_path)
    headers = [line.strip() for line in text.splitlines() if "#" in line]
    index_col_name = headers.index("# col_headers =")

    metadata_list = headers[:index_col_name]
    # "+ 2" skips the "# col_headers =" marker and the column-name line.
    error_messages = headers[index_col_name + 2 :]

    # "Sum of Counts" may be missing; this happens to the last scan after the
    # beam is down.
    index_sum_count = next(
        (i for i, header in enumerate(headers) if header.startswith("# Sum of Counts =")),
        None,
    )
    if index_sum_count is not None:
        metadata_list += headers[index_sum_count:]
        error_messages = headers[index_col_name + 2 : index_sum_count]

    metadata = {}
    others = []
    for metadata_entry in metadata_list:
        line = metadata_entry.strip("# ")
        key, sep, val = line.partition("=")
        if sep:
            # "key = value" is tested first so that a value which happens to
            # contain "completed" or "stopped" is not mistaken for the final
            # line. Only one separating space is removed, and only if present.
            metadata[key.strip()] = val.removeprefix(" ")
        elif "completed" in line or "stopped" in line:  # last line
            parts = line.split(" ")
            if len(parts) > 3:
                # Re-order the pieces so parts[3] comes before parts[0] and parts[1].
                metadata["end_time"] = " ".join((parts[3], parts[0], parts[1]))
            else:
                others.append(line)
        else:
            others.append(line)

    if metadata.get("preset_type") == "countfile":  # HB1 in polarization mode
        # The key repeats, so the dict above keeps only the last one;
        # collect every occurrence here.
        countfile = [
            entry.partition("=")[2].strip()
            for entry in metadata_list
            if entry.startswith("# countfile") and "=" in entry
        ]
        metadata["countfile"] = ", ".join(countfile)

    data = {**metadata, "errors": error_messages, "others": others}
    return ScanMetadata(data=data)
[docs]
def parse_tavi_metadata(self, file_path: str) -> TaviMetadata:
    """Build the TAVI-specific metadata of a scan file.

    The instrument name is taken from ``file_path``; the remaining fields come
    from the ``#`` header lines that precede ``# col_headers =``.

    Args:
        file_path: Path of the scan file, read through ``self.filestore``.

    Returns:
        TaviMetadata with ``default_axis`` (def_x, def_y), ``friendly_name``
        ("<instrument>_exp####_scan####"), ``friendly_path`` ("IPTS-<proposal>")
        and ``normalization`` (preset_channel, preset_value).

    Raises:
        ValueError: If the file has no ``# col_headers =`` line.
    """
    # "HB1" is a substring of "HB1A", so the longer name must be tested before
    # it; otherwise every HB1A file is labelled "HB1". The remaining order
    # keeps the precedence HB3 > HB1 > CG4C for paths matching several names.
    instrument_name = next(
        (name for name in ("HB3", "HB1A", "HB1", "CG4C") if name in file_path),
        "",
    )

    text = self.filestore.read_text_file(file_path=file_path)
    headers = [line.strip() for line in text.splitlines() if "#" in line]
    metadata_list = headers[: headers.index("# col_headers =")]

    preset_channel = ""
    # NOTE(review): the default is a float but a parsed value stays text, as
    # before; confirm which type TaviMetadata expects.
    preset_value = 0.0
    def_x, def_y = "", ""
    friendly_path = "IPTS-"
    exp = "exp"
    s = "scan"
    for metadata_entry in metadata_list:
        # partition() tolerates a missing "=" or an "=" inside the value.
        _, sep, val = metadata_entry.partition("=")
        if not sep:
            continue
        # Strip the blank that follows "=" so values carry no padding.
        val = val.strip()
        if metadata_entry.startswith("# scan "):
            s += val.zfill(4)
        elif metadata_entry.startswith("# proposal"):
            friendly_path += val
        elif metadata_entry.startswith("# experiment_number"):
            exp += val.zfill(4)
        elif metadata_entry.startswith("# preset_channel"):
            preset_channel = val
        elif metadata_entry.startswith("# preset_value"):
            preset_value = val
        elif metadata_entry.startswith("# def_x"):
            def_x = val
        elif metadata_entry.startswith("# def_y"):
            def_y = val

    friendly_name = f"{instrument_name}_{exp}_{s}"
    return TaviMetadata(
        default_axis=(def_x, def_y),
        friendly_name=friendly_name,
        friendly_path=friendly_path,
        normalization=(preset_channel, preset_value),
    )
[docs]
def parse_scan_values(self, file_path: str) -> ScanData:
    """Parse the numeric data columns of a scan file.

    Column names come from the line after ``# col_headers =``; the values are
    parsed from the non-comment lines of the same text.

    Args:
        file_path: Path of the scan file, read through ``self.filestore``.

    Returns:
        ScanData whose ``data`` maps each sanitized column name to its values:
        a column slice for multi-row data, a one-element array for a single
        row, or an empty list when no values could be parsed.

    Raises:
        ValueError: If the file has no ``# col_headers =`` line.
    """
    text = self.filestore.read_text_file(file_path=file_path)
    all_content = text.splitlines()
    headers = [line.strip() for line in all_content if "#" in line]
    index_col_name = headers.index("# col_headers =")
    col_names = headers[index_col_name + 1].strip("#").split()

    try:
        with warnings.catch_warnings():
            # Treat all warnings as exceptions within this block.
            warnings.simplefilter("error")
            # Parse the lines already obtained from the file store rather than
            # re-opening file_path on the local file system.
            col_values = np.genfromtxt(all_content, comments="#")
    except (ValueError, Warning) as e:
        # A Warning is raised when there are no valid measurements,
        # see HB1_exp0815_scan0001.dat file.
        logger.error(e)
        col_values = np.array(None)

    data = {}
    # The position from enumerate is used for indexing, so a renamed column
    # (leading "_") no longer has to be looked up by its altered name.
    for col_index, col_name in enumerate(col_names):
        # replace "-" and " " with "_", remove any "."
        attr_name = col_name.replace("-", "_").replace(" ", "_").replace(".", "")
        if col_name[0].isdigit():  # can't start with a digit, prefix with "_"
            attr_name = "_" + attr_name
        if col_values.ndim > 1:
            data[attr_name] = col_values[:, col_index]
        elif col_values.ndim == 1:
            # Sometimes data only have 1 entry, then there is nothing to slice.
            data[attr_name] = np.array([col_values[col_index]])
        else:
            data[attr_name] = []
    return ScanData(data=data)
[docs]
def parse_external_metadata(self, file_path: str, ub_name: str) -> dict[str, Any]:
    """Parse the UB configuration file that belongs to a scan file.

    Args:
        file_path: Path of the scan file.
        ub_name: File name of the UB configuration inside the "UBConf" folder.

    Returns:
        The parsed configuration, or an empty dict when the file is not found.
    """
    store = self.filestore
    # "UBConf" sits two directory levels above the scan file.
    root_path = store.get_parent(store.get_parent(file_path))
    ubconf_dir = store.join_path(root_path, "UBConf")
    ubconf_path = store.join_path(ubconf_dir, ub_name)
    try:
        parsed = self._parse_ubconf(ubconf_path=ubconf_path)
    except FileNotFoundError:
        parsed = {}
    return parsed
def _parse_ubconf(self, ubconf_path: str) -> dict[str, Any]:
    """Parse a UB configuration file in the ubconf folder for ORNL TAS data.

    Two layouts are handled: an ini-style file whose first line is
    ``[UBMode]``, and otherwise an XML document with ``matrix`` elements.

    Args:
        ubconf_path: Path of the file, read through ``self.filestore``.

    Returns:
        Parsed entries. For the ini layout: ``UBMode`` / ``AngleMode`` as int,
        comma-separated values as float arrays, other values as float; entries
        whose value is ``""`` are left out. For the XML layout: ``UBMatrix``
        as an array of nine floats. An empty file gives an empty dict.

    Raises:
        ValueError: If a value cannot be converted to a number.
        xml.etree.ElementTree.ParseError: If a non-ini file is not valid XML.
    """
    ubconf: dict[str, Any] = {}
    text = self.filestore.read_text_file(file_path=ubconf_path)
    all_content = text.splitlines()
    if not all_content:  # nothing to parse
        return ubconf

    if all_content[0].strip() != "[UBMode]":  # xml layout (from C#)
        # Parse the text obtained from the file store instead of re-opening
        # ubconf_path on the local file system.
        root = ET.fromstring(text)
        for matrix in root.findall("matrix"):
            ub_matrix = matrix.attrib["matrix"].split(" ")
            ubconf["UBMatrix"] = np.array([float(ub_matrix[i]) for i in range(9)])
        return ubconf

    section = ""  # most recent "[...]" header seen
    for raw_line in all_content:
        line = raw_line.strip()
        if not line:
            continue  # skip if empty
        if line.startswith("["):
            section = line
            continue  # section headers like "[xx]" carry no value
        key, sep, val = line.partition("=")
        if not sep:
            continue  # not a "key=value" line
        if key == "Mode":
            # "Mode" appears in more than one section; name it after its section.
            if section in ("[UBMode]", "[AngleMode]"):
                ubconf[section[1:-1]] = int(val)
        elif "," in val:  # string of vector to array
            ubconf[key] = np.array([float(v) for v in val.strip('"').split(",")])
        elif val == '""':  # no value
            continue
        else:  # float
            ubconf[key] = float(val)
    return ubconf
[docs]
def adapt_scan_data(
    self, uuid: UUID, values: ScanData, meta: ScanMetadata, tavi_meta: TaviMetadata, prov: Provenance
) -> RawScan:
    """Bundle the parsed parts of a scan into a ``RawScan``.

    Args:
        uuid: Identifier of the scan.
        values: Parsed scan values, stored as ``data``.
        meta: Parsed metadata, stored as ``metadata``.
        tavi_meta: TAVI metadata, stored as ``tavimeta``.
        prov: Provenance of the scan, stored as ``prov``.
    """
    raw_scan_fields = {
        "uuid": uuid,
        "data": values,
        "metadata": meta,
        "tavimeta": tavi_meta,
        "prov": prov,
    }
    return RawScan(**raw_scan_fields)
[docs]
def create_provenance(self, file_path: str) -> Provenance:
    """Create the provenance record of a scan file.

    Args:
        file_path: Path of the scan file, recorded as ``raw_file``.

    Returns:
        Provenance whose ``contributing_scans`` has a single entry mapping
        the uuid generated for ``file_path`` to a weight of 1.
    """
    contributing_scans = {self.generate_uuid(file_path): 1}
    return Provenance(raw_file=file_path, contributing_scans=contributing_scans)