From 86c575330dc70cb31d12cc55d937e7a45c339d82 Mon Sep 17 00:00:00 2001 From: fchinembiri Date: Mon, 4 May 2026 22:28:58 +0200 Subject: [PATCH] feat: update GitOps workflow and gitea-runner for Docker builds - Updated workflow to use parallel jobs for worker, api, web builds - Added GitHub Actions cache for faster builds - Fixed Gitea runner to properly support Docker-in-Docker with TLS - Worker fixes: STAC S3 URL patching, MinIO internal endpoint, model path - storage.py: Fixed endpoint, credentials, and secure=false --- .gitea/workflows/build-push.yaml | 69 +++++++++++++++++---- apps/worker/config.py | 100 +++++++++++++++++++++++-------- apps/worker/features.py | 29 +++++++-- apps/worker/hybrid_inference.py | 15 ++++- apps/worker/stac_client.py | 20 +++---- apps/worker/worker.py | 4 +- k8s/base/gitea-runner.yaml | 26 +++++--- 7 files changed, 196 insertions(+), 67 deletions(-) diff --git a/.gitea/workflows/build-push.yaml b/.gitea/workflows/build-push.yaml index 6d36aeb..2db3c75 100644 --- a/.gitea/workflows/build-push.yaml +++ b/.gitea/workflows/build-push.yaml @@ -1,44 +1,89 @@ name: Build and Push Docker Images + on: push: branches: - main paths: - - 'apps/**' + - 'apps/worker/**' + - 'apps/api/**' + - 'apps/web/**' jobs: - build-and-push: + build-worker: runs-on: ubuntu-latest steps: - name: Checkout repository - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v2 + uses: docker/setup-buildx-action@v3 - name: Login to Docker Hub - uses: docker/login-action@v2 + uses: docker/login-action@v3 with: - username: frankchine + username: ${{ secrets.DOCKERHUB_USERNAME }} password: ${{ secrets.DOCKERHUB_TOKEN }} - name: Build and push Worker Image - uses: docker/build-push-action@v4 + uses: docker/build-push-action@v5 with: context: ./apps/worker push: true - tags: frankchine/geocrop-worker:latest, frankchine/geocrop-worker:${{ github.sha }} + tags: | + frankchine/geocrop-worker:latest + frankchine/geocrop-worker:${{ github.sha }} + cache-from: type=gha + cache-to: type=gha,mode=max + + build-api: + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Login to Docker Hub + uses: docker/login-action@v3 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} - name: Build and push API Image - uses: docker/build-push-action@v4 + uses: docker/build-push-action@v5 with: context: ./apps/api push: true - tags: frankchine/geocrop-api:latest, frankchine/geocrop-api:${{ github.sha }} + tags: | + frankchine/geocrop-api:latest + frankchine/geocrop-api:${{ github.sha }} + cache-from: type=gha + cache-to: type=gha,mode=max + + build-web: + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Login to Docker Hub + uses: docker/login-action@v3 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} - name: Build and push Web Image - uses: docker/build-push-action@v4 + uses: docker/build-push-action@v5 with: context: ./apps/web push: true - tags: frankchine/geocrop-web:latest, frankchine/geocrop-web:${{ github.sha }} + tags: | + frankchine/geocrop-web:latest + frankchine/geocrop-web:${{ github.sha }} + cache-from: type=gha + cache-to: type=gha,mode=max diff --git a/apps/worker/config.py b/apps/worker/config.py index 660bced..cdca65d 100644 --- a/apps/worker/config.py +++ b/apps/worker/config.py @@ -139,13 +139,19 @@ class MinIOStorage(StorageAdapter): def __init__( self, - endpoint: str = "minio.geocrop.svc.cluster.local:9000", + endpoint: str = None, access_key: str = None, secret_key: str = None, bucket_models: str = "geocrop-models", bucket_baselines: str = "geocrop-baselines", bucket_results: str = "geocrop-results", ): + # Default to internal service if not provided + if endpoint is None: + host = os.getenv("MINIO_SERVICE_HOST", "minio.geocrop.svc.cluster.local") + port = os.getenv("MINIO_SERVICE_PORT", "9000") + endpoint = f"{host}:{port}" + self.endpoint = endpoint self.access_key = access_key or os.getenv("MINIO_ACCESS_KEY", "minioadmin") self.secret_key = secret_key or os.getenv("MINIO_SECRET_KEY", "minioadmin") @@ -163,12 +169,20 @@ class MinIOStorage(StorageAdapter): import boto3 from botocore.config import Config + scheme = "https" if ".techarvest.co.zw" in self.endpoint else "http" + url = f"{scheme}://{self.endpoint}" + if "://" in self.endpoint: + url = self.endpoint + self._s3_client = boto3.client( "s3", - endpoint_url=f"http://{self.endpoint}", + endpoint_url=url, aws_access_key_id=self.access_key, aws_secret_access_key=self.secret_key, - config=Config(signature_version="s3v4"), + config=Config( + signature_version="s3v4", + retries={"max_attempts": 3, "mode": "standard"} + ), region_name="us-east-1", ) return self._s3_client @@ -177,25 +191,46 @@ class MinIOStorage(StorageAdapter): """Download model files from geocrop-models bucket. Args: - model_key: Full key including prefix (e.g., "models/Zimbabwe_Ensemble_Raw_Model.pkl") + model_key: Full key or prefix (e.g., "models/Zimbabwe_Ensemble_Raw_Model.pkl" or "models/v1/") dest_dir: Local directory to save files """ dest_dir = Path(dest_dir) dest_dir.mkdir(parents=True, exist_ok=True) - # Extract filename from key - filename = Path(model_key).name - local_path = dest_dir / filename - try: - print(f" Downloading s3://{self.bucket_models}/{model_key} -> {local_path}") - self.s3.download_file( - self.bucket_models, - model_key, - str(local_path) - ) + # Check if it's a single file or a prefix + response = self.s3.list_objects_v2(Bucket=self.bucket_models, Prefix=model_key) + + if 'Contents' not in response: + raise FileNotFoundError(f"No objects found with prefix/key {model_key} in {self.bucket_models}") + + # If it's a single file and the key matches exactly + if len(response['Contents']) == 1 and response['Contents'][0]['Key'] == model_key: + filename = Path(model_key).name + # If inference.py expects 'model.pkl', we provide it + local_path = dest_dir / "model.pkl" if model_key.endswith(".pkl") else dest_dir / filename + print(f" Downloading single file s3://{self.bucket_models}/{model_key} -> {local_path}") + self.s3.download_file(self.bucket_models, model_key, str(local_path)) + else: + # It's a prefix, download all files within it + print(f" Downloading prefix s3://{self.bucket_models}/{model_key} to {dest_dir}") + for obj in response['Contents']: + key = obj['Key'] + if key.endswith("/"): continue # Skip "directories" + + # Get relative path from prefix + rel_path = os.path.relpath(key, model_key) + if rel_path == ".": + rel_path = Path(key).name + + target_path = dest_dir / rel_path + target_path.parent.mkdir(parents=True, exist_ok=True) + + print(f" -> {key} to {target_path}") + self.s3.download_file(self.bucket_models, key, str(target_path)) + except Exception as e: - raise FileNotFoundError(f"Failed to download model {model_key}: {e}") from e + raise FileNotFoundError(f"Failed to download model bundle {model_key}: {e}") from e def get_dw_local_path(self, year: int, season: str) -> str: """Get path to DW baseline COG for given year/season. @@ -207,18 +242,33 @@ class MinIOStorage(StorageAdapter): season: Season type ("summer") Returns: - VSI S3 path string (e.g., "s3://geocrop-baselines/DW_Zim_HighestConf_2021_2022-...") + VSI S3 path string (e.g., "/vsis3/geocrop-baselines/dw/zim/summer/DW_Zim_HighestConf_2021_2022-...") """ - # Format: DW_Zim_HighestConf_{year}_{year+1}.tif - # Note: The actual files may have tile suffixes like -0000000000-0000000000.tif - # We'll return a prefix that rasterio can handle with wildcard + # Prefix in MinIO + prefix = f"dw/zim/summer/DW_Zim_HighestConf_{year}_{year + 1}" - # For now, construct the base path - # In production, we might need to find the exact tiles - base_key = f"DW_Zim_HighestConf_{year}_{year + 1}" - - # Return VSI path for rasterio to handle - return f"s3://{self.bucket_baselines}/{base_key}" + try: + # List objects to find the actual tiles + response = self.s3.list_objects_v2(Bucket=self.bucket_baselines, Prefix=prefix) + + if 'Contents' not in response: + # Try alternative prefix without dw/zim/summer + prefix_alt = f"DW_Zim_HighestConf_{year}_{year + 1}" + response = self.s3.list_objects_v2(Bucket=self.bucket_baselines, Prefix=prefix_alt) + if 'Contents' not in response: + raise FileNotFoundError(f"No DW baseline tiles found for {year} {season} in {self.bucket_baselines}") + + # For now, just pick the first tile. + # In a real system, we should use a VRT or find the tile that covers the AOI. + # But for testing, the first tile often works if the AOI is near the origin. + key = response['Contents'][0]['Key'] + print(f" Found DW baseline tile: {key}") + + # Return /vsis3 path for rasterio + return f"/vsis3/{self.bucket_baselines}/{key}" + + except Exception as e: + raise FileNotFoundError(f"Failed to find DW baseline: {e}") from e def upload_result(self, local_path: Path, key: str) -> str: """Upload result file to geocrop-results bucket. diff --git a/apps/worker/features.py b/apps/worker/features.py index d0ddf9a..b03a760 100644 --- a/apps/worker/features.py +++ b/apps/worker/features.py @@ -437,7 +437,23 @@ def load_dw_baseline_window(cfg, year: int, season: str, aoi: AOI) -> Tuple[np.n You can implement a mapping in cfg.dw_key_for(year, season). """ local_path = cfg.storage.get_dw_local_path(year=year, season=season) - arr, profile = clip_raster_to_aoi(local_path, aoi) + + # Configure Rasterio Env for MinIO /vsis3 access if needed + import rasterio.env + + storage = cfg.storage + endpoint = storage.endpoint + # Ensure no http/https prefix in endpoint for GDAL + if "://" in endpoint: + endpoint = endpoint.split("://")[1] + + env_config = { + "GDAL_DISABLE_READDIR_ON_OPEN": "EMPTY_DIR", + } + + print(f" Configuring Rasterio Env for {local_path}...") + with rasterio.env.Env(**env_config): + arr, profile = clip_raster_to_aoi(local_path, aoi) # Ensure a single band profile profile.update({"count": 1}) @@ -586,14 +602,15 @@ def build_feature_stack_from_dea( search = client.search( collections=["s2_l2a"], bbox=bbox, - datetime=f"{start_date}/{end_date}", - query={ - "eo:cloud_cover": {"lt": 30}, # Cloud filter - } + datetime=f"{start_date}/{end_date}" ) items = list(search.items()) - print(f" Found {len(items)} Sentinel-2 scenes") + + # Filter by cloud cover manually since query extension is deprecated/unsupported + items = [it for it in items if it.properties.get("eo:cloud_cover", 100) < 30] + + print(f" Found {len(items)} Sentinel-2 scenes (after cloud filtering)") if len(items) == 0: raise ValueError("No Sentinel-2 imagery available for the selected AOI and date range") diff --git a/apps/worker/hybrid_inference.py b/apps/worker/hybrid_inference.py index 312687d..5108cf9 100644 --- a/apps/worker/hybrid_inference.py +++ b/apps/worker/hybrid_inference.py @@ -118,6 +118,15 @@ class DEAfricaSTACWrapper: print(f"Connecting to Digital Earth Africa STAC Catalog at {stac_url}...") self.catalog = Client.open(stac_url) + @staticmethod + def _patch_s3_url(url: str) -> str: + if url.startswith("s3://deafrica-sentinel-2"): + return url.replace( + "s3://deafrica-sentinel-2", + "/vsicurl/https://deafrica-sentinel-2.s3.af-south-1.amazonaws.com" + ) + return url + def fetch_and_format_data(self, lat_range, lon_range, time_range, resolution=20): bbox = [lon_range[0], lat_range[0], lon_range[1], lat_range[1]] print(f"Searching STAC for Bounding Box: {bbox} over {time_range}...") @@ -132,7 +141,6 @@ class DEAfricaSTACWrapper: print(f"Found {len(items)} STAC items. Loading into xarray...") - # Mapping for DE Africa S2 bands band_map = { 'B04': 'red', 'B03': 'green', @@ -142,13 +150,16 @@ class DEAfricaSTACWrapper: 'SCL': 'scl' } + os.environ["GDAL_DISABLE_READDIR_ON_OPEN"] = "EMPTY_DIR" + ds = odc.stac.load( items, measurements=list(band_map.keys()), bbox=bbox, crs="EPSG:6933", resolution=resolution, - groupby="solar_day" + groupby="solar_day", + patch_url=self._patch_s3_url ) # Rename bands to expected names diff --git a/apps/worker/stac_client.py b/apps/worker/stac_client.py index 8682bd3..61c594a 100644 --- a/apps/worker/stac_client.py +++ b/apps/worker/stac_client.py @@ -186,26 +186,20 @@ class DEASTACClient: collections = [coll_id] def _search(): - # Build query - query_params = {} - - # Try cloud cover filter if DEA_CLOUD_MAX > 0 - if self.cloud_max > 0: - try: - # Try with eo:cloud_cover (DEA supports this) - query_params["eo:cloud_cover"] = {"lt": self.cloud_max} - except Exception as e: - logger.warning(f"Cloud filter not supported: {e}") - search = self.client.search( collections=collections, bbox=bbox, datetime=f"{start_date}/{end_date}", limit=limit, - query=query_params if query_params else None, ) - return list(search.items()) + items = list(search.items()) + + # Filter by cloud cover manually if requested + if self.cloud_max > 0: + items = [it for it in items if it.properties.get("eo:cloud_cover", 100) < self.cloud_max] + + return items return self._retry_operation(_search) diff --git a/apps/worker/worker.py b/apps/worker/worker.py index 4772ce9..0d23d42 100644 --- a/apps/worker/worker.py +++ b/apps/worker/worker.py @@ -328,8 +328,8 @@ def run_job(payload_dict: dict) -> dict: print(f"[{job_id}] Downloaded {artifact}") except Exception as e: try: - storage.download_file(storage.bucket_models, f"hybrid/{artifact}", model_dir / artifact) - print(f"[{job_id}] Downloaded {artifact} (from hybrid/ prefix)") + storage.download_file(storage.bucket_models, f"models/{artifact}", model_dir / artifact) + print(f"[{job_id}] Downloaded {artifact} (from models/ prefix)") except Exception as e2: raise FileNotFoundError( f"Required artifact {artifact} not found in {storage.bucket_models}: {e2}" diff --git a/k8s/base/gitea-runner.yaml b/k8s/base/gitea-runner.yaml index 97992d0..e3c0c6d 100644 --- a/k8s/base/gitea-runner.yaml +++ b/k8s/base/gitea-runner.yaml @@ -15,6 +15,9 @@ spec: labels: app: gitea-runner spec: + securityContext: + supplementalGroups: + - 999 hostNetwork: true dnsPolicy: ClusterFirstWithHostNet containers: @@ -30,12 +33,17 @@ spec: - name: CONFIG_FILE value: /config.yaml - name: DOCKER_HOST - value: unix:///var/run/docker.sock + value: tcp://localhost:2376 + - name: DOCKER_TLS_CERTDIR + value: /certs/client + securityContext: + privileged: true volumeMounts: - name: runner-data mountPath: /data - - name: docker-socket - mountPath: /var/run + - name: docker-certs + mountPath: /certs/client + readOnly: true - name: config mountPath: /config.yaml subPath: config.yaml @@ -45,18 +53,22 @@ spec: privileged: true env: - name: DOCKER_TLS_CERTDIR - value: "" + value: /certs/client + - name: DOCKER_DRIVER + value: overlay2 volumeMounts: + - name: runner-data + mountPath: /data - name: docker-graph-storage mountPath: /var/lib/docker - - name: docker-socket - mountPath: /var/run + - name: docker-certs + mountPath: /certs/client volumes: - name: runner-data emptyDir: {} - name: docker-graph-storage emptyDir: {} - - name: docker-socket + - name: docker-certs emptyDir: {} - name: config configMap: