import numpy as np
from abc import ABC
from itertools import count
from foxes.config import config
import foxes.constants as FC
[docs]
class Model(ABC):
    """
    Base class for all models.

    Provides unique instance naming/ids and the generic
    initialize/finalize and running-state machinery.

    Attributes
    ----------
    name: str
        The model name

    :group: core
    """

    # Per class-name itertools.count objects, shared by all instances;
    # used by __init__ to hand out a unique id per concrete model type.
    _ids = {}
[docs]
def __init__(self):
"""
Constructor.
"""
t = type(self).__name__
if t not in self._ids:
self._ids[t] = count(0)
self._id = next(self._ids[t])
self.name = f"{type(self).__name__}"
if self._id > 0:
self.name += f"_instance{self._id}"
self.__initialized = False
self.__running = False
[docs]
def __repr__(self):
return f"{type(self).__name__}()"
@property
def model_id(self):
"""
Unique id based on the model type.
Returns
-------
int
Unique id of the model object
"""
return self._id
[docs]
def var(self, v):
"""
Creates a model specific variable name.
Parameters
----------
v: str
The variable name
Returns
-------
str
Model specific variable name
"""
return f"{self.name}_{v}"
@property
def initialized(self):
"""
Initialization flag.
Returns
-------
bool :
True if the model has been initialized.
"""
return self.__initialized
[docs]
def sub_models(self):
"""
List of all sub-models
Returns
-------
smdls: list of foxes.core.Model
All sub models
"""
return []
[docs]
def load_data(self, algo, verbosity=0):
"""
Load and/or create all model data that is subject to chunking.
Such data should not be stored under self, for memory reasons. The
data returned here will automatically be chunked and then provided
as part of the mdata object during calculations.
Parameters
----------
algo: foxes.core.Algorithm
The calculation algorithm
verbosity: int
The verbosity level, 0 = silent
Returns
-------
idata: dict
The dict has exactly two entries: `data_vars`,
a dict with entries `name_str -> (dim_tuple, data_ndarray)`;
and `coords`, a dict with entries `dim_name_str -> dim_array`
"""
if self.initialized:
raise ValueError(
f"Model '{self.name}': Cannot call load_data after initialization"
)
return {"coords": {}, "data_vars": {}}
[docs]
def initialize(self, algo, verbosity=0, force=False):
"""
Initializes the model.
Parameters
----------
algo: foxes.core.Algorithm
The calculation algorithm
verbosity: int
The verbosity level, 0 = silent
force: bool
Overwrite existing data
"""
if self.running:
raise ValueError(f"Model '{self.name}': Cannot initialize while running")
if not self.initialized:
pr = False
for m in self.sub_models():
if not m.initialized:
if verbosity > 1 and not pr:
print(f">> {self.name}: Starting sub-model initialization >> ")
pr = True
m.initialize(algo, verbosity)
if pr:
print(f"<< {self.name}: Finished sub-model initialization << ")
if verbosity > 0:
print(f"Initializing model '{self.name}'")
algo.store_model_data(self, self.load_data(algo, verbosity), force)
self.__initialized = True
@property
def running(self):
"""
Flag for currently running models
Returns
-------
flag: bool
True if currently running
"""
return self.__running
[docs]
def set_running(
self,
algo,
data_stash,
sel=None,
isel=None,
verbosity=0,
):
"""
Sets this model status to running, and moves
all large data to stash.
The stashed data will be returned by the
unset_running() function after running calculations.
Parameters
----------
algo: foxes.core.Algorithm
The calculation algorithm
data_stash: dict
Large data stash, this function adds data here.
Key: model name. Value: dict, large model data
sel: dict, optional
The subset selection dictionary
isel: dict, optional
The index subset selection dictionary
verbosity: int
The verbosity level, 0 = silent
"""
if self.running:
raise ValueError(
f"Model '{self.name}': Cannot call set_running while running"
)
for m in self.sub_models():
if not m.running:
m.set_running(algo, data_stash, sel, isel, verbosity=verbosity)
if verbosity > 0:
print(f"Model '{self.name}': running")
if self.name not in data_stash:
data_stash[self.name] = {}
self.__running = True
[docs]
def unset_running(
self,
algo,
data_stash,
sel=None,
isel=None,
verbosity=0,
):
"""
Sets this model status to not running, recovering large data
from stash
Parameters
----------
algo: foxes.core.Algorithm
The calculation algorithm
data_stash: dict
Large data stash, this function adds data here.
Key: model name. Value: dict, large model data
sel: dict, optional
The subset selection dictionary
isel: dict, optional
The index subset selection dictionary
verbosity: int
The verbosity level, 0 = silent
"""
if not self.running:
raise ValueError(
f"Model '{self.name}': Cannot call unset_running when not running"
)
for m in self.sub_models():
if m.running:
m.unset_running(algo, data_stash, sel, isel, verbosity=verbosity)
if verbosity > 0:
print(f"Model '{self.name}': not running")
self.__running = False
[docs]
def finalize(self, algo, verbosity=0):
"""
Finalizes the model.
Parameters
----------
algo: foxes.core.Algorithm
The calculation algorithm
verbosity: int
The verbosity level, 0 = silent
"""
if self.running:
raise ValueError(f"Model '{self.name}': Cannot finalize while running")
if self.initialized:
pr = False
for m in self.sub_models():
if verbosity > 1 and not pr:
print(f">> {self.name}: Starting sub-model finalization >> ")
pr = True
m.finalize(algo, verbosity)
if pr:
print(f"<< {self.name}: Finished sub-model finalization << ")
if verbosity > 0:
print(f"Finalizing model '{self.name}'")
algo.del_model_data(self)
self.__initialized = False
[docs]
def get_data(
    self,
    variable,
    target,
    lookup="smfp",
    mdata=None,
    fdata=None,
    tdata=None,
    downwind_index=None,
    accept_none=False,
    accept_nan=True,
    algo=None,
    upcast=False,
    selection=None,
):
    """
    Getter for a data entry in the model object
    or provided data sources

    Parameters
    ----------
    variable: str
        The variable, serves as data key
    target: str
        The dimensions identifier for the output,
        FC.STATE_TURBINE, FC.STATE_TARGET or
        FC.STATE_TARGET_TPOINT
    lookup: str
        The order of data sources. Combination of:
        's' for self,
        'm' for mdata,
        'f' for fdata,
        't' for tdata,
        'w' for wake modelling data
    mdata: foxes.core.Data, optional
        The model data
    fdata: foxes.core.Data, optional
        The farm data
    tdata: foxes.core.Data, optional
        The target point data
    downwind_index: int, optional
        The index in the downwind order
    accept_none: bool
        Do not throw an error if data entry is None
    accept_nan: bool
        Do not throw an error if data entry is np.nan
    algo: foxes.core.Algorithm, optional
        The algorithm, needed for data from previous iteration
    upcast: bool
        Flag for ensuring targets dimension,
        otherwise dimension 1 is entered
    selection: numpy.ndarray, optional
        Apply this selection to the result,
        state-turbine, state-target, or state-target-tpoint

    Returns
    -------
    out: numpy.ndarray or None
        The requested data; None only if the variable was not
        found and accept_none is True

    Raises
    ------
    KeyError
        If target is unknown, or a required quantity (n_states,
        n_turbines, ...) cannot be resolved from the given sources
    ValueError
        If the variable is not found (and accept_none is False),
        on shape mismatch, or on an incompatible selection
    TypeError
        If selection is neither a bool ndarray nor a tuple/list
    """

    def _geta(a):
        # Resolve quantity 'a' from the first source that yields a
        # non-None value; search order: mdata, fdata, tdata, algo, self.
        sources = [s for s in [mdata, fdata, tdata, algo, self] if s is not None]
        for s in sources:
            try:
                if a == "states_i0":
                    # states_i0 is a method, not a plain attribute:
                    out = s.states_i0(counter=True)
                    if out is not None:
                        return out
                else:
                    out = getattr(s, a)
                    if out is not None:
                        return out
            except AttributeError:
                # this source does not provide 'a', try the next one
                pass
        raise KeyError(
            f"Model '{self.name}': Failed to determine '{a}'. Maybe add to arguments of get_data: mdata, fdata, tdata, algo?"
        )

    # determine the expected output dimension names and shape:
    n_states = _geta("n_states")
    if target == FC.STATE_TURBINE:
        if downwind_index is not None:
            raise ValueError(
                f"Target '{target}' is incompatible with downwind_index (here {downwind_index})"
            )
        n_turbines = _geta("n_turbines")
        dims = (FC.STATE, FC.TURBINE)
        shp = (n_states, n_turbines)
    elif target == FC.STATE_TARGET:
        n_targets = _geta("n_targets")
        dims = (FC.STATE, FC.TARGET)
        shp = (n_states, n_targets)
    elif target == FC.STATE_TARGET_TPOINT:
        n_targets = _geta("n_targets")
        n_tpoints = _geta("n_tpoints")
        dims = (FC.STATE, FC.TARGET, FC.TPOINT)
        shp = (n_states, n_targets, n_tpoints)
    else:
        raise KeyError(
            f"Model '{self.name}': Wrong parameter 'target = {target}'. Choices: {FC.STATE_TURBINE}, {FC.STATE_TARGET}, {FC.STATE_TARGET_TPOINT}"
        )

    def _match_shape(a):
        # Pad missing trailing axes with size-1 dimensions so that 'a'
        # has as many axes as shp; existing axes must be 1 or match shp.
        out = np.asarray(a)
        if len(out.shape) < len(shp):
            for i, s in enumerate(shp):
                if i >= len(out.shape):
                    # append a broadcastable size-1 axis
                    out = out[..., None]
                # NOTE(review): checks a.shape rather than out.shape here;
                # this fails with AttributeError if 'a' is a plain
                # list/scalar rather than an ndarray — confirm intended.
                elif a.shape[i] not in (1, s):
                    raise ValueError(
                        f"Shape mismatch for '{variable}': Got {out.shape}, expecting {shp}"
                    )
        elif len(out.shape) > len(shp):
            raise ValueError(
                f"Shape mismatch for '{variable}': Got {out.shape}, expecting {shp}"
            )
        return out

    def _filter_dims(source):
        # Extract the variable from a Data-like source; if a
        # downwind_index is given, select it along the turbine axis
        # and drop that axis from the dimension tuple.
        a = source[variable]
        a_dims = tuple(source.dims[variable])
        if downwind_index is None or FC.TURBINE not in a_dims:
            d = a_dims
        else:
            slc = tuple(
                [downwind_index if dd == FC.TURBINE else np.s_[:] for dd in a_dims]
            )
            a = a[slc]
            d = tuple([dd for dd in a_dims if dd != FC.TURBINE])
        return a, d

    # try the data sources in the order given by 'lookup',
    # stopping at the first hit:
    out = None
    for s in lookup:
        # lookup self:
        if s == "s" and hasattr(self, variable):
            a = getattr(self, variable)
            if a is not None:
                out = _match_shape(a)
        # lookup mdata:
        elif s == "m" and mdata is not None and variable in mdata:
            a, d = _filter_dims(mdata)
            l = len(d)
            # accept only if the remaining dims are a prefix of the target dims:
            if l <= len(dims) and d == dims[:l]:
                out = _match_shape(mdata[variable])
        # lookup fdata:
        elif (
            s == "f"
            and fdata is not None
            and variable in fdata
            and tuple(fdata.dims[variable]) == (FC.STATE, FC.TURBINE)
        ):
            if target == FC.STATE_TURBINE:
                out = fdata[variable]
            elif downwind_index is not None:
                # reduce the turbine axis to the selected downwind turbine:
                out = _match_shape(fdata[variable][:, downwind_index])
        # lookup pdata:
        elif (
            s == "t"
            and target != FC.STATE_TURBINE
            and tdata is not None
            and variable in tdata
        ):
            a, d = _filter_dims(tdata)
            l = len(d)
            if l <= len(dims) and d == dims[:l]:
                out = _match_shape(tdata[variable])
        # lookup wake modelling data:
        elif (
            s == "w"
            and fdata is not None
            and tdata is not None
            and variable in fdata
            and tuple(fdata.dims[variable]) == (FC.STATE, FC.TURBINE)
            and downwind_index is not None
            and algo is not None
        ):
            # delegate to the wake frame for evaluation at the targets:
            out = _match_shape(
                algo.wake_frame.get_wake_modelling_data(
                    algo,
                    variable,
                    downwind_index,
                    fdata,
                    tdata=tdata,
                    target=target,
                )
            )
        if out is not None:
            break

    # check for None:
    if out is None:
        if not accept_none:
            raise ValueError(
                f"Model '{self.name}': Variable '{variable}' is requested but not found."
            )
        return out

    # data from other chunks, only with iterations:
    # if tdata carries per-point state indices (FC.STATES_SEL), some
    # target points refer to states of earlier chunks; fill those from
    # the previous iteration's farm results.
    if (
        target in [FC.STATE_TARGET, FC.STATE_TARGET_TPOINT]
        and fdata is not None
        and variable in fdata
        and tdata is not None
        and FC.STATES_SEL in tdata
    ):
        if out.shape != shp:
            # upcast to dims:
            tmp = np.zeros(shp, dtype=out.dtype)
            tmp[:] = out
            out = tmp
            del tmp
        else:
            # copy, since entries will be overwritten below:
            out = out.copy()
        if downwind_index is None:
            raise KeyError(
                f"Model '{self.name}': Require downwind_index for obtaining results from previous iteration"
            )
        if tdata[FC.STATE_SOURCE_ORDERI] != downwind_index:
            raise ValueError(
                f"Model '{self.name}': Expecting downwind_index {tdata[FC.STATE_SOURCE_ORDERI]}, got {downwind_index}"
            )
        if algo is None:
            raise ValueError(
                f"Model '{self.name}': Iteration data found for variable '{variable}', requiring algo"
            )

        # local import to avoid a circular dependency at module load:
        from foxes.algorithms.sequential import Sequential

        # i0 = index of the first state of the current chunk:
        if isinstance(algo, Sequential):
            i0 = algo.states.counter
        else:
            i0 = _geta("states_i0")

        sts = tdata[FC.STATES_SEL]
        if target == FC.STATE_TARGET and tdata.n_tpoints != 1:
            # find the mean index and round it to nearest integer:
            sts = tdata.tpoint_mean(FC.STATES_SEL)[:, :, None]
            sts = (sts + 0.5).astype(config.dtype_int)

        # sel marks entries whose source state lies before this chunk:
        sel = sts < i0
        if np.any(sel):
            if not hasattr(algo, "farm_results_downwind"):
                raise KeyError(
                    f"Model '{self.name}': Iteration data found for variable '{variable}', requiring iterative algorithm"
                )
            prev_fres = getattr(algo, "farm_results_downwind")
            if prev_fres is not None:
                # pull values from the previous iteration's results:
                prev_data = prev_fres[variable].to_numpy()[sts[sel], downwind_index]
                if target == FC.STATE_TARGET:
                    out[sel[:, :, 0]] = prev_data
                else:
                    out[sel] = prev_data
                del prev_data
            del prev_fres
        if np.any(~sel):
            # remaining entries come from the current chunk's fdata,
            # with state indices shifted to chunk-local coordinates:
            sts = sts[~sel] - i0
            sel_data = fdata[variable][sts, downwind_index]
            if target == FC.STATE_TARGET:
                out[~sel[:, :, 0]] = sel_data
            else:
                out[~sel] = sel_data
            del sel_data
        del sel, sts

    # check for nan:
    if not accept_nan:
        try:
            # NOTE(review): np.all means this only triggers when ALL
            # entries are NaN, although the message says "contains NaN
            # values" — confirm which behavior is intended.
            if np.all(np.isnan(np.atleast_1d(out))):
                raise ValueError(
                    f"Model '{self.name}': Requested variable '{variable}' contains NaN values."
                )
        except TypeError:
            # non-numeric dtypes cannot be NaN-checked; skip silently
            pass

    # apply selection:
    if selection is not None:

        def _upcast_sel(sel_shape):
            # Broadcast 'out' along axes where the selection is wider
            # than out's (possibly size-1) axis; returns the possibly
            # enlarged array and the extra trailing shape beyond the
            # selection's rank.
            chp = []
            for i, s in enumerate(out.shape):
                if i < len(sel_shape) and sel_shape[i] > 1:
                    if sel_shape[i] != shp[i]:
                        raise ValueError(
                            f"Incompatible selection shape {sel_shape} for output shape {shp[i]}"
                        )
                    chp.append(shp[i])
                else:
                    chp.append(s)
            chp = tuple(chp)
            eshp = list(shp[len(sel_shape) :])
            if chp != out.shape:
                nout = np.zeros(chp, dtype=out.dtype)
                nout[:] = out
                return nout, eshp
            return out, eshp

        if isinstance(selection, np.ndarray) and selection.dtype == bool:
            if len(selection.shape) > len(out.shape):
                raise ValueError(
                    f"Expecting selection of shape {out.shape}, got {selection.shape}"
                )
            out, eshp = _upcast_sel(selection.shape)
        elif isinstance(selection, (tuple, list)):
            if len(selection) > len(out.shape):
                raise ValueError(
                    f"Selection is tuple/list of length {len(selection)}, expecting <= {len(out.shape)} "
                )
            out, eshp = _upcast_sel(shp[: len(selection)])
        else:
            raise TypeError(
                f"Expecting selection of type np.ndarray (bool), or tuple, or list. Got {type(selection).__name__}"
            )

        # selecting flattens the selected axes into one leading axis:
        out = out[selection]
        shp = tuple([len(out)] + list(eshp))

    # apply upcast:
    if upcast and out.shape != shp:
        tmp = np.zeros(shp, dtype=out.dtype)
        tmp[:] = out
        out = tmp
        del tmp

    return out