diff --git a/README.md b/README.md new file mode 100644 index 0000000..c15ba42 --- /dev/null +++ b/README.md @@ -0,0 +1,68 @@ +# Sovereign MLOps Platform: GeoCrop LULC Portfolio + +Welcome to the **Sovereign MLOps Platform**, a comprehensive self-hosted environment on K3s designed for end-to-end Land Use / Land Cover (LULC) crop-mapping in Zimbabwe. + +This project showcases professional skills in **MLOps, Cloud-Native Architecture, Geospatial Analysis, and GitOps**. + +## 🏗️ System Architecture + +The platform is built on a robust, self-hosted Kubernetes (K3s) cluster with a focus on data sovereignty and scalability. + +- **Source Control & CI/CD**: [Gitea](https://git.techarvest.co.zw) (Self-hosted GitHub alternative) +- **Infrastructure as Code**: Terraform (Managing K3s Namespaces & Quotas) +- **GitOps**: ArgoCD (Automated deployment from Git to Cluster) +- **Experiment Tracking**: [MLflow](https://ml.techarvest.co.zw) (Model versioning & metrics) +- **Interactive Workspace**: [JupyterLab](https://lab.techarvest.co.zw) (Data science & training) +- **Spatial Database**: Standalone PostgreSQL + PostGIS (Port 5433) +- **Object Storage**: MinIO (S3-compatible storage for datasets, baselines, and models) +- **Frontend**: React 19 + OpenLayers (Parallel loading of baselines and ML predictions) +- **Backend**: FastAPI + Redis Queue (Job orchestration) +- **Visualization**: TiTiler (Dynamic tile server for Cloud Optimized GeoTIFFs) + +## 🗺️ UX Data Flow: Parallel Loading Strategy + +To ensure a seamless user experience, the system implements a dual-loading strategy: +1. **Instant Context**: While waiting for ML inference, Dynamic World (DW) TIFF baselines (2015-2025) are immediately served from MinIO via TiTiler. +2. **Asynchronous Inference**: The ML worker processes heavy classification tasks in the background and overlays high-resolution predictions once complete. 
+ +## 🛠️ Training Workflow + +Training is performed in **JupyterLab** using a custom `MinIOStorageClient` that bridges the gap between object storage and in-memory data processing. + +### Using the MinIO Storage Client + +```python +from training.storage_client import MinIOStorageClient + +# Initialize client (uses environment variables automatically) +storage = MinIOStorageClient() + +# List available training batches +batches = storage.list_files('geocrop-datasets') + +# Load a batch directly into memory (No disk I/O) +df = storage.load_dataset('geocrop-datasets', 'batch_1.csv') + +# Train your model and upload the artifact +# ... training code ... +storage.upload_file('model.pkl', 'geocrop-models', 'Zimbabwe_Ensemble_Model.pkl') +``` + +## 🚀 Deployment & GitOps + +The platform follows a strict **GitOps** workflow: +1. All changes are committed to the `geocrop-platform` repository on Gitea. +2. Gitea Actions build and push containers to Docker Hub (`frankchine`). +3. ArgoCD monitors the `k8s/base` directory and automatically synchronizes the cluster state. 
+ +## 🖥️ Service Registry + +- **Portfolio Frontend**: [portfolio.techarvest.co.zw](https://portfolio.techarvest.co.zw) +- **Source Control**: [git.techarvest.co.zw](https://git.techarvest.co.zw) +- **JupyterLab**: [lab.techarvest.co.zw](https://lab.techarvest.co.zw) +- **MLflow**: [ml.techarvest.co.zw](https://ml.techarvest.co.zw) +- **ArgoCD**: [cd.techarvest.co.zw](https://cd.techarvest.co.zw) +- **MinIO Console**: [console.minio.portfolio.techarvest.co.zw](https://console.minio.portfolio.techarvest.co.zw) + +--- +*Created and maintained by [fchinembiri](mailto:fchinembiri24@gmail.com).* diff --git a/training/storage_client.py b/training/storage_client.py index 1ed60c0..8e72901 100644 --- a/training/storage_client.py +++ b/training/storage_client.py @@ -2,83 +2,102 @@ import boto3 import os import logging import pandas as pd +import io from botocore.exceptions import ClientError -from io import BytesIO -# Configure logging +# Configure logging for the worker/training scripts logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class MinIOStorageClient: """ - A professional client for interacting with GeoCrop MinIO storage. - Optimized for JupyterLab training workflows. + A reusable client for interacting with the local MinIO storage. + Handles Datasets (CSVs), Baselines (DW TIFFs), and Model Artifacts. 
""" def __init__(self): - # Configuration from Kubernetes Environment Variables + # Initialize S3 client using environment variables + # Defaults to the internal Kubernetes DNS for MinIO if not provided self.endpoint_url = os.environ.get('AWS_S3_ENDPOINT_URL', 'http://minio.geocrop.svc.cluster.local:9000') - self.access_key = os.environ.get('AWS_ACCESS_KEY_ID') - self.secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY') self.s3_client = boto3.client( 's3', endpoint_url=self.endpoint_url, - aws_access_key_id=self.access_key, - aws_secret_access_key=self.secret_key, + aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'), + aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY'), config=boto3.session.Config(signature_version='s3v4'), - verify=False # Required for internal self-signed K3s certs + verify=False # Disables TLS certificate verification; MinIO often uses self-signed certs internally ) def list_files(self, bucket_name: str, prefix: str = "") -> list: - """Lists metadata for all files in a bucket/folder.""" + """Lists objects in a bucket, optionally filtered by a prefix folder, as dicts with 'key' and 'size_mb'. Reads a single list_objects_v2 response (no pagination) and returns [] on ClientError.""" try: response = self.s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix) files = [] + + logger.info(f"Scanning bucket '{bucket_name}' with prefix '{prefix}'...") if 'Contents' in response: for obj in response['Contents']: - files.append({ - "key": obj['Key'], - "size_mb": round(obj['Size'] / (1024 * 1024), 2), - "last_modified": obj['LastModified'] - }) + size_mb = obj['Size'] / 1024 / 1024 + files.append({"key": obj['Key'], "size_mb": size_mb}) + logger.info(f" - Found: {obj['Key']} ({size_mb:.2f} MB)") + else: + logger.warning(f"No files found in bucket {bucket_name}.") + return files except ClientError as e: - logger.error(f"Failed to list {bucket_name}: {e}") + logger.error(f"Failed to list files in {bucket_name}: {e}") return [] + def download_file(self, bucket_name: str, object_name: str, download_path: str) -> bool: + """Downloads a file from MinIO to the local pod/container storage. Returns True on success; logs and returns False on ClientError (other exceptions propagate).""" + 
try: + logger.info(f"Downloading {object_name} from {bucket_name} to {download_path}...") + self.s3_client.download_file(bucket_name, object_name, download_path) + logger.info("Download complete.") + return True + except ClientError as e: + logger.error(f"Error downloading {object_name}: {e}") + return False + + def upload_file(self, file_path: str, bucket_name: str, object_name: str) -> bool: + """Uploads a local file (like a trained model or prediction COG) back to MinIO. Returns True on success; logs and returns False on ClientError (other exceptions propagate).""" + try: + logger.info(f"Uploading {file_path} to {bucket_name}/{object_name}...") + self.s3_client.upload_file(file_path, bucket_name, object_name) + logger.info("Upload complete.") + return True + except ClientError as e: + logger.error(f"Error uploading {file_path}: {e}") + return False + def load_dataset(self, bucket_name: str, object_name: str) -> pd.DataFrame: - """ - Reads a CSV directly from MinIO into a Pandas DataFrame without saving to disk. - Best for training on large batches. - """ + """Loads a CSV dataset directly from MinIO into a Pandas DataFrame in memory. Returns None (not an empty DataFrame) when the fetch or the CSV parsing fails, so callers must check for None.""" try: - logger.info(f"Streaming s3://{bucket_name}/{object_name} into memory...") - obj = self.s3_client.get_object(Bucket=bucket_name, Key=object_name) - return pd.read_csv(BytesIO(obj['Body'].read())) + logger.info(f"Loading {object_name} from {bucket_name} into memory...") + response = self.s3_client.get_object(Bucket=bucket_name, Key=object_name) + df = pd.read_csv(io.BytesIO(response['Body'].read())) + logger.info(f"Successfully loaded dataset with shape: {df.shape}") + return df + except ClientError as e: + logger.error(f"Error loading {object_name}: {e}") + return None except Exception as e: - logger.error(f"Failed to load dataset {object_name}: {e}") - return pd.DataFrame() + logger.error(f"Error parsing {object_name} into DataFrame: {e}") + return None - def download_file(self, bucket_name: str, object_name: str, local_path: str): - """Downloads a binary file (e.g., a TIFF baseline) to local storage.""" - try: - 
self.s3_client.download_file(bucket_name, object_name, local_path) - logger.info(f"Successfully downloaded to {local_path}") - except ClientError as e: - logger.error(f"Download failed: {e}") - - def upload_artifact(self, local_path: str, bucket_name: str, object_name: str): - """Uploads a local file (e.g., a trained model .pkl) back to MinIO.""" - try: - self.s3_client.upload_file(local_path, bucket_name, object_name) - logger.info(f"Artifact uploaded to s3://{bucket_name}/{object_name}") - except ClientError as e: - logger.error(f"Upload failed: {e}") - - def get_pandas_options(self): - """Returns a dict to use with pd.read_csv('s3://...') if preferred.""" - return { - "key": self.access_key, - "secret": self.secret_key, - "client_kwargs": {"endpoint_url": self.endpoint_url} - } +# ========================================== +# Example Usage (For your Jupyter Notebooks) +# ========================================== +if __name__ == "__main__": + storage = MinIOStorageClient() + + # 1. List the Zimbabwe Augmented CSV batches (empty list if the bucket is empty or listing fails) + datasets_bucket = 'geocrop-datasets' + csv_files = storage.list_files(datasets_bucket) + + # 2. Load the first listed batch directly into memory (load_dataset returns None on failure) + if csv_files: + first_batch_key = csv_files[0]['key'] + df = storage.load_dataset(datasets_bucket, first_batch_key) + if df is not None: + print(df.info())