Source code for foxes.input.states.field_data_nc

import numpy as np
import pandas as pd
import xarray as xr
from scipy.interpolate import interpn

from foxes.core import States
from foxes.utils import wd2uv, uv2wd
from foxes.data import STATES, StaticData
import foxes.variables as FV
import foxes.constants as FC


class FieldDataNC(States):
    """
    Heterogeneous ambient states on a regular
    horizontal grid in NetCDF format.

    Attributes
    ----------
    data_source: str or xarray.Dataset
        The data or the file search pattern, should end with
        suffix '.nc'. One or many files.
    ovars: list of str
        The output variables
    var2ncvar: dict
        Mapping from variable names to variable names
        in the nc file
    fixed_vars: dict
        Uniform values for output variables, instead
        of reading from data
    states_coord: str
        The states coordinate name in the data
    x_coord: str
        The x coordinate name in the data
    y_coord: str
        The y coordinate name in the data
    h_coord: str
        The height coordinate name in the data
    pre_load: bool
        Flag for loading all data into memory during
        initialization
    weight_ncvar: str
        Name of the weight data variable in the nc file(s)
    bounds_error: bool
        Flag for raising errors if bounds are exceeded
    fill_value: number
        Fill value in case of exceeding bounds, if no bounds error
    time_format: str
        The datetime parsing format string
    interp_nans: bool
        Linearly interpolate nan values
    interpn_pars: dict, optional
        Additional parameters for scipy.interpolate.interpn

    :group: input.states

    """

    def __init__(
        self,
        data_source,
        output_vars,
        var2ncvar={},
        fixed_vars={},
        states_coord="Time",
        x_coord="UTMX",
        y_coord="UTMY",
        h_coord="height",
        pre_load=True,
        weight_ncvar=None,
        time_format="%Y-%m-%d_%H:%M:%S",
        sel=None,
        isel=None,
        interp_nans=False,
        verbosity=1,
        **interpn_pars,
    ):
        """
        Constructor.

        Parameters
        ----------
        data_source: str or xarray.Dataset
            The data or the file search pattern, should end with
            suffix '.nc'. One or many files.
        output_vars: list of str
            The output variables
        var2ncvar: dict, optional
            Mapping from variable names to variable names
            in the nc file
        fixed_vars: dict, optional
            Uniform values for output variables, instead
            of reading from data
        states_coord: str
            The states coordinate name in the data
        x_coord: str
            The x coordinate name in the data
        y_coord: str
            The y coordinate name in the data
        h_coord: str
            The height coordinate name in the data
        pre_load: bool
            Flag for loading all data into memory during
            initialization
        weight_ncvar: str, optional
            Name of the weight data variable in the nc file(s)
        time_format: str
            The datetime parsing format string
        sel: dict, optional
            Subset selection via xr.Dataset.sel()
        isel: dict, optional
            Subset selection via xr.Dataset.isel()
        interp_nans: bool
            Linearly interpolate nan values
        verbosity: int
            Verbosity level for pre_load file reading
        interpn_pars: dict, optional
            Additional parameters for scipy.interpolate.interpn

        """
        super().__init__()

        self.data_source = data_source
        self.states_coord = states_coord
        self.ovars = output_vars
        self.fixed_vars = fixed_vars
        self.x_coord = x_coord
        self.y_coord = y_coord
        self.h_coord = h_coord
        self.weight_ncvar = weight_ncvar
        self.pre_load = pre_load
        self.time_format = time_format
        self.sel = sel
        self.isel = isel
        self.interpn_pars = interpn_pars
        self.interp_nans = interp_nans

        # variables that are not fixed are read from the nc file,
        # defaulting to the foxes variable name itself:
        self.var2ncvar = {
            v: var2ncvar.get(v, v) for v in output_vars if v not in fixed_vars
        }

        self._inds = None
        self._N = None
        self._weights = None

        # pre-load file reading, usually prior to DaskRunner:
        if not isinstance(self.data_source, xr.Dataset):
            if "*" in str(self.data_source):
                # a glob pattern is passed straight to open_mfdataset below
                pass
            else:
                self.data_source = StaticData().get_file_path(
                    STATES, self.data_source, check_raw=True
                )
            if verbosity:
                if pre_load:
                    print(
                        f"States '{self.name}': Reading data from '{self.data_source}'"
                    )
                else:
                    print(
                        f"States '{self.name}': Reading index from '{self.data_source}'"
                    )
            with xr.open_mfdataset(
                str(self.data_source),
                parallel=False,
                concat_dim=self.states_coord,
                combine="nested",
                data_vars="minimal",
                coords="minimal",
                compat="override",
            ) as ds:
                self.data_source = ds

        # apply optional subset selections, also when a Dataset was
        # passed in directly:
        if sel is not None:
            self.data_source = self.data_source.sel(self.sel)
        if isel is not None:
            self.data_source = self.data_source.isel(self.isel)
        if pre_load:
            self.data_source.load()
        self._get_inds(self.data_source)

    def _get_inds(self, ds):
        """
        Helper function for index and weights reading
        """
        # all four coordinates are mandatory in the data:
        for c in [self.states_coord, self.x_coord, self.y_coord, self.h_coord]:
            if c not in ds:
                raise KeyError(
                    f"States '{self.name}': Missing coordinate '{c}' in data"
                )

        self._inds = ds[self.states_coord].to_numpy()
        if self.time_format is not None:
            self._inds = pd.to_datetime(self._inds, format=self.time_format).to_numpy()
        self._N = len(self._inds)

        if self.weight_ncvar is not None:
            self._weights = ds[self.weight_ncvar].to_numpy()

        # every requested output must come either from the file or
        # from the fixed values:
        for v in self.ovars:
            if v in self.var2ncvar:
                ncv = self.var2ncvar[v]
                if ncv not in ds:
                    raise KeyError(
                        f"States '{self.name}': nc variable '{ncv}' not found in data, found: {sorted(list(ds.keys()))}"
                    )
            elif v not in self.fixed_vars:
                raise ValueError(
                    f"States '{self.name}': Variable '{v}' neither found in var2ncvar nor in fixed_vars"
                )

    def _get_data(self, ds, verbosity):
        """
        Helper function for data extraction, returning an array of
        shape (n_states, n_h, n_y, n_x, n_dvars)
        """
        x = ds[self.x_coord].to_numpy()
        y = ds[self.y_coord].to_numpy()
        h = ds[self.h_coord].to_numpy()
        n_x = len(x)
        n_y = len(y)
        n_h = len(h)
        n_sts = ds.sizes[self.states_coord]

        # accepted dimension orders per nc variable:
        cor_shxy = (self.states_coord, self.h_coord, self.x_coord, self.y_coord)
        cor_shyx = (self.states_coord, self.h_coord, self.y_coord, self.x_coord)
        cor_sh = (self.states_coord, self.h_coord)
        cor_s = (self.states_coord,)

        vars_shyx = []
        vars_sh = []
        vars_s = []
        for v, ncv in self.var2ncvar.items():
            if ds[ncv].dims == cor_shyx or ds[ncv].dims == cor_shxy:
                vars_shyx.append(v)
            elif ds[ncv].dims == cor_sh:
                vars_sh.append(v)
            elif ds[ncv].dims == cor_s:
                vars_s.append(v)
            else:
                raise ValueError(
                    f"States '{self.name}': Wrong coordinate order for variable '{ncv}': Found {ds[ncv].dims}, expecting {cor_shxy}, {cor_shyx}, {cor_sh} or {cor_s}"
                )

        # BUGFIX: allocate one slot per _dkys entry, not per var2ncvar
        # entry. _dkys can hold one slot more than var2ncvar (FV.WD when
        # WD is a fixed variable but WS is read from file); the old
        # len(self.var2ncvar) allocation then made the FV.WD write below
        # and the reshape(..., self._n_dvars) in calculate() fail.
        data = np.zeros((n_sts, n_h, n_y, n_x, self._n_dvars), dtype=FC.DTYPE)
        for v in vars_shyx:
            ncv = self.var2ncvar[v]
            if ds[ncv].dims == cor_shyx:
                data[..., self._dkys[v]] = ds[ncv][:]
            else:
                # stored as (..., x, y): swap to (..., y, x)
                data[..., self._dkys[v]] = np.swapaxes(ds[ncv].to_numpy(), 2, 3)
        for v in vars_sh:
            ncv = self.var2ncvar[v]
            # broadcast height profiles over the horizontal grid:
            data[..., self._dkys[v]] = ds[ncv].to_numpy()[:, :, None, None]
        for v in vars_s:
            ncv = self.var2ncvar[v]
            # broadcast per-state scalars over height and grid:
            data[..., self._dkys[v]] = ds[ncv].to_numpy()[:, None, None, None]
        if FV.WD in self.fixed_vars:
            data[..., self._dkys[FV.WD]] = np.full(
                (n_sts, n_h, n_y, n_x), self.fixed_vars[FV.WD], dtype=FC.DTYPE
            )

        if verbosity > 1:
            print(f"\n{self.name}: Data ranges")
            for v, i in self._dkys.items():
                d = data[..., i]
                nn = np.sum(np.isnan(d))
                print(
                    f"  {v}: {np.nanmin(d)} --> {np.nanmax(d)}, nans: {nn} ({100*nn/len(d.flat):.2f}%)"
                )

        return data

    def load_data(self, algo, verbosity=0):
        """
        Load and/or create all model data that is subject to chunking.

        Such data should not be stored under self, for memory reasons. The
        data returned here will automatically be chunked and then provided
        as part of the mdata object during calculations.

        Parameters
        ----------
        algo: foxes.core.Algorithm
            The calculation algorithm
        verbosity: int
            The verbosity level, 0 = silent

        Returns
        -------
        idata: dict
            The dict has exactly two entries: `data_vars`,
            a dict with entries `name_str -> (dim_tuple, data_ndarray)`;
            and `coords`, a dict with entries `dim_name_str -> dim_array`

        """
        # WS and WD must both be present, or both absent:
        if (FV.WS in self.ovars and FV.WD not in self.ovars) or (
            FV.WS not in self.ovars and FV.WD in self.ovars
        ):
            raise KeyError(
                f"States '{self.name}': Missing '{FV.WS}' or '{FV.WD}' in output variables {self.ovars}"
            )

        # ensure WD and WS get the first two slots of data:
        self._dkys = {}
        if FV.WS in self.ovars:
            self._dkys[FV.WD] = 0
        if FV.WS in self.var2ncvar:
            self._dkys[FV.WS] = 1
        for v in self.var2ncvar:
            if v not in self._dkys:
                self._dkys[v] = len(self._dkys)
        self._n_dvars = len(self._dkys)

        # default: uniform statistical weights over all states
        if self._weights is None:
            self._weights = np.full(
                (self._N, algo.n_turbines), 1.0 / self._N, dtype=FC.DTYPE
            )

        idata = super().load_data(algo, verbosity)

        if self.pre_load:
            self.X = self.var(FV.X)
            self.Y = self.var(FV.Y)
            self.H = self.var(FV.H)
            self.VARS = self.var("vars")
            self.DATA = self.var("data")

            ds = self.data_source
            h = ds[self.h_coord].to_numpy()
            y = ds[self.y_coord].to_numpy()
            x = ds[self.x_coord].to_numpy()
            v = list(self._dkys.keys())
            coos = (FC.STATE, self.H, self.Y, self.X, self.VARS)
            data = self._get_data(ds, verbosity)
            data = (coos, data)

            idata["coords"][self.H] = h
            idata["coords"][self.Y] = y
            idata["coords"][self.X] = x
            idata["coords"][self.VARS] = v
            idata["data_vars"][self.DATA] = data

        return idata

    def size(self):
        """
        The total number of states.

        Returns
        -------
        int:
            The total number of states

        """
        return self._N

    def index(self):
        """
        The index list

        Returns
        -------
        indices: array_like
            The index labels of states, or None for default integers

        """
        return self._inds

    def output_point_vars(self, algo):
        """
        The variables which are being modified by the model.

        Parameters
        ----------
        algo: foxes.core.Algorithm
            The calculation algorithm

        Returns
        -------
        output_vars: list of str
            The output variable names

        """
        return self.ovars

    def weights(self, algo):
        """
        The statistical weights of all states.

        Parameters
        ----------
        algo: foxes.core.Algorithm
            The calculation algorithm

        Returns
        -------
        weights: numpy.ndarray
            The weights, shape: (n_states, n_turbines)

        """
        return self._weights

    def calculate(self, algo, mdata, fdata, tdata):
        """
        The main model calculation.

        This function is executed on a single chunk of data,
        all computations should be based on numpy arrays.

        Parameters
        ----------
        algo: foxes.core.Algorithm
            The calculation algorithm
        mdata: foxes.core.MData
            The model data
        fdata: foxes.core.FData
            The farm data
        tdata: foxes.core.TData
            The target point data

        Returns
        -------
        results: dict
            The resulting data, keys: output variable str.
            Values: numpy.ndarray with shape
            (n_states, n_targets, n_tpoints)

        """
        # prepare:
        n_states = tdata.n_states
        n_targets = tdata.n_targets
        n_tpoints = tdata.n_tpoints
        points = tdata[FC.TARGETS].reshape(n_states, n_targets * n_tpoints, 3)
        n_pts = points.shape[1]
        n_states = fdata.n_states

        # pick pre-loaded data:
        if self.pre_load:
            x = mdata[self.X]
            y = mdata[self.Y]
            h = mdata[self.H]
            data = mdata[self.DATA].copy()

        # read data for this chunk:
        else:
            # locate the chunk's first state in the global index:
            i0 = np.where(self._inds == mdata[FC.STATE][0])[0][0]
            s = slice(i0, i0 + n_states)
            ds = self.data_source.isel({self.states_coord: s}).load()
            x = ds[self.x_coord].to_numpy()
            y = ds[self.y_coord].to_numpy()
            h = ds[self.h_coord].to_numpy()
            data = self._get_data(ds, verbosity=0)
            del ds
        n_h = len(h)
        n_y = len(y)
        n_x = len(x)

        # translate WS, WD into U, V:
        if FV.WD in self.ovars and FV.WS in self.ovars:
            wd = data[..., self._dkys[FV.WD]]
            ws = (
                data[..., self._dkys[FV.WS]]
                if FV.WS in self._dkys
                else self.fixed_vars[FV.WS]
            )
            # NOTE(review): when FV.WS is a fixed variable it has no
            # slot in _dkys, so this lookup would raise KeyError —
            # confirm intended configurations upstream.
            wdwsi = [self._dkys[FV.WD], self._dkys[FV.WS]]
            data[..., wdwsi] = wd2uv(wd, ws, axis=-1)
            del ws, wd

        # prepare points: append the state index as a 4th coordinate,
        # then flip to match the (state, h, y, x) grid order:
        sts = np.arange(n_states)
        pts = np.append(points, np.zeros((n_states, n_pts, 1), dtype=FC.DTYPE), axis=2)
        pts[:, :, 3] = sts[:, None]
        pts = pts.reshape(n_states * n_pts, 4)
        pts = np.flip(pts, axis=1)
        gvars = (sts, h, y, x)

        # interpolate nan values on the grid, prior to point evaluation:
        if self.interp_nans and np.any(np.isnan(data)):
            df = pd.DataFrame(
                index=pd.MultiIndex.from_product(
                    gvars, names=["state", "height", "y", "x"]
                ),
                data={
                    v: data[..., vi].reshape(n_states * n_h * n_y * n_x)
                    for v, vi in self._dkys.items()
                },
            )
            # two passes to also fill leading/trailing nans:
            df.interpolate(
                axis=0, method="linear", limit_direction="forward", inplace=True
            )
            df.interpolate(
                axis=0, method="linear", limit_direction="backward", inplace=True
            )
            data = df.to_numpy().reshape(n_states, n_h, n_y, n_x, self._n_dvars)
            del df

        # interpolate:
        try:
            data = interpn(gvars, data, pts, **self.interpn_pars).reshape(
                n_states, n_pts, self._n_dvars
            )
        except ValueError as e:
            # report grid vs evaluation bounds before re-raising,
            # for easier out-of-bounds debugging:
            print(f"\n\nStates '{self.name}': Interpolation error")
            print("INPUT VARS: (state, heights, y, x)")
            print(
                "DATA BOUNDS:", [np.min(d) for d in gvars], [np.max(d) for d in gvars]
            )
            print(
                "EVAL BOUNDS:", [np.min(p) for p in pts.T], [np.max(p) for p in pts.T]
            )
            raise e
        del pts, x, y, h, gvars

        # interpolate nan values at the evaluated points:
        if self.interp_nans and np.any(np.isnan(data)):
            df = pd.DataFrame(
                index=pd.MultiIndex.from_product(
                    (sts, range(n_pts)), names=["state", "point"]
                ),
                data={
                    v: data[:, :, vi].reshape(n_states * n_pts)
                    for v, vi in self._dkys.items()
                },
            )
            df["x"] = points[:, :, 0].reshape(n_states * n_pts)
            df["y"] = points[:, :, 1].reshape(n_states * n_pts)
            df = df.reset_index().set_index(["state", "x", "y"])
            df.interpolate(
                axis=0, method="linear", limit_direction="forward", inplace=True
            )
            df.interpolate(
                axis=0, method="linear", limit_direction="backward", inplace=True
            )
            df = df.reset_index().drop(["x", "y"], axis=1).set_index(["state", "point"])
            data = df.to_numpy().reshape(n_states, n_pts, self._n_dvars)
            del df

        # set output:
        out = {}
        if FV.WD in self.ovars and FV.WS in self.ovars:
            # translate U, V back into WS, WD:
            uv = data[..., wdwsi]
            out[FV.WS] = np.linalg.norm(uv, axis=-1)
            out[FV.WD] = uv2wd(uv, axis=-1)
            del uv
        for v in self.ovars:
            if v not in out:
                if v in self._dkys:
                    out[v] = data[..., self._dkys[v]]
                else:
                    out[v] = np.full(
                        (n_states, n_pts), self.fixed_vars[v], dtype=FC.DTYPE
                    )

        return {v: d.reshape(n_states, n_targets, n_tpoints) for v, d in out.items()}