# gitea-bot/main.py
#
# Listing metadata from the original paste: 294 lines, 9.4 KiB, Python.

import asyncio
import hashlib
import hmac
import logging
import os
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, HTTPException
import config
from gitea import GiteaClient
from workspace import ensure_repo, repo_path, attachments_path
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("gitea-bot")


@asynccontextmanager
async def _deferred_lifespan(application):
    """Forward to the module-level ``lifespan`` handler when the server starts.

    ``lifespan`` is defined further down in this module, so naming it directly
    in the ``FastAPI(...)`` call below would raise ``NameError`` while the
    module is still being imported. Resolving the name here, at startup time,
    sidesteps that ordering problem without moving any definitions.
    """
    async with lifespan(application):
        yield


app = FastAPI(lifespan=_deferred_lifespan)
gitea = GiteaClient()

# Webhook events are only processed for repositories owned by this account.
ALLOWED_OWNER = "fam"
# Maximum number of characters of Claude output posted back as a comment.
MAX_RESPONSE_LEN = 60000
CLAUDE_TIMEOUT = 600  # seconds (10 minutes)
WEBHOOK_SYNC_INTERVAL = 300  # seconds (5 minutes)
# URL registered on each repository as the webhook delivery target.
WEBHOOK_URL = "http://100.65.168.42:9880/webhook"
def verify_signature(payload: bytes, signature: str) -> bool:
    """Check a webhook body against its HMAC-SHA256 signature header.

    Gitea's ``X-Gitea-Signature`` header carries the bare hex digest, while
    GitHub-style headers prefix it with ``sha256=``. Both spellings are
    accepted so that genuine Gitea deliveries are not rejected.

    Args:
        payload: Raw request body exactly as received.
        signature: Value of the signature header (may be empty).

    Returns:
        True when the signature matches, or when no ``config.WEBHOOK_SECRET``
        is configured (verification is then disabled); False otherwise.
    """
    if not config.WEBHOOK_SECRET:
        return True
    expected = hmac.new(
        config.WEBHOOK_SECRET.encode(), payload, hashlib.sha256
    ).hexdigest()
    supplied = (signature or "").strip().removeprefix("sha256=")
    # Compare as bytes: compare_digest() raises TypeError for non-ASCII str
    # arguments, and the header value comes from an untrusted client.
    return hmac.compare_digest(expected.encode(), supplied.encode())
def is_mentioned(text: str | None) -> bool:
return bool(text and f"@{config.BOT_USERNAME}" in text)
def build_prompt(
owner: str,
repo: str,
issue: dict,
comments: list[dict],
trigger_comment: dict | None,
attachments: list[dict],
pr_info: dict | None,
) -> str:
is_pr = pr_info is not None
kind = "Pull Request" if is_pr else "Issue"
parts = [f"## {kind} #{issue['number']}: {issue['title']}"]
parts.append(f"**Repository**: {owner}/{repo}")
parts.append(f"**Author**: @{issue['user']['login']}")
parts.append(f"**State**: {issue['state']}")
if is_pr:
parts.append(f"**Base**: {pr_info.get('base', {}).get('ref', 'main')}")
parts.append(f"**Head**: {pr_info.get('head', {}).get('ref', '?')}")
parts.append("")
parts.append("This is a pull request. Use `git diff` to see the changes. "
"The PR branch is already checked out.")
parts.append("")
parts.append("### Description")
parts.append(issue.get("body") or "(no description)")
if comments:
parts.append("\n---\n### Comments")
for c in comments:
user = c["user"]["login"]
if user == config.BOT_USERNAME:
parts.append(f"\n**@{user}** (you, earlier):")
else:
parts.append(f"\n**@{user}**:")
parts.append(c["body"])
if attachments:
parts.append("\n---\n### Attachments")
parts.append("The following files have been downloaded locally. "
"Use the Read tool to view images.")
for a in attachments:
parts.append(f"- `{a['path']}` ({a['name']})")
parts.append("\n---")
if trigger_comment:
parts.append(
f"You were mentioned by **@{trigger_comment['user']['login']}** "
f"in the latest comment. Respond to their request."
)
else:
parts.append(
"You were mentioned in the issue/PR body. Respond to the author's request."
)
return "\n".join(parts)
async def run_claude(prompt: str, cwd: str) -> str:
    """Run the Claude CLI in ``cwd`` with ``prompt`` on stdin and return its reply.

    The command, model and system prompt come from ``config``. The child
    process inherits the current environment plus ``GITEA_TOKEN`` and
    ``GITEA_URL`` taken from ``config``.

    Args:
        prompt: Text written to the process's standard input.
        cwd: Working directory for the process.

    Returns:
        The stripped stdout, truncated to ``MAX_RESPONSE_LEN`` characters with
        a truncation note appended. On timeout (``CLAUDE_TIMEOUT`` seconds) or
        a non-zero exit status, a Markdown-formatted error message instead.
    """
    env = os.environ.copy()
    env["GITEA_TOKEN"] = config.GITEA_TOKEN
    env["GITEA_URL"] = config.GITEA_URL
    cmd = [
        config.CLAUDE_COMMAND,
        "-p",
        "--dangerously-skip-permissions",
        "--output-format", "text",
        "--model", config.CLAUDE_MODEL,
        "--system-prompt", config.CLAUDE_SYSTEM_PROMPT,
    ]
    log.info("Running claude in %s", cwd)
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        cwd=cwd,
        env=env,
    )
    try:
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(input=prompt.encode()),
            timeout=CLAUDE_TIMEOUT,
        )
    except asyncio.TimeoutError:
        # Kill and reap the child so it does not linger after we give up.
        proc.kill()
        await proc.wait()
        return "*Claude timed out after 10 minutes.*"

    # The child's output is not guaranteed to be valid UTF-8; substitute
    # undecodable bytes instead of raising UnicodeDecodeError and losing the
    # whole reply.
    if proc.returncode != 0:
        err = stderr.decode(errors="replace").strip()
        log.error("Claude exited %s: %s", proc.returncode, err)
        return f"*Claude encountered an error (exit {proc.returncode}):*\n```\n{err[:2000]}\n```"

    response = stdout.decode(errors="replace").strip()
    if len(response) > MAX_RESPONSE_LEN:
        response = response[:MAX_RESPONSE_LEN] + "\n\n*(response truncated)*"
    return response
