Source code for mao_merge_45m.correlator

__all__ = ["convert", "to_zarr"]


# standard library
from logging import getLogger
from pathlib import Path
from struct import Struct
from typing import BinaryIO, Callable, Optional, Tuple


# third-party packages
import zarr
import dask.array as da
import numpy as np
import xarray as xr
from tqdm import tqdm
from dask.diagnostics import ProgressBar


# module-level logger
logger = getLogger(__name__)


# constants
DIMS = "time", "freq"
UINT = "I"
SHORT = "h"
LITTLE_ENDIAN = "<"
N_ROWS_VDIF_HEAD = 8
N_ROWS_CORR_HEAD = 64
N_ROWS_CORR_DATA = 512
N_BYTES_PER_UNIT = 1312
N_UNITS_PER_SCAN = 64
N_UNITS_PER_SECOND = 6400
N_SCANS_PER_SECOND = 100
N_SCANS_PER_MINUTE = 6000
N_CHANS_FOR_FORMAT = 8192
LOWER_FREQ_MHZ = 16384
SLICE_MONTH = slice(24, 30)
SLICE_SECOND = slice(0, 30)
REF_YEAR = np.datetime64("2000", "Y")
UNIT_MONTH = np.timedelta64(6, "M")
UNIT_SECOND = np.timedelta64(1, "s")
UNIT_MILLISECOND = np.timedelta64(10, "ms")
IMAG_UNIT = np.complex64(1j)  # type: ignore
TIME_NAME = "Measured time"
FREQ_NAME = "Measured frequency"
FREQ_UNIT = "GHz"
CORR_NAME = "Correlator output"
CORR_UNIT = "Arbitrary unit"


