Source code for foxes.input.states.wrg_states

import numpy as np
from scipy.interpolate import interpn

from foxes.core.states import States
from foxes.config import config, get_input_path
from foxes.utils.wrg_utils import ReaderWRG
from foxes.data import STATES
import foxes.variables as FV
import foxes.constants as FC


[docs] class WRGStates(States): """ Ambient states based on WRG data Attributes ---------- wrg_fname: str Name of the WRG file ws_bins: numpy.ndarray The wind speed bins, including lower and upper bounds, shape: (n_ws_bins+1,) fixed_vars: dict Fixed uniform variable values, instead of reading from data bounds_extra_space: float or str The extra space, either float in m, or str for units of D, e.g. '2.5D' interpn_pars: dict Additional parameters for scipy.interpolate.interpn :group: input.states """
[docs] def __init__( self, wrg_fname, ws_bins, fixed_vars={}, bounds_extra_space="1D", **interpn_pars, ): """ Constructor Parameters ---------- wrg_fname: str Name of the WRG file ws_bins: list of float The wind speed bins, including lower and upper bounds fixed_vars: dict Fixed uniform variable values, instead of reading from data bounds_extra_space: float or str, optional The extra space, either float in m, or str for units of D, e.g. '2.5D' interpn_pars: dict, optional Additional parameters for scipy.interpolate.interpn """ super().__init__() self.wrg_fname = wrg_fname self.ws_bins = np.asarray(ws_bins) self.fixed_vars = fixed_vars self.bounds_extra_space = bounds_extra_space self.interpn_pars = interpn_pars
[docs] def load_data(self, algo, verbosity=0): """ Load and/or create all model data that is subject to chunking. Such data should not be stored under self, for memory reasons. The data returned here will automatically be chunked and then provided as part of the mdata object during calculations. Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm verbosity: int The verbosity level, 0 = silent Returns ------- idata: dict The dict has exactly two entries: `data_vars`, a dict with entries `name_str -> (dim_tuple, data_ndarray)`; and `coords`, a dict with entries `dim_name_str -> dim_array` """ # read wrg file: fpath = get_input_path(self.wrg_fname) if not fpath.is_file(): if verbosity > 0: print( f"States '{self.name}': Reading static data '{self.wrg_fname}' from context '{STATES}'" ) fpath = algo.dbook.get_file_path(STATES, self.wrg_fname, check_raw=False) if verbosity > 0: print(f"Path: {fpath}") elif verbosity: print(f"States '{self.name}': Reading file {fpath}") wrg = ReaderWRG(fpath) p0 = np.array([wrg.x0, wrg.y0], dtype=config.dtype_double) nx = wrg.nx ny = wrg.ny ns = wrg.n_sectors res = wrg.resolution p1 = p0 + np.array([nx * res, ny * res]) if verbosity > 0: print(f"States '{self.name}': Data bounds {p0} - {p1}") # find bounds: if self.bounds_extra_space is not None: xy_min, xy_max = algo.farm.get_xy_bounds( extra_space=self.bounds_extra_space, algo=algo ) if verbosity > 0: print(f"States '{self.name}': Farm bounds {xy_min} - {xy_max}") ij_min = np.asarray((xy_min - p0) / res, dtype=config.dtype_int) ij_max = np.asarray((xy_max - p0) / res, dtype=config.dtype_int) + 1 sx = slice(ij_min[0], ij_max[0]) sy = slice(ij_min[1], ij_max[1]) else: sx = np.s_[:] sy = np.s_[:] self._x = p0[0] + np.arange(nx) * res self._x = self._x[sx] self._y = p0[1] + np.arange(ny) * res self._y = self._y[sy] if len(self._x) < 2 or len(self._y) < 2: raise ValueError(f"No overlap between data bounds and farm bounds") p0[0] = np.min(self._x) p0[1] = np.min(self._y) p1[0] = np.max(self._x) p1[1] = np.max(self._y) if verbosity > 0: print(f"States '{self.name}': New bounds {p0} - {p1}") # store data: A = [] k = [] f = [] for s in range(ns): A.append(wrg.data[f"As_{s}"].to_numpy().reshape(ny, nx)[sy, sx]) k.append(wrg.data[f"Ks_{s}"].to_numpy().reshape(ny, nx)[sy, sx]) f.append(wrg.data[f"fs_{s}"].to_numpy().reshape(ny, nx)[sy, sx]) del wrg A = np.stack(A, axis=0).T k = np.stack(k, axis=0).T f = np.stack(f, axis=0).T self._data = np.stack([A, k, f], axis=-1) # (x, y, wd, AKfs) # store ws and wd: self.VARS = self.var("VARS") self.DATA = self.var("DATA") self._wds = np.arange(0.0, 360.0, 360 / ns) self._wsd = self.ws_bins[1:] - self.ws_bins[:-1] self._wss = 0.5 * (self.ws_bins[:-1] + self.ws_bins[1:]) self._N = len(self._wss) * ns data = np.zeros((len(self._wss), ns, 3), dtype=config.dtype_double) data[..., 0] = self._wss[:, None] data[..., 1] = self._wds[None, :] data[..., 2] = self._wsd[:, None] data = data.reshape(self._N, 3) idata = super().load_data(algo, verbosity) idata["coords"][self.VARS] = ["ws", "wd", "dws"] idata["data_vars"][self.DATA] = ((FC.STATE, self.VARS), data) return idata
[docs] def size(self): """ The total number of states. Returns ------- int: The total number of states """ return self._N
[docs] def output_point_vars(self, algo): """ The variables which are being modified by the model. Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm Returns ------- output_vars: list of str The output variable names """ ovars = set([FV.WS, FV.WD]) ovars.update(self.fixed_vars.keys()) return list(ovars)
[docs] def calculate(self, algo, mdata, fdata, tdata): """ The main model calculation. This function is executed on a single chunk of data, all computations should be based on numpy arrays. Parameters ---------- algo: foxes.core.Algorithm The calculation algorithm mdata: foxes.core.MData The model data fdata: foxes.core.FData The farm data tdata: foxes.core.TData The target point data Returns ------- results: dict The resulting data, keys: output variable str. Values: numpy.ndarray with shape (n_states, n_targets, n_tpoints) """ # prepare: n_states = tdata.n_states n_targets = tdata.n_targets n_tpoints = tdata.n_tpoints n_pts = n_states * n_targets * n_tpoints points = tdata[FC.TARGETS] ws = mdata[self.DATA][:, 0] wd = mdata[self.DATA][:, 1] wsd = mdata[self.DATA][:, 2] out = {} out[FV.WS] = tdata[FV.WS] out[FV.WS][:] = ws[:, None, None] out[FV.WD] = tdata[FV.WD] out[FV.WD][:] = wd[:, None, None] for v, d in self.fixed_vars.items(): out[v] = tdata[v] out[v][:] = d # interpolate A, k, f from x, y, wd z = points[..., 2].copy() points[..., 2] = wd[:, None, None] pts = points.reshape(n_pts, 3) gvars = (self._x, self._y, self._wds) try: ipars = dict(bounds_error=True, fill_value=None) ipars.update(self.interpn_pars) data = interpn(gvars, self._data, pts, **ipars).reshape( n_states, n_targets, n_tpoints, 3 ) except ValueError as e: print(f"\nStates '{self.name}': Interpolation error") print("INPUT VARS: (x, y, wd)") print( "DATA BOUNDS:", [float(np.min(d)) for d in gvars], [float(np.max(d)) for d in gvars], ) print( "EVAL BOUNDS:", [float(np.min(p)) for p in pts.T], [float(np.max(p)) for p in pts.T], ) print( "\nMaybe you want to try the option 'bounds_error=False'? This will extrapolate the data.\n" ) raise e A = data[..., 0] k = data[..., 1] f = data[..., 2] points[..., 2] = z del data, gvars, pts, z, wd tdata.add( FV.WEIGHT, f, dims=(FC.STATE, FC.TARGET, FC.TPOINT), ) wsA = out[FV.WS] / A tdata[FV.WEIGHT] *= wsd[:, None, None] * ( k / A * wsA ** (k - 1) * np.exp(-(wsA**k)) ) return out