add webhook server alongside polling

- POST /hook with X-Gitea-Signature HMAC-SHA256 verification
- X-Gitea-Delivery dedupe via sqlite deliveries table
- threaded HTTP server + single dispatch worker draining a queue
- polling (60s) kept as fallback; both paths enqueue to the same worker
- gitea system webhook URL: http://10.42.0.1:31390/hook (cni0 gateway)
This commit is contained in:
Fam Zheng
2026-05-05 10:22:20 +01:00
parent f83ebf5854
commit 0ce5fa990b
+214 -11
View File
@@ -1,34 +1,47 @@
#!/usr/bin/env python3
"""gitea-bot — mochi 全站 issue/PR 自动处理。
轮询 famzheng.me/gitea 所有 repo 的 open issue/PR
每条"最新活动不是 mochi 自己"的条目,spawn 一个
`claude --dangerously-skip-permissions -p <prompt>`
让 claude 用 mochi token 直接调 gitea API + git push 来回复 / 提 PR
双轨:
- **webhook**: 主路径,gitea system hook → http://10.42.0.1:31390/hook
HMAC 签名验签,X-Gitea-Delivery 幂等,事件入 queue 由 worker 串行 dispatch。
- **polling**: 兜底,每 60s 拉全站 open issue/PR,把 webhook 漏的 enqueue
dispatch = spawn `claude --dangerously-skip-permissions -p <prompt>`,让 claude
用 mochi token 自己调 gitea API + git push 完成回复 / 提 PR。
"""
import hashlib
import hmac
import json
import os
import queue
import shlex
import sqlite3
import subprocess
import sys
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
CONFIG = {
"base_url": "https://famzheng.me/gitea",
"username": "mochi",
"token_file": Path.home() / ".gitea-mochi-token",
"webhook_secret_file": Path.home() / ".gitea-webhook-secret",
"state_dir": Path.home() / ".local/state/gitea-bot",
"poll_interval_sec": 60,
"webhook_listen": ("0.0.0.0", 31390),
"claude_bin": "/home/fam/.local/bin/claude",
"claude_timeout_sec": 1200,
}
# events from webhook + polling get pushed here; worker drains
EVENT_Q: "queue.Queue[tuple[str, dict]]" = queue.Queue(maxsize=10000)
def log(msg: str) -> None:
ts = datetime.now().isoformat(timespec="seconds")
@@ -99,7 +112,7 @@ def list_comments(owner: str, repo: str, number: int, token: str) -> list[dict]:
def init_state(db_path: Path) -> sqlite3.Connection:
db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(db_path)
conn = sqlite3.connect(db_path, check_same_thread=False)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS handled (
@@ -109,10 +122,37 @@ def init_state(db_path: Path) -> sqlite3.Connection:
)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS deliveries (
id TEXT PRIMARY KEY,
ts TEXT NOT NULL
)
"""
)
conn.commit()
return conn
def is_delivered(conn: sqlite3.Connection, delivery_id: str) -> bool:
if not delivery_id:
return False
row = conn.execute(
"SELECT 1 FROM deliveries WHERE id = ?", (delivery_id,)
).fetchone()
return row is not None
def mark_delivered(conn: sqlite3.Connection, delivery_id: str) -> None:
if not delivery_id:
return
conn.execute(
"INSERT OR IGNORE INTO deliveries(id, ts) VALUES (?, ?)",
(delivery_id, datetime.now(timezone.utc).isoformat()),
)
conn.commit()
def get_handled_signal(conn: sqlite3.Connection, key: str) -> str | None:
row = conn.execute(
"SELECT last_signal FROM handled WHERE key = ?", (key,)
@@ -293,13 +333,61 @@ def dispatch(
log(f" → dispatch error: {e!r}")
def enqueue_issue(source: str, repo_full: str, issue_number: int) -> None:
"""Push (source, {repo_full, number}) onto EVENT_Q for the worker to drain.
Worker re-fetches fresh state — no stale issue/comment payloads in queue."""
try:
EVENT_Q.put_nowait((source, {"repo_full": repo_full, "number": issue_number}))
except queue.Full:
log(f"event queue FULL, dropping {source} {repo_full}#{issue_number}")
def worker_loop(conn: sqlite3.Connection) -> None:
log("dispatch worker started")
while True:
source, evt = EVENT_Q.get()
try:
handle_event(conn, source, evt)
except Exception as e:
log(f"worker error on {source} {evt}: {e!r}")
def handle_event(conn: sqlite3.Connection, source: str, evt: dict) -> None:
"""Re-fetch repo + issue + comments, decide whether to dispatch claude."""
token = load_token()
full = evt["repo_full"]
num = evt["number"]
key = f"{full}#{num}"
try:
repo = gitea_get(f"/repos/{full}", token)
issue = gitea_get(f"/repos/{full}/issues/{num}", token)
owner_login = full.split("/", 1)[0]
name = full.split("/", 1)[1]
comments = list_comments(owner_login, name, num, token)
except Exception as e:
log(f" ! refetch {key}: {e!r}")
return
if issue.get("state") != "open":
return
is_pr = "pull_request" in issue and issue["pull_request"] is not None
latest_time, latest_user = latest_signal(issue, comments)
if latest_user == CONFIG["username"]:
set_handled(conn, key, latest_time)
return
if get_handled_signal(conn, key) == latest_time:
return
log(f"[{source}] handling {key}")
dispatch(repo, issue, is_pr, comments, token)
set_handled(conn, key, latest_time)
def poll_once(conn: sqlite3.Connection, token: str) -> None:
repos = list_all_repos(token)
log(f"polling {len(repos)} repos")
log(f"poll: {len(repos)} repos")
for repo in repos:
if repo.get("archived"):
continue
owner = repo["owner"]["username"] if "username" in repo["owner"] else repo["owner"]["login"]
owner = repo["owner"].get("username") or repo["owner"].get("login")
name = repo["name"]
full = repo["full_name"]
for type_ in ("issues", "pulls"):
@@ -324,17 +412,126 @@ def poll_once(conn: sqlite3.Connection, token: str) -> None:
comments = list_comments(owner, name, num, token)
except Exception as e:
log(f" ! comments {full}#{num}: {e!r}")
comments = []
continue
latest_time, latest_user = latest_signal(issue, comments)
key = f"{full}#{num}"
if latest_user == CONFIG["username"]:
# last activity is mochi's own — record & skip
set_handled(conn, key, latest_time)
continue
if get_handled_signal(conn, key) == latest_time:
continue
dispatch(repo, issue, is_pr, comments, token)
set_handled(conn, key, latest_time)
enqueue_issue("poll", full, num)
# ─── webhook server ──────────────────────────────────────────────────────────
class WebhookHandler(BaseHTTPRequestHandler):
secret: bytes = b""
db_conn: sqlite3.Connection | None = None
# silence default access log; we log selectively below
def log_message(self, *args, **kwargs) -> None: # noqa: D401
return
def _reply(self, code: int, body: bytes = b"") -> None:
self.send_response(code)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
if body:
self.wfile.write(body)
def do_GET(self) -> None: # noqa: N802
if self.path == "/healthz":
self._reply(200, b'{"ok":true}\n')
return
self._reply(404)
def do_POST(self) -> None: # noqa: N802
if self.path != "/hook":
self._reply(404)
return
length = int(self.headers.get("Content-Length") or 0)
body = self.rfile.read(length) if length else b""
if self.secret:
sig = self.headers.get("X-Gitea-Signature", "")
mac = hmac.new(self.secret, body, hashlib.sha256).hexdigest()
if not hmac.compare_digest(mac, sig):
log(f"webhook: bad signature from {self.client_address[0]}")
self._reply(401, b'{"error":"bad signature"}\n')
return
delivery = self.headers.get("X-Gitea-Delivery", "")
event = self.headers.get("X-Gitea-Event", "")
try:
payload = json.loads(body or b"{}")
except Exception:
self._reply(400, b'{"error":"bad json"}\n')
return
# idempotency
if self.db_conn is not None and is_delivered(self.db_conn, delivery):
self._reply(200, b'{"ok":true,"dup":true}\n')
return
action = payload.get("action", "")
sender = (payload.get("sender") or {}).get("login", "")
repo = (payload.get("repository") or {}).get("full_name") or ""
# only events we care about
targets = {
"issues": ("issue", "number"),
"issue_comment": ("issue", "number"),
"pull_request": ("pull_request", "number"),
"pull_request_review": ("pull_request", "number"),
"pull_request_review_comment": ("pull_request", "number"),
"pull_request_comment": ("pull_request", "number"),
}
# skip irrelevant or self-triggered events early
if event not in targets or sender == CONFIG["username"]:
if self.db_conn is not None:
mark_delivered(self.db_conn, delivery)
self._reply(200, b'{"ok":true,"skip":true}\n')
return
# only act on creating/updating actions, not closing/deleting
if action in {"closed", "deleted", "label_cleared", "label_updated"}:
if self.db_conn is not None:
mark_delivered(self.db_conn, delivery)
self._reply(200, b'{"ok":true,"skip_action":true}\n')
return
obj_key, num_key = targets[event]
obj = payload.get(obj_key) or {}
number = obj.get(num_key)
if not repo or not number:
self._reply(400, b'{"error":"missing repo/number"}\n')
return
log(f"webhook: {event}/{action} {repo}#{number} by @{sender}")
if self.db_conn is not None:
mark_delivered(self.db_conn, delivery)
enqueue_issue("webhook", repo, int(number))
self._reply(202, b'{"ok":true}\n')
def start_webhook_server(conn: sqlite3.Connection) -> None:
secret_path = CONFIG["webhook_secret_file"]
if secret_path.exists():
secret = secret_path.read_text().strip()
WebhookHandler.secret = secret.encode()
log(f"webhook secret loaded from {secret_path}")
else:
log(
f"warning: no webhook secret at {secret_path} — accepting unsigned posts"
)
WebhookHandler.db_conn = conn
host, port = CONFIG["webhook_listen"]
srv = ThreadingHTTPServer((host, port), WebhookHandler)
log(f"webhook server listening on {host}:{port} (POST /hook, GET /healthz)")
t = threading.Thread(target=srv.serve_forever, name="webhook", daemon=True)
t.start()
def main() -> None:
@@ -346,6 +543,12 @@ def main() -> None:
f"interval={CONFIG['poll_interval_sec']}s "
f"claude={CONFIG['claude_bin']}"
)
threading.Thread(
target=worker_loop, args=(conn,), name="worker", daemon=True
).start()
start_webhook_server(conn)
while True:
try:
token = load_token()