import numpy as np
import xarray as xr
from .model import Model
from foxes.data import StaticData
from foxes.utils import Dict, all_subclasses
import foxes.constants as FC
[docs]
class Algorithm(Model):
"""
Abstract base class for algorithms.
Algorithms collect required objects for running
calculations, and contain the calculation functions
which are meant to be called from top level code.
Attributes
----------
mbook: foxes.models.ModelBook
The model book
farm: foxes.WindFarm
The wind farm
chunks: dict
The chunks choice for running in parallel with dask,
e.g. `{"state": 1000}` for chunks of 1000 states
verbosity: int
The verbosity level, 0 means silent
dbook: foxes.DataBook
The data book, or None for default
:group: core
"""
[docs]
def __init__(self, mbook, farm, chunks, verbosity, dbook=None):
    """
    Constructor.

    Parameters
    ----------
    mbook: foxes.models.ModelBook
        The model book
    farm: foxes.WindFarm
        The wind farm
    chunks: dict
        The chunks choice for running in parallel with dask,
        e.g. `{"state": 1000}` for chunks of 1000 states
    verbosity: int
        The verbosity level, 0 means silent
    dbook: foxes.DataBook, optional
        The data book, or None for default
    """
    super().__init__()
    # the algorithm is named after its concrete class
    self.name = type(self).__name__
    self.mbook = mbook
    self.farm = farm
    self.chunks = chunks
    self.verbosity = verbosity
    # unknown until the input states have been set up
    self.n_states = None
    self.n_turbines = farm.n_turbines
    # fall back to the package's static data if no data book is given
    self.dbook = StaticData() if dbook is None else dbook
    # default the target chunk size to the point chunk size
    # NOTE(review): this writes into the caller-supplied chunks dict
    # (self.chunks is the same object) -- confirm this is intended
    if chunks is not None and FC.TARGET not in chunks:
        self.chunks[FC.TARGET] = chunks.get(FC.POINT, None)
    # per-model storage of idata dicts, see store_model_data()
    self._idata_mem = Dict()
[docs]
def print(self, *args, vlim=1, **kwargs):
    """
    Verbosity-gated print.

    Forwards to the builtin print only if the algorithm's
    verbosity reaches the given limit.

    Parameters
    ----------
    args: tuple, optional
        Arguments for the print function
    kwargs: dict, optional
        Keyword arguments for the print function
    vlim: int
        The verbosity limit
    """
    if vlim <= self.verbosity:
        print(*args, **kwargs)
