refactor: create OpenWebUICommunityClient class to unify API operations

This commit is contained in:
fujie
2026-01-08 00:44:25 +08:00
parent 54cc10bb41
commit cd3e7309a8
4 changed files with 455 additions and 282 deletions

View File

@@ -1,3 +1,8 @@
"""
Fetch remote plugin versions from OpenWebUI Community
获取远程插件版本信息
"""
import json import json
import os import os
import sys import sys
@@ -5,22 +10,17 @@ import sys
# Add current directory to path # Add current directory to path
sys.path.append(os.path.dirname(os.path.abspath(__file__))) sys.path.append(os.path.dirname(os.path.abspath(__file__)))
try: from openwebui_community_client import get_client
from openwebui_stats import OpenWebUIStats
except ImportError:
print("Error: openwebui_stats.py not found.")
sys.exit(1)
def main(): def main():
# Try to get token from env try:
token = os.environ.get("OPENWEBUI_API_KEY") client = get_client()
if not token: except ValueError as e:
print("Error: OPENWEBUI_API_KEY environment variable not set.") print(f"Error: {e}")
sys.exit(1) sys.exit(1)
print("Fetching remote plugins from OpenWebUI...") print("Fetching remote plugins from OpenWebUI Community...")
client = OpenWebUIStats(token)
try: try:
posts = client.get_all_posts() posts = client.get_all_posts()
except Exception as e: except Exception as e:
@@ -29,9 +29,6 @@ def main():
formatted_plugins = [] formatted_plugins = []
for post in posts: for post in posts:
# Save the full raw post object to ensure we have "compliant update json data"
# We inject a 'type' field just for the comparison script to know it's remote,
# but otherwise keep the structure identical to the API response.
post["type"] = "remote_plugin" post["type"] = "remote_plugin"
formatted_plugins.append(post) formatted_plugins.append(post)

View File