# main features
[docs]def convert( path_raw_zarr: Path, path_fmt_zarr: Optional[Path] = None, *, bin_width: int = 8, overwrite: bool = False, progress: bool = False, ) -> Path: """Convert a raw Zarr file to a formatted Zarr file. This function will make a two-dimensional correlator output with time and freq metadata derived from the raw Zarr file. It will also average the output in frequency to reduce the size. Args: path_raw_vdif: Path of the raw VDIF file. path_fmt_zarr: Path of the formatted Zarr file (optional). bin_width: Bin width for averaging the output in frequency. overwrite: Whether to overwrite the formatted Zarr file if exists. progress: Whether to show a progress bar. Returns: Path of the formatted Zarr file. Raises: FileExistsError: Raised if the formatted Zarr file exists and overwriting is not allowed (default). """ # check the existence of the Zarr file if path_fmt_zarr is None: path_fmt_zarr = path_raw_zarr.with_suffix(".dist.zarr") if path_fmt_zarr.exists() and not overwrite: raise FileExistsError(f"{path_fmt_zarr} already exists.") # open the Zarr file and reshape arrays z = zarr.open(str(path_raw_zarr)) vdif_head = da.from_zarr(z.vdif_head) # type: ignore corr_data = da.from_zarr(z.corr_data) # type: ignore vdif_head = vdif_head.flatten().reshape( ( vdif_head.shape[0] / N_UNITS_PER_SCAN, vdif_head.shape[1] * N_UNITS_PER_SCAN, ) ) corr_data = corr_data.flatten().reshape( ( corr_data.shape[0] / N_UNITS_PER_SCAN, corr_data.shape[1] * N_UNITS_PER_SCAN, ) ) # make complex correlator array correlator = corr_data[:, 0::2] + corr_data[:, 1::2] * IMAG_UNIT # use only first 8192 frequency channels correlator = correlator[:, :N_CHANS_FOR_FORMAT] # make time and freq arrays scan = np.arange(len(correlator)) month = slice_binary(vdif_head[:, 1], SLICE_MONTH) * UNIT_MONTH second = slice_binary(vdif_head[:, 0], SLICE_SECOND) * UNIT_SECOND millisecond = (scan % N_SCANS_PER_SECOND) * UNIT_MILLISECOND time = (REF_YEAR + month + second + millisecond).compute() freq = 1e-3 * (LOWER_FREQ_MHZ + np.arange(N_CHANS_FOR_FORMAT)) # bin channels correlator = correlator.reshape( ( correlator.shape[0], correlator.shape[1] // bin_width, bin_width, ) ).mean(-1) freq = freq.reshape( ( freq.shape[0] // bin_width, bin_width, ) ).mean(-1) # write arrays to the Zarr file correlator = correlator.rechunk((N_SCANS_PER_MINUTE, None)) ds = xr.Dataset( dict( time=(DIMS[0], time), freq=(DIMS[1], freq), correlator=(DIMS, correlator), ) ) ds.time.attrs.update( long_name=TIME_NAME, ) ds.freq.attrs.update( long_name=FREQ_NAME, units=FREQ_UNIT, ) ds.correlator.attrs.update( long_name=CORR_NAME, units=CORR_UNIT, ) if progress: with ProgressBar(): ds.to_zarr(path_fmt_zarr, mode="w") else: ds.to_zarr(path_fmt_zarr, mode="w") return path_fmt_zarr
[docs]def to_zarr( path_vdif: Path, path_zarr: Optional[Path] = None, *, seconds_per_chunk: int = 60, overwrite: bool = False, progress: bool = False, ) -> Path: """Convert a VDIF file to a Zarr file losslessly. This function focuses on the conversion between formats. The formatted Zarr file with metadata will be made by another function (convert). The output Zarr file from the function has three arrays. - vdif_head: 4-byte data (uint32) * 8. - corr_head: 4-byte data (uint32) * 64. - corr_data: 2-byte data (int16) * 512 (256 re/im values). Args: path_vdif: Path of the VDIF file. path_zarr: Path of the Zarr file (optional). seconds_per_chunk: Time length per chunk in the Zarr file. overwrite: Whether to overwrite the Zarr file if exists. progress: Whether to show a progress bar. Returns: Path of the Zarr file. Raises: RuntimeError: Raised if the VDIF file is truncated. FileExistsError: Raised if the Zarr file exists and overwriting is not allowed (default). """ # check the existence of the Zarr file if path_zarr is None: path_zarr = Path(path_vdif).with_suffix(".zarr") if path_zarr.exists() and not overwrite: raise FileExistsError(f"{path_zarr} already exists.") # check if the VDIF file is truncated. n_units, mod = divmod(path_vdif.stat().st_size, N_BYTES_PER_UNIT) if mod: raise RuntimeError(f"{path_vdif} may be truncated.") n_units_per_chunk = N_UNITS_PER_SECOND * seconds_per_chunk n_chunks, mod = divmod(n_units, n_units_per_chunk) if mod: raise RuntimeError("Chunk length could not divide total time.") # prepare an empty zarr file z = zarr.open(str(path_zarr), mode="w") z.empty( # type: ignore name="vdif_head", shape=(n_units, N_ROWS_VDIF_HEAD), chunks=(n_units_per_chunk, N_ROWS_VDIF_HEAD), dtype=np.dtype(UINT), # type: ignore ) z.empty( # type: ignore name="corr_head", shape=(n_units, N_ROWS_CORR_HEAD), chunks=(n_units_per_chunk, N_ROWS_CORR_HEAD), dtype=np.dtype(UINT), # type: ignore ) z.empty( # type: ignore name="corr_data", shape=(n_units, N_ROWS_CORR_DATA), chunks=(n_units_per_chunk, N_ROWS_CORR_DATA), dtype=np.dtype(SHORT), # type: ignore ) # Read the VDIF file and write it to the Zarr file read_vdif_head = make_binary_reader(N_ROWS_VDIF_HEAD, UINT) read_corr_head = make_binary_reader(N_ROWS_CORR_HEAD, UINT) read_corr_data = make_binary_reader(N_ROWS_CORR_DATA, SHORT) with open(path_vdif, "rb") as f: for i in tqdm(range(n_chunks), disable=not progress): vdif_head = [] corr_head = [] corr_data = [] for _ in range(n_units_per_chunk): vdif_head.append(read_vdif_head(f)) corr_head.append(read_corr_head(f)) corr_data.append(read_corr_data(f)) index = slice( (i + 0) * n_units_per_chunk, (i + 1) * n_units_per_chunk, ) z.vdif_head[index] = vdif_head # type: ignore z.corr_head[index] = corr_head # type: ignore z.corr_data[index] = corr_data # type: ignore return path_zarr
def to_vdif( path_zarr: Path, path_vdif: Optional[Path] = None, *, overwrite: bool = False, progress: bool = False, ) -> Path: """Convert a Zarr file to a VDIF file losslessly. Args: path_zarr: Path of the Zarr file. path_vdif: Path of the VDIF file (optional). overwrite: Whether to overwrite the VDIF file if exists. progress: Whether to show a progress bar. Returns: Path of the VDIF file. Raises: FileExistsError: Raised if the VDIF file exists and overwriting is not allowed (default). """ # check the existence of the Zarr file if path_vdif is None: path_vdif = Path(path_zarr).with_suffix(".vdif") if path_vdif.exists() and not overwrite: raise FileExistsError(f"{path_vdif} already exists.") # Read the Zarr file and write it to the VDIF file z = zarr.open(str(path_zarr), mode="r") vdif_head = z.vdif_head # type: ignore corr_head = z.corr_head # type: ignore corr_data = z.corr_data # type: ignore arrays = tqdm( zip(vdif_head, corr_head, corr_data), total=len(vdif_head), disable=not progress, ) with path_vdif.open("wb") as f: for units in arrays: for unit in units: f.write(unit.tobytes()) return path_vdif # helper features def make_binary_reader( n_repeat: int, dtype: str, ) -> Callable[[BinaryIO], Tuple]: """Make a binary reader function.""" struct = Struct(LITTLE_ENDIAN + dtype * n_repeat) def reader(f: BinaryIO) -> Tuple: return struct.unpack(f.read(struct.size)) return reader def slice_binary(val: int, slice_: slice) -> int: """Slice an integer with given bit range.""" start = int(slice_.start) stop = int(slice_.stop) length = stop - start return (val >> start) & ((1 << length) - 1)