import numpy as np
from scipy.interpolate import interpn
from foxes.core import States
from foxes.utils import uv2wd
from foxes.models.wake_frames.timelines import Timelines
from foxes.config import config
import foxes.variables as FV
import foxes.constants as FC
[docs]
class OnePointFlowStates(States):
    """
    Time-evolving states based on horizontally
    homogeneous timeseries data.

    Attributes
    ----------
    ref_xy: numpy.ndarray
        The [x, y] or [x, y, z] coordinates of the base states.
        If [x, y, z] then z will serve as height
    heights: list of float
        The heights at which timelines will be calculated
        (constructor parameter 'tl_heights'; may also be deduced
        from the base states or from ref_xy[2] in load_data)
    dt_min: float
        The delta t value in minutes,
        if not from timeseries data
    intp_pars: dict
        Parameters for height interpolation with
        scipy.interpolate.interpn

    :group: input.states
    """
[docs]
def __init__(
    self,
    ref_xy,
    *base_states_args,
    base_states=None,
    tl_heights=None,
    dt_min=None,
    **base_states_kwargs,
):
    """
    Constructor.

    Parameters
    ----------
    ref_xy: list of float
        The [x, y] or [x, y, z] coordinates of the base states.
        If [x, y, z] then z will serve as height
    base_states_args: tuple, optional
        Arguments for creating the base states from
        States.new(), if not given as base_states
    base_states: foxes.core.States, optional
        The base states, representing horizontally
        homogeneous inflow
    tl_heights: list of float, optional
        The heights at which timelines will be calculated
    dt_min: float, optional
        The delta t value in minutes,
        if not from timeseries data
    base_states_kwargs: dict, optional
        Arguments for creating the base states from
        States.new(), if not given as base_states
    """
    super().__init__()

    self.ref_xy = np.array(ref_xy, dtype=config.dtype_double)
    self.heights = tl_heights
    self.base_states = base_states
    self.dt_min = dt_min

    # optionally forward 'bounds_error' to the height interpolation,
    # extrapolation is enabled via fill_value=None:
    self.intp_pars = {"fill_value": None}
    if "bounds_error" in base_states_kwargs:
        self.intp_pars["bounds_error"] = base_states_kwargs["bounds_error"]

    if base_states is None:
        # no ready-made base states: construct them from the
        # remaining positional and keyword arguments
        self.base_states = States.new(*base_states_args, **base_states_kwargs)
    elif len(base_states_kwargs):
        # ready-made base states exclude extra construction kwargs
        raise KeyError(
            f"Base states of type '{type(base_states).__name__}' were given, cannot handle base_state_pars {list(base_states_kwargs.keys())}"
        )
    elif len(base_states_args):
        # ready-made base states exclude extra construction args
        raise KeyError(
            f"Base states of type '{type(base_states).__name__}' were given, cannot handle base_states_args of types {[type(a).__name__ for a in base_states_args]}"
        )
[docs]
def __repr__(self):
    """Short description with base states type, heights and dt_min."""
    cls_name = type(self).__name__
    base_name = type(self.base_states).__name__
    return f"{cls_name}(base={base_name}, heights={self.heights}, dt_min={self.dt_min})"
[docs]
def sub_models(self):
    """
    List of all sub-models

    Returns
    -------
    smdls: list of foxes.core.Model
        All sub models
    """
    # the base states are the only sub-model:
    smdls = [self.base_states]
    return smdls
