178 lines
6.8 KiB
Python
178 lines
6.8 KiB
Python
import re
|
|
import httpx
|
|
from pathlib import Path
|
|
|
|
from config import GITEA_URL, GITEA_TOKEN
|
|
|
|
|
|
class GiteaClient:
|
|
def __init__(self):
|
|
self.base = GITEA_URL + "/api/v1"
|
|
self.headers = {
|
|
"Authorization": f"token {GITEA_TOKEN}",
|
|
"Content-Type": "application/json",
|
|
}
|
|
|
|
async def get_issue(self, owner: str, repo: str, number: int) -> dict:
|
|
async with httpx.AsyncClient() as c:
|
|
r = await c.get(
|
|
f"{self.base}/repos/{owner}/{repo}/issues/{number}",
|
|
headers=self.headers,
|
|
)
|
|
r.raise_for_status()
|
|
return r.json()
|
|
|
|
async def get_comments(self, owner: str, repo: str, number: int) -> list[dict]:
|
|
async with httpx.AsyncClient() as c:
|
|
r = await c.get(
|
|
f"{self.base}/repos/{owner}/{repo}/issues/{number}/comments",
|
|
headers=self.headers,
|
|
)
|
|
r.raise_for_status()
|
|
return r.json()
|
|
|
|
async def create_comment(self, owner: str, repo: str, number: int, body: str):
|
|
async with httpx.AsyncClient() as c:
|
|
r = await c.post(
|
|
f"{self.base}/repos/{owner}/{repo}/issues/{number}/comments",
|
|
headers=self.headers,
|
|
json={"body": body},
|
|
)
|
|
r.raise_for_status()
|
|
return r.json()
|
|
|
|
async def add_reaction(self, owner: str, repo: str, comment_id: int, reaction: str):
|
|
async with httpx.AsyncClient() as c:
|
|
r = await c.post(
|
|
f"{self.base}/repos/{owner}/{repo}/issues/comments/{comment_id}/reactions",
|
|
headers=self.headers,
|
|
json={"content": reaction},
|
|
)
|
|
return r.status_code < 400
|
|
|
|
async def get_pull_request(self, owner: str, repo: str, number: int) -> dict:
|
|
async with httpx.AsyncClient() as c:
|
|
r = await c.get(
|
|
f"{self.base}/repos/{owner}/{repo}/pulls/{number}",
|
|
headers=self.headers,
|
|
)
|
|
r.raise_for_status()
|
|
return r.json()
|
|
|
|
async def get_issue_attachments(self, owner: str, repo: str, number: int) -> list[dict]:
|
|
async with httpx.AsyncClient() as c:
|
|
r = await c.get(
|
|
f"{self.base}/repos/{owner}/{repo}/issues/{number}/assets",
|
|
headers=self.headers,
|
|
)
|
|
if r.status_code != 200:
|
|
return []
|
|
return r.json() or []
|
|
|
|
async def get_comment_attachments(self, owner: str, repo: str, comment_id: int) -> list[dict]:
|
|
async with httpx.AsyncClient() as c:
|
|
r = await c.get(
|
|
f"{self.base}/repos/{owner}/{repo}/issues/comments/{comment_id}/assets",
|
|
headers=self.headers,
|
|
)
|
|
if r.status_code != 200:
|
|
return []
|
|
return r.json() or []
|
|
|
|
async def download_attachment(self, url: str, dest: Path):
|
|
async with httpx.AsyncClient(follow_redirects=True) as c:
|
|
r = await c.get(url, headers={"Authorization": f"token {GITEA_TOKEN}"})
|
|
r.raise_for_status()
|
|
dest.parent.mkdir(parents=True, exist_ok=True)
|
|
dest.write_bytes(r.content)
|
|
|
|
async def collect_attachments(
|
|
self, owner: str, repo: str, number: int, comments: list[dict], dest_dir: Path
|
|
) -> list[dict]:
|
|
"""Download all attachments for an issue and its comments. Returns metadata list."""
|
|
downloaded = []
|
|
|
|
# Issue-level attachments
|
|
assets = await self.get_issue_attachments(owner, repo, number)
|
|
for a in assets:
|
|
fpath = dest_dir / a["name"]
|
|
await self.download_attachment(a["browser_download_url"], fpath)
|
|
downloaded.append({"name": a["name"], "path": str(fpath)})
|
|
|
|
# Comment-level attachments
|
|
for c in comments:
|
|
c_assets = await self.get_comment_attachments(owner, repo, c["id"])
|
|
for a in c_assets:
|
|
fpath = dest_dir / f"comment_{c['id']}_{a['name']}"
|
|
await self.download_attachment(a["browser_download_url"], fpath)
|
|
downloaded.append({"name": a["name"], "path": str(fpath)})
|
|
|
|
# Also find inline image URLs in issue body / comments and download them
|
|
texts = [c.get("body", "") for c in comments]
|
|
inline = await self._download_inline_images(texts, dest_dir)
|
|
downloaded.extend(inline)
|
|
|
|
return downloaded
|
|
|
|
async def list_org_repos(self, org: str) -> list[dict]:
|
|
repos = []
|
|
page = 1
|
|
async with httpx.AsyncClient() as c:
|
|
while True:
|
|
r = await c.get(
|
|
f"{self.base}/orgs/{org}/repos?page={page}&limit=50",
|
|
headers=self.headers,
|
|
)
|
|
r.raise_for_status()
|
|
batch = r.json()
|
|
if not batch:
|
|
break
|
|
repos.extend(batch)
|
|
page += 1
|
|
return repos
|
|
|
|
async def ensure_webhook(self, owner: str, repo: str, webhook_url: str):
|
|
"""Ensure a webhook exists for this repo pointing to webhook_url."""
|
|
async with httpx.AsyncClient() as c:
|
|
r = await c.get(
|
|
f"{self.base}/repos/{owner}/{repo}/hooks",
|
|
headers=self.headers,
|
|
)
|
|
if r.status_code == 200:
|
|
for h in r.json():
|
|
if h.get("config", {}).get("url") == webhook_url:
|
|
return False # already exists
|
|
r = await c.post(
|
|
f"{self.base}/repos/{owner}/{repo}/hooks",
|
|
headers=self.headers,
|
|
json={
|
|
"type": "gitea",
|
|
"active": True,
|
|
"events": ["issues", "issue_comment", "pull_request", "pull_request_comment"],
|
|
"config": {"url": webhook_url, "content_type": "json", "secret": ""},
|
|
},
|
|
)
|
|
r.raise_for_status()
|
|
return True # created
|
|
|
|
async def _download_inline_images(self, texts: list[str], dest_dir: Path) -> list[dict]:
|
|
downloaded = []
|
|
seen = set()
|
|
pattern = re.compile(r"!\[.*?\]\(((?:/attachments/|" + re.escape(GITEA_URL) + r"/attachments/)([a-f0-9\-]+))\)")
|
|
for text in texts:
|
|
for m in pattern.finditer(text or ""):
|
|
url = m.group(1)
|
|
uid = m.group(2)
|
|
if uid in seen:
|
|
continue
|
|
seen.add(uid)
|
|
if url.startswith("/"):
|
|
url = GITEA_URL + url
|
|
fpath = dest_dir / f"inline_{uid}"
|
|
try:
|
|
await self.download_attachment(url, fpath)
|
|
downloaded.append({"name": uid, "path": str(fpath)})
|
|
except Exception:
|
|
pass
|
|
return downloaded
|