import os
import numpy as np
from abc import ABC, abstractmethod
from tqdm import tqdm
from xarray import Dataset
from foxes.core import MData, FData, TData
from foxes.utils import new_instance
from foxes.config import config
import foxes.constants as FC
__global_engine_data__ = dict(
engine=None,
)
class Engine(ABC):
"""
    Abstract base class for foxes calculation engines
Attributes
----------
chunk_size_states: int
The size of a states chunk
chunk_size_points: int
The size of a points chunk
n_procs: int, optional
The number of processes to be used,
or None for automatic
verbosity: int
The verbosity level, 0 = silent
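
    Examples
    --------
    A minimal usage sketch; the engine is set globally while the
    context is active (assuming the ProcessEngine subclass is
    available):

    >>> with Engine.new("process", chunk_size_states=500) as engine:
    ...     assert has_engine()
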
:group: core
"""
def __init__(
self,
chunk_size_states=None,
chunk_size_points=None,
n_procs=None,
verbosity=1,
):
"""
Constructor.
Parameters
----------
chunk_size_states: int, optional
The size of a states chunk
chunk_size_points: int, optional
The size of a points chunk
n_procs: int, optional
The number of processes to be used,
or None for automatic
verbosity: int
The verbosity level, 0 = silent
"""
self.chunk_size_states = chunk_size_states
self.chunk_size_points = chunk_size_points
        try:
            # os.process_cpu_count is available from Python 3.13 on;
            # fall back to os.cpu_count for older versions
            self.n_procs = n_procs if n_procs is not None else os.process_cpu_count()
        except AttributeError:
            self.n_procs = os.cpu_count()
self.verbosity = verbosity
self.__initialized = False
self.__entered = False
def __repr__(self):
s = f"n_procs={self.n_procs}, chunk_size_states={self.chunk_size_states}, chunk_size_points={self.chunk_size_points}"
return f"{type(self).__name__}({s})"
def __enter__(self):
if self.__entered:
raise ValueError(f"Enter called for already entered engine")
self.__entered = True
if not self.initialized:
self.initialize()
return self
def __exit__(self, *exit_args):
if not self.__entered:
raise ValueError(f"Exit called for not entered engine")
self.__entered = False
if self.initialized:
self.finalize(*exit_args)
def __del__(self):
if self.initialized:
self.finalize()
@property
def entered(self):
"""
        Flag that this engine has been entered.
Returns
-------
flag: bool
            True if the engine has been entered.
"""
return self.__entered
@property
def initialized(self):
"""
Initialization flag.
Returns
-------
flag: bool
            True if the engine has been initialized.
"""
return self.__initialized
def initialize(self):
"""
Initializes the engine.
"""
if not self.entered:
self.__enter__()
elif not self.initialized:
if get_engine(error=False, default=False) is not None:
raise ValueError(
f"Cannot initialize engine '{type(self).__name__}', since engine already set to '{type(get_engine()).__name__}'"
)
global __global_engine_data__
__global_engine_data__["engine"] = self
self.__initialized = True
def finalize(self, type=None, value=None, traceback=None):
"""
Finalizes the engine.
Parameters
----------
type: object, optional
Dummy argument for the exit function
value: object, optional
Dummy argument for the exit function
traceback: object, optional
Dummy argument for the exit function
"""
if self.entered:
self.__exit__(type, value, traceback)
elif self.initialized:
global __global_engine_data__
__global_engine_data__["engine"] = None
self.__initialized = False
def print(self, *args, level=1, **kwargs):
"""Prints based on verbosity"""
if self.verbosity >= level:
print(*args, **kwargs)
def map(
self,
func,
inputs,
*args,
**kwargs,
):
"""
        Runs a function on a list of inputs
Parameters
----------
func: Callable
            Function to be called on each input,
func(input, *args, **kwargs) -> data
inputs: array-like
The input data list
args: tuple, optional
Arguments for func
kwargs: dict, optional
Keyword arguments for func
Returns
-------
results: list
The list of results
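
        Examples
        --------
        A minimal sketch (assuming the ThreadsEngine subclass is
        available and returns results in input order):

        >>> def square(x):
        ...     return x**2
        >>> with Engine.new("threads") as engine:
        ...     results = engine.map(square, [1.0, 2.0, 3.0])
        >>> results
        [1.0, 4.0, 9.0]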
"""
raise NotImplementedError
@property
def loop_dims(self):
"""
Gets the loop dimensions (possibly chunked)
Returns
-------
dims: list of str
The loop dimensions (possibly chunked)
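
        Examples
        --------
        A small sketch; only the states dimension is chunked here
        (assuming the ProcessEngine subclass is available):

        >>> engine = Engine.new("process", chunk_size_states=500)
        >>> engine.loop_dims == [FC.STATE]
        True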
"""
        if self.chunk_size_states is None and self.chunk_size_points is None:
return []
elif self.chunk_size_states is None:
return [FC.TARGET]
elif self.chunk_size_points is None:
return [FC.STATE]
else:
return [FC.STATE, FC.TARGET]
def select_subsets(self, *datasets, sel=None, isel=None):
"""
Takes subsets of datasets
Parameters
----------
datasets: tuple
            The xarray.Dataset or xarray.DataArray objects
sel: dict, optional
The selection dictionary
isel: dict, optional
The index selection dictionary
Returns
-------
subsets: list
The subsets of the input data
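
        Examples
        --------
        A small sketch with a toy dataset (assuming the ProcessEngine
        subclass is available):

        >>> import xarray as xr
        >>> engine = Engine.new("process")
        >>> ds = xr.Dataset(coords={FC.STATE: np.arange(5)})
        >>> (sub,) = engine.select_subsets(ds, isel={FC.STATE: [0, 1]})
        >>> sub.sizes[FC.STATE]
        2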
"""
if sel is not None:
new_datasets = []
for data in datasets:
if data is not None:
s = {c: u for c, u in sel.items() if c in data.coords}
new_datasets.append(data.sel(s) if len(s) else data)
else:
new_datasets.append(data)
datasets = new_datasets
if isel is not None:
new_datasets = []
for data in datasets:
if data is not None:
s = {c: u for c, u in isel.items() if c in data.coords}
new_datasets.append(data.isel(s) if len(s) else data)
else:
new_datasets.append(data)
datasets = new_datasets
return datasets
def calc_chunk_sizes(self, n_states, n_targets=1):
"""
Computes the sizes of states and points chunks
Parameters
----------
n_states: int
The number of states
n_targets: int
The number of point targets
Returns
-------
chunk_sizes_states: numpy.ndarray
The sizes of all states chunks, shape: (n_chunks_states,)
chunk_sizes_targets: numpy.ndarray
The sizes of all targets chunks, shape: (n_chunks_targets,)
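
        Examples
        --------
        A small sketch; the resulting chunk sizes follow the splitting
        logic above and depend on n_procs, which is passed explicitly
        here (assuming the ProcessEngine subclass is available):

        >>> engine = Engine.new("process", n_procs=4)
        >>> engine.calc_chunk_sizes(n_states=10)
        (array([2, 2, 3, 3]), [1])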
"""
# determine states chunks:
if self.chunk_size_states is None:
n_chunks_states = min(self.n_procs, n_states)
chunk_size_states = max(int(n_states / self.n_procs), 1)
else:
chunk_size_states = min(n_states, self.chunk_size_states)
n_chunks_states = max(int(n_states / chunk_size_states), 1)
if int(n_states / n_chunks_states) > chunk_size_states:
n_chunks_states += 1
chunk_size_states = int(n_states / n_chunks_states)
# determine points chunks:
chunk_sizes_targets = [n_targets]
if n_targets > 1:
if self.chunk_size_points is None:
if n_targets < max(n_states, 1000):
chunk_size_targets = n_targets
n_chunks_targets = 1
else:
n_chunks_targets = min(self.n_procs, n_targets)
chunk_size_targets = max(int(n_targets / self.n_procs), 1)
if self.chunk_size_states is None and n_chunks_states > 1:
while chunk_size_states * chunk_size_targets > n_targets:
n_chunks_states += 1
chunk_size_states = int(n_states / n_chunks_states)
else:
chunk_size_targets = min(n_targets, self.chunk_size_points)
n_chunks_targets = max(int(n_targets / chunk_size_targets), 1)
if int(n_targets / n_chunks_targets) > chunk_size_targets:
n_chunks_targets += 1
chunk_size_targets = int(n_targets / n_chunks_targets)
chunk_sizes_targets = np.full(n_chunks_targets, chunk_size_targets)
extra = n_targets - n_chunks_targets * chunk_size_targets
if extra > 0:
chunk_sizes_targets[-extra:] += 1
s = np.sum(chunk_sizes_targets)
assert (
s == n_targets
), f"Targets count mismatch: Expecting {n_targets}, chunks sum is {s}. Chunks: {[int(c) for c in chunk_sizes_targets]}"
chunk_sizes_states = np.full(n_chunks_states, chunk_size_states)
extra = n_states - n_chunks_states * chunk_size_states
if extra > 0:
chunk_sizes_states[-extra:] += 1
s = np.sum(chunk_sizes_states)
assert (
s == n_states
), f"States count mismatch: Expecting {n_states}, chunks sum is {s}. Chunks: {[int(c) for c in chunk_sizes_states]}"
return chunk_sizes_states, chunk_sizes_targets
def combine_results(
self,
algo,
results,
model_data,
out_vars,
out_coords,
n_chunks_states,
n_chunks_targets,
goal_data,
iterative,
):
"""
Combines chunk results into final Dataset
Parameters
----------
algo: foxes.core.Algorithm
The algorithm object
results: dict
The results from the chunk calculations,
key: (chunki_states, chunki_targets),
value: dict with numpy.ndarray values
model_data: xarray.Dataset
The initial model data
out_vars: list of str
Names of the output variables
out_coords: list of str
Names of the output coordinates
n_chunks_states: int
The number of states chunks
n_chunks_targets: int
The number of targets chunks
goal_data: foxes.core.Data
Either fdata or tdata
iterative: bool
Flag for use within the iterative algorithm
Returns
-------
ds: xarray.Dataset
The final results dataset
"""
self.print(f"{type(self).__name__}: Combining results", level=2)
pbar = tqdm(total=len(out_vars)) if self.verbosity > 1 else None
data_vars = {}
for v in out_vars:
if v in results[(0, 0)][0]:
data_vars[v] = [out_coords, []]
if n_chunks_targets == 1:
alls = 0
for chunki_states in range(n_chunks_states):
r, cstore = results[(chunki_states, 0)]
data_vars[v][1].append(r[v])
alls += data_vars[v][1][-1].shape[0]
if iterative:
for k, c in cstore.items():
if k in algo.chunk_store:
algo.chunk_store[k].update(c)
else:
algo.chunk_store[k] = c
else:
for chunki_states in range(n_chunks_states):
tres = []
for chunki_points in range(n_chunks_targets):
r, cstore = results[(chunki_states, chunki_points)]
tres.append(r[v])
if iterative:
for k, c in cstore.items():
if k in algo.chunk_store:
algo.chunk_store[k].update(c)
else:
algo.chunk_store[k] = c
data_vars[v][1].append(np.concatenate(tres, axis=1))
del tres
del r, cstore
data_vars[v][1] = np.concatenate(data_vars[v][1], axis=0)
else:
data_vars[v] = (goal_data[v].dims, goal_data[v].to_numpy())
if pbar is not None:
pbar.update()
del results
if pbar is not None:
pbar.close()
# if not iterative or algo.final_iteration:
# algo.reset_chunk_store()
coords = {}
if FC.STATE in out_coords and FC.STATE in model_data.coords:
coords[FC.STATE] = model_data[FC.STATE].to_numpy()
return Dataset(
coords=coords,
data_vars={v: tuple(d) for v, d in data_vars.items()},
)
@abstractmethod
def run_calculation(self, algo, model, model_data, farm_data, point_data=None):
"""
Runs the model calculation
Parameters
----------
algo: foxes.core.Algorithm
The algorithm object
model: foxes.core.DataCalcModel
            The model whose calculate function
should be run
model_data: xarray.Dataset
The initial model data
farm_data: xarray.Dataset
The initial farm data
point_data: xarray.Dataset, optional
The initial point data
Returns
-------
results: xarray.Dataset
The model results
"""
n_states = model_data.sizes[FC.STATE]
if point_data is None:
self.print(
f"{type(self).__name__}: Calculating {n_states} states for {algo.n_turbines} turbines"
)
else:
self.print(
f"{type(self).__name__}: Calculating data at {point_data.sizes[FC.TARGET]} points for {n_states} states"
)
if not self.initialized:
raise ValueError(f"Engine '{type(self).__name__}' not initialized")
if not model.initialized:
raise ValueError(f"Model '{model.name}' not initialized")
@classmethod
def new(cls, engine_type, *args, **kwargs):
"""
Run-time engine factory.
Parameters
----------
engine_type: str
The selected derived class name
args: tuple, optional
Additional parameters for constructor
kwargs: dict, optional
Additional parameters for constructor
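
        Returns
        -------
        engine: Engine
            The new engine instance

        Examples
        --------
        For example, selecting the process engine (assuming the
        ProcessEngine subclass is available):

        >>> engine = Engine.new("process", chunk_size_states=1000, verbosity=0)
        >>> type(engine).__name__
        'ProcessEngine'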
"""
if engine_type is None:
engine_type = "default"
engine_type = dict(
default="DefaultEngine",
threads="ThreadsEngine",
process="ProcessEngine",
xarray="XArrayEngine",
dask="DaskEngine",
multiprocess="MultiprocessEngine",
local_cluster="LocalClusterEngine",
slurm_cluster="SlurmClusterEngine",
mpi="MPIEngine",
ray="RayEngine",
numpy="NumpyEngine",
single="SingleChunkEngine",
).get(engine_type, engine_type)
return new_instance(cls, engine_type, *args, **kwargs)
def get_engine(error=True, default=True):
"""
Gets the global calculation engine
Parameters
----------
error: bool
Flag for raising ValueError if no
engine is found
default: bool or dict or Engine
Set default engine if not set yet
Returns
-------
engine: foxes.core.Engine
The foxes calculation engine
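
    Examples
    --------
    A small sketch of the default behavior:

    >>> engine = get_engine()  # initializes the DefaultEngine if no engine is set
    >>> has_engine()
    True
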
:group: core
"""
engine = __global_engine_data__["engine"]
if engine is None:
if isinstance(default, dict):
engine = Engine.new(**default)
print(f"Selecting default engine '{engine}'")
engine.initialize()
return engine
elif isinstance(default, Engine):
print(f"Selecting default engine '{default}'")
default.initialize()
return default
elif isinstance(default, bool) and default:
engine = Engine.new(engine_type="DefaultEngine", verbosity=1)
print(f"Selecting '{engine}'")
engine.initialize()
return engine
elif error:
raise ValueError("Engine not found.")
return engine
def has_engine():
"""
    Checks whether the global engine has been set
Returns
-------
flag: bool
True if engine has been set
:group: core
"""
return __global_engine_data__["engine"] is not None
def reset_engine():
"""
Resets the global calculation engine
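
    Examples
    --------
    A small sketch; afterwards no global engine is set:

    >>> engine = get_engine()  # sets the default engine if none is active
    >>> reset_engine()
    >>> has_engine()
    False
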
:group: core
"""
engine = get_engine(error=False, default=False)
if engine is not None:
engine.finalize(type=None, value=None, traceback=None)