[docs]
def load_data(self, algo, verbosity=0):
    """
    Load and/or create all model data that is subject to chunking.

    Such data should not be stored under self, for memory reasons. The
    data returned here will automatically be chunked and then provided
    as part of the mdata object during calculations.

    Parameters
    ----------
    algo: foxes.core.Algorithm
        The calculation algorithm
    verbosity: int
        The verbosity level, 0 = silent

    Returns
    -------
    idata: dict
        The dict has exactly two entries: `data_vars`,
        a dict with entries `name_str -> (dim_tuple, data_ndarray)`;
        and `coords`, a dict with entries `dim_name_str -> dim_array`

    Raises
    ------
    KeyError
        If no timeline heights were given and none can be deduced
        from the base states or from a 3-component ref_xy
    """
    # find heights: prefer explicit tl_heights (already in self.heights),
    # then heights exposed by the base states, then ref_xy[2]:
    if self.heights is None:
        if hasattr(self.base_states, "heights"):
            self.heights = self.base_states.heights
        elif len(self.ref_xy) > 2:
            self.heights = [self.ref_xy[2]]
        else:
            raise KeyError(
                f"Cannot find 'heights' in base states of type '{type(self.base_states).__name__}', missing either `ref_xy` of type [x, y, z], or explicit value list via parameter 'tl_heights'"
            )

    # pre-calc data:
    # NOTE(review): Timelines._precalc_data is borrowed from the
    # timelines wake frame; it returns the weights data-var entry and
    # presumably also prepares self.timelines_data used by calculate()
    # and calc_states_indices() — confirm against Timelines.
    self.WEIGHT = self.var(FV.WEIGHT)
    idata = super().load_data(algo, verbosity)
    idata["data_vars"][self.WEIGHT] = Timelines._precalc_data(
        self, algo, self.base_states, self.heights, verbosity, needs_res=True
    )
    return idata
[docs]
def size(self):
    """
    The total number of states.

    Returns
    -------
    int:
        The total number of states
    """
    # delegate to the horizontally homogeneous base states:
    base = self.base_states
    return base.size()
[docs]
def index(self):
    """
    The index list

    Returns
    -------
    indices: array_like
        The index labels of states, or None for default integers
    """
    # states labels come straight from the base states:
    base = self.base_states
    return base.index()
[docs]
def output_point_vars(self, algo):
    """
    The variables which are being modified by the model.

    Parameters
    ----------
    algo: foxes.core.Algorithm
        The calculation algorithm

    Returns
    -------
    output_vars: list of str
        The output variable names
    """
    # identical variable set as provided by the base states:
    base = self.base_states
    return base.output_point_vars(algo)
[docs]
def set_running(
    self,
    algo,
    data_stash,
    sel=None,
    isel=None,
    verbosity=0,
):
    """
    Sets this model status to running, and moves
    all large data to stash.

    The stashed data will be returned by the
    unset_running() function after running calculations.

    Parameters
    ----------
    algo: foxes.core.Algorithm
        The calculation algorithm
    data_stash: dict
        Large data stash, this function adds data here.
        Key: model name. Value: dict, large model data
    sel: dict, optional
        The subset selection dictionary
    isel: dict, optional
        The index subset selection dictionary
    verbosity: int
        The verbosity level, 0 = silent
    """
    super().set_running(algo, data_stash, sel, isel, verbosity)

    # nothing to stash unless a subset selection was requested:
    if sel is None and isel is None:
        return

    # keep the full timelines data in the stash, then narrow
    # the working copy down to the requested subset:
    data_stash[self.name]["data"] = self.timelines_data
    if isel is not None:
        self.timelines_data = self.timelines_data.isel(isel)
    if sel is not None:
        self.timelines_data = self.timelines_data.sel(sel)
[docs]
def unset_running(
    self,
    algo,
    data_stash,
    sel=None,
    isel=None,
    verbosity=0,
):
    """
    Sets this model status to not running, recovering large data
    from stash

    Parameters
    ----------
    algo: foxes.core.Algorithm
        The calculation algorithm
    data_stash: dict
        Large data stash, this function adds data here.
        Key: model name. Value: dict, large model data
    sel: dict, optional
        The subset selection dictionary
    isel: dict, optional
        The index subset selection dictionary
    verbosity: int
        The verbosity level, 0 = silent
    """
    super().unset_running(algo, data_stash, sel, isel, verbosity)

    # restore the full timelines data, if it was stashed by set_running:
    stash = data_stash[self.name]
    try:
        self.timelines_data = stash.pop("data")
    except KeyError:
        pass
