# gitea-bot/main.py
# 2026-04-07 10:51:40 +01:00
#
# 269 lines
# 8.7 KiB
# Python

import asyncio
import hashlib
import hmac
import logging
import os
import re

from fastapi import FastAPI, Request, HTTPException

import config
from gitea import GiteaClient
from workspace import ensure_repo, repo_path, attachments_path
# Configure logging once at import time: INFO level, timestamped lines.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
# Logger shared by the functions in this module.
log = logging.getLogger("gitea-bot")
# Shared GiteaClient instance used by process_mention for all API calls.
gitea = GiteaClient()
# The webhook handler skips events whose repository owner is not this login.
ALLOWED_OWNER = "euphon"
# Maximum number of characters of Claude output kept before truncation.
MAX_RESPONSE_LEN = 60000
# Seconds to wait for the Claude subprocess before it is killed.
CLAUDE_TIMEOUT = 3600 # 60 minutes
# FastAPI application; the routes below are registered on it.
app = FastAPI()
# --- Helpers ---
def verify_signature(payload: bytes, signature: str) -> bool:
    """Check a webhook body against its HMAC-SHA256 signature.

    Args:
        payload: Raw request body, exactly as received.
        signature: Signature supplied by the sender: the hex digest, either
            bare or prefixed with ``sha256=``.

    Returns:
        True when the signature matches, or when ``config.WEBHOOK_SECRET`` is
        empty (verification is then disabled). False otherwise, including
        for a missing or non-ASCII signature.
    """
    if not config.WEBHOOK_SECRET:
        return True
    if not signature:
        return False
    expected = hmac.new(
        config.WEBHOOK_SECRET.encode(), payload, hashlib.sha256
    ).hexdigest()
    # Gitea's X-Gitea-Signature header carries the bare hex digest, while the
    # GitHub-style X-Hub-Signature-256 header prefixes it with "sha256=".
    # Accept both so the check works whichever form the caller passes in.
    provided = signature.strip().removeprefix("sha256=")
    # Compare as bytes: compare_digest() raises TypeError on non-ASCII str.
    return hmac.compare_digest(expected.encode(), provided.encode())
def is_mentioned(text: str | None) -> bool:
return bool(text and f"@{config.BOT_USERNAME}" in text)
def build_prompt(
owner: str,
repo: str,
issue: dict,
comments: list[dict],
trigger_comment: dict | None,
attachments: list[dict],
pr_info: dict | None,
) -> str:
is_pr = pr_info is not None
kind = "Pull Request" if is_pr else "Issue"
parts = [f"## {kind} #{issue['number']}: {issue['title']}"]
parts.append(f"**Repository**: {owner}/{repo}")
parts.append(f"**Author**: @{issue['user']['login']}")
parts.append(f"**State**: {issue['state']}")
if is_pr:
parts.append(f"**Base**: {pr_info.get('base', {}).get('ref', 'main')}")
parts.append(f"**Head**: {pr_info.get('head', {}).get('ref', '?')}")
parts.append("")
parts.append("This is a pull request. Use `git diff` to see the changes. "
"The PR branch is already checked out.")
parts.append("")
parts.append("### Description")
parts.append(issue.get("body") or "(no description)")
if comments:
parts.append("\n---\n### Comments")
for c in comments:
user = c["user"]["login"]
if user == config.BOT_USERNAME:
parts.append(f"\n**@{user}** (you, earlier):")
else:
parts.append(f"\n**@{user}**:")
parts.append(c["body"])
if attachments:
parts.append("\n---\n### Attachments")
parts.append("The following files have been downloaded locally. "
"Use the Read tool to view images.")
for a in attachments:
parts.append(f"- `{a['path']}` ({a['name']})")
parts.append("\n---")
if trigger_comment:
parts.append(
f"You were mentioned by **@{trigger_comment['user']['login']}** "
f"in the latest comment. Respond to their request."
)
else:
parts.append(
"You were mentioned in the issue/PR body. Respond to the author's request."
)
return "\n".join(parts)
async def run_claude(prompt: str, cwd: str) -> str:
    """Run the Claude CLI non-interactively and return its reply as text.

    The prompt is written to the child's stdin. The child inherits the
    current environment plus ``GITEA_TOKEN`` and ``GITEA_URL`` from config.

    Args:
        prompt: Full prompt text to feed to the CLI.
        cwd: Working directory for the subprocess.

    Returns:
        The stripped stdout of the CLI, cut to ``MAX_RESPONSE_LEN``
        characters with a truncation note when longer. On timeout, non-zero
        exit or empty output a Markdown notice is returned instead; these
        conditions are reported in-band rather than raised.
    """
    env = os.environ.copy()
    env["GITEA_TOKEN"] = config.GITEA_TOKEN
    env["GITEA_URL"] = config.GITEA_URL
    cmd = [
        config.CLAUDE_COMMAND,
        "-p",
        "--dangerously-skip-permissions",
        "--output-format", "text",
        "--model", config.CLAUDE_MODEL,
        "--system-prompt", config.CLAUDE_SYSTEM_PROMPT,
    ]
    log.info("Running claude in %s", cwd)
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        cwd=cwd,
        env=env,
    )
    try:
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(input=prompt.encode()),
            timeout=CLAUDE_TIMEOUT,
        )
    except asyncio.TimeoutError:
        # The child can exit on its own between the timeout firing and the
        # kill; in that case there is nothing left to signal.
        try:
            proc.kill()
        except ProcessLookupError:
            pass
        await proc.wait()
        return f"*Claude timed out after {CLAUDE_TIMEOUT // 60} minutes.*"

    # Decode leniently: tool output is not guaranteed to be valid UTF-8 and a
    # UnicodeDecodeError here would discard the whole reply.
    if proc.returncode != 0:
        err = stderr.decode(errors="replace").strip()
        log.error("Claude exited %s: %s", proc.returncode, err)
        return f"*Claude encountered an error (exit {proc.returncode}):*\n```\n{err[:2000]}\n```"

    response = stdout.decode(errors="replace").strip()
    if not response:
        # The caller posts this text as a comment; say so explicitly rather
        # than handing back an empty body.
        return "*Claude returned an empty response.*"
    if len(response) > MAX_RESPONSE_LEN:
        response = response[:MAX_RESPONSE_LEN] + "\n\n*(response truncated)*"
    return response
