Source code for foxes.core.rotor_model

import numpy as np
from abc import abstractmethod

import foxes.variables as FV
import foxes.constants as FC
from .farm_data_model import FarmDataModel
from foxes.utils import wd2uv, uv2wd, all_subclasses

from .data import TData


[docs] class RotorModel(FarmDataModel): """ Abstract base class of rotor models. Rotor models calculate ambient farm data from states, and provide rotor points and weights for the calculation of rotor effective quantities. Attributes ---------- calc_vars: list of str The variables that are calculated by the model (Their ambients are added automatically) :group: core """
[docs] def __init__(self, calc_vars): """ Constructor. Parameters ---------- calc_vars: list of str The variables that are calculated by the model (Their ambients are added automatically) """ super().__init__() self.calc_vars = calc_vars self.RPOINTS = self.var("rpoints") self.RWEIGHTS = self.var("rweights") self.AMBRES = self.var("amb_res")
[docs] def output_farm_vars(self, algo): """ The variables which are being modified by the model. Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm Returns ------- output_vars: list of str The output variable names """ return list( set( self.calc_vars + [FV.var2amb[v] for v in self.calc_vars if v in FV.var2amb] ) )
[docs] @abstractmethod def n_rotor_points(self): """ The number of rotor points Returns ------- n_rpoints: int The number of rotor points """ pass
[docs] @abstractmethod def rotor_point_weights(self): """ The weights of the rotor points Returns ------- weights: numpy.ndarray The weights of the rotor points, add to one, shape: (n_rpoints,) """ pass
[docs] @abstractmethod def design_points(self): """ The rotor model design points. Design points are formulated in rotor plane (x,y,z)-coordinates in rotor frame, such that - (0,0,0) is the centre point, - (1,0,0) is the point radius * n_rotor_axis - (0,1,0) is the point radius * n_rotor_side - (0,0,1) is the point radius * n_rotor_up Returns ------- dpoints: numpy.ndarray The design points, shape: (n_points, 3) """ pass
[docs] def get_rotor_points(self, algo, mdata, fdata): """ Calculates rotor points from design points. Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm mdata: foxes.core.MData The model data fdata: foxes.core.FData The farm data Returns ------- points: numpy.ndarray The rotor points, shape: (n_states, n_turbines, n_rpoints, 3) """ n_states = mdata.n_states n_points = self.n_rotor_points() n_turbines = algo.n_turbines dpoints = self.design_points() D = fdata[FV.D] rax = np.zeros((n_states, n_turbines, 3, 3), dtype=FC.DTYPE) n = rax[:, :, 0, 0:2] m = rax[:, :, 1, 0:2] n[:] = wd2uv(fdata[FV.YAW], axis=-1) m[:] = np.stack([-n[:, :, 1], n[:, :, 0]], axis=-1) rax[:, :, 2, 2] = 1 points = np.zeros((n_states, n_turbines, n_points, 3), dtype=FC.DTYPE) points[:] = fdata[FV.TXYH][:, :, None, :] points[:] += ( 0.5 * D[:, :, None, None] * np.einsum("stad,pa->stpd", rax, dpoints) ) return points
def _set_res(self, fdata, v, res, downwind_index): """ Helper function for results setting """ if downwind_index is None: fdata[v] = res.copy() elif res.shape[1] == 1: fdata[v][:, downwind_index] = res[:, 0] else: raise ValueError( f"Rotor model '{self.name}': downwind_index is not None, but results shape for '{v}' has more than one turbine, {res.shape}" )
[docs] def eval_rpoint_results( self, algo, mdata, fdata, tdata, weights, downwind_index=None, copy_to_ambient=False, ): """ Evaluate rotor point results. This function modifies `fdata`, either for all turbines or one turbine per state, depending on parameter `states_turbine`. In the latter case, the turbine dimension of the `rpoint_results` is expected to have size one. Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm mdata: foxes.core.MData The model data fdata: foxes.core.FData The farm data tdata: foxes.core.TData The target point data weights: numpy.ndarray The rotor point weights, shape: (n_rpoints,) downwind_index: int, optional The index in the downwind order copy_to_ambient: bool If `True`, the fdata results are copied to ambient variables after calculation """ uvp = None uv = None if ( FV.WS in self.calc_vars or FV.WD in self.calc_vars or FV.YAW in self.calc_vars or FV.REWS in self.calc_vars or FV.REWS2 in self.calc_vars or FV.REWS3 in self.calc_vars ): wd = tdata[FV.WD] ws = tdata[FV.WS] uvp = wd2uv(wd, ws, axis=-1) uv = np.einsum("stpd,p->std", uvp, weights) wd = None vdone = [] for v in self.calc_vars: if v == FV.WD or v == FV.YAW: if wd is None: wd = uv2wd(uv, axis=-1) self._set_res(fdata, v, wd, downwind_index) vdone.append(v) elif v == FV.WS: ws = np.linalg.norm(uv, axis=-1) self._set_res(fdata, v, ws, downwind_index) del ws vdone.append(v) del uv, wd if ( FV.REWS in self.calc_vars or FV.REWS2 in self.calc_vars or FV.REWS3 in self.calc_vars ): if downwind_index is None: yaw = fdata[FV.YAW].copy() else: yaw = fdata[FV.YAW][:, downwind_index, None] nax = wd2uv(yaw, axis=-1) wsp = np.einsum("stpd,std->stp", uvp, nax) for v in self.calc_vars: if v == FV.REWS: rews = np.maximum(np.einsum("stp,p->st", wsp, weights), 0.0) self._set_res(fdata, v, rews, downwind_index) del rews vdone.append(v) elif v == FV.REWS2: # For highly inhomogeneous wind fields # and multiple rotor points some of the uv # vectors may have negative projections onto the # turbine axis direction: if uvp.shape[2] > 1: rews2 = np.sqrt( np.maximum( np.einsum("stp,p->st", np.sign(wsp) * wsp**2, weights), 0.0, ) ) else: rews2 = np.sqrt(np.einsum("stp,p->st", wsp**2, weights)) self._set_res(fdata, v, rews2, downwind_index) del rews2 vdone.append(v) elif v == FV.REWS3: # For highly inhomogeneous wind fields # and multiple rotor points some of the uv # vectors may have negative projections onto the # turbine axis direction: if uvp.shape[2] > 1: rews3 = np.maximum( np.einsum("stp,p->st", wsp**3, weights), 0.0 ) ** (1.0 / 3.0) else: rews3 = (np.einsum("stp,p->st", wsp**3, weights)) ** (1.0 / 3.0) self._set_res(fdata, v, rews3, downwind_index) del rews3 vdone.append(v) del wsp del uvp for v in self.calc_vars: if v not in vdone: res = np.einsum("stp,p->st", tdata[v], weights) self._set_res(fdata, v, res, downwind_index) if copy_to_ambient and v in FV.var2amb: fdata[FV.var2amb[v]] = fdata[v].copy()
[docs] def calculate( self, algo, mdata, fdata, rpoints=None, weights=None, store_rpoints=False, store_rweights=False, store_amb_res=False, downwind_index=None, ): """ Calculate ambient rotor effective results. Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm mdata: foxes.core.MData The model data fdata: foxes.core.FData The farm data rpoints: numpy.ndarray, optional The rotor points, or None for automatic for this rotor. Shape: (n_states, n_turbines, n_rpoints, 3) weights: numpy.ndarray, optional The rotor point weights, or None for automatic for this rotor. Shape: (n_rpoints,) store_rpoints: bool, optional Switch for storing rotor points to mdata store_rweights: bool, optional Switch for storing rotor point weights to mdata store_amb_res: bool, optional Switch for storing ambient rotor point reults as they come from the states to mdata downwind_index: int, optional Only compute for index in the downwind order Returns ------- results: dict results dict. Keys: Variable name str. Values: numpy.ndarray with results, shape: (n_states, n_turbines) """ if rpoints is None: rpoints = mdata.get(self.RPOINTS, self.get_rotor_points(algo, mdata, fdata)) if store_rpoints: mdata[self.RPOINTS] = rpoints mdata.dims[self.RPOINTS] = (FC.STATE, FC.TURBINE, FC.TPOINT, FC.XYH) self.data_to_store(self.RPOINTS, algo, mdata) if downwind_index is not None: rpoints = rpoints[:, downwind_index, None] if weights is None: weights = mdata.get(FC.TWEIGHTS, self.rotor_point_weights()) if store_rweights: mdata[self.RWEIGHTS] = weights mdata.dims[self.RWEIGHTS] = (FC.TPOINT,) self.data_to_store(self.RWEIGHTS, algo, mdata) tdata = TData.from_tpoints(rpoints, weights) svars = algo.states.output_point_vars(algo) for v in svars: tdata.add( v, data=np.full_like(rpoints[..., 0], np.nan), dims=(FC.STATE, FC.TARGET, FC.TPOINT), ) sres = algo.states.calculate(algo, mdata, fdata, tdata) tdata.update(sres) if store_amb_res: mdata[self.AMBRES] = sres.copy() self.data_to_store(self.AMBRES, algo, mdata) self.eval_rpoint_results( algo, mdata, fdata, tdata, weights, downwind_index, copy_to_ambient=True, ) return {v: fdata[v] for v in self.output_farm_vars(algo)}
[docs] @classmethod def new(cls, rmodel_type, *args, **kwargs): """ Run-time rotor model factory. Parameters ---------- rmodel_type: str The selected derived class name args: tuple, optional Additional parameters for constructor kwargs: dict, optional Additional parameters for constructor """ if rmodel_type is None: return None allc = all_subclasses(cls) found = rmodel_type in [scls.__name__ for scls in allc] if found: for scls in allc: if scls.__name__ == rmodel_type: return scls(*args, **kwargs) else: estr = "Rotor model type '{}' is not defined, available types are \n {}".format( rmodel_type, sorted([i.__name__ for i in allc]) ) raise KeyError(estr)