Files
Fu-Jie_openwebui-extensions/scripts/openwebui_community_client.py

375 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
OpenWebUI Community Client
统一封装所有与 OpenWebUI 官方社区 (openwebui.com) 的 API 交互。
功能:
- 获取用户发布的插件/帖子
- 更新插件内容和元数据
- 版本比较
- 同步插件 ID
使用方法:
from openwebui_community_client import OpenWebUICommunityClient
client = OpenWebUICommunityClient(api_key="your_api_key")
posts = client.get_all_posts()
"""
import os
import re
import json
import base64
import requests
from datetime import datetime, timezone, timedelta
from typing import Optional, Dict, List, Any, Tuple
# Beijing time zone (fixed UTC+8 offset)
BEIJING_TZ = timezone(timedelta(hours=8))
class OpenWebUICommunityClient:
    """Client for the official OpenWebUI community API (openwebui.com).

    Covers fetching the user's posts, updating a plugin post (source code,
    README and metadata), comparing local and remote versions, publishing a
    plugin from a local file and aggregating download/like statistics.
    """

    BASE_URL = "https://api.openwebui.com/api/v1"

    # Seconds to wait on each HTTP call. requests applies no timeout unless
    # one is passed explicitly, so an unresponsive server would otherwise
    # block the caller indefinitely.
    DEFAULT_TIMEOUT = 30.0

    def __init__(
        self,
        api_key: str,
        user_id: Optional[str] = None,
        timeout: Optional[float] = DEFAULT_TIMEOUT,
    ):
        """
        Initialise the client.

        Args:
            api_key: OpenWebUI API key (JWT token).
            user_id: User ID. When falsy, it is parsed from the token
                payload; it stays None if the token cannot be parsed.
            timeout: Per-request timeout in seconds, forwarded to requests.
                None disables the timeout.
        """
        self.api_key = api_key
        self.user_id = user_id or self._parse_user_id_from_token(api_key)
        self.timeout = timeout
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        }

    def _parse_user_id_from_token(self, token: str) -> Optional[str]:
        """
        Extract the user ID from the payload segment of a JWT token.

        Args:
            token: Dot-separated token; the second segment is decoded as
                URL-safe base64 JSON.

        Returns:
            The payload's "id" value, falling back to "sub"; None when the
            token is malformed or carries neither field.
        """
        if not isinstance(token, str):
            return None
        segments = token.split(".")
        if len(segments) < 2:
            return None
        payload = segments[1]
        # Restore the base64 padding that JWT encoding strips.
        payload += "=" * (-len(payload) % 4)
        try:
            claims = json.loads(base64.urlsafe_b64decode(payload))
        except (ValueError, RecursionError):
            # binascii.Error, UnicodeDecodeError and json.JSONDecodeError are
            # all ValueError subclasses: the segment is not valid base64 JSON.
            return None
        if not isinstance(claims, dict):
            return None
        return claims.get("id") or claims.get("sub")

    @staticmethod
    def _dig(mapping: Any, *keys: str) -> Any:
        """Follow *keys* through nested dicts; return None once a level is missing or not a dict."""
        node = mapping
        for key in keys:
            if not isinstance(node, dict):
                return None
            node = node.get(key)
        return node

    # ========== Fetching posts / plugins ==========

    def get_user_posts(self, sort: str = "new", page: int = 1) -> List[Dict]:
        """
        Fetch one page of the posts published by the user.

        Args:
            sort: Sort order (new/top/hot).
            page: Page number.

        Returns:
            The decoded JSON response body.

        Raises:
            requests.exceptions.HTTPError: On a non-success status code.
        """
        url = f"{self.BASE_URL}/posts/user/{self.user_id}"
        # Let requests build and escape the query string.
        response = requests.get(
            url,
            headers=self.headers,
            params={"sort": sort, "page": page},
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()

    def get_all_posts(self, sort: str = "new") -> List[Dict]:
        """
        Fetch every post of the user by walking the pages in order.

        Pagination stops at the first page that comes back empty.

        Args:
            sort: Sort order passed to get_user_posts.

        Returns:
            All posts from all pages, concatenated.
        """
        all_posts: List[Dict] = []
        page = 1
        while True:
            posts = self.get_user_posts(sort=sort, page=page)
            if not posts:
                break
            all_posts.extend(posts)
            page += 1
        return all_posts

    def get_post(self, post_id: str) -> Optional[Dict]:
        """
        Fetch the details of a single post.

        Args:
            post_id: Post ID.

        Returns:
            The decoded post, or None when the server answers 404.

        Raises:
            requests.exceptions.HTTPError: On any other error status.
        """
        url = f"{self.BASE_URL}/posts/{post_id}"
        try:
            response = requests.get(url, headers=self.headers, timeout=self.timeout)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                return None
            raise

    # ========== Updating posts / plugins ==========

    def update_post(self, post_id: str, post_data: Dict) -> bool:
        """
        Update a post.

        Args:
            post_id: Post ID.
            post_data: Complete post data, sent as the JSON body.

        Returns:
            True when the request succeeded.

        Raises:
            requests.exceptions.HTTPError: On a non-success status code.
        """
        url = f"{self.BASE_URL}/posts/{post_id}/update"
        response = requests.post(
            url, headers=self.headers, json=post_data, timeout=self.timeout
        )
        response.raise_for_status()
        return True

    def update_plugin(
        self,
        post_id: str,
        source_code: str,
        readme_content: Optional[str] = None,
        metadata: Optional[Dict] = None,
    ) -> bool:
        """
        Update a plugin post (code + README + metadata).

        The current post is fetched, modified in memory and sent back whole.

        Args:
            post_id: Post ID.
            source_code: Plugin source code.
            readme_content: README text stored as the post content; the
                existing content is kept when this is empty or None.
            metadata: Plugin metadata (title, version, description, ...)
                merged into the function manifest.

        Returns:
            True on success, False when the post does not exist.
        """
        post_data = self.get_post(post_id)
        if not post_data:
            return False

        # Make sure data.function.meta.manifest exists. A level that is
        # absent or not a dict (e.g. JSON null) is replaced by an empty dict.
        node = post_data
        for key in ("data", "function", "meta", "manifest"):
            child = node.get(key)
            if not isinstance(child, dict):
                child = node[key] = {}
            node = child

        function = post_data["data"]["function"]
        function["content"] = source_code

        if readme_content:
            post_data["content"] = readme_content

        if metadata:
            function["meta"]["manifest"].update(metadata)
            if "title" in metadata:
                post_data["title"] = metadata["title"]
                function["name"] = metadata["title"]
            if "description" in metadata:
                function["meta"]["description"] = metadata["description"]

        return self.update_post(post_id, post_data)

    # ========== Version comparison ==========

    def get_remote_version(self, post_id: str) -> Optional[str]:
        """
        Read the version recorded in the remote plugin's manifest.

        Args:
            post_id: Post ID.

        Returns:
            The value at data.function.meta.manifest.version, or None when
            the post or any level of that path is missing.
        """
        post_data = self.get_post(post_id)
        if not post_data:
            return None
        return self._dig(post_data, "data", "function", "meta", "manifest", "version")

    def version_needs_update(self, post_id: str, local_version: str) -> bool:
        """
        Check whether the remote plugin needs updating.

        Args:
            post_id: Post ID.
            local_version: Local version string.

        Returns:
            True when no remote version is available or it differs from
            the local one.
        """
        remote_version = self.get_remote_version(post_id)
        if not remote_version:
            return True  # nothing published remotely yet
        return local_version != remote_version

    # ========== Publishing ==========

    def publish_plugin_from_file(
        self, file_path: str, force: bool = False
    ) -> Tuple[bool, str]:
        """
        Publish a plugin from a local file.

        Args:
            file_path: Path to the plugin file.
            force: Update even when the local version equals the remote one.

        Returns:
            (success flag, message).
        """
        with open(file_path, "r", encoding="utf-8") as f:
            content = f.read()

        metadata = self._parse_frontmatter(content)
        if not metadata:
            return False, "No frontmatter found"

        post_id = metadata.get("openwebui_id") or metadata.get("post_id")
        if not post_id:
            return False, "No openwebui_id found"

        local_version = metadata.get("version")

        # Skip the upload when the remote already has this version.
        if not force and local_version:
            if not self.version_needs_update(post_id, local_version):
                return True, f"Skipped: version {local_version} matches remote"

        # Fall back to the frontmatter description when no README is found.
        readme_content = self._find_readme(file_path)

        success = self.update_plugin(
            post_id=post_id,
            source_code=content,
            readme_content=readme_content or metadata.get("description", ""),
            metadata=metadata,
        )
        if success:
            return True, f"Updated to version {local_version}"
        return False, "Update failed"

    def _parse_frontmatter(self, content: str) -> Dict[str, str]:
        """
        Parse the "key: value" frontmatter of a plugin file.

        The frontmatter is the first triple-double-quoted block: one at the
        start of the content is tried first, then the first one anywhere.
        Both LF and CRLF line endings are accepted.

        Args:
            content: Full text of the plugin file.

        Returns:
            Mapping of stripped keys to stripped values for every line that
            contains a colon; empty when no block is found.
        """
        match = re.search(r'^\s*"""\r?\n(.*?)\r?\n"""', content, re.DOTALL)
        if not match:
            match = re.search(r'"""\r?\n(.*?)\r?\n"""', content, re.DOTALL)
        if not match:
            return {}

        meta: Dict[str, str] = {}
        for line in match.group(1).split("\n"):
            # Split on the first colon only so values may contain colons.
            key, sep, value = line.partition(":")
            if sep:
                meta[key.strip()] = value.strip()
        return meta

    def _find_readme(self, plugin_file_path: str) -> Optional[str]:
        """
        Find the README that sits next to a plugin file.

        Files whose name ends with "_cn.py" (case-insensitive) prefer
        README_CN.md; all others prefer README.md.

        Args:
            plugin_file_path: Path to the plugin file.

        Returns:
            The text of the first existing candidate, or None.
        """
        plugin_dir = os.path.dirname(plugin_file_path)
        base_name = os.path.basename(plugin_file_path).lower()

        if base_name.endswith("_cn.py"):
            readme_files = ["README_CN.md", "README.md"]
        else:
            readme_files = ["README.md", "README_CN.md"]

        for readme_name in readme_files:
            readme_path = os.path.join(plugin_dir, readme_name)
            if os.path.exists(readme_path):
                with open(readme_path, "r", encoding="utf-8") as f:
                    return f.read()
        return None

    # ========== Statistics ==========

    def generate_stats(self, posts: List[Dict]) -> Dict:
        """
        Aggregate statistics over a list of posts.

        Args:
            posts: List of post dicts.

        Returns:
            Dict with total post/download/like counts, a per-type post
            count, per-post details sorted by downloads (descending) and an
            ISO-8601 "generated_at" timestamp in Beijing time.
        """
        stats = {
            "total_posts": len(posts),
            "total_downloads": 0,
            "total_likes": 0,
            "posts_by_type": {},
            "posts_detail": [],
            "generated_at": datetime.now(BEIJING_TZ).isoformat(),
        }
        for post in posts:
            # "or 0" also covers counters that are present but null.
            downloads = post.get("downloadCount") or 0
            likes = post.get("likeCount") or 0
            post_type = post.get("type", "unknown")

            stats["total_downloads"] += downloads
            stats["total_likes"] += likes
            stats["posts_by_type"][post_type] = (
                stats["posts_by_type"].get(post_type, 0) + 1
            )
            stats["posts_detail"].append(
                {
                    "id": post.get("id"),
                    "title": post.get("title"),
                    "type": post_type,
                    "downloads": downloads,
                    "likes": likes,
                    "created_at": post.get("createdAt"),
                    "updated_at": post.get("updatedAt"),
                }
            )

        # Most-downloaded posts first.
        stats["posts_detail"].sort(key=lambda x: x["downloads"], reverse=True)
        return stats
# Convenience helper
def get_client(api_key: Optional[str] = None) -> OpenWebUICommunityClient:
    """
    Build a client instance.

    Args:
        api_key: API key to use. When falsy, the OPENWEBUI_API_KEY
            environment variable is consulted instead.

    Returns:
        An OpenWebUICommunityClient built with the resolved key.

    Raises:
        ValueError: If neither the argument nor the environment variable
            provides a non-empty key.
    """
    resolved_key = api_key if api_key else os.environ.get("OPENWEBUI_API_KEY")
    if resolved_key:
        return OpenWebUICommunityClient(resolved_key)
    raise ValueError("OPENWEBUI_API_KEY not set")