@@ -0,0 +1,374 @@
"""
OpenWebUI Community Client
统一封装所有与 OpenWebUI 官方社区 (openwebui.com) 的 API 交互。
功能:
- 获取用户发布的插件/帖子
- 更新插件内容和元数据
- 版本比较
- 同步插件 ID
使用方法:
from openwebui_community_client import OpenWebUICommunityClient
client = OpenWebUICommunityClient(api_key="your_api_key")
posts = client.get_all_posts()
"""
import os
import re
import json
import base64
import requests
from datetime import datetime, timezone, timedelta
from typing import Optional, Dict, List, Any, Tuple
# 北京时区 (UTC+8)
BEIJING_TZ = timezone(timedelta(hours=8))
class OpenWebUICommunityClient:
"""OpenWebUI 官方社区 API 客户端"""
BASE_URL = "https://api.openwebui.com/api/v1"
def __init__(self, api_key: str, user_id: Optional[str] = None):
"""
初始化客户端
Args:
api_key: OpenWebUI API Key (JWT Token)
user_id: 用户 ID如果为 None 则从 token 中解析
"""
self.api_key = api_key
self.user_id = user_id or self._parse_user_id_from_token(api_key)
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
"Accept": "application/json",
}
def _parse_user_id_from_token(self, token: str) -> Optional[str]:
"""从 JWT Token 中解析用户 ID"""
try:
parts = token.split(".")
if len(parts) >= 2:
payload = parts[1]
# 添加 padding
padding = 4 - len(payload) % 4
if padding != 4:
payload += "=" * padding
decoded = base64.urlsafe_b64decode(payload)
data = json.loads(decoded)
return data.get("id") or data.get("sub")
except Exception:
pass
return None
# ========== 帖子/插件获取 ==========
def get_user_posts(self, sort: str = "new", page: int = 1) -> List[Dict]:
"""
获取用户发布的帖子列表
Args:
sort: 排序方式 (new/top/hot)
page: 页码
Returns:
帖子列表
"""
url = f"{self.BASE_URL}/posts/user/{self.user_id}?sort={sort}&page={page}"
response = requests.get(url, headers=self.headers)
response.raise_for_status()
return response.json()
def get_all_posts(self, sort: str = "new") -> List[Dict]:
"""获取所有帖子(自动分页)"""
all_posts = []
page = 1
while True:
posts = self.get_user_posts(sort=sort, page=page)
if not posts:
break
all_posts.extend(posts)
page += 1
return all_posts
def get_post(self, post_id: str) -> Optional[Dict]:
"""
获取单个帖子详情
Args:
post_id: 帖子 ID
Returns:
帖子数据,如果不存在返回 None
"""
try:
url = f"{self.BASE_URL}/posts/{post_id}"
response = requests.get(url, headers=self.headers)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
return None
raise
# ========== 帖子/插件更新 ==========
def update_post(self, post_id: str, post_data: Dict) -> bool:
"""
更新帖子
Args:
post_id: 帖子 ID
post_data: 完整的帖子数据
Returns:
是否成功
"""
url = f"{self.BASE_URL}/posts/{post_id}/update"
response = requests.post(url, headers=self.headers, json=post_data)
response.raise_for_status()
return True
def update_plugin(
self,
post_id: str,
source_code: str,
readme_content: Optional[str] = None,
metadata: Optional[Dict] = None,
) -> bool:
"""
更新插件(代码 + README + 元数据)
Args:
post_id: 帖子 ID
source_code: 插件源代码
readme_content: README 内容(用于社区页面展示)
metadata: 插件元数据title, version, description 等)
Returns:
是否成功
"""
post_data = self.get_post(post_id)
if not post_data:
return False
# 确保结构存在
if "data" not in post_data:
post_data["data"] = {}
if "function" not in post_data["data"]:
post_data["data"]["function"] = {}
if "meta" not in post_data["data"]["function"]:
post_data["data"]["function"]["meta"] = {}
if "manifest" not in post_data["data"]["function"]["meta"]:
post_data["data"]["function"]["meta"]["manifest"] = {}
# 更新源代码
post_data["data"]["function"]["content"] = source_code
# 更新 README社区页面展示内容
if readme_content:
post_data["content"] = readme_content
# 更新元数据
if metadata:
post_data["data"]["function"]["meta"]["manifest"].update(metadata)
if "title" in metadata:
post_data["title"] = metadata["title"]
post_data["data"]["function"]["name"] = metadata["title"]
if "description" in metadata:
post_data["data"]["function"]["meta"]["description"] = metadata[
"description"
]
return self.update_post(post_id, post_data)
# ========== 版本比较 ==========
def get_remote_version(self, post_id: str) -> Optional[str]:
"""
获取远程插件版本
Args:
post_id: 帖子 ID
Returns:
版本号,如果不存在返回 None
"""
post_data = self.get_post(post_id)
if not post_data:
return None
return (
post_data.get("data", {})
.get("function", {})
.get("meta", {})
.get("manifest", {})
.get("version")
)
def version_needs_update(self, post_id: str, local_version: str) -> bool:
"""
检查是否需要更新
Args:
post_id: 帖子 ID
local_version: 本地版本号
Returns:
如果本地版本与远程不同,返回 True
"""
remote_version = self.get_remote_version(post_id)
if not remote_version:
return True # 远程不存在,需要更新
return local_version != remote_version
# ========== 插件发布 ==========
def publish_plugin_from_file(
self, file_path: str, force: bool = False
) -> Tuple[bool, str]:
"""
从文件发布插件
Args:
file_path: 插件文件路径
force: 是否强制更新(忽略版本检查)
Returns:
(是否成功, 消息)
"""
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
metadata = self._parse_frontmatter(content)
if not metadata:
return False, "No frontmatter found"
post_id = metadata.get("openwebui_id") or metadata.get("post_id")
if not post_id:
return False, "No openwebui_id found"
local_version = metadata.get("version")
# 版本检查
if not force and local_version:
if not self.version_needs_update(post_id, local_version):
return True, f"Skipped: version {local_version} matches remote"
# 查找 README
readme_content = self._find_readme(file_path)
# 更新
success = self.update_plugin(
post_id=post_id,
source_code=content,
readme_content=readme_content or metadata.get("description", ""),
metadata=metadata,
)
if success:
return True, f"Updated to version {local_version}"
return False, "Update failed"
def _parse_frontmatter(self, content: str) -> Dict[str, str]:
"""解析插件文件的 frontmatter"""
match = re.search(r'^\s*"""\n(.*?)\n"""', content, re.DOTALL)
if not match:
match = re.search(r'"""\n(.*?)\n"""', content, re.DOTALL)
if not match:
return {}
frontmatter = match.group(1)
meta = {}
for line in frontmatter.split("\n"):
if ":" in line:
key, value = line.split(":", 1)
meta[key.strip()] = value.strip()
return meta
def _find_readme(self, plugin_file_path: str) -> Optional[str]:
"""查找插件对应的 README 文件"""
plugin_dir = os.path.dirname(plugin_file_path)
base_name = os.path.basename(plugin_file_path).lower()
# 确定优先顺序
if base_name.endswith("_cn.py"):
readme_files = ["README_CN.md", "README.md"]
else:
readme_files = ["README.md", "README_CN.md"]
for readme_name in readme_files:
readme_path = os.path.join(plugin_dir, readme_name)
if os.path.exists(readme_path):
with open(readme_path, "r", encoding="utf-8") as f:
return f.read()
return None
# ========== 统计功能 ==========
def generate_stats(self, posts: List[Dict]) -> Dict:
"""
生成统计数据
Args:
posts: 帖子列表
Returns:
统计数据字典
"""
stats = {
"total_posts": len(posts),
"total_downloads": 0,
"total_likes": 0,
"posts_by_type": {},
"posts_detail": [],
"generated_at": datetime.now(BEIJING_TZ).isoformat(),
}
for post in posts:
downloads = post.get("downloadCount", 0)
likes = post.get("likeCount", 0)
post_type = post.get("type", "unknown")
stats["total_downloads"] += downloads
stats["total_likes"] += likes
stats["posts_by_type"][post_type] = (
stats["posts_by_type"].get(post_type, 0) + 1
)
stats["posts_detail"].append(
{
"id": post.get("id"),
"title": post.get("title"),
"type": post_type,
"downloads": downloads,
"likes": likes,
"created_at": post.get("createdAt"),
"updated_at": post.get("updatedAt"),
}
)
# 按下载量排序
stats["posts_detail"].sort(key=lambda x: x["downloads"], reverse=True)
return stats
# 便捷函数
def get_client(api_key: Optional[str] = None) -> OpenWebUICommunityClient:
"""
获取客户端实例
Args:
api_key: API Key如果为 None 则从环境变量获取
Returns:
OpenWebUICommunityClient 实例
"""
key = api_key or os.environ.get("OPENWEBUI_API_KEY")
if not key:
raise ValueError("OPENWEBUI_API_KEY not set")
return OpenWebUICommunityClient(key)

