Source code for daops.utils.core

import collections

import xarray as xr

from daops import logging
from daops.utils import fixer

LOGGER = logging.getLogger(__file__)


def _wrap_sequence(obj):
    if isinstance(obj, str):
        obj = [obj]
    return obj


[docs]def is_dataref_characterised(dset): return True
[docs]def is_characterised(collection, require_all=False): """ Takes in a collection (an individual data reference or a sequence of them). Returns an ordered dictionary of a collection of ids with a boolean value for each stating whether the dataset has been characterised. If `require_all` is True: return a single Boolean value. :param collection: one or more data references :param require_all: Boolean to require that all must be characterised :return: Ordered Dictionary OR Boolean (if `require_all` is True) """ collection = _wrap_sequence(collection) resp = collections.OrderedDict() for dset in collection: _is_char = is_dataref_characterised(dset) if require_all and not _is_char: return False resp[dset] = is_dataref_characterised(dset) return resp
[docs]def open_dataset(ds_id, file_paths): """ Opens an xarray Dataset and applies fixes if required. Fixes are applied to the data either before or after the dataset is opened. Whether a fix is a 'pre-processor' or 'post-processor' is defined in the fix itself. :param ds_id: Dataset identifier in the form of a drs id e.g. cmip5.output1.INM.inmcm4.rcp45.mon.ocean.Omon.r1i1p1.latest.zostoga :param file_paths: (list) The file paths corresponding to the ds id. :return: xarray Dataset with fixes applied to the data. """ fix = fixer.Fixer(ds_id) if fix.pre_processor: for pre_process in fix.pre_processors: LOGGER.info(f"Loading data with pre_processor: {pre_process.__name__}") else: LOGGER.info(f"Loading data") ds = xr.open_mfdataset( file_paths, preprocess=fix.pre_processor, use_cftime=True, combine="by_coords" ) if fix.post_processors: for post_process in fix.post_processors: func, operands = post_process LOGGER.info(f"Running post-processing function: {func.__name__}") ds = func(ds, **operands) return ds