import base64
from urllib.parse import quote

import requests

import config


def _headers() -> dict:
    """Build the HTTP headers sent with every Gitea API request.

    The token is read from ``config.GITEA_TOKEN`` on each call.
    """
    return {
        "Authorization": f"token {config.GITEA_TOKEN}",
        "Content-Type": "application/json",
    }


def _contents_url(owner: str, repo: str, path: str) -> str:
    """Return the repository "contents" API URL for *path*.

    Each component is percent-encoded so that characters such as ``#``,
    ``?``, ``%`` or spaces in a file name are sent as part of the path
    instead of being parsed as a fragment, query string or escape sequence.
    ``/`` is kept unescaped in *path* because it separates directories.
    """
    # Avoid a double slash when the configured base URL ends with "/".
    base = config.GITEA_URL.rstrip("/")
    return (
        f"{base}/api/v1/repos/"
        f"{quote(owner, safe='')}/{quote(repo, safe='')}"
        f"/contents/{quote(path, safe='/')}"
    )


def get_file(owner: str, repo: str, path: str, branch: str) -> tuple[str, str]:
    """Fetch a file's decoded content and its SHA from Gitea.

    Args:
        owner: Owner segment of the repository URL.
        repo: Repository name segment of the URL.
        path: Path of the file inside the repository (unencoded).
        branch: Value sent as the ``ref`` query parameter.

    Returns:
        (content, sha) where content is the decoded UTF-8 string and sha
        is required for the subsequent update call.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    response = requests.get(
        _contents_url(owner, repo, path),
        headers=_headers(),
        params={"ref": branch},
        timeout=30,
    )
    response.raise_for_status()
    data = response.json()
    # The response carries the file body base64-encoded in "content".
    content = base64.b64decode(data["content"]).decode("utf-8")
    return content, data["sha"]


def update_file(
    owner: str,
    repo: str,
    path: str,
    content: str,
    sha: str,
    branch: str,
    commit_message: str | None = None,
) -> None:
    """Write updated file content back to Gitea.

    Args:
        owner: Owner segment of the repository URL.
        repo: Repository name segment of the URL.
        path: Path of the file inside the repository (unencoded).
        content: New file content; encoded as UTF-8 and then base64.
        sha: SHA sent in the payload, as returned by ``get_file``.
        branch: Branch name sent in the payload.
        commit_message: Commit message; a default mentioning *path* is
            used when this is ``None`` or empty.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    payload = {
        "message": commit_message or f"BLIGHT: process triggers in {path}",
        "content": base64.b64encode(content.encode("utf-8")).decode("ascii"),
        "sha": sha,
        "branch": branch,
    }
    response = requests.put(
        _contents_url(owner, repo, path),
        headers=_headers(),
        json=payload,
        timeout=30,
    )
    response.raise_for_status()