import logging
import os
from io import BytesIO

import boto3
import pandas as pd
from botocore.client import Config
from botocore.exceptions import ClientError

# Configure logging once at import time (notebook-oriented convenience).
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class MinIOStorageClient:
    """
    A professional client for interacting with GeoCrop MinIO storage.
    Optimized for JupyterLab training workflows.
    """

    def __init__(self):
        # Configuration from Kubernetes Environment Variables.
        # NOTE(review): if the key env vars are unset, boto3 falls back to its
        # normal credential chain; the raw values are also kept on the instance
        # so get_pandas_options() can hand them to the s3fs/pandas backend.
        self.endpoint_url = os.environ.get(
            'AWS_S3_ENDPOINT_URL', 'http://minio.geocrop.svc.cluster.local:9000')
        self.access_key = os.environ.get('AWS_ACCESS_KEY_ID')
        self.secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY')

        self.s3_client = boto3.client(
            's3',
            endpoint_url=self.endpoint_url,
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key,
            config=Config(signature_version='s3v4'),
            verify=False  # Required for internal self-signed K3s certs
        )

    def list_files(self, bucket_name: str, prefix: str = "") -> list:
        """Lists metadata for all files in a bucket/folder.

        Uses a paginator so buckets/prefixes with more than 1000 objects are
        fully listed (a single list_objects_v2 call truncates at 1000 keys).

        Returns:
            A list of dicts with 'key', 'size_mb' and 'last_modified';
            an empty list on S3 error.
        """
        try:
            paginator = self.s3_client.get_paginator('list_objects_v2')
            files = []
            for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
                # 'Contents' is absent on empty pages.
                for obj in page.get('Contents', []):
                    files.append({
                        "key": obj['Key'],
                        "size_mb": round(obj['Size'] / (1024 * 1024), 2),
                        "last_modified": obj['LastModified'],
                    })
            return files
        except ClientError as e:
            logger.error(f"Failed to list {bucket_name}: {e}")
            return []

    def load_dataset(self, bucket_name: str, object_name: str) -> pd.DataFrame:
        """
        Reads a CSV directly from MinIO into a Pandas DataFrame without saving to disk.
        Best for training on large batches.

        Returns an empty DataFrame on any failure (missing key, malformed CSV,
        network error) so callers can check ``df.empty`` instead of catching.
        """
        try:
            logger.info(f"Streaming s3://{bucket_name}/{object_name} into memory...")
            obj = self.s3_client.get_object(Bucket=bucket_name, Key=object_name)
            # Body is a streaming object; buffer it so pandas can seek.
            return pd.read_csv(BytesIO(obj['Body'].read()))
        except Exception as e:
            logger.error(f"Failed to load dataset {object_name}: {e}")
            return pd.DataFrame()

    def download_file(self, bucket_name: str, object_name: str, local_path: str):
        """Downloads a binary file (e.g., a TIFF baseline) to local storage."""
        try:
            self.s3_client.download_file(bucket_name, object_name, local_path)
            logger.info(f"Successfully downloaded to {local_path}")
        except ClientError as e:
            logger.error(f"Download failed: {e}")

    def upload_artifact(self, local_path: str, bucket_name: str, object_name: str):
        """Uploads a local file (e.g., a trained model .pkl) back to MinIO."""
        try:
            self.s3_client.upload_file(local_path, bucket_name, object_name)
            logger.info(f"Artifact uploaded to s3://{bucket_name}/{object_name}")
        except ClientError as e:
            logger.error(f"Upload failed: {e}")

    def get_pandas_options(self):
        """Returns a dict to use with pd.read_csv('s3://...') if preferred.

        The shape matches the ``storage_options`` expected by the s3fs backend.
        """
        return {
            "key": self.access_key,
            "secret": self.secret_key,
            "client_kwargs": {"endpoint_url": self.endpoint_url},
        }