Source code for daops.utils.core

import collections

import xarray as xr
from elasticsearch import exceptions
from loguru import logger
from roocs_utils.xarray_utils.xarray_utils import open_xr_dataset, is_kerchunk_file

from .base_lookup import Lookup
from daops import CONFIG
from daops.utils import fixer


def _wrap_sequence(obj):
    if isinstance(obj, str):
        obj = [obj]
    return obj


class Characterised(Lookup):
    """
    Characterisation lookup class to look up whether a dataset has been
    characterised.
    """

    def lookup_characterisation(self):
        """
        Check whether this dataset has an entry in the characterisation store.

        The dataset reference held in `self.dset` is converted to a store id
        with `self._convert_id` and fetched from the index named by
        `CONFIG["elasticsearch"]["character_store"]` using `self.es`.
        (`dset`, `es` and `_convert_id` are not defined in this class -
        presumably they come from `Lookup`; verify against the base class.)

        :return: True if the document exists in the store, False if the
            lookup raises `NotFoundError`. Any other exception propagates.
        """
        doc_id = self._convert_id(self.dset)
        store_index = CONFIG["elasticsearch"]["character_store"]

        try:
            self.es.get(index=store_index, id=doc_id)
        except exceptions.NotFoundError:
            return False

        return True
def is_characterised(collection, require_all=False):
    """
    Report whether the datasets in a collection have been characterised.

    Takes in a collection (an individual data reference or a sequence of
    them). Returns an ordered dictionary mapping each id to a boolean value
    stating whether the dataset has been characterised.

    If `require_all` is True a single Boolean is returned instead: False as
    soon as one dataset is found not to be characterised (the remaining
    datasets are not looked up), True if every dataset is characterised.
    An empty collection therefore yields True when `require_all` is True.

    :param collection: one or more data references
    :param require_all: Boolean to require that all must be characterised
    :return: Ordered Dictionary OR Boolean (if `require_all` is True)
    """
    collection = _wrap_sequence(collection)
    resp = collections.OrderedDict()

    for dset in collection:
        # Look each dataset up exactly once and reuse the answer, instead of
        # issuing a second identical query when filling in the response.
        is_char = Characterised(dset).lookup_characterisation()

        if require_all and not is_char:
            return False

        resp[dset] = is_char

    if require_all:
        # Every lookup succeeded: honour the documented Boolean contract
        # rather than handing back the dictionary.
        return True

    return resp
def open_dataset(ds_id, file_paths, apply_fixes=True):
    """
    Open an xarray Dataset, applying fixes if requested.

    Fixes are applied to the data either before or after the dataset is
    opened. Whether a fix is a 'pre-processor' or 'post-processor' is
    defined in the fix itself.

    No fixes are looked up when `apply_fixes` is False or when
    `is_kerchunk_file(ds_id)` is true; the files are then opened as they are.

    :param ds_id: Dataset identifier in the form of a drs id
        e.g. cmip5.output1.INM.inmcm4.rcp45.mon.ocean.Omon.r1i1p1.latest.zostoga
    :param file_paths: (list) The file paths corresponding to the ds id.
    :param apply_fixes: Boolean. If True fixes will be applied to datasets
        if needed. Default is True.
    :return: xarray Dataset with fixes applied to the data.
    """
    # Nothing to fix: open the files directly and return.
    if not apply_fixes or is_kerchunk_file(ds_id):
        return open_xr_dataset(file_paths)

    fix = fixer.Fixer(ds_id)

    # Log which pre-processors are about to be used while loading.
    if fix.pre_processor:
        for pre_process in fix.pre_processors:
            logger.info(f"Loading data with pre_processor: {pre_process.__name__}")
    else:
        logger.info("Loading data")

    # `fix.pre_processor` is handed to the opener as its preprocess callable.
    ds = open_xr_dataset(file_paths, preprocess=fix.pre_processor)

    # Each post-processor is a (function, operands) pair, run in order on
    # the opened dataset; each call's result feeds the next.
    if fix.post_processors:
        for func, operands in fix.post_processors:
            logger.info(f"Running post-processing function: {func.__name__}")
            ds = func(ds_id, ds, **operands)

    return ds