def __get_sizes(self, idata, mtype):
    """
    Private helper function.

    Validates an idata dict and collects the sizes of all
    dimensions appearing in its data variables.

    Parameters
    ----------
    idata: dict
        The dict has exactly two entries: `data_vars`,
        a dict with entries `name_str -> (dim_tuple, data_ndarray)`;
        and `coords`, a dict with entries `dim_name_str -> dim_array`
    mtype: str
        The data label for error messages, e.g. "models" or "point"

    Returns
    -------
    sizes: dict
        Keys: dimension name str, values: dimension size int

    Raises
    ------
    ValueError
        If an entry is malformed, a dimension is misplaced,
        or dimension sizes are inconsistent
    KeyError
        If the tpoint dimension is missing after the target
        dimension, or coords refer to unused dimensions
    """
    sizes = {}
    for v, t in idata["data_vars"].items():
        # each entry must be a (dims tuple, data array) pair
        if not isinstance(t, tuple) or len(t) != 2:
            raise ValueError(
                f"Input {mtype} data entry '{v}': Not a tuple of size 2, got '{t}'"
            )
        if not isinstance(t[0], tuple):
            raise ValueError(
                f"Input {mtype} data entry '{v}': First tuple entry not a dimensions tuple, got '{t[0]}'"
            )
        for c in t[0]:
            if not isinstance(c, str):
                # BUGFIX: this branch previously re-used the
                # "not a dimensions tuple" message, hiding the
                # actual problem (a non-string dimension entry)
                raise ValueError(
                    f"Input {mtype} data entry '{v}': Dimension entry '{c}' is not a string, got type '{type(c).__name__}'"
                )
        if not isinstance(t[1], np.ndarray):
            raise ValueError(
                f"Input {mtype} data entry '{v}': Second entry is not a numpy array, got: {type(t[1]).__name__}"
            )
        if len(t[1].shape) != len(t[0]):
            raise ValueError(
                f"Input {mtype} data entry '{v}': Wrong data shape, expecting {len(t[0])} dimensions, got {t[1].shape}"
            )

        # enforce the canonical dimension order:
        # state first, then turbine or target, then tpoint
        if FC.STATE in t[0]:
            if t[0][0] != FC.STATE:
                raise ValueError(
                    f"Input {mtype} data entry '{v}': Dimension '{FC.STATE}' not at first position, got {t[0]}"
                )
            if FC.TURBINE in t[0]:
                if t[0][1] != FC.TURBINE:
                    raise ValueError(
                        f"Input {mtype} data entry '{v}': Dimension '{FC.TURBINE}' not at second position, got {t[0]}"
                    )
            if FC.TARGET in t[0]:
                if t[0][1] != FC.TARGET:
                    raise ValueError(
                        f"Input {mtype} data entry '{v}': Dimension '{FC.TARGET}' not at second position, got {t[0]}"
                    )
                if len(t[0]) < 3 or t[0][2] != FC.TPOINT:
                    raise KeyError(
                        f"Input {mtype} data entry '{v}': Expecting dimension '{FC.TPOINT}' as third entry. Got {t[0]}"
                    )
        elif FC.TURBINE in t[0]:
            # turbine data without a state dimension is not supported
            raise ValueError(
                f"Input {mtype} data entry '{v}': Dimension '{FC.TURBINE}' requires combination with dimension '{FC.STATE}'"
            )

        # record sizes and cross-check against earlier entries
        for d, s in zip(t[0], t[1].shape):
            if d not in sizes:
                sizes[d] = s
            elif sizes[d] != s:
                raise ValueError(
                    f"Input {mtype} data entry '{v}': Dimension '{d}' has wrong size, expecting {sizes[d]}, got {s}"
                )

    # coords must match dimensions that actually appear in the data
    for v, c in idata["coords"].items():
        if v not in sizes:
            raise KeyError(
                f"Input coords entry '{v}': Not used in farm data, found {sorted(list(sizes.keys()))}"
            )
        elif len(c) != sizes[v]:
            raise ValueError(
                f"Input coords entry '{v}': Wrong coordinate size for '{v}': Expecting {sizes[v]}, got {len(c)}"
            )

    return sizes
def __get_xrdata(self, idata, sizes):
    """
    Private helper function.

    Builds an xarray Dataset from an idata dict and applies
    the algorithm's chunking, if any was set.
    """
    ds = xr.Dataset(**idata)
    if self.chunks is None:
        return ds
    # turbine and tpoint dimensions must never be chunked
    for dim in (FC.TURBINE, FC.TPOINT):
        if dim in self.chunks:
            raise ValueError(
                f"Dimension '{dim}' cannot be chunked, got chunks {self.chunks}"
            )
    return ds.chunk(
        chunks={c: s for c, s in self.chunks.items() if c in sizes}
    )
[docs]
def chunked(self, ds):
    """
    Applies the algorithm's chunking to a dataset.

    Parameters
    ----------
    ds: xarray.Dataset
        The dataset

    Returns
    -------
    xarray.Dataset
        The chunked dataset; the unchanged input if no
        chunks were set. Only coordinates present in the
        dataset are chunked.
    """
    return (
        ds.chunk(chunks={c: v for c, v in self.chunks.items() if c in ds.coords})
        if self.chunks is not None
        else ds
    )
[docs]
def initialize(self):
    """
    Initializes the algorithm.
    """
    # NOTE(review): the algorithm passes itself as the `algo`
    # argument of Model.initialize -- confirm against Model's signature
    super().initialize(self, self.verbosity)
@property
def idata_mem(self):
    """
    The current idata memory

    Keys starting with '__' are internal bookkeeping flags
    (e.g. turbine-invariance markers), not model idata.

    Returns
    -------
    dict :
        Keys: model name, value: idata dict
    """
    return self._idata_mem
