__all__ = ["convert", "to_zarr"]
# standard library
import re
from pathlib import Path
from struct import Struct
from typing import (
Any,
BinaryIO,
Callable,
cast,
Dict,
List,
Optional,
Sequence,
Tuple,
Union,
)
# third-party packages
import numpy as np
import pandas as pd
import xarray as xr
from tomlkit import parse
from tqdm import tqdm
from dask.diagnostics import ProgressBar
# constants
# name of the time dimension/coordinate used for concatenation and chunking
DIM = "time"
# struct byte-order prefix (GBD binary records are big-endian)
BIG_ENDIAN = ">"
# characters removed from the header text before parsing (NUL bytes and spaces)
HEADER_REMOVAL = re.compile(r"\x00| ")
# matches a section row such as "$Name" or "$$Name" (depth = number of "$")
HEADER_SECTION = re.compile(r"^(\$+)(.+)$")
# extracts the header byte size from a "HeaderSiz = <n>" row
HEADER_SIZE = re.compile(r"HeaderSiz\s+=\s+(\d+)")
# matches a "key = value[,value,...]" row of the header
HEADER_VALUE = re.compile(r"^\s*(\w+)\s*=\s*(.+)$")
# data names starting with "CH" are treated as accelerometer channels
CHANNEL_NAME = re.compile("^CH")
# long_name attribute given to the time coordinate
TIME_NAME = "Measured time"
# JST is UTC+9; subtracted from timestamps to convert JST to UTC
JST_HOURS = np.timedelta64(9, "h")
# type hints
Header = Dict[str, Any]  # parsed GBD header (TOML document as a mapping)
Data = pd.DataFrame  # GBD data table indexed by time
# main features
def convert(
    path_raw_zarr: Union[Sequence[Path], Path],
    path_fmt_zarr: Optional[Path] = None,
    *,
    length_per_chunk: int = 1000000,
    overwrite: bool = False,
    progress: bool = False,
) -> Path:
    """Convert a raw Zarr file(s) to a formatted Zarr file.

    This function will make a one-dimensional accelerometer outputs
    with time metadata derived from the raw Zarr file.

    Args:
        path_raw_zarr: Path(s) of the raw Zarr file(s).
        path_fmt_zarr: Path of the formatted Zarr file (optional).
        length_per_chunk: Length per chunk in the Zarr file.
        overwrite: Whether to overwrite the formatted Zarr file if exists.
        progress: Whether to show a progress bar.

    Returns:
        Path of the formatted Zarr file.

    Raises:
        FileExistsError: Raised if the formatted Zarr file exists
            and overwriting is not allowed (default).

    Notes:
        The timezone of the Zarr file is not JST but UTC.

    """
    # normalize the input to a list of paths
    if isinstance(path_raw_zarr, Path):
        path_raw_zarr = [path_raw_zarr]

    # check the existence of the formatted Zarr file
    if path_fmt_zarr is None:
        path_fmt_zarr = path_raw_zarr[0].with_suffix(".dist.zarr")

    if path_fmt_zarr.exists() and not overwrite:
        raise FileExistsError(f"{path_fmt_zarr} already exists.")

    # open the Zarr file(s) and concatenate them along the time axis
    ds_raw = xr.concat(map(xr.open_zarr, path_raw_zarr), DIM).sortby(DIM)
    # convert the time coordinate from JST to UTC
    ds_raw = ds_raw.assign_coords(time=ds_raw.time - JST_HOURS)
    ds_raw = ds_raw.chunk(length_per_chunk)

    # select channel variables (keys like "CH1 (V)") and attach metadata
    ds_fmt = xr.Dataset()

    for key, da in ds_raw.data_vars.items():
        name, unit = key.split()

        if CHANNEL_NAME.search(name) is None:
            continue

        da.attrs.update(
            long_name=name,
            units=unit.strip("()"),
        )
        ds_fmt[f"accelerometer_{name.lower()}"] = da

    ds_fmt.time.attrs.update(long_name=TIME_NAME)

    # write arrays to the Zarr file
    if progress:
        with ProgressBar():
            ds_fmt.to_zarr(path_fmt_zarr, mode="w")
    else:
        ds_fmt.to_zarr(path_fmt_zarr, mode="w")

    return path_fmt_zarr
