# standard library
from dataclasses import dataclass
from pathlib import Path
from typing import Literal, Optional, Sequence, Union
from functools import partial
# third-party packages
import numpy as np
import pandas as pd
from dask.diagnostics import ProgressBar
from xarray_dataclasses import AsDataset, Attr, Data, Dataof
# constants
# column names of the measured ("real") values in a log (index 0 is time)
LOG_COLS_REAL = (
    "time",
    "antenna_azimuth",
    "antenna_elevation",
    "collimator_azimuth",
    "collimator_elevation",
    "subref_X",
    "subref_Y",
    "subref_Z1",
    "subref_Z2",
)
# column names of the programmed values in a log (index 0 is time)
LOG_COLS_PROG = (
    "time",
    "prog_antenna_azimuth",
    "prog_antenna_elevation",
    "prog_subref_X",
    "prog_subref_Y",
    "prog_subref_Z1",
    "prog_subref_Z2",
)
# offset of Japan Standard Time from UTC
JST_HOURS = np.timedelta64(9, "h")
# datetime format of the two timestamp columns in a raw log
LOG_TIMEFMT = "%y%m%d %H%M%S.%f"
# type hints
T = Literal["time"]  # dimension name of the time axis
# dataclasses
@dataclass
class AntennaAzimuth:
    """Spec of the measured antenna azimuth (degree)."""

    data: Data[T, float] = 0.0
    long_name: Attr[str] = "Antenna azimuth"
    units: Attr[str] = "degree"
@dataclass
class AntennaElevation:
    """Spec of the measured antenna elevation (degree)."""

    data: Data[T, float] = 0.0
    long_name: Attr[str] = "Antenna elevation"
    units: Attr[str] = "degree"
@dataclass
class CollimatorAzimuth:
    """Spec of the measured collimator azimuth (degree)."""

    data: Data[T, float] = 0.0
    long_name: Attr[str] = "Collimator azimuth"
    units: Attr[str] = "degree"
@dataclass
class CollimatorElevation:
    """Spec of the measured collimator elevation (degree)."""

    data: Data[T, float] = 0.0
    long_name: Attr[str] = "Collimator elevation"
    units: Attr[str] = "degree"
@dataclass
class SubrefX:
    """Spec of the measured subreflector X position (mm)."""

    data: Data[T, float] = 0.0
    long_name: Attr[str] = "Subref X"
    units: Attr[str] = "mm"
@dataclass
class SubrefY:
    """Spec of the measured subreflector Y position (mm)."""

    data: Data[T, float] = 0.0
    long_name: Attr[str] = "Subref Y"
    units: Attr[str] = "mm"
@dataclass
class SubrefZ1:
    """Spec of the measured subreflector Z1 position (mm)."""

    data: Data[T, float] = 0.0
    long_name: Attr[str] = "Subref Z1"
    units: Attr[str] = "mm"
@dataclass
class SubrefZ2:
    """Spec of the measured subreflector Z2 position (mm)."""

    data: Data[T, float] = 0.0
    long_name: Attr[str] = "Subref Z2"
    units: Attr[str] = "mm"
@dataclass
class ProgAntennaAzimuth:
    """Spec of the programmed antenna azimuth (degree)."""

    data: Data[T, float] = 0.0
    long_name: Attr[str] = "Antenna azimuth (prog)"
    units: Attr[str] = "degree"
@dataclass
class ProgAntennaElevation:
    """Spec of the programmed antenna elevation (degree)."""

    data: Data[T, float] = 0.0
    long_name: Attr[str] = "Antenna elevation (prog)"
    units: Attr[str] = "degree"
@dataclass
class ProgSubrefX:
    """Spec of the programmed subreflector X position (mm)."""

    data: Data[T, float] = 0.0
    long_name: Attr[str] = "Subref X (prog)"
    units: Attr[str] = "mm"
@dataclass
class ProgSubrefY:
    """Spec of the programmed subreflector Y position (mm)."""

    data: Data[T, float] = 0.0
    long_name: Attr[str] = "Subref Y (prog)"
    units: Attr[str] = "mm"
@dataclass
class ProgSubrefZ1:
    """Spec of the programmed subreflector Z1 position (mm)."""

    data: Data[T, float] = 0.0
    long_name: Attr[str] = "Subref Z1 (prog)"
    units: Attr[str] = "mm"
