From 84779a0527c00365f9ca99cbff3515713a7db86e Mon Sep 17 00:00:00 2001
From: Fam Zheng
Date: Mon, 2 Mar 2026 09:21:46 +0000
Subject: [PATCH] add tests

---
 tests/test_api.py | 177 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 177 insertions(+)
 create mode 100644 tests/test_api.py

diff --git a/tests/test_api.py b/tests/test_api.py
new file mode 100644
index 0000000..bdf6307
--- /dev/null
+++ b/tests/test_api.py
@@ -0,0 +1,177 @@
+"""
+Tori API integration tests.
+
+Usage:
+    TORI_URL=https://tori.oci.euphon.net pytest tests/test_api.py -v
+
+Runs against a live deployment. Tests are ordered: project → workflow → feedback.
+TORI_URL is optional; it defaults to https://tori.oci.euphon.net.
+"""
+import os
+import time
+
+import httpx
+import pytest
+
+DEFAULT_URL = "https://tori.oci.euphon.net"
+# Honor the TORI_URL override from the usage line; "/api" is appended here.
+API = (os.environ.get("TORI_URL") or DEFAULT_URL).rstrip("/") + "/api"
+TIMEOUT = httpx.Timeout(30.0)
+TERMINAL_STATUSES = ("done", "failed")
+REQUIREMENT = "Create a file called hello.txt with content 'hello world'"
+FEEDBACK = "Also add a goodbye.txt with 'goodbye world'"
+
+
+@pytest.fixture(scope="module")
+def client():
+    """Yield one HTTP client shared by every test in this module."""
+    with httpx.Client(base_url=API, timeout=TIMEOUT) as c:
+        yield c
+
+
+@pytest.fixture(scope="module")
+def project(client: httpx.Client):
+    """Create the test project and delete it once the module's tests finish."""
+    r = client.post("/projects", json={"name": "pytest-smoke", "description": "auto test"})
+    assert r.status_code == 200
+    p = r.json()
+    assert p["id"]
+    yield p
+    # cleanup
+    client.delete(f"/projects/{p['id']}")
+
+
+@pytest.fixture(scope="module")
+def workflow(client: httpx.Client, project: dict):
+    """Start a workflow in the test project and return the creation response."""
+    r = client.post(f"/projects/{project['id']}/workflows", json={"requirement": REQUIREMENT})
+    assert r.status_code == 200
+    wf = r.json()
+    assert wf["id"]
+    assert wf["status"] in ("pending", "planning", "executing")
+    return wf
+
+
+def wait_for_terminal_status(
+    client: httpx.Client,
+    project_id: str,
+    wf_id: str,
+    timeout_secs: int = 120,
+    interval_secs: float = 3,
+) -> str:
+    """Poll until the workflow is done or failed, and return the last status seen.
+
+    There is no direct GET /workflows/:id endpoint, so the status is read from
+    the project's workflow list. A non-terminal return value means the deadline
+    passed first; callers assert on the result.
+    """
+    deadline = time.monotonic() + timeout_secs
+    while True:
+        r = client.get(f"/projects/{project_id}/workflows")
+        assert r.status_code == 200
+        wf = next((w for w in r.json() if w["id"] == wf_id), None)
+        assert wf is not None, f"Workflow {wf_id} not listed under project {project_id}"
+        status = wf["status"]
+        if status in TERMINAL_STATUSES or time.monotonic() >= deadline:
+            return status
+        time.sleep(interval_secs)
+
+
+def poll_workflow_done(client: httpx.Client, wf_id: str, timeout_secs: int = 120) -> dict:
+    """Poll the steps endpoint until at least one step reports status "done".
+
+    Returns {"steps": steps, "status": "has_done_steps"}, or fails the test
+    when no step is done within timeout_secs. This tracks step progress only; it
+    does not check the workflow's own status.
+    """
+    deadline = time.monotonic() + timeout_secs
+    while time.monotonic() < deadline:
+        r = client.get(f"/workflows/{wf_id}/steps")
+        assert r.status_code == 200
+        steps = r.json()
+        if steps and any(s.get("status") == "done" for s in steps):
+            return {"steps": steps, "status": "has_done_steps"}
+        time.sleep(3)
+    pytest.fail(f"Workflow {wf_id} did not produce done steps within {timeout_secs}s")
+
+
+class TestProjectCRUD:
+    def test_list_projects(self, client: httpx.Client):
+        r = client.get("/projects")
+        assert r.status_code == 200
+        assert isinstance(r.json(), list)
+
+    def test_create_project(self, project: dict):
+        assert project["name"] == "pytest-smoke"
+
+    def test_get_project(self, client: httpx.Client, project: dict):
+        r = client.get(f"/projects/{project['id']}")
+        assert r.status_code == 200
+        assert r.json()["id"] == project["id"]
+
+    def test_update_project(self, client: httpx.Client, project: dict):
+        r = client.put(f"/projects/{project['id']}", json={"name": "pytest-renamed"})
+        assert r.status_code == 200
+        assert r.json()["name"] == "pytest-renamed"
+
+
+class TestWorkflowExecution:
+    def test_workflow_created(self, workflow: dict):
+        assert workflow["requirement"] == REQUIREMENT
+
+    def test_execution_log_populates(self, client: httpx.Client, workflow: dict):
+        """Wait for agent to start producing execution log entries."""
+        deadline = time.monotonic() + 90
+        entries = []
+        while time.monotonic() < deadline:
+            r = client.get(f"/workflows/{workflow['id']}/steps")
+            assert r.status_code == 200
+            entries = r.json()
+            if entries:
+                break
+            time.sleep(3)
+        assert entries, "No execution log entries within timeout"
+        # Verify entry structure
+        e = entries[0]
+        assert "id" in e
+        assert "tool_name" in e
+        assert "status" in e
+
+    def test_workflow_completes(self, client: httpx.Client, project: dict, workflow: dict):
+        """Wait for workflow to reach terminal status."""
+        status = wait_for_terminal_status(client, project["id"], workflow["id"], interval_secs=5)
+        assert status in TERMINAL_STATUSES, f"Workflow stuck in '{status}'"
+
+    def test_execution_log_has_entries(self, client: httpx.Client, workflow: dict):
+        r = client.get(f"/workflows/{workflow['id']}/steps")
+        assert r.status_code == 200
+        entries = r.json()
+        assert len(entries) >= 2, f"Expected multiple log entries, got {len(entries)}"
+        tool_names = {e["tool_name"] for e in entries}
+        # Expected to include write_file for hello.txt, but only non-emptiness is asserted.
+        assert len(tool_names) >= 1
+
+
+class TestFeedback:
+    def test_submit_comment(self, client: httpx.Client, workflow: dict):
+        r = client.post(f"/workflows/{workflow['id']}/comments", json={"content": FEEDBACK})
+        assert r.status_code == 200
+        comment = r.json()
+        assert comment["content"] == FEEDBACK
+
+    def test_feedback_processed(self, client: httpx.Client, project: dict, workflow: dict):
+        """After comment, workflow should eventually reach a terminal status.
+
+        The feedback LLM may or may not trigger re-execution (revise_plan).
+        If it decides the feedback is informational, it stays 'done' without re-executing.
+        Either outcome is valid — we just verify it doesn't get stuck.
+        """
+        status = wait_for_terminal_status(client, project["id"], workflow["id"])
+        assert status in TERMINAL_STATUSES, f"Workflow stuck in '{status}' after feedback"
+
+    def test_comments_persisted(self, client: httpx.Client, workflow: dict):
+        r = client.get(f"/workflows/{workflow['id']}/comments")
+        assert r.status_code == 200
+        comments = r.json()
+        assert len(comments) >= 1
+        assert any("goodbye" in c["content"] for c in comments)