Source code for foxes.input.states.one_point_flow

import numpy as np
from scipy.interpolate import interpn

from foxes.core import States
from foxes.utils import uv2wd
from foxes.models.wake_frames.timelines import Timelines
from foxes.config import config
import foxes.variables as FV
import foxes.constants as FC


[docs] class OnePointFlowStates(States): """ Time-evolving states based on horizontally homogeneous timeseries data Attributes ---------- ref_xy: list of float The [x, y] or [x, y, z] coordinates of the base states. If [x, y, z] then z will serve as height tl_heights: list of float The heights at which timelines will be calculated dt_min: float The delta t value in minutes, if not from timeseries data intp_pars: dict Parameters for height interpolation with scipy.interpolate.interpn :group: input.states """
[docs] def __init__( self, ref_xy, *base_states_args, base_states=None, tl_heights=None, dt_min=None, **base_states_kwargs, ): """ Constructor. Parameters ---------- ref_xy: list of float The [x, y] or [x, y, z] coordinates of the base states. If [x, y, z] then z will serve as height base_states_args: tuple, optional Arguments for creating the base states from States.new(), if not given as base_states base_states: foxes.core.States, optional The base states, representing horizontally homogeneous inflow tl_heights: list of float, optional The heights at which timelines will be calculated dt_min: float, optional The delta t value in minutes, if not from timeseries data base_states_kwargs: dict, optional Arguments for creating the base states from States.new(), if not given as base_states """ super().__init__() self.ref_xy = np.array(ref_xy, dtype=config.dtype_double) self.heights = tl_heights self.base_states = base_states self.dt_min = dt_min self.intp_pars = {"fill_value": None} if "bounds_error" in base_states_kwargs: self.intp_pars["bounds_error"] = base_states_kwargs["bounds_error"] if base_states is not None and len(base_states_kwargs): raise KeyError( f"Base states of type '{type(base_states).__name__}' were given, cannot handle base_state_pars {list(base_states_kwargs.keys())}" ) elif base_states is not None and len(base_states_args): raise KeyError( f"Base states of type '{type(base_states).__name__}' were given, cannot handle base_states_args of types {[type(a).__name__ for a in base_states_args]}" ) elif base_states is None: self.base_states = States.new(*base_states_args, **base_states_kwargs)
[docs] def __repr__(self): return f"{type(self).__name__}(base={type(self.base_states).__name__}, heights={self.heights}, dt_min={self.dt_min})"
[docs] def sub_models(self): """ List of all sub-models Returns ------- smdls: list of foxes.core.Model All sub models """ return [self.base_states]
[docs] def load_data(self, algo, verbosity=0): """ Load and/or create all model data that is subject to chunking. Such data should not be stored under self, for memory reasons. The data returned here will automatically be chunked and then provided as part of the mdata object during calculations. Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm verbosity: int The verbosity level, 0 = silent Returns ------- idata: dict The dict has exactly two entries: `data_vars`, a dict with entries `name_str -> (dim_tuple, data_ndarray)`; and `coords`, a dict with entries `dim_name_str -> dim_array` """ # find heights: if self.heights is None: if hasattr(self.base_states, "heights"): self.heights = self.base_states.heights elif len(self.ref_xy) > 2: self.heights = [self.ref_xy[2]] else: raise KeyError( f"Cannot find 'heights' in base states of type '{type(self.base_states).__name__}', missing either `ref_xy` of type [x, y, z], or explicit value list via parameter 'tl_heights'" ) # pre-calc data: self.WEIGHT = self.var(FV.WEIGHT) idata = super().load_data(algo, verbosity) idata["data_vars"][self.WEIGHT] = Timelines._precalc_data( self, algo, self.base_states, self.heights, verbosity, needs_res=True ) return idata
[docs] def size(self): """ The total number of states. Returns ------- int: The total number of states """ return self.base_states.size()
[docs] def index(self): """ The index list Returns ------- indices: array_like The index labels of states, or None for default integers """ return self.base_states.index()
[docs] def output_point_vars(self, algo): """ The variables which are being modified by the model. Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm Returns ------- output_vars: list of str The output variable names """ return self.base_states.output_point_vars(algo)
[docs] def set_running( self, algo, data_stash, sel=None, isel=None, verbosity=0, ): """ Sets this model status to running, and moves all large data to stash. The stashed data will be returned by the unset_running() function after running calculations. Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm data_stash: dict Large data stash, this function adds data here. Key: model name. Value: dict, large model data sel: dict, optional The subset selection dictionary isel: dict, optional The index subset selection dictionary verbosity: int The verbosity level, 0 = silent """ super().set_running(algo, data_stash, sel, isel, verbosity) if sel is not None or isel is not None: data_stash[self.name]["data"] = self.timelines_data if isel is not None: self.timelines_data = self.timelines_data.isel(isel) if sel is not None: self.timelines_data = self.timelines_data.sel(sel)
[docs] def unset_running( self, algo, data_stash, sel=None, isel=None, verbosity=0, ): """ Sets this model status to not running, recovering large data from stash Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm data_stash: dict Large data stash, this function adds data here. Key: model name. Value: dict, large model data sel: dict, optional The subset selection dictionary isel: dict, optional The index subset selection dictionary verbosity: int The verbosity level, 0 = silent """ super().unset_running(algo, data_stash, sel, isel, verbosity) data = data_stash[self.name] if "data" in data: self.timelines_data = data.pop("data")
[docs] def calc_states_indices(self, algo, mdata, points, hi, ref_xy): n_states, n_points = points.shape[:2] dxy = self.timelines_data["dxy"].to_numpy()[hi] i0 = mdata.states_i0(counter=True) trace_p = points[:, :, :2] - ref_xy[:, :, :2] trace_si = np.zeros((n_states, n_points), dtype=config.dtype_int) trace_si[:] = i0 + np.arange(n_states)[:, None] coeffs = np.full((n_states, n_points), np.nan, dtype=config.dtype_double) # flake8: noqa: F821 def _eval_trace(sel, hdxy=None, hdxy0=None, trs=None): """Helper function that updates trace_done""" nonlocal coeffs # project onto local x direction: hdxy0 = dxy[trace_si[sel]] if hdxy0 is None else hdxy0 nx = hdxy0 / np.linalg.norm(hdxy0, axis=-1)[..., None] projx = np.einsum("...d,...d->...", trace_p[sel], nx) # check for local points: if hdxy is None: seld = (projx >= 1e-10) & (projx <= 1e-10) if np.any(seld): coeffs[sel] = np.where(seld, -1, coeffs[sel]) # check for vicinity to reference plane: else: lx = np.einsum("...d,...d->...", hdxy, nx) seld = ((lx < 0) & (projx >= lx) & (projx <= 0)) | ( (lx > 0) & (projx >= 0) & (projx <= lx) ) if np.any(seld): w = projx / np.abs(lx) coeffs[sel] = np.where(seld, w, coeffs[sel]) # step backwards in time, until projection onto axis is negative: _eval_trace(sel=np.s_[:]) sel = np.isnan(coeffs) tshift = 0 while np.any(sel): trs = trace_si[sel] hdxy = -dxy[trs] trace_p[sel] += hdxy _eval_trace(sel, hdxy=hdxy, hdxy0=dxy[trs - tshift]) tshift -= 1 sel0 = np.isnan(coeffs) trace_si[sel0 & sel] -= 1 sel = sel0 & (trace_si >= 0) del trs, sel0, hdxy # step forwards in time, until projection onto axis is positive: sel = np.isnan(coeffs) if np.any(sel): trace_p = np.where( sel[:, :, None], points[:, :, :2] - ref_xy[:, :, :2], trace_p ) trace_si = np.where(sel, i0 + np.arange(n_states)[:, None] + 1, trace_si) sel &= trace_si < algo.n_states tshift = 1 while np.any(sel): trs = trace_si[sel] hdxy = dxy[trs] trace_p[sel] += hdxy _eval_trace(sel, hdxy=hdxy, hdxy0=dxy[trs - tshift]) tshift += 1 sel0 = np.isnan(coeffs) trace_si[sel0 & sel] += 1 sel = sel0 & (trace_si < algo.n_states) del trs, sel0, hdxy return trace_si, coeffs
[docs] def calculate(self, algo, mdata, fdata, tdata): """ The main model calculation. This function is executed on a single chunk of data, all computations should be based on numpy arrays. Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm mdata: foxes.core.MData The model data fdata: foxes.core.FData The farm data tdata: foxes.core.TData The target point data Returns ------- results: dict The resulting data, keys: output variable str. Values: numpy.ndarray with shape (n_states, n_targets, n_tpoints) """ # prepare: targets = tdata[FC.TARGETS] n_states, n_targets, n_tpoints = targets.shape[:3] n_points = n_targets * n_tpoints points = targets.reshape(n_states, n_points, 3) heights = self.timelines_data["height"].to_numpy() n_heights = len(heights) # compute states indices for all requested points: trace_si = [] coeffs = [] for hi in range(n_heights): s, c = self.calc_states_indices( algo, mdata, points, hi, self.ref_xy[None, None, :] ) trace_si.append(s) coeffs.append(c) del s, c # flake8: noqa: F821 def _interp_time(hi, v): """Helper function for interpolation bewteen states""" sts = trace_si[hi] cfs = coeffs[hi] data = self.timelines_data[v].to_numpy()[hi] out = np.zeros(sts.shape, dtype=config.dtype_double) sel_low = sts < 0 if np.any(sel_low): out[sel_low] = data[0] sel_hi = sts >= algo.n_states if np.any(sel_hi): out[sel_hi] = data[algo.n_states - 1] sel = (~sel_low) & (~sel_hi) & (cfs <= 0) if np.any(sel): s = sts[sel] c = -cfs[sel] out[sel] = c * data[s] + (1 - c) * data[s - 1] sel = (~sel_low) & (~sel_hi) & (cfs > 0) if np.any(sel): s = sts[sel] c = cfs[sel] out[sel] = c * data[s - 1] + (1 - c) * data[s] return out # interpolate to heights: if n_heights > 1: ar_states = np.arange(n_states) ar_points = np.arange(n_points) crds = (heights, ar_states, ar_points) data = { v: np.stack([_interp_time(hi, v) for hi in range(n_heights)], axis=0) for v in self.timelines_data.data_vars.keys() if v != "dxy" } vres = list(data.keys()) data = np.stack(list(data.values()), axis=-1) eval = np.zeros((n_states, n_points, 3), dtype=config.dtype_double) eval[:, :, 0] = points[:, :, 2] eval[:, :, 1] = ar_states[:, None] eval[:, :, 2] = ar_points[None, :] try: ires = interpn(crds, data, eval, **self.intp_pars) except ValueError as e: print(f"\nStates '{self.name}': Interpolation error") print("INPUT VARS: (heights, states, points)") print( "DATA BOUNDS:", [float(np.min(d)) for d in crds], [float(np.max(d)) for d in crds], ) print( "EVAL BOUNDS:", [float(np.min(p)) for p in eval.T], [float(np.max(p)) for p in eval.T], ) print( "\nMaybe you want to try the option 'bounds_error=False'? This will extrapolate the data.\n" ) raise e del crds, eval, data, ar_points, ar_states results = {} for v in self.output_point_vars(algo): if v not in [FV.WS, FV.WD]: results[v] = ires[:, :, vres.index(v)] elif v not in results: uv = np.stack( [ires[:, :, vres.index("U")], ires[:, :, vres.index("V")]], axis=-1, ) results = {FV.WD: uv2wd(uv), FV.WS: np.linalg.norm(uv, axis=-1)} del uv # no dependence on height: else: results = {} for v in self.output_point_vars(algo): if v not in [FV.WS, FV.WD]: results[v] = _interp_time(hi, v) elif v not in results: uv = np.stack( [_interp_time(hi, "U"), _interp_time(hi, "V")], axis=-1 ) results = {FV.WD: uv2wd(uv), FV.WS: np.linalg.norm(uv, axis=-1)} del uv # set weights: tdata[FV.WEIGHT] = mdata[self.WEIGHT][:, None, None] tdata.dims[FV.WEIGHT] = (FC.STATE, FC.TARGET, FC.TPOINT) return { v: d.reshape(n_states, n_targets, n_tpoints) for v, d in results.items() }
[docs] class OnePointFlowTimeseries(OnePointFlowStates): """ Inhomogeneous inflow from homogeneous timeseries data at one point :group: input.states """
[docs] def __init__(self, ref_xy, *args, tl_heights=None, **kwargs): """ Constructor. Parameters ---------- ref_xy: list of float The [x, y] or [x, y, z] coordinates of the base states. If [x, y, z] then z will serve as height args: tuple, optional Parameters for the base class tl_heights: list of float, optional The heights at which timelines will be calculated kwargs: dict, optional Parameters for the base class """ if tl_heights is None and len(ref_xy) < 3: tl_heights = [100.0] super().__init__( ref_xy, *args, tl_heights=tl_heights, states_type="Timeseries", **kwargs, )
[docs] class OnePointFlowMultiHeightTimeseries(OnePointFlowStates): """ Inhomogeneous inflow from height dependent homogeneous timeseries data at one point :group: input.states """
[docs] def __init__(self, *args, **kwargs): """ Constructor. Parameters ---------- args: tuple, optional Parameters for the base class kwargs: dict, optional Parameters for the base class """ super().__init__(*args, states_type="MultiHeightTimeseries", **kwargs)
[docs] class OnePointFlowMultiHeightNCTimeseries(OnePointFlowStates): """ Inhomogeneous inflow from height dependent homogeneous timeseries data at one point based on NetCDF input :group: input.states """
[docs] def __init__(self, *args, **kwargs): """ Constructor. Parameters ---------- args: tuple, optional Parameters for the base class kwargs: dict, optional Parameters for the base class """ super().__init__(*args, states_type="MultiHeightNCTimeseries", **kwargs)