@dataclass
class ProgSubrefZ2:
    """Spec of the programmed subreflector Z2 position (mm)."""

    data: Data[T, float] = 0.0
    long_name: Attr[str] = "Subref Z2 (prog)"
    units: Attr[str] = "mm"
@dataclass
class Antenna(AsDataset):
    """Representation of antenna logs in xarray."""

    antenna_azimuth: Dataof[AntennaAzimuth] = 0.0
    """Azimuth of the antenna."""

    antenna_elevation: Dataof[AntennaElevation] = 0.0
    """Elevation of the antenna."""

    collimator_azimuth: Dataof[CollimatorAzimuth] = 0.0
    """Azimuth of the collimator."""

    collimator_elevation: Dataof[CollimatorElevation] = 0.0
    """Elevation of the collimator."""

    subref_X: Dataof[SubrefX] = 0.0
    """X position of a subref."""

    subref_Y: Dataof[SubrefY] = 0.0
    """Y position of a subref."""

    subref_Z1: Dataof[SubrefZ1] = 0.0
    """Z1 position of a subref."""

    subref_Z2: Dataof[SubrefZ2] = 0.0
    """Z2 position of a subref."""

    prog_antenna_azimuth: Dataof[ProgAntennaAzimuth] = 0.0
    """Azimuth of the antenna (prog)."""

    prog_antenna_elevation: Dataof[ProgAntennaElevation] = 0.0
    """Elevation of the antenna (prog)."""

    prog_subref_X: Dataof[ProgSubrefX] = 0.0
    """X position of a subref (prog)."""

    prog_subref_Y: Dataof[ProgSubrefY] = 0.0
    """Y position of a subref (prog)."""

    prog_subref_Z1: Dataof[ProgSubrefZ1] = 0.0
    """Z1 position of a subref (prog)."""

    prog_subref_Z2: Dataof[ProgSubrefZ2] = 0.0
    """Z2 position of a subref (prog)."""
def get_df_real(
    path_log: Path,
    index: int,
) -> pd.DataFrame:
    """Helper function to read measured values from a log.

    Args:
        path_log: Path of the raw 50 sps log file.
        index: Row offset (0-7) within each 10-line frame of the log
            that holds the quantity to read.

    Returns:
        DataFrame of five float columns indexed by time, resampled
        to a regular 100 ms grid with linear interpolation.
    """
    # read only every 10th line with offset ``index``; column 0 is
    # skipped, columns 1-2 form the timestamp, columns 3-7 are values
    df = pd.read_csv(
        path_log,
        sep=r"\s+",
        header=None,
        skiprows=lambda row: row % 10 != index,
        usecols=range(1, 8),
        dtype={1: str, 2: str},  # keep leading zeros of date/time fields
    )
    # build the datetime index explicitly: nested parse_dates=[[1, 2]]
    # and date_parser are deprecated and removed in pandas >= 2.x
    time = pd.to_datetime(df[1] + " " + df[2], format=LOG_TIMEFMT)
    values = df.drop(columns=[1, 2]).astype(float).set_axis(time)
    return (
        values.groupby(level=0)
        .last()  # deduplicate timestamps, keeping the last record
        .resample("100 ms")
        .interpolate()
    )
def get_df_prog(path_log: Path, index: int = 9) -> pd.DataFrame:
    """Helper function to read programmed values from a log.

    Args:
        path_log: Path of the raw 50 sps log file.
        index: Row offset within each 10-line frame of the log
            (the programmed values live on row 9 by default).

    Returns:
        DataFrame of six float columns indexed by time, resampled to
        a regular 100 ms grid (linear interpolation, then pad-filled).
    """
    # read only every 10th line with offset ``index``; column 0 is
    # skipped, columns 1-2 form the timestamp, columns 3-8 are values
    df = pd.read_csv(
        path_log,
        sep=r"\s+",
        header=None,
        skiprows=lambda row: row % 10 != index,
        usecols=range(1, 9),
        dtype={1: str, 2: str},  # keep leading zeros of date/time fields
    )
    # build the datetime index explicitly: nested parse_dates=[[1, 2]]
    # and date_parser are deprecated and removed in pandas >= 2.x
    time = pd.to_datetime(df[1] + " " + df[2], format=LOG_TIMEFMT)
    values = df.drop(columns=[1, 2]).astype(float).set_axis(time)
    return (
        values.groupby(level=0)
        .last()  # deduplicate timestamps, keeping the last record
        .resample("100 ms")
        .interpolate()
        .ffill()  # equivalent to deprecated interpolate(method="pad")
    )
