# standard library
from dataclasses import dataclass
from pathlib import Path
from typing import Literal, Optional, Sequence, Union, Tuple
# third-party packages
import numpy as np
import pandas as pd
import xarray as xr
from dask.diagnostics import ProgressBar
from xarray_dataclasses import AsDataArray, AsDataset, Attr, Data, Dataof, Coord
# constants
# offset subtracted from log timestamps to convert them from JST to UTC
JST_HOURS = np.timedelta64(9, "h")

# structured dtype of one row of a raw log file (passed to np.genfromtxt)
LOG_DTYPE = [
    ("time", "U19"),  # timestamp string, parsed with LOG_TIMEFMT
    ("array", "U2"),  # array name used as a data variable name
    ("mode", "U4"),  # observation mode, compared to "ON", "OFF", "ZERO"
    ("spec", ("f4", 512)),  # 512 float32 values per row
]

# strptime-style format of the "time" field
LOG_TIMEFMT = "%Y%m%d%H%M%S.%f"


# type hints
# dimension names used by the dataclass specifications
Time = Literal["time"]
Chan = Literal["chan"]
# dataclasses
@dataclass
class Array(AsDataArray):
    """Specification of a (time, chan) DataArray of one array's spectra.

    Field order matters: instances are created positionally
    as ``Array.new(time, data)``.

    """

    # time coordinate along the "time" dimension
    time: Coord[Time, np.datetime64]
    # float32 data with dimensions ("time", "chan")
    data: Data[Tuple[Time, Chan], np.float32]
@dataclass
class A1:
    """Specification of the A1 data variable (time series, units of K)."""

    # values along the "time" dimension
    data: Data[Time, float] = 0.0
    # attributes attached to the data variable
    long_name: Attr[str] = "SAM45 A1"
    units: Attr[str] = "K"
@dataclass
class A2:
    """Specification of the A2 data variable (time series, units of K)."""

    # values along the "time" dimension
    data: Data[Time, float] = 0.0
    # attributes attached to the data variable
    long_name: Attr[str] = "SAM45 A2"
    units: Attr[str] = "K"
@dataclass
class A3:
    """Specification of the A3 data variable (time series, units of K)."""

    # values along the "time" dimension
    data: Data[Time, float] = 0.0
    # attributes attached to the data variable
    long_name: Attr[str] = "SAM45 A3"
    units: Attr[str] = "K"
@dataclass
class A4:
    """Specification of the A4 data variable (time series, units of K)."""

    # values along the "time" dimension
    data: Data[Time, float] = 0.0
    # attributes attached to the data variable
    long_name: Attr[str] = "SAM45 A4"
    units: Attr[str] = "K"
@dataclass
class A5:
    """Specification of the A5 data variable (time series, units of K)."""

    # values along the "time" dimension
    data: Data[Time, float] = 0.0
    # attributes attached to the data variable
    long_name: Attr[str] = "SAM45 A5"
    units: Attr[str] = "K"
@dataclass
class A6:
    """Specification of the A6 data variable (time series, units of K)."""

    # values along the "time" dimension
    data: Data[Time, float] = 0.0
    # attributes attached to the data variable
    long_name: Attr[str] = "SAM45 A6"
    units: Attr[str] = "K"
@dataclass
class A7:
    """Specification of the A7 data variable (time series, units of K)."""

    # values along the "time" dimension
    data: Data[Time, float] = 0.0
    # attributes attached to the data variable
    long_name: Attr[str] = "SAM45 A7"
    units: Attr[str] = "K"
@dataclass
class A8:
    """Specification of the A8 data variable (time series, units of K)."""

    # values along the "time" dimension
    data: Data[Time, float] = 0.0
    # attributes attached to the data variable
    long_name: Attr[str] = "SAM45 A8"
    units: Attr[str] = "K"
