Source code for daops.utils.core

"""Utility functions for the DAOPS package."""

import collections

from clisops.utils.dataset_utils import is_kerchunk_file, open_xr_dataset
from elasticsearch import exceptions
from loguru import logger

from daops import config_
from daops.utils import fixer

from .base_lookup import Lookup


[docs] def _wrap_sequence(obj): if isinstance(obj, str): obj = [obj] return obj
[docs] class Characterised(Lookup): """Characterisation lookup class to look up whether a dataset has been characterised."""
[docs] def lookup_characterisation(self): """Attempt to find datasets in the characterisation store. Returns True if they exist in the store, returns False if not. """ id = self._convert_id(self.dset) try: self.es.get(index=config_()["elasticsearch"]["character_store"], id=id) return True except exceptions.NotFoundError: return False
[docs] def is_characterised(collection, require_all=False): """Intake a collection (an individual data reference or a sequence of them). Returns an ordered dictionary of a collection of ids with a boolean value for each stating whether the dataset has been characterised. If `require_all` is True: return a single Boolean value. :param collection: one or more data references :param require_all: Boolean to require that all must be characterised :return: Ordered Dictionary OR Boolean (if `require_all` is True) """ collection = _wrap_sequence(collection) resp = collections.OrderedDict() for dset in collection: _is_char = Characterised(dset).lookup_characterisation() if require_all and not _is_char: return False resp[dset] = Characterised(dset).lookup_characterisation() return resp
[docs] def open_dataset(ds_id, file_paths, apply_fixes=True): """Open an xarray Dataset and apply fixes if requested. Fixes are applied to the data either before or after the dataset is opened. Whether a fix is a 'pre-processor' or 'post-processor' is defined in the fix itself. :param ds_id: Dataset identifier in the form of a drs id e.g. cmip5.output1.INM.inmcm4.rcp45.mon.ocean.Omon.r1i1p1.latest.zostoga :param file_paths: (list) The file paths corresponding to the ds id. :param apply_fixes: Boolean. If True fixes will be applied to datasets if needed. Default is True. :return: xarray Dataset with fixes applied to the data. """ if apply_fixes and not is_kerchunk_file(ds_id): fix = fixer.Fixer(ds_id) if fix.pre_processor: for pre_process in fix.pre_processors: logger.info(f"Loading data with pre_processor: {pre_process.__name__}") else: logger.info("Loading data") ds = open_xr_dataset(file_paths, preprocess=fix.pre_processor) if fix.post_processors: for post_process in fix.post_processors: func, operands = post_process logger.info(f"Running post-processing function: {func.__name__}") ds = func(ds_id, ds, **operands) else: ds = open_xr_dataset(file_paths) return ds