Source code for daops.ops.base

"""Base class for all Operations."""

from clisops.parameter import collection_parameter

from daops.processor import process
from daops.utils import consolidate, normalise


[docs] class Operation: """Base class for all Operations.""" def __init__( self, collection, file_namer="standard", split_method="time:auto", output_dir=None, output_type="netcdf", apply_fixes=True, **params, ): """Construct operation. Sets common input parameters as attributes. Parameters that are specific to each operation are handled in: self._resolve_params(). """ self._file_namer = file_namer self._split_method = split_method self._output_dir = output_dir self._output_type = output_type self._apply_fixes = apply_fixes self._resolve_params(collection, **params) self._consolidate_collection()
[docs] def _resolve_params(self, collection, **params): """Resolve the operation-specific input parameters to `self.params` and parameterise collection parameter and set to `self.collection`.""" self.collection = collection_parameter.CollectionParameter(collection) self.params = params
[docs] def _consolidate_collection(self): """Take in the collection object and finds the file paths relating to each input dataset. If a time range has been supplied then only the files relating to this time range are recorded. Set the result to `self.collection`. """ if "time" in self.params: self.collection = consolidate.consolidate( self.collection, time=self.params.get("time") ) else: self.collection = consolidate.consolidate(self.collection)
[docs] def get_operation_callable(self): """Return the operation callable from clisops.""" raise NotImplementedError
[docs] def calculate(self): """Process the input and calculate the result using clisops. It then returns the result as a daops.normalise.ResultSet object. """ config = { "output_type": self._output_type, "output_dir": self._output_dir, "split_method": self._split_method, "file_namer": self._file_namer, } self.params.update(config) # Normalise (i.e. "fix") data inputs based on "character" norm_collection = normalise.normalise(self.collection, self._apply_fixes) rs = normalise.ResultSet(vars()) # change name of data ref here for dset, collection in norm_collection.items(): # Process each input dataset (either in series or # parallel) rs.add( dset, process(self.get_operation_callable(), collection, **self.params), ) return rs