# Plan 01: STAC Inference Worker Architecture

**Status**: Pending Implementation
**Date**: 2026-02-27

---

## Objective

Replace the mock worker with a real Python implementation that:

1. Queries the Digital Earth Africa (DEA) STAC API for Sentinel-2 imagery
2. Computes vegetation indices (NDVI, EVI, SAVI) and seasonal peaks
3. Loads and applies ML models for crop classification
4. Applies neighborhood smoothing to refine results
5. Exports Cloud Optimized GeoTIFFs (COGs) to MinIO

---
## 1. Architecture Overview

```mermaid
graph TD
    A[API: Job Request] -->|Queue| B[RQ Worker]
    B --> C[DEA STAC API]
    B --> D[MinIO: DW Baselines]
    C -->|Sentinel-2 L2A| E[Feature Computation]
    D -->|DW Raster| E
    E --> F[ML Model Inference]
    F --> G[Neighborhood Smoothing]
    G --> H[COG Export]
    H -->|Upload| I[MinIO: Results]
    I -->|Signed URL| J[API Response]
```

---
## 2. Worker Architecture (Python Modules)

Create/keep the following modules in `apps/worker/`:

| Module | Purpose |
|--------|---------|
| `config.py` | STAC endpoints, season windows (Sep→May), allowed years 2015→present, max radius 5 km, bucket/prefix config, kernel sizes (3/5/7) |
| `features.py` | STAC search + asset selection, download/stream windows for the AOI, compute indices and composites, optional caching |
| `inference.py` | Load model artifacts from MinIO (`model.joblib`, `label_encoder.joblib`, `scaler.joblib`, `selected_features.json`), run prediction over the feature stack, output class raster + optional confidence raster |
| `postprocess.py` (optional) | Neighborhood-smoothing majority filter, class remapping utilities |
| `io.py` (optional) | MinIO read/write helpers, signed URL creation |

### 2.1 Key Configuration

From [`training/config.py`](training/config.py:146):

```python
# DEA STAC
dea_root: str = "https://explorer.digitalearth.africa/stac"
dea_search: str = "https://explorer.digitalearth.africa/stac/search"

# Season window (Sep → May)
summer_start_month: int = 9
summer_start_day: int = 1
summer_end_month: int = 5
summer_end_day: int = 31

# Smoothing
smoothing_kernel: int = 3
```
### 2.2 Job Payload Contract (API → Redis)

Define a stable payload schema (JSON):

```json
{
  "job_id": "uuid",
  "user_id": "uuid",
  "aoi": {"lon": 30.46, "lat": -16.81, "radius_m": 2000},
  "year": 2021,
  "season": "summer",
  "model": "Ensemble",
  "smoothing_kernel": 5,
  "outputs": {
    "refined": true,
    "dw_baseline": true,
    "true_color": true,
    "indices": ["ndvi_peak", "evi_peak", "savi_peak"]
  }
}
```

The worker must accept missing optional fields and apply defaults.
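A minimal sketch of the defaults-and-validation step. The default values shown (`"Ensemble"`, kernel 5, the `outputs` flags) are illustrative assumptions, not final decisions:

```python
# Sketch: normalising an incoming job payload against the contract above.
# Field names follow section 2.2; defaults here are placeholder assumptions.

DEFAULTS = {
    "season": "summer",
    "model": "Ensemble",
    "smoothing_kernel": 5,
    "outputs": {
        "refined": True,
        "dw_baseline": False,
        "true_color": False,
        "indices": [],
    },
}

REQUIRED = ("job_id", "aoi", "year")


def normalize_payload(payload: dict) -> dict:
    """Validate required fields and fill optional ones with defaults."""
    missing = [k for k in REQUIRED if k not in payload]
    if missing:
        raise ValueError(f"payload missing required fields: {missing}")

    job = {**DEFAULTS, **payload}
    # Merge nested "outputs" so a partially-specified dict keeps its defaults
    job["outputs"] = {**DEFAULTS["outputs"], **payload.get("outputs", {})}
    return job
```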
## 3. AOI Validation

- Radius ≤ 5000 m.
- AOI inside Zimbabwe:
  - **Preferred**: use a Zimbabwe boundary polygon (GeoJSON) baked into the worker image, then run a point-in-polygon test on the center plus a buffer-intersects check.
  - **Fallback**: bbox check (already in AGENTS) — keep as a quick pre-check.
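The fallback pre-check can be sketched as follows. The bbox coordinates are approximate, and the function name `validate_aoi` is illustrative (the worker's actual helper is `validate_aoi_zimbabwe` in `features.py`); the preferred polygon test would use shapely against the baked-in GeoJSON instead:

```python
# Sketch of the fallback bbox + radius pre-check.

ZIM_BBOX = (25.2, -22.5, 33.1, -15.6)  # (minx, miny, maxx, maxy), approximate
MAX_RADIUS_M = 5000


def validate_aoi(lon: float, lat: float, radius_m: float) -> None:
    """Raise ValueError if the AOI violates the radius or bbox constraints."""
    if radius_m <= 0 or radius_m > MAX_RADIUS_M:
        raise ValueError(f"radius_m must be in (0, {MAX_RADIUS_M}], got {radius_m}")
    minx, miny, maxx, maxy = ZIM_BBOX
    if not (minx <= lon <= maxx and miny <= lat <= maxy):
        raise ValueError(f"AOI center ({lon}, {lat}) falls outside Zimbabwe bbox")
```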
## 4. DEA STAC Data Strategy

### 4.1 STAC Endpoint

- `https://explorer.digitalearth.africa/stac/search`

### 4.2 Collections (Initial Shortlist)

Start with a stable optical source for true color + indices.

- Primary: Sentinel-2 L2A (DEA collection likely `s2_l2a` / `s2_l2a_c1`)
- Fallback: Landsat (e.g., `landsat_c2l2_ar`, `ls8_sr`, `ls9_sr`)

### 4.3 Season Window

Model season: **Sep 1 → May 31** (year to year+1).
Example for year=2018: 2018-09-01 to 2019-05-31.

### 4.4 Peak Indices Logic

- For each index (NDVI/EVI/SAVI): compute the per-scene index, then take the per-pixel maximum across the season.
- Use a cloud/quality mask if available in the assets (or best-effort filtering initially).
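The season-window arithmetic above is small enough to sketch directly, mirroring what `InferenceConfig.season_dates(year, "summer")` is expected to return:

```python
from datetime import date


def season_dates(year: int) -> tuple:
    """Return (start, end) ISO dates for the Sep 1 -> May 31 summer season."""
    start = date(year, 9, 1)      # season starts Sep 1 of `year`
    end = date(year + 1, 5, 31)   # ...and ends May 31 of the following year
    return start.isoformat(), end.isoformat()
```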
## 5. Dynamic World Baseline Loading

- Worker locates the DW baseline by year/season using an object key manifest.
- Read the baseline COG from MinIO with rasterio's VSI S3 support (or download it temporarily).
- Clip to the AOI window.
- The baseline is used both as an input feature and as a UI toggle layer.
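A sketch of the AOI-to-pixel-window arithmetic behind the clip step. In practice `rasterio.windows.from_bounds` derives this from the COG's geotransform, so the hand-rolled version below (and its `origin_x`/`origin_y` parameters) is purely illustrative, assuming a north-up 10 m grid:

```python
import math


def aoi_window(bounds, origin_x, origin_y, pixel_size=10.0):
    """Map (minx, miny, maxx, maxy) in the raster CRS to (row_off, col_off, height, width)."""
    minx, miny, maxx, maxy = bounds
    col_off = math.floor((minx - origin_x) / pixel_size)
    row_off = math.floor((origin_y - maxy) / pixel_size)  # y decreases downward
    width = math.ceil((maxx - minx) / pixel_size)
    height = math.ceil((maxy - miny) / pixel_size)
    return row_off, col_off, height, width
```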
## 6. Model Inference Strategy

- Feature raster stack → flatten to (N_pixels, N_features)
- Apply scaler if present
- Predict a class for each pixel
- Reshape back to raster
- Save refined class raster (uint8)
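The flatten → predict → reshape round trip can be sketched with a stub estimator standing in for the joblib-loaded model (`StubModel` is illustrative; any scikit-learn classifier with a `predict(X)` method fits this shape):

```python
import numpy as np


class StubModel:
    """Stand-in for the loaded estimator: class 1 where the first feature is positive."""

    def predict(self, X):
        return (X[:, 0] > 0).astype(np.uint8)


def classify_stack(feature_stack: np.ndarray, model) -> np.ndarray:
    """Run per-pixel prediction on an (H, W, F) feature stack."""
    h, w, f = feature_stack.shape
    X = feature_stack.reshape(-1, f)           # (N_pixels, N_features)
    y = model.predict(X)                       # (N_pixels,)
    return y.reshape(h, w).astype(np.uint8)    # back to raster shape, uint8
```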
### 6.1 Class List and Palette

- Treat classes as dynamic:
  - the label encoder's `classes_` define the valid class names
  - the palette is generated at runtime (deterministic) or stored alongside the model version as `palette.json`
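One way to generate a deterministic runtime palette (an illustrative sketch: hashing class names keeps colors stable across runs without a stored `palette.json`):

```python
import hashlib


def make_palette(class_names):
    """Map class name -> (r, g, b), deterministic across runs and processes."""
    palette = {}
    for name in class_names:
        # md5 is used only as a stable byte source, not for security
        digest = hashlib.md5(name.encode("utf-8")).digest()
        palette[name] = (digest[0], digest[1], digest[2])
    return palette
```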
## 7. Neighborhood Smoothing

- Majority filter over the predicted class raster.
- Must preserve nodata.
- Kernel sizes 3/5/7; default 5.
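A minimal nodata-preserving majority filter sketch in plain NumPy. The real `majority_filter` in `features.py` would likely use `scipy.ndimage` for speed, and the nodata value 255 here is an assumption:

```python
import numpy as np


def majority_filter(arr: np.ndarray, kernel: int = 5, nodata: int = 255) -> np.ndarray:
    """Replace each valid pixel with the majority class in its kernel window."""
    assert kernel % 2 == 1, "kernel must be odd (3, 5, or 7)"
    pad = kernel // 2
    padded = np.pad(arr, pad, mode="edge")
    out = arr.copy()
    for i in range(arr.shape[0]):
        for j in range(arr.shape[1]):
            if arr[i, j] == nodata:
                continue  # preserve nodata pixels untouched
            window = padded[i:i + kernel, j:j + kernel].ravel()
            valid = window[window != nodata]  # nodata never votes
            values, counts = np.unique(valid, return_counts=True)
            out[i, j] = values[np.argmax(counts)]
    return out
```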
## 8. Outputs

- **Refined class map (10 m)**: GeoTIFF → convert to COG → upload to MinIO.
- Optional outputs:
  - DW baseline clipped (COG)
  - True color composite (COG)
  - Index peaks (one COG per index)

Object layout:

- `geocrop-results/results/<job_id>/refined.tif`
- `.../dw_baseline.tif`
- `.../truecolor.tif`
- `.../ndvi_peak.tif` etc.
## 9. Status & Progress Updates

The worker should update job state (queued/running/stage/progress/errors). Two options:

1. Store in a Redis hash keyed by job_id (fast)
2. Store in a DB (later)

For the portfolio MVP, Redis is fine:

- `job:<job_id>:status` = JSON blob

Stages:

- `fetch_stac` → `build_features` → `load_dw` → `infer` → `smooth` → `export_cog` → `upload` → `done`
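The status writer can be sketched as follows. `client` is anything exposing a Redis-style `set(key, value)` method (e.g. `redis.Redis`), and the blob fields are illustrative:

```python
import json
import time

# Pipeline stages, in order
STAGES = ("fetch_stac", "build_features", "load_dw", "infer",
          "smooth", "export_cog", "upload", "done")


def update_status(client, job_id, stage, progress, error=None):
    """Write the job's status blob to job:<job_id>:status."""
    if stage not in STAGES:
        raise ValueError(f"unknown stage: {stage}")
    blob = {
        "stage": stage,
        "progress": progress,      # 0.0 - 1.0
        "error": error,
        "updated_at": time.time(),
    }
    client.set(f"job:{job_id}:status", json.dumps(blob))
```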
---

## 10. Implementation Components

### 10.1 STAC Client Module

Create `apps/worker/stac_client.py`:

```python
"""DEA STAC API client for fetching Sentinel-2 imagery."""

from typing import Any, List, Optional

import pystac_client
import stackstac
import xarray as xr

# DEA STAC endpoint (from config.py)
DEA_STAC_URL = "https://explorer.digitalearth.africa/stac"


class DEASTACClient:
    """Client for querying the DEA STAC API."""

    # Sentinel-2 L2A collection
    COLLECTION = "s2_l2a"

    # Required bands for feature computation
    BANDS = ["red", "green", "blue", "nir", "swir_1", "swir_2"]

    def __init__(self, stac_url: str = DEA_STAC_URL):
        self.client = pystac_client.Client.open(stac_url)

    def search(
        self,
        bbox: List[float],  # [minx, miny, maxx, maxy]
        start_date: str,  # YYYY-MM-DD
        end_date: str,  # YYYY-MM-DD
        collections: Optional[List[str]] = None,
    ) -> List[Any]:
        """Search for STAC items matching the criteria (returns pystac Items)."""
        if collections is None:
            collections = [self.COLLECTION]

        search = self.client.search(
            collections=collections,
            bbox=bbox,
            datetime=f"{start_date}/{end_date}",
            query={
                "eo:cloud_cover": {"lt": 20},  # Filter out cloudy scenes
            },
        )
        return list(search.items())

    def load_data(
        self,
        items: List[Any],
        bbox: List[float],
        bands: Optional[List[str]] = None,
        resolution: int = 10,
    ) -> xr.DataArray:
        """Load STAC items as an xarray DataArray using stackstac."""
        if bands is None:
            bands = self.BANDS

        # Stack the items into a (time, band, y, x) lazy cube;
        # bbox is lon/lat, so pass it via bounds_latlon
        cube = stackstac.stack(
            items,
            bounds_latlon=bbox,
            resolution=resolution,
            assets=bands,
            chunksize=512,
            epsg=32736,  # UTM Zone 36S (Zimbabwe)
        )
        return cube
```
### 10.2 Feature Computation Module

Create `apps/worker/feature_computation.py`:

```python
"""Feature computation from DEA STAC data."""

from typing import Dict, Tuple

import xarray as xr


def compute_indices(da: xr.DataArray) -> Dict[str, xr.DataArray]:
    """Compute vegetation indices from STAC data.

    Args:
        da: xarray DataArray with bands (red, green, blue, nir, swir_1, swir_2),
            with reflectance scaled to 0-1

    Returns:
        Dictionary of index name -> index DataArray
    """
    # Get band arrays
    red = da.sel(band="red")
    nir = da.sel(band="nir")
    blue = da.sel(band="blue")

    # NDVI = (NIR - Red) / (NIR + Red)
    ndvi = (nir - red) / (nir + red)

    # EVI = 2.5 * (NIR - Red) / (NIR + 6*Red - 7.5*Blue + 1)
    evi = 2.5 * (nir - red) / (nir + 6 * red - 7.5 * blue + 1)

    # SAVI = ((NIR - Red) / (NIR + Red + L)) * (1 + L)
    # L = 0.5 for semi-arid areas
    L = 0.5
    savi = ((nir - red) / (nir + red + L)) * (1 + L)

    return {
        "ndvi": ndvi,
        "evi": evi,
        "savi": savi,
    }


def compute_seasonal_peaks(
    indices: Dict[str, xr.DataArray],
) -> Tuple[xr.DataArray, xr.DataArray, xr.DataArray]:
    """Compute peak (per-pixel maximum) values for the season.

    Args:
        indices: mapping of index name -> DataArray with a time dimension
            (the output of compute_indices)

    Returns:
        Tuple of (ndvi_peak, evi_peak, savi_peak)
    """
    ndvi_peak = indices["ndvi"].max(dim="time")
    evi_peak = indices["evi"].max(dim="time")
    savi_peak = indices["savi"].max(dim="time")

    return ndvi_peak, evi_peak, savi_peak


def compute_true_color(da: xr.DataArray) -> xr.DataArray:
    """Compute a true color (RGB) composite."""
    rgb = xr.concat(
        [
            da.sel(band="red"),
            da.sel(band="green"),
            da.sel(band="blue"),
        ],
        dim="band",
    )
    return rgb
```
### 10.3 MinIO Storage Adapter

Update `apps/worker/config.py` with MinIO-backed storage:

```python
"""MinIO storage adapter for inference."""

import tempfile
from pathlib import Path
from typing import Optional

import boto3
from botocore.config import Config


class MinIOStorage(StorageAdapter):
    """Production storage adapter using MinIO."""

    def __init__(
        self,
        endpoint: str = "minio.geocrop.svc.cluster.local:9000",
        access_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        bucket_baselines: str = "geocrop-baselines",
        bucket_results: str = "geocrop-results",
        bucket_models: str = "geocrop-models",
    ):
        self.endpoint = endpoint
        self.access_key = access_key
        self.secret_key = secret_key
        self.bucket_baselines = bucket_baselines
        self.bucket_results = bucket_results
        self.bucket_models = bucket_models

        # Configure S3 client with path-style addressing
        self.s3 = boto3.client(
            "s3",
            endpoint_url=f"http://{endpoint}",
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            config=Config(signature_version="s3v4"),
        )

    def download_model_bundle(self, model_key: str, dest_dir: Path):
        """Download model files from the geocrop-models bucket."""
        dest_dir.mkdir(parents=True, exist_ok=True)

        # Expected files: model.joblib, scaler.joblib, label_encoder.joblib, selected_features.json
        files = ["model.joblib", "scaler.joblib", "label_encoder.joblib", "selected_features.json"]

        for filename in files:
            key = f"{model_key}/{filename}"
            local_path = dest_dir / filename
            try:
                self.s3.download_file(self.bucket_models, key, str(local_path))
            except Exception as e:
                if filename == "scaler.joblib":
                    # Scaler is optional
                    continue
                raise FileNotFoundError(f"Missing model file: {key}") from e

    def get_dw_local_path(self, year: int, season: str) -> str:
        """Return the path/prefix for the DW baseline COG.

        Uses the DW_Zim_HighestConf_{year}_{year+1}.tif naming convention.
        For tiled COGs there are multiple tiles (a 2x2 structure), so for
        now this returns an s3:// prefix that the clip function handles.
        """
        return f"s3://{self.bucket_baselines}/DW_Zim_HighestConf_{year}_{year + 1}"

    def download_dw_baseline(self, year: int, aoi_bounds: list) -> str:
        """Download the DW baseline tiles covering the AOI to temp storage.

        Tiles are ~65536 x 65536 pixels each and named
        DW_Zim_HighestConf_{year}_{year+1}-{tileX}-{tileY}.tif.
        """
        temp_dir = tempfile.mkdtemp(prefix="dw_baseline_")

        # Determine which tiles are needed from the AOI bounds.
        # This is simplified - the real implementation must handle the
        # 2x2 tile structure with proper bounds checking.

        return temp_dir

    def upload_result(self, local_path: Path, job_id: str, filename: str = "refined.tif") -> str:
        """Upload a result COG to MinIO."""
        key = f"results/{job_id}/{filename}"
        self.s3.upload_file(str(local_path), self.bucket_results, key)
        return f"s3://{self.bucket_results}/{key}"

    def generate_presigned_url(self, bucket: str, key: str, expires: int = 3600) -> str:
        """Generate a presigned URL for download."""
        url = self.s3.generate_presigned_url(
            "get_object",
            Params={"Bucket": bucket, "Key": key},
            ExpiresIn=expires,
        )
        return url
```
### 10.4 Updated Worker Entry Point

Update `apps/worker/worker.py`:

```python
"""GeoCrop Worker - Real STAC + ML inference pipeline."""

import json
import os
import tempfile
from pathlib import Path

import joblib
import numpy as np
import rasterio
from redis import Redis
from rq import Queue, Worker

# Import local modules
from config import InferenceConfig, MinIOStorage
from features import (
    validate_aoi_zimbabwe,
    clip_raster_to_aoi,
    majority_filter,
)
from stac_client import DEASTACClient
from feature_computation import compute_indices, compute_seasonal_peaks


# Configuration
REDIS_HOST = os.getenv("REDIS_HOST", "redis.geocrop.svc.cluster.local")
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "minio.geocrop.svc.cluster.local:9000")
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY")

redis_conn = Redis(host=REDIS_HOST, port=6379)


def run_inference(job_data: dict):
    """Main inference function called by the RQ worker."""
    job_id = job_data.get("job_id", "unknown")
    print(f"🚀 Starting inference job {job_id}")

    # Extract parameters (see the job payload contract in section 2.2)
    aoi_spec = job_data["aoi"]
    lon = aoi_spec["lon"]
    lat = aoi_spec["lat"]
    radius_m = aoi_spec["radius_m"]
    year = int(job_data["year"])
    model_name = job_data.get("model", "Ensemble")

    # Validate AOI - note the (lon, lat, radius_m) ordering
    aoi = (lon, lat, radius_m)
    validate_aoi_zimbabwe(aoi)

    # Initialize config
    cfg = InferenceConfig(
        storage=MinIOStorage(
            endpoint=MINIO_ENDPOINT,
            access_key=MINIO_ACCESS_KEY,
            secret_key=MINIO_SECRET_KEY,
        )
    )
    kernel = job_data.get("smoothing_kernel", cfg.smoothing_kernel)

    # Get season dates
    start_date, end_date = cfg.season_dates(year, "summer")
    print(f"📅 Season: {start_date} to {end_date}")

    # Step 1: Query DEA STAC
    print("🔍 Querying DEA STAC API...")
    stac_client = DEASTACClient()

    # Convert AOI to a lon/lat bbox (approximate: 1 degree ≈ 111 km)
    radius_deg = (radius_m / 1000.0) / 111.0
    bbox = [lon - radius_deg, lat - radius_deg, lon + radius_deg, lat + radius_deg]

    items = stac_client.search(bbox, start_date, end_date)
    print(f"📡 Found {len(items)} Sentinel-2 scenes")

    if len(items) == 0:
        raise ValueError("No Sentinel-2 imagery available for the selected AOI and date range")

    # Step 2: Load and process STAC data
    print("📥 Loading satellite imagery...")
    data = stac_client.load_data(items, bbox)

    # Step 3: Compute features
    print("🧮 Computing vegetation indices...")
    indices = compute_indices(data)
    ndvi_peak, evi_peak, savi_peak = compute_seasonal_peaks(indices)

    # Stack features for the model; order must match selected_features.json
    feature_stack = np.stack(
        [
            ndvi_peak.values,
            evi_peak.values,
            savi_peak.values,
        ],
        axis=-1,
    )

    # Handle NaN values
    feature_stack = np.nan_to_num(feature_stack, nan=0.0)

    # Step 4: Load DW baseline
    print("🗺️ Loading Dynamic World baseline...")
    dw_path = cfg.storage.download_dw_baseline(year, bbox)
    dw_arr, dw_profile = clip_raster_to_aoi(dw_path, aoi)

    # Step 5: Load ML model (keep the temp dir open through export/upload)
    print("🤖 Loading ML model...")
    with tempfile.TemporaryDirectory() as tmpdir:
        model_dir = Path(tmpdir)
        cfg.storage.download_model_bundle(model_name, model_dir)

        model = joblib.load(model_dir / "model.joblib")
        scaler_path = model_dir / "scaler.joblib"
        scaler = joblib.load(scaler_path) if scaler_path.exists() else None

        with open(model_dir / "selected_features.json") as f:
            feature_names = json.load(f)

        # Scale features if a scaler is bundled
        X = feature_stack.reshape(-1, len(feature_names))
        if scaler is not None:
            X = scaler.transform(X)

        # Run inference
        print("⚙️ Running crop classification...")
        predictions = model.predict(X)
        predictions = predictions.reshape(feature_stack.shape[:2]).astype(np.uint8)

        # Step 6: Apply smoothing
        if cfg.smoothing_enabled:
            print("🧼 Applying neighborhood smoothing...")
            predictions = majority_filter(predictions, kernel)

        # Step 7: Export COG
        print("💾 Exporting results...")
        output_path = Path(tmpdir) / "refined.tif"

        profile = dw_profile.copy()
        profile.update(
            {
                "driver": "COG",
                "dtype": "uint8",
                "count": 1,
                "compress": "DEFLATE",
                "predictor": 2,
            }
        )

        with rasterio.open(output_path, "w", **profile) as dst:
            dst.write(predictions, 1)

        # Step 8: Upload to MinIO
        print("☁️ Uploading to MinIO...")
        s3_uri = cfg.storage.upload_result(output_path, job_id)

    # Generate signed URL
    download_url = cfg.storage.generate_presigned_url(
        "geocrop-results",
        f"results/{job_id}/refined.tif",
    )

    print("✅ Inference complete!")

    return {
        "status": "success",
        "job_id": job_id,
        "download_url": download_url,
        "s3_uri": s3_uri,
        "metadata": {
            "year": year,
            "season": "summer",
            "model": model_name,
            "aoi": {"lon": lon, "lat": lat, "radius_m": radius_m},
            "features_used": feature_names,
        },
    }


# Worker entry point
if __name__ == "__main__":
    print("🎧 Starting GeoCrop Worker with real inference pipeline...")
    worker_queue = Queue("geocrop_tasks", connection=redis_conn)
    worker = Worker([worker_queue], connection=redis_conn)
    worker.work()
```
---

## 11. Dependencies Required

Add to `apps/worker/requirements.txt`:

```
# STAC and raster processing
pystac-client>=0.7.0
stackstac>=0.4.0
rasterio>=1.3.0
rioxarray>=0.14.0

# AWS/MinIO
boto3>=1.28.0

# Array computing
numpy>=1.24.0
xarray>=2023.1.0

# ML
scikit-learn>=1.3.0
joblib>=1.3.0

# Progress tracking
tqdm>=4.65.0
```
---

## 12. File Changes Summary

| File | Action | Description |
|------|--------|-------------|
| `apps/worker/requirements.txt` | Update | Add STAC/raster dependencies |
| `apps/worker/stac_client.py` | Create | DEA STAC API client |
| `apps/worker/feature_computation.py` | Create | Index computation functions |
| `apps/worker/storage.py` | Create | MinIO storage adapter |
| `apps/worker/config.py` | Update | Add MinIOStorage class |
| `apps/worker/features.py` | Update | Implement STAC feature loading |
| `apps/worker/worker.py` | Update | Replace mock with real pipeline |
| `apps/worker/Dockerfile` | Update | Install dependencies |
---

## 13. Error Handling

### 13.1 STAC Failures

- **No scenes found**: Return a user-friendly error explaining the date-range issue
- **STAC timeout**: Retry 3 times with exponential backoff
- **Partial scene failure**: Skip the scene, continue with the remaining ones

### 13.2 Model Errors

- **Missing model files**: Log the error, return failure status
- **Feature mismatch**: Validate features against the expected list, pad/truncate as needed

### 13.3 MinIO Errors

- **Upload failure**: Retry 3 times, then return an error with the local temp path
- **Download failure**: Retry with a fresh signed URL
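The retry-with-backoff policy can be sketched as a small wrapper around the STAC and MinIO calls. The helper name and delays are illustrative; `sleep` is injectable so tests avoid real waiting:

```python
import time


def with_retries(fn, attempts: int = 3, base_delay: float = 1.0, sleep=time.sleep):
    """Call fn(); on failure retry up to `attempts` times with doubling delay."""
    last_exc = None
    for attempt in range(attempts):
        try:
            return fn()
        except Exception as exc:  # plan-level sketch; narrow this in real code
            last_exc = exc
            if attempt < attempts - 1:
                sleep(base_delay * (2 ** attempt))  # 1s, 2s, 4s, ...
    raise last_exc
```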
---
## 14. Testing Strategy

### 14.1 Unit Tests

- `test_stac_client.py`: Mock STAC responses, test search/load
- `test_features.py`: Compute indices on synthetic data
- `test_smoothing.py`: Verify the majority filter on known arrays

### 14.2 Integration Tests

- Test against the real DEA STAC (use a small AOI)
- Test the MinIO upload/download roundtrip
- Test end-to-end with a known AOI and expected output
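A unit-test sketch for the index math on synthetic data (pytest style; the `ndvi` helper is inlined here so the example is self-contained):

```python
import numpy as np


def ndvi(nir: np.ndarray, red: np.ndarray) -> np.ndarray:
    """NDVI = (NIR - Red) / (NIR + Red)."""
    return (nir - red) / (nir + red)


def test_ndvi_known_values():
    nir = np.array([0.8, 0.5])
    red = np.array([0.2, 0.5])
    result = ndvi(nir, red)
    # (0.8 - 0.2) / (0.8 + 0.2) = 0.6; equal bands give 0.0
    assert np.allclose(result, [0.6, 0.0])
```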
---
## 15. Implementation Checklist

- [ ] Update `requirements.txt` with STAC dependencies
- [ ] Create `stac_client.py` with the DEA STAC client
- [ ] Create `feature_computation.py` with index functions
- [ ] Create `storage.py` with the MinIO adapter
- [ ] Update `config.py` to use MinIOStorage
- [ ] Update `features.py` to load from STAC
- [ ] Update `worker.py` with the full pipeline
- [ ] Update `Dockerfile` for the new dependencies
- [ ] Test locally with a mock STAC
- [ ] Test with the real DEA STAC (small AOI)
- [ ] Verify MinIO upload/download

---
## 16. Acceptance Criteria

- [ ] Given an AOI + year, the worker produces a refined COG in MinIO under `results/<job_id>/refined.tif`
- [ ] The API can return a signed URL for download
- [ ] The worker rejects AOIs outside Zimbabwe or with a radius > 5 km
## 17. Technical Notes

### 17.1 Season Window (Critical)

Per AGENTS.md: use `InferenceConfig.season_dates(year, "summer")`, which returns Sep 1 to May 31 of the following year.

### 17.2 AOI Format (Critical)

Per training/features.py: the AOI is `(lon, lat, radius_m)`, NOT `(lat, lon, radius)`.

### 17.3 DW Baseline Object Path

Per Plan 00: the object key format is `dw/zim/summer/<season>/highest_conf/DW_Zim_HighestConf_<year>_<year+1>.tif`

### 17.4 Feature Names

Per training/features.py: currently `["ndvi_peak", "evi_peak", "savi_peak"]`

### 17.5 Smoothing Kernel

Per training/features.py: must be odd (3, 5, or 7) - the default is 5

### 17.6 Model Artifacts

Expected files in MinIO:

- `model.joblib` - trained ensemble model
- `label_encoder.joblib` - class label encoder
- `scaler.joblib` (optional) - feature scaler
- `selected_features.json` - list of feature names used

---
## 18. Next Steps

After implementation approval:

1. Add dependencies to requirements.txt
2. Implement the STAC client
3. Implement feature computation
4. Implement the MinIO storage adapter
5. Update the worker with the full pipeline
6. Build and deploy the new worker image
7. Test with real data