脚本修改
Some checks failed
API接口参数变更检测 / api-param-check (push) Has been cancelled

This commit is contained in:
2026-06-03 15:33:24 +08:00
parent 6db621a137
commit 2c20a26af8
15 changed files with 136 additions and 897 deletions

View File

@@ -7,7 +7,7 @@ from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional, Set, Tuple
from controller_parser import ApiEndpoint, ApiParameter
from models import ApiEndpoint, ApiParameter
class ChangeType(str, Enum):

View File

@@ -1,114 +1,43 @@
"""
Controller 端点解析模块。
调用 Java AST 解析器 JAR将 Java 源码转为结构化的 API 端点列表。
Controller 端点解析模块(纯 Python无需 Java
"""
import json
import subprocess
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional
from typing import Dict, List
@dataclass
class ApiParameter:
"""单个接口参数。"""
name: str
type: str
required: bool = True
source: str = "query"
description: Optional[str] = None
@dataclass
class ApiEndpoint:
"""单个 Controller 接口端点。"""
http_method: str
uri: str
controller_class: str
method_name: str
source_file: str
parameters: List[ApiParameter] = field(default_factory=list)
@property
def endpoint_key(self) -> str:
"""唯一标识HTTP 方法 + URI用于跨版本匹配。"""
return f"{self.http_method} {self.uri}"
def run_java_parser(source_dir: Path, jar_path: Path, output_json: Path) -> List[ApiEndpoint]:
"""
调用 JavaParser JAR 扫描源码目录,返回解析结果。
:param source_dir: Java 源码根目录
:param jar_path: controller-parser JAR 路径
:param output_json: 临时 JSON 输出路径
:return: ApiEndpoint 列表
:raises RuntimeError: Java 进程失败或 JAR 不存在
"""
if not jar_path.exists():
raise RuntimeError(
f"Java 解析器 JAR 不存在: {jar_path}\n"
"请先在 .gitea/java-parser 目录执行: mvn -q package"
)
cmd = ["java", "-jar", str(jar_path), str(source_dir), str(output_json)]
result = subprocess.run(cmd, capture_output=True, text=True, encoding="utf-8")
if result.returncode != 0:
raise RuntimeError(f"Java 解析器执行失败:\n{result.stderr}")
with open(output_json, "r", encoding="utf-8") as f:
raw = json.load(f)
return [_dict_to_endpoint(item) for item in raw]
def _dict_to_endpoint(data: dict) -> ApiEndpoint:
"""将 JSON 字典转换为 ApiEndpoint 对象。"""
params = [
ApiParameter(
name=p.get("name", ""),
type=p.get("type", ""),
required=p.get("required", True),
source=p.get("source", "query"),
description=p.get("description"),
)
for p in data.get("parameters", [])
]
return ApiEndpoint(
http_method=data.get("httpMethod", "GET"),
uri=data.get("uri", "/"),
controller_class=data.get("controllerClass", ""),
method_name=data.get("methodName", ""),
source_file=data.get("sourceFile", ""),
parameters=params,
)
from models import ApiEndpoint, ApiParameter
def endpoints_to_map(endpoints: List[ApiEndpoint]) -> Dict[str, ApiEndpoint]:
"""
将端点列表转为字典key 为 endpoint_key。
:param endpoints: 端点列表
:return: { "GET /api/users/{id}": ApiEndpoint, ... }
"""
"""端点列表转字典key 为 endpoint_key。"""
return {ep.endpoint_key: ep for ep in endpoints}
def filter_endpoints_by_files(
endpoints: List[ApiEndpoint], changed_files: List[str]
) -> List[ApiEndpoint]:
"""
仅保留源文件在变更列表中的端点(缩小对比范围)。
:param endpoints: 全部端点
:param changed_files: 变更文件相对路径列表
:return: 过滤后的端点
"""
"""仅保留变更文件中的端点。"""
if not changed_files:
return endpoints
changed_set = set(changed_files)
changed_set = {f.replace("\\", "/") for f in changed_files}
return [ep for ep in endpoints if ep.source_file in changed_set]
def parse_endpoints_from_files(
repo_root: Path,
source_subdir: str,
file_paths: List[str],
file_contents: Dict[str, str],
) -> List[ApiEndpoint]:
"""
解析指定 Controller 文件,提取接口参数(仅解析传入文件,不全量扫描)。
:param repo_root: 仓库根
:param source_subdir: 源码目录(相对仓库根)
:param file_paths: 文件路径列表
:param file_contents: 路径 -> 源码内容
:return: ApiEndpoint 列表
"""
from controller_ast_parser import parse_controller_files
return parse_controller_files(repo_root, source_subdir, file_paths, file_contents)