def to_zarr(
    path_gbd: Path,
    path_zarr: Optional[Path] = None,
    *,
    length_per_chunk: int = 1000000,
    encoding: str = "shift-jis",
    overwrite: bool = False,
    progress: bool = False,
) -> Path:
    """Convert a GBD file to a Zarr file.

    This function focuses on the conversion between formats.
    The formatted Zarr file with metadata will be
    made by another function (convert).

    Args:
        path_gbd: Path of the GBD file.
        path_zarr: Path of the Zarr file (optional).
        length_per_chunk: Length per chunk in the Zarr file.
        encoding: Encoding of the GBD file.
        overwrite: Whether to overwrite the Zarr file if exists.
        progress: Whether to show a progress bar.

    Returns:
        Path of the Zarr file.

    Raises:
        FileExistsError: Raised if the Zarr file exists
            and overwriting is not allowed (default).

    """
    # derive the default output path from the input path
    if path_zarr is None:
        path_zarr = path_gbd.with_suffix(".zarr")

    # check the existence of the Zarr file
    if path_zarr.exists() and not overwrite:
        raise FileExistsError(f"{path_zarr} already exists.")

    # read the GBD file and write it to the Zarr file
    data = get_data(path_gbd, encoding, progress)
    ds = cast(xr.Dataset, data.to_xarray()).chunk(length_per_chunk)
    ds.to_zarr(path_zarr, mode="w")

    return path_zarr
def get_header(path: Path, encoding: str = "shift-jis") -> Header:
    """Return the header of a GBD file as a dictionary."""
    size = get_header_size(path, encoding)

    with path.open("rb") as f:
        raw = f.read(size).decode(encoding)

    # strip NUL bytes and spaces, then parse each row into TOML lines;
    # the section list carries the current ($, $$, $$$) nesting state
    rows = HEADER_REMOVAL.sub("", raw).split()
    section = ["", "", ""]
    toml = "\n".join(parse_header_row(row, section) for row in rows)
    return parse(toml)
def get_data(
    path: Path,
    encoding: str = "shift-jis",
    progress: bool = False,
) -> Data:
    """Return the data of a GBD file as a pandas DataFrame."""
    header = get_header(path, encoding)

    names = get_data_names(header)
    units = get_data_units(header)
    scales = get_data_scales(header)
    n_samples = get_data_length(header)
    index = get_data_index(header)
    read = get_data_reader(header)

    samples = np.zeros([n_samples, len(names)])

    with path.open("rb") as f:
        # skip the header bytes preceding the binary records
        f.read(get_header_size(path, encoding))

        for i in tqdm(range(n_samples), disable=not progress):
            samples[i] = read(f)

    # apply the per-channel scale factors
    samples /= scales

    columns = [f"{name} ({unit})" for name, unit in zip(names, units)]
    return pd.DataFrame(samples, index, columns)
# helper features (header)
def get_header_size(path: Path, encoding: str = "shift-jis") -> int:
    """Return the byte size of the header of a GBD file.

    The size is written in the header itself as a ``HeaderSiz = <n>``
    row within the first 2048 bytes of the file.

    Args:
        path: Path of the GBD file.
        encoding: Encoding of the GBD file.

    Returns:
        Byte size of the header.

    Raises:
        ValueError: Raised if the header size cannot be found.

    """
    with path.open("rb") as f:
        header = f.read(2048).decode(encoding)

    if not (m := HEADER_SIZE.search(header)):
        raise ValueError("Could not find header size.")

    return int(m.group(1))
def parse_header_row(row: str, section: List[str]) -> str:
    """Parse a row of a header and return a TOML string."""
    if m := HEADER_SECTION.search(row):
        depth = len(m.group(1))
        name = str(m.group(2))

        if not 1 <= depth <= 3:
            raise ValueError(row)

        # overwrite the current level and clear all deeper levels
        section[depth - 1 :] = [name] + [""] * (3 - depth)
        return "[" + ".".join(filter(len, section)) + "]"

    if m := HEADER_VALUE.search(row):
        key = str(m.group(1))
        parsed = [to_parsable(v) for v in str(m.group(2)).split(",")]
        # a single value is emitted as a scalar, not a one-element list
        value = parsed[0] if len(parsed) == 1 else parsed
        return f"{key} = {value!r}"

    raise ValueError(row)
