# CI/CD End-to-End Validation Test
"""GeoCrop API.

A FastAPI application that authenticates users with JWT bearer tokens,
keeps user records in an in-memory dict, and enqueues inference jobs on
an RQ queue backed by Redis. Job submissions are de-duplicated through an
idempotency key derived from the request parameters.
"""

import hashlib
import json
import logging
import os
from datetime import datetime, timedelta, timezone
from typing import List, Optional

import jwt
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from passlib.context import CryptContext
from pydantic import BaseModel, EmailStr
from redis import Redis
from rq import Queue
from rq.exceptions import NoSuchJobError
from rq.job import Job

logger = logging.getLogger(__name__)

# --- Configuration ---
# SECURITY: the fallback below is a placeholder committed to source control.
# Set SECRET_KEY in the environment for any real deployment.
SECRET_KEY = os.getenv("SECRET_KEY", "your-super-secret-portfolio-key-change-this")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 1440  # 24 hours

# Redis Connection
REDIS_HOST = os.getenv("REDIS_HOST", "redis.geocrop.svc.cluster.local")
redis_conn = Redis(host=REDIS_HOST, port=6379)
task_queue = Queue('geocrop_tasks', connection=redis_conn)

IDEMPOTENCY_TTL = 86400 * 7  # 7 days


def generate_idempotency_key(lat: float, lon: float, radius_km: float, year: str, model_name: str, season: str = "summer") -> str:
    """Generate a deterministic key for job deduplication.

    The key is the first 32 hex characters of the SHA-256 digest of the
    normalized string ``lat:lon:radius_km:year:model_name:season``, where
    lat/lon are formatted with 6 decimals and radius_km with 3 decimals.
    Identical parameters therefore always produce the same key.
    """
    normalized = f"{lat:.6f}:{lon:.6f}:{radius_km:.3f}:{year}:{model_name}:{season}"
    return hashlib.sha256(normalized.encode()).hexdigest()[:32]


def check_existing_job(idem_key: str) -> Optional[str]:
    """Return the job id registered for *idem_key* if that job is reusable.

    A job is reusable when it is finished, queued or started. Returns None
    when no job id is registered, when the job can no longer be fetched, or
    when it is in any other state (e.g. failed), so the caller may resubmit.
    """
    key = f"idem:{idem_key}:job_id"
    job_id = redis_conn.get(key)
    if not job_id:
        return None
    if isinstance(job_id, bytes):
        job_id = job_id.decode('utf-8')

    try:
        job = Job.fetch(job_id, connection=redis_conn)
        reusable = job.is_finished or job.is_queued or job.is_started
    except NoSuchJobError:
        # Expected once the job record has expired from Redis.
        return None
    except Exception:
        # Best effort: an unreadable job must not block a new submission,
        # but the failure should be visible in the logs.
        logger.warning(
            "Could not inspect job %s for idempotency key %s",
            job_id, idem_key, exc_info=True,
        )
        return None
    return job_id if reusable else None


def register_job_idempotency(idem_key: str, job_id: str) -> None:
    """Register job_id for idempotency key with 7-day TTL."""
    key = f"idem:{idem_key}:job_id"
    redis_conn.set(key, job_id, ex=IDEMPOTENCY_TTL)


def check_cached_result(idem_key: str) -> Optional[dict]:
    """Return the cached result dict for *idem_key*, or None.

    None is returned when nothing is cached, when the cached payload is not
    valid JSON, or when it does not decode to a JSON object.
    """
    key = f"idem:{idem_key}:result"
    cached = redis_conn.get(key)
    if not cached:
        return None
    try:
        # json.loads accepts both bytes and str, so this works regardless of
        # whether the Redis client decodes responses.
        result = json.loads(cached)
    except ValueError:
        # Covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("Ignoring malformed cached result for key %s", key)
        return None
    return result if isinstance(result, dict) else None


def cache_result(idem_key: str, result: dict, ttl: int = 86400) -> None:
    """Cache successful result for duplicate requests."""
    key = f"idem:{idem_key}:result"
    redis_conn.set(key, json.dumps(result), ex=ttl)


app = FastAPI(title="GeoCrop API", version="1.1")

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://portfolio.techarvest.co.zw", "http://localhost:5173"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login")

# In-memory DB
USERS = {
    "fchinembiri24@gmail.com": {
        "email": "fchinembiri24@gmail.com",
        "hashed_password": "$2b$12$iyR6fFeQAd2CfCDm/CdTSeB8CIjJhAHjA6Et7/UMWm0i0nIAFu21W",
        "is_active": True,
        "is_admin": True,
        "login_count": 0,
        "login_limit": 9999
    }
}