async def _run_git(cwd, *args: str) -> bool:
    """Run ``git <args>`` in ``cwd``; log a warning and return False on failure."""
    proc = await asyncio.create_subprocess_exec(
        "git", *args,
        cwd=cwd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
    )
    _, stderr = await proc.communicate()
    if proc.returncode != 0:
        log.warning(
            "git %s failed (exit %s) in %s: %s",
            " ".join(args), proc.returncode, cwd,
            stderr.decode(errors="replace").strip(),
        )
        return False
    return True


async def process_mention(
    owner: str,
    repo: str,
    issue_number: int,
    trigger_comment: dict | None,
    comment_id: int | None,
):
    """Handle one mention end to end: gather context, run Claude, post the reply.

    Steps: react to the triggering comment (when there is one), prepare the
    workspace, fetch the issue and its comments, check out the PR head branch
    when the issue is a pull request, download attachments, build the prompt,
    run Claude and post its response as a comment.

    Any exception is logged and reported back as a generic error comment;
    nothing is raised to the caller.

    Args:
        owner: Repository owner login.
        repo: Repository name.
        issue_number: Issue or pull request number.
        trigger_comment: Comment that mentioned the bot, or ``None`` when the
            mention was in the issue body.
        comment_id: ID of that comment (used for the reaction), or ``None``.
    """
    try:
        # React to acknowledge
        if comment_id:
            await gitea.add_reaction(owner, repo, comment_id, "eyes")

        # Set up workspace
        rp = await ensure_repo(owner, repo, issue_number)
        att = attachments_path(owner, repo, issue_number)

        # Fetch issue/PR details
        issue = await gitea.get_issue(owner, repo, issue_number)
        comments = await gitea.get_comments(owner, repo, issue_number)

        # Check if PR and checkout head branch
        pr_info = None
        pull_request = issue.get("pull_request")
        if pull_request and pull_request.get("merged") is not None:
            # It's a PR - fetch full PR info
            pr_info = await gitea.get_pull_request(owner, repo, issue_number)
            head_branch = pr_info.get("head", {}).get("ref")
            if head_branch:
                # Best effort: a failed fetch/checkout is logged rather than
                # silently ignored, and processing continues on whatever
                # revision the workspace currently has.
                await _run_git(rp, "fetch", "origin", head_branch)
                await _run_git(rp, "checkout", head_branch)

        # Download attachments
        attachments = await gitea.collect_attachments(owner, repo, issue_number, comments, att)

        # Build prompt and run claude
        prompt = build_prompt(owner, repo, issue, comments, trigger_comment, attachments, pr_info)
        response = await run_claude(prompt, str(rp))

        # Post response
        await gitea.create_comment(owner, repo, issue_number, response)
        log.info("Responded to %s/%s#%s", owner, repo, issue_number)
    except Exception:
        log.exception("Error processing %s/%s#%s", owner, repo, issue_number)
        try:
            await gitea.create_comment(
                owner, repo, issue_number,
                "*An error occurred while processing this request. Check bot logs for details.*",
            )
        except Exception:
            log.exception("Failed to post error comment")
# Strong references to in-flight mention handlers. The event loop only keeps
# weak references to tasks, so a task nobody else references can be
# garbage-collected before it finishes.
_background_tasks: set = set()


