"""Pure numpy-based feature engineering for crop classification. STEP 4A: Feature computation functions that align with training pipeline. This module provides: - Savitzky-Golay smoothing with zero-filling fallback - Phenology metrics computation - Harmonic/Fourier features - Index computations (NDVI, NDRE, EVI, SAVI, CI_RE, NDWI) - Per-pixel feature builder - Gap handling for temporal and spatial missing data NOTE: Seasonal window summaries come in Step 4B. """ from __future__ import annotations import math from typing import Dict, List, Tuple import numpy as np # Try to import scipy for Savitzky-Golay, fall back to pure numpy try: from scipy.signal import savgol_filter as _savgol_filter HAS_SCIPY = True except ImportError: HAS_SCIPY = False # ========================================== # Smoothing Functions # ========================================== def fill_zeros_linear(y: np.ndarray) -> np.ndarray: """Fill zeros using linear interpolation. Treats 0 as missing ONLY when there are non-zero neighbors. Keeps true zeros if the whole series is zero. Args: y: 1D array Returns: Array with zeros filled by linear interpolation """ y = np.array(y, dtype=np.float64).copy() n = len(y) if n == 0: return y # Find zero positions zero_mask = (y == 0) # If all zeros, return as is if np.all(zero_mask): return y # Simple linear interpolation for interior zeros # Find first and last non-zero nonzero_idx = np.where(~zero_mask)[0] if len(nonzero_idx) == 0: return y first_nz = nonzero_idx[0] last_nz = nonzero_idx[-1] # Interpolate interior zeros for i in range(first_nz, last_nz + 1): if zero_mask[i]: # Find surrounding non-zero values left_idx = i - 1 while left_idx >= first_nz and zero_mask[left_idx]: left_idx -= 1 right_idx = i + 1 while right_idx <= last_nz and zero_mask[right_idx]: right_idx += 1 # Interpolate if left_idx >= first_nz and right_idx <= last_nz: left_val = y[left_idx] right_val = y[right_idx] dist = right_idx - left_idx if dist > 0: y[i] = left_val + (right_val - left_val) * (i - left_idx) / dist return y def savgol_smooth_1d(y: np.ndarray, window: int = 5, polyorder: int = 2) -> np.ndarray: """Apply Savitzky-Golay smoothing to 1D array. Uses scipy.signal.savgol_filter if available, otherwise falls back to simple polynomial least squares. Args: y: 1D array window: Window size (must be odd) polyorder: Polynomial order Returns: Smoothed array """ y = np.array(y, dtype=np.float64).copy() # Handle edge cases n = len(y) if n < window: return y # Can't apply SavGol to short series if HAS_SCIPY: return _savgol_filter(y, window, polyorder, mode='nearest') # Fallback: Simple moving average (simplified) # A proper implementation would do polynomial fitting pad = window // 2 result = np.zeros_like(y) for i in range(n): start = max(0, i - pad) end = min(n, i + pad + 1) result[i] = np.mean(y[start:end]) return result def smooth_series(y: np.ndarray) -> np.ndarray: """Apply full smoothing pipeline: fill zeros + Savitzky-Golay. Args: y: 1D array (time series) Returns: Smoothed array """ # Fill zeros first y_filled = fill_zeros_linear(y) # Then apply Savitzky-Golay return savgol_smooth_1d(y_filled, window=5, polyorder=2) # ========================================== # Gap Handling for Missing Data # ========================================== def handle_temporal_gaps(y: np.ndarray, gap_threshold: int = 3) -> np.ndarray: """Handle temporal gaps in a 1D time series. This function marks significant gaps (>= gap_threshold consecutive NaNs) for special handling while interpolating smaller gaps. Args: y: 1D time series (may contain NaN values) gap_threshold: Minimum consecutive NaNs to be considered a "significant gap" Pixels with gaps >= threshold will be marked as NoData Returns: Array with small gaps filled by interpolation, large gaps preserved as NaN The calling code should use the gap mask to mark pixels as NoData """ y = np.array(y, dtype=np.float64).copy() n = len(y) if n == 0: return y # Convert to NaN where appropriate (0s might be missing) # Only treat as NaN if there are non-zero neighbors zero_mask = (y == 0) if not np.all(zero_mask): # Find first and last non-zero nonzero_idx = np.where(~zero_mask)[0] if len(nonzero_idx) > 0: first_nz = nonzero_idx[0] last_nz = nonzero_idx[-1] # Mark interior zeros as NaN for interpolation for i in range(first_nz, last_nz + 1): if zero_mask[i]: y[i] = np.nan # Find consecutive NaN runs nan_mask = np.isnan(y) # Run-length encoding for NaN runs in_gap = False gap_start = 0 gap_lengths = [] for i in range(n + 1): is_nan = i < n and nan_mask[i] if is_nan and not in_gap: # Start of a gap in_gap = True gap_start = i elif not is_nan and in_gap: # End of a gap in_gap = False gap_lengths.append(i - gap_start) # Identify large gaps (>= threshold) that should NOT be filled large_gap_mask = np.zeros(n, dtype=bool) in_gap = False gap_start = 0 for i in range(n + 1): is_nan = i < n and nan_mask[i] if is_nan and not in_gap: in_gap = True gap_start = i elif not is_nan and in_gap: in_gap = False gap_len = i - gap_start if gap_len >= gap_threshold: # Mark this as a large gap - don't fill large_gap_mask[gap_start:i] = True # Interpolate only small gaps (and boundaries) # Use linear interpolation valid_mask = ~nan_mask if not np.any(valid_mask): return y # All NaN # Linear interpolation for all NaNs first x = np.arange(n) valid_x = x[valid_mask] valid_y = y[valid_mask] if len(valid_x) > 0: y_interp = np.interp(x, valid_x, valid_y) else: y_interp = np.full(n, np.nan) # Restore large gaps as NaN y_interp[large_gap_mask] = np.nan return y_interp def spatial_fill_nan(data_2d: np.ndarray, max_iterations: int = 3) -> np.ndarray: """Fill NaN values in a 2D spatial raster using spatial interpolation. This function iteratively fills NaN values using neighboring non-NaN values. Works from edges inward, progressively filling larger areas. Args: data_2d: 2D numpy array (H, W) with possible NaN values max_iterations: Maximum number of passes (more iterations fill more NaNs) Returns: Array with NaN values filled using spatial median """ data = data_2d.copy() H, W = data.shape # Create mask of valid pixels valid_mask = ~np.isnan(data) if np.all(valid_mask): return data # No NaNs for iteration in range(max_iterations): changed = False for i in range(H): for j in range(W): if np.isnan(data[i, j]): # Get 4-connected neighbors (up, down, left, right) neighbors = [] if i > 0 and not np.isnan(data[i-1, j]): neighbors.append(data[i-1, j]) if i < H-1 and not np.isnan(data[i+1, j]): neighbors.append(data[i+1, j]) if j > 0 and not np.isnan(data[i, j-1]): neighbors.append(data[i, j-1]) if j < W-1 and not np.isnan(data[i, j+1]): neighbors.append(data[i, j+1]) if neighbors: # Fill with median of neighbors data[i, j] = np.median(neighbors) changed = True if not changed: break # No more NaNs filled in this iteration # If still NaNs remain, fill with global median if np.any(np.isnan(data)): global_median = np.nanmedian(data) data = np.where(np.isnan(data), global_median, data) return data def compute_gap_mask(y: np.ndarray, gap_threshold: int = 3) -> np.ndarray: """Compute a boolean mask indicating pixels with significant temporal gaps. Args: y: 1D time series (may contain NaN values) gap_threshold: Minimum consecutive NaNs to be considered a "significant gap" Returns: Boolean array where True indicates a significant gap (>= threshold consecutive NaNs) """ y = np.array(y, dtype=np.float64) n = len(y) nan_mask = np.isnan(y) gap_mask = np.zeros(n, dtype=bool) in_gap = False gap_start = 0 for i in range(n + 1): is_nan = i < n and nan_mask[i] if is_nan and not in_gap: in_gap = True gap_start = i elif not is_nan and in_gap: in_gap = False gap_len = i - gap_start if gap_len >= gap_threshold: gap_mask[gap_start:i] = True return gap_mask # ========================================== # Index Computations # ========================================== def ndvi(nir: np.ndarray, red: np.ndarray, eps: float = 1e-8) -> np.ndarray: """Normalized Difference Vegetation Index. NDVI = (NIR - Red) / (NIR + Red) """ denom = nir + red return np.where(np.abs(denom) > eps, (nir - red) / denom, 0.0) def ndre(nir: np.ndarray, rededge: np.ndarray, eps: float = 1e-8) -> np.ndarray: """Normalized Difference Red-Edge Index. NDRE = (NIR - RedEdge) / (NIR + RedEdge) """ denom = nir + rededge return np.where(np.abs(denom) > eps, (nir - rededge) / denom, 0.0) def evi(nir: np.ndarray, red: np.ndarray, blue: np.ndarray, eps: float = 1e-8) -> np.ndarray: """Enhanced Vegetation Index. EVI = 2.5 * (NIR - Red) / (NIR + 6*Red - 7.5*Blue + 1) """ denom = nir + 6 * red - 7.5 * blue + 1 return np.where(np.abs(denom) > eps, 2.5 * (nir - red) / denom, 0.0) def savi(nir: np.ndarray, red: np.ndarray, L: float = 0.5, eps: float = 1e-8) -> np.ndarray: """Soil Adjusted Vegetation Index. SAVI = ((NIR - Red) / (NIR + Red + L)) * (1 + L) """ denom = nir + red + L return np.where(np.abs(denom) > eps, ((nir - red) / denom) * (1 + L), 0.0) def ci_re(nir: np.ndarray, rededge: np.ndarray, eps: float = 1e-8) -> np.ndarray: """Chlorophyll Index - Red-Edge. CI_RE = (NIR / RedEdge) - 1 """ return np.where(np.abs(rededge) > eps, nir / rededge - 1, 0.0) def ndwi(green: np.ndarray, nir: np.ndarray, eps: float = 1e-8) -> np.ndarray: """Normalized Difference Water Index. NDWI = (Green - NIR) / (Green + NIR) """ denom = green + nir return np.where(np.abs(denom) > eps, (green - nir) / denom, 0.0) # ========================================== # Phenology Metrics # ========================================== def phenology_metrics(y: np.ndarray, step_days: int = 10) -> Dict[str, float]: """Compute phenology metrics from time series. Args: y: 1D time series array (already smoothed or raw) step_days: Days between observations (for AUC calculation) Returns: Dict with: max, min, mean, std, amplitude, auc, peak_timestep, max_slope_up, max_slope_down """ # Handle all-NaN or all-zero if y is None or len(y) == 0 or np.all(np.isnan(y)) or np.all(y == 0): return { "max": 0.0, "min": 0.0, "mean": 0.0, "std": 0.0, "amplitude": 0.0, "auc": 0.0, "peak_timestep": 0, "max_slope_up": 0.0, "max_slope_down": 0.0, } y = np.array(y, dtype=np.float64) # Replace NaN with 0 for computation y_clean = np.nan_to_num(y, nan=0.0) result = {} result["max"] = float(np.max(y_clean)) result["min"] = float(np.min(y_clean)) result["mean"] = float(np.mean(y_clean)) result["std"] = float(np.std(y_clean)) result["amplitude"] = result["max"] - result["min"] # AUC - trapezoidal integration n = len(y_clean) if n > 1: auc = 0.0 for i in range(n - 1): auc += (y_clean[i] + y_clean[i + 1]) * step_days / 2 result["auc"] = float(auc) else: result["auc"] = 0.0 # Peak timestep (argmax) result["peak_timestep"] = int(np.argmax(y_clean)) # Slopes if n > 1: slopes = np.diff(y_clean) result["max_slope_up"] = float(np.max(slopes)) result["max_slope_down"] = float(np.min(slopes)) else: result["max_slope_up"] = 0.0 result["max_slope_down"] = 0.0 return result # ========================================== # Harmonic Features # ========================================== def harmonic_features(y: np.ndarray) -> Dict[str, float]: """Compute harmonic/Fourier features from time series. Projects onto sin/cos at 1st and 2nd harmonics, and calculates Amplitude and Phase. Args: y: 1D time series array Returns: Dict with: sin, cos, amplitude, phase for 1st and 2nd harmonics """ y = np.array(y, dtype=np.float64) y_clean = np.nan_to_num(y, nan=0.0) n = len(y_clean) if n == 0: return { "harmonic1_sin": 0.0, "harmonic1_cos": 0.0, "harmonic1_amp": 0.0, "harmonic1_pha": 0.0, "harmonic2_sin": 0.0, "harmonic2_cos": 0.0, "harmonic2_amp": 0.0, "harmonic2_pha": 0.0, } # Normalize time to 0-2pi t = np.array([2 * math.pi * k / n for k in range(n)]) result = {} # First harmonic s1 = float(np.mean(y_clean * np.sin(t))) c1 = float(np.mean(y_clean * np.cos(t))) result["harmonic1_sin"] = s1 result["harmonic1_cos"] = c1 result["harmonic1_amp"] = math.sqrt(s1**2 + c1**2) result["harmonic1_pha"] = math.atan2(s1, c1) # Second harmonic t2 = 2 * t s2 = float(np.mean(y_clean * np.sin(t2))) c2 = float(np.mean(y_clean * np.cos(t2))) result["harmonic2_sin"] = s2 result["harmonic2_cos"] = c2 result["harmonic2_amp"] = math.sqrt(s2**2 + c2**2) result["harmonic2_pha"] = math.atan2(s2, c2) return result # ========================================== # Per-Pixel Feature Builder # ========================================== def build_features_for_pixel( ts: Dict[str, np.ndarray], step_days: int = 10, ) -> Dict[str, float]: """Build all scalar features for a single pixel's time series. Args: ts: Dict of index name -> 1D array time series Keys: "ndvi", "ndre", "evi", "savi", "ci_re", "ndwi" step_days: Days between observations Returns: Dict with ONLY scalar computed features (no arrays): - phenology: ndvi_*, ndre_*, evi_* (max, min, mean, std, amplitude, auc, peak_timestep, max_slope_up, max_slope_down) - harmonics: ndvi_harmonic1_sin, ndvi_harmonic1_cos, ndvi_harmonic2_sin, ndvi_harmonic2_cos - interactions: ndvi_ndre_peak_diff, canopy_density_contrast NOTE: Smoothed time series are NOT included (they are arrays, not scalars). For seasonal window features, use add_seasonal_windows() separately. """ features = {} # Ensure all arrays are float64 ts_clean = {} for key, arr in ts.items(): arr = np.array(arr, dtype=np.float64) ts_clean[key] = arr # Indices to process for phenology phenology_indices = ["ndvi", "ndre", "evi"] # Process each index: smooth + phenology phenology_results = {} for idx in phenology_indices: if idx in ts_clean and ts_clean[idx] is not None: # Smooth (but don't store array in features dict - only use for phenology) smoothed = smooth_series(ts_clean[idx]) # Phenology on smoothed pheno = phenology_metrics(smoothed, step_days) phenology_results[idx] = pheno # Add to features with prefix (SCALARS ONLY) for metric_name, value in pheno.items(): features[f"{idx}_{metric_name}"] = value # Handle savi - just smooth (no phenology in training for savi) # Note: savi_smooth is NOT stored in features (it's an array) # Harmonic features (only for ndvi) if "ndvi" in ts_clean and ts_clean["ndvi"] is not None: # Use smoothed ndvi ndvi_smooth = smooth_series(ts_clean["ndvi"]) harms = harmonic_features(ndvi_smooth) for name, value in harms.items(): features[f"ndvi_{name}"] = value # Interaction features # ndvi_ndre_peak_diff = ndvi_max - ndre_max if "ndvi" in phenology_results and "ndre" in phenology_results: features["ndvi_ndre_peak_diff"] = ( phenology_results["ndvi"]["max"] - phenology_results["ndre"]["max"] ) # canopy_density_contrast = evi_mean / (ndvi_mean + 0.001) if "evi" in phenology_results and "ndvi" in phenology_results: features["canopy_density_contrast"] = ( phenology_results["evi"]["mean"] / (phenology_results["ndvi"]["mean"] + 0.001) ) return features # ========================================== # STEP 4B: Seasonal Window Summaries # ========================================== def _get_window_indices(n_steps: int, dates=None) -> Dict[str, List[int]]: """Get time indices for each seasonal window. Args: n_steps: Number of time steps dates: Optional list of dates (datetime, date, or str) Returns: Dict mapping window name to list of indices """ if dates is not None: # Use dates to determine windows window_idx = {"early": [], "peak": [], "late": []} for i, d in enumerate(dates): # Parse date if isinstance(d, str): # Try to parse as date try: from datetime import datetime d = datetime.fromisoformat(d.replace('Z', '+00:00')) except: continue elif hasattr(d, 'month'): month = d.month else: continue if month in [10, 11, 12]: window_idx["early"].append(i) elif month in [1, 2, 3]: window_idx["peak"].append(i) elif month in [4, 5, 6]: window_idx["late"].append(i) return window_idx else: # Fallback: positional split (27 steps = ~9 months Oct-Jun at 10-day intervals) # Early: Oct-Dec (first ~9 steps) # Peak: Jan-Mar (next ~9 steps) # Late: Apr-Jun (next ~9 steps) early_end = min(9, n_steps // 3) peak_end = min(18, 2 * n_steps // 3) return { "early": list(range(0, early_end)), "peak": list(range(early_end, peak_end)), "late": list(range(peak_end, n_steps)), } def _compute_window_stats(arr: np.ndarray, indices: List[int]) -> Dict[str, float]: """Compute mean and max for a window. Args: arr: 1D array of values indices: List of indices for this window Returns: Dict with mean and max (or 0.0 if no indices) """ if not indices or len(indices) == 0: return {"mean": 0.0, "max": 0.0} # Filter out NaN values = [arr[i] for i in indices if i < len(arr) and not np.isnan(arr[i])] if not values: return {"mean": 0.0, "max": 0.0} return { "mean": float(np.mean(values)), "max": float(np.max(values)), } def add_seasonal_windows( ts: Dict[str, np.ndarray], dates=None, ) -> Dict[str, float]: """Add seasonal window summary features. Season: Oct-Jun split into: - Early: Oct-Dec - Peak: Jan-Mar - Late: Apr-Jun For each window, compute mean and max for NDVI, NDWI, NDRE. This function computes smoothing internally so it accepts raw time series. Args: ts: Dict of index name -> raw 1D array time series dates: Optional dates for window determination Returns: Dict with 18 window features (scalars only): - ndvi_early_mean, ndvi_early_max - ndvi_peak_mean, ndvi_peak_max - ndvi_late_mean, ndvi_late_max - ndwi_early_mean, ndwi_early_max - ... (same for ndre) """ features = {} # Determine window indices first_arr = next(iter(ts.values())) n_steps = len(first_arr) window_idx = _get_window_indices(n_steps, dates) # Process each index - smooth internally for idx in ["ndvi", "ndwi", "ndre"]: if idx not in ts: continue # Smooth the time series internally arr_raw = np.array(ts[idx], dtype=np.float64) arr_smoothed = smooth_series(arr_raw) for window_name in ["early", "peak", "late"]: indices = window_idx.get(window_name, []) stats = _compute_window_stats(arr_smoothed, indices) features[f"{idx}_{window_name}_mean"] = stats["mean"] features[f"{idx}_{window_name}_max"] = stats["max"] return features # ========================================== # STEP 4B: Feature Ordering # ========================================== # Phenology metric order (matching training) PHENO_METRIC_ORDER = [ "max", "min", "mean", "std", "amplitude", "auc", "peak_timestep", "max_slope_up", "max_slope_down" ] # Feature order V1: 51 features total (strictly immutable) FEATURE_ORDER_V1 = [] # A) Phenology for ndvi, ndre, evi (in that order, each with 9 metrics) for idx in ["ndvi", "ndre", "evi"]: for metric in PHENO_METRIC_ORDER: FEATURE_ORDER_V1.append(f"{idx}_{metric}") # B) Harmonics for ndvi FEATURE_ORDER_V1.extend([ "ndvi_harmonic1_sin", "ndvi_harmonic1_cos", "ndvi_harmonic2_sin", "ndvi_harmonic2_cos", ]) # C) Interaction features FEATURE_ORDER_V1.extend([ "ndvi_ndre_peak_diff", "canopy_density_contrast", ]) # D) Window summaries: ndvi, ndwi, ndre (in that order) # Early, Peak, Late (in that order) # Mean, Max (in that order) for idx in ["ndvi", "ndwi", "ndre"]: for window in ["early", "peak", "late"]: FEATURE_ORDER_V1.append(f"{idx}_{window}_mean") FEATURE_ORDER_V1.append(f"{idx}_{window}_max") # Feature order V2: Comprehensive set for LULC models FEATURE_ORDER_V2 = [] # V2 Stats: ndvi, ndre, evi, savi, cl_re V2_INDICES = ["ndvi", "ndre", "evi", "savi", "cl_re"] V2_STATS = ["mean", "median", "min", "max", "std", "p10", "p25", "p75", "p90", "skew", "kurtosis", "cv"] V2_PHENO = ["sos_step", "pos_step", "eos_step", "los", "max_greenup_slope", "max_senescence_slope", "small_integral", "large_integral"] for idx in V2_INDICES: for stat in V2_STATS: FEATURE_ORDER_V2.append(f"{idx}_{stat}") for pheno in V2_PHENO: FEATURE_ORDER_V2.append(f"{idx}_{pheno}") FEATURE_ORDER_V2.append("ndvi_jan_nov_diff") FEATURE_ORDER_V2.append("ndvi_ndre_peak_diff") FEATURE_ORDER_V2.append("canopy_density_contrast") # Also include harmonics in V2 for consistency for idx in ["ndvi", "ndre", "evi"]: for h in ["harmonic1_sin", "harmonic1_cos", "harmonic1_amp", "harmonic1_pha", "harmonic2_sin", "harmonic2_cos", "harmonic2_amp", "harmonic2_pha"]: FEATURE_ORDER_V2.append(f"{idx}_{h}") # Add window features to V2 for idx in ["ndvi", "ndwi", "ndre"]: for window in ["early", "peak", "late"]: FEATURE_ORDER_V2.append(f"{idx}_{window}_mean") FEATURE_ORDER_V2.append(f"{idx}_{window}_max") def build_features_v2_for_pixel( ts: Dict[str, np.ndarray], dates: List[str], ) -> Dict[str, float]: """Build V2 feature set for a single pixel. Matches the CropFeatureEngineer class in the training notebook. """ from scipy.stats import skew, kurtosis from scipy.integrate import simpson features = {} dt_dates = pd.to_datetime(dates, format='%Y%m%d') t_days = (dt_dates - dt_dates[0]).days.values n_steps = len(dates) # Pre-compute smoothed series smoothed = {} for idx in set(V2_INDICES + ["ndwi"]): if idx in ts: smoothed[idx] = smooth_series(ts[idx]) # Stats and Phenology for V2_INDICES for idx in V2_INDICES: if idx not in smoothed: continue arr = smoothed[idx] # Central Tendency & Dispersion features[f"{idx}_mean"] = float(np.mean(arr)) features[f"{idx}_median"] = float(np.median(arr)) features[f"{idx}_min"] = float(np.min(arr)) features[f"{idx}_max"] = float(np.max(arr)) features[f"{idx}_std"] = float(np.std(arr)) # Range & Percentiles features[f"{idx}_p10"] = float(np.percentile(arr, 10)) features[f"{idx}_p25"] = float(np.percentile(arr, 25)) features[f"{idx}_p75"] = float(np.percentile(arr, 75)) features[f"{idx}_p90"] = float(np.percentile(arr, 90)) # Shape Metrics features[f"{idx}_skew"] = float(skew(arr)) features[f"{idx}_kurtosis"] = float(kurtosis(arr)) # CV features[f"{idx}_cv"] = features[f"{idx}_std"] / (features[f"{idx}_mean"] + 0.001) # Integrals (Large) features[f"{idx}_large_integral"] = float(simpson(y=arr, x=t_days)) # Phenology pos_step = int(np.argmax(arr)) features[f"{idx}_pos_step"] = pos_step amp = features[f"{idx}_max"] - features[f"{idx}_min"] thresh = features[f"{idx}_min"] + 0.2 * amp crossings = np.where(arr > thresh)[0] if len(crossings) > 0: sos_step = int(crossings[0]) eos_step = int(crossings[-1]) else: sos_step, eos_step = 0, n_steps - 1 features[f"{idx}_sos_step"] = sos_step features[f"{idx}_eos_step"] = eos_step features[f"{idx}_los"] = eos_step - sos_step # Rates if n_steps > 1: diffs = np.diff(arr) dt = np.diff(t_days) rates = diffs / dt features[f"{idx}_max_greenup_slope"] = float(np.max(rates)) features[f"{idx}_max_senescence_slope"] = float(np.min(rates)) else: features[f"{idx}_max_greenup_slope"] = 0.0 features[f"{idx}_max_senescence_slope"] = 0.0 # Small Integral if eos_step > sos_step: features[f"{idx}_small_integral"] = float(simpson(y=arr[sos_step:eos_step+1], x=t_days[sos_step:eos_step+1])) else: features[f"{idx}_small_integral"] = 0.0 # Difference Features jan_idx = [i for i, d in enumerate(dt_dates) if d.month == 1] nov_idx = [i for i, d in enumerate(dt_dates) if d.month == 11] if jan_idx and nov_idx and "ndvi" in smoothed: features["ndvi_jan_nov_diff"] = float(smoothed["ndvi"][jan_idx[0]] - smoothed["ndvi"][nov_idx[0]]) else: features["ndvi_jan_nov_diff"] = 0.0 # Interaction features if "ndvi_max" in features and "ndre_max" in features: features["ndvi_ndre_peak_diff"] = features["ndvi_max"] - features["ndre_max"] else: features["ndvi_ndre_peak_diff"] = 0.0 if "evi_mean" in features and "ndvi_mean" in features: features["canopy_density_contrast"] = features["evi_mean"] / (features["ndvi_mean"] + 0.001) else: features["canopy_density_contrast"] = 0.0 # Harmonics for idx in ["ndvi", "ndre", "evi"]: if idx in smoothed: harms = harmonic_features(smoothed[idx]) for h_name, h_val in harms.items(): features[f"{idx}_{h_name}"] = h_val else: for h_name in ["harmonic1_sin", "harmonic1_cos", "harmonic1_amp", "harmonic1_pha", "harmonic2_sin", "harmonic2_cos", "harmonic2_amp", "harmonic2_pha"]: features[f"{idx}_{h_name}"] = 0.0 # Window features win_feats = add_seasonal_windows(ts, dates=dt_dates) features.update(win_feats) return features def to_feature_vector(features: Dict[str, float], order: List[str] = None) -> np.ndarray: """Convert feature dict to ordered numpy array. Args: features: Dict of feature name -> value order: List of feature names in desired order Returns: 1D numpy array of features Raises: ValueError: If a key is missing from features """ if order is None: order = FEATURE_ORDER_V1 missing = [k for k in order if k not in features] if missing: raise ValueError(f"Missing features: {missing}") return np.array([features[k] for k in order], dtype=np.float32) # ========================================== # Test / Self-Test # ========================================== if __name__ == "__main__": print("=== Feature Computation Self-Test ===") # Create synthetic time series n = 24 # 24 observations (e.g., monthly for 2 years) t = np.linspace(0, 2 * np.pi, n) # Create synthetic NDVI: seasonal pattern with noise np.random.seed(42) ndvi = 0.5 + 0.3 * np.sin(t) + np.random.normal(0, 0.05, n) # Add some zeros (cloud gaps) ndvi[5] = 0 ndvi[12] = 0 # Create synthetic other indices ndre = 0.3 + 0.2 * np.sin(t) + np.random.normal(0, 0.03, n) evi = 0.4 + 0.25 * np.sin(t) + np.random.normal(0, 0.04, n) savi = 0.35 + 0.2 * np.sin(t) + np.random.normal(0, 0.03, n) ci_re = 0.1 + 0.1 * np.sin(t) + np.random.normal(0, 0.02, n) ndwi = 0.2 + 0.15 * np.cos(t) + np.random.normal(0, 0.02, n) ts = { "ndvi": ndvi, "ndre": ndre, "evi": evi, "savi": savi, "ci_re": ci_re, "ndwi": ndwi, } print("\n1. Testing fill_zeros_linear...") filled = fill_zeros_linear(ndvi.copy()) print(f" Original zeros at 5,12: {ndvi[5]:.2f}, {ndvi[12]:.2f}") print(f" After fill: {filled[5]:.2f}, {filled[12]:.2f}") print("\n2. Testing savgol_smooth_1d...") smoothed = savgol_smooth_1d(filled) print(f" Smoothed: min={smoothed.min():.3f}, max={smoothed.max():.3f}") print("\n3. Testing phenology_metrics...") pheno = phenology_metrics(smoothed) print(f" max={pheno['max']:.3f}, amplitude={pheno['amplitude']:.3f}, peak={pheno['peak_timestep']}") print("\n4. Testing harmonic_features...") harms = harmonic_features(smoothed) print(f" h1_sin={harms['harmonic1_sin']:.3f}, h1_cos={harms['harmonic1_cos']:.3f}") print("\n5. Testing build_features_for_pixel...") features = build_features_for_pixel(ts, step_days=10) # Print sorted keys keys = sorted(features.keys()) print(f" Total features (step 4A): {len(keys)}") print(f" Keys: {keys[:15]}...") # Print a few values print(f"\n Sample values:") print(f" ndvi_max: {features.get('ndvi_max', 'N/A')}") print(f" ndvi_amplitude: {features.get('ndvi_amplitude', 'N/A')}") print(f" ndvi_harmonic1_sin: {features.get('ndvi_harmonic1_sin', 'N/A')}") print(f" ndvi_ndre_peak_diff: {features.get('ndvi_ndre_peak_diff', 'N/A')}") print(f" canopy_density_contrast: {features.get('canopy_density_contrast', 'N/A')}") print("\n6. Testing seasonal windows (Step 4B)...") # Generate synthetic dates spanning Oct-Jun (27 steps = 270 days, 10-day steps) from datetime import datetime, timedelta start_date = datetime(2021, 10, 1) dates = [start_date + timedelta(days=i*10) for i in range(27)] # Pass RAW time series to add_seasonal_windows (it computes smoothing internally now) window_features = add_seasonal_windows(ts, dates=dates) print(f" Window features: {len(window_features)}") # Combine with base features features.update(window_features) print(f" Total features (with windows): {len(features)}") # Check window feature values print(f" Sample window features:") print(f" ndvi_early_mean: {window_features.get('ndvi_early_mean', 'N/A'):.3f}") print(f" ndvi_peak_max: {window_features.get('ndvi_peak_max', 'N/A'):.3f}") print(f" ndre_late_mean: {window_features.get('ndre_late_mean', 'N/A'):.3f}") print("\n7. Testing feature ordering (Step 4B)...") print(f" FEATURE_ORDER_V1 length: {len(FEATURE_ORDER_V1)}") print(f" First 10 features: {FEATURE_ORDER_V1[:10]}") # Create feature vector vector = to_feature_vector(features) print(f" Feature vector shape: {vector.shape}") print(f" Feature vector sum: {vector.sum():.3f}") # Verify lengths match - all should be 51 assert len(FEATURE_ORDER_V1) == 51, f"Expected 51 features in order, got {len(FEATURE_ORDER_V1)}" assert len(features) == 51, f"Expected 51 features in dict, got {len(features)}" assert vector.shape == (51,), f"Expected shape (51,), got {vector.shape}" print("\n8. Testing gap handling functions...") # Create time series with gaps gap_series = np.array([0.5, 0.6, np.nan, np.nan, np.nan, 0.7, 0.8, np.nan, 0.9, 0.4]) # Test handle_temporal_gaps with threshold=3 filled_series = handle_temporal_gaps(gap_series, gap_threshold=3) print(f" Original: {gap_series}") print(f" After gap handling (threshold=3): {filled_series}") # Test compute_gap_mask gap_mask = compute_gap_mask(gap_series, gap_threshold=3) print(f" Gap mask (threshold=3): {gap_mask}") # Test spatial_fill_nan spatial_arr = np.array([[0.5, 0.6, np.nan, 0.8], [0.7, np.nan, 0.9, 0.4], [np.nan, 0.3, 0.2, np.nan], [0.1, 0.2, 0.3, 0.4]]) filled_spatial = spatial_fill_nan(spatial_arr, max_iterations=2) print(f" Original spatial (2D):\n{spatial_arr}") print(f" After spatial fill:\n{filled_spatial}") print("\n=== STEP 4B All Tests Passed ===") print(f" Total features: {len(features)}") print(f" Feature order length: {len(FEATURE_ORDER_V1)}") print(f" Feature vector shape: {vector.shape}")