class UserCreate(BaseModel):
    """Request body for creating a user."""

    email: EmailStr
    password: str
    login_limit: int = 3


class UserResponse(BaseModel):
    """Public view of a user record (no password hash)."""

    email: EmailStr
    is_active: bool
    is_admin: bool
    login_count: int
    login_limit: int


class Token(BaseModel):
    """Response body of the login endpoint."""

    access_token: str
    token_type: str
    is_admin: bool


class InferenceJobRequest(BaseModel):
    """Request body for submitting an inference job."""

    lat: float
    lon: float
    radius_km: float
    year: str
    model_name: str


def create_access_token(data: dict, expires_delta: timedelta):
    """Return a signed JWT carrying *data* plus an ``exp`` claim.

    The expiry is the current UTC time plus *expires_delta*. The input dict
    is copied, not mutated.
    """
    to_encode = data.copy()
    # Timezone-aware replacement for the deprecated datetime.utcnow().
    expire = datetime.now(timezone.utc) + expires_delta
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve the bearer token to a record in USERS.

    Raises:
        HTTPException: 401 when the token cannot be decoded, or when its
            ``sub`` claim is missing or does not name a known user.
    """
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.PyJWTError:
        raise HTTPException(status_code=401, detail="Invalid credentials") from None

    email = payload.get("sub")
    # The isinstance guard keeps an unhashable "sub" from raising TypeError
    # on the dict lookup.
    if not isinstance(email, str) or email not in USERS:
        raise HTTPException(status_code=401, detail="Invalid credentials")
    return USERS[email]


async def get_admin_user(current_user: dict = Depends(get_current_user)):
    """Return the current user, or raise 403 if they are not an admin."""
    if not current_user.get("is_admin"):
        raise HTTPException(status_code=403, detail="Admin privileges required")
    return current_user


def _issue_token_response(user: dict, is_admin: bool) -> dict:
    """Count the login on *user* and build the login response body."""
    user["login_count"] += 1
    access_token = create_access_token(
        data={"sub": user["email"]},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    )
    return {"access_token": access_token, "token_type": "bearer", "is_admin": is_admin}


@app.post("/auth/login", response_model=Token, tags=["Authentication"])
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """Authenticate with username/password form data and return a JWT.

    Raises:
        HTTPException: 401 for an unknown user or wrong password, 403 when
            the user's login count has reached their login limit.
    """
    username = form_data.username.strip()
    password = form_data.password.strip()

    # Check Admin Bypass
    # SECURITY NOTE(review): a plaintext admin credential is hard-coded here
    # and skips both hash verification and the login limit. It is preserved
    # because removing it could lock the admin out if the stored hash does
    # not match, but it should be replaced by the hashed-password path and
    # the credential rotated.
    if username == "fchinembiri24@gmail.com" and password == "P@55w0rd.123":
        return _issue_token_response(USERS["fchinembiri24@gmail.com"], True)

    user = USERS.get(username)
    if not user or not pwd_context.verify(password, user["hashed_password"]):
        raise HTTPException(status_code=401, detail="Incorrect email or password")

    if user["login_count"] >= user.get("login_limit", 3):
        raise HTTPException(status_code=403, detail="Login limit reached.")

    return _issue_token_response(user, user.get("is_admin", False))


def _public_user_view(record: dict) -> dict:
    """Project a USERS record onto the fields exposed by UserResponse."""
    return {
        "email": record["email"],
        "is_active": record["is_active"],
        "is_admin": record.get("is_admin", False),
        "login_count": record.get("login_count", 0),
        "login_limit": record.get("login_limit", 3)
    }


@app.get("/admin/users", response_model=List[UserResponse], tags=["Admin"])
async def list_users(admin: dict = Depends(get_admin_user)):
    """List every user in the in-memory store (admin only)."""
    return [_public_user_view(record) for record in USERS.values()]


@app.post("/admin/users", response_model=UserResponse, tags=["Admin"])
async def create_user(user_in: UserCreate, admin: dict = Depends(get_admin_user)):
    """Create a non-admin, active user with a bcrypt-hashed password (admin only).

    Raises:
        HTTPException: 400 when a user with this email already exists.
    """
    if user_in.email in USERS:
        raise HTTPException(status_code=400, detail="User already exists")

    USERS[user_in.email] = {
        "email": user_in.email,
        "hashed_password": pwd_context.hash(user_in.password),
        "is_active": True,
        "is_admin": False,
        "login_count": 0,
        "login_limit": user_in.login_limit
    }
    return _public_user_view(USERS[user_in.email])


@app.post("/jobs", tags=["Inference"])
async def create_inference_job(job_req: InferenceJobRequest, current_user: dict = Depends(get_current_user)):
    """Submit an inference job, reusing an existing job or cached result.

    Lookup order: an existing reusable job for the same parameters, then a
    cached result, and only then a new job is enqueued and registered.

    Raises:
        HTTPException: 400 when the radius is above 5 km or is not a
            positive number.
    """
    if job_req.radius_km > 5.0:
        raise HTTPException(status_code=400, detail="Radius exceeds 5km limit.")
    # NaN compares False against every bound, so it slipped past the limit
    # check above; `not (x > 0)` rejects NaN as well as zero and negatives.
    if not job_req.radius_km > 0:
        raise HTTPException(status_code=400, detail="Radius must be a positive number.")

    idem_key = generate_idempotency_key(
        lat=job_req.lat,
        lon=job_req.lon,
        radius_km=job_req.radius_km,
        year=job_req.year,
        model_name=job_req.model_name,
        season="summer"
    )

    existing_job_id = check_existing_job(idem_key)
    if existing_job_id:
        # Return 200 OK with the existing job_id instead of 409
        return {
            "job_id": existing_job_id,
            "status": "already_exists",
            "idempotency_key": idem_key,
            "message": "Job already exists for these parameters. Returning existing job ID."
        }

    cached = check_cached_result(idem_key)
    if cached:
        cached["job_id"] = cached.get("job_id", "cached")
        cached["status"] = "cached"
        cached["cached"] = True
        return cached

    # NOTE(review): the lookup above and the registration below are not
    # atomic, so two concurrent identical requests can both enqueue a job.
    job = task_queue.enqueue(
        'worker.run_inference',
        job_req.model_dump(),
        job_timeout='25m',
        result_ttl=86400,
        failure_ttl=86400
    )
    register_job_idempotency(idem_key, job.id)

    return {"job_id": job.id, "status": "queued", "idempotency_key": idem_key}


def _load_detailed_status(job_id: str) -> Optional[dict]:
    """Best-effort read of the JSON object stored at ``job:{job_id}:status``."""
    try:
        raw = redis_conn.get(f"job:{job_id}:status")
        if not raw:
            return None
        detailed = json.loads(raw)
    except Exception as exc:
        # Detailed status is optional; never fail the request because of it.
        logger.warning("Error fetching detailed status: %s", exc)
        return None
    # Callers use .get() on the result, so anything but an object is unusable.
    return detailed if isinstance(detailed, dict) else None


def _extract_roi(job: Job) -> Optional[dict]:
    """Build the region-of-interest dict from the job's first positional arg."""
    if not job.args:
        return None
    params = job.args[0]
    if not isinstance(params, dict):
        return None

    if "radius_km" in params:
        # round(), not int(): int() truncates float error, e.g.
        # 1.005 * 1000 == 1004.9999999999999 would become 1004.
        radius_m = round(float(params["radius_km"]) * 1000)
    else:
        radius_m = params.get("radius_m")
    return {"lat": params.get("lat"), "lon": params.get("lon"), "radius_m": radius_m}


