265 lines
8.5 KiB
Python
265 lines
8.5 KiB
Python
import asyncio
|
|
import hashlib
|
|
import hmac
|
|
import logging
|
|
import os
|
|
|
|
from fastapi import FastAPI, Request, HTTPException
|
|
|
|
import config
|
|
from gitea import GiteaClient
|
|
from workspace import ensure_repo, repo_path, attachments_path
|
|
|
|
# Process-wide logging: timestamp, level and message, INFO and above.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
log = logging.getLogger("gitea-bot")

# Single API client instance shared by the handlers in this module.
gitea = GiteaClient()

# Webhooks for repositories owned by any other account are skipped.
ALLOWED_OWNER = "euphon"
# Replies longer than this many characters are truncated before posting.
MAX_RESPONSE_LEN = 60_000
# Seconds the Claude subprocess may run before it is killed (10 minutes).
CLAUDE_TIMEOUT = 10 * 60

app = FastAPI()
|
|
|
|
|
|
# --- Helpers ---
|
|
|
|
def verify_signature(payload: bytes, signature: str) -> bool:
    """Check a webhook body against its HMAC-SHA256 signature header.

    Args:
        payload: Raw request body, exactly as received.
        signature: Value of the signature header. Both the bare hex digest
            and the ``sha256=<hex>`` form are accepted.

    Returns:
        True when the signature matches, or when ``config.WEBHOOK_SECRET``
        is empty (verification is skipped in that case); False otherwise.
    """
    if not config.WEBHOOK_SECRET:
        # NOTE(review): with no secret configured every request is accepted.
        return True

    expected = hmac.new(
        config.WEBHOOK_SECRET.encode(), payload, hashlib.sha256
    ).hexdigest()

    # Gitea's X-Gitea-Signature header carries the bare hex digest, while the
    # GitHub-compatible header prefixes it with "sha256=". Accept either.
    provided = (signature or "").strip().removeprefix("sha256=").lower()

    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # containing non-ASCII characters, and the header is attacker-controlled.
    return hmac.compare_digest(
        expected.encode("ascii"), provided.encode("utf-8", "replace")
    )
|
|
|
|
|
|
def is_mentioned(text: str | None) -> bool:
|
|
return bool(text and f"@{config.BOT_USERNAME}" in text)
|
|
|
|
|
|
def build_prompt(
|
|
owner: str,
|
|
repo: str,
|
|
issue: dict,
|
|
comments: list[dict],
|
|
trigger_comment: dict | None,
|
|
attachments: list[dict],
|
|
pr_info: dict | None,
|
|
) -> str:
|
|
is_pr = pr_info is not None
|
|
kind = "Pull Request" if is_pr else "Issue"
|
|
|
|
parts = [f"## {kind} #{issue['number']}: {issue['title']}"]
|
|
parts.append(f"**Repository**: {owner}/{repo}")
|
|
parts.append(f"**Author**: @{issue['user']['login']}")
|
|
parts.append(f"**State**: {issue['state']}")
|
|
|
|
if is_pr:
|
|
parts.append(f"**Base**: {pr_info.get('base', {}).get('ref', 'main')}")
|
|
parts.append(f"**Head**: {pr_info.get('head', {}).get('ref', '?')}")
|
|
parts.append("")
|
|
parts.append("This is a pull request. Use `git diff` to see the changes. "
|
|
"The PR branch is already checked out.")
|
|
|
|
parts.append("")
|
|
parts.append("### Description")
|
|
parts.append(issue.get("body") or "(no description)")
|
|
|
|
if comments:
|
|
parts.append("\n---\n### Comments")
|
|
for c in comments:
|
|
user = c["user"]["login"]
|
|
if user == config.BOT_USERNAME:
|
|
parts.append(f"\n**@{user}** (you, earlier):")
|
|
else:
|
|
parts.append(f"\n**@{user}**:")
|
|
parts.append(c["body"])
|
|
|
|
if attachments:
|
|
parts.append("\n---\n### Attachments")
|
|
parts.append("The following files have been downloaded locally. "
|
|
"Use the Read tool to view images.")
|
|
for a in attachments:
|
|
parts.append(f"- `{a['path']}` ({a['name']})")
|
|
|
|
parts.append("\n---")
|
|
if trigger_comment:
|
|
parts.append(
|
|
f"You were mentioned by **@{trigger_comment['user']['login']}** "
|
|
f"in the latest comment. Respond to their request."
|
|
)
|
|
else:
|
|
parts.append(
|
|
"You were mentioned in the issue/PR body. Respond to the author's request."
|
|
)
|
|
|
|
return "\n".join(parts)
|
|
|
|
|
|
async def run_claude(prompt: str, cwd: str) -> str:
    """Run the Claude CLI non-interactively on *prompt* and return its reply.

    The command line is built from ``config.CLAUDE_COMMAND``,
    ``config.CLAUDE_MODEL`` and ``config.CLAUDE_SYSTEM_PROMPT``; the prompt is
    written to the child's stdin. The child inherits this process's
    environment plus ``GITEA_TOKEN`` and ``GITEA_URL`` from ``config``.

    Args:
        prompt: Text sent to the subprocess on stdin.
        cwd: Working directory for the subprocess.

    Returns:
        The stripped stdout of the process, cut to ``MAX_RESPONSE_LEN``
        characters (with a truncation marker) when longer. On timeout or a
        non-zero exit status an italic Markdown notice is returned instead;
        those two cases do not raise.
    """
    env = os.environ.copy()
    env["GITEA_TOKEN"] = config.GITEA_TOKEN
    env["GITEA_URL"] = config.GITEA_URL

    cmd = [
        config.CLAUDE_COMMAND,
        "-p",
        # NOTE(review): the child runs without permission prompts and with the
        # Gitea token in its environment; the prompt contains user-written text.
        "--dangerously-skip-permissions",
        "--output-format", "text",
        "--model", config.CLAUDE_MODEL,
        "--system-prompt", config.CLAUDE_SYSTEM_PROMPT,
    ]

    log.info(f"Running claude in {cwd}")
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        cwd=cwd,
        env=env,
    )

    try:
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(input=prompt.encode()),
            timeout=CLAUDE_TIMEOUT,
        )
    except asyncio.TimeoutError:
        try:
            proc.kill()
        except ProcessLookupError:
            # The child exited on its own between the timeout and the kill.
            pass
        await proc.wait()
        # Derived from the constant so the message cannot drift from the limit.
        return f"*Claude timed out after {CLAUDE_TIMEOUT / 60:g} minutes.*"

    if proc.returncode != 0:
        # errors="replace": diagnostic output is not guaranteed to be UTF-8.
        err = stderr.decode(errors="replace").strip()
        log.error(f"Claude exited {proc.returncode}: {err}")
        return f"*Claude encountered an error (exit {proc.returncode}):*\n```\n{err[:2000]}\n```"

    response = stdout.decode(errors="replace").strip()
    if len(response) > MAX_RESPONSE_LEN:
        response = response[:MAX_RESPONSE_LEN] + "\n\n*(response truncated)*"

    return response
