add tests

This commit is contained in:
Fam Zheng 2026-03-02 09:21:46 +00:00
parent 728ab2e8fd
commit 84779a0527

167
tests/test_api.py Normal file
View File

@ -0,0 +1,167 @@
"""
Tori API integration tests.
Usage:
TORI_URL=https://tori.oci.euphon.net pytest tests/test_api.py -v
Runs against a live deployment. Tests are ordered: project workflow feedback.
"""
import time
import httpx
import pytest
API = "https://tori.oci.euphon.net/api"
TIMEOUT = httpx.Timeout(30.0)
@pytest.fixture(scope="module")
def client():
with httpx.Client(base_url=API, timeout=TIMEOUT) as c:
yield c
@pytest.fixture(scope="module")
def project(client: httpx.Client):
r = client.post("/projects", json={"name": "pytest-smoke", "description": "auto test"})
assert r.status_code == 200
p = r.json()
assert p["id"]
yield p
# cleanup
client.delete(f"/projects/{p['id']}")
@pytest.fixture(scope="module")
def workflow(client: httpx.Client, project: dict):
r = client.post(
f"/projects/{project['id']}/workflows",
json={"requirement": "Create a file called hello.txt with content 'hello world'"},
)
assert r.status_code == 200
wf = r.json()
assert wf["id"]
assert wf["status"] in ("pending", "planning", "executing")
return wf
def poll_workflow_done(client: httpx.Client, wf_id: str, timeout_secs: int = 120) -> dict:
"""Poll until workflow reaches a terminal status."""
deadline = time.time() + timeout_secs
while time.time() < deadline:
r = client.get(f"/workflows/{wf_id}/steps")
assert r.status_code == 200
# Check workflow status by fetching it indirectly through steps
# Actually we need a direct workflow endpoint — use the list endpoint
time.sleep(3)
# We don't have a direct GET /workflows/:id, so check via project workflows list
# For now just wait and check execution log grows
steps = r.json()
if steps and any(s.get("status") == "done" for s in steps):
return {"steps": steps, "status": "has_done_steps"}
pytest.fail(f"Workflow {wf_id} did not produce done steps within {timeout_secs}s")
class TestProjectCRUD:
def test_list_projects(self, client: httpx.Client):
r = client.get("/projects")
assert r.status_code == 200
assert isinstance(r.json(), list)
def test_create_project(self, project: dict):
assert project["name"] == "pytest-smoke"
def test_get_project(self, client: httpx.Client, project: dict):
r = client.get(f"/projects/{project['id']}")
assert r.status_code == 200
assert r.json()["id"] == project["id"]
def test_update_project(self, client: httpx.Client, project: dict):
r = client.put(f"/projects/{project['id']}", json={"name": "pytest-renamed"})
assert r.status_code == 200
assert r.json()["name"] == "pytest-renamed"
class TestWorkflowExecution:
def test_workflow_created(self, workflow: dict):
assert workflow["requirement"] == "Create a file called hello.txt with content 'hello world'"
def test_execution_log_populates(self, client: httpx.Client, workflow: dict):
"""Wait for agent to start producing execution log entries."""
deadline = time.time() + 90
entries = []
while time.time() < deadline:
r = client.get(f"/workflows/{workflow['id']}/steps")
assert r.status_code == 200
entries = r.json()
if len(entries) >= 1:
break
time.sleep(3)
assert len(entries) >= 1, "No execution log entries within timeout"
# Verify entry structure
e = entries[0]
assert "id" in e
assert "tool_name" in e
assert "status" in e
def test_workflow_completes(self, client: httpx.Client, project: dict, workflow: dict):
"""Wait for workflow to reach terminal status."""
deadline = time.time() + 120
status = "executing"
while time.time() < deadline:
r = client.get(f"/projects/{project['id']}/workflows")
assert r.status_code == 200
wfs = r.json()
wf = next((w for w in wfs if w["id"] == workflow["id"]), None)
assert wf is not None
status = wf["status"]
if status in ("done", "failed"):
break
time.sleep(5)
assert status in ("done", "failed"), f"Workflow stuck in '{status}'"
def test_execution_log_has_entries(self, client: httpx.Client, workflow: dict):
r = client.get(f"/workflows/{workflow['id']}/steps")
assert r.status_code == 200
entries = r.json()
assert len(entries) >= 2, f"Expected multiple log entries, got {len(entries)}"
tool_names = {e["tool_name"] for e in entries}
# Should have at least used write_file for hello.txt
assert len(tool_names) >= 1
class TestFeedback:
def test_submit_comment(self, client: httpx.Client, workflow: dict):
r = client.post(
f"/workflows/{workflow['id']}/comments",
json={"content": "Also add a goodbye.txt with 'goodbye world'"},
)
assert r.status_code == 200
comment = r.json()
assert comment["content"] == "Also add a goodbye.txt with 'goodbye world'"
def test_feedback_processed(self, client: httpx.Client, project: dict, workflow: dict):
"""After comment, workflow should eventually reach a terminal status.
The feedback LLM may or may not trigger re-execution (revise_plan).
If it decides the feedback is informational, it stays 'done' without re-executing.
Either outcome is valid we just verify it doesn't get stuck.
"""
deadline = time.time() + 120
final_status = None
while time.time() < deadline:
r = client.get(f"/projects/{project['id']}/workflows")
wfs = r.json()
wf = next((w for w in wfs if w["id"] == workflow["id"]), None)
if wf and wf["status"] in ("done", "failed"):
final_status = wf["status"]
break
time.sleep(3)
assert final_status in ("done", "failed"), f"Workflow stuck in '{final_status}' after feedback"
def test_comments_persisted(self, client: httpx.Client, workflow: dict):
r = client.get(f"/workflows/{workflow['id']}/comments")
assert r.status_code == 200
comments = r.json()
assert len(comments) >= 1
assert any("goodbye" in c["content"] for c in comments)