Files
Fu-Jie_openwebui-extensions/scripts/openwebui_community_client.py

650 lines
20 KiB
Python
Raw Normal View History

"""
OpenWebUI Community Client
统一封装所有与 OpenWebUI 官方社区 (openwebui.com) API 交互
功能
- 获取用户发布的插件/帖子
- 更新插件内容和元数据
- 版本比较
- 同步插件 ID
使用方法
from openwebui_community_client import OpenWebUICommunityClient
client = OpenWebUICommunityClient(api_key="your_api_key")
posts = client.get_all_posts()
"""
import os
import re
import json
import base64
import requests
from datetime import datetime, timezone, timedelta
from typing import Optional, Dict, List, Any, Tuple
# 北京时区 (UTC+8)
BEIJING_TZ = timezone(timedelta(hours=8))
class OpenWebUICommunityClient:
"""OpenWebUI 官方社区 API 客户端"""
BASE_URL = "https://api.openwebui.com/api/v1"
def __init__(self, api_key: str, user_id: Optional[str] = None):
"""
初始化客户端
Args:
api_key: OpenWebUI API Key (JWT Token)
user_id: 用户 ID如果为 None 则从 token 中解析
"""
self.api_key = api_key
self.user_id = user_id or self._parse_user_id_from_token(api_key)
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
"Accept": "application/json",
}
# 如果没有 user_id尝试通过 API 获取
if not self.user_id:
self.user_id = self._get_user_id_from_api()
def _parse_user_id_from_token(self, token: str) -> Optional[str]:
"""从 JWT Token 中解析用户 ID"""
# sk- 开头的是 API Key无法解析用户 ID
if token.startswith("sk-"):
return None
try:
parts = token.split(".")
if len(parts) >= 2:
payload = parts[1]
# 添加 padding
padding = 4 - len(payload) % 4
if padding != 4:
payload += "=" * padding
decoded = base64.urlsafe_b64decode(payload)
data = json.loads(decoded)
return data.get("id") or data.get("sub")
except Exception:
pass
return None
def _get_user_id_from_api(self) -> Optional[str]:
"""通过 API 获取当前用户 ID"""
try:
url = f"{self.BASE_URL}/auths/"
response = requests.get(url, headers=self.headers)
response.raise_for_status()
data = response.json()
return data.get("id")
except Exception:
return None
# ========== 帖子/插件获取 ==========
def get_user_posts(self, sort: str = "new", page: int = 1) -> List[Dict]:
"""
获取用户发布的帖子列表
Args:
sort: 排序方式 (new/top/hot)
page: 页码
Returns:
帖子列表
"""
url = f"{self.BASE_URL}/posts/users/{self.user_id}?sort={sort}&page={page}"
response = requests.get(url, headers=self.headers)
response.raise_for_status()
return response.json()
def get_all_posts(self, sort: str = "new") -> List[Dict]:
"""获取所有帖子(自动分页)"""
all_posts = []
page = 1
while True:
posts = self.get_user_posts(sort=sort, page=page)
if not posts:
break
all_posts.extend(posts)
page += 1
return all_posts
def get_post(self, post_id: str) -> Optional[Dict]:
"""
获取单个帖子详情
Args:
post_id: 帖子 ID
Returns:
帖子数据如果不存在返回 None
"""
try:
url = f"{self.BASE_URL}/posts/{post_id}"
response = requests.get(url, headers=self.headers)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
return None
raise
# ========== 帖子/插件创建 ==========
def create_post(
self,
title: str,
content: str,
post_type: str = "function",
data: Optional[Dict] = None,
media: Optional[List[str]] = None,
) -> Optional[Dict]:
"""
创建新帖子
Args:
title: 帖子标题
content: 帖子内容README/描述
post_type: 帖子类型 (function/tool/filter/pipeline)
data: 插件数据结构
media: 图片 URL 列表
Returns:
创建成功返回帖子数据失败返回 None
"""
try:
url = f"{self.BASE_URL}/posts/create"
payload = {
"title": title,
"content": content,
"type": post_type,
"data": data or {},
"media": media or [],
}
print(f" [DEBUG] Payload keys: {list(payload.keys())}")
print(
f" [DEBUG] data.function keys: {list(payload.get('data', {}).get('function', {}).keys()) if payload.get('data') else 'N/A'}"
)
response = requests.post(url, headers=self.headers, json=payload)
if response.status_code != 200:
print(f" [DEBUG] Response status: {response.status_code}")
print(f" [DEBUG] Response body: {response.text[:500]}")
response.raise_for_status()
return response.json()
except Exception as e:
print(f" Error creating post: {e}")
return None
def create_plugin(
self,
title: str,
source_code: str,
readme_content: Optional[str] = None,
metadata: Optional[Dict] = None,
media_urls: Optional[List[str]] = None,
plugin_type: str = "action",
) -> Optional[str]:
"""
创建新插件帖子
Args:
title: 插件标题
source_code: 插件源代码
readme_content: README 内容
metadata: 插件元数据
media_urls: 图片 URL 列表
plugin_type: 插件类型 (action/filter/pipe)
Returns:
创建成功返回帖子 ID失败返回 None
"""
# 构建 function 数据结构
function_data = {
"id": "", # 服务器会生成
"name": title,
"type": plugin_type,
"content": source_code,
"meta": {
"description": metadata.get("description", "") if metadata else "",
"manifest": metadata or {},
},
}
data = {"function": function_data}
result = self.create_post(
title=title,
content=(
readme_content or metadata.get("description", "") if metadata else ""
),
post_type="function",
data=data,
media=media_urls,
)
if result:
return result.get("id")
return None
# ========== 帖子/插件更新 ==========
def update_post(self, post_id: str, post_data: Dict) -> bool:
"""
更新帖子
Args:
post_id: 帖子 ID
post_data: 完整的帖子数据
Returns:
是否成功
"""
url = f"{self.BASE_URL}/posts/{post_id}/update"
response = requests.post(url, headers=self.headers, json=post_data)
response.raise_for_status()
return True
def update_plugin(
self,
post_id: str,
source_code: str,
readme_content: Optional[str] = None,
metadata: Optional[Dict] = None,
media_urls: Optional[List[str]] = None,
) -> bool:
"""
更新插件代码 + README + 元数据 + 图片
Args:
post_id: 帖子 ID
source_code: 插件源代码
readme_content: README 内容用于社区页面展示
metadata: 插件元数据title, version, description
media_urls: 图片 URL 列表
Returns:
是否成功
"""
post_data = self.get_post(post_id)
if not post_data:
return False
# 确保结构存在
if "data" not in post_data:
post_data["data"] = {}
if "function" not in post_data["data"]:
post_data["data"]["function"] = {}
if "meta" not in post_data["data"]["function"]:
post_data["data"]["function"]["meta"] = {}
if "manifest" not in post_data["data"]["function"]["meta"]:
post_data["data"]["function"]["meta"]["manifest"] = {}
# 更新源代码
post_data["data"]["function"]["content"] = source_code
# 更新 README社区页面展示内容
if readme_content:
post_data["content"] = readme_content
# 更新元数据
if metadata:
post_data["data"]["function"]["meta"]["manifest"].update(metadata)
if "title" in metadata:
post_data["title"] = metadata["title"]
post_data["data"]["function"]["name"] = metadata["title"]
if "description" in metadata:
post_data["data"]["function"]["meta"]["description"] = metadata[
"description"
]
# 更新图片
if media_urls:
post_data["media"] = media_urls
return self.update_post(post_id, post_data)
# ========== 图片上传 ==========
def upload_image(self, file_path: str) -> Optional[str]:
"""
上传图片到 OpenWebUI 社区
Args:
file_path: 图片文件路径
Returns:
上传成功后的图片 URL失败返回 None
"""
if not os.path.exists(file_path):
return None
# 获取文件信息
filename = os.path.basename(file_path)
# 根据文件扩展名确定 MIME 类型
ext = os.path.splitext(filename)[1].lower()
mime_types = {
".png": "image/png",
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".gif": "image/gif",
".webp": "image/webp",
}
content_type = mime_types.get(ext, "application/octet-stream")
try:
with open(file_path, "rb") as f:
files = {"file": (filename, f, content_type)}
# 上传时不使用 JSON Content-Type
headers = {
"Authorization": f"Bearer {self.api_key}",
"Accept": "application/json",
}
response = requests.post(
f"{self.BASE_URL}/files/",
headers=headers,
files=files,
)
response.raise_for_status()
result = response.json()
# 返回图片 URL
return result.get("url")
except Exception as e:
print(f" Warning: Failed to upload image: {e}")
return None
# ========== 版本比较 ==========
def get_remote_version(self, post_id: str) -> Optional[str]:
"""
获取远程插件版本
Args:
post_id: 帖子 ID
Returns:
版本号如果不存在返回 None
"""
post_data = self.get_post(post_id)
if not post_data:
return None
return (
post_data.get("data", {})
.get("function", {})
.get("meta", {})
.get("manifest", {})
.get("version")
)
def version_needs_update(self, post_id: str, local_version: str) -> bool:
"""
检查是否需要更新
Args:
post_id: 帖子 ID
local_version: 本地版本号
Returns:
如果本地版本与远程不同返回 True
"""
remote_version = self.get_remote_version(post_id)
if not remote_version:
return True # 远程不存在,需要更新
return local_version != remote_version
# ========== 插件发布 ==========
def publish_plugin_from_file(
self, file_path: str, force: bool = False, auto_create: bool = True
) -> Tuple[bool, str]:
"""
从文件发布插件支持首次创建和更新
Args:
file_path: 插件文件路径
force: 是否强制更新忽略版本检查
auto_create: 如果没有 openwebui_id是否自动创建新帖子
Returns:
(是否成功, 消息)
"""
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
metadata = self._parse_frontmatter(content)
if not metadata:
return False, "No frontmatter found"
title = metadata.get("title")
if not title:
return False, "No title in frontmatter"
post_id = metadata.get("openwebui_id") or metadata.get("post_id")
local_version = metadata.get("version")
# 查找 README
readme_content = self._find_readme(file_path)
# 查找并上传图片
media_urls = None
image_path = self._find_image(file_path)
if image_path:
print(f" Found image: {os.path.basename(image_path)}")
image_url = self.upload_image(image_path)
if image_url:
print(f" Uploaded image: {image_url}")
media_urls = [image_url]
# 如果没有 post_id尝试创建新帖子
if not post_id:
if not auto_create:
return False, "No openwebui_id found and auto_create is disabled"
print(f" Creating new post for: {title}")
new_post_id = self.create_plugin(
title=title,
source_code=content,
readme_content=readme_content or metadata.get("description", ""),
metadata=metadata,
media_urls=media_urls,
)
if new_post_id:
# 将新 ID 写回本地文件
self._inject_id_to_file(file_path, new_post_id)
return True, f"Created new post (ID: {new_post_id})"
return False, "Failed to create new post"
# 版本检查(仅对更新有效)
if not force and local_version:
if not self.version_needs_update(post_id, local_version):
return True, f"Skipped: version {local_version} matches remote"
# 更新
success = self.update_plugin(
post_id=post_id,
source_code=content,
readme_content=readme_content or metadata.get("description", ""),
metadata=metadata,
media_urls=media_urls,
)
if success:
return True, f"Updated to version {local_version}"
return False, "Update failed"
def _parse_frontmatter(self, content: str) -> Dict[str, str]:
"""解析插件文件的 frontmatter"""
match = re.search(r'^\s*"""\n(.*?)\n"""', content, re.DOTALL)
if not match:
match = re.search(r'"""\n(.*?)\n"""', content, re.DOTALL)
if not match:
return {}
frontmatter = match.group(1)
meta = {}
for line in frontmatter.split("\n"):
if ":" in line:
key, value = line.split(":", 1)
meta[key.strip()] = value.strip()
return meta
def _find_readme(self, plugin_file_path: str) -> Optional[str]:
"""查找插件对应的 README 文件"""
plugin_dir = os.path.dirname(plugin_file_path)
base_name = os.path.basename(plugin_file_path).lower()
# 确定优先顺序
if base_name.endswith("_cn.py"):
readme_files = ["README_CN.md", "README.md"]
else:
readme_files = ["README.md", "README_CN.md"]
for readme_name in readme_files:
readme_path = os.path.join(plugin_dir, readme_name)
if os.path.exists(readme_path):
with open(readme_path, "r", encoding="utf-8") as f:
return f.read()
return None
def _find_image(self, plugin_file_path: str) -> Optional[str]:
"""
查找插件对应的图片文件
图片名称需要和插件文件名一致不含扩展名
例如
export_to_word.py -> export_to_word.png / export_to_word.jpg
"""
plugin_dir = os.path.dirname(plugin_file_path)
plugin_name = os.path.splitext(os.path.basename(plugin_file_path))[0]
# 支持的图片格式
image_extensions = [".png", ".jpg", ".jpeg", ".gif", ".webp"]
for ext in image_extensions:
image_path = os.path.join(plugin_dir, plugin_name + ext)
if os.path.exists(image_path):
return image_path
return None
def _inject_id_to_file(self, file_path: str, post_id: str) -> bool:
"""
将新创建的帖子 ID 写回本地插件文件的 frontmatter
Args:
file_path: 插件文件路径
post_id: 新创建的帖子 ID
Returns:
是否成功
"""
try:
with open(file_path, "r", encoding="utf-8") as f:
lines = f.readlines()
new_lines = []
inserted = False
in_frontmatter = False
for line in lines:
# Check for start/end of frontmatter
if line.strip() == '"""':
if not in_frontmatter:
in_frontmatter = True
else:
in_frontmatter = False
new_lines.append(line)
# Insert after version line
if (
in_frontmatter
and not inserted
and line.strip().startswith("version:")
):
new_lines.append(f"openwebui_id: {post_id}\n")
inserted = True
print(f" Injected openwebui_id: {post_id}")
if inserted:
with open(file_path, "w", encoding="utf-8") as f:
f.writelines(new_lines)
return True
print(f" Warning: Could not inject ID (no version line found)")
return False
except Exception as e:
print(f" Error injecting ID to file: {e}")
return False
# ========== 统计功能 ==========
def generate_stats(self, posts: List[Dict]) -> Dict:
"""
生成统计数据
Args:
posts: 帖子列表
Returns:
统计数据字典
"""
stats = {
"total_posts": len(posts),
"total_downloads": 0,
"total_likes": 0,
"posts_by_type": {},
"posts_detail": [],
"generated_at": datetime.now(BEIJING_TZ).isoformat(),
}
for post in posts:
downloads = post.get("downloadCount", 0)
likes = post.get("likeCount", 0)
post_type = post.get("type", "unknown")
stats["total_downloads"] += downloads
stats["total_likes"] += likes
stats["posts_by_type"][post_type] = (
stats["posts_by_type"].get(post_type, 0) + 1
)
stats["posts_detail"].append(
{
"id": post.get("id"),
"title": post.get("title"),
"type": post_type,
"downloads": downloads,
"likes": likes,
"created_at": post.get("createdAt"),
"updated_at": post.get("updatedAt"),
}
)
# 按下载量排序
stats["posts_detail"].sort(key=lambda x: x["downloads"], reverse=True)
return stats
# 便捷函数
def get_client(api_key: Optional[str] = None) -> OpenWebUICommunityClient:
"""
获取客户端实例
Args:
api_key: API Key如果为 None 则从环境变量获取
Returns:
OpenWebUICommunityClient 实例
"""
key = api_key or os.environ.get("OPENWEBUI_API_KEY")
if not key:
raise ValueError("OPENWEBUI_API_KEY not set")
return OpenWebUICommunityClient(key)