#!/usr/bin/env python3 """ OpenWebUI 社区统计工具 获取并统计你在 openwebui.com 上发布的插件/帖子数据。 使用方法: 1. 设置环境变量: - OPENWEBUI_API_KEY: 你的 API Key - OPENWEBUI_USER_ID: 你的用户 ID 2. 运行: python scripts/openwebui_stats.py 获取 API Key: 访问 https://openwebui.com/settings/api 创建 API Key (sk-开头) 获取 User ID: 从个人主页的 API 请求中获取,格式如: b15d1348-4347-42b4-b815-e053342d6cb0 """ import os import json import requests from datetime import datetime, timezone, timedelta from typing import Optional from pathlib import Path # 北京时区 (UTC+8) BEIJING_TZ = timezone(timedelta(hours=8)) def get_beijing_time() -> datetime: """获取当前北京时间""" return datetime.now(BEIJING_TZ) # 尝试加载 .env 文件 try: from dotenv import load_dotenv load_dotenv() except ImportError: pass class OpenWebUIStats: """OpenWebUI 社区统计工具""" BASE_URL = "https://api.openwebui.com/api/v1" def __init__(self, api_key: str, user_id: Optional[str] = None): """ 初始化统计工具 Args: api_key: OpenWebUI API Key (JWT Token) user_id: 用户 ID,如果为 None 则从 token 中解析 """ self.api_key = api_key self.user_id = user_id or self._parse_user_id_from_token(api_key) self.session = requests.Session() self.session.headers.update( { "Authorization": f"Bearer {api_key}", "Accept": "application/json", "Content-Type": "application/json", } ) self.history_file = Path("docs/stats-history.json") # 定义下载类别的判定(这些类别会计入总浏览量/下载量统计) DOWNLOADABLE_TYPES = [ "action", "filter", "pipe", "toolkit", "function", "prompt", "model", ] def load_history(self) -> list: """从文件加载历史记录""" if self.history_file.exists(): try: with open(self.history_file, "r", encoding="utf-8") as f: return json.load(f) except Exception as e: print(f"⚠️ 无法加载历史记录: {e}") return [] def save_history(self, stats: dict): """保存当前快照到历史记录""" history = self.load_history() today = get_beijing_time().strftime("%Y-%m-%d") # 构造快照 snapshot = { "date": today, "total_posts": stats["total_posts"], "total_downloads": stats["total_downloads"], "total_views": stats["total_views"], "total_upvotes": stats["total_upvotes"], "followers": stats.get("user", {}).get("followers", 0), "points": stats.get("user", {}).get("total_points", 0), } # 如果今天已存在,则更新;否则追加 for i, item in enumerate(history): if item.get("date") == today: history[i] = snapshot break else: history.append(snapshot) # 只保留最近 90 天的历史 history = history[-90:] with open(self.history_file, "w", encoding="utf-8") as f: json.dump(history, f, ensure_ascii=False, indent=2) print(f"✅ 历史快照已更新 ({today})") def get_stat_delta(self, stats: dict) -> dict: """计算相对于上次记录的增长""" history = self.load_history() if len(history) < 2: return {} # 获取上一次的快照(倒数第二个,因为当前可能已经存入倒数第一个) # 或者如果还没存入,就是倒数第一个 today = get_beijing_time().strftime("%Y-%m-%d") prev = None for item in reversed(history): if item.get("date") != today: prev = item break if not prev: return {} return { "downloads": stats["total_downloads"] - prev.get("total_downloads", 0), "views": stats["total_views"] - prev.get("total_views", 0), "upvotes": stats["total_upvotes"] - prev.get("total_upvotes", 0), "followers": stats.get("user", {}).get("followers", 0) - prev.get("followers", 0), "points": stats.get("user", {}).get("total_points", 0) - prev.get("points", 0), } def _resolve_post_type(self, post: dict) -> str: """解析帖子类别""" top_type = post.get("type") function_data = post.get("data", {}) or {} function_obj = function_data.get("function", {}) or {} meta = function_obj.get("meta", {}) or {} manifest = meta.get("manifest", {}) or {} # 类别识别优先级: if top_type == "review": return "review" post_type = "unknown" if meta.get("type"): post_type = meta.get("type") elif function_obj.get("type"): post_type = function_obj.get("type") elif top_type: post_type = top_type elif not meta and not function_obj: post_type = "post" # 统一和启发式识别逻辑 if post_type == "unknown" and function_obj: post_type = "action" if post_type == "action" or post_type == "unknown": all_metadata = ( post.get("title", "") + json.dumps(meta, ensure_ascii=False) + json.dumps(manifest, ensure_ascii=False) ).lower() if "filter" in all_metadata: post_type = "filter" elif "pipe" in all_metadata: post_type = "pipe" elif "toolkit" in all_metadata: post_type = "toolkit" return post_type def _parse_user_id_from_token(self, token: str) -> str: """从 JWT Token 中解析用户 ID""" import base64 try: # JWT 格式: header.payload.signature payload = token.split(".")[1] # 添加 padding padding = 4 - len(payload) % 4 if padding != 4: payload += "=" * padding decoded = base64.urlsafe_b64decode(payload) data = json.loads(decoded) return data.get("id", "") except Exception as e: print(f"⚠️ 无法从 Token 解析用户 ID: {e}") return "" def generate_mermaid_chart(self) -> str: """生成 Mermaid 增长趋势图""" history = self.load_history() if len(history) < 3: # 数据太少不显示图表 return "" # 只取最近 14 天的数据用于展示 data = history[-14:] dates = [item["date"][-5:] for item in data] # 只取 MM-DD downloads = [str(item["total_downloads"]) for item in data] mm = [] mm.append("### 📈 增长趋势 (14天)") mm.append("") mm.append("```mermaid") mm.append("xychart-beta") mm.append(f' title "Downloads Trend"') mm.append(f" x-axis [{', '.join(f'\"{d}\"' for d in dates)}]") mm.append(f' y-axis "Downloads"') mm.append(f" line [{', '.join(downloads)}]") mm.append("```") mm.append("") return "\n".join(mm) def get_user_posts(self, sort: str = "new", page: int = 1) -> list: """ 获取用户发布的帖子列表 Args: sort: 排序方式 (new/top/hot) page: 页码 Returns: 帖子列表 """ url = f"{self.BASE_URL}/posts/users/{self.user_id}" params = {"sort": sort, "page": page} response = self.session.get(url, params=params) response.raise_for_status() return response.json() def get_all_posts(self, sort: str = "new") -> list: """获取所有帖子(自动分页)""" all_posts = [] page = 1 while True: posts = self.get_user_posts(sort=sort, page=page) if not posts: break all_posts.extend(posts) page += 1 return all_posts def generate_stats(self, posts: list) -> dict: """生成统计数据""" stats = { "total_posts": len(posts), "total_downloads": 0, "total_views": 0, "total_upvotes": 0, "total_downvotes": 0, "total_saves": 0, "total_comments": 0, "by_type": {}, "posts": [], "user": {}, # 用户信息 } # 从第一个帖子中提取用户信息 if posts and "user" in posts[0]: user = posts[0]["user"] stats["user"] = { "username": user.get("username", ""), "name": user.get("name", ""), "profile_url": f"https://openwebui.com/u/{user.get('username', '')}", "profile_image": user.get("profileImageUrl", ""), "followers": user.get("followerCount", 0), "following": user.get("followingCount", 0), "total_points": user.get("totalPoints", 0), "post_points": user.get("postPoints", 0), "comment_points": user.get("commentPoints", 0), "contributions": user.get("totalContributions", 0), } for post in posts: post_type = self._resolve_post_type(post) function_data = post.get("data", {}) or {} function_obj = function_data.get("function", {}) or {} meta = function_obj.get("meta", {}) or {} manifest = meta.get("manifest", {}) or {} # 累计统计 post_downloads = post.get("downloads", 0) post_views = post.get("views", 0) stats["total_downloads"] += post_downloads stats["total_upvotes"] += post.get("upvotes", 0) stats["total_downvotes"] += post.get("downvotes", 0) stats["total_saves"] += post.get("saveCount", 0) stats["total_comments"] += post.get("commentCount", 0) # 关键:总浏览量不包括不可以下载的类型 (如 post, review) if post_type in self.DOWNLOADABLE_TYPES or post_downloads > 0: stats["total_views"] += post_views if post_type not in stats["by_type"]: stats["by_type"][post_type] = 0 stats["by_type"][post_type] += 1 # 单个帖子信息 created_at = datetime.fromtimestamp(post.get("createdAt", 0)) updated_at = datetime.fromtimestamp(post.get("updatedAt", 0)) stats["posts"].append( { "title": post.get("title", ""), "slug": post.get("slug", ""), "type": post_type, "version": manifest.get("version", ""), "author": manifest.get("author", ""), "description": meta.get("description", ""), "downloads": post.get("downloads", 0), "views": post.get("views", 0), "upvotes": post.get("upvotes", 0), "saves": post.get("saveCount", 0), "comments": post.get("commentCount", 0), "created_at": created_at.strftime("%Y-%m-%d"), "updated_at": updated_at.strftime("%Y-%m-%d"), "url": f"https://openwebui.com/posts/{post.get('slug', '')}", } ) # 按下载量排序 stats["posts"].sort(key=lambda x: x["downloads"], reverse=True) return stats def print_stats(self, stats: dict): """打印统计报告到终端""" print("\n" + "=" * 60) print("📊 OpenWebUI 社区统计报告") print("=" * 60) print(f"📅 生成时间 (北京): {get_beijing_time().strftime('%Y-%m-%d %H:%M')}") print() # 总览 print("📈 总览") print("-" * 40) print(f" 📝 发布数量: {stats['total_posts']}") print(f" ⬇️ 总下载量: {stats['total_downloads']}") print(f" 👁️ 总浏览量: {stats['total_views']}") print(f" 👍 总点赞数: {stats['total_upvotes']}") print(f" 💾 总收藏数: {stats['total_saves']}") print(f" 💬 总评论数: {stats['total_comments']}") print() # 按类型分类 print("📂 按类型分类") print("-" * 40) for post_type, count in stats["by_type"].items(): print(f" • {post_type}: {count}") print() # 详细列表 print("📋 发布列表 (按下载量排序)") print("-" * 60) # 表头 print(f"{'排名':<4} {'标题':<30} {'下载':<8} {'浏览':<8} {'点赞':<6}") print("-" * 60) for i, post in enumerate(stats["posts"], 1): title = ( post["title"][:28] + ".." if len(post["title"]) > 30 else post["title"] ) print( f"{i:<4} {title:<30} {post['downloads']:<8} {post['views']:<8} {post['upvotes']:<6}" ) print("=" * 60) def generate_markdown(self, stats: dict, lang: str = "zh") -> str: """ 生成 Markdown 格式报告 Args: stats: 统计数据 lang: 语言 ("zh" 中文, "en" 英文) """ # 获取增量数据 delta = self.get_stat_delta(stats) # 中英文文本 texts = { "zh": { "title": "# 📊 OpenWebUI 社区统计报告", "updated": f"> 📅 更新时间: {get_beijing_time().strftime('%Y-%m-%d %H:%M')}", "overview_title": "## 📈 总览", "overview_header": "| 指标 | 数值 | 增长 (24h) |", "posts": "📝 发布数量", "downloads": "⬇️ 总下载量", "views": "👁️ 总浏览量", "upvotes": "👍 总点赞数", "saves": "💾 总收藏数", "comments": "💬 总评论数", "author_points": "⭐ 作者总积分", "author_followers": "👥 粉丝数量", "type_title": "## 📂 按类型分类", "list_title": "## 📋 发布列表", "list_header": "| 排名 | 标题 | 类型 | 版本 | 下载 | 浏览 | 点赞 | 收藏 | 更新日期 |", }, "en": { "title": "# 📊 OpenWebUI Community Stats Report", "updated": f"> 📅 Updated: {get_beijing_time().strftime('%Y-%m-%d %H:%M')}", "overview_title": "## 📈 Overview", "overview_header": "| Metric | Value | Growth (24h) |", "posts": "📝 Total Posts", "downloads": "⬇️ Total Downloads", "views": "👁️ Total Views", "upvotes": "👍 Total Upvotes", "saves": "💾 Total Saves", "comments": "💬 Total Comments", "author_points": "⭐ Author Points", "author_followers": "👥 Followers", "type_title": "## 📂 By Type", "list_title": "## 📋 Posts List", "list_header": "| Rank | Title | Type | Version | Downloads | Views | Upvotes | Saves | Updated |", }, } t = texts.get(lang, texts["en"]) md = [] md.append(t["title"]) md.append("") md.append(t["updated"]) md.append("") # 插入趋势图 chart = self.generate_mermaid_chart() if chart: md.append(chart) md.append("") # 总览 def fmt_delta(key: str) -> str: val = delta.get(key, 0) if val > 0: return f"**+{val}** 🚀" return "-" md.append(t["overview_title"]) md.append("") md.append(t["overview_header"]) md.append("|------|------|:---:|") md.append(f"| {t['posts']} | {stats['total_posts']} | - |") md.append( f"| {t['downloads']} | {stats['total_downloads']} | {fmt_delta('downloads')} |" ) md.append(f"| {t['views']} | {stats['total_views']} | {fmt_delta('views')} |") md.append( f"| {t['upvotes']} | {stats['total_upvotes']} | {fmt_delta('upvotes')} |" ) md.append(f"| {t['saves']} | {stats['total_saves']} | - |") md.append(f"| {t['comments']} | {stats['total_comments']} | - |") # 作者信息 user = stats.get("user", {}) if user: md.append( f"| {t['author_points']} | {user.get('total_points', 0)} | {fmt_delta('points')} |" ) md.append( f"| {t['author_followers']} | {user.get('followers', 0)} | {fmt_delta('followers')} |" ) md.append("") # 按类型分类 md.append(t["type_title"]) md.append("") for post_type, count in stats["by_type"].items(): md.append(f"- **{post_type}**: {count}") md.append("") # 详细列表 md.append(t["list_title"]) md.append("") md.append(t["list_header"]) md.append("|:---:|------|:---:|:---:|:---:|:---:|:---:|:---:|:---:|") for i, post in enumerate(stats["posts"], 1): title_link = f"[{post['title']}]({post['url']})" md.append( f"| {i} | {title_link} | {post['type']} | {post['version']} | " f"{post['downloads']} | {post['views']} | {post['upvotes']} | " f"{post['saves']} | {post['updated_at']} |" ) md.append("") return "\n".join(md) def save_json(self, stats: dict, filepath: str): """保存 JSON 格式数据""" with open(filepath, "w", encoding="utf-8") as f: json.dump(stats, f, ensure_ascii=False, indent=2) print(f"✅ JSON 数据已保存到: {filepath}") def generate_shields_endpoints(self, stats: dict, output_dir: str = "docs/badges"): """ 生成 Shields.io endpoint JSON 文件 Args: stats: 统计数据 output_dir: 输出目录 """ Path(output_dir).mkdir(parents=True, exist_ok=True) def format_number(n: int) -> str: """格式化数字为易读格式""" if n >= 1000000: return f"{n/1000000:.1f}M" elif n >= 1000: return f"{n/1000:.1f}k" return str(n) # 各种徽章数据 badges = { "downloads": { "schemaVersion": 1, "label": "downloads", "message": format_number(stats["total_downloads"]), "color": "blue", "namedLogo": "openwebui", }, "plugins": { "schemaVersion": 1, "label": "plugins", "message": str(stats["total_posts"]), "color": "green", }, "followers": { "schemaVersion": 1, "label": "followers", "message": format_number(stats.get("user", {}).get("followers", 0)), "color": "blue", }, "points": { "schemaVersion": 1, "label": "points", "message": format_number(stats.get("user", {}).get("total_points", 0)), "color": "orange", }, "upvotes": { "schemaVersion": 1, "label": "upvotes", "message": format_number(stats["total_upvotes"]), "color": "brightgreen", }, } for name, data in badges.items(): filepath = Path(output_dir) / f"{name}.json" with open(filepath, "w", encoding="utf-8") as f: json.dump(data, f, indent=2) print(f" 📊 Generated badge: {name}.json") print(f"✅ Shields.io endpoints saved to: {output_dir}/") def generate_readme_stats(self, stats: dict, lang: str = "zh") -> str: """ 生成 README 统计徽章区域 Args: stats: 统计数据 lang: 语言 ("zh" 中文, "en" 英文) """ # 获取 Top 6 插件 top_plugins = stats["posts"][:6] # 中英文文本 texts = { "zh": { "title": "## 📊 社区统计", "updated": f"> 🕐 自动更新于 {get_beijing_time().strftime('%Y-%m-%d %H:%M')}", "author_header": "| 👤 作者 | 👥 粉丝 | ⭐ 积分 | 🏆 贡献 |", "header": "| 📝 发布 | ⬇️ 下载 | 👁️ 浏览 | 👍 点赞 | 💾 收藏 |", "top6_title": "### 🔥 热门插件 Top 6", "top6_updated": f"> 🕐 自动更新于 {get_beijing_time().strftime('%Y-%m-%d %H:%M')}", "top6_header": "| 排名 | 插件 | 版本 | 下载 | 浏览 | 更新日期 |", "full_stats": "*完整统计请查看 [社区统计报告](./docs/community-stats.zh.md)*", }, "en": { "title": "## 📊 Community Stats", "updated": f"> 🕐 Auto-updated: {get_beijing_time().strftime('%Y-%m-%d %H:%M')}", "author_header": "| 👤 Author | 👥 Followers | ⭐ Points | 🏆 Contributions |", "header": "| 📝 Posts | ⬇️ Downloads | 👁️ Views | 👍 Upvotes | 💾 Saves |", "top6_title": "### 🔥 Top 6 Popular Plugins", "top6_updated": f"> 🕐 Auto-updated: {get_beijing_time().strftime('%Y-%m-%d %H:%M')}", "top6_header": "| Rank | Plugin | Version | Downloads | Views | Updated |", "full_stats": "*See full stats in [Community Stats Report](./docs/community-stats.md)*", }, } t = texts.get(lang, texts["en"]) user = stats.get("user", {}) lines = [] lines.append("") lines.append(t["title"]) lines.append("") lines.append(t["updated"]) lines.append("") # 作者信息表格 if user: username = user.get("username", "") profile_url = user.get("profile_url", "") lines.append(t["author_header"]) lines.append("| :---: | :---: | :---: | :---: |") lines.append( f"| [{username}]({profile_url}) | **{user.get('followers', 0)}** | " f"**{user.get('total_points', 0)}** | **{user.get('contributions', 0)}** |" ) lines.append("") # 统计徽章表格 lines.append(t["header"]) lines.append("| :---: | :---: | :---: | :---: | :---: |") lines.append( f"| **{stats['total_posts']}** | **{stats['total_downloads']}** | " f"**{stats['total_views']}** | **{stats['total_upvotes']}** | **{stats['total_saves']}** |" ) lines.append("") # Top 6 热门插件 lines.append(t["top6_title"]) lines.append("") lines.append(t["top6_updated"]) lines.append("") lines.append(t["top6_header"]) lines.append("| :---: | :--- | :---: | :---: | :---: | :---: |") medals = ["🥇", "🥈", "🥉", "4️⃣", "5️⃣", "6️⃣"] for i, post in enumerate(top_plugins): medal = medals[i] if i < len(medals) else str(i + 1) lines.append( f"| {medal} | [{post['title']}]({post['url']}) | {post['version']} | {post['downloads']} | {post['views']} | {post['updated_at']} |" ) lines.append("") lines.append(t["full_stats"]) lines.append("") return "\n".join(lines) def update_readme(self, stats: dict, readme_path: str, lang: str = "zh"): """ 更新 README 文件中的统计区域 Args: stats: 统计数据 readme_path: README 文件路径 lang: 语言 ("zh" 中文, "en" 英文) """ import re # 读取现有内容 with open(readme_path, "r", encoding="utf-8") as f: content = f.read() # 生成新的统计区域 new_stats = self.generate_readme_stats(stats, lang) # 检查是否已有统计区域 pattern = r".*?" if re.search(pattern, content, re.DOTALL): # 替换现有区域 new_content = re.sub(pattern, new_stats, content, flags=re.DOTALL) else: # 在简介段落之后插入统计区域 # 查找模式:标题 -> 语言切换行 -> 简介段落 -> 插入位置 lines = content.split("\n") insert_pos = 0 found_intro = False for i, line in enumerate(lines): # 跳过标题 if line.startswith("# "): continue # 跳过空行 if line.strip() == "": continue # 跳过语言切换行 (如 "English | [中文]" 或 "[English] | 中文") if ("English" in line or "中文" in line) and "|" in line: continue # 找到第一个非空、非标题、非语言切换的段落(简介) if not found_intro: found_intro = True # 继续到这个段落结束 continue # 简介段落后的空行或下一个标题就是插入位置 if line.strip() == "" or line.startswith("#"): insert_pos = i break # 如果没找到合适位置,就放在第3行(标题和语言切换后) if insert_pos == 0: insert_pos = 3 # 在适当位置插入 lines.insert(insert_pos, "") lines.insert(insert_pos + 1, new_stats) lines.insert(insert_pos + 2, "") new_content = "\n".join(lines) # 写回文件 with open(readme_path, "w", encoding="utf-8") as f: f.write(new_content) print(f"✅ README 已更新: {readme_path}") def main(): """主函数""" # 获取配置 api_key = os.getenv("OPENWEBUI_API_KEY") user_id = os.getenv("OPENWEBUI_USER_ID") if not api_key: print("❌ 错误: 未设置 OPENWEBUI_API_KEY 环境变量") print("请设置环境变量:") print(" export OPENWEBUI_API_KEY='your_api_key_here'") return 1 if not user_id: print("❌ 错误: 未设置 OPENWEBUI_USER_ID 环境变量") print("请设置环境变量:") print(" export OPENWEBUI_USER_ID='your_user_id_here'") print("\n提示: 用户 ID 可以从之前的 curl 请求中获取") print(" 例如: b15d1348-4347-42b4-b815-e053342d6cb0") return 1 # 初始化 stats_client = OpenWebUIStats(api_key, user_id) print(f"🔍 用户 ID: {stats_client.user_id}") # 获取所有帖子 print("📥 正在获取帖子数据...") posts = stats_client.get_all_posts() print(f"✅ 获取到 {len(posts)} 个帖子") # 生成统计 stats = stats_client.generate_stats(posts) # 保存历史快照 stats_client.save_history(stats) # 打印到终端 stats_client.print_stats(stats) # 保存 Markdown 报告 (中英文双版本) script_dir = Path(__file__).parent.parent # 中文报告 md_zh_path = script_dir / "docs" / "community-stats.zh.md" md_zh_content = stats_client.generate_markdown(stats, lang="zh") with open(md_zh_path, "w", encoding="utf-8") as f: f.write(md_zh_content) print(f"\n✅ 中文报告已保存到: {md_zh_path}") # 英文报告 md_en_path = script_dir / "docs" / "community-stats.md" md_en_content = stats_client.generate_markdown(stats, lang="en") with open(md_en_path, "w", encoding="utf-8") as f: f.write(md_en_content) print(f"✅ 英文报告已保存到: {md_en_path}") # 保存 JSON 数据 json_path = script_dir / "docs" / "community-stats.json" stats_client.save_json(stats, str(json_path)) # 生成 Shields.io endpoint JSON (用于动态徽章) badges_dir = script_dir / "docs" / "badges" stats_client.generate_shields_endpoints(stats, str(badges_dir)) # 更新 README 文件 readme_path = script_dir / "README.md" readme_cn_path = script_dir / "README_CN.md" stats_client.update_readme(stats, str(readme_path), lang="en") stats_client.update_readme(stats, str(readme_cn_path), lang="zh") return 0 if __name__ == "__main__": exit(main())