Source code for foxes.models.wake_frames.streamlines

import numpy as np
from scipy.interpolate import interpn

from foxes.core import WakeFrame
from foxes.utils import wd2uv
from foxes.core.data import TData
import foxes.variables as FV
import foxes.constants as FC


[docs] class Streamlines2D(WakeFrame): """ Streamline following wakes Attributes ---------- step: float The streamline step size in m max_length: float The maximal streamline length cl_ipars: dict Interpolation parameters for centre line point interpolation :group: models.wake_frames """
[docs] def __init__(self, step, max_length=1e4, cl_ipars={}): """ Constructor. Parameters ---------- step: float The streamline step size in m max_length: float The maximal streamline length cl_ipars: dict Interpolation parameters for centre line point interpolation """ super().__init__() self.step = step self.max_length = max_length self.cl_ipars = cl_ipars self.DATA = self.var("DATA") self.STEPS = self.var("STEPS") self.SDAT = self.var("SDAT")
[docs] def __repr__(self): return f"{type(self).__name__}(step={self.step})"
def _calc_streamlines(self, algo, mdata, fdata): """ Helper function that computes all streamline data """ # prepare: n_states = mdata.n_states n_turbines = mdata.n_turbines N = int(self.max_length / self.step) # calc data: x, y, z, wd data = np.zeros((n_states, n_turbines, N, 4), dtype=FC.DTYPE) for i in range(N): # set streamline start point data (rotor centre): if i == 0: data[:, :, i, :3] = fdata[FV.TXYH] data[:, :, i, 3] = fdata[FV.AMB_WD] # compute next step: else: # calculate next point: xyz = data[:, :, i - 1, :3] n = wd2uv(data[:, :, i - 1, 3]) data[:, :, i, :2] = xyz[:, :, :2] + self.step * n data[:, :, i, 2] = xyz[:, :, 2] # calculate next tangential vector: svars = algo.states.output_point_vars(algo) tdata = TData.from_points( data[:, :, i, :3], data={ v: np.full((n_states, n_turbines, 1), np.nan, dtype=FC.DTYPE) for v in svars }, dims={v: (FC.STATE, FC.TARGET, FC.TPOINT) for v in svars}, ) data[:, :, i, 3] = algo.states.calculate(algo, mdata, fdata, tdata)[ FV.WD ][:, :, 0] sel = np.isnan(data[:, :, i, 3]) if np.any(sel): data[sel, i, 3] = data[sel, i - 1, 3] return data
[docs] def get_streamline_data(self, algo, mdata, fdata): """ Gets streamline data, generating it on the fly Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm mdata: foxes.core.MData The model data fdata: foxes.core.FData The farm data Returns ------- data: numpy.ndarray The streamline data, shape: (n_states, n_turbines, n_steps, 4) with data x, y, z, wd """ if self.DATA not in mdata or not np.all( mdata[self.DATA][:, :, 0, :3] == fdata[FV.TXYH] ): mdata[self.DATA] = self._calc_streamlines(algo, mdata, fdata) mdata.dims[self.DATA] = (FC.STATE, FC.TURBINE, self.STEPS, self.SDAT) return mdata[self.DATA]
def _calc_coos(self, algo, mdata, fdata, targets, downwind_index): """ Helper function, calculates streamline coordinates for given points and given turbine """ # prepare: n_states, n_targets, n_tpoints = targets.shape[:3] n_points = n_targets * n_tpoints points = targets.reshape(n_states, n_points, 3) # find nearest streamline points: data = self.get_streamline_data(algo, mdata, fdata)[:, downwind_index] dists = np.linalg.norm(points[:, :, None, :2] - data[:, None, :, :2], axis=-1) selp = np.argmin(dists, axis=2) data = np.take_along_axis(data[:, None], selp[:, :, None, None], axis=2)[ :, :, 0 ] slen = self.step * selp del dists, selp # calculate coordinates: coos = np.zeros((n_states, n_points, 3), dtype=FC.DTYPE) nx = wd2uv(data[:, :, 3]) ny = np.stack([-nx[:, :, 1], nx[:, :, 0]], axis=2) delta = points[:, :, :2] - data[:, :, :2] coos[:, :, 0] = slen + np.einsum("spd,spd->sp", delta, nx) coos[:, :, 1] = np.einsum("spd,spd->sp", delta, ny) coos[:, :, 2] = points[:, :, 2] - data[:, :, 2] return coos.reshape(n_states, n_targets, n_tpoints, 3)
[docs] def calc_order(self, algo, mdata, fdata): """ " Calculates the order of turbine evaluation. This function is executed on a single chunk of data, all computations should be based on numpy arrays. Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm mdata: foxes.core.MData The model data fdata: foxes.core.FData The farm data Returns ------- order: numpy.ndarray The turbine order, shape: (n_states, n_turbines) """ # prepare: n_states = fdata.n_states n_turbines = algo.n_turbines tdata = TData.from_points(points=fdata[FV.TXYH]) # calculate streamline x coordinates for turbines rotor centre points: # n_states, n_turbines_source, n_turbines_target coosx = np.zeros((n_states, n_turbines, n_turbines), dtype=FC.DTYPE) for ti in range(n_turbines): coosx[:, ti, :] = self.get_wake_coos(algo, mdata, fdata, tdata, ti)[ :, :, 0, 0 ] # derive turbine order: # TODO: Remove loop over states order = np.zeros((n_states, n_turbines), dtype=FC.ITYPE) for si in range(n_states): order[si] = np.lexsort(keys=coosx[si]) return order
[docs] def get_wake_coos( self, algo, mdata, fdata, tdata, downwind_index, ): """ Calculate wake coordinates of rotor points. Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm mdata: foxes.core.MData The model data fdata: foxes.core.FData The farm data tdata: foxes.core.TData The target point data downwind_index: int The index of the wake causing turbine in the downwnd order Returns ------- wake_coos: numpy.ndarray The wake frame coordinates of the evaluation points, shape: (n_states, n_targets, n_tpoints, 3) """ return self._calc_coos(algo, mdata, fdata, tdata[FC.TARGETS], downwind_index)
[docs] def get_centreline_points(self, algo, mdata, fdata, downwind_index, x): """ Gets the points along the centreline for given values of x. Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm mdata: foxes.core.MData The model data fdata: foxes.core.FData The farm data downwind_index: int The index in the downwind order x: numpy.ndarray The wake frame x coordinates, shape: (n_states, n_points) Returns ------- points: numpy.ndarray The centreline points, shape: (n_states, n_points, 3) """ # calculate long enough streamlines: xmax = np.max(x) self._ensure_min_length(algo, mdata, fdata, xmax) # get streamline points: n_states, n_points = x.shape data = self.get_streamline_data(algo, mdata, fdata)[:, downwind_index] spts = data[:, :, :3] n_spts = spts.shape[1] xs = self.step * np.arange(n_spts) # interpolate to x of interest: qts = np.zeros((n_states, n_points, 2), dtype=FC.DTYPE) qts[:, :, 0] = np.arange(n_states)[:, None] qts[:, :, 1] = np.minimum(x, xs[-1]) qts = qts.reshape(n_states * n_points, 2) ipars = dict(bounds_error=False, fill_value=0.0) ipars.update(self.cl_ipars) results = interpn((np.arange(n_states), xs), spts, qts, **ipars) return results.reshape(n_states, n_points, 3)