[docs]
def store_model_data(self, model, idata, force=False):
    """
    Store model data

    Parameters
    ----------
    model: foxes.core.Model
        The model
    idata: dict
        The dict has exactly two entries: `data_vars`,
        a dict with entries `name_str -> (dim_tuple, data_ndarray)`;
        and `coords`, a dict with entries `dim_name_str -> dim_array`
    force: bool
        Overwrite existing data
    """
    # key combines class name and model name, matching get/del lookups
    mname = f"{type(model).__name__}_{model.name}"
    if force or mname not in self._idata_mem:
        self._idata_mem[mname] = idata
    else:
        raise KeyError(f"Attempt to overwrite stored data for model '{mname}'")
[docs]
def get_model_data(self, model):
    """
    Gets model data from memory

    Parameters
    ----------
    model: foxes.core.Model
        The model

    Returns
    -------
    idata: dict
        The dict has exactly two entries: `data_vars`,
        a dict with entries `name_str -> (dim_tuple, data_ndarray)`;
        and `coords`, a dict with entries `dim_name_str -> dim_array`

    Raises
    ------
    KeyError
        If no data was stored for the model
    """
    mname = f"{type(model).__name__}_{model.name}"
    try:
        return self._idata_mem[mname]
    except KeyError:
        # suppress the original, less informative KeyError
        raise KeyError(
            f"Key '{mname}' not found in idata_mem, available keys: {sorted(list(self._idata_mem.keys()))}"
        ) from None
[docs]
def del_model_data(self, model):
    """
    Remove stored model data

    Parameters
    ----------
    model: foxes.core.Model
        The model
    """
    mname = f"{type(model).__name__}_{model.name}"
    if mname not in self._idata_mem:
        raise KeyError(f"Attempt to delete data of model '{mname}', but not stored")
    del self._idata_mem[mname]
[docs]
def update_n_turbines(self):
    """
    Reset the number of turbines,
    according to self.farm

    Stored idata entries with a turbine dimension are resized to
    the new turbine count, provided they are invariant along the
    turbine axis; otherwise a ValueError is raised.
    """
    if self.n_turbines != self.farm.n_turbines:
        self.n_turbines = self.farm.n_turbines

        # resize stored idata, if dependent on turbine coord:
        newk = {}
        for mname, idata in self.idata_mem.items():
            # skip internal flag entries, marked by a '__' prefix
            if mname[:2] == "__":
                continue
            for dname, d in idata["data_vars"].items():
                # cached flag: is this entry invariant along the turbine axis?
                k = f"__{mname}_{dname}_turbinv"
                if k in self.idata_mem:
                    ok = self.idata_mem[k]
                else:
                    ok = None
                    if FC.TURBINE in d[0]:
                        i = d[0].index(FC.TURBINE)
                        # invariant iff all slices along the turbine axis coincide
                        # NOTE(review): unique is taken along axis=1 while the
                        # turbine axis index i is looked up separately -- assumes
                        # the turbine dimension sits at position 1; confirm
                        ok = np.unique(d[1], axis=1).shape[i] == 1
                    newk[k] = ok

                if ok is not None:
                    if not ok:
                        # turbine-dependent data cannot be resized safely
                        raise ValueError(
                            f"{self.name}: Stored idata entry '{mname}:{dname}' is turbine dependent, unable to reset n_turbines"
                        )

                    if FC.TURBINE in idata["coords"]:
                        idata["coords"][FC.TURBINE] = np.arange(self.n_turbines)

                    i = d[0].index(FC.TURBINE)
                    n0 = d[1].shape[i]
                    if n0 > self.n_turbines:
                        # shrink: keep the first n_turbines slices
                        idata["data_vars"][dname] = (
                            d[0],
                            np.take(d[1], range(self.n_turbines), axis=i),
                        )
                    elif n0 < self.n_turbines:
                        # grow: append copies of the last turbine slice
                        shp = [
                            d[1].shape[j] if j != i else self.n_turbines - n0
                            for j in range(len(d[1].shape))
                        ]
                        a = np.zeros(shp, dtype=d[1].dtype)
                        shp = [
                            d[1].shape[j] if j != i else 1
                            for j in range(len(d[1].shape))
                        ]
                        a[:] = np.take(d[1], -1, axis=i).reshape(shp)
                        idata["data_vars"][dname] = (
                            d[0],
                            np.append(d[1], a, axis=i),
                        )
        # persist the newly computed invariance flags
        self._idata_mem.update(newk)
