Source code for foxes.input.states.one_point_flow

import numpy as np
from scipy.interpolate import interpn

from foxes.core import States
from foxes.utils import uv2wd
from foxes.models.wake_frames.timelines import Timelines
from foxes.config import config
import foxes.variables as FV
import foxes.constants as FC


[docs] class OnePointFlowStates(States): """ Time-evolving states based on horizontally homogeneous timeseries data Attributes ---------- ref_xy: list of float The [x, y] or [x, y, z] coordinates of the base states. If [x, y, z] then z will serve as height tl_heights: list of float The heights at which timelines will be calculated dt_min: float The delta t value in minutes, if not from timeseries data intp_pars: dict Parameters for height interpolation with scipy.interpolate.interpn :group: input.states """
[docs] def __init__( self, ref_xy, *base_states_args, base_states=None, tl_heights=None, dt_min=None, **base_states_kwargs, ): """ Constructor. Parameters ---------- ref_xy: list of float The [x, y] or [x, y, z] coordinates of the base states. If [x, y, z] then z will serve as height base_states_args: tuple, optional Arguments for creating the base states from States.new(), if not given as base_states base_states: foxes.core.States, optional The base states, representing horizontally homogeneous inflow tl_heights: list of float, optional The heights at which timelines will be calculated dt_min: float, optional The delta t value in minutes, if not from timeseries data base_states_kwargs: dict, optional Arguments for creating the base states from States.new(), if not given as base_states """ super().__init__() self.ref_xy = np.array(ref_xy, dtype=config.dtype_double) self.heights = tl_heights self.base_states = base_states self.dt_min = dt_min self.intp_pars = {"fill_value": None} if "bounds_error" in base_states_kwargs: self.intp_pars["bounds_error"] = base_states_kwargs["bounds_error"] if base_states is not None and len(base_states_kwargs): raise KeyError( f"Base states of type '{type(base_states).__name__}' were given, cannot handle base_state_pars {list(base_states_kwargs.keys())}" ) elif base_states is not None and len(base_states_args): raise KeyError( f"Base states of type '{type(base_states).__name__}' were given, cannot handle base_states_args of types {[type(a).__name__ for a in base_states_args]}" ) elif base_states is None: self.base_states = States.new(*base_states_args, **base_states_kwargs)
[docs] def __repr__(self): return f"{type(self).__name__}(base={type(self.base_states).__name__}, heights={self.heights}, dt_min={self.dt_min})"
[docs] def sub_models(self): """ List of all sub-models Returns ------- smdls: list of foxes.core.Model All sub models """ return [self.base_states]
[docs] def initialize(self, algo, verbosity=0): """ Initializes the model. Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm verbosity: int The verbosity level, 0 = silent """ super().initialize(algo, verbosity) # find heights: if self.heights is None: if hasattr(self.base_states, "heights"): self.heights = self.base_states.heights elif len(self.ref_xy) > 2: self.heights = [self.ref_xy[2]] else: raise KeyError( f"Cannot find 'heights' in base states of type '{type(self.base_states).__name__}', missing either `ref_xy` of type [x, y, z], or explicit value list via parameter 'tl_heights'" ) # pre-calc data: Timelines._precalc_data( self, algo, self.base_states, self.heights, verbosity, needs_res=True )
[docs] def size(self): """ The total number of states. Returns ------- int: The total number of states """ return self.base_states.size()
[docs] def index(self): """ The index list Returns ------- indices: array_like The index labels of states, or None for default integers """ return self.base_states.index()
[docs] def output_point_vars(self, algo): """ The variables which are being modified by the model. Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm Returns ------- output_vars: list of str The output variable names """ return self.base_states.output_point_vars(algo)
[docs] def weights(self, algo): """ The statistical weights of all states. Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm Returns ------- weights: numpy.ndarray The weights, shape: (n_states, n_turbines) """ return self.base_states.weights(algo)
[docs] def set_running( self, algo, data_stash, sel=None, isel=None, verbosity=0, ): """ Sets this model status to running, and moves all large data to stash. The stashed data will be returned by the unset_running() function after running calculations. Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm data_stash: dict Large data stash, this function adds data here. Key: model name. Value: dict, large model data sel: dict, optional The subset selection dictionary isel: dict, optional The index subset selection dictionary verbosity: int The verbosity level, 0 = silent """ super().set_running(algo, data_stash, sel, isel, verbosity) if sel is not None or isel is not None: data_stash[self.name]["data"] = self.timelines_data if isel is not None: self.timelines_data = self.timelines_data.isel(isel) if sel is not None: self.timelines_data = self.timelines_data.sel(sel)
[docs] def unset_running( self, algo, data_stash, sel=None, isel=None, verbosity=0, ): """ Sets this model status to not running, recovering large data from stash Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm data_stash: dict Large data stash, this function adds data here. Key: model name. Value: dict, large model data sel: dict, optional The subset selection dictionary isel: dict, optional The index subset selection dictionary verbosity: int The verbosity level, 0 = silent """ super().unset_running(algo, data_stash, sel, isel, verbosity) data = data_stash[self.name] if "data" in data: self.timelines_data = data.pop("data")
[docs] def calc_states_indices(self, algo, mdata, points, hi, ref_xy): n_states, n_points = points.shape[:2] dxy = self.timelines_data["dxy"].to_numpy()[hi] i0 = mdata.states_i0(counter=True) trace_p = points[:, :, :2] - ref_xy[:, :, :2] trace_si = np.zeros((n_states, n_points), dtype=config.dtype_int) trace_si[:] = i0 + np.arange(n_states)[:, None] coeffs = np.full((n_states, n_points), np.nan, dtype=config.dtype_double) # flake8: noqa: F821 def _eval_trace(sel, hdxy=None, hdxy0=None, trs=None): """Helper function that updates trace_done""" nonlocal coeffs # project onto local x direction: hdxy0 = dxy[trace_si[sel]] if hdxy0 is None else hdxy0 nx = hdxy0 / np.linalg.norm(hdxy0, axis=-1)[..., None] projx = np.einsum("...d,...d->...", trace_p[sel], nx) # check for local points: if hdxy is None: seld = (projx >= 1e-10) & (projx <= 1e-10) if np.any(seld): coeffs[sel] = np.where(seld, -1, coeffs[sel]) # check for vicinity to reference plane: else: lx = np.einsum("...d,...d->...", hdxy, nx) seld = ((lx < 0) & (projx >= lx) & (projx <= 0)) | ( (lx > 0) & (projx >= 0) & (projx <= lx) ) if np.any(seld): w = projx / np.abs(lx) coeffs[sel] = np.where(seld, w, coeffs[sel]) # step backwards in time, until projection onto axis is negative: _eval_trace(sel=np.s_[:]) sel = np.isnan(coeffs) tshift = 0 while np.any(sel): trs = trace_si[sel] hdxy = -dxy[trs] trace_p[sel] += hdxy _eval_trace(sel, hdxy=hdxy, hdxy0=dxy[trs - tshift]) tshift -= 1 sel0 = np.isnan(coeffs) trace_si[sel0 & sel] -= 1 sel = sel0 & (trace_si >= 0) del trs, sel0, hdxy # step forwards in time, until projection onto axis is positive: sel = np.isnan(coeffs) if np.any(sel): trace_p = np.where( sel[:, :, None], points[:, :, :2] - ref_xy[:, :, :2], trace_p ) trace_si = np.where(sel, i0 + np.arange(n_states)[:, None] + 1, trace_si) sel &= trace_si < algo.n_states tshift = 1 while np.any(sel): trs = trace_si[sel] hdxy = dxy[trs] trace_p[sel] += hdxy _eval_trace(sel, hdxy=hdxy, hdxy0=dxy[trs - tshift]) tshift += 1 sel0 = np.isnan(coeffs) trace_si[sel0 & sel] += 1 sel = sel0 & (trace_si < algo.n_states) del trs, sel0, hdxy return trace_si, coeffs
[docs] def calculate(self, algo, mdata, fdata, tdata): """ The main model calculation. This function is executed on a single chunk of data, all computations should be based on numpy arrays. Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm mdata: foxes.core.MData The model data fdata: foxes.core.FData The farm data tdata: foxes.core.TData The target point data Returns ------- results: dict The resulting data, keys: output variable str. Values: numpy.ndarray with shape (n_states, n_targets, n_tpoints) """ # prepare: targets = tdata[FC.TARGETS] n_states, n_targets, n_tpoints = targets.shape[:3] n_points = n_targets * n_tpoints points = targets.reshape(n_states, n_points, 3) heights = self.timelines_data["height"].to_numpy() n_heights = len(heights) # compute states indices for all requested points: trace_si = [] coeffs = [] for hi in range(n_heights): s, c = self.calc_states_indices( algo, mdata, points, hi, self.ref_xy[None, None, :] ) trace_si.append(s) coeffs.append(c) del s, c # flake8: noqa: F821 def _interp_time(hi, v): """Helper function for interpolation bewteen states""" sts = trace_si[hi] cfs = coeffs[hi] data = self.timelines_data[v].to_numpy()[hi] out = np.zeros(sts.shape, dtype=config.dtype_double) sel_low = sts < 0 if np.any(sel_low): out[sel_low] = data[0] sel_hi = sts >= algo.n_states if np.any(sel_hi): out[sel_hi] = data[algo.n_states - 1] sel = (~sel_low) & (~sel_hi) & (cfs <= 0) if np.any(sel): s = sts[sel] c = -cfs[sel] out[sel] = c * data[s] + (1 - c) * data[s - 1] sel = (~sel_low) & (~sel_hi) & (cfs > 0) if np.any(sel): s = sts[sel] c = cfs[sel] out[sel] = c * data[s - 1] + (1 - c) * data[s] return out # interpolate to heights: if n_heights > 1: ar_states = np.arange(n_states) ar_points = np.arange(n_points) crds = (heights, ar_states, ar_points) data = { v: np.stack([_interp_time(hi, v) for hi in range(n_heights)], axis=0) for v in self.timelines_data.data_vars.keys() if v != "dxy" } vres = list(data.keys()) data = np.stack(list(data.values()), axis=-1) eval = np.zeros((n_states, n_points, 3), dtype=config.dtype_double) eval[:, :, 0] = points[:, :, 2] eval[:, :, 1] = ar_states[:, None] eval[:, :, 2] = ar_points[None, :] try: ires = interpn(crds, data, eval, **self.intp_pars) except ValueError as e: print(f"\nStates '{self.name}': Interpolation error") print("INPUT VARS: (heights, states, points)") print( "DATA BOUNDS:", [float(np.min(d)) for d in crds], [float(np.max(d)) for d in crds], ) print( "EVAL BOUNDS:", [float(np.min(p)) for p in eval.T], [float(np.max(p)) for p in eval.T], ) print( "\nMaybe you want to try the option 'bounds_error=False'? This will extrapolate the data.\n" ) raise e del crds, eval, data, ar_points, ar_states results = {} for v in self.output_point_vars(algo): if v not in [FV.WS, FV.WD]: results[v] = ires[:, :, vres.index(v)] elif v not in results: uv = np.stack( [ires[:, :, vres.index("U")], ires[:, :, vres.index("V")]], axis=-1, ) results = {FV.WD: uv2wd(uv), FV.WS: np.linalg.norm(uv, axis=-1)} del uv # no dependence on height: else: results = {} for v in self.output_point_vars(algo): if v not in [FV.WS, FV.WD]: results[v] = _interp_time(hi, v) elif v not in results: uv = np.stack( [_interp_time(hi, "U"), _interp_time(hi, "V")], axis=-1 ) results = {FV.WD: uv2wd(uv), FV.WS: np.linalg.norm(uv, axis=-1)} del uv return { v: d.reshape(n_states, n_targets, n_tpoints) for v, d in results.items() }
[docs] class OnePointFlowTimeseries(OnePointFlowStates): """ Inhomogeneous inflow from homogeneous timeseries data at one point :group: input.states """
[docs] def __init__(self, ref_xy, *args, tl_heights=None, **kwargs): """ Constructor. Parameters ---------- ref_xy: list of float The [x, y] or [x, y, z] coordinates of the base states. If [x, y, z] then z will serve as height args: tuple, optional Parameters for the base class tl_heights: list of float, optional The heights at which timelines will be calculated kwargs: dict, optional Parameters for the base class """ if tl_heights is None and len(ref_xy) < 3: tl_heights = [100.0] super().__init__( ref_xy, *args, tl_heights=tl_heights, states_type="Timeseries", **kwargs, )
[docs] class OnePointFlowMultiHeightTimeseries(OnePointFlowStates): """ Inhomogeneous inflow from height dependent homogeneous timeseries data at one point :group: input.states """
[docs] def __init__(self, *args, **kwargs): """ Constructor. Parameters ---------- args: tuple, optional Parameters for the base class kwargs: dict, optional Parameters for the base class """ super().__init__(*args, states_type="MultiHeightTimeseries", **kwargs)
[docs] class OnePointFlowMultiHeightNCTimeseries(OnePointFlowStates): """ Inhomogeneous inflow from height dependent homogeneous timeseries data at one point based on NetCDF input :group: input.states """
[docs] def __init__(self, *args, **kwargs): """ Constructor. Parameters ---------- args: tuple, optional Parameters for the base class kwargs: dict, optional Parameters for the base class """ super().__init__(*args, states_type="MultiHeightNCTimeseries", **kwargs)