# geocrop-platform./test_async_inference.py
#
# 112 lines
# 3.7 KiB
# Python

import json
import os
import sys
import time

import requests
# API Configuration
# Each value can be overridden through the environment so the script can target
# another deployment or account without editing the source; the fallbacks keep
# the previous hard-coded behaviour. A trailing slash is stripped because the
# request paths below are appended as "/auth/login", "/jobs", ...
API_URL = os.environ.get("GEOCROP_API_URL", "http://localhost:8088").rstrip("/")
# Admin user credentials from main.py
# NOTE(review): the fallback credentials are still committed in plain text —
# prefer supplying GEOCROP_USERNAME / GEOCROP_PASSWORD and rotating this password.
LOGIN_DATA = {
    "username": os.environ.get("GEOCROP_USERNAME", "fchinembiri24@gmail.com"),
    "password": os.environ.get("GEOCROP_PASSWORD", "P@55w0rd.123"),
}
def test_inference():
    """Run the GeoCrop async-inference flow end to end against a live API.

    Steps:
        1. Log in with ``LOGIN_DATA`` via ``POST {API_URL}/auth/login`` and
           read ``access_token`` from the JSON response.
        2. Submit an inference job via ``POST {API_URL}/jobs``.
        3. Poll ``GET {API_URL}/jobs/{job_id}`` until the job reports
           ``finished`` or ``failed``, or the 10-minute deadline passes.

    Returns:
        None once the job reports ``finished`` (results are printed).

    Raises:
        SystemExit: with code 1 if login or submission fails, the job
            reports ``failed``, or the polling deadline is exceeded.
    """
    # Without a per-request timeout a stalled connection blocks forever and
    # the overall polling deadline below is never re-checked.
    request_timeout = 30  # seconds per HTTP call
    poll_interval = 2  # seconds between successful polls
    retry_delay = 5  # seconds to wait after a failed poll
    timeout = 600  # overall polling deadline in seconds (10 minutes)

    print("=== GeoCrop Async Inference End-to-End Test ===")

    # 1. Login to get token
    print("\n1. Logging in...")
    try:
        response = requests.post(
            f"{API_URL}/auth/login", data=LOGIN_DATA, timeout=request_timeout
        )
        response.raise_for_status()
        token = response.json()["access_token"]
        print("✓ Login successful")
    except Exception as e:
        # Script boundary: report any failure and exit non-zero.
        print(f"✗ Login failed: {e}")
        sys.exit(1)

    headers = {"Authorization": f"Bearer {token}"}

    # 2. Submit Inference Job
    # Coordinates in Zimbabwe (Agricultural area near Mazowe)
    payload = {
        "lat": -17.51,
        "lon": 30.91,
        "radius_km": 1.0,
        "year": "2022",
        "model_name": "Hybrid_SpatioTemporal",
    }
    print(f"\n2. Submitting inference job for AOI ({payload['lat']}, {payload['lon']})...")
    try:
        response = requests.post(
            f"{API_URL}/jobs", json=payload, headers=headers, timeout=request_timeout
        )
        response.raise_for_status()
        job_data = response.json()
        job_id = job_data["job_id"]
        status = job_data["status"]
        print(f"✓ Job submitted. ID: {job_id}, Initial Status: {status}")
    except Exception as e:
        # Script boundary: report any failure and exit non-zero.
        print(f"✗ Job submission failed: {e}")
        sys.exit(1)

    # 3. Poll for Status
    print(f"\n3. Polling for job status (every {poll_interval}s)...")
    last_status = None
    # Monotonic clock: the deadline is unaffected by wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        # Only the fetch is retried. Keeping result handling outside the try
        # means an error while printing results is not mistaken for a
        # transient polling failure and silently re-polled.
        try:
            response = requests.get(
                f"{API_URL}/jobs/{job_id}", headers=headers, timeout=request_timeout
            )
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers JSON decode failures on older requests versions.
            print(f"Polling error: {e}")
            time.sleep(retry_delay)
            continue

        current_status = data.get("status")
        worker_status = data.get("worker_status")
        stage = data.get("stage")
        progress = data.get("progress", 0)
        message = data.get("message", "")

        parts = [f"Status: {current_status}"]
        if worker_status:
            parts.append(f"Worker: {worker_status}")
        if stage:
            parts.append(f"Stage: {stage}")
        if progress:
            parts.append(f"Progress: {progress}%")
        status_str = " | ".join(parts)

        # Print only when the summary line changes, to avoid flooding the log.
        if status_str != last_status:
            print(f"[{time.strftime('%H:%M:%S')}] {status_str}")
            if message:
                print(f" Message: {message}")
            last_status = status_str

        if current_status == "finished":
            print("\n✓ Inference Job Completed Successfully!")
            print("\n=== Final Results ===")
            result = data.get("result")
            print(json.dumps(result, indent=2))
            # Print output artifacts if available in detailed status
            detailed = data.get("detailed")
            if detailed and "outputs" in detailed:
                print("\nOutput Artifacts:")
                for k, v in detailed["outputs"].items():
                    # str() so non-string artifact values can be truncated too.
                    print(f" - {k}: {str(v)[:80]}...")
            return

        if current_status == "failed":
            print("\n✗ Job Failed!")
            print(f"Error: {data.get('error')}")
            sys.exit(1)

        time.sleep(poll_interval)

    print(f"\n✗ Test timed out after {timeout // 60} minutes.")
    sys.exit(1)
# Script entry point: run the end-to-end check only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    test_inference()