import numpy as np
from xarray import Dataset
from foxes.algorithms import Iterative
from foxes.config import config
from foxes.core import get_engine
import foxes.variables as FV
import foxes.constants as FC
from . import models as mdls
[docs]
class Sequential(Iterative):
    """
    A sequential calculation of states without chunking.

    This is of use for the evaluation in simulation
    environments that do not support multi-state computations,
    like FMUs.

    Attributes
    ----------
    ambient: bool
        Flag for ambient calculation
    calc_pars: dict
        Parameters for model calculation.
        Key: model name str, value: parameter dict
    states0: foxes.core.States
        The original states
    points: numpy.ndarray
        The points of interest, shape: (n_states, n_points, 3)
    plugins: list of foxes.algorithm.sequential.SequentialIterPlugin
        The plugins, updated with every iteration
    outputs: list of str
        The output variables

    :group: algorithms.sequential

    """
[docs]
@classmethod
def get_model(cls, name):
    """
    Look up a model class by name, preferring the sequential models.

    The name is first searched among the models of this
    algorithm's own `models` module; if it is not found there,
    the lookup is delegated to the parent class.

    Parameters
    ----------
    name: str
        The model name

    Returns
    -------
    model: foxes.core.model
        The model

    """
    # algorithm specific models take precedence over the inherited ones
    if hasattr(mdls, name):
        return getattr(mdls, name)
    return super().get_model(name)
[docs]
def __init__(
    self,
    farm,
    states,
    *args,
    points=None,
    ambient=False,
    calc_pars=None,
    plugins=None,
    outputs=None,
    **kwargs,
):
    """
    Constructor.

    Parameters
    ----------
    farm: foxes.WindFarm
        The wind farm
    states: foxes.core.States
        The ambient states. They are wrapped into a
        `SeqState` object before being passed on to
        the parent class
    args: tuple, optional
        Additional arguments for the parent class Iterative
    points: numpy.ndarray, optional
        The points of interest, shape: (n_states, n_points, 3)
    ambient: bool
        Flag for ambient calculation
    calc_pars: dict, optional
        Parameters for model calculation.
        Key: model name str, value: parameter dict.
        Defaults to a new empty dict
    plugins: list of foxes.algorithm.sequential.SequentialIterPlugin, optional
        The plugins, updated with every iteration.
        Defaults to a new empty list
    outputs: list of str, optional
        The output variables, defaults to
        `DEFAULT_FARM_OUTPUTS` of the class
    kwargs: dict, optional
        Additional arguments for the parent class Iterative

    """
    super().__init__(farm, mdls.SeqState(states), *args, **kwargs)
    self.ambient = ambient

    # A fresh container is created per instance when none is given.
    # Mutable default arguments would be shared by all instances,
    # so a plugin appended to one algorithm would show up in another.
    # Containers passed in by the caller are stored as they are.
    self.calc_pars = {} if calc_pars is None else calc_pars
    self.plugins = [] if plugins is None else plugins

    # the states object wrapped by the SeqState created above
    self.states0 = self.states.states
    self.points = points
    self.outputs = outputs if outputs is not None else self.DEFAULT_FARM_OUTPUTS

    # remember a raised verbosity level for the messages of this class,
    # then lower the verbosity of the algorithm and of the engine
    self._verbo0 = self.verbosity + 1
    self.verbosity -= 1
    get_engine().verbosity -= 2

    # step counter, None signals that no iteration is running
    self._i = None
@property
def iterating(self):
    """
    Tell whether an iteration is currently in progress.

    Returns
    -------
    itr: bool
        True if the internal step counter is set,
        i.e., if currently iterating

    """
    step = self._i
    return step is not None
[docs]
def get_models_data(self, sel=None, isel=None):
    """
    Return the model input data of the current step.

    Parameters
    ----------
    sel: optional
        Not supported, has to be None or empty
    isel: optional
        Not supported, has to be None or empty

    Returns
    -------
    data: xarray.Dataset
        The stored model data, restricted to the
        current counter position along the state dimension

    Raises
    ------
    ValueError
        If a non-empty sel or isel is given

    """
    has_sel = sel is not None and len(sel) > 0
    if has_sel:
        raise ValueError(f"calc_points does not support sel, got sel={sel}")

    has_isel = isel is not None and len(isel) > 0
    if has_isel:
        raise ValueError(f"calc_points does not support isel, got isel={isel}")

    # a one-element list keeps the state dimension in the result
    model_data = self._model_data
    return model_data.isel({FC.STATE: [self.counter]})
