Source code for foxes.models.wake_frames.streamlines

import numpy as np
from scipy.interpolate import interpn

from foxes.core import WakeFrame
from foxes.utils import wd2uv
from foxes.core.data import TData
from foxes.config import config
import foxes.variables as FV
import foxes.constants as FC


[docs] class Streamlines2D(WakeFrame): """ Streamline following wakes Attributes ---------- step: float The streamline step size in m cl_ipars: dict Interpolation parameters for centre line point interpolation :group: models.wake_frames """
[docs] def __init__(self, step, max_length_km=20, cl_ipars={}, **kwargs): """ Constructor. Parameters ---------- step: float The streamline step size in m max_length_km: float The maximal streamline length in km cl_ipars: dict Interpolation parameters for centre line point interpolation kwargs: dict, optional Additional parameters for the base class """ super().__init__(max_length_km=max_length_km, **kwargs) self.step = step self.cl_ipars = cl_ipars self.DATA = self.var("DATA") self.STEPS = self.var("STEPS") self.SDAT = self.var("SDAT")
[docs] def __repr__(self): return ( f"{type(self).__name__}(step={self.step}, max_length={self.max_length_km})" )
def _calc_streamlines(self, algo, mdata, fdata): """ Helper function that computes all streamline data """ # prepare: n_states = mdata.n_states n_turbines = mdata.n_turbines N = int(self.max_length_km * 1e3 / self.step) # calc data: x, y, z, wd data = np.zeros((n_states, n_turbines, N, 4), dtype=config.dtype_double) for i in range(N): # set streamline start point data (rotor centre): if i == 0: data[:, :, i, :3] = fdata[FV.TXYH] data[:, :, i, 3] = fdata[FV.AMB_WD] # compute next step: else: # calculate next point: xyz = data[:, :, i - 1, :3] n = wd2uv(data[:, :, i - 1, 3]) data[:, :, i, :2] = xyz[:, :, :2] + self.step * n data[:, :, i, 2] = xyz[:, :, 2] # calculate next tangential vector: svars = algo.states.output_point_vars(algo) tdata = TData.from_points( data[:, :, i, :3], data={ v: np.full( (n_states, n_turbines, 1), np.nan, dtype=config.dtype_double ) for v in svars }, dims={v: (FC.STATE, FC.TARGET, FC.TPOINT) for v in svars}, ) data[:, :, i, 3] = algo.states.calculate(algo, mdata, fdata, tdata)[ FV.WD ][:, :, 0] sel = np.isnan(data[:, :, i, 3]) if np.any(sel): data[sel, i, 3] = data[sel, i - 1, 3] return data
[docs] def get_streamline_data(self, algo, mdata, fdata): """ Gets streamline data, generating it on the fly Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm mdata: foxes.core.MData The model data fdata: foxes.core.FData The farm data Returns ------- data: numpy.ndarray The streamline data, shape: (n_states, n_turbines, n_steps, 4) with data x, y, z, wd """ if self.DATA not in mdata or not np.all( mdata[self.DATA][:, :, 0, :3] == fdata[FV.TXYH] ): mdata[self.DATA] = self._calc_streamlines(algo, mdata, fdata) mdata.dims[self.DATA] = (FC.STATE, FC.TURBINE, self.STEPS, self.SDAT) return mdata[self.DATA]
def _calc_coos(self, algo, mdata, fdata, targets, downwind_index): """ Helper function, calculates streamline coordinates for given points and given turbine """ # prepare: n_states, n_targets, n_tpoints = targets.shape[:3] n_points = n_targets * n_tpoints points = targets.reshape(n_states, n_points, 3) # find nearest streamline points: data = self.get_streamline_data(algo, mdata, fdata)[:, downwind_index] dists = np.linalg.norm(points[:, :, None, :2] - data[:, None, :, :2], axis=-1) selp = np.argmin(dists, axis=2) data = np.take_along_axis(data[:, None], selp[:, :, None, None], axis=2)[ :, :, 0 ] slen = self.step * selp del dists, selp # calculate coordinates: coos = np.full((n_states, n_points, 3), np.nan, dtype=config.dtype_double) coos[:, :, 2] = points[:, :, 2] - data[:, :, 2] delta = points[:, :, :2] - data[:, :, :2] nx = wd2uv(data[:, :, 3]) projx = np.einsum("spd,spd->sp", delta, nx) sel = (projx > -self.step) & (projx < self.step) if np.any(sel): ny = np.stack([-nx[:, :, 1], nx[:, :, 0]], axis=2) coos[sel, 0] = slen[sel] + projx[sel] coos[sel, 1] = np.einsum("spd,spd->sp", delta, ny)[sel] return coos.reshape(n_states, n_targets, n_tpoints, 3)
[docs] def calc_order(self, algo, mdata, fdata): """ Calculates the order of turbine evaluation. This function is executed on a single chunk of data, all computations should be based on numpy arrays. Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm mdata: foxes.core.MData The model data fdata: foxes.core.FData The farm data Returns ------- order: numpy.ndarray The turbine order, shape: (n_states, n_turbines) """ # prepare: n_states = fdata.n_states n_turbines = algo.n_turbines tdata = TData.from_points(points=fdata[FV.TXYH]) # calculate streamline x coordinates for turbines rotor centre points: # n_states, n_turbines_source, n_turbines_target coosx = np.zeros((n_states, n_turbines, n_turbines), dtype=config.dtype_double) for ti in range(n_turbines): coosx[:, ti, :] = self.get_wake_coos(algo, mdata, fdata, tdata, ti)[ :, :, 0, 0 ] # derive turbine order: # TODO: Remove loop over states order = np.zeros((n_states, n_turbines), dtype=config.dtype_int) for si in range(n_states): order[si] = np.lexsort(keys=coosx[si]) return order
[docs] def get_wake_coos( self, algo, mdata, fdata, tdata, downwind_index, ): """ Calculate wake coordinates of rotor points. Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm mdata: foxes.core.MData The model data fdata: foxes.core.FData The farm data tdata: foxes.core.TData The target point data downwind_index: int The index of the wake causing turbine in the downwind order Returns ------- wake_coos: numpy.ndarray The wake frame coordinates of the evaluation points, shape: (n_states, n_targets, n_tpoints, 3) """ return self._calc_coos(algo, mdata, fdata, tdata[FC.TARGETS], downwind_index)
[docs] def get_centreline_points(self, algo, mdata, fdata, downwind_index, x): """ Gets the points along the centreline for given values of x. Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm mdata: foxes.core.MData The model data fdata: foxes.core.FData The farm data downwind_index: int The index in the downwind order x: numpy.ndarray The wake frame x coordinates, shape: (n_states, n_points) Returns ------- points: numpy.ndarray The centreline points, shape: (n_states, n_points, 3) """ # get streamline points: n_states, n_points = x.shape data = self.get_streamline_data(algo, mdata, fdata)[:, downwind_index] spts = data[:, :, :3] n_spts = spts.shape[1] xs = self.step * np.arange(n_spts) # interpolate to x of interest: qts = np.zeros((n_states, n_points, 2), dtype=config.dtype_double) qts[:, :, 0] = np.arange(n_states)[:, None] qts[:, :, 1] = np.minimum(x, xs[-1]) qts = qts.reshape(n_states * n_points, 2) ipars = dict(bounds_error=False, fill_value=0.0) ipars.update(self.cl_ipars) results = interpn((np.arange(n_states), xs), spts, qts, **ipars) return results.reshape(n_states, n_points, 3)