[docs]
def calc_states_indices(self, algo, mdata, points, hi, ref_xy):
    """
    Computes, for every evaluation point, the state index and time
    interpolation coefficient by tracing the flow from the reference
    point backwards/forwards through the timeline.

    Parameters
    ----------
    algo: foxes.core.Algorithm
        The calculation algorithm
    mdata: foxes.core.MData
        The model data
    points: numpy.ndarray
        The evaluation points, shape: (n_states, n_points, 3)
    hi: int
        The height index within the timelines data
    ref_xy: numpy.ndarray
        The reference coordinates, broadcastable against
        points[:, :, :2]

    Returns
    -------
    trace_si: numpy.ndarray
        The state index per point, shape: (n_states, n_points)
    coeffs: numpy.ndarray
        The time interpolation coefficients per point; NaN where
        no state could be assigned, shape: (n_states, n_points)
    """
    n_states, n_points = points.shape[:2]
    # per-state horizontal travel vectors at this height
    # (assumes shape (n_all_states, 2) — TODO confirm against Timelines):
    dxy = self.timelines_data["dxy"].to_numpy()[hi]
    i0 = mdata.states_i0(counter=True)

    # horizontal offsets of the points from the reference location:
    trace_p = points[:, :, :2] - ref_xy[:, :, :2]
    trace_si = np.zeros((n_states, n_points), dtype=config.dtype_int)
    trace_si[:] = i0 + np.arange(n_states)[:, None]
    coeffs = np.full((n_states, n_points), np.nan, dtype=config.dtype_double)

    # flake8: noqa: F821
    def _eval_trace(sel, hdxy=None, hdxy0=None, trs=None):
        """Helper function that updates coeffs for the selected traces"""
        nonlocal coeffs

        # project onto local x direction:
        hdxy0 = dxy[trace_si[sel]] if hdxy0 is None else hdxy0
        nx = hdxy0 / np.linalg.norm(hdxy0, axis=-1)[..., None]
        projx = np.einsum("...d,...d->...", trace_p[sel], nx)

        # check for local points:
        if hdxy is None:
            # BUGFIX: lower bound was +1e-10, which made the band
            # (projx >= 1e-10) & (projx <= 1e-10) effectively empty;
            # "local" points are those with |projx| <= 1e-10:
            seld = (projx >= -1e-10) & (projx <= 1e-10)
            if np.any(seld):
                # coefficient -1 marks "use this state's data directly":
                coeffs[sel] = np.where(seld, -1, coeffs[sel])

        # check for vicinity to reference plane:
        else:
            lx = np.einsum("...d,...d->...", hdxy, nx)
            seld = ((lx < 0) & (projx >= lx) & (projx <= 0)) | (
                (lx > 0) & (projx >= 0) & (projx <= lx)
            )
            if np.any(seld):
                # signed fraction of the step covered by the point:
                w = projx / np.abs(lx)
                coeffs[sel] = np.where(seld, w, coeffs[sel])

    # step backwards in time, until projection onto axis is negative:
    _eval_trace(sel=np.s_[:])
    sel = np.isnan(coeffs)
    tshift = 0
    while np.any(sel):
        trs = trace_si[sel]
        hdxy = -dxy[trs]
        trace_p[sel] += hdxy
        _eval_trace(sel, hdxy=hdxy, hdxy0=dxy[trs - tshift])
        tshift -= 1

        sel0 = np.isnan(coeffs)
        trace_si[sel0 & sel] -= 1
        sel = sel0 & (trace_si >= 0)
        del trs, sel0, hdxy

    # step forwards in time, until projection onto axis is positive:
    sel = np.isnan(coeffs)
    if np.any(sel):
        # restart unresolved traces from the original offsets,
        # one state ahead of the current one:
        trace_p = np.where(
            sel[:, :, None], points[:, :, :2] - ref_xy[:, :, :2], trace_p
        )
        trace_si = np.where(sel, i0 + np.arange(n_states)[:, None] + 1, trace_si)
        sel &= trace_si < algo.n_states
        tshift = 1
        while np.any(sel):
            trs = trace_si[sel]
            hdxy = dxy[trs]
            trace_p[sel] += hdxy
            _eval_trace(sel, hdxy=hdxy, hdxy0=dxy[trs - tshift])
            tshift += 1

            sel0 = np.isnan(coeffs)
            trace_si[sel0 & sel] += 1
            sel = sel0 & (trace_si < algo.n_states)
            del trs, sel0, hdxy

    return trace_si, coeffs
