From 0ce5fa990b5e1ce5ca75213fc6d4e5ab9c375db8 Mon Sep 17 00:00:00 2001 From: Fam Zheng Date: Tue, 5 May 2026 10:22:20 +0100 Subject: [PATCH] add webhook server alongside polling - POST /hook with X-Gitea-Signature HMAC-SHA256 verification - X-Gitea-Delivery dedupe via sqlite deliveries table - threaded HTTP server + single dispatch worker draining a queue - polling (60s) kept as fallback; both paths enqueue to the same worker - gitea system webhook URL: http://10.42.0.1:31390/hook (cni0 gateway) --- bot.py | 225 ++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 214 insertions(+), 11 deletions(-) diff --git a/bot.py b/bot.py index cf2c157..40facd6 100644 --- a/bot.py +++ b/bot.py @@ -1,34 +1,47 @@ #!/usr/bin/env python3 """gitea-bot — mochi 全站 issue/PR 自动处理。 -轮询 famzheng.me/gitea 所有 repo 的 open issue/PR; -每条"最新活动不是 mochi 自己"的条目,spawn 一个 -`claude --dangerously-skip-permissions -p `, -让 claude 用 mochi token 直接调 gitea API + git push 来回复 / 提 PR。 +双轨: +- **webhook**: 主路径,gitea system hook → http://10.42.0.1:31390/hook, + HMAC 签名验签,X-Gitea-Delivery 幂等,事件入 queue 由 worker 串行 dispatch。 +- **polling**: 兜底,每 60s 拉全站 open issue/PR,把 webhook 漏的 enqueue。 + +dispatch = spawn `claude --dangerously-skip-permissions -p `,让 claude +用 mochi token 自己调 gitea API + git push 完成回复 / 提 PR。 """ +import hashlib +import hmac import json import os +import queue import shlex import sqlite3 import subprocess import sys +import threading import time import urllib.error import urllib.parse import urllib.request from datetime import datetime, timezone +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path CONFIG = { "base_url": "https://famzheng.me/gitea", "username": "mochi", "token_file": Path.home() / ".gitea-mochi-token", + "webhook_secret_file": Path.home() / ".gitea-webhook-secret", "state_dir": Path.home() / ".local/state/gitea-bot", "poll_interval_sec": 60, + "webhook_listen": ("0.0.0.0", 31390), "claude_bin": "/home/fam/.local/bin/claude", "claude_timeout_sec": 1200, } +# events from webhook + polling get pushed here; worker drains +EVENT_Q: "queue.Queue[tuple[str, dict]]" = queue.Queue(maxsize=10000) + def log(msg: str) -> None: ts = datetime.now().isoformat(timespec="seconds") @@ -99,7 +112,7 @@ def list_comments(owner: str, repo: str, number: int, token: str) -> list[dict]: def init_state(db_path: Path) -> sqlite3.Connection: db_path.parent.mkdir(parents=True, exist_ok=True) - conn = sqlite3.connect(db_path) + conn = sqlite3.connect(db_path, check_same_thread=False) conn.execute( """ CREATE TABLE IF NOT EXISTS handled ( @@ -109,10 +122,37 @@ def init_state(db_path: Path) -> sqlite3.Connection: ) """ ) + conn.execute( + """ + CREATE TABLE IF NOT EXISTS deliveries ( + id TEXT PRIMARY KEY, + ts TEXT NOT NULL + ) + """ + ) conn.commit() return conn +def is_delivered(conn: sqlite3.Connection, delivery_id: str) -> bool: + if not delivery_id: + return False + row = conn.execute( + "SELECT 1 FROM deliveries WHERE id = ?", (delivery_id,) + ).fetchone() + return row is not None + + +def mark_delivered(conn: sqlite3.Connection, delivery_id: str) -> None: + if not delivery_id: + return + conn.execute( + "INSERT OR IGNORE INTO deliveries(id, ts) VALUES (?, ?)", + (delivery_id, datetime.now(timezone.utc).isoformat()), + ) + conn.commit() + + def get_handled_signal(conn: sqlite3.Connection, key: str) -> str | None: row = conn.execute( "SELECT last_signal FROM handled WHERE key = ?", (key,) @@ -293,13 +333,61 @@ def dispatch( log(f" → dispatch error: {e!r}") +def enqueue_issue(source: str, repo_full: str, issue_number: int) -> None: + """Push (source, {repo_full, number}) onto EVENT_Q for the worker to drain. + Worker re-fetches fresh state — no stale issue/comment payloads in queue.""" + try: + EVENT_Q.put_nowait((source, {"repo_full": repo_full, "number": issue_number})) + except queue.Full: + log(f"event queue FULL, dropping {source} {repo_full}#{issue_number}") + + +def worker_loop(conn: sqlite3.Connection) -> None: + log("dispatch worker started") + while True: + source, evt = EVENT_Q.get() + try: + handle_event(conn, source, evt) + except Exception as e: + log(f"worker error on {source} {evt}: {e!r}") + + +def handle_event(conn: sqlite3.Connection, source: str, evt: dict) -> None: + """Re-fetch repo + issue + comments, decide whether to dispatch claude.""" + token = load_token() + full = evt["repo_full"] + num = evt["number"] + key = f"{full}#{num}" + try: + repo = gitea_get(f"/repos/{full}", token) + issue = gitea_get(f"/repos/{full}/issues/{num}", token) + owner_login = full.split("/", 1)[0] + name = full.split("/", 1)[1] + comments = list_comments(owner_login, name, num, token) + except Exception as e: + log(f" ! refetch {key}: {e!r}") + return + if issue.get("state") != "open": + return + is_pr = "pull_request" in issue and issue["pull_request"] is not None + latest_time, latest_user = latest_signal(issue, comments) + if latest_user == CONFIG["username"]: + set_handled(conn, key, latest_time) + return + if get_handled_signal(conn, key) == latest_time: + return + log(f"[{source}] handling {key}") + dispatch(repo, issue, is_pr, comments, token) + set_handled(conn, key, latest_time) + + def poll_once(conn: sqlite3.Connection, token: str) -> None: repos = list_all_repos(token) - log(f"polling {len(repos)} repos") + log(f"poll: {len(repos)} repos") for repo in repos: if repo.get("archived"): continue - owner = repo["owner"]["username"] if "username" in repo["owner"] else repo["owner"]["login"] + owner = repo["owner"].get("username") or repo["owner"].get("login") name = repo["name"] full = repo["full_name"] for type_ in ("issues", "pulls"): @@ -324,17 +412,126 @@ def poll_once(conn: sqlite3.Connection, token: str) -> None: comments = list_comments(owner, name, num, token) except Exception as e: log(f" ! comments {full}#{num}: {e!r}") - comments = [] + continue latest_time, latest_user = latest_signal(issue, comments) key = f"{full}#{num}" if latest_user == CONFIG["username"]: - # last activity is mochi's own — record & skip set_handled(conn, key, latest_time) continue if get_handled_signal(conn, key) == latest_time: continue - dispatch(repo, issue, is_pr, comments, token) - set_handled(conn, key, latest_time) + enqueue_issue("poll", full, num) + + +# ─── webhook server ────────────────────────────────────────────────────────── + + +class WebhookHandler(BaseHTTPRequestHandler): + secret: bytes = b"" + db_conn: sqlite3.Connection | None = None + + # silence default access log; we log selectively below + def log_message(self, *args, **kwargs) -> None: # noqa: D401 + return + + def _reply(self, code: int, body: bytes = b"") -> None: + self.send_response(code) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + if body: + self.wfile.write(body) + + def do_GET(self) -> None: # noqa: N802 + if self.path == "/healthz": + self._reply(200, b'{"ok":true}\n') + return + self._reply(404) + + def do_POST(self) -> None: # noqa: N802 + if self.path != "/hook": + self._reply(404) + return + length = int(self.headers.get("Content-Length") or 0) + body = self.rfile.read(length) if length else b"" + + if self.secret: + sig = self.headers.get("X-Gitea-Signature", "") + mac = hmac.new(self.secret, body, hashlib.sha256).hexdigest() + if not hmac.compare_digest(mac, sig): + log(f"webhook: bad signature from {self.client_address[0]}") + self._reply(401, b'{"error":"bad signature"}\n') + return + + delivery = self.headers.get("X-Gitea-Delivery", "") + event = self.headers.get("X-Gitea-Event", "") + try: + payload = json.loads(body or b"{}") + except Exception: + self._reply(400, b'{"error":"bad json"}\n') + return + + # idempotency + if self.db_conn is not None and is_delivered(self.db_conn, delivery): + self._reply(200, b'{"ok":true,"dup":true}\n') + return + + action = payload.get("action", "") + sender = (payload.get("sender") or {}).get("login", "") + repo = (payload.get("repository") or {}).get("full_name") or "" + + # only events we care about + targets = { + "issues": ("issue", "number"), + "issue_comment": ("issue", "number"), + "pull_request": ("pull_request", "number"), + "pull_request_review": ("pull_request", "number"), + "pull_request_review_comment": ("pull_request", "number"), + "pull_request_comment": ("pull_request", "number"), + } + # skip irrelevant or self-triggered events early + if event not in targets or sender == CONFIG["username"]: + if self.db_conn is not None: + mark_delivered(self.db_conn, delivery) + self._reply(200, b'{"ok":true,"skip":true}\n') + return + # only act on creating/updating actions, not closing/deleting + if action in {"closed", "deleted", "label_cleared", "label_updated"}: + if self.db_conn is not None: + mark_delivered(self.db_conn, delivery) + self._reply(200, b'{"ok":true,"skip_action":true}\n') + return + + obj_key, num_key = targets[event] + obj = payload.get(obj_key) or {} + number = obj.get(num_key) + if not repo or not number: + self._reply(400, b'{"error":"missing repo/number"}\n') + return + + log(f"webhook: {event}/{action} {repo}#{number} by @{sender}") + if self.db_conn is not None: + mark_delivered(self.db_conn, delivery) + enqueue_issue("webhook", repo, int(number)) + self._reply(202, b'{"ok":true}\n') + + +def start_webhook_server(conn: sqlite3.Connection) -> None: + secret_path = CONFIG["webhook_secret_file"] + if secret_path.exists(): + secret = secret_path.read_text().strip() + WebhookHandler.secret = secret.encode() + log(f"webhook secret loaded from {secret_path}") + else: + log( + f"warning: no webhook secret at {secret_path} — accepting unsigned posts" + ) + WebhookHandler.db_conn = conn + host, port = CONFIG["webhook_listen"] + srv = ThreadingHTTPServer((host, port), WebhookHandler) + log(f"webhook server listening on {host}:{port} (POST /hook, GET /healthz)") + t = threading.Thread(target=srv.serve_forever, name="webhook", daemon=True) + t.start() def main() -> None: @@ -346,6 +543,12 @@ def main() -> None: f"interval={CONFIG['poll_interval_sec']}s " f"claude={CONFIG['claude_bin']}" ) + + threading.Thread( + target=worker_loop, args=(conn,), name="worker", daemon=True + ).start() + start_webhook_server(conn) + while True: try: token = load_token()