View File

@@ -1,244 +1,41 @@
"""
Publish plugins to OpenWebUI Community
使用 OpenWebUICommunityClient 发布插件到官方社区
用法:
python scripts/publish_plugin.py # 只更新有版本变化的插件
python scripts/publish_plugin.py --force # 强制更新所有插件
"""
import os import os
import sys import sys
import json
import requests
import re import re
def parse_frontmatter(content):
"""Extracts metadata from the python file docstring."""
# Allow leading whitespace and handle potential shebangs
match = re.search(r'^\s*"""\n(.*?)\n"""', content, re.DOTALL)
if not match:
# Fallback for files starting with comments or shebangs
match = re.search(r'"""\n(.*?)\n"""', content, re.DOTALL)
if not match:
return {}
frontmatter = match.group(1)
meta = {}
for line in frontmatter.split("\n"):
if ":" in line:
key, value = line.split(":", 1)
meta[key.strip()] = value.strip()
return meta
def sync_frontmatter(file_path, content, meta, post_data):
"""Syncs remote metadata back to local file frontmatter."""
changed = False
new_meta = meta.copy()
# 1. Sync ID
if "openwebui_id" not in new_meta and "post_id" not in new_meta:
new_meta["openwebui_id"] = post_data.get("id")
changed = True
# 2. Sync Icon URL (often set in UI)
manifest = (
post_data.get("data", {})
.get("function", {})
.get("meta", {})
.get("manifest", {})
)
if "icon_url" not in new_meta and manifest.get("icon_url"):
new_meta["icon_url"] = manifest.get("icon_url")
changed = True
# 3. Sync other fields if missing locally
for field in ["author", "author_url", "funding_url"]:
if field not in new_meta and manifest.get(field):
new_meta[field] = manifest.get(field)
changed = True
if changed:
print(f" Syncing metadata back to {os.path.basename(file_path)}...")
# Reconstruct frontmatter
# We need to replace the content inside the first """ ... """
# This is a bit fragile with regex but sufficient for standard files
def replacement(match):
lines = []
# Keep existing description or comments if we can't parse them easily?
# Actually, let's just reconstruct the key-values we know
# and try to preserve the description if it was at the end
# Simple approach: Rebuild the whole block based on new_meta
# This might lose comments inside the frontmatter, but standard format is simple keys
# Try to preserve order: title, author, ..., version, ..., description
ordered_keys = [
"title",
"author",
"author_url",
"funding_url",
"version",
"openwebui_id",
"icon_url",
"requirements",
"description",
]
block = ['"""']
# Add known keys in order
for k in ordered_keys:
if k in new_meta:
block.append(f"{k}: {new_meta[k]}")
# Add any other custom keys
for k, v in new_meta.items():
if k not in ordered_keys:
block.append(f"{k}: {v}")
block.append('"""')
return "\n".join(block)
new_content = re.sub(
r'^"""\n(.*?)\n"""', replacement, content, count=1, flags=re.DOTALL
)
# If regex didn't match (e.g. leading whitespace), try with whitespace
if new_content == content:
new_content = re.sub(
r'^\s*"""\n(.*?)\n"""', replacement, content, count=1, flags=re.DOTALL
)
if new_content != content:
with open(file_path, "w", encoding="utf-8") as f:
f.write(new_content)
return new_content # Return updated content
return content
import argparse import argparse
# Add current directory to path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
def update_plugin(file_path, post_id, token, force=False): from openwebui_community_client import OpenWebUICommunityClient, get_client
print(f"Processing {os.path.basename(file_path)} (ID: {post_id})...")
def find_plugins_with_id(plugins_dir: str) -> list:
"""查找所有带 openwebui_id 的插件文件"""
plugins = []
for root, _, files in os.walk(plugins_dir):
for file in files:
if file.endswith(".py"):
file_path = os.path.join(root, file)
with open(file_path, "r", encoding="utf-8") as f: with open(file_path, "r", encoding="utf-8") as f:
content = f.read() content = f.read(2000) # 只读前 2000 字符检查 ID
meta = parse_frontmatter(content) id_match = re.search(
if not meta: r"(?:openwebui_id|post_id):\s*([a-z0-9-]+)", content
print(f" Skipping: No frontmatter found.")
return False
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"Accept": "application/json",
}
# 1. Fetch existing post
try:
response = requests.get(
f"https://api.openwebui.com/api/v1/posts/{post_id}", headers=headers
) )
response.raise_for_status() if id_match:
post_data = response.json() plugins.append(
except Exception as e: {"file_path": file_path, "post_id": id_match.group(1).strip()}
print(f" Error fetching post: {e}")
return False
# 2. Check Version (Skip if unchanged)
if not force:
remote_manifest = (
post_data.get("data", {})
.get("function", {})
.get("meta", {})
.get("manifest", {})
) )
remote_version = remote_manifest.get("version") return plugins
local_version = meta.get("version")
if local_version and remote_version and local_version == remote_version:
print(f" ⏭️ Skipping: Local version ({local_version}) matches remote.")
return True
# 3. Sync Metadata back to local file (Optional, mostly for local dev)
try:
content = sync_frontmatter(file_path, content, meta, post_data)
# Re-parse meta in case it changed
meta = parse_frontmatter(content)
except Exception as e:
print(f" Warning: Failed to sync local metadata: {e}")
# 4. Update ONLY Content and Manifest
try:
# Ensure structure exists before populating nested fields
if "data" not in post_data:
post_data["data"] = {}
if "function" not in post_data["data"]:
post_data["data"]["function"] = {}
if "meta" not in post_data["data"]["function"]:
post_data["data"]["function"]["meta"] = {}
if "manifest" not in post_data["data"]["function"]["meta"]:
post_data["data"]["function"]["meta"]["manifest"] = {}
# Update 1: The Source Code (Inner Content)
post_data["data"]["function"]["content"] = content
# Update 2: The Post Body/README (Outer Content)
# Try to find a matching README file
plugin_dir = os.path.dirname(file_path)
base_name = os.path.basename(file_path).lower()
readme_content = None
# Determine preferred README filename
readme_files = []
if base_name.endswith("_cn.py"):
readme_files = ["README_CN.md", "README.md"]
else:
readme_files = ["README.md", "README_CN.md"]
for readme_name in readme_files:
readme_path = os.path.join(plugin_dir, readme_name)
if os.path.exists(readme_path):
try:
with open(readme_path, "r", encoding="utf-8") as f:
readme_content = f.read()
print(f" Using README: {readme_name}")
break
except Exception as e:
print(f" Error reading {readme_name}: {e}")
if readme_content:
post_data["content"] = readme_content
elif "description" in meta:
post_data["content"] = meta["description"]
else:
post_data["content"] = ""
# Update Manifest (Metadata)
post_data["data"]["function"]["meta"]["manifest"].update(meta)
# Sync top-level fields for consistency
if "title" in meta:
post_data["title"] = meta["title"]
post_data["data"]["function"]["name"] = meta["title"]
if "description" in meta:
post_data["data"]["function"]["meta"]["description"] = meta["description"]
except Exception as e:
print(f" Error preparing update: {e}")
return False
# 5. Submit Update
try:
response = requests.post(
f"https://api.openwebui.com/api/v1/posts/{post_id}/update",
headers=headers,
json=post_data,
)
response.raise_for_status()
print(f" ✅ Success! Updated to version {meta.get('version', 'unknown')}")
return True
except Exception as e:
print(f" ❌ Failed: {e}")
return False
def main(): def main():
@@ -248,45 +45,44 @@ def main():
) )
args = parser.parse_args() args = parser.parse_args()
token = os.environ.get("OPENWEBUI_API_KEY") try:
if not token: client = get_client()
print("Error: OPENWEBUI_API_KEY not set.") except ValueError as e:
print(f"Error: {e}")
sys.exit(1) sys.exit(1)
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
plugins_dir = os.path.join(base_dir, "plugins") plugins_dir = os.path.join(base_dir, "plugins")
count = 0 plugins = find_plugins_with_id(plugins_dir)
print(f"Found {len(plugins)} plugins with OpenWebUI ID.\n")
updated = 0
skipped = 0 skipped = 0
# Walk through plugins directory failed = 0
for root, _, files in os.walk(plugins_dir):
for file in files:
if file.endswith(".py"):
file_path = os.path.join(root, file)
# Check for ID in file content without full parse first for plugin in plugins:
with open(file_path, "r", encoding="utf-8") as f: file_path = plugin["file_path"]
content = f.read( file_name = os.path.basename(file_path)
2000 post_id = plugin["post_id"]
) # Read first 2000 chars is enough for frontmatter
# Simple regex to find ID print(f"Processing {file_name} (ID: {post_id})...")
id_match = re.search(
r"(?:openwebui_id|post_id):\s*([a-z0-9-]+)", content
)
if id_match: success, message = client.publish_plugin_from_file(file_path, force=args.force)
post_id = id_match.group(1).strip()
if update_plugin(file_path, post_id, token, force=args.force): if success:
count += 1 if "Skipped" in message:
print(f" ⏭️ {message}")
skipped += 1
else: else:
# If update_plugin returns False (error) or True (skip), we might want to track differently print(f"{message}")
# But current update_plugin returns True for Skip too. updated += 1
# Let's refine update_plugin return value if needed, but for now else:
# we can just rely on the logs. print(f"{message}")
pass failed += 1
print(f"\nFinished processing plugins.") print(f"\n{'='*50}")
print(f"Finished: {updated} updated, {skipped} skipped, {failed} failed")
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -1,3 +1,8 @@
"""
Sync OpenWebUI Post IDs to local plugin files
同步远程插件 ID 到本地文件
"""
import os import os
import sys import sys
import re import re
@@ -6,11 +11,12 @@ import difflib
# Add current directory to path # Add current directory to path
sys.path.append(os.path.dirname(os.path.abspath(__file__))) sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from openwebui_community_client import get_client
try: try:
from openwebui_stats import OpenWebUIStats
from extract_plugin_versions import scan_plugins_directory from extract_plugin_versions import scan_plugins_directory
except ImportError: except ImportError:
print("Error: Helper scripts not found.") print("Error: extract_plugin_versions.py not found.")
sys.exit(1) sys.exit(1)
@@ -60,13 +66,13 @@ def insert_id_into_file(file_path, post_id):
def main(): def main():
token = os.environ.get("OPENWEBUI_API_KEY") try:
if not token: client = get_client()
print("Error: OPENWEBUI_API_KEY environment variable not set.") except ValueError as e:
print(f"Error: {e}")
sys.exit(1) sys.exit(1)
print("Fetching remote posts...") print("Fetching remote posts from OpenWebUI Community...")
client = OpenWebUIStats(token)
remote_posts = client.get_all_posts() remote_posts = client.get_all_posts()
print(f"Fetched {len(remote_posts)} remote posts.") print(f"Fetched {len(remote_posts)} remote posts.")