async def _run_git(cwd, *args: str) -> bool:
    """Run ``git <args>`` in *cwd*; log a warning and return False on failure."""
    proc = await asyncio.create_subprocess_exec(
        "git", *args,
        cwd=cwd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
    )
    _, stderr = await proc.communicate()
    if proc.returncode != 0:
        log.warning(
            "git %s failed in %s (exit %s): %s",
            " ".join(args), cwd, proc.returncode,
            stderr.decode(errors="replace").strip(),
        )
        return False
    return True


async def process_mention(
    owner: str,
    repo: str,
    issue_number: int,
    trigger_comment: dict | None,
    comment_id: int | None,
):
    """Answer one bot mention on an issue or pull request.

    Posts a placeholder comment, prepares the workspace, gathers the issue,
    its comments and attachments, runs Claude on the resulting prompt and
    replaces the placeholder with the reply. Any exception is logged and
    reported on the issue as a generic error message; nothing is re-raised.

    Args:
        owner: Repository owner login.
        repo: Repository name.
        issue_number: Issue or pull-request number.
        trigger_comment: Comment that mentioned the bot, or ``None`` when the
            mention was in the issue body.
        comment_id: Id of the triggering comment; currently unused here.
    """
    wip_comment_id = None
    try:
        # Post "working on it" placeholder
        wip = await gitea.create_comment(
            owner, repo, issue_number, "⏳ Working on it…"
        )
        wip_comment_id = wip["id"]

        # Set up workspace
        rp = await ensure_repo(owner, repo, issue_number)
        att = attachments_path(owner, repo, issue_number)

        # Fetch issue/PR details
        issue = await gitea.get_issue(owner, repo, issue_number)
        comments = await gitea.get_comments(owner, repo, issue_number)

        # Check if PR and checkout head branch
        pr_info = None
        pull_request = issue.get("pull_request")
        if pull_request and pull_request.get("merged") is not None:
            pr_info = await gitea.get_pull_request(owner, repo, issue_number)
            head_branch = pr_info.get("head", {}).get("ref")
            if head_branch:
                # Both steps stay best-effort (checkout is attempted even if
                # fetch fails), but failures are now logged, not dropped.
                await _run_git(rp, "fetch", "origin", head_branch)
                await _run_git(rp, "checkout", head_branch)

        # Download attachments
        attachments = await gitea.collect_attachments(
            owner, repo, issue_number, comments, att
        )

        # Build prompt and run claude
        prompt = build_prompt(
            owner, repo, issue, comments, trigger_comment, attachments, pr_info
        )
        response = await run_claude(prompt, str(rp))

        # Edit placeholder with actual response
        await gitea.edit_comment(owner, repo, wip_comment_id, response)
        log.info("Responded to %s/%s#%s", owner, repo, issue_number)
    except Exception:
        log.exception("Error processing %s/%s#%s", owner, repo, issue_number)
        error_msg = "*An error occurred while processing this request. Check bot logs for details.*"
        try:
            # Reuse the placeholder if it was created; otherwise post anew.
            if wip_comment_id:
                await gitea.edit_comment(owner, repo, wip_comment_id, error_msg)
            else:
                await gitea.create_comment(owner, repo, issue_number, error_msg)
        except Exception:
            log.exception("Failed to post error comment")
