"""Pure numpy-based feature engineering for crop classification. STEP 4A: Feature computation functions that align with training pipeline. This module provides: - Savitzky-Golay smoothing with zero-filling fallback - Phenology metrics computation - Harmonic/Fourier features - Index computations (NDVI, NDRE, EVI, SAVI, CI_RE, NDWI) - Per-pixel feature builder NOTE: Seasonal window summaries come in Step 4B. """ from __future__ import annotations import math from typing import Dict, List import numpy as np # Try to import scipy for Savitzky-Golay, fall back to pure numpy try: from scipy.signal import savgol_filter as _savgol_filter HAS_SCIPY = True except ImportError: HAS_SCIPY = False # ========================================== # Smoothing Functions # ========================================== def fill_zeros_linear(y: np.ndarray) -> np.ndarray: """Fill zeros using linear interpolation. Treats 0 as missing ONLY when there are non-zero neighbors. Keeps true zeros if the whole series is zero. Args: y: 1D array Returns: Array with zeros filled by linear interpolation """ y = np.array(y, dtype=np.float64).copy() n = len(y) if n == 0: return y # Find zero positions zero_mask = (y == 0) # If all zeros, return as is if np.all(zero_mask): return y # Simple linear interpolation for interior zeros # Find first and last non-zero nonzero_idx = np.where(~zero_mask)[0] if len(nonzero_idx) == 0: return y first_nz = nonzero_idx[0] last_nz = nonzero_idx[-1] # Interpolate interior zeros for i in range(first_nz, last_nz + 1): if zero_mask[i]: # Find surrounding non-zero values left_idx = i - 1 while left_idx >= first_nz and zero_mask[left_idx]: left_idx -= 1 right_idx = i + 1 while right_idx <= last_nz and zero_mask[right_idx]: right_idx += 1 # Interpolate if left_idx >= first_nz and right_idx <= last_nz: left_val = y[left_idx] right_val = y[right_idx] dist = right_idx - left_idx if dist > 0: y[i] = left_val + (right_val - left_val) * (i - left_idx) / dist return y def savgol_smooth_1d(y: np.ndarray, window: int = 5, polyorder: int = 2) -> np.ndarray: """Apply Savitzky-Golay smoothing to 1D array. Uses scipy.signal.savgol_filter if available, otherwise falls back to simple polynomial least squares. Args: y: 1D array window: Window size (must be odd) polyorder: Polynomial order Returns: Smoothed array """ y = np.array(y, dtype=np.float64).copy() # Handle edge cases n = len(y) if n < window: return y # Can't apply SavGol to short series if HAS_SCIPY: return _savgol_filter(y, window, polyorder, mode='nearest') # Fallback: Simple moving average (simplified) # A proper implementation would do polynomial fitting pad = window // 2 result = np.zeros_like(y) for i in range(n): start = max(0, i - pad) end = min(n, i + pad + 1) result[i] = np.mean(y[start:end]) return result def smooth_series(y: np.ndarray) -> np.ndarray: """Apply full smoothing pipeline: fill zeros + Savitzky-Golay. Args: y: 1D array (time series) Returns: Smoothed array """ # Fill zeros first y_filled = fill_zeros_linear(y) # Then apply Savitzky-Golay return savgol_smooth_1d(y_filled, window=5, polyorder=2) # ========================================== # Index Computations # ========================================== def ndvi(nir: np.ndarray, red: np.ndarray, eps: float = 1e-8) -> np.ndarray: """Normalized Difference Vegetation Index. NDVI = (NIR - Red) / (NIR + Red) """ denom = nir + red return np.where(np.abs(denom) > eps, (nir - red) / denom, 0.0) def ndre(nir: np.ndarray, rededge: np.ndarray, eps: float = 1e-8) -> np.ndarray: """Normalized Difference Red-Edge Index. NDRE = (NIR - RedEdge) / (NIR + RedEdge) """ denom = nir + rededge return np.where(np.abs(denom) > eps, (nir - rededge) / denom, 0.0) def evi(nir: np.ndarray, red: np.ndarray, blue: np.ndarray, eps: float = 1e-8) -> np.ndarray: """Enhanced Vegetation Index. EVI = 2.5 * (NIR - Red) / (NIR + 6*Red - 7.5*Blue + 1) """ denom = nir + 6 * red - 7.5 * blue + 1 return np.where(np.abs(denom) > eps, 2.5 * (nir - red) / denom, 0.0) def savi(nir: np.ndarray, red: np.ndarray, L: float = 0.5, eps: float = 1e-8) -> np.ndarray: """Soil Adjusted Vegetation Index. SAVI = ((NIR - Red) / (NIR + Red + L)) * (1 + L) """ denom = nir + red + L return np.where(np.abs(denom) > eps, ((nir - red) / denom) * (1 + L), 0.0) def ci_re(nir: np.ndarray, rededge: np.ndarray, eps: float = 1e-8) -> np.ndarray: """Chlorophyll Index - Red-Edge. CI_RE = (NIR / RedEdge) - 1 """ return np.where(np.abs(rededge) > eps, nir / rededge - 1, 0.0) def ndwi(green: np.ndarray, nir: np.ndarray, eps: float = 1e-8) -> np.ndarray: """Normalized Difference Water Index. NDWI = (Green - NIR) / (Green + NIR) """ denom = green + nir return np.where(np.abs(denom) > eps, (green - nir) / denom, 0.0) # ========================================== # Phenology Metrics # ========================================== def phenology_metrics(y: np.ndarray, step_days: int = 10) -> Dict[str, float]: """Compute phenology metrics from time series. Args: y: 1D time series array (already smoothed or raw) step_days: Days between observations (for AUC calculation) Returns: Dict with: max, min, mean, std, amplitude, auc, peak_timestep, max_slope_up, max_slope_down """ # Handle all-NaN or all-zero if y is None or len(y) == 0 or np.all(np.isnan(y)) or np.all(y == 0): return { "max": 0.0, "min": 0.0, "mean": 0.0, "std": 0.0, "amplitude": 0.0, "auc": 0.0, "peak_timestep": 0, "max_slope_up": 0.0, "max_slope_down": 0.0, } y = np.array(y, dtype=np.float64) # Replace NaN with 0 for computation y_clean = np.nan_to_num(y, nan=0.0) result = {} result["max"] = float(np.max(y_clean)) result["min"] = float(np.min(y_clean)) result["mean"] = float(np.mean(y_clean)) result["std"] = float(np.std(y_clean)) result["amplitude"] = result["max"] - result["min"] # AUC - trapezoidal integration n = len(y_clean) if n > 1: auc = 0.0 for i in range(n - 1): auc += (y_clean[i] + y_clean[i + 1]) * step_days / 2 result["auc"] = float(auc) else: result["auc"] = 0.0 # Peak timestep (argmax) result["peak_timestep"] = int(np.argmax(y_clean)) # Slopes if n > 1: slopes = np.diff(y_clean) result["max_slope_up"] = float(np.max(slopes)) result["max_slope_down"] = float(np.min(slopes)) else: result["max_slope_up"] = 0.0 result["max_slope_down"] = 0.0 return result # ========================================== # Harmonic Features # ========================================== def harmonic_features(y: np.ndarray) -> Dict[str, float]: """Compute harmonic/Fourier features from time series. Projects onto sin/cos at 1st and 2nd harmonics. Args: y: 1D time series array Returns: Dict with: harmonic1_sin, harmonic1_cos, harmonic2_sin, harmonic2_cos """ y = np.array(y, dtype=np.float64) y_clean = np.nan_to_num(y, nan=0.0) n = len(y_clean) if n == 0: return { "harmonic1_sin": 0.0, "harmonic1_cos": 0.0, "harmonic2_sin": 0.0, "harmonic2_cos": 0.0, } # Normalize time to 0-2pi t = np.array([2 * math.pi * k / n for k in range(n)]) # First harmonic result = {} result["harmonic1_sin"] = float(np.mean(y_clean * np.sin(t))) result["harmonic1_cos"] = float(np.mean(y_clean * np.cos(t))) # Second harmonic t2 = 2 * t result["harmonic2_sin"] = float(np.mean(y_clean * np.sin(t2))) result["harmonic2_cos"] = float(np.mean(y_clean * np.cos(t2))) return result # ========================================== # Per-Pixel Feature Builder # ========================================== def build_features_for_pixel( ts: Dict[str, np.ndarray], step_days: int = 10, ) -> Dict[str, float]: """Build all scalar features for a single pixel's time series. Args: ts: Dict of index name -> 1D array time series Keys: "ndvi", "ndre", "evi", "savi", "ci_re", "ndwi" step_days: Days between observations Returns: Dict with ONLY scalar computed features (no arrays): - phenology: ndvi_*, ndre_*, evi_* (max, min, mean, std, amplitude, auc, peak_timestep, max_slope_up, max_slope_down) - harmonics: ndvi_harmonic1_sin, ndvi_harmonic1_cos, ndvi_harmonic2_sin, ndvi_harmonic2_cos - interactions: ndvi_ndre_peak_diff, canopy_density_contrast NOTE: Smoothed time series are NOT included (they are arrays, not scalars). For seasonal window features, use add_seasonal_windows() separately. """ features = {} # Ensure all arrays are float64 ts_clean = {} for key, arr in ts.items(): arr = np.array(arr, dtype=np.float64) ts_clean[key] = arr # Indices to process for phenology phenology_indices = ["ndvi", "ndre", "evi"] # Process each index: smooth + phenology phenology_results = {} for idx in phenology_indices: if idx in ts_clean and ts_clean[idx] is not None: # Smooth (but don't store array in features dict - only use for phenology) smoothed = smooth_series(ts_clean[idx]) # Phenology on smoothed pheno = phenology_metrics(smoothed, step_days) phenology_results[idx] = pheno # Add to features with prefix (SCALARS ONLY) for metric_name, value in pheno.items(): features[f"{idx}_{metric_name}"] = value # Handle savi - just smooth (no phenology in training for savi) # Note: savi_smooth is NOT stored in features (it's an array) # Harmonic features (only for ndvi) if "ndvi" in ts_clean and ts_clean["ndvi"] is not None: # Use smoothed ndvi ndvi_smooth = smooth_series(ts_clean["ndvi"]) harms = harmonic_features(ndvi_smooth) for name, value in harms.items(): features[f"ndvi_{name}"] = value # Interaction features # ndvi_ndre_peak_diff = ndvi_max - ndre_max if "ndvi" in phenology_results and "ndre" in phenology_results: features["ndvi_ndre_peak_diff"] = ( phenology_results["ndvi"]["max"] - phenology_results["ndre"]["max"] ) # canopy_density_contrast = evi_mean / (ndvi_mean + 0.001) if "evi" in phenology_results and "ndvi" in phenology_results: features["canopy_density_contrast"] = ( phenology_results["evi"]["mean"] / (phenology_results["ndvi"]["mean"] + 0.001) ) return features # ========================================== # STEP 4B: Seasonal Window Summaries # ========================================== def _get_window_indices(n_steps: int, dates=None) -> Dict[str, List[int]]: """Get time indices for each seasonal window. Args: n_steps: Number of time steps dates: Optional list of dates (datetime, date, or str) Returns: Dict mapping window name to list of indices """ if dates is not None: # Use dates to determine windows window_idx = {"early": [], "peak": [], "late": []} for i, d in enumerate(dates): # Parse date if isinstance(d, str): # Try to parse as date try: from datetime import datetime d = datetime.fromisoformat(d.replace('Z', '+00:00')) except: continue elif hasattr(d, 'month'): month = d.month else: continue if month in [10, 11, 12]: window_idx["early"].append(i) elif month in [1, 2, 3]: window_idx["peak"].append(i) elif month in [4, 5, 6]: window_idx["late"].append(i) return window_idx else: # Fallback: positional split (27 steps = ~9 months Oct-Jun at 10-day intervals) # Early: Oct-Dec (first ~9 steps) # Peak: Jan-Mar (next ~9 steps) # Late: Apr-Jun (next ~9 steps) early_end = min(9, n_steps // 3) peak_end = min(18, 2 * n_steps // 3) return { "early": list(range(0, early_end)), "peak": list(range(early_end, peak_end)), "late": list(range(peak_end, n_steps)), } def _compute_window_stats(arr: np.ndarray, indices: List[int]) -> Dict[str, float]: """Compute mean and max for a window. Args: arr: 1D array of values indices: List of indices for this window Returns: Dict with mean and max (or 0.0 if no indices) """ if not indices or len(indices) == 0: return {"mean": 0.0, "max": 0.0} # Filter out NaN values = [arr[i] for i in indices if i < len(arr) and not np.isnan(arr[i])] if not values: return {"mean": 0.0, "max": 0.0} return { "mean": float(np.mean(values)), "max": float(np.max(values)), } def add_seasonal_windows( ts: Dict[str, np.ndarray], dates=None, ) -> Dict[str, float]: """Add seasonal window summary features. Season: Oct-Jun split into: - Early: Oct-Dec - Peak: Jan-Mar - Late: Apr-Jun For each window, compute mean and max for NDVI, NDWI, NDRE. This function computes smoothing internally so it accepts raw time series. Args: ts: Dict of index name -> raw 1D array time series dates: Optional dates for window determination Returns: Dict with 18 window features (scalars only): - ndvi_early_mean, ndvi_early_max - ndvi_peak_mean, ndvi_peak_max - ndvi_late_mean, ndvi_late_max - ndwi_early_mean, ndwi_early_max - ... (same for ndre) """ features = {} # Determine window indices first_arr = next(iter(ts.values())) n_steps = len(first_arr) window_idx = _get_window_indices(n_steps, dates) # Process each index - smooth internally for idx in ["ndvi", "ndwi", "ndre"]: if idx not in ts: continue # Smooth the time series internally arr_raw = np.array(ts[idx], dtype=np.float64) arr_smoothed = smooth_series(arr_raw) for window_name in ["early", "peak", "late"]: indices = window_idx.get(window_name, []) stats = _compute_window_stats(arr_smoothed, indices) features[f"{idx}_{window_name}_mean"] = stats["mean"] features[f"{idx}_{window_name}_max"] = stats["max"] return features # ========================================== # STEP 4B: Feature Ordering # ========================================== # Phenology metric order (matching training) PHENO_METRIC_ORDER = [ "max", "min", "mean", "std", "amplitude", "auc", "peak_timestep", "max_slope_up", "max_slope_down" ] # Feature order V1: 55 features total (excluding smooth arrays which are not scalar) FEATURE_ORDER_V1 = [] # A) Phenology for ndvi, ndre, evi (in that order, each with 9 metrics) for idx in ["ndvi", "ndre", "evi"]: for metric in PHENO_METRIC_ORDER: FEATURE_ORDER_V1.append(f"{idx}_{metric}") # B) Harmonics for ndvi FEATURE_ORDER_V1.extend([ "ndvi_harmonic1_sin", "ndvi_harmonic1_cos", "ndvi_harmonic2_sin", "ndvi_harmonic2_cos", ]) # C) Interaction features FEATURE_ORDER_V1.extend([ "ndvi_ndre_peak_diff", "canopy_density_contrast", ]) # D) Window summaries: ndvi, ndwi, ndre (in that order) # Early, Peak, Late (in that order) # Mean, Max (in that order) for idx in ["ndvi", "ndwi", "ndre"]: for window in ["early", "peak", "late"]: FEATURE_ORDER_V1.append(f"{idx}_{window}_mean") FEATURE_ORDER_V1.append(f"{idx}_{window}_max") # Verify: 27 + 4 + 2 + 18 = 51 features (scalar only) # Note: The actual features dict may have additional array features (smoothed series) # which are not included in FEATURE_ORDER_V1 since they are not scalar def to_feature_vector(features: Dict[str, float], order: List[str] = None) -> np.ndarray: """Convert feature dict to ordered numpy array. Args: features: Dict of feature name -> value order: List of feature names in desired order Returns: 1D numpy array of features Raises: ValueError: If a key is missing from features """ if order is None: order = FEATURE_ORDER_V1 missing = [k for k in order if k not in features] if missing: raise ValueError(f"Missing features: {missing}") return np.array([features[k] for k in order], dtype=np.float32) # ========================================== # Test / Self-Test # ========================================== if __name__ == "__main__": print("=== Feature Computation Self-Test ===") # Create synthetic time series n = 24 # 24 observations (e.g., monthly for 2 years) t = np.linspace(0, 2 * np.pi, n) # Create synthetic NDVI: seasonal pattern with noise np.random.seed(42) ndvi = 0.5 + 0.3 * np.sin(t) + np.random.normal(0, 0.05, n) # Add some zeros (cloud gaps) ndvi[5] = 0 ndvi[12] = 0 # Create synthetic other indices ndre = 0.3 + 0.2 * np.sin(t) + np.random.normal(0, 0.03, n) evi = 0.4 + 0.25 * np.sin(t) + np.random.normal(0, 0.04, n) savi = 0.35 + 0.2 * np.sin(t) + np.random.normal(0, 0.03, n) ci_re = 0.1 + 0.1 * np.sin(t) + np.random.normal(0, 0.02, n) ndwi = 0.2 + 0.15 * np.cos(t) + np.random.normal(0, 0.02, n) ts = { "ndvi": ndvi, "ndre": ndre, "evi": evi, "savi": savi, "ci_re": ci_re, "ndwi": ndwi, } print("\n1. Testing fill_zeros_linear...") filled = fill_zeros_linear(ndvi.copy()) print(f" Original zeros at 5,12: {ndvi[5]:.2f}, {ndvi[12]:.2f}") print(f" After fill: {filled[5]:.2f}, {filled[12]:.2f}") print("\n2. Testing savgol_smooth_1d...") smoothed = savgol_smooth_1d(filled) print(f" Smoothed: min={smoothed.min():.3f}, max={smoothed.max():.3f}") print("\n3. Testing phenology_metrics...") pheno = phenology_metrics(smoothed) print(f" max={pheno['max']:.3f}, amplitude={pheno['amplitude']:.3f}, peak={pheno['peak_timestep']}") print("\n4. Testing harmonic_features...") harms = harmonic_features(smoothed) print(f" h1_sin={harms['harmonic1_sin']:.3f}, h1_cos={harms['harmonic1_cos']:.3f}") print("\n5. Testing build_features_for_pixel...") features = build_features_for_pixel(ts, step_days=10) # Print sorted keys keys = sorted(features.keys()) print(f" Total features (step 4A): {len(keys)}") print(f" Keys: {keys[:15]}...") # Print a few values print(f"\n Sample values:") print(f" ndvi_max: {features.get('ndvi_max', 'N/A')}") print(f" ndvi_amplitude: {features.get('ndvi_amplitude', 'N/A')}") print(f" ndvi_harmonic1_sin: {features.get('ndvi_harmonic1_sin', 'N/A')}") print(f" ndvi_ndre_peak_diff: {features.get('ndvi_ndre_peak_diff', 'N/A')}") print(f" canopy_density_contrast: {features.get('canopy_density_contrast', 'N/A')}") print("\n6. Testing seasonal windows (Step 4B)...") # Generate synthetic dates spanning Oct-Jun (27 steps = 270 days, 10-day steps) from datetime import datetime, timedelta start_date = datetime(2021, 10, 1) dates = [start_date + timedelta(days=i*10) for i in range(27)] # Pass RAW time series to add_seasonal_windows (it computes smoothing internally now) window_features = add_seasonal_windows(ts, dates=dates) print(f" Window features: {len(window_features)}") # Combine with base features features.update(window_features) print(f" Total features (with windows): {len(features)}") # Check window feature values print(f" Sample window features:") print(f" ndvi_early_mean: {window_features.get('ndvi_early_mean', 'N/A'):.3f}") print(f" ndvi_peak_max: {window_features.get('ndvi_peak_max', 'N/A'):.3f}") print(f" ndre_late_mean: {window_features.get('ndre_late_mean', 'N/A'):.3f}") print("\n7. Testing feature ordering (Step 4B)...") print(f" FEATURE_ORDER_V1 length: {len(FEATURE_ORDER_V1)}") print(f" First 10 features: {FEATURE_ORDER_V1[:10]}") # Create feature vector vector = to_feature_vector(features) print(f" Feature vector shape: {vector.shape}") print(f" Feature vector sum: {vector.sum():.3f}") # Verify lengths match - all should be 51 assert len(FEATURE_ORDER_V1) == 51, f"Expected 51 features in order, got {len(FEATURE_ORDER_V1)}" assert len(features) == 51, f"Expected 51 features in dict, got {len(features)}" assert vector.shape == (51,), f"Expected shape (51,), got {vector.shape}" print("\n=== STEP 4B All Tests Passed ===") print(f" Total features: {len(features)}") print(f" Feature order length: {len(FEATURE_ORDER_V1)}") print(f" Feature vector shape: {vector.shape}")