Source code for mao_merge_45m.merge

__all__ = ["merge"]


# standard library
from datetime import datetime, timezone
from math import inf
from pathlib import Path
from typing import Optional

# dependencies
import numpy as np
import xarray as xr
from dask.diagnostics import ProgressBar
from zarr import ZipStore
from . import __version__


# main features
[docs]def merge( path_correlator_zarr: Path, path_merged_zarr: Optional[Path] = None, *, path_accelerometer_zarr: Optional[Path] = None, path_weather_zarr: Optional[Path] = None, path_antenna_zarr: Optional[Path] = None, path_sam45_zarr: Optional[Path] = None, interpolation: str = "linear", time_offset: int = 0, overwrite: bool = False, progress: bool = False, ) -> Path: """Merge Zarr files of measured data into a single Zarr file. Args: path_correlator_zarr: Path of the correlator Zarr file. path_merged_zarr: Path of the merged Zarr file. path_accelerometer_zarr: Path of the accelerometer Zarr file. path_weather_zarr: Path of the weather Zarr file. path_antenna_zarr: Path of the antenna Zarr file. path_sam45_zarr: Path of the SAM45 Zarr file. interpolation: Method of interpolation of log data. time_offset: Offset time in units of ms to add to correlator data overwrite: Whether to overwrite the merged Zarr file if exists. progress: Whether to show a progress bar. Returns: Path of the merged Zarr file. Raises: FileExistsError: Raised if the merged Zarr file exists and overwriting is not allowed (default). """ if path_merged_zarr is None: path_merged_zarr = path_correlator_zarr.with_suffix(".merged.zarr") if path_merged_zarr.exists() and not overwrite: raise FileExistsError(f"{path_merged_zarr} already exists.") # open correlator Zarr and correct time offset (if any) correlator = xr.open_zarr(path_correlator_zarr) with xr.set_options(keep_attrs=True): time_offset = np.timedelta64(time_offset, "ms") correlator.coords["time"] = correlator.coords["time"] + time_offset # add merge information correlator.attrs.update( merge_datetime=datetime.now(timezone.utc).isoformat("T", "seconds"), merge_version=__version__, ) # append metadata Zarrs to the correlator dataset for path in ( path_accelerometer_zarr, path_weather_zarr, path_antenna_zarr, path_sam45_zarr, ): if path is None: continue ds = xr.open_zarr(path).interp_like(correlator, interpolation) correlator.coords.update(ds) # save the merged dataset to the merged Zarr with ProgressBar(0.0 if progress else inf): if path_merged_zarr.suffix == ".zip": with ZipStore(path_merged_zarr, mode="w") as store: correlator.to_zarr(store) else: correlator.to_zarr(path_merged_zarr, mode="w") return path_merged_zarr