import numpy as np
import xarray as xr
from abc import abstractmethod
from .model import Model
from foxes.data import StaticData
from foxes.utils import Dict, new_instance
from foxes.config import config
import foxes.constants as FC
from .engine import Engine
class Algorithm(Model):
"""
    Abstract base class for algorithms.

    Algorithms collect the objects required for running
    calculations and provide the calculation functions
    that are meant to be called from top-level code.

Attributes
----------
verbosity: int
The verbosity level, 0 means silent
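
    Examples
    --------
    ``Algorithm`` is abstract; calculations run through a derived class
    such as ``foxes.algorithms.Downwind``. A minimal sketch, following
    the foxes examples (the chosen model names are illustrative):

    >>> import foxes
    >>> farm = foxes.WindFarm()
    >>> foxes.input.farm_layout.add_row(
    ...     farm,
    ...     xy_base=[0.0, 0.0],
    ...     xy_step=[600.0, 0.0],
    ...     n_turbines=3,
    ...     turbine_models=["NREL5MW"],
    ... )
    >>> states = foxes.input.states.SingleStateStates(
    ...     ws=9.0, wd=270.0, ti=0.05, rho=1.225
    ... )
    >>> algo = foxes.algorithms.Downwind(
    ...     farm, states, wake_models=["Jensen_linear_k007"]
    ... )
    >>> farm_results = algo.calc_farm()
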
:group: core
"""
def __init__(
self,
mbook,
farm,
verbosity=1,
dbook=None,
**engine_pars,
):
"""
Constructor.
Parameters
----------
mbook: foxes.models.ModelBook
The model book
farm: foxes.WindFarm
The wind farm
verbosity: int
The verbosity level, 0 means silent
dbook: foxes.DataBook, optional
The data book, or None for default
engine_pars: dict, optional
Parameters for the engine constructor
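
        Examples
        --------
        A sketch of selecting an engine via keyword arguments, assuming
        a derived algorithm class; "process" and ``n_procs`` follow the
        foxes engine conventions:

        >>> algo = foxes.algorithms.Downwind(
        ...     farm,
        ...     states,
        ...     wake_models=["Jensen_linear_k007"],
        ...     engine="process",
        ...     n_procs=4,
        ... )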
"""
super().__init__()
self.name = type(self).__name__
self.verbosity = verbosity
self.n_states = None
self.n_turbines = farm.n_turbines
self.__farm = farm
self.__mbook = mbook
self.__dbook = StaticData() if dbook is None else dbook
self.__idata_mem = Dict(name="idata_mem")
self.__chunk_store = Dict(name="chunk_store")
if len(engine_pars):
if "engine_type" in engine_pars:
if "engine" in engine_pars:
raise KeyError(
f"{self.name}: Expecting either 'engine' or 'engine_type', not both"
)
elif "engine" in engine_pars:
engine_pars["engine_type"] = engine_pars.pop("engine")
if "engine_type" in engine_pars:
                try:
                    e = Engine.new(verbosity=verbosity, **engine_pars)
                except TypeError as err:
                    print(f"\nError while interpreting engine_pars {engine_pars}\n")
                    raise err
self.print(f"Algorithm '{self.name}': Selecting engine '{e}'")
e.initialize()
else:
raise KeyError(
f"{self.name}: Found unsupported parameters {list(engine_pars.keys())}"
)
@property
def farm(self):
"""
The wind farm
Returns
-------
        farm: foxes.core.WindFarm
The wind farm
"""
return self.__farm
@property
def mbook(self):
"""
The model book
Returns
-------
        mb: foxes.models.ModelBook
The model book
"""
if self.running:
raise ValueError(
f"Algorithm '{self.name}': Cannot access mbook while running"
)
return self.__mbook
@property
def dbook(self):
"""
The data book
Returns
-------
        db: foxes.data.StaticData
The data book
"""
if self.running:
raise ValueError(
f"Algorithm '{self.name}': Cannot access dbook while running"
)
return self.__dbook
@property
def idata_mem(self):
"""
The current idata memory
Returns
-------
dict :
Keys: model name, value: idata dict
"""
if self.running:
raise ValueError(
f"Algorithm '{self.name}': Cannot access idata_mem while running"
)
return self.__idata_mem
@property
def chunk_store(self):
"""
The current chunk store
Returns
-------
        dict :
            Keys: chunk id tuple (i0, t0), values: chunk data dicts
"""
return self.__chunk_store
def print(self, *args, vlim=1, **kwargs):
"""
Print function, based on verbosity.
Parameters
----------
        args: tuple, optional
            Arguments for the print function
        vlim: int
            The verbosity limit
        kwargs: dict, optional
            Keyword arguments for the print function
"""
if self.verbosity >= vlim:
print(*args, **kwargs)
def print_deco(self, func_name=None, n_points=None):
"""
        Helper function for printing a header with algorithm information
Parameters
----------
func_name: str, optional
Name of the calling function
n_points: int, optional
The number of points
"""
if self.verbosity > 0:
deco = "-" * 60
print(f"\n{deco}")
print(f" Algorithm: {type(self).__name__}")
if func_name is not None:
print(f" Running {self.name}: {func_name}")
print(deco)
print(f" n_states : {self.n_states}")
print(f" n_turbines: {self.n_turbines}")
def initialize(self):
"""
Initializes the algorithm.
"""
if self.running:
raise ValueError(
f"Algorithm '{self.name}': Cannot initialize while running"
)
super().initialize(self, self.verbosity - 1)
def store_model_data(self, model, idata, force=False):
"""
Store model data
Parameters
----------
model: foxes.core.Model
The model
idata: dict
The dict has exactly two entries: `data_vars`,
a dict with entries `name_str -> (dim_tuple, data_ndarray)`;
and `coords`, a dict with entries `dim_name_str -> dim_array`
force: bool
Overwrite existing data
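
        Examples
        --------
        A minimal sketch of the expected ``idata`` layout; the model,
        variable and dimension names are illustrative:

        >>> import numpy as np
        >>> idata = {
        ...     "coords": {"turbine": np.arange(3)},
        ...     "data_vars": {
        ...         "some_var": (("state", "turbine"), np.zeros((10, 3))),
        ...     },
        ... }
        >>> algo.store_model_data(some_model, idata)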
"""
mname = f"{type(model).__name__}_{model.name}"
if force:
self.__idata_mem[mname] = idata
elif mname in self.idata_mem:
raise KeyError(f"Attempt to overwrite stored data for model '{mname}'")
else:
self.idata_mem[mname] = idata
def get_model_data(self, model):
"""
Gets model data from memory
        Parameters
        ----------
        model: foxes.core.Model
            The model

        Returns
        -------
        idata: dict
            The dict has exactly two entries: `data_vars`,
            a dict with entries `name_str -> (dim_tuple, data_ndarray)`;
            and `coords`, a dict with entries `dim_name_str -> dim_array`

        """
mname = f"{type(model).__name__}_{model.name}"
try:
return self.idata_mem[mname]
except KeyError:
raise KeyError(
f"Key '{mname}' not found in idata_mem, available keys: {sorted(list(self.idata_mem.keys()))}"
)
def del_model_data(self, model):
"""
Remove stored model data
Parameters
----------
model: foxes.core.Model
The model
"""
mname = f"{type(model).__name__}_{model.name}"
try:
del self.idata_mem[mname]
except KeyError:
raise KeyError(f"Attempt to delete data of model '{mname}', but not stored")
def update_n_turbines(self):
"""
Reset the number of turbines,
according to self.farm
"""
if self.n_turbines != self.farm.n_turbines:
self.n_turbines = self.farm.n_turbines
# resize stored idata, if dependent on turbine coord:
newk = {}
for mname, idata in self.idata_mem.items():
if mname[:2] == "__":
continue
for dname, d in idata["data_vars"].items():
k = f"__{mname}_{dname}_turbinv"
if k in self.idata_mem:
ok = self.idata_mem[k]
else:
ok = None
                        if FC.TURBINE in d[0]:
                            i = d[0].index(FC.TURBINE)
                            # turbine-invariant, if all slices along the turbine axis coincide:
                            ok = np.unique(d[1], axis=i).shape[i] == 1
                        newk[k] = ok
if ok is not None:
if not ok:
raise ValueError(
f"{self.name}: Stored idata entry '{mname}:{dname}' is turbine dependent, unable to reset n_turbines"
)
if FC.TURBINE in idata["coords"]:
idata["coords"][FC.TURBINE] = np.arange(self.n_turbines)
i = d[0].index(FC.TURBINE)
n0 = d[1].shape[i]
if n0 > self.n_turbines:
idata["data_vars"][dname] = (
d[0],
np.take(d[1], range(self.n_turbines), axis=i),
)
elif n0 < self.n_turbines:
shp = [
d[1].shape[j] if j != i else self.n_turbines - n0
for j in range(len(d[1].shape))
]
a = np.zeros(shp, dtype=d[1].dtype)
shp = [
d[1].shape[j] if j != i else 1
for j in range(len(d[1].shape))
]
a[:] = np.take(d[1], -1, axis=i).reshape(shp)
idata["data_vars"][dname] = (
d[0],
np.append(d[1], a, axis=i),
)
self.idata_mem.update(newk)
def get_models_idata(self):
"""
Returns idata object of models
Returns
-------
        idata: dict
            The dict has exactly two entries: `data_vars`,
            a dict with entries `name_str -> (dim_tuple, data_ndarray)`;
            and `coords`, a dict with entries `dim_name_str -> dim_array`
"""
if not self.initialized:
raise ValueError(
f"Algorithm '{self.name}': get_models_idata called before initialization"
)
idata = {"coords": {}, "data_vars": {}}
for k, hidata in self.idata_mem.items():
if len(k) < 3 or k[:2] != "__":
idata["coords"].update(hidata["coords"])
idata["data_vars"].update(hidata["data_vars"])
return idata
def get_models_data(self, idata=None, sel=None, isel=None):
"""
Creates xarray from model input data.
Parameters
----------
idata: dict, optional
The dict has exactly two entries: `data_vars`,
a dict with entries `name_str -> (dim_tuple, data_ndarray)`;
and `coords`, a dict with entries `dim_name_str -> dim_array`.
Take algorithm's idata object by default.
        sel: dict, optional
            Label-based selection of coordinates in the dataset
        isel: dict, optional
            Index-based selection of coordinates in the dataset
Returns
-------
ds: xarray.Dataset
The model input data
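
        Examples
        --------
        A sketch, selecting a subset of states from an initialized
        algorithm (assuming a `state` coordinate in the data):

        >>> ds = algo.get_models_data(isel={"state": range(10)})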
"""
if idata is None:
idata = self.get_models_idata()
ds = xr.Dataset(**idata)
if isel is not None:
ds = ds.isel(isel)
if sel is not None:
ds = ds.sel(sel)
return ds
def new_point_data(self, points, states_indices=None, n_states=None):
"""
Creates a point data xarray object, containing only points.
Parameters
----------
points: numpy.ndarray
The points, shape: (n_states, n_points, 3)
states_indices: array_like, optional
The indices of the states dimension
n_states: int, optional
The number of states
Returns
-------
xarray.Dataset
A dataset containing the points data
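
        Examples
        --------
        A sketch, evaluating the same three points for all states;
        points of shape (n_points, 3) are broadcast over states:

        >>> import numpy as np
        >>> points = np.array(
        ...     [[0.0, 0.0, 100.0], [500.0, 0.0, 100.0], [1000.0, 0.0, 100.0]]
        ... )
        >>> pdata = algo.new_point_data(points)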
"""
if n_states is None:
n_states = self.n_states
if states_indices is None:
idata = {"coords": {}, "data_vars": {}}
else:
idata = {"coords": {FC.STATE: states_indices}, "data_vars": {}}
if len(points.shape) == 2 and points.shape[1] == 3:
pts = np.zeros((n_states,) + points.shape, dtype=config.dtype_double)
pts[:] = points[None]
points = pts
del pts
if (
len(points.shape) != 3
or points.shape[0] != n_states
or points.shape[2] != 3
):
raise ValueError(
f"points have wrong dimensions, expecting ({n_states}, {points.shape[1]}, 3), got {points.shape}"
)
idata["data_vars"][FC.TARGETS] = (
(FC.STATE, FC.TARGET, FC.TPOINT, FC.XYH),
points[:, :, None, :],
)
idata["data_vars"][FC.TWEIGHTS] = (
(FC.TPOINT,),
np.array([1.0], dtype=config.dtype_double),
)
return xr.Dataset(**idata)
def find_chunk_in_store(
self,
mdata,
tdata=None,
prev_s=0,
prev_t=0,
error=True,
):
"""
Finds indices in chunk store
Parameters
----------
mdata: foxes.core.MData
The mdata object
tdata: foxes.core.TData, optional
The tdata object
prev_s: int
How many states chunks backward
prev_t: int
How many points chunks backward
error: bool
Flag for raising KeyError if data not found
Returns
-------
inds: tuple
The (i0, n_states, t0, n_targets) data of the
            returned chunk
"""
i0 = int(mdata.states_i0(counter=True))
t0 = int(tdata.targets_i0() if tdata is not None else 0)
n_states = int(mdata.n_states)
n_targets = int(tdata.n_targets if tdata is not None else 0)
if prev_s > 0 or prev_t > 0:
inds = np.array(
[
[
d["i0"],
d["i0"] + d["n_states"],
d["n_states"],
d["t0"],
d["t0"] + d["n_targets"],
d["n_targets"],
]
for d in self.chunk_store.values()
],
dtype=int,
)
if prev_t > 0:
while prev_t > 0:
sel = np.where((inds[:, 0] == i0) & (inds[:, 4] == t0))[0]
if len(sel) == 0:
if error:
raise KeyError(
f"{self.name}: Previous key {(i0, t0)}, prev={(prev_s, prev_t)}, not found in chunk store, got inds {inds}"
)
else:
return None
else:
n_targets = inds[sel[0], 5]
t0 -= n_targets
prev_t -= 1
if prev_s > 0:
while prev_s > 0:
sel = np.where((inds[:, 1] == i0) & (inds[:, 3] == t0))[0]
if len(sel) == 0:
if error:
raise KeyError(
f"{self.name}: Previous key {(i0, t0)}, prev={(prev_s, prev_t)}, not found in chunk store, got inds {inds}"
)
else:
return None
else:
n_states = inds[sel[0], 2]
i0 -= n_states
prev_s -= 1
return i0, n_states, t0, n_targets
def add_to_chunk_store(
self,
name,
data,
mdata,
tdata=None,
copy=True,
):
"""
Add data to the chunk store
Parameters
----------
name: str
The data name
data: numpy.ndarray
The data
mdata: foxes.core.MData
The mdata object
tdata: foxes.core.TData, optional
The tdata object
copy: bool
Flag for copying incoming data
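
        Examples
        --------
        A sketch of a store/retrieve round trip from within a model
        calculation; the name "my_data" is illustrative:

        >>> algo.add_to_chunk_store("my_data", data, mdata, tdata)
        >>> data = algo.get_from_chunk_store("my_data", mdata, tdata)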
"""
i0 = int(mdata.states_i0(counter=True))
t0 = int(tdata.targets_i0() if tdata is not None else 0)
key = (i0, t0)
if key not in self.chunk_store:
n_states = int(mdata.n_states)
n_targets = int(tdata.n_targets if tdata is not None else 0)
self.chunk_store[key] = Dict(
{
"i0": i0,
"t0": t0,
"n_states": n_states,
"n_targets": n_targets,
},
name=f"chunk_store_{i0}_{t0}",
)
self.chunk_store[key][name] = data.copy() if copy else data
def get_from_chunk_store(
self,
name,
mdata,
tdata=None,
prev_s=0,
prev_t=0,
ret_inds=False,
error=True,
):
"""
        Gets data from the chunk store
Parameters
----------
name: str
The data name
mdata: foxes.core.MData
The mdata object
tdata: foxes.core.TData, optional
The tdata object
prev_s: int
How many states chunks backward
prev_t: int
How many points chunks backward
ret_inds: bool
Also return (i0, n_states, t0, n_targets)
of the returned chunk
error: bool
Flag for raising KeyError if data not found
Returns
-------
data: numpy.ndarray
The data
inds: tuple, optional
The (i0, n_states, t0, n_targets) data of the
            returned chunk
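
        Examples
        --------
        A sketch, reading data that was stored for the previous states
        chunk; the name "my_data" is illustrative:

        >>> data, inds = algo.get_from_chunk_store(
        ...     "my_data", mdata, tdata, prev_s=1, ret_inds=True
        ... )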
"""
inds = self.find_chunk_in_store(mdata, tdata, prev_s, prev_t, error)
if inds is None:
return (None, (None, None, None, None)) if ret_inds else None
else:
i0, __, t0, __ = inds
try:
data = self.chunk_store[(i0, t0)][name]
except KeyError as e:
if error:
raise e
else:
data = None
if ret_inds:
return data, inds
else:
return data
def reset_chunk_store(self, new_chunk_store=None):
"""
Resets the chunk store
Parameters
----------
new_chunk_store: foxes.utils.Dict, optional
The new chunk store
Returns
-------
chunk_store: foxes.utils.Dict
The chunk store before resetting
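
        Examples
        --------
        A sketch of stashing and restoring the chunk store between runs:

        >>> old_store = algo.reset_chunk_store()
        >>> algo.reset_chunk_store(old_store)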
"""
chunk_store = self.chunk_store
if new_chunk_store is None:
self.__chunk_store = Dict(name="chunk_store")
elif isinstance(new_chunk_store, Dict):
self.__chunk_store = new_chunk_store
else:
self.__chunk_store = Dict(name="chunk_store")
self.__chunk_store.update(new_chunk_store)
return chunk_store
def block_convergence(self, **kwargs):
"""
Switch on convergence block during iterative run
Parameters
----------
kwargs: dict, optional
Parameters for add_to_chunk_store()
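
        Examples
        --------
        A sketch, called from within a model during an iterative run;
        ``mdata`` and ``tdata`` are forwarded to add_to_chunk_store():

        >>> algo.block_convergence(mdata=mdata, tdata=tdata)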
"""
self.add_to_chunk_store(
name=FC.BLOCK_CONVERGENCE, data=True, copy=False, **kwargs
)
def eval_conv_block(self):
"""
Evaluate convergence block, removing blocks on the fly
Returns
-------
blocked: bool
True if convergence is currently blocked
"""
blocked = False
for c in self.__chunk_store.values():
blocked = c.pop(FC.BLOCK_CONVERGENCE, False) or blocked
return blocked
def set_running(
self,
algo,
data_stash,
sel=None,
isel=None,
verbosity=0,
):
"""
Sets this model status to running, and moves
all large data to stash.
The stashed data will be returned by the
unset_running() function after running calculations.
Parameters
----------
algo: foxes.core.Algorithm
The calculation algorithm
data_stash: dict
Large data stash, this function adds data here.
Key: model name. Value: dict, large model data
sel: dict, optional
The subset selection dictionary
isel: dict, optional
The index subset selection dictionary
verbosity: int
The verbosity level, 0 = silent
"""
assert algo is self
super().set_running(algo, data_stash, sel, isel, verbosity)
data_stash[self.name].update(
dict(
mbook=self.__mbook,
dbook=self.__dbook,
idata_mem=self.__idata_mem,
)
)
del self.__mbook, self.__dbook
self.__idata_mem = {}
def unset_running(
self,
algo,
data_stash,
sel=None,
isel=None,
verbosity=0,
):
"""
Sets this model status to not running, recovering large data
from stash
Parameters
----------
algo: foxes.core.Algorithm
The calculation algorithm
data_stash: dict
Large data stash, this function adds data here.
Key: model name. Value: dict, large model data
sel: dict, optional
The subset selection dictionary
isel: dict, optional
The index subset selection dictionary
verbosity: int
The verbosity level, 0 = silent
"""
assert algo is self
super().unset_running(algo, data_stash, sel, isel, verbosity)
data = data_stash[self.name]
self.__mbook = data.pop("mbook")
self.__dbook = data.pop("dbook")
self.__idata_mem = data.pop("idata_mem")
@abstractmethod
def _launch_parallel_farm_calc(
self,
*args,
mbook,
dbook,
chunk_store,
**kwargs,
):
"""
Runs the main farm calculation, launching parallelization
Parameters
----------
args: tuple, optional
Additional parameters for running
mbook: foxes.models.ModelBook
The model book
dbook: foxes.DataBook
The data book, or None for default
chunk_store: foxes.utils.Dict
The chunk store
kwargs: dict, optional
Additional parameters for running
Returns
-------
farm_results: xarray.Dataset
The farm results. The calculated variables have
dimensions (state, turbine)
"""
pass
def calc_farm(self, *args, **kwargs):
"""
Calculate farm data.
Parameters
----------
args: tuple, optional
Parameters
kwargs: dict, optional
Keyword parameters
Returns
-------
farm_results: xarray.Dataset
The farm results. The calculated variables have
dimensions (state, turbine)
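
        Examples
        --------
        A sketch, assuming a derived algorithm class such as
        ``foxes.algorithms.Downwind`` and the standard power variable:

        >>> farm_results = algo.calc_farm()
        >>> P = farm_results["P"]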
"""
if self.running:
raise ValueError(
f"Algorithm '{self.name}': Cannot call calc_farm while running"
)
# set to running:
data_stash = {}
chunk_store = self.reset_chunk_store()
mdls = [
m
for m in [self] + list(args) + list(kwargs.values())
if isinstance(m, Model)
]
for m in mdls:
m.set_running(
self, data_stash, sel=None, isel=None, verbosity=self.verbosity - 2
)
# run parallel calculation:
farm_results = self._launch_parallel_farm_calc(
*args,
chunk_store=chunk_store,
sel=None,
isel=None,
**kwargs,
)
# reset to not running:
for m in mdls:
m.unset_running(
self, data_stash, sel=None, isel=None, verbosity=self.verbosity - 2
)
return farm_results
@abstractmethod
def _launch_parallel_points_calc(
self,
*args,
chunk_store,
**kwargs,
):
"""
Runs the main points calculation, launching parallelization
Parameters
----------
args: tuple, optional
Additional parameters for running
chunk_store: foxes.utils.Dict
The chunk store
kwargs: dict, optional
Additional parameters for running
Returns
-------
point_results: xarray.Dataset
The point results. The calculated variables have
dimensions (state, point)
"""
pass
def calc_points(self, *args, sel=None, isel=None, **kwargs):
"""
Calculate points data.
Parameters
----------
args: tuple, optional
Parameters
sel: dict, optional
The subset selection dictionary
isel: dict, optional
The index subset selection dictionary
kwargs: dict, optional
Keyword parameters
Returns
-------
point_results: xarray.Dataset
The point results. The calculated variables have
dimensions (state, point)
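
        Examples
        --------
        A sketch, evaluating flow results at given points after a farm
        calculation; ``points`` has shape (n_states, n_points, 3):

        >>> point_results = algo.calc_points(farm_results, points)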
"""
if self.running:
raise ValueError(
f"Algorithm '{self.name}': Cannot call calc_points while running"
)
# set to running:
data_stash = {}
self.set_running(
self, data_stash, sel=sel, isel=isel, verbosity=self.verbosity - 2
)
# run parallel calculation:
chunk_store = self.reset_chunk_store()
point_results = self._launch_parallel_points_calc(
*args,
chunk_store=chunk_store,
sel=sel,
isel=isel,
**kwargs,
)
# reset to not running:
self.unset_running(
self, data_stash, sel=sel, isel=isel, verbosity=self.verbosity - 2
)
return point_results
def finalize(self, clear_mem=False):
"""
Finalizes the algorithm.
Parameters
----------
clear_mem: bool
Clear idata memory
"""
if self.running:
raise ValueError(f"Algorithm '{self.name}': Cannot finalize while running")
super().finalize(self, self.verbosity - 1)
if clear_mem:
self.__idata_mem = Dict()
# self.reset_chunk_store()
@classmethod
def new(cls, algo_type, *args, **kwargs):
"""
Run-time algorithm factory.
Parameters
----------
algo_type: str
The selected derived class name
args: tuple, optional
Additional parameters for the constructor
        kwargs: dict, optional
            Additional parameters for the constructor

        Returns
        -------
        algo: Algorithm
            The new algorithm instance
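
        Examples
        --------
        A sketch of the run-time factory, assuming the derived class
        ``foxes.algorithms.Downwind`` and its constructor arguments:

        >>> algo = Algorithm.new(
        ...     "Downwind", farm, states, wake_models=["Jensen_linear_k007"]
        ... )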
"""
return new_instance(cls, algo_type, *args, **kwargs)