"""MinIO/S3 storage adapter for the worker. STEP 2: MinIO storage adapter with boto3, retry logic, and model filename mapping. This module provides: - Configuration from environment variables - boto3 S3 client with retry configuration - Methods for bucket/object operations - Model filename mapping with fallback logic """ from __future__ import annotations import os import time import logging from pathlib import Path from typing import List, Optional, Tuple # Configure logging logger = logging.getLogger(__name__) # ========================================== # Configuration # ========================================== # Environment variables with defaults MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "minio.geocrop.svc.cluster.local:9000") MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "minioadmin") MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "minioadmin123") MINIO_SECURE = os.getenv("MINIO_SECURE", "false").lower() == "true" MINIO_REGION = os.getenv("MINIO_REGION", "us-east-1") MINIO_BUCKET_MODELS = os.getenv("MINIO_BUCKET_MODELS", "geocrop-models") MINIO_BUCKET_BASELINES = os.getenv("MINIO_BUCKET_BASELINES", "geocrop-baselines") MINIO_BUCKET_RESULTS = os.getenv("MINIO_BUCKET_RESULTS", "geocrop-results") # Model filename mapping # Maps job model names to MinIO object names MODEL_FILENAME_MAP = { "Ensemble": { "primary": "Zimbabwe_Ensemble_Raw_Model.pkl", "fallback": "Zimbabwe_Ensemble_Model.pkl", }, "Ensemble_Raw": { "primary": "Zimbabwe_Ensemble_Raw_Model.pkl", "fallback": None, }, "RandomForest": { "primary": "Zimbabwe_RandomForest_Raw_Model.pkl", "fallback": "Zimbabwe_RandomForest_Model.pkl", }, "XGBoost": { "primary": "Zimbabwe_XGBoost_Raw_Model.pkl", "fallback": "Zimbabwe_XGBoost_Model.pkl", }, "LightGBM": { "primary": "Zimbabwe_LightGBM_Raw_Model.pkl", "fallback": "Zimbabwe_LightGBM_Model.pkl", }, "CatBoost": { "primary": "Zimbabwe_CatBoost_Raw_Model.pkl", "fallback": "Zimbabwe_CatBoost_Model.pkl", }, } def get_model_filename(model_name: str) -> str: """Resolve model name to filename with fallback. Args: model_name: Model name from job payload (e.g., "Ensemble", "XGBoost") Returns: Filename to use (e.g., "Zimbabwe_Ensemble_Raw_Model.pkl") Raises: FileNotFoundError: If neither primary nor fallback exists """ mapping = MODEL_FILENAME_MAP.get(model_name, { "primary": f"Zimbabwe_{model_name}_Model.pkl", "fallback": f"Zimbabwe_{model_name}_Raw_Model.pkl", }) # Try primary first primary = mapping.get("primary") fallback = mapping.get("fallback") # If primary ends with just .pkl (dynamic mapping), try both if primary and not any(primary.endswith(v) for v in ["_Model.pkl", "_Raw_Model.pkl"]): # Dynamic case - try both patterns candidates = [ f"Zimbabwe_{model_name}_Model.pkl", f"Zimbabwe_{model_name}_Raw_Model.pkl", ] return candidates[0] # Return first, caller will handle missing return primary if primary else fallback # ========================================== # Storage Adapter Class # ========================================== class MinIOStorage: """MinIO/S3 storage adapter for worker. 
# ==========================================
# Storage Adapter Class
# ==========================================

class MinIOStorage:
    """MinIO/S3 storage adapter for the worker.

    Provides methods for:
    - Bucket/object operations
    - Model file downloading
    - Result uploading
    - Presigned URL generation
    """

    def __init__(
        self,
        endpoint: str = MINIO_ENDPOINT,
        access_key: str = MINIO_ACCESS_KEY,
        secret_key: str = MINIO_SECRET_KEY,
        secure: bool = MINIO_SECURE,
        region: str = MINIO_REGION,
        bucket_models: str = MINIO_BUCKET_MODELS,
        bucket_baselines: str = MINIO_BUCKET_BASELINES,
        bucket_results: str = MINIO_BUCKET_RESULTS,
    ):
        self.endpoint = endpoint
        self.access_key = access_key
        self.secret_key = secret_key
        self.secure = secure
        self.region = region
        self.bucket_models = bucket_models
        self.bucket_baselines = bucket_baselines
        self.bucket_results = bucket_results

        # boto3 client is created lazily on first use
        self._client = None

    @property
    def client(self):
        """Lazy-load boto3 S3 client."""
        if self._client is None:
            import boto3
            from botocore.config import Config

            self._client = boto3.client(
                "s3",
                endpoint_url=f"{'https' if self.secure else 'http'}://{self.endpoint}",
                aws_access_key_id=self.access_key,
                aws_secret_access_key=self.secret_key,
                region_name=self.region,
                config=Config(
                    signature_version="s3v4",
                    s3={"addressing_style": "path"},
                    retries={"max_attempts": 3},
                ),
            )
        return self._client

    def ping(self) -> Tuple[bool, str]:
        """Ping MinIO to check connectivity.

        Returns:
            Tuple of (success: bool, message: str)
        """
        try:
            self.client.head_bucket(Bucket=self.bucket_models)
            return True, f"Connected to MinIO at {self.endpoint}"
        except Exception as e:
            return False, f"Failed to connect to MinIO: {type(e).__name__}: {e}"

    def _retry_operation(self, operation, *args, max_retries: int = 3, **kwargs):
        """Execute operation with exponential backoff retry.

        Args:
            operation: Callable to execute
            *args: Positional args for operation
            max_retries: Maximum retry attempts
            **kwargs: Keyword args for operation

        Returns:
            Result of operation

        Raises:
            The last exception if all retries fail
        """
        import botocore.exceptions as boto_exc

        last_exception = None
        for attempt in range(max_retries):
            try:
                return operation(*args, **kwargs)
            except (
                boto_exc.ConnectionError,
                boto_exc.EndpointConnectionError,
                boto_exc.ReadTimeoutError,
                boto_exc.ClientError,
            ) as e:
                last_exception = e
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt  # 1s, 2s, 4s
                    logger.warning(f"Retry {attempt + 1}/{max_retries} after {wait_time}s: {e}")
                    time.sleep(wait_time)
                else:
                    logger.error(f"All {max_retries} retries failed: {e}")
        raise last_exception

    def head_object(self, bucket: str, key: str) -> Optional[dict]:
        """Get object metadata without downloading.

        Returns:
            Metadata dict, or None if the object does not exist (404).
        """
        try:
            return self._retry_operation(
                self.client.head_object,
                Bucket=bucket,
                Key=key,
            )
        except Exception as e:
            if hasattr(e, "response") and e.response.get("Error", {}).get("Code") == "404":
                return None
            raise

    def list_objects(self, bucket: str, prefix: str = "") -> List[str]:
        """List object keys in bucket with prefix.

        Args:
            bucket: Bucket name
            prefix: Key prefix to filter

        Returns:
            List of object keys
        """
        keys = []
        paginator = self.client.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for obj in page.get("Contents", []):
                keys.append(obj["Key"])
        return keys
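
    # Usage pattern for the retry wrapper (a sketch): boto3 calls are passed
    # through with their own kwargs, while max_retries is consumed by the
    # wrapper itself, e.g.
    #
    #   self._retry_operation(self.client.head_object, Bucket=bucket, Key=key)
    #   self._retry_operation(self.client.download_file, max_retries=5,
    #                         Bucket=bucket, Key=key, Filename="/tmp/model.pkl")
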
    def download_file(self, bucket: str, key: str, dest_path: Path) -> Path:
        """Download file from MinIO.

        Args:
            bucket: Bucket name
            key: Object key
            dest_path: Local destination path

        Returns:
            Path to downloaded file
        """
        dest_path = Path(dest_path)
        dest_path.parent.mkdir(parents=True, exist_ok=True)

        self._retry_operation(
            self.client.download_file,
            Bucket=bucket,
            Key=key,
            Filename=str(dest_path),
        )
        return dest_path

    def download_model_file(self, model_name: str, dest_dir: Path) -> Path:
        """Download model file from the geocrop-models bucket.

        Attempts to download the primary filename, falling back to the
        alternative if the primary is missing.

        Args:
            model_name: Model name (e.g., "Ensemble", "XGBoost")
            dest_dir: Local destination directory

        Returns:
            Path to downloaded model file

        Raises:
            FileNotFoundError: If neither primary nor fallback is found
        """
        dest_dir = Path(dest_dir)
        dest_dir.mkdir(parents=True, exist_ok=True)

        # Get filename mapping (unknown names use the dynamic default)
        mapping = MODEL_FILENAME_MAP.get(model_name, {
            "primary": f"Zimbabwe_{model_name}_Model.pkl",
            "fallback": f"Zimbabwe_{model_name}_Raw_Model.pkl",
        })

        primary = mapping.get("primary")
        fallback = mapping.get("fallback")

        # Try primary first
        if primary:
            try:
                dest = dest_dir / primary
                self.download_file(self.bucket_models, primary, dest)
                logger.info(f"Downloaded model: {primary}")
                return dest
            except Exception as e:
                logger.warning(f"Primary model not found ({primary}): {e}")

        if fallback:
            try:
                dest = dest_dir / fallback
                self.download_file(self.bucket_models, fallback, dest)
                logger.info(f"Downloaded model (fallback): {fallback}")
                return dest
            except Exception as e2:
                logger.warning(f"Fallback model not found ({fallback}): {e2}")

        # Build error message with available options
        available = self.list_objects(self.bucket_models, prefix="Zimbabwe_")
        raise FileNotFoundError(
            f"Model '{model_name}' not found in {self.bucket_models}. "
            f"Available: {available[:10]}..."
        )

    def upload_file(
        self,
        bucket: str,
        key: str,
        local_path: Path,
        content_type: Optional[str] = None,
    ) -> str:
        """Upload file to MinIO.

        Args:
            bucket: Bucket name
            key: Object key
            local_path: Local file path
            content_type: Optional content type

        Returns:
            S3 URI: s3://bucket/key
        """
        local_path = Path(local_path)

        extra_args = {}
        if content_type:
            extra_args["ContentType"] = content_type

        self._retry_operation(
            self.client.upload_file,
            str(local_path),
            bucket,
            key,
            ExtraArgs=extra_args or None,
        )
        return f"s3://{bucket}/{key}"

    def upload_result(self, local_path: Path, key: str) -> str:
        """Upload result file to the geocrop-results bucket.

        Args:
            local_path: Local file path
            key: Object key (including the results/ prefix)

        Returns:
            S3 URI: s3://bucket/key
        """
        return self.upload_file(self.bucket_results, key, local_path)
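
    # End-to-end worker flow (a sketch; the job id and local paths are
    # illustrative, and presign_get is defined just below):
    #
    #   storage = MinIOStorage()
    #   model_path = storage.download_model_file("Ensemble", Path("/tmp/models"))
    #   ... run inference, writing /tmp/out/prediction.tif ...
    #   uri = storage.upload_result(Path("/tmp/out/prediction.tif"),
    #                               "results/<job-id>/prediction.tif")
    #   url = storage.presign_get(storage.bucket_results,
    #                             "results/<job-id>/prediction.tif", expires=3600)
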
    def presign_get(
        self,
        bucket: str,
        key: str,
        expires: int = 3600,
    ) -> str:
        """Generate presigned URL for GET.

        Args:
            bucket: Bucket name
            key: Object key
            expires: Expiration in seconds

        Returns:
            Presigned URL
        """
        return self._retry_operation(
            self.client.generate_presigned_url,
            "get_object",
            Params={"Bucket": bucket, "Key": key},
            ExpiresIn=expires,
        )


# ==========================================
# Self-Test
# ==========================================

if __name__ == "__main__":
    print("=== MinIO Storage Adapter Self-Test ===")
    print(f"Endpoint: {MINIO_ENDPOINT}")
    print(f"Bucket (models): {MINIO_BUCKET_MODELS}")
    print(f"Bucket (baselines): {MINIO_BUCKET_BASELINES}")
    print(f"Bucket (results): {MINIO_BUCKET_RESULTS}")
    print()

    # Create storage instance
    storage = MinIOStorage()

    # Test ping
    print("Testing ping...")
    success, msg = storage.ping()
    print(f"  Ping: {'✓' if success else '✗'} - {msg}")

    if success:
        # List models
        print("\nListing models in geocrop-models...")
        models = []
        try:
            models = storage.list_objects(MINIO_BUCKET_MODELS, prefix="Zimbabwe_")
            print(f"  Found {len(models)} model files:")
            for m in models[:10]:
                print(f"    - {m}")
            if len(models) > 10:
                print(f"    ... and {len(models) - 10} more")
        except Exception as e:
            print(f"  Error listing: {e}")

        # Test head_object on first model
        if models:
            print("\nTesting head_object on first model...")
            first_key = models[0]
            meta = storage.head_object(MINIO_BUCKET_MODELS, first_key)
            if meta:
                print(f"  ✓ {first_key}: {meta.get('ContentLength', '?')} bytes")
            else:
                print(f"  ✗ {first_key}: not found")

    print("\n=== Self-Test Complete ===")
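
# Running the self-test directly (illustrative invocation; the module filename
# and the localhost endpoint are assumptions, adjust to your deployment):
#
#   MINIO_ENDPOINT=localhost:9000 MINIO_SECURE=false python minio_storage.py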