Source code for mao_merge_45m.weather

# public API of this module: only ``convert`` is exported by ``import *``
__all__ = ["convert"]


# standard library
from pathlib import Path
from typing import Optional, Sequence, Union, cast


# third-party packages
import numpy as np
import pandas as pd
import xarray as xr
from dask.diagnostics import ProgressBar


# constants
# column names given to the raw CSV when it is read; the first one is
# used as the (time) index and the remaining ones as data variables
CSV_COLS = ("time", "wind_speed", "wind_direction")

# nine-hour offset subtracted from the time coordinate (JST -> UTC)
JST_HOURS = np.timedelta64(9, "h")


def convert(
    path_csv: Union[Sequence[Path], Path],
    path_zarr: Optional[Path] = None,
    *,
    length_per_chunk: int = 1000000,
    overwrite: bool = False,
    progress: bool = False,
) -> Path:
    """Convert raw CSV file(s) to a formatted Zarr file.

    This function reads one or more raw weather-log CSV files, merges
    them into a single one-dimensional dataset indexed by time (with
    wind speed and wind direction as data variables), attaches metadata,
    and writes the result to a Zarr file.

    Args:
        path_csv: Path(s) of the raw CSV file(s).
        path_zarr: Path of the formatted Zarr file (optional).
            If not given, the path of the first CSV file with its
            suffix replaced by ``.zarr`` is used.
        length_per_chunk: Length per chunk in the Zarr file.
        overwrite: Whether to overwrite the formatted Zarr file if exists.
        progress: Whether to show a progress bar.

    Returns:
        Path of the formatted Zarr file.

    Raises:
        FileExistsError: Raised if the formatted Zarr file exists
            and overwriting is not allowed (default).

    Notes:
        The timezone of the Zarr file is not JST but UTC: nine hours
        are subtracted from the timestamps read from the CSV file(s).
        If the same timestamp appears more than once (e.g. in
        overlapping CSV files), only its first occurrence is kept.

    """
    # normalize a single path to a one-element list
    if isinstance(path_csv, Path):
        path_csv = [path_csv]

    # check the existence of the Zarr file
    if path_zarr is None:
        path_zarr = path_csv[0].with_suffix(".zarr")

    if path_zarr.exists() and not overwrite:
        raise FileExistsError(f"{path_zarr} already exists.")

    # read CSV file(s) and convert them to DataFrame(s)
    frames = [
        pd.read_csv(
            path,
            names=CSV_COLS,
            index_col=0,
            parse_dates=True,
        )
        for path in path_csv
    ]

    if frames:
        # concatenate once (instead of once per file) so that rows are
        # not copied repeatedly and no empty frame affects the dtypes
        df = pd.concat(frames)
    else:
        # no input files: fall back to an empty, correctly-labeled frame
        df = pd.DataFrame(
            columns=CSV_COLS[1:],
            index=pd.Index([], name=CSV_COLS[0]),
        )

    # drop rows whose *timestamp* is duplicated; DataFrame.drop_duplicates
    # must not be used here because it ignores the index and would drop
    # distinct samples that merely share the same wind speed/direction
    df = df[~df.index.duplicated(keep="first")]

    # write DataFrame(s) to the Zarr file
    ds = cast(xr.Dataset, df.to_xarray())
    ds = ds.assign_coords(time=ds.time - JST_HOURS)  # JST -> UTC
    ds = ds.chunk(length_per_chunk)

    ds.time.attrs.update(
        long_name="Measured time",
    )
    ds.wind_speed.attrs.update(
        long_name=CSV_COLS[1],
        units="m/s",
    )
    ds.wind_direction.attrs.update(
        long_name=CSV_COLS[2],
        units="degree",
    )

    if progress:
        with ProgressBar():
            ds.to_zarr(path_zarr, mode="w")
    else:
        ds.to_zarr(path_zarr, mode="w")

    return path_zarr