[docs]
def calculate(self, algo, mdata, fdata, tdata):
    """
    The main model calculation.

    This function is executed on a single chunk of data,
    all computations should be based on numpy arrays.

    Parameters
    ----------
    algo: foxes.core.Algorithm
        The calculation algorithm
    mdata: foxes.core.MData
        The model data
    fdata: foxes.core.FData
        The farm data
    tdata: foxes.core.TData
        The target point data

    Returns
    -------
    results: dict
        The resulting data, keys: output variable str.
        Values: numpy.ndarray with shape
        (n_states, n_targets, n_tpoints)
    """
    # prepare: flatten (n_states, n_targets, n_tpoints, 3) targets
    # into (n_states, n_points, 3) points:
    targets = tdata[FC.TARGETS]
    n_states, n_targets, n_tpoints = targets.shape[:3]
    n_points = n_targets * n_tpoints
    points = targets.reshape(n_states, n_points, 3)
    heights = self.timelines_data["height"].to_numpy()
    n_heights = len(heights)

    # compute states indices for all requested points,
    # one (state index, coefficient) pair per timelines height:
    trace_si = []
    coeffs = []
    for hi in range(n_heights):
        s, c = self.calc_states_indices(
            algo, mdata, points, hi, self.ref_xy[None, None, :]
        )
        trace_si.append(s)
        coeffs.append(c)
        del s, c

    # flake8: noqa: F821
    def _interp_time(hi, v):
        """Helper function for interpolation between states"""
        sts = trace_si[hi]
        cfs = coeffs[hi]
        data = self.timelines_data[v].to_numpy()[hi]
        out = np.zeros(sts.shape, dtype=config.dtype_double)
        # clamp traces that ran off the start of the timeline:
        sel_low = sts < 0
        if np.any(sel_low):
            out[sel_low] = data[0]
        # clamp traces that ran off the end of the timeline:
        sel_hi = sts >= algo.n_states
        if np.any(sel_hi):
            out[sel_hi] = data[algo.n_states - 1]
        # non-positive coefficient: blend states s-1 and s with weight -cfs
        # on state s (cfs == -1 means "take state s directly"):
        sel = (~sel_low) & (~sel_hi) & (cfs <= 0)
        if np.any(sel):
            s = sts[sel]
            c = -cfs[sel]
            out[sel] = c * data[s] + (1 - c) * data[s - 1]
        # positive coefficient: blend with weight cfs on state s-1:
        sel = (~sel_low) & (~sel_hi) & (cfs > 0)
        if np.any(sel):
            s = sts[sel]
            c = cfs[sel]
            out[sel] = c * data[s - 1] + (1 - c) * data[s]
        return out

    # interpolate to heights:
    if n_heights > 1:
        ar_states = np.arange(n_states)
        ar_points = np.arange(n_points)
        crds = (heights, ar_states, ar_points)
        # stack all variables except the helper 'dxy' along a new axis:
        data = {
            v: np.stack([_interp_time(hi, v) for hi in range(n_heights)], axis=0)
            for v in self.timelines_data.data_vars.keys()
            if v != "dxy"
        }
        vres = list(data.keys())
        data = np.stack(list(data.values()), axis=-1)
        # NOTE(review): 'eval' shadows the builtin; evaluation points are
        # (z, state index, point index) to match crds:
        eval = np.zeros((n_states, n_points, 3), dtype=config.dtype_double)
        eval[:, :, 0] = points[:, :, 2]
        eval[:, :, 1] = ar_states[:, None]
        eval[:, :, 2] = ar_points[None, :]
        try:
            ires = interpn(crds, data, eval, **self.intp_pars)
        except ValueError as e:
            # diagnostics to help users with out-of-bounds heights:
            print(f"\nStates '{self.name}': Interpolation error")
            print("INPUT VARS: (heights, states, points)")
            print(
                "DATA BOUNDS:",
                [float(np.min(d)) for d in crds],
                [float(np.max(d)) for d in crds],
            )
            print(
                "EVAL BOUNDS:",
                [float(np.min(p)) for p in eval.T],
                [float(np.max(p)) for p in eval.T],
            )
            print(
                "\nMaybe you want to try the option 'bounds_error=False'? This will extrapolate the data.\n"
            )
            raise e
        del crds, eval, data, ar_points, ar_states

        results = {}
        for v in self.output_point_vars(algo):
            if v not in [FV.WS, FV.WD]:
                results[v] = ires[:, :, vres.index(v)]
            elif v not in results:
                # wind speed and direction are derived jointly from (U, V):
                uv = np.stack(
                    [ires[:, :, vres.index("U")], ires[:, :, vres.index("V")]],
                    axis=-1,
                )
                results = {FV.WD: uv2wd(uv), FV.WS: np.linalg.norm(uv, axis=-1)}
                del uv

    # no dependence on height:
    else:
        results = {}
        for v in self.output_point_vars(algo):
            # NOTE(review): 'hi' here is the variable leaked from the
            # trace loop above; with n_heights == 1 it equals 0 — confirm
            # this is intentional rather than an explicit index:
            if v not in [FV.WS, FV.WD]:
                results[v] = _interp_time(hi, v)
            elif v not in results:
                uv = np.stack(
                    [_interp_time(hi, "U"), _interp_time(hi, "V")], axis=-1
                )
                results = {FV.WD: uv2wd(uv), FV.WS: np.linalg.norm(uv, axis=-1)}
                del uv

    # set weights:
    tdata[FV.WEIGHT] = mdata[self.WEIGHT][:, None, None]
    tdata.dims[FV.WEIGHT] = (FC.STATE, FC.TARGET, FC.TPOINT)

    # restore the (n_states, n_targets, n_tpoints) target shape:
    return {
        v: d.reshape(n_states, n_targets, n_tpoints) for v, d in results.items()
    }