[docs]
def __iter__(self):
    """
    Initialize the iterator.

    If no iteration is running, a new one is prepared: the
    algorithm is initialized if needed, state indices and
    weights are read from the original states, the farm
    variables are determined, the model input data is stored
    and zero-filled result datasets are allocated. Finally
    all plugins are initialized.

    Returns
    -------
    self: Sequential
        The algorithm itself, which serves as its own iterator

    """
    # NOTE(review): the nesting below was reconstructed from a copy
    # without indentation -- confirm that all statements up to
    # `return self` belong inside this guard.
    if not self.iterating:
        if not self.initialized:
            self.initialize()
        self.print_deco("calc_farm")

        # indices and weights of all states, from the original states
        self._inds = self.states0.index()
        self._weights = self.states0.weights(self)

        # step counter and counter position start at the first state
        self._i = 0
        self._counter = 0

        # _it is set only while the farm variables are determined
        # (presumably the iteration counter of the Iterative base
        # class -- TODO confirm)
        self._it = 0
        mlist, __ = self._collect_farm_models(
            None, self.calc_pars, ambient=self.ambient
        )
        self._calc_farm_vars(mlist)
        self._it = None

        # model input data of all states, sliced per step later on
        self._model_data = Dataset(**super().get_models_idata())
        if self._verbo0 > 0:
            print("\nInput data:\n")
            print(self._model_data)
            print(f"\nOutput farm variables:", ", ".join(self.farm_vars))
            print()

        # overall farm results, one zero array per farm variable with
        # the shape and dtype of the weight array of the model data
        self._farm_results = Dataset(
            coords={FC.STATE: self._model_data[FC.STATE].to_numpy()},
            data_vars={
                v: (
                    (FC.STATE, FC.TURBINE),
                    np.zeros_like(self._model_data[FV.WEIGHT].to_numpy()),
                )
                for v in self.farm_vars
            },
        )
        self._farm_results[FC.TNAME] = ((FC.TURBINE,), self.farm.turbine_names)

        # the order variable is cast to the configured integer dtype
        if FV.ORDER in self._farm_results:
            self._farm_results[FV.ORDER] = self._farm_results[FV.ORDER].astype(
                config.dtype_int
            )

        # independent copy for the results in downwind order
        self._farm_results_dwnd = self._farm_results.copy(deep=True)

        # point results are allocated during the first step, if needed
        self._point_results = None
        for p in self.plugins:
            p.initialize(self)
    return self
[docs]
def __next__(self):
    """
    Run calculation for current step, then iterate to next.

    Returns
    -------
    results: xarray.Dataset or tuple of xarray.Dataset
        The farm results of the current step if no points
        of interest are set, otherwise the tuple
        (farm results, point results) of the current step

    Raises
    ------
    StopIteration
        If all states have been processed

    """
    # NOTE(review): the nesting below was reconstructed from a copy
    # without indentation -- confirm against the original layout.
    if self._i < len(self._inds):
        # restrict the states wrapper to the current single state
        self._counter = self._i
        self.states._counter = self._i
        self.states._size = 1
        self.states._indx = self._inds[self._i]
        self.states._weight = self._weights[self._i]
        if self._verbo0 > 0:
            print(f"{self.name}: Running state {self.states.index()[0]}")

        # farm calculation of the parent class, the two results are
        # unpacked into the regular and the downwind ordered data
        fres, fres_dnwnd = super().calc_farm(
            outputs=self.farm_vars,
            finalize=False,
            ret_dwnd_order=True,
            **self.calc_pars,
        )

        # write the step results into the overall results, addressed
        # by the label of the current state index
        for v in self._farm_results.data_vars.keys():
            if FC.STATE in self._farm_results[v].dims:
                self._farm_results[v].loc[{FC.STATE: [self.index]}] = fres[v]
                self._farm_results_dwnd[v].loc[{FC.STATE: [self.index]}] = (
                    fres_dnwnd[v]
                )

        if self.points is None:
            # farm results only
            for p in self.plugins:
                p.update(self, fres)
            self._i += 1
            return fres
        else:
            pres = super().calc_points(fres, points=self.points, finalize=False)

            # allocate the overall point results during the first step
            if self._point_results is None:
                n_states = self._model_data.sizes[FC.STATE]
                self._point_results = Dataset(
                    coords={
                        FC.STATE: self._model_data[FC.STATE].to_numpy(),
                        **{c: d for c, d in pres.coords.items() if c != FC.STATE},
                    },
                    # zero arrays, with the first dimension of the step
                    # results replaced by the total number of states
                    data_vars={
                        v: (
                            d.dims,
                            np.zeros([n_states] + list(d.shape[1:]), dtype=d.dtype),
                        )
                        for v, d in pres.data_vars.items()
                        if d.dims[0] == FC.STATE
                    },
                )
                # variables without state dimension are taken over as they are
                for v, d in pres.data_vars.items():
                    if FC.STATE not in d.dims:
                        self._point_results[v] = d

            # write the step results into the overall point results
            for v in self._point_results.data_vars.keys():
                if FC.STATE in self._point_results[v].dims:
                    self._point_results[v].loc[{FC.STATE: [self.index]}] = pres[v]
            for p in self.plugins:
                p.update(self, fres, pres)
            self._i += 1
            return fres, pres
    else:
        # all states done: drop the model data, leave the iterating
        # mode and let the states wrapper cover all states again
        del self._model_data
        self._i = None
        self.states._counter = None
        self.states._size = len(self._inds)
        self.states._indx = self._inds
        self.states._weight = self._weights
        for p in self.plugins:
            p.finalize(self)
        raise StopIteration