def _spawn_background(coro) -> None:
    """Schedule ``coro`` as a task and hold a reference until it completes."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


@app.post("/webhook")
async def webhook(request: Request):
    """Receive a Gitea webhook delivery and start processing bot mentions.

    Handles two cases: a newly created issue comment that mentions the bot,
    and a newly opened issue whose body mentions the bot. Everything else is
    acknowledged with a ``skip`` status. The actual work runs in a background
    task, so the response returns immediately.

    Raises:
        HTTPException: 403 when the signature check fails, 400 when the body
            is not a JSON object.
    """
    body = await request.body()
    sig = request.headers.get("X-Gitea-Signature", "")
    if not verify_signature(body, sig):
        raise HTTPException(status_code=403, detail="bad signature")

    event = request.headers.get("X-Gitea-Event", "")
    try:
        payload = await request.json()
    except ValueError:
        # Malformed JSON is a client error, not an internal server error.
        raise HTTPException(status_code=400, detail="invalid JSON") from None
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="invalid payload")

    action = payload.get("action", "")
    repo_data = payload.get("repository", {})
    owner = repo_data.get("owner", {}).get("login", "")
    repo = repo_data.get("name", "")

    # Only process repos owned by fam
    if owner != ALLOWED_OWNER:
        return {"status": "skip", "reason": f"owner {owner} not allowed"}

    if event == "issue_comment" and action == "created":
        comment = payload["comment"]
        # Ignore the bot's own comments so it never answers itself.
        if comment["user"]["login"] == config.BOT_USERNAME:
            return {"status": "skip", "reason": "own comment"}
        if not is_mentioned(comment.get("body", "")):
            return {"status": "skip", "reason": "not mentioned"}
        issue_number = payload["issue"]["number"]
        log.info("Mentioned in %s/%s#%s (comment)", owner, repo, issue_number)
        _spawn_background(
            process_mention(owner, repo, issue_number, comment, comment["id"])
        )
        return {"status": "processing"}
    elif event == "issues" and action == "opened":
        issue = payload["issue"]
        if issue["user"]["login"] == config.BOT_USERNAME:
            return {"status": "skip", "reason": "own issue"}
        if not is_mentioned(issue.get("body", "")):
            return {"status": "skip", "reason": "not mentioned"}
        issue_number = issue["number"]
        log.info("Mentioned in new issue %s/%s#%s", owner, repo, issue_number)
        _spawn_background(
            process_mention(owner, repo, issue_number, None, None)
        )
        return {"status": "processing"}

    return {"status": "skip", "reason": f"unhandled {event}/{action}"}
async def sync_webhooks():
    """Ensure all repos under ALLOWED_OWNER have our webhook.

    Lists the owner's repositories and calls ``gitea.ensure_webhook`` for each
    one with ``WEBHOOK_URL``. A failure on one repository is logged and the
    remaining repositories are still processed. Never raises: a failure to
    list the repositories is logged as well.
    """
    try:
        repos = await gitea.list_user_repos(ALLOWED_OWNER)
        for r in repos:
            name = None
            # Isolate each repository so that one bad entry does not keep
            # every later repository from getting its webhook.
            try:
                name = r["name"]
                created = await gitea.ensure_webhook(ALLOWED_OWNER, name, WEBHOOK_URL)
                if created:
                    log.info("Added webhook to %s/%s", ALLOWED_OWNER, name)
            except Exception:
                log.exception("Failed to sync webhook for %s/%s", ALLOWED_OWNER, name)
    except Exception:
        log.exception("Failed to sync webhooks")
async def webhook_sync_loop():
    """Run ``sync_webhooks`` forever, sleeping between passes.

    Each pass is followed by a pause of ``WEBHOOK_SYNC_INTERVAL`` seconds.
    The loop has no exit condition of its own; it ends only when the task
    running it is cancelled.
    """
    while True:
        await sync_webhooks()
        # Wait out the interval before looking for new repos again.
        await asyncio.sleep(WEBHOOK_SYNC_INTERVAL)
@asynccontextmanager
async def lifespan(app):
    """Run the webhook sync loop in the background for the app's lifetime.

    The loop is started as a task on entry. On exit the task is cancelled and
    awaited, so it has fully stopped before shutdown continues. This also
    happens when the body of the context exits with an exception.
    """
    task = asyncio.create_task(webhook_sync_loop())
    try:
        yield
    finally:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            # Expected: this is the cancellation requested just above.
            pass
        except Exception:
            # The loop had already died on its own; record why.
            log.exception("Webhook sync loop ended with an error")
@app.get("/health")
async def health():
    """Health-check endpoint: always answers with a fixed ``ok`` status."""
    report = {"status": "ok"}
    return report
if __name__ == "__main__":
    # Executed as a script: serve the app with uvicorn on all interfaces,
    # listening on the port taken from config.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": config.LISTEN_PORT}
    uvicorn.run(app, **server_options)