View File

@@ -122,6 +122,20 @@ def get_controller_files_diff(base_sha: str, head_sha: str, changed_files: List[
return ""
def get_file_content_at_commit(commit_sha: str, file_path: str) -> Optional[str]:
"""
读取指定 commit 下某个文件的内容(无需 git worktree更快
:param commit_sha: commit SHA
:param file_path: 相对仓库根目录的文件路径
:return: 文件内容;该 commit 中不存在则返回 None
"""
try:
return run_git(["show", f"{commit_sha}:{file_path}"])
except RuntimeError:
return None
def prepare_worktrees(repo_root: Path) -> tuple:
"""
准备新旧两个版本的代码工作目录,供 AST 解析器分别扫描。

View File

@@ -1,23 +1,12 @@
#!/usr/bin/env python3
"""
AI-Check 主入口 — Controller 层接口参数变更检测
流程(对齐第一版需求):
1. Git 检出新旧代码
2. JavaParser AST 解析 Controller 方法签名与参数
3. 对比增 / 删 / 改 / 重命名
4. 可选LLM 审核参数变更结果
5. (可选)写入日志
6. 企业微信通知URI + 参数变更明细)
用法:
python .gitea/checker/main.py [--config .gitea/config.yaml] [--repo-root .] [推送人] [推送时间]
AI-Check 主入口 — Controller 层接口参数变更检测(纯 Python无 Java 依赖)
"""
import argparse
import sys
import tempfile
from pathlib import Path
from typing import Optional
import yaml
@@ -29,14 +18,14 @@ from comparator import compare_endpoints
from controller_parser import (
endpoints_to_map,
filter_endpoints_by_files,
run_java_parser,
parse_endpoints_from_files,
)
from git_utils import (
get_changed_java_controller_files,
get_controller_files_diff,
get_current_commit,
get_file_content_at_commit,
get_previous_commit_sha,
prepare_worktrees,
)
from llm_reviewer import review_parameter_changes
from notifier import send_parameter_change_notification
@@ -53,20 +42,53 @@ def load_config(config_path: Path) -> dict:
return yaml.safe_load(f) or {}
def parse_endpoints_for_version(
def _read_file_safe(path: Path) -> str:
"""读取文件内容。"""
try:
return path.read_text(encoding="utf-8", errors="ignore")
except OSError as exc:
print(f"[警告] 无法读取 {path}: {exc}")
return ""
def _load_version_contents(
repo_root: Path,
file_paths: list,
commit_sha: Optional[str] = None,
) -> dict:
"""加载文件内容commit_sha 为空则读工作区,否则 git show。"""
contents = {}
for fp in file_paths:
norm = fp.replace("\\", "/")
if commit_sha:
text = get_file_content_at_commit(commit_sha, norm)
if text is not None:
contents[norm] = text
else:
text = _read_file_safe(repo_root / norm)
if text:
contents[norm] = text
return contents
def parse_changed_endpoints(
repo_root: Path,
source_subdir: str,
jar_path: Path,
tmp_dir: Path,
changed_files: list,
old_sha: str,
label: str,
) -> dict:
"""对指定版本源码运行 Java AST 解析,提取 Controller 接口参数"""
source_dir = repo_root / source_subdir
output_json = tmp_dir / f"endpoints_{label}.json"
"""解析变更 Controller 文件在新/旧版本的端点"""
if label == "new":
contents = _load_version_contents(repo_root, changed_files)
else:
contents = _load_version_contents(repo_root, changed_files, commit_sha=old_sha)
print(f"[AST] 扫描 {label} 版本: {source_dir}")
endpoints = run_java_parser(source_dir, jar_path, output_json)
print(f"[AST] {label} 版本共 {len(endpoints)} 个 Controller 接口")
print(f"[AST] 解析 {label} 版本 {len(contents)} 个 Controller 文件")
endpoints = parse_endpoints_from_files(
repo_root, source_subdir, changed_files, contents
)
print(f"[AST] {label} 版本共 {len(endpoints)} 个接口")
return endpoints_to_map(endpoints)
@@ -75,25 +97,27 @@ def main() -> int:
parser = argparse.ArgumentParser(
description="AI-Check: Controller 接口参数变更检测"
)
parser.add_argument("--config", default=".gitea/config.yaml", help="配置文件路径(相对仓库根目录)")
parser.add_argument(
"--config", default=".gitea/config.yaml", help="配置文件路径"
)
parser.add_argument("--repo-root", default=".", help="Git 仓库根目录")
parser.add_argument("push_user", nargs="?", default=None, help="推送人CI 传入)")
parser.add_argument("push_time", nargs="?", default=None, help="推送时间CI 传入)")
parser.add_argument("push_user", nargs="?", default=None, help="推送人")
parser.add_argument("push_time", nargs="?", default=None, help="推送时间")
args = parser.parse_args()
repo_root = Path(args.repo_root).resolve()
config = load_config(repo_root / args.config)
config_path = Path(args.config)
if not config_path.is_absolute():
config_path = repo_root / config_path
config = load_config(config_path)
source_subdir = config.get("source_dir", "src/main/java")
jar_path = repo_root / config.get(
"java_parser_jar", ".gitea/java-parser/target/controller-parser-1.0.0.jar"
)
commit_info = get_current_commit()
push_user = args.push_user or commit_info.author
push_time = args.push_time or commit_info.commit_time
print("Controller 接口参数变更检测")
print("Controller 接口参数变更检测(纯 Python")
print("=" * 40)
print(f"推送人: {push_user}")
print(f"推送时间: {push_time}")
@@ -116,57 +140,44 @@ def main() -> int:
print(f" - {f}")
git_diff = get_controller_files_diff(prev_sha, commit_info.sha, changed_files)
current_dir, prev_dir, _ = prepare_worktrees(repo_root)
reports = []
new_map = parse_changed_endpoints(
repo_root, source_subdir, changed_files, prev_sha, "new"
)
old_map = parse_changed_endpoints(
repo_root, source_subdir, changed_files, prev_sha, "old"
)
new_filtered = endpoints_to_map(
filter_endpoints_by_files(list(new_map.values()), changed_files)
)
old_filtered = endpoints_to_map(
filter_endpoints_by_files(list(old_map.values()), changed_files)
)
reports = compare_endpoints(old_filtered, new_filtered)
print(f"[对比] 检测到 {len(reports)} 个接口存在参数变更")
llm_review = None
with tempfile.TemporaryDirectory(prefix="ai-check-") as tmp:
tmp_dir = Path(tmp)
# 1. AST 解析 + 参数对比
new_map = parse_endpoints_for_version(
current_dir, source_subdir, jar_path, tmp_dir, "new"
)
old_map = parse_endpoints_for_version(
prev_dir, source_subdir, jar_path, tmp_dir, "old"
if reports:
llm_review = review_parameter_changes(
reports, config, changed_files, git_diff
)
if llm_review:
print("[LLM] 参数变更审核完成")
new_filtered = endpoints_to_map(
filter_endpoints_by_files(list(new_map.values()), changed_files)
)
old_filtered = endpoints_to_map(
filter_endpoints_by_files(list(old_map.values()), changed_files)
)
reports = compare_endpoints(old_filtered, new_filtered)
print(f"[对比] 检测到 {len(reports)} 个接口存在参数变更")
# 2. LLM 审核接口参数变更(非代码审查)
if reports:
llm_review = review_parameter_changes(
reports, config, changed_files, git_diff
)
if llm_review:
print(f"[LLM] 参数变更审核完成")
# 3. 写日志(开关控制)
persist_change_log(reports, commit_info, config, llm_review)
# 4. 企微通知
notify_cfg = config.get("notify", {})
only_on_change = notify_cfg.get("only_on_change", True)
if only_on_change and not reports:
if notify_cfg.get("only_on_change", True) and not reports:
print("[通知] 无接口参数变更,跳过企微通知。")
return 0
mentioned = notify_cfg.get("mentioned_users", "")
mentioned_list = [u.strip() for u in mentioned.split(",") if u.strip()] or None
wecom_cfg = config.get("wecom", {})
send_parameter_change_notification(
webhook_url=wecom_cfg.get("webhook_url", ""),
webhook_url=config.get("wecom", {}).get("webhook_url", ""),
reports=reports,
push_user=push_user,
push_time=push_time,

View File

@@ -1,3 +1,4 @@
# Python 依赖(CI 主流程
# Python 依赖(纯 Python AST 解析,无需 Java
PyYAML>=6.0.1
requests>=2.31.0
javalang>=0.13.0