def _read_log(path: Path) -> pd.DataFrame:
    """Read one raw 50 sps log file as a merged (real + prog) DataFrame."""
    # step 1: read measured values (rows 0-7 of every 10-line frame)
    reals = [get_df_real(path, i) for i in range(8)]
    # make a 20 ms index: each 100 ms frame carries 5 samples per column,
    # so extend the frame index by 80 ms and refine it to 20 ms steps
    index = reals[0].index
    index = index.append(index[-1:] + pd.Timedelta("80 ms"))
    index = index.to_series().asfreq("20 ms").index
    df_real = pd.DataFrame(
        data={
            name: df.to_numpy().flatten()
            for name, df in zip(LOG_COLS_REAL[1:], reals)
        },
        index=pd.Index(index, name=LOG_COLS_REAL[0]),
    )
    # step 2: read programmed values (row 9 of every 10-line frame)
    prog_all = get_df_prog(path)
    df_prog = pd.DataFrame(
        data={
            name: prog_all.iloc[:, i]
            for i, name in enumerate(LOG_COLS_PROG[1:])
        },
        index=pd.Index(prog_all.index, name=LOG_COLS_PROG[0]),
    )
    # step 3: merge and forward-fill the sparse programmed values
    return pd.concat([df_real, df_prog], axis=1).ffill()


def convert(
    path_log: Union[Sequence[Path], Path],
    path_zarr: Optional[Path] = None,
    *,
    length_per_chunk: int = 1000000,
    overwrite: bool = False,
    progress: bool = False,
) -> Path:
    """Convert a raw 50 sps log file(s) to a formatted Zarr file.

    This function will make a one-dimensional antenna log outputs
    with time metadata derived from the raw 50 sps log file.

    Args:
        path_log: Path(s) of the raw 50 sps log file(s).
        path_zarr: Path of the formatted Zarr file (optional).
        length_per_chunk: Length per chunk in the Zarr file.
        overwrite: Whether to overwrite the formatted Zarr file if exists.
        progress: Whether to show a progress bar.

    Returns:
        Path of the formatted Zarr file.

    Raises:
        FileExistsError: Raised if the formatted Zarr file exists
            and overwriting is not allowed (default).

    Notes:
        The timezone of the Zarr file is not JST but UTC.
    """
    # check the existence of the Zarr file
    if isinstance(path_log, Path):
        path_log = [path_log]

    if path_zarr is None:
        path_zarr = path_log[0].with_suffix(".zarr")

    if path_zarr.exists() and not overwrite:
        raise FileExistsError(f"{path_zarr} already exists.")

    # read log file(s) and concatenate them once at the end
    # (repeated pd.concat inside the loop would be quadratic)
    empty = pd.DataFrame(
        columns=LOG_COLS_REAL[1:],
        index=pd.DatetimeIndex([], name=LOG_COLS_REAL[0]),
    )
    df = pd.concat([empty, *(_read_log(path) for path in path_log)])

    # write DataFrame(s) to the Zarr file
    ds = Antenna.new(
        df.antenna_azimuth,
        df.antenna_elevation,
        df.collimator_azimuth,
        df.collimator_elevation,
        df.subref_X,
        df.subref_Y,
        df.subref_Z1,
        df.subref_Z2,
        df.prog_antenna_azimuth,
        df.prog_antenna_elevation,
        df.prog_subref_X,
        df.prog_subref_Y,
        df.prog_subref_Z1,
        df.prog_subref_Z2,
    )
    # shift the time coordinate from JST to UTC
    ds = ds.assign_coords(time=ds.time - JST_HOURS)
    ds = ds.chunk(length_per_chunk)

    if progress:
        with ProgressBar():
            ds.to_zarr(path_zarr, mode="w")
    else:
        ds.to_zarr(path_zarr, mode="w")

    return path_zarr