def to_parsable(obj: str) -> Union[str, int]:
    """Convert an object to an integer or a string."""
    try:
        number = int(obj)
    except ValueError:
        # not numeric: treat as a string with surrounding quotes removed
        return obj.strip('"')

    return number
# helper features (data)
def get_data_names(header: Header) -> List[str]:
    """Return names (labels) of data."""
    common = header["Common"]
    return common["Data"]["Order"]
def get_data_length(header: Header) -> int:
    """Return the number of samples along time axis."""
    counts = header["Common"]["Data"]["Counts"]
    return int(counts)
def get_data_index(header: Header) -> pd.DatetimeIndex:
    """Return index of data along time axis."""
    time_info = header["Common"]["Time"]
    data_info = header["Common"]["Data"]

    return pd.date_range(
        start="T".join(time_info["Start"]),
        periods=get_data_length(header),
        freq=data_info["Sample"],
        name=DIM,
    )
def get_data_units(header: Header) -> List[Optional[str]]:
    """Return units (V, mV, ...) of data."""
    scales = header["Measure"]["Scale"]

    # the unit is stored at the 7th field of each scale entry;
    # names without a scale entry get None
    return [
        scales[name][6] if name in scales else None
        for name in get_data_names(header)
    ]
def get_data_types(header: Header) -> List[Optional[str]]:
    """Return types (voltage, temperature, humidity) of data."""
    amp = header["Amp"]

    # the type is stored at the 2nd field of each amp entry;
    # names without an amp entry get None
    return [
        amp[name][1] if name in amp else None
        for name in get_data_names(header)
    ]
def get_data_ranges(header: Header) -> List[Optional[str]]:
    """Return voltage range (units of V or mV) of data."""
    amp = header["Amp"]

    # the range is stored at the 3rd field of each amp entry;
    # names without an amp entry get None
    return [
        amp[name][2] if name in amp else None
        for name in get_data_names(header)
    ]
def get_data_scales(header: Header) -> List[float]:
    """Return scale (dimensionless) of data."""
    names = get_data_names(header)
    types = get_data_types(header)
    ranges = get_data_ranges(header)

    scales: List[float] = []

    for name, type_, range_ in zip(names, types, ranges):
        # names without amp metadata are left unscaled
        if type_ is None or range_ is None:
            scales.append(1)
            continue

        # temperature channels use a fixed scale
        if type_ == "TEMP":
            scales.append(10)
            continue

        scale = 1

        # factor chosen by which digit occurs in the range string
        # (checked in this exact order, matching the original logic)
        for digit, factor in (("1", 2), ("2", 1), ("4", 5), ("5", 4)):
            if digit in range_:
                scale *= factor
                break
        else:
            raise ValueError(range_)

        # factor chosen by the magnitude group of the range
        for group, factor in (
            (("10mV", "20mV"), 1000),
            (("50mV", "100mV", "200mV"), 100),
            (("500mV", "1V", "2V"), 10),
            (("5V", "10V", "20V"), 1),
            (("50V", "100V", "200V"), 0.1),
            (("500V", "1000V"), 0.01),
        ):
            if range_ in group:
                scale *= factor
                break
        else:
            raise ValueError(range_)

        # non-mV ranges get an extra x1000 (presumably V -> mV; confirm)
        if "mV" not in range_:
            scale *= 1000

        scales.append(scale)

    return scales
def get_data_reader(header: Header) -> Callable[[BinaryIO], Tuple]:
    """Make a binary reader function for data."""
    # struct format codes by substring of the data name
    # (checked in this exact order, matching the original logic)
    codes = (
        ("CH", "h"),
        ("Pulse", "L"),
        ("Logic", "H"),
        ("Alarm", "H"),
        ("AlOut", "H"),
        ("Status", "H"),
    )

    format_ = ""

    for name in get_data_names(header):
        for token, code in codes:
            if token in name:
                format_ += code
                break
        else:
            raise ValueError(name)

    struct = Struct(BIG_ENDIAN + format_)

    def reader(f: BinaryIO) -> Tuple:
        # read exactly one record and unpack it into a tuple of numbers
        return struct.unpack(f.read(struct.size))

    return reader