@app.get("/jobs/{job_id}", tags=["Inference"])
async def get_job_status(job_id: str, current_user: dict = Depends(get_current_user)):
    """Report the state of a job, enriched with detailed status when present.

    The detailed status is read from the custom Redis key
    ``job:{job_id}:status`` (presumably written by the worker -- confirm).

    Raises:
        HTTPException: 404 when the job cannot be fetched.
    """
    try:
        job = Job.fetch(job_id, connection=redis_conn)
    except Exception:
        raise HTTPException(status_code=404, detail="Job not found")

    detailed_status = _load_detailed_status(job_id)
    roi = _extract_roi(job)

    if job.is_finished:
        result = job.result
        # If detailed status has outputs, prefer those
        if detailed_status and "outputs" in detailed_status:
            result = detailed_status["outputs"]
        return {
            "job_id": job.id,
            "status": "finished",
            "result": result,
            "detailed": detailed_status,
            "roi": roi
        }

    if job.is_failed:
        return {
            "job_id": job.id,
            "status": "failed",
            "error": detailed_status.get("error") if detailed_status else None,
            "roi": roi
        }

    response = {
        "job_id": job.id,
        "status": job.get_status(),
        "roi": roi
    }
    if detailed_status:
        # If we have detailed status, use its status/stage/progress
        response.update({
            "worker_status": detailed_status.get("status"),
            "stage": detailed_status.get("stage"),
            "progress": detailed_status.get("progress"),
            "message": detailed_status.get("message"),
        })
        # Include intermediate outputs (e.g., dw_baseline_url) even if job not finished yet
        if "outputs" in detailed_status:
            response["outputs"] = detailed_status["outputs"]
    return response