[docs]
class OnePointFlowTimeseries(OnePointFlowStates):
    """
    Inhomogeneous inflow from homogeneous timeseries data
    at one point

    :group: input.states
    """

    def __init__(self, ref_xy, *args, tl_heights=None, **kwargs):
        """
        Constructor.

        Parameters
        ----------
        ref_xy: list of float
            The [x, y] or [x, y, z] coordinates of the base states.
            If [x, y, z] then z will serve as height
        args: tuple, optional
            Parameters for the base class
        tl_heights: list of float, optional
            The heights at which timelines will be calculated
        kwargs: dict, optional
            Parameters for the base class
        """
        # fall back to a single default height when none is given
        # and ref_xy does not carry a z component:
        heights = tl_heights
        if heights is None and len(ref_xy) < 3:
            heights = [100.0]

        super().__init__(
            ref_xy,
            *args,
            tl_heights=heights,
            states_type="Timeseries",
            **kwargs,
        )
[docs]
class OnePointFlowMultiHeightTimeseries(OnePointFlowStates):
    """
    Inhomogeneous inflow from height dependent homogeneous
    timeseries data at one point

    :group: input.states
    """

    def __init__(self, *args, **kwargs):
        """
        Constructor.

        Parameters
        ----------
        args: tuple, optional
            Parameters for the base class
        kwargs: dict, optional
            Parameters for the base class
        """
        # the base states type is fixed for this class:
        super().__init__(*args, states_type="MultiHeightTimeseries", **kwargs)
[docs]
class OnePointFlowMultiHeightNCTimeseries(OnePointFlowStates):
    """
    Inhomogeneous inflow from height dependent homogeneous
    timeseries data at one point based on NetCDF input

    :group: input.states
    """

    def __init__(self, *args, **kwargs):
        """
        Constructor.

        Parameters
        ----------
        args: tuple, optional
            Parameters for the base class
        kwargs: dict, optional
            Parameters for the base class
        """
        # the base states type is fixed for this class:
        super().__init__(*args, states_type="MultiHeightNCTimeseries", **kwargs)