Files
Fu-Jie_openwebui-extensions/scripts/openwebui_stats.py

1606 lines
59 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
OpenWebUI community stats utility.
Collect and summarize your published posts/plugins on openwebui.com.
Usage:
1. Set environment variables:
- OPENWEBUI_API_KEY: required
- OPENWEBUI_USER_ID: optional (auto-resolved from /api/v1/auths/ when missing)
2. Run: python scripts/openwebui_stats.py
How to get API key:
Visit https://openwebui.com/settings/api and create a key (starts with sk-).
How to get user ID (optional):
Read the `id` field from /api/v1/auths/, format example:
b15d1348-4347-42b4-b815-e053342d6cb0
"""
import os
import json
import requests
import zlib
import base64
import re
import subprocess
from datetime import datetime, timezone, timedelta
from typing import Optional
from pathlib import Path
# Fixed UTC+8 offset; every "today" / "generated at" value in this file is
# computed in this timezone.
BEIJING_TZ = timezone(timedelta(hours=8))


def get_beijing_time() -> datetime:
    """Return the current time as a timezone-aware datetime in UTC+8."""
    now = datetime.now(tz=BEIJING_TZ)
    return now
# Try loading local .env file (if python-dotenv is installed)
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass
class OpenWebUIStats:
"""OpenWebUI community stats utility."""
BASE_URL = "https://api.openwebui.com/api/v1"
def __init__(
    self,
    api_key: str,
    user_id: Optional[str] = None,
    gist_token: Optional[str] = None,
    gist_id: Optional[str] = None,
):
    """
    Set up credentials, the HTTP session and history storage locations.

    Args:
        api_key: OpenWebUI API key, sent as a Bearer token on every request.
        user_id: User ID; when falsy it is parsed from ``api_key`` via
            ``_parse_user_id_from_token`` (which yields "" for ``sk-`` keys).
        gist_token: GitHub personal access token used to read/write the Gist.
        gist_id: GitHub Gist ID; a built-in default ID is used when falsy.
    """
    # Identity / credentials.
    self.api_key = api_key
    self.user_id = user_id if user_id else self._parse_user_id_from_token(api_key)
    self.gist_token = gist_token
    self.gist_id = gist_id if gist_id else "db3d95687075a880af6f1fba76d679c6"

    # Where the snapshot history lives: a file inside the Gist, and a local
    # JSON file.
    self.history_filename = "community-stats-history.json"
    self.history_file = Path("docs/stats-history.json")

    # One shared session so every community API call carries the same headers.
    auth_headers = {
        "Authorization": f"Bearer {api_key}",
        "Accept": "application/json",
        "Content-Type": "application/json",
    }
    http = requests.Session()
    http.headers.update(auth_headers)
    self.session = http
# Types considered downloadable (included in total view/download stats).
# generate_stats() adds a post's views to total_views when its resolved type
# is in this list, or when the post has a non-zero download count.
DOWNLOADABLE_TYPES = [
"action",
"filter",
"pipe",
"tool",
"function",
"prompt",
"model",
]
# Synonym -> canonical type name. _normalize_post_type() applies this mapping
# (after lower-casing and stripping) so e.g. "tools" is counted as "tool".
TYPE_ALIASES = {
"tools": "tool",
}
def _normalize_post_type(self, post_type: str) -> str:
    """Return the canonical, lower-cased type name for ``post_type``.

    Falsy input becomes an empty string; names listed in ``TYPE_ALIASES`` are
    replaced by their canonical form so synonyms are not counted separately.
    """
    cleaned = str(post_type or "").strip().lower()
    if cleaned in self.TYPE_ALIASES:
        return self.TYPE_ALIASES[cleaned]
    return cleaned
def load_history(self) -> list:
    """Load snapshot history, merging the Gist copy with the local file.

    Records are merged by their ``date`` key; a local-file record replaces
    the Gist record for the same date. When fewer than five records result
    and the working directory contains ``.git``, dates missing from the merge
    are back-filled from ``rebuild_history_from_git``. If the merged history
    has more records than the Gist copy (and Gist credentials are set), it is
    pushed back to the Gist.

    Malformed input (a payload that is not a list, or records that are not
    dicts carrying a ``date``) is skipped with a warning instead of raising.

    Returns:
        Snapshot dicts sorted by ascending ``date``.
    """

    def _clean(records, source: str) -> list:
        """Keep only dict records that carry a date; warn about the rest."""
        if not isinstance(records, list):
            print(f"⚠️ Ignoring {source} history: expected a JSON list")
            return []
        valid = [r for r in records if isinstance(r, dict) and r.get("date")]
        if len(valid) != len(records):
            print(
                f"⚠️ Skipped {len(records) - len(valid)} malformed {source} history record(s)"
            )
        return valid

    gist_enabled = bool(self.gist_token and self.gist_id)
    gist_url = f"https://api.github.com/gists/{self.gist_id}"
    gist_headers = {"Authorization": f"token {self.gist_token}"}

    gist_history = []
    local_history = []

    # 1. Try loading from Gist
    if gist_enabled:
        try:
            # A timeout keeps an unresponsive API from hanging the whole run.
            resp = requests.get(gist_url, headers=gist_headers, timeout=20)
            if resp.status_code == 200:
                file_info = resp.json().get("files", {}).get(self.history_filename)
                if file_info:
                    gist_history = _clean(
                        json.loads(file_info.get("content")), "Gist"
                    )
                    print(
                        f"✅ Loaded history from Gist ({len(gist_history)} records)"
                    )
            else:
                print(f"⚠️ Failed to load history from Gist: HTTP {resp.status_code}")
        except Exception as e:
            print(f"⚠️ Failed to load history from Gist: {e}")

    # 2. Also load from local file
    if self.history_file.exists():
        try:
            with open(self.history_file, "r", encoding="utf-8") as f:
                local_history = _clean(json.load(f), "local")
            print(
                f"✅ Loaded history from local file ({len(local_history)} records)"
            )
        except Exception as e:
            print(f"⚠️ Failed to load local history: {e}")

    # 3. Merge by date; local records are applied last so they win conflicts
    hist_dict = {item["date"]: item for item in gist_history}
    hist_dict.update((item["date"], item) for item in local_history)
    history = sorted(hist_dict.values(), key=lambda x: x["date"])
    print(f"📊 Merged history records: {len(history)}")

    # 4. If merged data is still too short, try rebuilding from Git
    if len(history) < 5 and os.path.isdir(".git"):
        print("📉 History too short, attempting Git rebuild...")
        git_history = self.rebuild_history_from_git()
        if len(git_history) > len(history):
            print(f"✅ Rebuilt history from Git: {len(git_history)} records")
            for item in git_history:
                # Git only fills gaps; it never overrides Gist/local data.
                hist_dict.setdefault(item["date"], item)
            history = sorted(hist_dict.values(), key=lambda x: x["date"])

    # 5. If there is new data, sync back to Gist
    if gist_enabled and len(history) > len(gist_history):
        try:
            payload = {
                "files": {
                    self.history_filename: {
                        "content": json.dumps(history, ensure_ascii=False, indent=2)
                    }
                }
            }
            resp = requests.patch(
                gist_url, headers=gist_headers, json=payload, timeout=20
            )
            if resp.status_code == 200:
                print(f"✅ History synced to Gist ({len(history)} records)")
            else:
                print(f"⚠️ Gist sync failed: {resp.status_code}")
        except Exception as e:
            print(f"⚠️ Error syncing history to Gist: {e}")

    return history
def save_history(self, stats: dict):
    """Record today's snapshot in the history (Gist first, local file as fallback).

    The snapshot for today's Beijing date replaces an existing record with the
    same date, otherwise it is appended. Only the most recent 90 records are
    kept. When the Gist upload succeeds nothing is written locally; on any
    Gist failure (exception or non-200 response) the history is written to
    ``self.history_file`` instead.

    Args:
        stats: Statistics dict as produced by ``generate_stats``.
    """
    history = self.load_history()
    today = get_beijing_time().strftime("%Y-%m-%d")
    user = stats.get("user") or {}

    # Build detailed snapshot (including each plugin's download count)
    snapshot = {
        "date": today,
        "total_posts": stats["total_posts"],
        "total_downloads": stats["total_downloads"],
        "total_views": stats["total_views"],
        "total_upvotes": stats["total_upvotes"],
        "total_saves": stats["total_saves"],
        "followers": user.get("followers", 0),
        "points": user.get("total_points", 0),
        "contributions": user.get("contributions", 0),
        "posts": {p["slug"]: p["downloads"] for p in stats.get("posts", [])},
    }

    # Update today's data point in place, or append it when absent
    for idx, item in enumerate(history):
        if item.get("date") == today:
            history[idx] = snapshot
            break
    else:
        history.append(snapshot)

    # Limit length (90 days)
    history = history[-90:]

    # Try saving to Gist
    if self.gist_token and self.gist_id:
        try:
            url = f"https://api.github.com/gists/{self.gist_id}"
            headers = {"Authorization": f"token {self.gist_token}"}
            payload = {
                "files": {
                    self.history_filename: {
                        "content": json.dumps(history, ensure_ascii=False, indent=2)
                    }
                }
            }
            # A timeout keeps an unresponsive API from hanging the whole run.
            resp = requests.patch(url, headers=headers, json=payload, timeout=20)
            if resp.status_code == 200:
                print(f"✅ History synced to Gist ({self.gist_id})")
                # If sync succeeds, do not save to local to reduce commit pressure
                return
            print(
                f"⚠️ Gist sync failed: {resp.status_code}; falling back to local file"
            )
        except Exception as e:
            print(f"⚠️ Failed to sync to Gist: {e}")

    # Fallback: save to local. Create the parent directory first so a fresh
    # checkout without it does not fail.
    history_path = Path(self.history_file)
    history_path.parent.mkdir(parents=True, exist_ok=True)
    with open(history_path, "w", encoding="utf-8") as f:
        json.dump(history, f, ensure_ascii=False, indent=2)
    print(f"✅ History updated to local ({today})")
def get_stat_delta(self, stats: dict) -> dict:
    """Compute growth of the current stats over the latest earlier snapshot.

    The baseline is the most recent history record whose date is not today's
    Beijing date. Returns an empty dict when there is no history or no such
    record.

    Returns:
        Dict with integer deltas for ``downloads``, ``views``, ``upvotes``,
        ``saves``, ``followers``, ``points`` and ``contributions``, plus
        ``posts``: a slug -> download delta mapping (0 for posts the baseline
        does not know about).
    """
    history = self.load_history()
    if not history:
        return {}

    today = get_beijing_time().strftime("%Y-%m-%d")
    # Newest record from a different day serves as the baseline.
    baseline = next(
        (snap for snap in reversed(history) if snap.get("date") != today),
        None,
    )
    if not baseline:
        return {}

    growth = {}
    for name in ("downloads", "views", "upvotes", "saves"):
        total_key = f"total_{name}"
        growth[name] = stats[total_key] - baseline.get(total_key, 0)

    # User-level counters are stored under different names in stats["user"]
    # than in the history snapshot.
    for name, user_key in (
        ("followers", "followers"),
        ("points", "total_points"),
        ("contributions", "contributions"),
    ):
        growth[name] = stats.get("user", {}).get(user_key, 0) - baseline.get(name, 0)

    per_post = {}
    for p in stats.get("posts", []):
        slug = p["slug"]
        current = p["downloads"]
        # Unknown slug -> compare against itself, i.e. delta 0.
        per_post[slug] = current - baseline.get("posts", {}).get(slug, current)
    growth["posts"] = per_post
    return growth
def _resolve_post_type(self, post: dict) -> str:
    """Resolve the category of a post.

    Priority: a top-level type of ``"review"`` wins outright; otherwise the
    first truthy value among ``data.function.meta.type``,
    ``data.function.type`` and the top-level ``type`` is used. A post with no
    type and no function data is a ``"post"``. Posts that end up as
    ``"action"`` or ``"unknown"`` are refined by keyword search
    (filter / pipe / tool) over the title and function metadata.

    Args:
        post: Raw post dict from the API.

    Returns:
        Normalized type name.
    """
    top_type = post.get("type")
    function_obj = (post.get("data") or {}).get("function") or {}
    meta = function_obj.get("meta") or {}

    if top_type == "review":
        return "review"

    post_type = meta.get("type") or function_obj.get("type") or top_type
    if not post_type:
        post_type = "unknown" if (meta or function_obj) else "post"
    post_type = self._normalize_post_type(post_type)

    # A function object with no declared type is treated as an action.
    if post_type == "unknown" and function_obj:
        post_type = "action"

    if post_type in ("action", "unknown"):
        # Title may be missing or null; coerce it so concatenation cannot
        # raise. The manifest is nested inside meta, so serializing meta
        # already covers the manifest text.
        title = str(post.get("title") or "")
        haystack = (title + json.dumps(meta, ensure_ascii=False)).lower()
        for keyword in ("filter", "pipe", "tool"):
            if keyword in haystack:
                post_type = keyword
                break

    return self._normalize_post_type(post_type)
def rebuild_history_from_git(self) -> list:
    """Rebuild daily snapshots from the Git history of docs/community-stats.json.

    Walks the commits that touched the file from oldest to newest and records
    one snapshot per commit date (the first readable commit of each day). A
    commit whose file cannot be read or parsed is skipped; it no longer
    aborts the whole rebuild.

    Returns:
        Snapshot dicts sorted by ascending ``date``; an empty list when the
        ``git log`` call fails or any unexpected error occurs.
    """
    target = "docs/community-stats.json"
    history = []
    try:
        # Output format per line: "<hash> <YYYY-MM-DD>". The "--" makes git
        # treat `target` as a path even if it is absent from the work tree.
        log_cmd = [
            "git",
            "log",
            "--pretty=format:%H %ad",
            "--date=short",
            "--",
            target,
        ]
        result = subprocess.run(log_cmd, capture_output=True, text=True, check=True)
        commits = result.stdout.strip().splitlines()
        print(f"🔍 Found {len(commits)} commits modifying stats file")

        seen_dates = set()
        # git log lists newest first; reverse to process oldest to newest.
        for line in reversed(commits):
            parts = line.split()
            if len(parts) < 2:
                continue
            commit_hash, commit_date = parts[0], parts[1]
            if commit_date in seen_dates:
                continue

            # Read the file as of this commit (path is relative to repo root).
            # No check=True: a commit that deleted the file must only skip
            # this commit, not discard everything collected so far.
            show_res = subprocess.run(
                ["git", "show", f"{commit_hash}:{target}"],
                capture_output=True,
                text=True,
            )
            if show_res.returncode != 0:
                print(f"⚠️ Could not read {target} at commit {commit_hash}, skipping")
                continue

            try:
                data = json.loads(show_res.stdout.strip())
                # The file may hold a full history list or a single snapshot.
                if isinstance(data, list):
                    if not data:
                        continue
                    data = data[-1]
                if not isinstance(data, dict):
                    print(f"⚠️ Unexpected JSON shape in commit {commit_hash}, skipping")
                    continue

                entry_date = data.get("date", commit_date)
                if entry_date != commit_date:
                    print(
                        f"⚠️ Date mismatch for commit {commit_hash}: file date {entry_date}, commit date {commit_date}. Using commit date."
                    )
                    entry_date = commit_date

                history.append(
                    {
                        "date": entry_date,
                        "total_downloads": data.get("total_downloads", 0),
                        "total_views": data.get("total_views", 0),
                        "total_upvotes": data.get("total_upvotes", 0),
                        "total_saves": data.get("total_saves", 0),
                        "followers": data.get("followers", 0),
                        "points": data.get("points", 0),
                        "contributions": data.get("contributions", 0),
                        # Include individual post stats
                        "posts": data.get("posts", {}),
                    }
                )
                # Mark the date only once it has a snapshot, so a later
                # commit of the same day can stand in for an unreadable one.
                seen_dates.add(commit_date)
            except json.JSONDecodeError:
                print(
                    f"⚠️ Could not decode JSON from commit {commit_hash} for {target}"
                )
            except Exception as e:
                print(f"⚠️ Error processing commit {commit_hash}: {e}")

        # Sort by date to ensure chronological order
        history.sort(key=lambda x: x["date"])
        return history
    except subprocess.CalledProcessError as e:
        print(
            f"⚠️ Git command failed: {e.cmd}\nStdout: {e.stdout}\nStderr: {e.stderr}"
        )
        return []
    except Exception as e:
        print(f"⚠️ Error rebuilding history from git: {e}")
        return []
def _parse_user_id_from_token(self, token: str) -> str:
    """Extract the ``id`` claim from a JWT's payload segment.

    Returns an empty string for an empty token, for ``sk-`` API keys, when
    the payload has no ``id``, or when the token cannot be decoded (a warning
    is printed in that last case). The signature is not verified.
    """
    import base64

    if not token or token.startswith("sk-"):
        return ""
    try:
        # JWT layout: header.payload.signature
        segment = token.split(".")[1]
        # base64url input must be padded to a multiple of four characters.
        segment += "=" * (-len(segment) % 4)
        claims = json.loads(base64.urlsafe_b64decode(segment))
        return claims.get("id", "")
    except Exception as e:
        print(f"⚠️ Failed to parse user ID from token: {e}")
        return ""
def resolve_user_id(self) -> str:
    """Return the current user ID, looking it up via ``/auths/`` when unknown.

    An already-known ``self.user_id`` is returned as is. Otherwise the
    community API is queried; a non-empty ``id`` in the response is cached on
    the instance and returned. Any failure prints a warning and yields "".
    """
    if self.user_id:
        return self.user_id

    try:
        resp = self.session.get(f"{self.BASE_URL}/auths/", timeout=20)
        if resp.status_code != 200:
            print(f"⚠️ Failed to auto-resolve user ID: HTTP {resp.status_code}")
        else:
            body = resp.json() if resp.text else {}
            found = str(body.get("id", "")).strip()
            if found:
                self.user_id = found
                return found
    except Exception as e:
        print(f"⚠️ Exception while auto-resolving user ID: {e}")
    return ""
def generate_mermaid_chart(self, stats: Optional[dict] = None, lang: str = "zh") -> str:
    """Build Markdown sections whose images are Mermaid charts rendered by Kroki.

    The Mermaid source is deflate-compressed and base64url-encoded into a
    ``https://kroki.io/mermaid/svg/...`` URL, so no image file is stored.

    Args:
        stats: Current statistics; ``stats["by_type"]`` feeds the pie chart.
        lang: ``"zh"`` or ``"en"`` headings (unknown values fall back to "en").

    Returns:
        Markdown text: a trend chart when history has at least three records,
        and a type-distribution pie chart when ``stats`` carries ``by_type``.
        Empty string when there is no history.
    """
    history = self.load_history()
    if not history:
        return ""

    # Multi-language labels
    labels = {
        "zh": {
            "trend_title": "增长与趋势 (Last 14 Days)",
            "trend_subtitle": "Engagement & Downloads Trend",
            "legend": "蓝色: 总下载量 | 紫色: 总浏览量 (实时动态生成)",
            "dist_title": "内容分类占比 (Distribution)",
            "dist_subtitle": "Plugin Types Distribution",
        },
        "en": {
            "trend_title": "Growth & Trends (Last 14 Days)",
            "trend_subtitle": "Engagement & Downloads Trend",
            "legend": "Blue: Downloads | Purple: Views (Real-time dynamic)",
            "dist_title": "Content Distribution",
            "dist_subtitle": "Plugin Types Distribution",
        },
    }
    text = labels.get(lang, labels["en"])

    def kroki_render(mermaid_code: str) -> str:
        """Compress and encode Mermaid source into a Kroki SVG URL ("" on failure)."""
        try:
            compressed = zlib.compress(mermaid_code.encode("utf-8"), level=9)
        except (UnicodeError, zlib.error) as e:
            print(f"⚠️ Failed to encode chart for Kroki: {e}")
            return ""
        encoded = base64.urlsafe_b64encode(compressed).decode("utf-8")
        return f"https://kroki.io/mermaid/svg/{encoded}"

    charts = []

    # 1. Growth trend (XY chart) over the last 14 records
    if len(history) >= 3:
        window = history[-14:]
        # Records from older sources may lack some fields; default instead of raising.
        day_labels = [str(item.get("date", ""))[-5:] for item in window]
        dates_str = ", ".join(f'"{d}"' for d in day_labels)
        dls = ", ".join(str(item.get("total_downloads", 0)) for item in window)
        vws = ", ".join(str(item.get("total_views", 0)) for item in window)
        mm = "\n".join(
            [
                "xychart-beta",
                f"title \"{text['trend_subtitle']}\"",
                f"x-axis [{dates_str}]",
                'y-axis "Total Counts"',
                f"line [{dls}]",
                f"line [{vws}]",
            ]
        )
        url = kroki_render(mm)
        if url:  # never emit an image link with an empty target
            charts.append(f"### 📈 {text['trend_title']}")
            charts.append(f"![Trend]({url})")
            charts.append(f"\n> *{text['legend']}*")
            charts.append("")

    # 2. Plugin type distribution (pie chart)
    if stats and stats.get("by_type"):
        pie_data = "\n".join(
            f' "{p_type}" : {count}' for p_type, count in stats["by_type"].items()
        )
        mm = f"pie title \"{text['dist_subtitle']}\"\n{pie_data}"
        url = kroki_render(mm)
        if url:
            charts.append(f"### 📂 {text['dist_title']}")
            charts.append(f"![Distribution]({url})")
            charts.append("")

    return "\n".join(charts)
def get_user_posts(self, sort: str = "new", page: int = 1) -> list:
    """
    Fetch one page of the posts published by ``self.user_id``.

    Args:
        sort: Sort order (new/top/hot)
        page: Page number
    Returns:
        The decoded JSON body of the response (the list of posts).
    Raises:
        requests.HTTPError: When the API answers with an error status.
    """
    url = f"{self.BASE_URL}/posts/users/{self.user_id}"
    query = {"sort": sort, "page": page}
    # Same 20s timeout as resolve_user_id(): without one, an unresponsive
    # server would block the run indefinitely.
    response = self.session.get(url, params=query, timeout=20)
    response.raise_for_status()
    return response.json()
def get_all_posts(self, sort: str = "new") -> list:
    """Collect every post by requesting pages 1, 2, ... until one comes back empty."""
    collected = []
    page_no = 0
    while True:
        page_no += 1
        batch = self.get_user_posts(sort=sort, page=page_no)
        if not batch:
            return collected
        collected.extend(batch)
def generate_stats(self, posts: list) -> dict:
    """Aggregate raw API posts into totals, per-type counts and a post list.

    Counters that are missing or null in the API payload count as 0, so a
    single incomplete post cannot abort the run.

    Args:
        posts: Raw post dicts as returned by the community API.

    Returns:
        Dict with ``total_*`` counters, ``by_type`` (type -> post count),
        ``posts`` (per-post summaries sorted by downloads, descending) and
        ``user`` (profile info taken from the first post, or ``{}``).
    """

    def _num(record: dict, field: str):
        """Read a numeric field, treating a missing or null value as 0."""
        return record.get(field) or 0

    stats = {
        "total_posts": len(posts),
        "total_downloads": 0,
        "total_views": 0,
        "total_upvotes": 0,
        "total_downvotes": 0,
        "total_saves": 0,
        "total_comments": 0,
        "by_type": {},
        "posts": [],
        "user": {},  # User info
    }

    # Extract user info from first post
    first_user = posts[0].get("user") if posts else None
    if isinstance(first_user, dict):
        stats["user"] = {
            "username": first_user.get("username", ""),
            "name": first_user.get("name", ""),
            "profile_url": f"https://openwebui.com/u/{first_user.get('username', '')}",
            "profile_image": first_user.get("profileImageUrl", ""),
            "followers": first_user.get("followerCount", 0),
            "following": first_user.get("followingCount", 0),
            "total_points": first_user.get("totalPoints", 0),
            "post_points": first_user.get("postPoints", 0),
            "comment_points": first_user.get("commentPoints", 0),
            "contributions": first_user.get("totalContributions", 0),
        }

    for post in posts:
        post_type = self._resolve_post_type(post)
        function_obj = (post.get("data") or {}).get("function") or {}
        meta = function_obj.get("meta") or {}
        manifest = meta.get("manifest") or {}

        downloads = _num(post, "downloads")
        views = _num(post, "views")
        upvotes = _num(post, "upvotes")
        saves = _num(post, "saveCount")
        comments = _num(post, "commentCount")

        # Accumulate statistics
        stats["total_downloads"] += downloads
        stats["total_upvotes"] += upvotes
        stats["total_downvotes"] += _num(post, "downvotes")
        stats["total_saves"] += saves
        stats["total_comments"] += comments

        # Key: total views do not include non-downloadable types (e.g., post, review)
        if post_type in self.DOWNLOADABLE_TYPES or downloads > 0:
            stats["total_views"] += views

        stats["by_type"][post_type] = stats["by_type"].get(post_type, 0) + 1

        # NOTE(review): timestamps are converted in the machine's local
        # timezone, while "today" elsewhere in this file is computed in
        # Beijing time — confirm this is intended.
        created_at = datetime.fromtimestamp(_num(post, "createdAt"))
        updated_at = datetime.fromtimestamp(_num(post, "updatedAt"))

        # Individual post information
        stats["posts"].append(
            {
                "title": post.get("title", ""),
                "slug": post.get("slug", ""),
                "type": post_type,
                "version": manifest.get("version", ""),
                "author": manifest.get("author", ""),
                "description": meta.get("description", ""),
                "downloads": downloads,
                "views": views,
                "upvotes": upvotes,
                "saves": saves,
                "comments": comments,
                "created_at": created_at.strftime("%Y-%m-%d"),
                "updated_at": updated_at.strftime("%Y-%m-%d"),
                "url": f"https://openwebui.com/posts/{post.get('slug', '')}",
            }
        )

    # Sort by download count
    stats["posts"].sort(key=lambda x: x["downloads"], reverse=True)
    return stats
def print_stats(self, stats: dict):
    """Write a plain-text statistics report (overview, by-type counts, post table) to stdout."""
    heavy_rule = "=" * 60
    light_rule = "-" * 40
    table_rule = "-" * 60

    # Banner
    print("\n" + heavy_rule)
    print("📊 OpenWebUI Community Statistics Report")
    print(heavy_rule)
    print(
        f"📅 Generated (Beijing time): {get_beijing_time().strftime('%Y-%m-%d %H:%M')}"
    )
    print()

    # Overview: one line per aggregate counter
    print("📈 Overview")
    print(light_rule)
    overview_rows = (
        (" 📝 Posts", "total_posts"),
        (" ⬇️ Total Downloads", "total_downloads"),
        (" 👁️ Total Views", "total_views"),
        (" 👍 Total Upvotes", "total_upvotes"),
        (" 💾 Total Saves", "total_saves"),
        (" 💬 Total Comments", "total_comments"),
    )
    for caption, stat_key in overview_rows:
        print(f"{caption}: {stats[stat_key]}")
    print()

    # By type
    print("📂 By Type")
    print(light_rule)
    for kind, total in stats["by_type"].items():
        print(f"{kind}: {total}")
    print()

    # Detailed list
    print("📋 Posts List (sorted by downloads)")
    print(table_rule)
    print(f"{'Rank':<4} {'Title':<30} {'Downloads':<8} {'Views':<8} {'Upvotes':<6}")
    print(table_rule)
    for rank, entry in enumerate(stats["posts"], 1):
        # Titles longer than 30 characters are cut to 28 plus ".."
        if len(entry["title"]) > 30:
            shown = entry["title"][:28] + ".."
        else:
            shown = entry["title"]
        print(
            f"{rank:<4} {shown:<30} {entry['downloads']:<8} {entry['views']:<8} {entry['upvotes']:<6}"
        )
    print(heavy_rule)
def _safe_key(self, key: str) -> str:
    """Return the MD5 hex digest of ``key`` (UTF-8), used as an ASCII-only file name part."""
    import hashlib

    digest = hashlib.md5()
    digest.update(key.encode("utf-8"))
    return digest.hexdigest()
def generate_markdown(self, stats: dict, lang: str = "zh") -> str:
    """
    Generate the Markdown report (dynamic badges and Kroki charts).

    Text placed into static Shields.io badge URLs is escaped ("-" and "_"
    doubled, then percent-encoded) so values such as "N/A" or "1.0-beta"
    yield a valid badge, and "|" in post titles is escaped so it cannot
    break the table row.

    Args:
        stats: Statistics data
        lang: Language ("zh" Chinese, "en" English; others fall back to "en")
    Returns:
        The report as a single Markdown string.
    """
    import urllib.parse

    def _static_badge_part(value) -> str:
        """Escape one segment of a Shields.io static badge path."""
        doubled = str(value).replace("-", "--").replace("_", "__")
        return urllib.parse.quote(doubled, safe="")

    def _cell(value) -> str:
        """Escape pipes so the text stays inside one Markdown table cell."""
        return str(value).replace("|", "\\|")

    # Get delta data
    delta = self.get_stat_delta(stats)

    # Bilingual text
    texts = {
        "zh": {
            "title": "# 📊 OpenWebUI 社区统计报告",
            "updated_label": "更新时间",
            "overview_title": "## 📈 总览",
            "overview_header": "| 指标 | 数值 |",
            "posts": "📝 发布数量",
            "downloads": "⬇️ 总下载量",
            "views": "👁️ 总浏览量",
            "upvotes": "👍 总点赞数",
            "saves": "💾 总收藏数",
            "comments": "💬 总评论数",
            "author_points": "⭐ 作者总积分",
            "author_followers": "👥 粉丝数量",
            "type_title": "## 📂 按类型分类",
            "list_title": "## 📋 发布列表",
            "list_header": "| 排名 | 标题 | 类型 | 版本 | 下载 | 浏览 | 点赞 | 收藏 | 更新日期 |",
        },
        "en": {
            "title": "# 📊 OpenWebUI Community Stats Report",
            "updated_label": "Updated",
            "overview_title": "## 📈 Overview",
            "overview_header": "| Metric | Value |",
            "posts": "📝 Total Posts",
            "downloads": "⬇️ Total Downloads",
            "views": "👁️ Total Views",
            "upvotes": "👍 Total Upvotes",
            "saves": "💾 Total Saves",
            "comments": "💬 Total Comments",
            "author_points": "⭐ Author Points",
            "author_followers": "👥 Followers",
            "type_title": "## 📂 By Type",
            "list_title": "## 📋 Posts List",
            "list_header": "| Rank | Title | Type | Version | Downloads | Views | Upvotes | Saves | Updated |",
        },
    }
    t = texts.get(lang, texts["en"])
    user = stats.get("user", {})

    def _badge(key: str, is_post: bool = False) -> str:
        """Shorthand for get_badge() with this report's shared arguments."""
        return self.get_badge(key, stats, user, delta, is_post=is_post)

    md = []
    md.append(t["title"])
    md.append("")
    updated_key = "updated_zh" if lang == "zh" else "updated"
    md.append(f"> {_badge(updated_key)}")
    md.append("")

    # Insert trend charts (Kroki SVG links)
    chart = self.generate_mermaid_chart(stats, lang=lang)
    if chart:
        md.append(chart)
        md.append("")

    # Overview table
    md.append(t["overview_title"])
    md.append("")
    md.append(t["overview_header"])
    md.append("|------|------|")
    for metric in ("posts", "downloads", "views", "upvotes", "saves"):
        md.append(f"| {t[metric]} | {_badge(metric)} |")
    # Author rows only when user info is available
    if user:
        md.append(f"| {t['author_points']} | {_badge('points')} |")
        md.append(f"| {t['author_followers']} | {_badge('followers')} |")
    md.append("")

    # Per-type counts (static badges)
    md.append(t["type_title"])
    md.append("")
    type_colors = {
        "post": "blue",
        "filter": "brightgreen",
        "action": "orange",
        "pipe": "blueviolet",
        "tool": "teal",
        "pipeline": "purple",
        "review": "yellow",
        "prompt": "lightgrey",
    }
    for post_type, count in stats["by_type"].items():
        color = type_colors.get(post_type, "gray")
        badge = (
            f"![{post_type}](https://img.shields.io/badge/"
            f"{_static_badge_part(post_type)}-{count}-{color})"
        )
        md.append(f"- {badge}")
    md.append("")

    # Detailed list
    md.append(t["list_title"])
    md.append("")
    md.append(t["list_header"])
    md.append("|:---:|------|:---:|:---:|:---:|:---:|:---:|:---:|:---:|")
    for i, post in enumerate(stats["posts"], 1):
        title_link = f"[{_cell(post['title'])}]({post['url']})"
        # Per-post dynamic badges are keyed by a hash of the slug so the
        # badge file names stay ASCII-safe.
        slug_hash = self._safe_key(post["slug"])
        dl_badge = _badge(f"post_{slug_hash}_dl", is_post=True)
        vw_badge = _badge(f"post_{slug_hash}_vw", is_post=True)
        up_badge = _badge(f"post_{slug_hash}_up", is_post=True)
        sv_badge = _badge(f"post_{slug_hash}_sv", is_post=True)

        # Versions use a static badge here: upload_gist_badges() only creates
        # dynamic version badges for the top 6 posts, not for the full list.
        ver = post["version"] if post["version"] else "N/A"
        ver_color = "blue" if post["version"] else "gray"
        ver_badge = (
            f"![v](https://img.shields.io/badge/v-{_static_badge_part(ver)}-{ver_color}?style=flat)"
        )
        md.append(
            f"| {i} | {title_link} | {post['type']} | {ver_badge} | "
            f"{dl_badge} | {vw_badge} | {up_badge} | "
            f"{sv_badge} | {post['updated_at']} |"
        )
    md.append("")
    return "\n".join(md)