@property
def size(self):
    """
    Number of iteration steps, as reported by the states.

    Returns
    -------
    s: int
        The total number of iteration steps

    """
    states = self.states
    return states.size()
@property
def counter(self):
    """
    Counter position of the current step.

    Returns
    -------
    i: int
        The current index counter, or None
        if no iteration is running

    """
    if self.iterating:
        return self._counter
    return None
@property
def index(self):
    """
    State index of the current step.

    Returns
    -------
    indx: int
        The current index as stored in the states,
        or None if no iteration is running

    """
    if self.iterating:
        return self.states._indx
    return None
[docs]
def states_i0(self, counter, algo=None):
    """
    Return either the counter or the index of the current step.

    Parameters
    ----------
    counter: bool
        If True, the counter is returned,
        otherwise the index
    algo: object, optional
        Dummy argument, due to consistency with
        foxes.core.Data.states_i0

    Returns
    -------
    i0: int
        The counter or index

    """
    if counter:
        return self.counter
    return self.index
@property
def weight(self):
    """
    Weight data of the current step.

    Returns
    -------
    w: numpy.ndarray
        The current weight array as stored in the states,
        shape: (n_turbines,), or None if no iteration
        is running

    """
    if self.iterating:
        return self.states._weight
    return None
@property
def farm_results(self):
    """
    Farm results of all states, as collected so far.

    Returns
    -------
    results: xarray.Dataset
        The overall farm results

    """
    overall = self._farm_results
    return overall
@property
def farm_results_downwind(self):
    """
    Farm results of all states, with the turbine
    dimension in downwind order.

    Returns
    -------
    results: xarray.Dataset
        The overall farm results

    """
    overall = self._farm_results_dwnd
    return overall
@property
def cur_farm_results(self):
    """
    Farm results of the current step.

    Returns
    -------
    results: xarray.Dataset
        The overall farm results, restricted to the
        current counter position along the state dimension

    """
    overall = self._farm_results
    # a one-element list keeps the state dimension in the result
    return overall.isel({FC.STATE: [self.counter]})
@property
def point_results(self):
    """
    Point results of all states, as collected so far.

    Returns
    -------
    results: xarray.Dataset
        The overall point results

    """
    overall = self._point_results
    return overall
@property
def cur_point_results(self):
    """
    Point results of the current step.

    Returns
    -------
    results: xarray.Dataset
        The overall point results, restricted to the
        current counter position along the state dimension

    """
    overall = self._point_results
    # a one-element list keeps the state dimension in the result
    return overall.isel({FC.STATE: [self.counter]})
[docs]
def calc_farm(self):
    """
    Return the farm data of the current iteration step.

    Returns
    -------
    farm_results: xarray.Dataset
        The farm results. The calculated variables have
        dimensions (state, turbine)

    Raises
    ------
    ValueError
        If called while no iteration is running

    """
    if self.iterating:
        return self.cur_farm_results
    raise ValueError("calc_farm call is only allowed during iterations")
[docs]
def calc_points(
    self,
    farm_results,
    points,
    **kwargs,
):
    """
    Calculate data at a given set of points, during an iteration.

    The calculation is delegated to the parent class,
    with the finalize flag switched off.

    Parameters
    ----------
    farm_results: xarray.Dataset
        The farm results. The calculated variables have
        dimensions (state, turbine)
    points: numpy.ndarray
        The points of interest, shape: (n_states, n_points, 3)
    kwargs: dict, optional
        Additional arguments, forwarded to the
        calc_points function of the parent class

    Returns
    -------
    point_results: xarray.Dataset
        The point results. The calculated variables have
        dimensions (state, point)

    Raises
    ------
    ValueError
        If called while no iteration is running

    """
    if self.iterating:
        return super().calc_points(farm_results, points, finalize=False, **kwargs)
    raise ValueError("calc_points call is only allowed during iterations")