Source code for foxes.models.wake_frames.seq_dynamic_wakes

import numpy as np
from scipy.spatial.distance import cdist

from foxes.utils import wd2uv
from foxes.core.data import TData
from foxes.config import config
from foxes.algorithms.sequential import Sequential
import foxes.variables as FV
import foxes.constants as FC

from .farm_order import FarmOrder


class SeqDynamicWakes(FarmOrder):
    """
    Dynamic wakes for the sequential algorithm.

    For every turbine, one wake trace point is started per time step
    at the turbine position. In each later step all previously started
    trace points are moved with the wind vector stored for them, and
    their travelled path length is accumulated. Evaluation points are
    then assigned to their nearest trace point.

    Attributes
    ----------
    cl_ipars: dict
        Interpolation parameters for centre line
        point interpolation
    dt_min: float, optional
        The delta t value in minutes,
        if not from timeseries data

    :group: models.wake_frames.sequential

    """

    def __init__(self, cl_ipars=None, dt_min=None, **kwargs):
        """
        Constructor.

        Parameters
        ----------
        cl_ipars: dict, optional
            Interpolation parameters for centre line
            point interpolation. If None, a new empty
            dict is created for this instance
        dt_min: float, optional
            The delta t value in minutes,
            if not from timeseries data
        kwargs: dict, optional
            Additional parameters for the base class

        """
        super().__init__(**kwargs)
        # A `{}` default in the signature would be one dict object shared
        # by every instance; create a fresh one per instance instead.
        self.cl_ipars = {} if cl_ipars is None else cl_ipars
        self.dt_min = dt_min

    def __repr__(self):
        """
        Short string representation, showing the time step setting.

        Returns
        -------
        s: str
            The class name, followed by the dt_min value

        """
        return f"{type(self).__name__}(dt_min={self.dt_min})"

    def initialize(self, algo, verbosity=0):
        """
        Initializes the model.

        Determines the time step sizes in seconds and allocates
        the wake trace arrays.

        Parameters
        ----------
        algo: foxes.core.Algorithm
            The calculation algorithm
        verbosity: int
            The verbosity level, 0 = silent

        Raises
        ------
        TypeError
            If algo is not a Sequential algorithm, or if dt_min
            is None and the states index is not of datetime64 type
        KeyError
            If dt_min is None and the states index has one entry only

        """
        super().initialize(algo, verbosity)

        if not isinstance(algo, Sequential):
            raise TypeError(
                f"Incompatible algorithm type {type(algo).__name__}, expecting {Sequential.__name__}"
            )

        # determine time step:
        times = np.asarray(algo.states.index())
        if self.dt_min is None:
            # without explicit dt_min the step sizes are derived from
            # the time stamps, which requires at least two of them:
            if not np.issubdtype(times.dtype, np.datetime64):
                raise TypeError(
                    f"{self.name}: Expecting state index of type np.datetime64, found {times.dtype}"
                )
            if len(times) == 1:
                raise KeyError(
                    f"{self.name}: Expecting 'dt_min' for single step timeseries"
                )
            # differences of consecutive time stamps, as integer seconds:
            self._dt = (
                (times[1:] - times[:-1])
                .astype("timedelta64[s]")
                .astype(config.dtype_int)
            )
        else:
            # constant step of dt_min minutes, as integer seconds;
            # at least one entry, also for a single step series:
            n = max(len(times) - 1, 1)
            self._dt = np.full(n, self.dt_min * 60, dtype="timedelta64[s]").astype(
                config.dtype_int
            )

        # init wake traces data, indexed by (state of wake creation, turbine):
        # trace point positions
        self._traces_p = np.zeros(
            (algo.n_states, algo.n_turbines, 3), dtype=config.dtype_double
        )
        # wind vectors at the trace points
        self._traces_v = np.zeros(
            (algo.n_states, algo.n_turbines, 3), dtype=config.dtype_double
        )
        # travelled path lengths, NaN = trace point not yet started
        self._traces_l = np.full(
            (algo.n_states, algo.n_turbines), np.nan, dtype=config.dtype_double
        )

    def calc_order(self, algo, mdata, fdata):
        """
        Calculates the order of turbine evaluation.

        This function is executed on a single chunk of data,
        all computations should be based on numpy arrays.

        The result of the base class is returned unchanged.

        Parameters
        ----------
        algo: foxes.core.Algorithm
            The calculation algorithm
        mdata: foxes.core.MData
            The model data
        fdata: foxes.core.FData
            The farm data

        Returns
        -------
        order: numpy.ndarray
            The turbine order, shape: (n_states, n_turbines)

        """
        return super().calc_order(algo, mdata, fdata)

    def get_wake_coos(
        self,
        algo,
        mdata,
        fdata,
        tdata,
        downwind_index,
    ):
        """
        Calculate wake coordinates of rotor points.

        As side effects, the stored wake traces of the turbine are
        updated when this is the first call for the current state
        counter, and the entries FC.STATE_SOURCE_ORDERI and
        FC.STATES_SEL are written to tdata.

        Parameters
        ----------
        algo: foxes.core.Algorithm
            The calculation algorithm
        mdata: foxes.core.MData
            The model data
        fdata: foxes.core.FData
            The farm data
        tdata: foxes.core.TData
            The target point data
        downwind_index: int
            The index of the wake causing turbine
            in the downwind order

        Returns
        -------
        wake_coos: numpy.ndarray
            The wake frame coordinates of the evaluation
            points, shape: (n_states, n_targets, n_tpoints, 3)

        """
        # prepare (a single state is treated per call):
        n_states = 1
        n_targets = tdata.n_targets
        n_tpoints = tdata.n_tpoints
        n_points = n_targets * n_tpoints
        points = tdata[FC.TARGETS].reshape(n_states, n_points, 3)
        counter = algo.states.counter
        N = counter + 1  # number of trace points up to the current state

        # NaN length means this is the first call for this state and
        # turbine, so the traces are updated exactly once per step.
        # NOTE(review): nesting below was reconstructed from a flattened
        # source listing - confirm against the repository version.
        if np.isnan(self._traces_l[counter, downwind_index]):
            # new wake starts at turbine:
            self._traces_p[counter, downwind_index][:] = fdata[FV.TXYH][
                0, downwind_index
            ]
            self._traces_l[counter, downwind_index] = 0

            # transport wakes that originate from previous time steps:
            if counter > 0:
                dxyz = self._traces_v[:counter, downwind_index] * self._dt[counter - 1]
                self._traces_p[:counter, downwind_index] += dxyz
                self._traces_l[:counter, downwind_index] += np.linalg.norm(
                    dxyz, axis=-1
                )
                del dxyz

            # compute wind vectors at wake traces:
            # TODO: dz from U_z is missing here
            hpdata = TData.from_points(points=self._traces_p[None, :N, downwind_index])
            res = algo.states.calculate(algo, mdata, fdata, hpdata)
            self._traces_v[:N, downwind_index, :2] = wd2uv(
                res[FV.WD][0, :, 0], res[FV.WS][0, :, 0]
            )
            del hpdata, res

        # find nearest wake point:
        dists = cdist(points[0], self._traces_p[:N, downwind_index])
        tri = np.argmin(dists, axis=1)
        del dists

        # project; 1e20 marks points outside of all trace segments:
        wcoos = np.full((n_states, n_points, 3), 1e20, dtype=config.dtype_double)
        wcoos[0, :, 2] = points[0, :, 2] - fdata[FV.TXYH][0, downwind_index, None, 2]

        # indexing with the integer array `tri` yields a copy, hence the
        # in-place normalisation does not modify the stored wind vectors:
        nx = self._traces_v[tri, downwind_index, :2]
        mv = np.linalg.norm(nx, axis=-1)
        nx /= mv[:, None]

        delp = points[0, :, :2] - self._traces_p[tri, downwind_index, :2]
        projx = np.einsum("pd,pd->p", delp, nx)

        # the last step has no successor, re-use the final step size:
        dt = self._dt[counter] if counter < len(self._dt) else self._dt[-1]
        dx = mv * dt

        # accept points within one step length along the local wind direction:
        sel = (projx > -dx) & (projx < dx)
        if np.any(sel):
            # horizontal unit vector perpendicular to nx:
            ny = np.concatenate([-nx[:, 1, None], nx[:, 0, None]], axis=1)
            wcoos[0, sel, 0] = projx[sel] + self._traces_l[tri[sel], downwind_index]
            wcoos[0, sel, 1] = np.einsum("pd,pd->p", delp, ny)[sel]
            del ny
        del delp, projx, mv, dx, nx, sel

        # turbines that cause wake:
        tdata[FC.STATE_SOURCE_ORDERI] = downwind_index

        # states that cause wake for each target point:
        tdata.add(
            FC.STATES_SEL,
            tri[None, :].reshape(n_states, n_targets, n_tpoints),
            (FC.STATE, FC.TARGET, FC.TPOINT),
        )

        return wcoos.reshape(n_states, n_targets, n_tpoints, 3)

    def get_wake_modelling_data(
        self,
        algo,
        variable,
        downwind_index,
        fdata,
        tdata,
        target,
        states0=None,
    ):
        """
        Return data that is required for computing the
        wake from source turbines to evaluation points.

        If states0 is None and tdata contains FC.STATE_SOURCE_ORDERI,
        the data is picked from algo.farm_results_downwind at the
        states stored in tdata under FC.STATES_SEL. Otherwise the
        call is forwarded to the base class.

        Parameters
        ----------
        algo: foxes.core.Algorithm, optional
            The algorithm, needed for data from previous iteration
        variable: str
            The variable, serves as data key
        downwind_index: int, optional
            The index in the downwind order
        fdata: foxes.core.FData
            The farm data
        tdata: foxes.core.TData
            The target point data
        target: str, optional
            The dimensions identifier for the output,
            FC.STATE_TARGET, FC.STATE_TARGET_TPOINT
        states0: numpy.ndarray, optional
            The states of wake creation

        Returns
        -------
        data: numpy.ndarray
            Data for wake modelling, shape:
            (n_states, n_turbines) or (n_states, n_target)

        Raises
        ------
        ValueError
            If downwind_index differs from the value stored in
            tdata, or if target is not one of the supported choices

        """
        if states0 is None and FC.STATE_SOURCE_ORDERI in tdata:
            # from previous iteration:
            if downwind_index != tdata[FC.STATE_SOURCE_ORDERI]:
                raise ValueError(
                    f"Model '{self.name}': Mismatch of '{FC.STATE_SOURCE_ORDERI}'. Expected {tdata[FC.STATE_SOURCE_ORDERI]}, got {downwind_index}"
                )

            n_states = 1
            n_targets = tdata.n_targets
            n_tpoints = tdata.n_tpoints
            n_points = n_targets * n_tpoints

            # state of wake creation, for each evaluation point:
            s = tdata[FC.STATES_SEL][0].reshape(n_points)

            # stored results, with the row of the current state taken
            # from fdata.
            # NOTE(review): if to_numpy() returns a view, this assignment
            # also writes into algo.farm_results_downwind - confirm intended.
            data = algo.farm_results_downwind[variable].to_numpy()
            data[algo.counter] = fdata[variable][0]
            data = data[s, downwind_index].reshape(n_states, n_targets, n_tpoints)

            if target == FC.STATE_TARGET:
                if n_tpoints == 1:
                    data = data[:, :, 0]
                else:
                    # weighted sum over the points of each target:
                    data = np.einsum("stp,p->st", data, tdata[FC.TWEIGHTS])
                return data
            elif target == FC.STATE_TARGET_TPOINT:
                return data
            else:
                raise ValueError(
                    f"Cannot handle target '{target}', choices are {FC.STATE_TARGET}, {FC.STATE_TARGET_TPOINT}"
                )

        else:
            return super().get_wake_modelling_data(
                algo, variable, downwind_index, fdata, tdata, target, states0
            )

    def get_centreline_points(self, algo, mdata, fdata, downwind_index, x):
        """
        Gets the points along the centreline for given
        values of x.

        Not implemented for this wake frame.

        Parameters
        ----------
        algo: foxes.core.Algorithm
            The calculation algorithm
        mdata: foxes.core.MData
            The model data
        fdata: foxes.core.FData
            The farm data
        downwind_index: int
            The index in the downwind order
        x: numpy.ndarray
            The wake frame x coordinates, shape: (n_states, n_points)

        Returns
        -------
        points: numpy.ndarray
            The centreline points, shape: (n_states, n_points, 3)

        Raises
        ------
        NotImplementedError
            Always

        """
        raise NotImplementedError