#!/usr/bin/env python3 """ AI-Check 主入口 — Controller 层接口参数变更检测(纯 Python,无 Java 依赖) """ import argparse import sys from pathlib import Path from typing import Optional import yaml CHECKER_DIR = Path(__file__).resolve().parent sys.path.insert(0, str(CHECKER_DIR)) from change_logger import persist_change_log from comparator import compare_endpoints from controller_parser import ( endpoints_to_map, filter_endpoints_by_files, parse_endpoints_from_files, ) from git_utils import ( get_changed_java_controller_files, get_controller_files_diff, get_current_commit, get_file_content_at_commit, get_previous_commit_sha, ) from llm_reviewer import review_parameter_changes from notifier import send_parameter_change_notification def load_config(config_path: Path) -> dict: """加载 YAML 配置文件。""" if not config_path.exists(): print(f"[错误] 配置文件不存在: {config_path}") print("请在 .gitea/config.yaml 中填写配置并提交到仓库。") sys.exit(1) with open(config_path, "r", encoding="utf-8") as f: return yaml.safe_load(f) or {} def _read_file_safe(path: Path) -> str: """读取文件内容。""" try: return path.read_text(encoding="utf-8", errors="ignore") except OSError as exc: print(f"[警告] 无法读取 {path}: {exc}") return "" def _load_version_contents( repo_root: Path, file_paths: list, commit_sha: Optional[str] = None, ) -> dict: """加载文件内容;commit_sha 为空则读工作区,否则 git show。""" contents = {} for fp in file_paths: norm = fp.replace("\\", "/") if commit_sha: text = get_file_content_at_commit(commit_sha, norm) if text is not None: contents[norm] = text else: text = _read_file_safe(repo_root / norm) if text: contents[norm] = text return contents def parse_changed_endpoints( repo_root: Path, source_subdir: str, changed_files: list, old_sha: str, label: str, ) -> dict: """解析变更 Controller 文件在新/旧版本的端点。""" if label == "new": contents = _load_version_contents(repo_root, changed_files) else: contents = _load_version_contents(repo_root, changed_files, commit_sha=old_sha) print(f"[AST] 解析 {label} 版本 {len(contents)} 个 Controller 文件") endpoints = parse_endpoints_from_files( repo_root, source_subdir, changed_files, contents ) print(f"[AST] {label} 版本共 {len(endpoints)} 个接口") return endpoints_to_map(endpoints) def main() -> int: """主流程入口。""" parser = argparse.ArgumentParser( description="AI-Check: Controller 接口参数变更检测" ) parser.add_argument( "--config", default=".gitea/config.yaml", help="配置文件路径" ) parser.add_argument("--repo-root", default=".", help="Git 仓库根目录") parser.add_argument("push_user", nargs="?", default=None, help="推送人") parser.add_argument("push_time", nargs="?", default=None, help="推送时间") args = parser.parse_args() repo_root = Path(args.repo_root).resolve() config_path = Path(args.config) if not config_path.is_absolute(): config_path = repo_root / config_path config = load_config(config_path) source_subdir = config.get("source_dir", "src/main/java") commit_info = get_current_commit() push_user = args.push_user or commit_info.author push_time = args.push_time or commit_info.commit_time print("Controller 接口参数变更检测(纯 Python)") print("=" * 40) print(f"推送人: {push_user}") print(f"推送时间: {push_time}") print(f"LLM 审核: {config.get('llm', {}).get('enabled', True)}") print(f"记录日志: {config.get('log', {}).get('enabled', False)}") print("=" * 40) prev_sha = get_previous_commit_sha() if prev_sha is None: print("[Git] 首次提交,无可对比版本,跳过。") return 0 changed_files = [f.replace("\\", "/") for f in get_changed_java_controller_files(prev_sha, commit_info.sha)] if not changed_files: print("[Git] 本次提交未变更 Controller 文件,跳过。") return 0 print(f"[Git] 变更 Controller 文件 {len(changed_files)} 个:") for f in changed_files: print(f" - {f}") git_diff = get_controller_files_diff(prev_sha, commit_info.sha, changed_files) new_map = parse_changed_endpoints( repo_root, source_subdir, changed_files, prev_sha, "new" ) old_map = parse_changed_endpoints( repo_root, source_subdir, changed_files, prev_sha, "old" ) new_filtered = endpoints_to_map( filter_endpoints_by_files(list(new_map.values()), changed_files) ) old_filtered = endpoints_to_map( filter_endpoints_by_files(list(old_map.values()), changed_files) ) reports = compare_endpoints(old_filtered, new_filtered) print(f"[对比] 检测到 {len(reports)} 个接口存在参数变更") llm_review = None if reports: llm_review = review_parameter_changes( reports, config, changed_files, git_diff ) if llm_review: print("[LLM] 参数变更审核完成") persist_change_log(reports, commit_info, config, llm_review) notify_cfg = config.get("notify", {}) if notify_cfg.get("only_on_change", True) and not reports: print("[通知] 无接口参数变更,跳过企微通知。") return 0 mentioned = notify_cfg.get("mentioned_users", "") mentioned_list = [u.strip() for u in mentioned.split(",") if u.strip()] or None send_parameter_change_notification( webhook_url=config.get("wecom", {}).get("webhook_url", ""), reports=reports, push_user=push_user, push_time=push_time, llm_review=llm_review, mentioned_users=mentioned_list, ) print("\n完成") return 0 if __name__ == "__main__": sys.exit(main())