|
|
|
|
|
|
async def _run_git(cwd, *args: str) -> bool:
    """Run ``git <args>`` in *cwd*; log stderr and return False on a non-zero exit."""
    proc = await asyncio.create_subprocess_exec(
        "git", *args,
        cwd=cwd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
    )
    _, stderr = await proc.communicate()
    if proc.returncode != 0:
        log.warning(
            "git %s failed in %s (exit %s): %s",
            " ".join(args), cwd, proc.returncode,
            stderr.decode(errors="replace").strip(),
        )
        return False
    return True


async def _checkout_pr_head(rp, head_branch: str) -> None:
    """Fetch *head_branch* from ``origin`` and check it out in the working copy *rp*."""
    if head_branch.startswith("-"):
        # The branch name comes from the pull request, i.e. from an untrusted
        # author; a leading dash would be parsed by git as an option.
        log.warning("Refusing to check out suspicious branch name %r", head_branch)
        return
    # Best effort, as before: a failed fetch does not prevent the checkout
    # attempt, but failures are now logged instead of silently ignored.
    await _run_git(rp, "fetch", "origin", head_branch)
    await _run_git(rp, "checkout", head_branch)


async def process_mention(
    owner: str,
    repo: str,
    issue_number: int,
    trigger_comment: dict | None,
    comment_id: int | None,
):
    """Handle one bot mention end to end and post the reply as a comment.

    Steps: acknowledge the triggering comment with an "eyes" reaction, prepare
    the working copy, fetch the issue and its comments, check out the PR head
    branch when the issue is a pull request, download attachments, run Claude
    on the assembled prompt, and post the result.

    Any exception is logged and answered with a generic error comment, so this
    coroutine does not propagate ordinary exceptions to its caller.

    Args:
        owner: Repository owner.
        repo: Repository name.
        issue_number: Number of the issue or pull request.
        trigger_comment: The comment that mentioned the bot, or None when the
            mention was in the issue body.
        comment_id: Id of the triggering comment (for the reaction), or None.
    """
    try:
        # React to acknowledge
        if comment_id:
            await gitea.add_reaction(owner, repo, comment_id, "eyes")

        # Set up workspace
        # presumably rp is a path-like working-copy location — it is used as
        # the git cwd and stringified for run_claude; confirm in workspace.py
        rp = await ensure_repo(owner, repo, issue_number)
        att = attachments_path(owner, repo, issue_number)

        # Fetch issue/PR details
        issue = await gitea.get_issue(owner, repo, issue_number)
        comments = await gitea.get_comments(owner, repo, issue_number)

        # Check if PR and checkout head branch
        pr_info = None
        pull_request = issue.get("pull_request")
        if pull_request and pull_request.get("merged") is not None:
            pr_info = await gitea.get_pull_request(owner, repo, issue_number)
            head_branch = pr_info.get("head", {}).get("ref")
            if head_branch:
                await _checkout_pr_head(rp, head_branch)

        # Download attachments
        attachments = await gitea.collect_attachments(owner, repo, issue_number, comments, att)

        # Build prompt and run claude
        prompt = build_prompt(owner, repo, issue, comments, trigger_comment, attachments, pr_info)
        response = await run_claude(prompt, str(rp))

        # Post response
        await gitea.create_comment(owner, repo, issue_number, response)
        log.info(f"Responded to {owner}/{repo}#{issue_number}")

    except Exception:
        log.exception(f"Error processing {owner}/{repo}#{issue_number}")
        try:
            await gitea.create_comment(
                owner, repo, issue_number,
                "*An error occurred while processing this request. Check bot logs for details.*",
            )
        except Exception:
            # Nothing more can be done if even the error report cannot be posted.
            log.exception("Failed to post error comment")