# --- Routes ---
# Strong references to in-flight background tasks. The event loop keeps only
# weak references to tasks, so one that nothing else refers to may be
# garbage-collected before it finishes.
_background_tasks: set[asyncio.Task] = set()


def _spawn_background(coro) -> None:
    """Schedule *coro* as a task and hold a reference to it until it is done."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


@app.post("/webhook")
async def webhook(request: Request):
    """Receive a Gitea webhook and start processing bot mentions.

    Two events are acted on: a newly created comment (``issue_comment`` /
    ``created``) and a newly opened issue (``issues`` / ``opened``). In both
    cases the text must mention the bot and must not be authored by it, and
    the repository owner must equal ``ALLOWED_OWNER``. The work itself runs
    in a background task; the response is returned immediately.

    Returns:
        ``{"status": "processing"}`` when a mention was picked up, otherwise
        ``{"status": "skip", "reason": ...}``.

    Raises:
        HTTPException: 403 when the signature check fails; 400 when the body
            is not a JSON object.
    """
    body = await request.body()
    sig = request.headers.get("X-Gitea-Signature", "")
    if not verify_signature(body, sig):
        raise HTTPException(status_code=403, detail="bad signature")

    event = request.headers.get("X-Gitea-Event", "")
    try:
        payload = await request.json()
    except ValueError:
        # JSONDecodeError and UnicodeDecodeError both derive from ValueError.
        raise HTTPException(status_code=400, detail="invalid JSON") from None
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="invalid payload")

    action = payload.get("action", "")
    # "or {}" also covers fields that are present but null.
    repo_data = payload.get("repository") or {}
    owner = (repo_data.get("owner") or {}).get("login", "")
    repo = repo_data.get("name", "")
    if owner != ALLOWED_OWNER:
        return {"status": "skip", "reason": f"owner {owner} not allowed"}

    if event == "issue_comment" and action == "created":
        comment = payload["comment"]
        # Ignore the bot's own comments so it never answers itself.
        if comment["user"]["login"] == config.BOT_USERNAME:
            return {"status": "skip", "reason": "own comment"}
        if not is_mentioned(comment.get("body", "")):
            return {"status": "skip", "reason": "not mentioned"}
        issue_number = payload["issue"]["number"]
        log.info("Mentioned in %s/%s#%s (comment)", owner, repo, issue_number)
        _spawn_background(
            process_mention(owner, repo, issue_number, comment, comment["id"])
        )
        return {"status": "processing"}

    if event == "issues" and action == "opened":
        issue = payload["issue"]
        if issue["user"]["login"] == config.BOT_USERNAME:
            return {"status": "skip", "reason": "own issue"}
        if not is_mentioned(issue.get("body", "")):
            return {"status": "skip", "reason": "not mentioned"}
        issue_number = issue["number"]
        log.info("Mentioned in new issue %s/%s#%s", owner, repo, issue_number)
        _spawn_background(
            process_mention(owner, repo, issue_number, None, None)
        )
        return {"status": "processing"}

    return {"status": "skip", "reason": f"unhandled {event}/{action}"}
@app.get("/health")
async def health():
    """Health-check endpoint: always answers with an ``ok`` status."""
    status = {"status": "ok"}
    return status
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces, using
    # the port from config. uvicorn is imported only when run this way.
    import uvicorn

    uvicorn.run(app, port=config.LISTEN_PORT, host="0.0.0.0")