[docs]
def get_models_idata(self):
    """
    Returns idata object of models

    Returns
    -------
    idata: dict, optional
        The dict has exactly two entries: `data_vars`,
        a dict with entries `name_str -> (dim_tuple, data_ndarray)`;
        and `coords`, a dict with entries `dim_name_str -> dim_array`.
        Take algorithm's idata object by default.
    """
    if not self.initialized:
        raise ValueError(
            f"Algorithm '{self.name}': get_models_idata called before initialization"
        )
    idata = {"coords": {}, "data_vars": {}}
    for key, stored in self._idata_mem.items():
        # skip internal flag entries, marked by a '__' prefix
        if len(key) >= 3 and key[:2] == "__":
            continue
        idata["coords"].update(stored["coords"])
        idata["data_vars"].update(stored["data_vars"])
    return idata
[docs]
def get_models_data(self, idata=None):
    """
    Creates xarray from model input data.

    Parameters
    ----------
    idata: dict, optional
        The dict has exactly two entries: `data_vars`,
        a dict with entries `name_str -> (dim_tuple, data_ndarray)`;
        and `coords`, a dict with entries `dim_name_str -> dim_array`.
        Take algorithm's idata object by default.

    Returns
    -------
    xarray.Dataset
        The model input data
    """
    model_idata = self.get_models_idata() if idata is None else idata
    dim_sizes = self.__get_sizes(model_idata, "models")
    return self.__get_xrdata(model_idata, dim_sizes)
[docs]
def new_point_data(self, points, states_indices=None):
    """
    Creates a point data xarray object, containing only points.

    Parameters
    ----------
    points: numpy.ndarray
        The points, shape: (n_states, n_points, 3)
    states_indices: array_like, optional
        The indices of the states dimension

    Returns
    -------
    xarray.Dataset
        A dataset containing the points data

    Raises
    ------
    ValueError
        If the points do not have shape (n_states, n_points, 3)
    """
    if states_indices is None:
        idata = {"coords": {}, "data_vars": {}}
    else:
        idata = {"coords": {FC.STATE: states_indices}, "data_vars": {}}

    # validate the expected (n_states, n_points, 3) layout
    shp = np.shape(points)
    if len(shp) != 3 or shp[0] != self.n_states or shp[2] != 3:
        # BUGFIX: the previous message evaluated points.shape[1], which
        # itself raised IndexError for inputs with fewer than 2 dimensions
        raise ValueError(
            f"points have wrong dimensions, expecting ({self.n_states}, n_points, 3), got {shp}"
        )

    # each point becomes one target with a single trivial target point
    idata["data_vars"][FC.TARGETS] = (
        (FC.STATE, FC.TARGET, FC.TPOINT, FC.XYH),
        points[:, :, None, :],
    )
    # the single target point carries full weight
    idata["data_vars"][FC.TWEIGHTS] = (
        (FC.TPOINT,),
        np.array([1.0], dtype=FC.DTYPE),
    )

    sizes = self.__get_sizes(idata, "point")
    return self.__get_xrdata(idata, sizes)
[docs]
def finalize(self, clear_mem=False):
    """
    Finalizes the algorithm.

    Parameters
    ----------
    clear_mem: bool
        Clear idata memory
    """
    # NOTE(review): the algorithm passes itself as the `algo`
    # argument of Model.finalize -- confirm against Model's signature
    super().finalize(self, self.verbosity)
    if clear_mem:
        # drop all stored model idata
        self._idata_mem = Dict()
[docs]
@classmethod
def new(cls, algo_type, *args, **kwargs):
    """
    Run-time algorithm factory.

    Parameters
    ----------
    algo_type: str
        The selected derived class name
    args: tuple, optional
        Additional parameters for the constructor
    kwargs: dict, optional
        Additional parameters for the constructor
    """
    if algo_type is None:
        return None

    allc = all_subclasses(cls)
    # single pass: construct the first subclass whose name matches
    for scls in allc:
        if scls.__name__ == algo_type:
            return scls(*args, **kwargs)

    estr = (
        "Algorithm type '{}' is not defined, available types are \n {}".format(
            algo_type, sorted([i.__name__ for i in allc])
        )
    )
    raise KeyError(estr)