"""Tori API integration tests.

Usage:
    TORI_URL=https://tori.oci.euphon.net pytest tests/test_api.py -v

Runs against a live deployment. Tests are ordered: project -> workflow -> feedback.
"""

import os
import time

import httpx
import pytest

# Honor the TORI_URL override advertised in the docstring; fall back to the
# default deployment when unset. Trailing slashes are stripped so the "/api"
# suffix never doubles up.
API = os.environ.get("TORI_URL", "https://tori.oci.euphon.net").rstrip("/") + "/api"
TIMEOUT = httpx.Timeout(30.0)


@pytest.fixture(scope="module")
def client():
    """Module-scoped HTTP client so all tests share one connection pool."""
    with httpx.Client(base_url=API, timeout=TIMEOUT) as c:
        yield c


@pytest.fixture(scope="module")
def project(client: httpx.Client):
    """Create a throwaway project for this module; delete it on teardown."""
    r = client.post("/projects", json={"name": "pytest-smoke", "description": "auto test"})
    assert r.status_code == 200
    p = r.json()
    assert p["id"]
    yield p
    # cleanup — best effort; repeated runs shouldn't accumulate projects
    client.delete(f"/projects/{p['id']}")


@pytest.fixture(scope="module")
def workflow(client: httpx.Client, project: dict):
    """Kick off a single workflow in the test project and return its JSON."""
    r = client.post(
        f"/projects/{project['id']}/workflows",
        json={"requirement": "Create a file called hello.txt with content 'hello world'"},
    )
    assert r.status_code == 200
    wf = r.json()
    assert wf["id"]
    # Fresh workflows may already have been picked up by the agent, so any
    # of the pre-terminal states is acceptable here.
    assert wf["status"] in ("pending", "planning", "executing")
    return wf


def poll_workflow_done(client: httpx.Client, wf_id: str, timeout_secs: int = 120) -> dict:
    """Poll the workflow's execution log until at least one step is done.

    There is no direct GET /workflows/:id endpoint, so "terminal" is
    approximated by the appearance of a step whose status is "done".
    Checks the response *before* sleeping so an already-finished workflow
    returns immediately instead of burning one 3s interval per loop.

    Fails the calling test if nothing completes within ``timeout_secs``.
    """
    # time.monotonic() is immune to wall-clock adjustments during the poll.
    deadline = time.monotonic() + timeout_secs
    while time.monotonic() < deadline:
        r = client.get(f"/workflows/{wf_id}/steps")
        assert r.status_code == 200
        steps = r.json()
        if steps and any(s.get("status") == "done" for s in steps):
            return {"steps": steps, "status": "has_done_steps"}
        time.sleep(3)
    pytest.fail(f"Workflow {wf_id} did not produce done steps within {timeout_secs}s")


class TestProjectCRUD:
    """Basic create/read/update coverage for the projects endpoints."""

    def test_list_projects(self, client: httpx.Client):
        r = client.get("/projects")
        assert r.status_code == 200
        assert isinstance(r.json(), list)

    def test_create_project(self, project: dict):
        # The fixture already asserted the POST succeeded; verify payload.
        assert project["name"] == "pytest-smoke"

    def test_get_project(self, client: httpx.Client, project: dict):
        r = client.get(f"/projects/{project['id']}")
        assert r.status_code == 200
        assert r.json()["id"] == project["id"]

    def test_update_project(self, client: httpx.Client, project: dict):
        r = client.put(f"/projects/{project['id']}", json={"name": "pytest-renamed"})
        assert r.status_code == 200
        assert r.json()["name"] == "pytest-renamed"


class TestWorkflowExecution:
    """Drive one workflow from creation through to a terminal status."""

    def test_workflow_created(self, workflow: dict):
        assert workflow["requirement"] == "Create a file called hello.txt with content 'hello world'"

    def test_execution_log_populates(self, client: httpx.Client, workflow: dict):
        """Wait for agent to start producing execution log entries."""
        deadline = time.time() + 90
        entries = []
        while time.time() < deadline:
            r = client.get(f"/workflows/{workflow['id']}/steps")
            assert r.status_code == 200
            entries = r.json()
            if len(entries) >= 1:
                break
            time.sleep(3)
        assert len(entries) >= 1, "No execution log entries within timeout"
        # Verify entry structure
        e = entries[0]
        assert "id" in e
        assert "tool_name" in e
        assert "status" in e

    def test_workflow_completes(self, client: httpx.Client, project: dict, workflow: dict):
        """Wait for workflow to reach terminal status.

        No direct GET /workflows/:id exists, so the status is read from
        the project's workflow list instead.
        """
        deadline = time.time() + 120
        status = "executing"
        while time.time() < deadline:
            r = client.get(f"/projects/{project['id']}/workflows")
            assert r.status_code == 200
            wfs = r.json()
            wf = next((w for w in wfs if w["id"] == workflow["id"]), None)
            assert wf is not None
            status = wf["status"]
            if status in ("done", "failed"):
                break
            time.sleep(5)
        assert status in ("done", "failed"), f"Workflow stuck in '{status}'"

    def test_execution_log_has_entries(self, client: httpx.Client, workflow: dict):
        r = client.get(f"/workflows/{workflow['id']}/steps")
        assert r.status_code == 200
        entries = r.json()
        assert len(entries) >= 2, f"Expected multiple log entries, got {len(entries)}"
        tool_names = {e["tool_name"] for e in entries}
        # Should have at least used write_file for hello.txt
        assert len(tool_names) >= 1


class TestFeedback:
    """Post-completion feedback: comment submission and re-processing."""

    def test_submit_comment(self, client: httpx.Client, workflow: dict):
        r = client.post(
            f"/workflows/{workflow['id']}/comments",
            json={"content": "Also add a goodbye.txt with 'goodbye world'"},
        )
        assert r.status_code == 200
        comment = r.json()
        assert comment["content"] == "Also add a goodbye.txt with 'goodbye world'"

    def test_feedback_processed(self, client: httpx.Client, project: dict, workflow: dict):
        """After comment, workflow should eventually reach a terminal status.

        The feedback LLM may or may not trigger re-execution (revise_plan).
        If it decides the feedback is informational, it stays 'done' without
        re-executing. Either outcome is valid — we just verify it doesn't
        get stuck.
        """
        deadline = time.time() + 120
        final_status = None
        while time.time() < deadline:
            r = client.get(f"/projects/{project['id']}/workflows")
            wfs = r.json()
            wf = next((w for w in wfs if w["id"] == workflow["id"]), None)
            if wf and wf["status"] in ("done", "failed"):
                final_status = wf["status"]
                break
            time.sleep(3)
        assert final_status in ("done", "failed"), f"Workflow stuck in '{final_status}' after feedback"

    def test_comments_persisted(self, client: httpx.Client, workflow: dict):
        r = client.get(f"/workflows/{workflow['id']}/comments")
        assert r.status_code == 200
        comments = r.json()
        assert len(comments) >= 1
        assert any("goodbye" in c["content"] for c in comments)