|
|
|
|
|
|
# --- Routes ---
|
|
|
|
# Strong references to in-flight background jobs. The event loop only keeps
# weak references to tasks, so a task nobody else references may be
# garbage-collected before it finishes.
_background_tasks: set[asyncio.Task] = set()


def _spawn(coro) -> None:
    """Schedule *coro* as a background task and hold it until it completes."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


@app.post("/webhook")
async def webhook(request: Request):
    """Receive a Gitea webhook and start processing when the bot is mentioned.

    Handled events: ``issue_comment``/``created`` and ``issues``/``opened``.
    Mentions are processed in a background task so the HTTP response is
    returned immediately.

    Returns:
        ``{"status": "processing"}`` when a job was started, otherwise
        ``{"status": "skip", "reason": ...}``.

    Raises:
        HTTPException: 403 when the signature check fails; 400 when the body
            is not a JSON object.
    """
    body = await request.body()

    sig = request.headers.get("X-Gitea-Signature", "")
    if not verify_signature(body, sig):
        raise HTTPException(status_code=403, detail="bad signature")

    event = request.headers.get("X-Gitea-Event", "")
    try:
        payload = await request.json()
    except ValueError:
        # Covers json.JSONDecodeError and undecodable bytes: a client error,
        # not a server fault.
        raise HTTPException(status_code=400, detail="invalid JSON") from None
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="invalid payload")

    action = payload.get("action", "")

    # "or {}" also tolerates explicit nulls in the payload.
    repo_data = payload.get("repository") or {}
    owner = (repo_data.get("owner") or {}).get("login", "")
    repo = repo_data.get("name", "")

    if owner != ALLOWED_OWNER:
        return {"status": "skip", "reason": f"owner {owner} not allowed"}

    if event == "issue_comment" and action == "created":
        comment = payload["comment"]
        # Ignore the bot's own comments so its replies cannot re-trigger it.
        if comment["user"]["login"] == config.BOT_USERNAME:
            return {"status": "skip", "reason": "own comment"}
        if not is_mentioned(comment.get("body", "")):
            return {"status": "skip", "reason": "not mentioned"}

        issue_number = payload["issue"]["number"]
        log.info(f"Mentioned in {owner}/{repo}#{issue_number} (comment)")
        _spawn(process_mention(owner, repo, issue_number, comment, comment["id"]))
        return {"status": "processing"}

    elif event == "issues" and action == "opened":
        issue = payload["issue"]
        if issue["user"]["login"] == config.BOT_USERNAME:
            return {"status": "skip", "reason": "own issue"}
        if not is_mentioned(issue.get("body", "")):
            return {"status": "skip", "reason": "not mentioned"}

        issue_number = issue["number"]
        log.info(f"Mentioned in new issue {owner}/{repo}#{issue_number}")
        _spawn(process_mention(owner, repo, issue_number, None, None))
        return {"status": "processing"}

    return {"status": "skip", "reason": f"unhandled {event}/{action}"}
|
|
|
|
|
|
@app.get("/health")
async def health():
    """Liveness probe: always answers with a fixed ``ok`` status."""
    status_report = {"status": "ok"}
    return status_report
|
|
|
|
|
|
if __name__ == "__main__":
    # uvicorn is only needed when this file is executed directly as a script.
    import uvicorn

    # Listen on every interface, on the port taken from the config module.
    uvicorn.run(
        app,
        port=config.LISTEN_PORT,
        host="0.0.0.0",
    )
|