Source code for foxes.models.wake_frames.timelines

import numpy as np
from xarray import Dataset

from foxes.core import WakeFrame, MData, FData, TData
from foxes.utils import wd2uv
from foxes.algorithms.iterative import Iterative
from foxes.config import config
import foxes.variables as FV
import foxes.constants as FC


[docs] class Timelines(WakeFrame): """ Dynamic wakes for spatially uniform timeseries states. Attributes ---------- max_length_km: float The maximal wake length in km cl_ipars: dict Interpolation parameters for centre line point interpolation dt_min: float The delta t value in minutes, if not from timeseries data :group: models.wake_frames """
[docs] def __init__(self, max_length_km=2e4, cl_ipars={}, dt_min=None, **kwargs): """ Constructor. Parameters ---------- max_length_km: float The maximal wake length in km cl_ipars: dict Interpolation parameters for centre line point interpolation dt_min: float, optional The delta t value in minutes, if not from timeseries data kwargs: dict, optional Additional parameters for the base class """ super().__init__(max_length_km=max_length_km, **kwargs) self.cl_ipars = cl_ipars self.dt_min = dt_min
[docs] def __repr__(self): return f"{type(self).__name__}(dt_min={self.dt_min}, max_length_km={self.max_length_km})"
def _precalc_data(self, algo, states, heights, verbosity, needs_res=False): """Helper function for pre-calculation of ambient wind vectors""" if verbosity > 0: print(f"{self.name}: Pre-calculating ambient wind vectors") # get and check times: times = np.asarray(states.index()) if self.dt_min is None: if not np.issubdtype(times.dtype, np.datetime64): raise TypeError( f"{self.name}: Expecting state index of type np.datetime64, found {times.dtype}" ) elif len(times) == 1: raise KeyError( f"{self.name}: Expecting 'dt_min' for single step timeseries" ) dt = ( (times[1:] - times[:-1]) .astype("timedelta64[s]") .astype(config.dtype_int) ) else: n = max(len(times) - 1, 1) dt = np.full(n, self.dt_min * 60, dtype="timedelta64[s]").astype( config.dtype_int ) # prepare mdata: data = algo.get_model_data(states)["coords"] mdict = {v: np.array(d) for v, d in data.items()} mdims = {v: (v,) for v in data.keys()} data = algo.get_model_data(states)["data_vars"] mdict.update({v: d[1] for v, d in data.items()}) mdims.update({v: d[0] for v, d in data.items()}) mdata = MData(mdict, mdims, loop_dims=[FC.STATE], states_i0=0) del mdict, mdims, data # prepare fdata: fdata = FData({}, {}, loop_dims=[FC.STATE]) # prepare tdata: n_states = states.size() data = { v: np.zeros((n_states, 1, 1), dtype=config.dtype_double) for v in states.output_point_vars(algo) } pdims = {v: (FC.STATE, FC.TARGET, FC.TPOINT) for v in data.keys()} points = np.zeros((n_states, 1, 3), dtype=config.dtype_double) # calculate all heights: self.timelines_data = {"dxy": (("height", FC.STATE, "dir"), [])} for h in heights: if verbosity > 0: print(f" Height: {h} m") points[..., 2] = h tdata = TData.from_points( points=points, data=data, dims=pdims, ) res = states.calculate(algo, mdata, fdata, tdata) del tdata uv = wd2uv(res[FV.WD], res[FV.WS])[:, 0, 0, :2] if len(dt) == 1: dxy = uv * dt[0] else: dxy = uv[:-1] * dt[:, None] dxy = np.append(dxy, uv[-1, None, :] * dt[-1], axis=0) self.timelines_data["dxy"][1].append(dxy) """ DEBUG import matplotlib.pyplot as plt xy = np.array([np.sum(self.timelines_data[h][:n], axis=0) for n in range(len(self.timelines_data[h]))]) print(xy) plt.plot(xy[:, 0], xy[:, 1]) plt.title(f"Height {h} m") plt.show() quit() """ if needs_res: if "U" not in self.timelines_data: self.timelines_data["U"] = (("height", FC.STATE), []) self.timelines_data["V"] = (("height", FC.STATE), []) self.timelines_data["U"][1].append(uv[:, 0]) self.timelines_data["V"][1].append(uv[:, 1]) for v in states.output_point_vars(algo): if v not in [FV.WS, FV.WD]: if v not in self.timelines_data: self.timelines_data[v] = (("height", FC.STATE), []) self.timelines_data[v][1].append(res[v][:, 0, 0]) del res, uv, dxy self.timelines_data = Dataset( coords={ FC.STATE: states.index(), "height": heights, }, data_vars={ v: (d[0], np.stack(d[1], axis=0)) for v, d in self.timelines_data.items() }, )
[docs] def initialize(self, algo, verbosity=0): """ Initializes the model. Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm verbosity: int The verbosity level, 0 = silent """ if not isinstance(algo, Iterative): raise TypeError( f"Incompatible algorithm type {type(algo).__name__}, expecting {Iterative.__name__}" ) super().initialize(algo, verbosity) # find turbine hub heights: t2h = np.zeros(algo.n_turbines, dtype=config.dtype_double) for ti, t in enumerate(algo.farm.turbines): t2h[ti] = ( t.H if t.H is not None else algo.farm_controller.turbine_types[ti].H ) heights = np.unique(t2h) # pre-calc data: from foxes.input.states import OnePointFlowTimeseries if isinstance(algo.states, OnePointFlowTimeseries): self._precalc_data(algo, algo.states.base_states, heights, verbosity) else: self._precalc_data(algo, algo.states, heights, verbosity)
[docs] def set_running( self, algo, data_stash, sel=None, isel=None, verbosity=0, ): """ Sets this model status to running, and moves all large data to stash. The stashed data will be returned by the unset_running() function after running calculations. Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm data_stash: dict Large data stash, this function adds data here. Key: model name. Value: dict, large model data sel: dict, optional The subset selection dictionary isel: dict, optional The index subset selection dictionary verbosity: int The verbosity level, 0 = silent """ super().set_running(algo, data_stash, sel, isel, verbosity) if sel is not None or isel is not None: data_stash[self.name]["data"] = self.timelines_data if isel is not None: self.timelines_data = self.timelines_data.isel(isel) if sel is not None: self.timelines_data = self.timelines_data.sel(sel)
[docs] def unset_running( self, algo, data_stash, sel=None, isel=None, verbosity=0, ): """ Sets this model status to not running, recovering large data from stash Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm data_stash: dict Large data stash, this function adds data here. Key: model name. Value: dict, large model data sel: dict, optional The subset selection dictionary isel: dict, optional The index subset selection dictionary verbosity: int The verbosity level, 0 = silent """ super().unset_running(algo, data_stash, sel, isel, verbosity) data = data_stash[self.name] if "data" in data: self.timelines_data = data.pop("data")
[docs] def calc_order(self, algo, mdata, fdata): """ Calculates the order of turbine evaluation. This function is executed on a single chunk of data, all computations should be based on numpy arrays. Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm mdata: foxes.core.MData The model data fdata: foxes.core.FData The farm data Returns ------- order: numpy.ndarray The turbine order, shape: (n_states, n_turbines) """ order = np.zeros((fdata.n_states, fdata.n_turbines), dtype=config.dtype_int) order[:] = np.arange(fdata.n_turbines)[None, :] return order
[docs] def get_wake_coos( self, algo, mdata, fdata, tdata, downwind_index, ): """ Calculate wake coordinates of rotor points. Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm mdata: foxes.core.MData The model data fdata: foxes.core.FData The farm data tdata: foxes.core.TData The target point data downwind_index: int The index of the wake causing turbine in the downwind order Returns ------- wake_coos: numpy.ndarray The wake frame coordinates of the evaluation points, shape: (n_states, n_targets, n_tpoints, 3) """ # prepare: targets = tdata[FC.TARGETS] n_states, n_targets, n_tpoints = targets.shape[:3] n_points = n_targets * n_tpoints points = targets.reshape(n_states, n_points, 3) rxyz = fdata[FV.TXYH][:, downwind_index] theights = fdata[FV.H][:, downwind_index] heights = self.timelines_data["height"].to_numpy() data_dxy = self.timelines_data["dxy"].to_numpy() D = np.zeros((n_states, n_points), dtype=config.dtype_double) D[:] = fdata[FV.D][:, downwind_index, None] wcoos = np.full((n_states, n_points, 3), 1e20, dtype=config.dtype_double) wcoosx = wcoos[:, :, 0] wcoosy = wcoos[:, :, 1] wcoos[:, :, 2] = points[:, :, 2] - rxyz[:, None, 2] i0 = mdata.states_i0(counter=True) i1 = i0 + mdata.n_states trace_si = np.zeros((n_states, n_points), dtype=config.dtype_int) trace_si[:] = i0 + np.arange(n_states)[:, None] for hi, h in enumerate(heights): dxy = data_dxy[hi][:i1] precond = theights[:, None] == h trace_p = np.zeros((n_states, n_points, 2), dtype=config.dtype_double) trace_p[:] = points[:, :, :2] - rxyz[:, None, :2] trace_l = np.zeros((n_states, n_points), dtype=config.dtype_double) trace_d = np.full((n_states, n_points), np.inf, dtype=config.dtype_double) h_trace_si = trace_si.copy() # flake8: noqa: F821 def _update_wcoos(sel): """Local function that updates coordinates and source times""" nonlocal wcoosx, wcoosy, trace_si d = np.linalg.norm(trace_p, axis=-1) sel = sel & (d <= trace_d) if np.any(sel): trace_d[sel] = d[sel] nx = dxy[h_trace_si[sel]] dx = np.linalg.norm(nx, axis=-1) nx /= dx[:, None] trp = trace_p[sel] projx = np.einsum("sd,sd->s", trp, nx) seln = (projx > -dx) & (projx < dx) if np.any(seln): wcoosx[sel] = np.where(seln, projx + trace_l[sel], wcoosx[sel]) ny = np.concatenate([-nx[:, 1, None], nx[:, 0, None]], axis=1) projy = np.einsum("sd,sd->s", trp, ny) wcoosy[sel] = np.where(seln, projy, wcoosy[sel]) del ny, projy trace_si[sel] = np.where(seln, h_trace_si[sel], trace_si[sel]) # step backwards in time, until wake source turbine is hit: _update_wcoos(precond) while True: sel = precond & (h_trace_si > 0) & (trace_l < self.max_length_km * 1e3) if np.any(sel): h_trace_si[sel] -= 1 delta = dxy[h_trace_si[sel]] dmag = np.linalg.norm(delta, axis=-1) trace_p[sel] -= delta trace_l[sel] += dmag del delta, dmag # check if this is closer to turbine: _update_wcoos(sel) del sel else: del sel break del trace_p, trace_l, trace_d, h_trace_si, dxy, precond # store turbines that cause wake: trace_si = np.minimum(trace_si, i0 + np.arange(n_states)[:, None]) tdata[FC.STATE_SOURCE_ORDERI] = downwind_index # store states that cause wake for each target point, # will be used by model.get_data() during wake calculation: tdata.add( FC.STATES_SEL, trace_si.reshape(n_states, n_targets, n_tpoints), (FC.STATE, FC.TARGET, FC.TPOINT), ) return wcoos.reshape(n_states, n_targets, n_tpoints, 3)
[docs] def get_centreline_points(self, algo, mdata, fdata, downwind_index, x): """ Gets the points along the centreline for given values of x. Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm mdata: foxes.core.MData The model data fdata: foxes.core.FData The farm data downwind_index: int The index in the downwind order x: numpy.ndarray The wake frame x coordinates, shape: (n_states, n_points) Returns ------- points: numpy.ndarray The centreline points, shape: (n_states, n_points, 3) """ # prepare: n_states, n_points = x.shape rxyz = fdata[FV.TXYH][:, downwind_index] theights = fdata[FV.H][:, downwind_index] heights = self.timelines_data["height"].to_numpy() data_dxy = self.timelines_data["dxy"].to_numpy() points = np.zeros((n_states, n_points, 3), dtype=config.dtype_double) points[:] = rxyz[:, None, :] trace_dp = np.zeros_like(points[..., :2]) trace_l = x.copy() trace_si = np.zeros((n_states, n_points), dtype=config.dtype_int) trace_si[:] = np.arange(n_states)[:, None] for hi, h in enumerate(heights): precond = theights == h if np.any(precond): sel = precond[:, None] & (trace_l > 0) while np.any(sel): dxy = data_dxy[hi][trace_si[sel]] trl = trace_l[sel] trp = trace_dp[sel] dl = np.linalg.norm(dxy, axis=-1) cl = np.abs(trl - dl) < np.abs(trl) if np.any(cl): trace_l[sel] = np.where(cl, trl - dl, trl) trace_dp[sel] = np.where(cl[:, None], trp + dxy, trp) del trl, trp, dl, cl, dxy trace_si[sel] -= 1 sel = precond[:, None] & (trace_l > 0) & (trace_si >= 0) si = trace_si[precond] + 1 dxy = data_dxy[hi][si] dl = np.linalg.norm(dxy, axis=-1)[:, :, None] trl = trace_l[precond][:, :, None] trp = trace_dp[precond] sel = np.abs(trl) < 2 * dl trace_dp[precond] = np.where(sel, trp - trl / dl * dxy, np.nan) del si, dxy, dl, trl, trp, sel del precond del trace_si, trace_l points[..., :2] += trace_dp return points
[docs] def finalize(self, algo, verbosity=0): """ Finalizes the model. Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm verbosity: int The verbosity level, 0 = silent """ super().finalize(algo, verbosity=verbosity) self.timelines_data = None