# Source code for foxes.engines.numpy

from xarray import Dataset

from foxes.core import Engine
import foxes.constants as FC

from .pool import _run_shared


class NumpyEngine(Engine):
    """
    The numpy engine for foxes calculations.

    Submitted functions are called directly at submission time, and
    the chunks of a calculation are processed one after another in a
    plain loop.

    :group: engines

    """

    def submit(self, f, *args, **kwargs):
        """
        Submits a job to worker, obtaining a future

        The function is called immediately; its return value
        plays the role of the future.

        Parameters
        ----------
        f: Callable
            The function f(*args, **kwargs) to be
            submitted
        args: tuple, optional
            Arguments for the function
        kwargs: dict, optional
            Arguments for the function

        Returns
        -------
        future: object
            The future object

        """
        return f(*args, **kwargs)

    def await_result(self, future):
        """
        Waits for result from a future

        The given object is returned unchanged.

        Parameters
        ----------
        future: object
            The future

        Returns
        -------
        result: object
            The calculation result

        """
        return future

    def map(
        self,
        func,
        inputs,
        *args,
        **kwargs,
    ):
        """
        Runs a function on a list of files

        Parameters
        ----------
        func: Callable
            Function to be called on each file,
            func(input, *args, **kwargs) -> data
        inputs: array-like
            The input data list
        args: tuple, optional
            Arguments for func
        kwargs: dict, optional
            Keyword arguments for func

        Returns
        -------
        results: list
            The list of results

        """
        # the loop variable deliberately avoids shadowing the builtin `input`
        return [func(item, *args, **kwargs) for item in inputs]

    def future_is_done(self, future):
        """
        Checks if a future is done

        This implementation always reports True.

        Parameters
        ----------
        future: object
            The future

        Returns
        -------
        is_done: bool
            True if the future is done

        """
        return True

    def get_start_calc_message(
        self,
        n_chunks_states,
        n_chunks_targets,
    ):
        """
        Helper function for start calculation message

        Parameters
        ----------
        n_chunks_states: int
            The number of states chunks
        n_chunks_targets: int
            The number of targets chunks; only mentioned
            in the message if larger than one

        Returns
        -------
        msg: str
            The message text

        """
        msg = f"{self.name}: Starting calculation using a loop over"
        msg += f" {n_chunks_states} states chunks"
        if n_chunks_targets > 1:
            msg += f" and {n_chunks_targets} targets chunks"
        msg += "."
        return msg

    def run_calculation(
        self,
        algo,
        model,
        model_data,
        farm_data=None,
        point_data=None,
        out_vars=None,
        chunk_store=None,
        sel=None,
        isel=None,
        iterative=False,
        write_nc=None,
        write_chunk_ani=None,
        **calc_pars,
    ):
        """
        Runs the model calculation

        Parameters
        ----------
        algo: foxes.core.Algorithm
            The algorithm object
        model: foxes.core.DataCalcModel
            The model whose calculate function
            should be run
        model_data: xarray.Dataset
            The initial model data
        farm_data: xarray.Dataset, optional
            The initial farm data
        point_data: xarray.Dataset, optional
            The initial point data
        out_vars: list of str, optional
            Names of the output variables. A new empty
            list is used if None
        chunk_store: foxes.utils.Dict, optional
            The chunk store. A new empty dict is used
            if None
        sel: dict, optional
            Selection of coordinate subsets
        isel: dict, optional
            Selection of coordinate subsets index values
        iterative: bool
            Flag for use within the iterative algorithm
        write_nc: dict, optional
            Parameters for writing results to netCDF files, e.g.
            {'out_dir': 'results', 'base_name': 'calc_results',
            'ret_data': False, 'split': 1000}.

            The split parameter controls how the output is split:
            - 'chunks': one file per chunk (fastest method),
            - 'input': split according to sizes of multiple states input files,
            - int: split with this many states per file,
            - None: create a single output file.

            Use ret_data = False together with non-single file writing
            to avoid constructing the full Dataset in memory.
        write_chunk_ani: dict, optional
            Parameters for writing chunk animations, e.g.
            {'fpath_base': 'results/chunk_animation', 'vars': ['WS'],
            'resolution': 100, 'chunk': 5}.
            The chunk is either an integer that refers to a states chunk,
            or a tuple (states_chunk_index, points_chunk_index), or a list
            of such entries.
        calc_pars: dict, optional
            Additional parameters for the model.calculate()

        Returns
        -------
        results: xarray.Dataset
            The model results

        """
        # Fresh containers per call: mutable default arguments would be
        # shared between calls, and the chunk store is handed on to algo.
        if out_vars is None:
            out_vars = []
        if chunk_store is None:
            chunk_store = {}

        # subset selection:
        model_data, farm_data, point_data = self.select_subsets(
            model_data, farm_data, point_data, sel=sel, isel=isel
        )

        # basic checks:
        super().run_calculation(algo, model, model_data, farm_data, point_data)

        # prepare:
        algo.reset_chunk_store(chunk_store)
        n_states = model_data.sizes[FC.STATE]
        out_dims = model.output_coords()
        coords = {}
        if FC.STATE in out_dims and FC.STATE in model_data.coords:
            coords[FC.STATE] = model_data[FC.STATE].to_numpy()
        if farm_data is None:
            farm_data = Dataset()
        goal_data = farm_data if point_data is None else point_data

        # DEBUG object mem sizes:
        # from foxes.utils import print_mem
        # for m in [algo] + model.models:
        #    print_mem(m, pre_str="MULTIP CHECKING LARGE DATA", min_csize=9999)

        # calculate chunk sizes:
        n_targets = point_data.sizes[FC.TARGET] if point_data is not None else 0
        chunk_sizes_states, chunk_sizes_targets = self.calc_chunk_sizes(
            n_states, n_targets
        )
        n_chunks_states = len(chunk_sizes_states)
        n_chunks_targets = len(chunk_sizes_targets)
        self.print(
            f"{type(self).__name__}: Selecting n_chunks_states = {n_chunks_states}, n_chunks_targets = {n_chunks_targets}",
            level=2,
        )

        # start calculation:
        with self.new_chunk_results_manager(
            algo,
            goal_data=goal_data,
            n_chunks_states=n_chunks_states,
            n_chunks_targets=n_chunks_targets,
            out_vars=out_vars,
            out_dims=out_dims,
            coords=coords,
            iterative=iterative,
            write_nc=write_nc,
        ) as results_mgr:
            results = {}

            # i0/i1 are the running [start, end) index ranges of the
            # current chunk along the states and targets dimensions
            i0_states = 0
            for chunki_states, size_states in enumerate(chunk_sizes_states):
                i1_states = i0_states + size_states

                i0_targets = 0
                for chunki_points, size_targets in enumerate(chunk_sizes_targets):
                    key = (chunki_states, chunki_points)
                    i1_targets = i0_targets + size_targets

                    # get this chunk's data:
                    data = self.get_chunk_input_data(
                        algo=algo,
                        model_data=model_data,
                        farm_data=farm_data,
                        point_data=point_data,
                        states_i0_i1=(i0_states, i1_states),
                        targets_i0_i1=(i0_targets, i1_targets),
                        out_vars=out_vars,
                        chunki_states=chunki_states,
                        chunki_points=chunki_points,
                        n_chunks_states=n_chunks_states,
                        n_chunks_points=n_chunks_targets,
                    )

                    # run model calculation for this chunk, right here:
                    results[key] = _run_shared(
                        algo,
                        model,
                        *data,
                        chunk_key=key,
                        out_dims=out_dims,
                        write_nc=write_nc,
                        write_chunk_ani=write_chunk_ani,
                        **calc_pars,
                    )
                    del data

                    # progress update:
                    results_mgr.update(results)

                    i0_targets = i1_targets

                i0_states = i1_states

            # drop local references to the large inputs and chunk results
            del calc_pars, farm_data, point_data, results

        # NOTE(review): the results are read after the manager context
        # has been closed; confirm against the results manager contract.
        return results_mgr.results