#!/usr/bin/env python3 """gitea-bot — mochi 全站 issue/PR 自动处理。 双轨: - **webhook**: 主路径,gitea system hook → http://10.42.0.1:31390/hook, HMAC 签名验签,X-Gitea-Delivery 幂等,事件入 queue 由 worker 串行 dispatch。 - **polling**: 兜底,每 60s 拉全站 open issue/PR,把 webhook 漏的 enqueue。 dispatch = spawn `claude --dangerously-skip-permissions -p `,让 claude 用 mochi token 自己调 gitea API + git push 完成回复 / 提 PR。 """ import hashlib import hmac import json import os import queue import shlex import sqlite3 import subprocess import sys import threading import time import urllib.error import urllib.parse import urllib.request from datetime import datetime, timezone from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path CONFIG = { "base_url": "https://famzheng.me/gitea", "username": "mochi", "token_file": Path.home() / ".gitea-mochi-token", "webhook_secret_file": Path.home() / ".gitea-webhook-secret", "state_dir": Path.home() / ".local/state/gitea-bot", "poll_interval_sec": 60, "webhook_listen": ("0.0.0.0", 31390), "claude_bin": "/home/fam/.local/bin/claude", "claude_timeout_sec": 1200, } # events from webhook + polling get pushed here; worker drains EVENT_Q: "queue.Queue[tuple[str, dict]]" = queue.Queue(maxsize=10000) def log(msg: str) -> None: ts = datetime.now().isoformat(timespec="seconds") print(f"[{ts}] {msg}", flush=True) def load_token() -> str: return CONFIG["token_file"].read_text().strip() def gitea_get(path: str, token: str, **params) -> object: url = f"{CONFIG['base_url']}/api/v1{path}" if params: url += "?" + urllib.parse.urlencode(params) req = urllib.request.Request( url, headers={ "Authorization": f"token {token}", "Accept": "application/json", "User-Agent": "gitea-bot/mochi", }, ) with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read()) def list_all_repos(token: str) -> list[dict]: repos: list[dict] = [] page = 1 while True: batch = gitea_get("/repos/search", token, limit=50, page=page) items = batch.get("data", []) if isinstance(batch, dict) else [] if not items: break repos.extend(items) if len(items) < 50: break page += 1 return repos def list_open_issues(owner: str, repo: str, token: str, type_: str) -> list[dict]: """type_ in {'issues', 'pulls'}""" items: list[dict] = [] page = 1 while True: batch = gitea_get( f"/repos/{owner}/{repo}/issues", token, state="open", type=type_, page=page, limit=50, ) if not isinstance(batch, list) or not batch: break items.extend(batch) if len(batch) < 50: break page += 1 return items def list_comments(owner: str, repo: str, number: int, token: str) -> list[dict]: out = gitea_get(f"/repos/{owner}/{repo}/issues/{number}/comments", token) return out if isinstance(out, list) else [] def init_state(db_path: Path) -> sqlite3.Connection: db_path.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(db_path, check_same_thread=False) conn.execute( """ CREATE TABLE IF NOT EXISTS handled ( key TEXT PRIMARY KEY, handled_at TEXT NOT NULL, last_signal TEXT NOT NULL ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS deliveries ( id TEXT PRIMARY KEY, ts TEXT NOT NULL ) """ ) conn.commit() return conn def is_delivered(conn: sqlite3.Connection, delivery_id: str) -> bool: if not delivery_id: return False row = conn.execute( "SELECT 1 FROM deliveries WHERE id = ?", (delivery_id,) ).fetchone() return row is not None def mark_delivered(conn: sqlite3.Connection, delivery_id: str) -> None: if not delivery_id: return conn.execute( "INSERT OR IGNORE INTO deliveries(id, ts) VALUES (?, ?)", (delivery_id, datetime.now(timezone.utc).isoformat()), ) conn.commit() def get_handled_signal(conn: sqlite3.Connection, key: str) -> str | None: row = conn.execute( "SELECT last_signal FROM handled WHERE key = ?", (key,) ).fetchone() return row[0] if row else None def set_handled(conn: sqlite3.Connection, key: str, signal: str) -> None: conn.execute( """ INSERT INTO handled(key, handled_at, last_signal) VALUES (?, ?, ?) ON CONFLICT(key) DO UPDATE SET handled_at = excluded.handled_at, last_signal = excluded.last_signal """, (key, datetime.now(timezone.utc).isoformat(), signal), ) conn.commit() def latest_signal(issue: dict, comments: list[dict]) -> tuple[str, str]: """Return (time, author_login) for the latest activity on this issue.""" cands: list[tuple[str, str]] = [ (issue["updated_at"], issue["user"]["login"]), ] for c in comments: cands.append((c["updated_at"], c["user"]["login"])) cands.sort(key=lambda x: x[0]) return cands[-1] def build_prompt(repo: dict, issue: dict, is_pr: bool, comments: list[dict]) -> str: typ = "pull request" if is_pr else "issue" full = repo["full_name"] labels = ", ".join(l["name"] for l in issue.get("labels") or []) or "(none)" body = (issue.get("body") or "(empty)").strip() if len(body) > 8000: body = body[:8000] + "\n…(truncated)" comments_tail = comments[-10:] if comments_tail: comments_str = "\n\n".join( f"### @{c['user']['login']} at {c['created_at']}\n{(c.get('body') or '').strip()}" for c in comments_tail ) else: comments_str = "(no comments yet)" pr_note = "" if is_pr: pr_note = ( f"\nThis is a PULL REQUEST. You can fetch the diff at " f"`GET /repos/{full}/pulls/{issue['number']}.diff` (raw text). " f"You may post review comments via the issues comments endpoint, " f"or full reviews via `POST /repos/{full}/pulls/{issue['number']}/reviews`.\n" ) return f"""You are **mochi** (麻薯), the autonomous Gitea bot running site-wide on {CONFIG['base_url']}. You are invoked once per "new activity" on an issue or PR and must finish your work in this single run. # Identity & access - Gitea username: `mochi` (you have admin on this instance). - Auth token: env var `MOCHI_TOKEN`. Send it as `Authorization: token $MOCHI_TOKEN`. - API base: `{CONFIG['base_url']}/api/v1` - SSH is disabled — git over HTTPS only. To clone/push: `git clone https://mochi:$MOCHI_TOKEN@famzheng.me/gitea//.git` - Configure git author when committing: `git -c user.name=mochi -c user.email=mochi@famzheng.me commit ...` - Work in `/tmp/gitea-bot-/`. Clean up when done. - You have full shell access (--dangerously-skip-permissions). Be careful but decisive. # Voice - Smart, concise senior-engineer tone. No customer-service fluff. - Reply in the **same language** as the issue body (中文 or English — detect it). - Do NOT use bracketed RP actions like `(笑)` `(歪头)` `(撒娇)`. Express tone with words. - Sign every comment you POST with this footer line on its own line: `🤖 mochi bot · automated · {CONFIG['base_url']}` # Target {typ} - Repo: `{full}` (default branch: `{repo.get('default_branch', 'main')}`) - Issue/PR #: {issue['number']} - Title: {issue['title']} - Author: @{issue['user']['login']} - URL: {issue['html_url']} - Labels: {labels} - State: {issue['state']} - Created: {issue['created_at']} · Updated: {issue['updated_at']} ## Body {body} ## Recent comments (oldest → newest, up to 10) {comments_str} {pr_note} # What to do 1. Re-read the issue/PR. If the **latest** comment is from `mochi` and there's no new question/instruction after it, exit silently — do not double-post. 2. Classify and act: - **Question / discussion / unclear ask** → post **one** helpful comment (≤ 8 lines). Ask clarifying questions if needed. Do not open a PR. - **Feature request** (clear, scoped, small enough that you can implement it in one PR): a. `git clone` the repo to a temp dir. b. `git switch -c mochi/issue-{issue['number']}` c. Implement the change. If the repo has obvious build/test commands (Makefile, package.json scripts, cargo, etc.), run them and ensure they pass. d. Commit with a clear message; author `mochi `. e. Push the branch. f. Open a PR via `POST /repos/{full}/pulls` with base = default branch, head = `mochi/issue-{issue['number']}`, title like `feat: (closes #{issue['number']})`, body referencing the issue. g. Post **one** short comment on the issue linking the PR. If the request is too large, ambiguous, or risky → instead post a comment with a proposed plan and ask for confirmation. Don't push. - **Bug report** → if the fix is obvious from reading the code, follow the same flow as feature request with title `fix: (closes #{issue['number']})`. Otherwise post a diagnostic comment asking for repro steps / logs. - **Pull request** → if the PR author is `mochi`, exit. Otherwise fetch the diff and post **one** brief review comment (correctness/style/obvious bugs). Don't push commits to other people's PRs. 3. Be conservative. When unsure, comment instead of pushing code. Begin now. When done, exit. """ def dispatch( repo: dict, issue: dict, is_pr: bool, comments: list[dict], token: str ) -> None: prompt = build_prompt(repo, issue, is_pr, comments) env = os.environ.copy() env["MOCHI_TOKEN"] = token # ensure claude can find git, curl, node (claude wraps node), etc. env["PATH"] = ( "/home/fam/.local/bin:/home/fam/.cargo/bin:" "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" ) env["HOME"] = "/home/fam" full = repo["full_name"] log( f"dispatch {full}#{issue['number']} " f"({'PR' if is_pr else 'issue'}): {issue['title']!r}" ) cmd = [CONFIG["claude_bin"], "--dangerously-skip-permissions", "-p", prompt] try: r = subprocess.run( cmd, env=env, capture_output=True, text=True, timeout=CONFIG["claude_timeout_sec"], cwd="/tmp", ) log(f" → exit={r.returncode}, stdout={len(r.stdout)}B, stderr={len(r.stderr)}B") if r.stdout.strip(): tail = r.stdout[-1500:] log(f" stdout tail:\n{tail}") if r.stderr.strip(): tail = r.stderr[-1500:] log(f" stderr tail:\n{tail}") except subprocess.TimeoutExpired: log(f" → TIMEOUT after {CONFIG['claude_timeout_sec']}s") except Exception as e: log(f" → dispatch error: {e!r}") def enqueue_issue(source: str, repo_full: str, issue_number: int) -> None: """Push (source, {repo_full, number}) onto EVENT_Q for the worker to drain. Worker re-fetches fresh state — no stale issue/comment payloads in queue.""" try: EVENT_Q.put_nowait((source, {"repo_full": repo_full, "number": issue_number})) except queue.Full: log(f"event queue FULL, dropping {source} {repo_full}#{issue_number}") def worker_loop(conn: sqlite3.Connection) -> None: log("dispatch worker started") while True: source, evt = EVENT_Q.get() try: handle_event(conn, source, evt) except Exception as e: log(f"worker error on {source} {evt}: {e!r}") def handle_event(conn: sqlite3.Connection, source: str, evt: dict) -> None: """Re-fetch repo + issue + comments, decide whether to dispatch claude.""" token = load_token() full = evt["repo_full"] num = evt["number"] key = f"{full}#{num}" try: repo = gitea_get(f"/repos/{full}", token) issue = gitea_get(f"/repos/{full}/issues/{num}", token) owner_login = full.split("/", 1)[0] name = full.split("/", 1)[1] comments = list_comments(owner_login, name, num, token) except Exception as e: log(f" ! refetch {key}: {e!r}") return if issue.get("state") != "open": return is_pr = "pull_request" in issue and issue["pull_request"] is not None latest_time, latest_user = latest_signal(issue, comments) if latest_user == CONFIG["username"]: set_handled(conn, key, latest_time) return if get_handled_signal(conn, key) == latest_time: return log(f"[{source}] handling {key}") dispatch(repo, issue, is_pr, comments, token) set_handled(conn, key, latest_time) def poll_once(conn: sqlite3.Connection, token: str) -> None: repos = list_all_repos(token) log(f"poll: {len(repos)} repos") for repo in repos: if repo.get("archived"): continue owner = repo["owner"].get("username") or repo["owner"].get("login") name = repo["name"] full = repo["full_name"] for type_ in ("issues", "pulls"): try: items = list_open_issues(owner, name, token, type_) except urllib.error.HTTPError as e: if e.code in (403, 404): continue log(f" ! list {full} {type_}: HTTP {e.code}") continue except Exception as e: log(f" ! list {full} {type_}: {e!r}") continue for issue in items: is_pr = "pull_request" in issue and issue["pull_request"] is not None if type_ == "issues" and is_pr: continue if type_ == "pulls" and not is_pr: continue num = issue["number"] try: comments = list_comments(owner, name, num, token) except Exception as e: log(f" ! comments {full}#{num}: {e!r}") continue latest_time, latest_user = latest_signal(issue, comments) key = f"{full}#{num}" if latest_user == CONFIG["username"]: set_handled(conn, key, latest_time) continue if get_handled_signal(conn, key) == latest_time: continue enqueue_issue("poll", full, num) # ─── webhook server ────────────────────────────────────────────────────────── class WebhookHandler(BaseHTTPRequestHandler): secret: bytes = b"" db_conn: sqlite3.Connection | None = None # silence default access log; we log selectively below def log_message(self, *args, **kwargs) -> None: # noqa: D401 return def _reply(self, code: int, body: bytes = b"") -> None: self.send_response(code) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(body))) self.end_headers() if body: self.wfile.write(body) def do_GET(self) -> None: # noqa: N802 if self.path == "/healthz": self._reply(200, b'{"ok":true}\n') return self._reply(404) def do_POST(self) -> None: # noqa: N802 if self.path != "/hook": self._reply(404) return length = int(self.headers.get("Content-Length") or 0) body = self.rfile.read(length) if length else b"" if self.secret: sig = self.headers.get("X-Gitea-Signature", "") mac = hmac.new(self.secret, body, hashlib.sha256).hexdigest() if not hmac.compare_digest(mac, sig): log(f"webhook: bad signature from {self.client_address[0]}") self._reply(401, b'{"error":"bad signature"}\n') return delivery = self.headers.get("X-Gitea-Delivery", "") event = self.headers.get("X-Gitea-Event", "") try: payload = json.loads(body or b"{}") except Exception: self._reply(400, b'{"error":"bad json"}\n') return # idempotency if self.db_conn is not None and is_delivered(self.db_conn, delivery): self._reply(200, b'{"ok":true,"dup":true}\n') return action = payload.get("action", "") sender = (payload.get("sender") or {}).get("login", "") repo = (payload.get("repository") or {}).get("full_name") or "" # only events we care about targets = { "issues": ("issue", "number"), "issue_comment": ("issue", "number"), "pull_request": ("pull_request", "number"), "pull_request_review": ("pull_request", "number"), "pull_request_review_comment": ("pull_request", "number"), "pull_request_comment": ("pull_request", "number"), } # skip irrelevant or self-triggered events early if event not in targets or sender == CONFIG["username"]: if self.db_conn is not None: mark_delivered(self.db_conn, delivery) self._reply(200, b'{"ok":true,"skip":true}\n') return # only act on creating/updating actions, not closing/deleting if action in {"closed", "deleted", "label_cleared", "label_updated"}: if self.db_conn is not None: mark_delivered(self.db_conn, delivery) self._reply(200, b'{"ok":true,"skip_action":true}\n') return obj_key, num_key = targets[event] obj = payload.get(obj_key) or {} number = obj.get(num_key) if not repo or not number: self._reply(400, b'{"error":"missing repo/number"}\n') return log(f"webhook: {event}/{action} {repo}#{number} by @{sender}") if self.db_conn is not None: mark_delivered(self.db_conn, delivery) enqueue_issue("webhook", repo, int(number)) self._reply(202, b'{"ok":true}\n') def start_webhook_server(conn: sqlite3.Connection) -> None: secret_path = CONFIG["webhook_secret_file"] if secret_path.exists(): secret = secret_path.read_text().strip() WebhookHandler.secret = secret.encode() log(f"webhook secret loaded from {secret_path}") else: log( f"warning: no webhook secret at {secret_path} — accepting unsigned posts" ) WebhookHandler.db_conn = conn host, port = CONFIG["webhook_listen"] srv = ThreadingHTTPServer((host, port), WebhookHandler) log(f"webhook server listening on {host}:{port} (POST /hook, GET /healthz)") t = threading.Thread(target=srv.serve_forever, name="webhook", daemon=True) t.start() def main() -> None: state_dir = CONFIG["state_dir"] state_dir.mkdir(parents=True, exist_ok=True) conn = init_state(state_dir / "state.db") log( f"gitea-bot up; base={CONFIG['base_url']} " f"interval={CONFIG['poll_interval_sec']}s " f"claude={CONFIG['claude_bin']}" ) threading.Thread( target=worker_loop, args=(conn,), name="worker", daemon=True ).start() start_webhook_server(conn) while True: try: token = load_token() poll_once(conn, token) except Exception as e: log(f"poll error: {e!r}") time.sleep(CONFIG["poll_interval_sec"]) if __name__ == "__main__": main()