@dataclass
class SAM45(AsDataset):
    """Representation of SAM45 logs in xarray.

    Each field is one data variable of the resulting Dataset.
    The field order (A1 to A8) must be kept because instances
    are created positionally via ``SAM45.new(...)``.

    """

    sam45_A1: Dataof[A1] = 0.0
    """Data of A1."""

    sam45_A2: Dataof[A2] = 0.0
    """Data of A2."""

    sam45_A3: Dataof[A3] = 0.0
    """Data of A3."""

    sam45_A4: Dataof[A4] = 0.0
    """Data of A4."""

    sam45_A5: Dataof[A5] = 0.0
    """Data of A5."""

    sam45_A6: Dataof[A6] = 0.0
    """Data of A6."""

    sam45_A7: Dataof[A7] = 0.0
    """Data of A7."""

    sam45_A8: Dataof[A8] = 0.0
    """Data of A8."""
def convert(
    path_log: Union[Sequence[Path], Path],
    path_zarr: Optional[Path] = None,
    *,
    ch_min: int = 0,
    ch_max: int = 4096,
    length_per_chunk: int = 1000000,
    overwrite: bool = False,
    progress: bool = False,
) -> Path:
    """Convert raw SAM45 log file(s) to a formatted Zarr file.

    For each log file and each array found in it, the ON spectra are
    calibrated as ``(on - off) / (off - zero)`` using the first OFF
    spectrum, averaged over the selected channel range, and stored
    as a one-dimensional time series. The results of all log files are
    concatenated along time and written to a single Zarr file.

    Args:
        path_log: Path(s) of the raw SAM45 log file(s).
            A single path given as a string is also accepted.
        path_zarr: Path of the formatted Zarr file (optional).
            If not given, the first log path with its suffix
            replaced by ``.zarr`` is used.
        ch_min: Start of the channel range to be averaged.
        ch_max: Stop of the channel range to be averaged.
        length_per_chunk: Length per chunk in the Zarr file.
        overwrite: Whether to overwrite the formatted Zarr file if exists.
        progress: Whether to show a progress bar.

    Returns:
        Path of the formatted Zarr file.

    Raises:
        ValueError: Raised if no log file is given.
        FileExistsError: Raised if the formatted Zarr file exists
            and overwriting is not allowed (default).

    Notes:
        The timezone of the Zarr file is not JST but UTC.

    """
    from contextlib import nullcontext

    # normalize the input to a list of Path objects (a bare string
    # must not be iterated character by character)
    if isinstance(path_log, (str, Path)):
        paths = [Path(path_log)]
    else:
        paths = [Path(path) for path in path_log]

    if not paths:
        raise ValueError("At least one SAM45 log file must be given.")

    # check the existence of the Zarr file
    if path_zarr is None:
        path_zarr = paths[0].with_suffix(".zarr")
    else:
        path_zarr = Path(path_zarr)

    if path_zarr.exists() and not overwrite:
        raise FileExistsError(f"{path_zarr} already exists.")

    # read log file(s) and convert them to Dataset(s)
    datasets = []

    for path in paths:
        log = np.genfromtxt(path, dtype=LOG_DTYPE)
        ds_log = xr.Dataset()

        for name in np.unique(log["array"]):
            # rows that belong to this array
            where = log["array"] == name
            spec = log["spec"][where]
            mode = log["mode"][where]
            time = log["time"][where]
            is_on = mode == "ON"

            on = spec[is_on]
            off = spec[mode == "OFF"][0]  # 1st off source
            # NOTE(review): ZERO rows must broadcast against the ON rows
            # (one row, or as many rows as ON) -- confirm with the log format
            zero = spec[mode == "ZERO"]
            calibrated = (on - off) / (off - zero)

            times = pd.to_datetime(time[is_on], format=LOG_TIMEFMT)
            ds_log[name] = Array.new(times.to_numpy(), calibrated)

        # average over the selected channel range -> one value per time
        datasets.append(ds_log.sel(chan=slice(ch_min, ch_max)).mean("chan"))

    # write Dataset(s) to the Zarr file
    ds = xr.concat(datasets, dim="time")
    ds = SAM45.new(
        ds["A1"],
        ds["A2"],
        ds["A3"],
        ds["A4"],
        ds["A5"],
        ds["A6"],
        ds["A7"],
        ds["A8"],
    )
    # log timestamps are in JST; shift them to UTC
    ds = ds.assign_coords(time=ds.time - JST_HOURS)
    ds = ds.chunk(length_per_chunk)

    with ProgressBar() if progress else nullcontext():
        ds.to_zarr(path_zarr, mode="w")

    return path_zarr