def save_json(self, stats: dict, filepath: str):
    """Write ``stats`` to ``filepath`` as UTF-8 JSON (2-space indent, non-ASCII kept as is)."""
    with open(filepath, mode="w", encoding="utf-8") as out:
        json.dump(stats, out, indent=2, ensure_ascii=False)
    print(f"✅ JSON data saved to: {filepath}")
def generate_shields_endpoints(self, stats: dict, output_dir: str = "docs/badges"):
    """
    Write Shields.io endpoint JSON files and, when Gist credentials are set,
    upload the dynamic badge data via ``upload_gist_badges``.

    Args:
        stats: Statistics data
        output_dir: Output directory (created if missing)
    """
    target_dir = Path(output_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    def format_number(n: int) -> str:
        """Abbreviate large counts: 1500 -> "1.5k", 2500000 -> "2.5M"."""
        if n >= 1000000:
            return f"{n/1000000:.1f}M"
        if n >= 1000:
            return f"{n/1000:.1f}k"
        return str(n)

    def endpoint(label: str, message: str, color: str, **extra) -> dict:
        """Assemble one Shields.io endpoint document."""
        doc = {
            "schemaVersion": 1,
            "label": label,
            "message": message,
            "color": color,
        }
        doc.update(extra)
        return doc

    # All documents are built before any file is written.
    badges = {
        "downloads": endpoint(
            "downloads",
            format_number(stats["total_downloads"]),
            "blue",
            namedLogo="openwebui",
        ),
        "plugins": endpoint("plugins", str(stats["total_posts"]), "green"),
        "followers": endpoint(
            "followers",
            format_number(stats.get("user", {}).get("followers", 0)),
            "blue",
        ),
        "points": endpoint(
            "points",
            format_number(stats.get("user", {}).get("total_points", 0)),
            "orange",
        ),
        "upvotes": endpoint(
            "upvotes", format_number(stats["total_upvotes"]), "brightgreen"
        ),
    }

    for name, data in badges.items():
        with open(target_dir / f"{name}.json", "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        print(f" 📊 Generated badge: {name}.json")

    if self.gist_token and self.gist_id:
        try:
            # Build and upload the Shields.io badge data to the Gist
            self.upload_gist_badges(stats)
        except Exception as e:
            print(f"⚠️ Badge generation failed: {e}")
    print(f"✅ Shields.io endpoints saved to: {output_dir}/")
def upload_gist_badges(self, stats: dict):
    """Generate badge data and reports and upload them to the Gist in one PATCH.

    Uploaded files:
      * ``badge_<metric>.json`` for the aggregate counters (with growth suffix),
      * ``badge_p<N>_{dl,vw,version}.json`` for the top 6 posts,
      * ``badge_post_<md5(slug)>_{dl,vw,up,sv}.json`` for every post,
      * ``badge_updated.json`` / ``badge_updated_zh.json`` timestamps,
      * ``report_zh.md`` / ``report_en.md`` Markdown reports.

    Does nothing when the Gist token or ID is missing. Network errors
    propagate to the caller.

    Args:
        stats: Statistics dict as produced by ``generate_stats``.
    """
    if not (self.gist_token and self.gist_id):
        return

    def _badge_file(label: str, message: str, color: str, ascii_only: bool = True) -> dict:
        """Wrap a Shields.io endpoint document as a Gist file entry.

        Reference: https://shields.io/badges/endpoint-badge
        """
        document = {
            "schemaVersion": 1,
            "label": label,
            "message": message,
            "color": color,
        }
        return {"content": json.dumps(document, ensure_ascii=ascii_only)}

    def _with_gain(value, diff) -> str:
        """Render a counter, appending " (+N🚀)" when it grew."""
        text = f"{value}"
        if diff > 0:
            text += f" (+{diff}🚀)"
        return text

    delta = self.get_stat_delta(stats)
    user = stats.get("user", {})

    # Aggregate badges: key -> (label, value, color)
    badges_config = {
        "downloads": ("Downloads", stats["total_downloads"], "brightgreen"),
        "views": ("Views", stats["total_views"], "blue"),
        "upvotes": ("Upvotes", stats["total_upvotes"], "orange"),
        "saves": ("Saves", stats["total_saves"], "lightgrey"),
        "followers": ("Followers", user.get("followers", 0), "blueviolet"),
        "points": ("Points", user.get("total_points", 0), "yellow"),
        "contributions": ("Contributions", user.get("contributions", 0), "green"),
        "posts": ("Posts", stats["total_posts"], "informational"),
    }
    files_payload = {}
    for key, (label, val, color) in badges_config.items():
        diff = delta.get(key, 0)
        if isinstance(diff, dict):
            # delta["posts"] is the per-post mapping, not a number
            diff = 0
        message = _with_gain(val, diff)
        if diff < 0:
            message += f" ({diff})"
        files_payload[f"badge_{key}.json"] = _badge_file(
            label, message, color, ascii_only=False
        )

    post_deltas = delta.get("posts", {})
    all_posts = stats.get("posts", [])

    # Top 6 plugins, addressed by rank slot (p1, p2, ...)
    for idx, post in enumerate(all_posts[:6], 1):
        diff = post_deltas.get(post["slug"], 0)
        files_payload[f"badge_p{idx}_dl.json"] = _badge_file(
            "Downloads", _with_gain(post["downloads"], diff), "brightgreen"
        )
        # History keeps no per-post view counts, so only the total is shown
        files_payload[f"badge_p{idx}_vw.json"] = _badge_file(
            "Views", f"{post['views']}", "blue"
        )
        ver = post.get("version", "N/A") or "N/A"
        ver_color = "blue" if post.get("version") else "gray"
        files_payload[f"badge_p{idx}_version.json"] = _badge_file("v", ver, ver_color)

    # Individual badges for every post (used by the detailed report)
    for post in all_posts:
        slug_hash = self._safe_key(post["slug"])
        diff = post_deltas.get(post["slug"], 0)
        prefix = f"badge_post_{slug_hash}"
        files_payload[f"{prefix}_dl.json"] = _badge_file(
            "Downloads", _with_gain(post["downloads"], diff), "brightgreen"
        )
        files_payload[f"{prefix}_vw.json"] = _badge_file(
            "Views", f"{post['views']}", "blue"
        )
        files_payload[f"{prefix}_up.json"] = _badge_file(
            "Upvotes", f"{post['upvotes']}", "orange"
        )
        files_payload[f"{prefix}_sv.json"] = _badge_file(
            "Saves", f"{post['saves']}", "lightgrey"
        )

    # "Last updated" badges (English and Chinese labels)
    now_str = get_beijing_time().strftime("%Y-%m-%d %H:%M")
    files_payload["badge_updated.json"] = _badge_file("Auto-updated", now_str, "gray")
    files_payload["badge_updated_zh.json"] = _badge_file("自动更新于", now_str, "gray")

    # The generated Markdown reports are uploaded alongside the badges
    for lang in ["zh", "en"]:
        report_content = self.generate_markdown(stats, lang=lang)
        files_payload[f"report_{lang}.md"] = {"content": report_content}

    # Upload everything in one request; the timeout keeps an unresponsive
    # API from hanging the run.
    url = f"https://api.github.com/gists/{self.gist_id}"
    headers = {"Authorization": f"token {self.gist_token}"}
    payload = {"files": files_payload}
    resp = requests.patch(url, headers=headers, json=payload, timeout=30)
    if resp.status_code == 200:
        print(f"✅ 动态数据与报告已同步至 Gist ({len(files_payload)} files)")
    else:
        print(f"⚠️ Gist 同步失败: {resp.status_code} {resp.text}")
def get_badge(
    self,
    key: str,
    stats: dict,
    user: dict,
    delta: dict,
    is_post: bool = False,
    style: str = "flat",
) -> str:
    """Return the Markdown for one badge.

    With a Gist ID configured this is a Shields.io endpoint badge that reads
    ``badge_<key>.json`` from the Gist. Without one it falls back to plain
    text: ``**-**`` for per-post badges, a timestamp for ``updated*`` keys,
    otherwise the bold value followed by its positive growth, if any.
    """
    import urllib.parse

    gist_user = "Fu-Jie"

    if self.gist_id:
        raw_url = f"https://gist.githubusercontent.com/{gist_user}/{self.gist_id}/raw/badge_{key}.json"
        encoded_url = urllib.parse.quote(raw_url, safe="")
        return (
            f"![{key}](https://img.shields.io/endpoint?url={encoded_url}&style={style})"
        )

    # --- No Gist configured: plain-text fallback ---
    if is_post:
        return "**-**"

    # Aggregate counters live in stats as total_<key>; user-level counters
    # come from the user dict under their own names.
    value = stats.get(f"total_{key}", 0)
    user_fields = {
        "followers": "followers",
        "points": "total_points",
        "contributions": "contributions",
    }
    if key in user_fields:
        value = user.get(user_fields[key], 0)

    if key.startswith("updated"):
        return f"🕐 {get_beijing_time().strftime('%Y-%m-%d %H:%M')}"

    growth = delta.get(key, 0)
    # delta["posts"] is a per-post mapping, not a number: no suffix for it.
    if not isinstance(growth, dict) and growth > 0:
        suffix = f" <br><sub>(+{growth}🚀)</sub>"
    else:
        suffix = ""
    return f"**{value}**{suffix}"
def generate_readme_stats(self, stats: dict, lang: str = "zh") -> str:
    """
    Build the compact statistics section that is embedded in a README.

    The section is wrapped in ``<!-- STATS_START -->`` / ``<!-- STATS_END -->``
    markers and contains an "updated" line, an author table (only when
    ``stats["user"]`` is non-empty), a totals table, a Top 6 plugin table
    and, when available, the activity chart.

    Args:
        stats: Statistics dict; must contain "posts" (a list of post dicts
            with "title" and "url") and may contain "user".
        lang: "zh" for Chinese, "en" for English; unknown values fall back
            to English texts.

    Returns:
        The Markdown section as a single newline-joined string.
    """
    # Only the first six posts are shown in the README table.
    top_plugins = stats["posts"][:6]
    # Compute the deltas once and reuse them for every badge below.
    delta = self.get_stat_delta(stats)
    user = stats.get("user", {})

    def badge(key: str, is_post: bool = False) -> str:
        """Render one cell via get_badge with the shared stats/user/delta."""
        return self.get_badge(key, stats, user, delta, is_post=is_post)

    # Localized headings and table headers.
    texts = {
        "zh": {
            "title": "## 📊 社区统计",
            "author_header": "| 👤 作者 | 👥 粉丝 | ⭐ 积分 | 🏆 贡献 |",
            "header": "| 📝 发布 | ⬇️ 下载 | 👁️ 浏览 | 👍 点赞 | 💾 收藏 |",
            "top6_title": "### 🔥 热门插件 Top 6",
            "top6_header": "| 排名 | 插件 | 版本 | 下载 | 浏览 | 📅 更新 |",
            "full_stats": "*完整统计与趋势图请查看 [社区统计报告](./docs/community-stats.zh.md)*",
        },
        "en": {
            "title": "## 📊 Community Stats",
            "author_header": "| 👤 Author | 👥 Followers | ⭐ Points | 🏆 Contributions |",
            "header": "| 📝 Posts | ⬇️ Downloads | 👁️ Views | 👍 Upvotes | 💾 Saves |",
            "top6_title": "### 🔥 Top 6 Popular Plugins",
            "top6_header": "| Rank | Plugin | Version | Downloads | Views | 📅 Updated |",
            "full_stats": "*See full stats and charts in [Community Stats Report](./docs/community-stats.md)*",
        },
    }
    t = texts.get(lang, texts["en"])

    lines = ["<!-- STATS_START -->", t["title"]]
    updated_key = "updated_zh" if lang == "zh" else "updated"
    lines.append(f"> {badge(updated_key)}")
    lines.append("")

    # Author table
    if user:
        username = user.get("username", "")
        profile_url = user.get("profile_url", "")
        lines.append(t["author_header"])
        lines.append("| :---: | :---: | :---: | :---: |")
        lines.append(
            f"| [{username}]({profile_url}) | {badge('followers')} | "
            f"{badge('points')} | {badge('contributions')} |"
        )
        lines.append("")

    # Totals panel
    lines.append(t["header"])
    lines.append("| :---: | :---: | :---: | :---: | :---: |")
    lines.append(
        f"| {badge('posts')} | {badge('downloads')} | "
        f"{badge('views')} | {badge('upvotes')} | {badge('saves')} |"
    )
    lines.append("")
    lines.append("")

    # Top 6 plugins
    lines.append(t["top6_title"])
    lines.append(t["top6_header"])
    lines.append("| :---: | :--- | :---: | :---: | :---: | :---: |")
    medals = ["🥇", "🥈", "🥉", "4", "5", "6"]
    for idx, post in enumerate(top_plugins, start=1):
        medal = medals[idx - 1] if idx <= len(medals) else str(idx)
        dl_cell = badge(f"p{idx}_dl", is_post=True)
        vw_cell = badge(f"p{idx}_vw", is_post=True)
        # The version cell is a dynamic Shields.io badge.
        ver_badge = badge(f"p{idx}_version", is_post=True)
        # The update date is a static Shields.io badge.
        updated_str = post.get("updated_at", "")
        updated_badge = ""
        if updated_str:
            # Shields.io static badges use "-" as a separator, so a literal
            # dash has to be written as "--".
            safe_date = updated_str.replace("-", "--")
            updated_badge = f"![updated](https://img.shields.io/badge/{safe_date}-gray?style=flat)"
        lines.append(
            f"| {medal} | [{post['title']}]({post['url']}) | {ver_badge} | {dl_cell} | {vw_cell} | {updated_badge} |"
        )
    lines.append("")

    # Full trend chart, when one is available.
    activity_chart = self.generate_activity_chart(lang)
    if activity_chart:
        lines.append(activity_chart)
        lines.append("")

    lines.append(t["full_stats"])
    lines.append("<!-- STATS_END -->")
    return "\n".join(lines)
def update_readme(self, stats: dict, readme_path: str, lang: str = "zh"):
    """
    Refresh the statistics section of a README file in place.

    An existing ``<!-- STATS_START --> ... <!-- STATS_END -->`` region is
    replaced with freshly generated content. When no such region exists,
    the section is inserted after the introduction. Any legacy
    ``ACTIVITY_CHART`` region is removed.

    Args:
        stats: Statistics dict passed on to ``generate_readme_stats``.
        readme_path: Path of the README file to rewrite.
        lang: "zh" for Chinese, "en" for English.
    """
    import re

    with open(readme_path, "r", encoding="utf-8") as f:
        content = f.read()

    new_stats = self.generate_readme_stats(stats, lang)

    stats_region = re.compile(r"<!-- STATS_START -->.*?<!-- STATS_END -->", re.DOTALL)
    # A callable replacement inserts the text literally. Passing the string
    # itself would make re treat backslashes in it (e.g. from a plugin
    # title) as escapes or group references.
    content, replaced = stats_region.subn(lambda _match: new_stats, content)

    if not replaced:
        # No existing region: insert the section after the introduction.
        lines = content.split("\n")
        insert_pos = 0
        found_intro = False
        for i, line in enumerate(lines):
            # Skip the main title and blank lines.
            if line.startswith("# ") or line.strip() == "":
                continue
            # Skip the language switcher line (e.g. "English | 中文").
            if ("English" in line or "中文" in line) and "|" in line:
                continue
            # The first remaining line is treated as the introduction.
            if not found_intro:
                found_intro = True
                continue
            # Blank lines never reach this point (skipped above), so the
            # insertion point is the first sub-heading after the intro.
            if line.startswith("#"):
                insert_pos = i
                break
        # Fallback position when no sub-heading follows the introduction.
        if insert_pos == 0:
            insert_pos = 3
        lines[insert_pos:insert_pos] = ["", new_stats, ""]
        content = "\n".join(lines)

    # Drop the legacy bottom chart region, if one is still present.
    chart_pattern = r"<!-- ACTIVITY_CHART_START -->.*?<!-- ACTIVITY_CHART_END -->"
    if re.search(chart_pattern, content, re.DOTALL):
        content = re.sub(chart_pattern, "", content, flags=re.DOTALL)
        # Collapse the blank-line run the removal may have left behind.
        content = re.sub(r"\n{3,}", "\n\n", content)

    with open(readme_path, "w", encoding="utf-8") as f:
        f.write(content)
    print(f"✅ README 已更新: {readme_path}")
def update_docs_chart(self, doc_path: str, lang: str = "zh"):
    """
    Replace the trend-chart block inside a docs page.

    The block is a ``### 📈 ...`` heading followed by a Markdown image.
    Nothing happens when the file does not exist, when no chart can be
    generated, or when the page contains no such block.

    Args:
        doc_path: Path of the Markdown document to update.
        lang: Language passed to ``generate_activity_chart``.
    """
    import re

    if not os.path.exists(doc_path):
        return
    with open(doc_path, "r", encoding="utf-8") as f:
        content = f.read()

    # generate_activity_chart returns the complete block (heading + image).
    new_chart = self.generate_activity_chart(lang)
    if not new_chart:
        return

    # Matches "### 📈 <title>\n![<alt>](<url>)"; the alt text may be
    # "Trend" or "Activity" depending on the page.
    pattern = r"(### 📈.*?\n)(!\[.*?\]\(.*?\))"
    # A callable replacement inserts the block literally instead of
    # interpreting backslashes in it as a replacement template.
    content, replaced = re.subn(
        pattern, lambda _match: new_chart, content, flags=re.DOTALL
    )
    if not replaced:
        # No chart block in this page: leave the file untouched.
        return

    with open(doc_path, "w", encoding="utf-8") as f:
        f.write(content)
    print(f"✅ 文档图表已更新: {doc_path}")
def upload_chart_svg(self):
    """
    Render the download-trend chart as SVG and store it in the Gist.

    The history snapshots are turned into a Vega-Lite area chart, rendered
    to SVG by POSTing the spec to Kroki, and saved as ``chart.svg`` in the
    configured Gist. The upload is best-effort: missing Gist credentials,
    empty history, HTTP errors and exceptions are reported on stdout and
    the method returns without raising.
    """
    # Upper bound (seconds) for each HTTP call; without it an unresponsive
    # server would block the whole run indefinitely.
    request_timeout = 30

    print("🚀 Starting chart SVG generation process...")
    if not (self.gist_token and self.gist_id):
        print("⚠️ Skipping chart upload: GIST_TOKEN or GIST_ID missing")
        return

    history = self.load_history()
    print(f"📊 History records loaded: {len(history)}")
    if not history:
        print("⚠️ Skipping chart upload: no history")
        return

    # One data point per history snapshot.
    values = [
        {"date": item["date"], "downloads": item["total_downloads"]}
        for item in history
    ]

    # Vega-Lite spec: area chart with a gradient fill on a transparent
    # background.
    vl_spec = {
        "$schema": "https://vega.github.io/schema/vega-lite/v5.json",
        "description": "Total Downloads Trend",
        "width": 800,
        "height": 200,
        "padding": 5,
        "background": "transparent",
        "config": {
            "view": {"stroke": "transparent"},
            "axis": {"domain": False, "grid": False},
        },
        "data": {"values": values},
        "mark": {
            "type": "area",
            "line": {"color": "#2563eb"},
            "color": {
                "x1": 1,
                "y1": 1,
                "x2": 1,
                "y2": 0,
                "gradient": "linear",
                "stops": [
                    {"offset": 0, "color": "white"},
                    {"offset": 1, "color": "#2563eb"},
                ],
            },
        },
        "encoding": {
            "x": {
                "field": "date",
                "type": "temporal",
                "axis": {"format": "%m-%d", "title": None, "labelColor": "#666"},
            },
            "y": {
                "field": "downloads",
                "type": "quantitative",
                "axis": {"title": None, "labelColor": "#666"},
            },
        },
    }

    try:
        # 1. Render through Kroki with POST (avoids over-long GET URLs).
        json_spec = json.dumps(vl_spec)
        kroki_url = "https://kroki.io/vegalite/svg"
        print("📥 Generating chart via Kroki (POST)...")
        resp = requests.post(kroki_url, data=json_spec, timeout=request_timeout)
        if resp.status_code != 200:
            print(f"⚠️ Kroki request failed: {resp.status_code}")
            # Show the beginning of the error body for diagnosis.
            print(f"Response: {resp.text[:200]}")
            return
        svg_content = resp.text
        print(f"✅ Kroki SVG generated ({len(svg_content)} bytes)")

        # 2. Store the SVG as a standalone file in the Gist.
        url = f"https://api.github.com/gists/{self.gist_id}"
        headers = {"Authorization": f"token {self.gist_token}"}
        payload = {"files": {"chart.svg": {"content": svg_content}}}
        resp = requests.patch(
            url, headers=headers, json=payload, timeout=request_timeout
        )
        if resp.status_code == 200:
            print("✅ 图表 SVG 已同步至 Gist: chart.svg")
        else:
            print(f"⚠️ Gist upload failed: {resp.status_code} {resp.text[:200]}")
    except Exception as e:
        # Deliberately broad: a chart failure must not abort the stats run.
        print(f"⚠️ 上传图表失败: {e}")
def generate_activity_chart(self, lang: str = "zh") -> str:
    """
    Return the Markdown block (heading + image) for the download trend chart.

    The image points at the fixed raw URL of ``chart.svg`` in the Gist.

    Args:
        lang: "en" selects the English heading; any other value selects
            the Chinese one.

    Returns:
        The Markdown block, or an empty string when no Gist ID is set.
    """
    if self.gist_id:
        heading = "总下载量累计趋势" if lang != "en" else "Total Downloads Trend"
        # The link deliberately carries no commit hash and no cache-busting
        # query parameter, so it always refers to the latest revision and
        # the README text stays identical between runs. The trade-off is
        # that the image may be served from a cache for a while; a chart
        # that lags behind was judged acceptable.
        #
        # Raw Gist URLs need the owner name in addition to the Gist ID:
        #   https://gist.githubusercontent.com/<user>/<id>/raw/chart.svg
        owner = "Fu-Jie"  # Replace with the actual username if needed
        chart_url = (
            f"https://gist.githubusercontent.com/{owner}/{self.gist_id}/raw/chart.svg"
        )
        return f"### 📈 {heading}\n![Activity]({chart_url})"
    return ""
def main():
    """
    Run the complete stats pipeline from the command line.

    Reads configuration from environment variables, fetches all posts,
    builds the statistics, then writes the reports, JSON snapshot, badge
    endpoints, chart and README sections.

    Returns:
        0 on success, 1 when the API key is missing or the user ID cannot
        be resolved.
    """
    # Runtime configuration
    api_key = os.getenv("OPENWEBUI_API_KEY")
    user_id = os.getenv("OPENWEBUI_USER_ID")
    if not api_key:
        for message in (
            "❌ Error: OPENWEBUI_API_KEY is not set",
            "Please set environment variable:",
            "  export OPENWEBUI_API_KEY='your_api_key_here'",
        ):
            print(message)
        return 1
    if not user_id:
        print(" OPENWEBUI_USER_ID not set, attempting auto-resolve via API key...")

    # Gist settings are optional; they enable badge/history sync.
    gist_token = os.getenv("GIST_TOKEN")
    gist_id = os.getenv("GIST_ID")

    client = OpenWebUIStats(api_key, user_id, gist_token, gist_id)
    if not client.user_id:
        # Ask the API for the user ID; give up when that fails too.
        client.resolve_user_id()
        if not client.user_id:
            for message in (
                "❌ Error: failed to auto-resolve OPENWEBUI_USER_ID",
                "Please set environment variable:",
                "  export OPENWEBUI_USER_ID='your_user_id_here'",
                "\nTip: user id is the 'id' field returned by /api/v1/auths/",
                "  e.g. b15d1348-4347-42b4-b815-e053342d6cb0",
            ):
                print(message)
            return 1
    print(f"🔍 User ID: {client.user_id}")
    if gist_id:
        print(f"📦 Gist storage enabled: {gist_id}")

    # Fetch posts and derive the statistics.
    print("📥 Fetching posts...")
    posts = client.get_all_posts()
    print(f"✅ Retrieved {len(posts)} posts")
    stats = client.generate_stats(posts)

    # Persist the history snapshot, then show the terminal report.
    client.save_history(stats)
    client.print_stats(stats)

    # Output locations are relative to the directory above this script's.
    repo_root = Path(__file__).parent.parent
    docs_dir = repo_root / "docs"

    # Markdown reports: Chinese first, then English.
    report_jobs = (
        ("zh", "community-stats.zh.md", "\n✅ Chinese report saved to: "),
        ("en", "community-stats.md", "✅ English report saved to: "),
    )
    report_paths = {}
    for report_lang, file_name, saved_prefix in report_jobs:
        target = docs_dir / file_name
        markdown = client.generate_markdown(stats, lang=report_lang)
        with open(target, "w", encoding="utf-8") as fh:
            fh.write(markdown)
        print(f"{saved_prefix}{target}")
        report_paths[report_lang] = target

    # JSON snapshot and Shields.io endpoint files (dynamic badges).
    client.save_json(stats, str(docs_dir / "community-stats.json"))
    client.generate_shields_endpoints(stats, str(docs_dir / "badges"))

    # SVG chart upload (skipped by the client when Gist is not configured).
    client.upload_chart_svg()

    # README sections, then the charts inside the docs pages.
    readme_jobs = (("en", "README.md"), ("zh", "README_CN.md"))
    for readme_lang, readme_name in readme_jobs:
        client.update_readme(stats, str(repo_root / readme_name), lang=readme_lang)
    for chart_lang in ("en", "zh"):
        client.update_docs_chart(str(report_paths[chart_lang]), lang=chart_lang)
    return 0
# Script entry point: propagate main()'s return value as the process exit
# status. SystemExit is raised directly because the bare exit() helper is
# installed by the site module for interactive use and is not guaranteed
# to exist (e.g. under `python -S`).
if __name__ == "__main__":
    raise SystemExit(main())