import collections
import xarray as xr
from elasticsearch import exceptions
from loguru import logger
from roocs_utils.xarray_utils.xarray_utils import open_xr_dataset, is_kerchunk_file
from .base_lookup import Lookup
from daops import CONFIG
from daops.utils import fixer
def _wrap_sequence(obj):
if isinstance(obj, str):
obj = [obj]
return obj
[docs]
class Characterised(Lookup):
    """
    Lookup that reports whether a dataset has an entry in the characterisation store.
    """

    def lookup_characterisation(self):
        """
        Query the characterisation store for this lookup's dataset.

        The dataset reference (``self.dset``) is converted to a store id and fetched
        from the index named by ``CONFIG["elasticsearch"]["character_store"]``.

        :return: True if the fetch succeeds, False if it raises ``NotFoundError``
        """
        doc_id = self._convert_id(self.dset)

        try:
            self.es.get(index=CONFIG["elasticsearch"]["character_store"], id=doc_id)
        except exceptions.NotFoundError:
            # A missing document simply means "not characterised".
            return False

        return True
[docs]
def is_characterised(collection, require_all=False):
    """
    Report whether the datasets in a collection have been characterised.

    Takes in a collection (an individual data reference or a sequence of them).
    By default returns an ordered dictionary mapping each data reference to a
    Boolean stating whether that dataset has been characterised.

    If `require_all` is True a single Boolean is returned instead: False as soon
    as one dataset is found not to be characterised (remaining datasets are not
    looked up), True if every dataset is characterised. An empty collection gives
    False, because nothing was confirmed as characterised.

    :param collection: one or more data references
    :param require_all: Boolean to require that all must be characterised
    :return: Ordered Dictionary OR Boolean (if `require_all` is True)
    """
    collection = _wrap_sequence(collection)
    resp = collections.OrderedDict()

    for dset in collection:
        # Query the store once per dataset and reuse the answer, so the value
        # recorded is the same one the `require_all` check was made against.
        _is_char = Characterised(dset).lookup_characterisation()

        if require_all and not _is_char:
            return False

        resp[dset] = _is_char

    if require_all:
        # Every entry in `resp` is True at this point; collapse it to the
        # documented Boolean. `bool(resp)` keeps an empty collection falsy.
        return bool(resp)

    return resp
[docs]
def open_dataset(ds_id, file_paths, apply_fixes=True):
    """
    Open an xarray Dataset, applying any registered fixes when requested.

    Fixes are applied to the data either before or after the dataset is opened.
    Whether a fix is a 'pre-processor' or 'post-processor' is defined in the
    fix itself. Fixes are skipped entirely when `apply_fixes` is False or when
    ``is_kerchunk_file(ds_id)`` is true; the data is then opened as-is.

    :param ds_id: Dataset identifier in the form of a drs id
                  e.g. cmip5.output1.INM.inmcm4.rcp45.mon.ocean.Omon.r1i1p1.latest.zostoga
    :param file_paths: (list) The file paths corresponding to the ds id.
    :param apply_fixes: Boolean. If True fixes will be applied to datasets if needed. Default is True.
    :return: xarray Dataset with fixes applied to the data.
    """
    # Guard clause: nothing to fix, so open the files directly.
    if not apply_fixes or is_kerchunk_file(ds_id):
        return open_xr_dataset(file_paths)

    fix = fixer.Fixer(ds_id)

    # Log which pre-processors are about to be applied while loading.
    if fix.pre_processor:
        for pre_process in fix.pre_processors:
            logger.info(f"Loading data with pre_processor: {pre_process.__name__}")
    else:
        logger.info("Loading data")

    dataset = open_xr_dataset(file_paths, preprocess=fix.pre_processor)

    # Each post-processor is a (function, keyword-operands) pair, applied in
    # order; every call receives the dataset returned by the previous one.
    for func, operands in fix.post_processors or ():
        logger.info(f"Running post-processing function: {func.__name__}")
        dataset = func(ds_id, dataset, **operands)

    return dataset