import asyncio import hashlib import hmac import logging import os from fastapi import FastAPI, Request, HTTPException import config from gitea import GiteaClient from workspace import ensure_repo, repo_path, attachments_path logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") log = logging.getLogger("gitea-bot") gitea = GiteaClient() MAX_RESPONSE_LEN = 60000 CLAUDE_TIMEOUT = 3600 # 60 minutes app = FastAPI() # --- Helpers --- def verify_signature(payload: bytes, signature: str) -> bool: if not config.WEBHOOK_SECRET: return True expected = hmac.new( config.WEBHOOK_SECRET.encode(), payload, hashlib.sha256 ).hexdigest() return hmac.compare_digest(f"sha256={expected}", signature) def is_mentioned(text: str | None) -> bool: return bool(text and f"@{config.BOT_USERNAME}" in text) def build_prompt( owner: str, repo: str, issue: dict, comments: list[dict], trigger_comment: dict | None, attachments: list[dict], pr_info: dict | None, ) -> str: is_pr = pr_info is not None kind = "Pull Request" if is_pr else "Issue" parts = [f"## {kind} #{issue['number']}: {issue['title']}"] parts.append(f"**Repository**: {owner}/{repo}") parts.append(f"**Author**: @{issue['user']['login']}") parts.append(f"**State**: {issue['state']}") if is_pr: parts.append(f"**Base**: {pr_info.get('base', {}).get('ref', 'main')}") parts.append(f"**Head**: {pr_info.get('head', {}).get('ref', '?')}") parts.append("") parts.append("This is a pull request. Use `git diff` to see the changes. " "The PR branch is already checked out.") parts.append("") parts.append("### Description") parts.append(issue.get("body") or "(no description)") if comments: parts.append("\n---\n### Comments") for c in comments: user = c["user"]["login"] if user == config.BOT_USERNAME: parts.append(f"\n**@{user}** (you, earlier):") else: parts.append(f"\n**@{user}**:") parts.append(c["body"]) if attachments: parts.append("\n---\n### Attachments") parts.append("The following files have been downloaded locally. " "Use the Read tool to view images.") for a in attachments: parts.append(f"- `{a['path']}` ({a['name']})") parts.append("\n---") if trigger_comment: parts.append( f"You were mentioned by **@{trigger_comment['user']['login']}** " f"in the latest comment. Respond to their request." ) else: parts.append( "You were mentioned in the issue/PR body. Respond to the author's request." ) return "\n".join(parts) async def run_claude(prompt: str, cwd: str) -> str: env = os.environ.copy() env["GITEA_TOKEN"] = config.GITEA_TOKEN env["GITEA_URL"] = config.GITEA_URL cmd = [ config.CLAUDE_COMMAND, "-p", "--dangerously-skip-permissions", "--output-format", "text", "--model", config.CLAUDE_MODEL, "--system-prompt", config.CLAUDE_SYSTEM_PROMPT, ] log.info(f"Running claude in {cwd}") proc = await asyncio.create_subprocess_exec( *cmd, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=cwd, env=env, ) try: stdout, stderr = await asyncio.wait_for( proc.communicate(input=prompt.encode()), timeout=CLAUDE_TIMEOUT, ) except asyncio.TimeoutError: proc.kill() await proc.wait() return f"*Claude timed out after {CLAUDE_TIMEOUT // 60} minutes.*" if proc.returncode != 0: err = stderr.decode().strip() log.error(f"Claude exited {proc.returncode}: {err}") return f"*Claude encountered an error (exit {proc.returncode}):*\n```\n{err[:2000]}\n```" response = stdout.decode().strip() if len(response) > MAX_RESPONSE_LEN: response = response[:MAX_RESPONSE_LEN] + "\n\n*(response truncated)*" return response async def process_mention( owner: str, repo: str, issue_number: int, trigger_comment: dict | None, comment_id: int | None, ): wip_comment_id = None try: # Post "working on it" placeholder wip = await gitea.create_comment( owner, repo, issue_number, "⏳ Working on it…" ) wip_comment_id = wip["id"] # Set up workspace rp = await ensure_repo(owner, repo, issue_number) att = attachments_path(owner, repo, issue_number) # Fetch issue/PR details issue = await gitea.get_issue(owner, repo, issue_number) comments = await gitea.get_comments(owner, repo, issue_number) # Check if PR and checkout head branch pr_info = None pull_request = issue.get("pull_request") if pull_request and pull_request.get("merged") is not None: pr_info = await gitea.get_pull_request(owner, repo, issue_number) head_branch = pr_info.get("head", {}).get("ref") if head_branch: proc = await asyncio.create_subprocess_exec( "git", "fetch", "origin", head_branch, cwd=rp, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) await proc.communicate() proc = await asyncio.create_subprocess_exec( "git", "checkout", head_branch, cwd=rp, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) await proc.communicate() # Download attachments attachments = await gitea.collect_attachments(owner, repo, issue_number, comments, att) # Build prompt and run claude prompt = build_prompt(owner, repo, issue, comments, trigger_comment, attachments, pr_info) response = await run_claude(prompt, str(rp)) # Edit placeholder with actual response await gitea.edit_comment(owner, repo, wip_comment_id, response) log.info(f"Responded to {owner}/{repo}#{issue_number}") except Exception: log.exception(f"Error processing {owner}/{repo}#{issue_number}") error_msg = "*An error occurred while processing this request. Check bot logs for details.*" try: if wip_comment_id: await gitea.edit_comment(owner, repo, wip_comment_id, error_msg) else: await gitea.create_comment(owner, repo, issue_number, error_msg) except Exception: log.exception("Failed to post error comment") # --- Routes --- @app.post("/webhook") async def webhook(request: Request): body = await request.body() sig = request.headers.get("X-Gitea-Signature", "") if not verify_signature(body, sig): raise HTTPException(status_code=403, detail="bad signature") event = request.headers.get("X-Gitea-Event", "") payload = await request.json() action = payload.get("action", "") repo_data = payload.get("repository", {}) owner = repo_data.get("owner", {}).get("login", "") repo = repo_data.get("name", "") if config.ALLOWED_OWNERS and owner not in config.ALLOWED_OWNERS: return {"status": "skip", "reason": f"owner {owner} not allowed"} if event == "issue_comment" and action == "created": comment = payload["comment"] if comment["user"]["login"] == config.BOT_USERNAME: return {"status": "skip", "reason": "own comment"} if not is_mentioned(comment.get("body", "")): return {"status": "skip", "reason": "not mentioned"} issue_number = payload["issue"]["number"] log.info(f"Mentioned in {owner}/{repo}#{issue_number} (comment)") asyncio.create_task( process_mention(owner, repo, issue_number, comment, comment["id"]) ) return {"status": "processing"} elif event == "issues" and action == "opened": issue = payload["issue"] if issue["user"]["login"] == config.BOT_USERNAME: return {"status": "skip", "reason": "own issue"} if not config.AUTO_RESPOND_NEW_ISSUES and not is_mentioned(issue.get("body", "")): return {"status": "skip", "reason": "not mentioned"} issue_number = issue["number"] log.info(f"New issue {owner}/{repo}#{issue_number}") asyncio.create_task( process_mention(owner, repo, issue_number, None, None) ) return {"status": "processing"} return {"status": "skip", "reason": f"unhandled {event}/{action}"} @app.get("/health") async def health(): return {"status": "ok"} if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=config.LISTEN_PORT)