chore: cleanup legacy plugins and add plugin assets

- Remove deprecated summary plugin (replaced by deep-dive)
- Remove js-render-poc experimental plugin
- Add plugin preview images
- Update publish scripts with create_plugin support
This commit is contained in:
fujie
2026-01-08 08:39:21 +08:00
parent 3cc4478dd9
commit 322bd6e167
20 changed files with 584 additions and 3386 deletions

View File

@@ -0,0 +1,133 @@
"""
Download plugin images from OpenWebUI Community
下载远程插件图片到本地目录
"""
import os
import sys
import re
import requests
from urllib.parse import urlparse
# Add current directory to path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from openwebui_community_client import get_client
def find_local_plugin_by_id(plugins_dir: str, post_id: str) -> str | None:
"""根据 post_id 查找本地插件文件"""
for root, _, files in os.walk(plugins_dir):
for file in files:
if file.endswith(".py"):
file_path = os.path.join(root, file)
with open(file_path, "r", encoding="utf-8") as f:
content = f.read(2000)
id_match = re.search(
r"(?:openwebui_id|post_id):\s*([a-z0-9-]+)", content
)
if id_match and id_match.group(1).strip() == post_id:
return file_path
return None
def download_image(url: str, save_path: str) -> bool:
"""下载图片"""
try:
response = requests.get(url, timeout=30)
response.raise_for_status()
with open(save_path, "wb") as f:
f.write(response.content)
return True
except Exception as e:
print(f" Error downloading: {e}")
return False
def get_image_extension(url: str) -> str:
"""从 URL 获取图片扩展名"""
parsed = urlparse(url)
path = parsed.path
ext = os.path.splitext(path)[1].lower()
if ext in [".png", ".jpg", ".jpeg", ".gif", ".webp"]:
return ext
return ".png" # 默认
def main():
try:
client = get_client()
except ValueError as e:
print(f"Error: {e}")
sys.exit(1)
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
plugins_dir = os.path.join(base_dir, "plugins")
print("Fetching remote posts from OpenWebUI Community...")
posts = client.get_all_posts()
print(f"Found {len(posts)} remote posts.\n")
downloaded = 0
skipped = 0
not_found = 0
for post in posts:
post_id = post.get("id")
title = post.get("title", "Unknown")
media = post.get("media", [])
if not media:
continue
# 只取第一张图片
first_media = media[0] if isinstance(media, list) else media
# 处理字典格式 {'url': '...', 'type': 'image'}
if isinstance(first_media, dict):
image_url = first_media.get("url")
else:
image_url = first_media
if not image_url:
continue
print(f"Processing: {title}")
print(f" Image URL: {image_url}")
# 查找对应的本地插件
local_plugin = find_local_plugin_by_id(plugins_dir, post_id)
if not local_plugin:
print(f" ⚠️ No local plugin found for ID: {post_id}")
not_found += 1
continue
# 确定保存路径
plugin_dir = os.path.dirname(local_plugin)
plugin_name = os.path.splitext(os.path.basename(local_plugin))[0]
ext = get_image_extension(image_url)
save_path = os.path.join(plugin_dir, plugin_name + ext)
# 检查是否已存在
if os.path.exists(save_path):
print(f" ⏭️ Image already exists: {os.path.basename(save_path)}")
skipped += 1
continue
# 下载
print(f" Downloading to: {save_path}")
if download_image(image_url, save_path):
print(f" ✅ Downloaded: {os.path.basename(save_path)}")
downloaded += 1
else:
print(f" ❌ Failed to download")
print(f"\n{'='*50}")
print(
f"Finished: {downloaded} downloaded, {skipped} skipped, {not_found} not found locally"
)
if __name__ == "__main__":
main()

View File

@@ -47,9 +47,15 @@ class OpenWebUICommunityClient:
"Content-Type": "application/json",
"Accept": "application/json",
}
# 如果没有 user_id尝试通过 API 获取
if not self.user_id:
self.user_id = self._get_user_id_from_api()
def _parse_user_id_from_token(self, token: str) -> Optional[str]:
"""从 JWT Token 中解析用户 ID"""
# sk- 开头的是 API Key无法解析用户 ID
if token.startswith("sk-"):
return None
try:
parts = token.split(".")
if len(parts) >= 2:
@@ -65,6 +71,17 @@ class OpenWebUICommunityClient:
pass
return None
def _get_user_id_from_api(self) -> Optional[str]:
"""通过 API 获取当前用户 ID"""
try:
url = f"{self.BASE_URL}/auths/"
response = requests.get(url, headers=self.headers)
response.raise_for_status()
data = response.json()
return data.get("id")
except Exception:
return None
# ========== 帖子/插件获取 ==========
def get_user_posts(self, sort: str = "new", page: int = 1) -> List[Dict]:
@@ -78,7 +95,7 @@ class OpenWebUICommunityClient:
Returns:
帖子列表
"""
url = f"{self.BASE_URL}/posts/user/{self.user_id}?sort={sort}&page={page}"
url = f"{self.BASE_URL}/posts/users/{self.user_id}?sort={sort}&page={page}"
response = requests.get(url, headers=self.headers)
response.raise_for_status()
return response.json()
@@ -115,6 +132,96 @@ class OpenWebUICommunityClient:
return None
raise
# ========== 帖子/插件创建 ==========
def create_post(
self,
title: str,
content: str,
post_type: str = "function",
data: Optional[Dict] = None,
media: Optional[List[str]] = None,
) -> Optional[Dict]:
"""
创建新帖子
Args:
title: 帖子标题
content: 帖子内容README/描述)
post_type: 帖子类型 (function/tool/filter/pipeline)
data: 插件数据结构
media: 图片 URL 列表
Returns:
创建成功返回帖子数据,失败返回 None
"""
try:
url = f"{self.BASE_URL}/posts/create"
payload = {
"title": title,
"content": content,
"type": post_type,
"data": data or {},
"media": media or [],
}
response = requests.post(url, headers=self.headers, json=payload)
response.raise_for_status()
return response.json()
except Exception as e:
print(f" Error creating post: {e}")
return None
def create_plugin(
self,
title: str,
source_code: str,
readme_content: Optional[str] = None,
metadata: Optional[Dict] = None,
media_urls: Optional[List[str]] = None,
plugin_type: str = "action",
) -> Optional[str]:
"""
创建新插件帖子
Args:
title: 插件标题
source_code: 插件源代码
readme_content: README 内容
metadata: 插件元数据
media_urls: 图片 URL 列表
plugin_type: 插件类型 (action/filter/pipe)
Returns:
创建成功返回帖子 ID失败返回 None
"""
# 构建 function 数据结构
function_data = {
"id": "", # 服务器会生成
"name": title,
"type": plugin_type,
"content": source_code,
"meta": {
"description": metadata.get("description", "") if metadata else "",
"manifest": metadata or {},
},
}
data = {"function": function_data}
result = self.create_post(
title=title,
content=(
readme_content or metadata.get("description", "") if metadata else ""
),
post_type="function",
data=data,
media=media_urls,
)
if result:
return result.get("id")
return None
# ========== 帖子/插件更新 ==========
def update_post(self, post_id: str, post_data: Dict) -> bool:
@@ -139,15 +246,17 @@ class OpenWebUICommunityClient:
source_code: str,
readme_content: Optional[str] = None,
metadata: Optional[Dict] = None,
media_urls: Optional[List[str]] = None,
) -> bool:
"""
更新插件(代码 + README + 元数据)
更新插件(代码 + README + 元数据 + 图片
Args:
post_id: 帖子 ID
source_code: 插件源代码
readme_content: README 内容(用于社区页面展示)
metadata: 插件元数据title, version, description 等)
media_urls: 图片 URL 列表
Returns:
是否成功
@@ -184,8 +293,63 @@ class OpenWebUICommunityClient:
"description"
]
# 更新图片
if media_urls:
post_data["media"] = media_urls
return self.update_post(post_id, post_data)
# ========== 图片上传 ==========
def upload_image(self, file_path: str) -> Optional[str]:
"""
上传图片到 OpenWebUI 社区
Args:
file_path: 图片文件路径
Returns:
上传成功后的图片 URL失败返回 None
"""
if not os.path.exists(file_path):
return None
# 获取文件信息
filename = os.path.basename(file_path)
# 根据文件扩展名确定 MIME 类型
ext = os.path.splitext(filename)[1].lower()
mime_types = {
".png": "image/png",
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".gif": "image/gif",
".webp": "image/webp",
}
content_type = mime_types.get(ext, "application/octet-stream")
try:
with open(file_path, "rb") as f:
files = {"file": (filename, f, content_type)}
# 上传时不使用 JSON Content-Type
headers = {
"Authorization": f"Bearer {self.api_key}",
"Accept": "application/json",
}
response = requests.post(
f"{self.BASE_URL}/files/",
headers=headers,
files=files,
)
response.raise_for_status()
result = response.json()
# 返回图片 URL
return result.get("url")
except Exception as e:
print(f" Warning: Failed to upload image: {e}")
return None
# ========== 版本比较 ==========
def get_remote_version(self, post_id: str) -> Optional[str]:
@@ -228,14 +392,15 @@ class OpenWebUICommunityClient:
# ========== 插件发布 ==========
def publish_plugin_from_file(
self, file_path: str, force: bool = False
self, file_path: str, force: bool = False, auto_create: bool = True
) -> Tuple[bool, str]:
"""
从文件发布插件
从文件发布插件(支持首次创建和更新)
Args:
file_path: 插件文件路径
force: 是否强制更新(忽略版本检查)
auto_create: 如果没有 openwebui_id是否自动创建新帖子
Returns:
(是否成功, 消息)
@@ -247,26 +412,58 @@ class OpenWebUICommunityClient:
if not metadata:
return False, "No frontmatter found"
title = metadata.get("title")
if not title:
return False, "No title in frontmatter"
post_id = metadata.get("openwebui_id") or metadata.get("post_id")
if not post_id:
return False, "No openwebui_id found"
local_version = metadata.get("version")
# 版本检查
if not force and local_version:
if not self.version_needs_update(post_id, local_version):
return True, f"Skipped: version {local_version} matches remote"
# 查找 README
readme_content = self._find_readme(file_path)
# 查找并上传图片
media_urls = None
image_path = self._find_image(file_path)
if image_path:
print(f" Found image: {os.path.basename(image_path)}")
image_url = self.upload_image(image_path)
if image_url:
print(f" Uploaded image: {image_url}")
media_urls = [image_url]
# 如果没有 post_id尝试创建新帖子
if not post_id:
if not auto_create:
return False, "No openwebui_id found and auto_create is disabled"
print(f" Creating new post for: {title}")
new_post_id = self.create_plugin(
title=title,
source_code=content,
readme_content=readme_content or metadata.get("description", ""),
metadata=metadata,
media_urls=media_urls,
)
if new_post_id:
# 将新 ID 写回本地文件
self._inject_id_to_file(file_path, new_post_id)
return True, f"Created new post (ID: {new_post_id})"
return False, "Failed to create new post"
# 版本检查(仅对更新有效)
if not force and local_version:
if not self.version_needs_update(post_id, local_version):
return True, f"Skipped: version {local_version} matches remote"
# 更新
success = self.update_plugin(
post_id=post_id,
source_code=content,
readme_content=readme_content or metadata.get("description", ""),
metadata=metadata,
media_urls=media_urls,
)
if success:
@@ -307,6 +504,77 @@ class OpenWebUICommunityClient:
return f.read()
return None
def _find_image(self, plugin_file_path: str) -> Optional[str]:
"""
查找插件对应的图片文件
图片名称需要和插件文件名一致(不含扩展名)
例如:
export_to_word.py -> export_to_word.png / export_to_word.jpg
"""
plugin_dir = os.path.dirname(plugin_file_path)
plugin_name = os.path.splitext(os.path.basename(plugin_file_path))[0]
# 支持的图片格式
image_extensions = [".png", ".jpg", ".jpeg", ".gif", ".webp"]
for ext in image_extensions:
image_path = os.path.join(plugin_dir, plugin_name + ext)
if os.path.exists(image_path):
return image_path
return None
def _inject_id_to_file(self, file_path: str, post_id: str) -> bool:
"""
将新创建的帖子 ID 写回本地插件文件的 frontmatter
Args:
file_path: 插件文件路径
post_id: 新创建的帖子 ID
Returns:
是否成功
"""
try:
with open(file_path, "r", encoding="utf-8") as f:
lines = f.readlines()
new_lines = []
inserted = False
in_frontmatter = False
for line in lines:
# Check for start/end of frontmatter
if line.strip() == '"""':
if not in_frontmatter:
in_frontmatter = True
else:
in_frontmatter = False
new_lines.append(line)
# Insert after version line
if (
in_frontmatter
and not inserted
and line.strip().startswith("version:")
):
new_lines.append(f"openwebui_id: {post_id}\n")
inserted = True
print(f" Injected openwebui_id: {post_id}")
if inserted:
with open(file_path, "w", encoding="utf-8") as f:
f.writelines(new_lines)
return True
print(f" Warning: Could not inject ID (no version line found)")
return False
except Exception as e:
print(f" Error injecting ID to file: {e}")
return False
# ========== 统计功能 ==========
def generate_stats(self, posts: List[Dict]) -> Dict:

View File

@@ -3,8 +3,10 @@ Publish plugins to OpenWebUI Community
使用 OpenWebUICommunityClient 发布插件到官方社区
用法:
python scripts/publish_plugin.py # 更新版本变化的插件
python scripts/publish_plugin.py --force # 强制更新所有插件
python scripts/publish_plugin.py # 更新已发布的插件(版本变化时)
python scripts/publish_plugin.py --force # 强制更新所有已发布的插件
python scripts/publish_plugin.py --new plugins/actions/xxx # 首次发布指定目录的新插件
python scripts/publish_plugin.py --new plugins/actions/xxx --force # 强制发布新插件
"""
import os
@@ -15,34 +17,111 @@ import argparse
# Add current directory to path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from openwebui_community_client import OpenWebUICommunityClient, get_client
from openwebui_community_client import get_client
def find_plugins_with_id(plugins_dir: str) -> list:
"""查找所有 openwebui_id 的插件文件"""
def find_existing_plugins(plugins_dir: str) -> list:
"""查找所有已发布的插件文件(有 openwebui_id 的"""
plugins = []
for root, _, files in os.walk(plugins_dir):
for file in files:
if file.endswith(".py"):
if file.endswith(".py") and not file.startswith("__"):
file_path = os.path.join(root, file)
with open(file_path, "r", encoding="utf-8") as f:
content = f.read(2000) # 只读前 2000 字符检查 ID
content = f.read(2000)
id_match = re.search(
r"(?:openwebui_id|post_id):\s*([a-z0-9-]+)", content
)
if id_match:
plugins.append(
{"file_path": file_path, "post_id": id_match.group(1).strip()}
{
"file_path": file_path,
"post_id": id_match.group(1).strip(),
}
)
return plugins
def find_new_plugins_in_dir(target_dir: str) -> list:
"""查找指定目录中没有 openwebui_id 的新插件"""
plugins = []
if not os.path.isdir(target_dir):
print(f"Error: {target_dir} is not a directory")
return plugins
for file in os.listdir(target_dir):
if file.endswith(".py") and not file.startswith("__"):
file_path = os.path.join(target_dir, file)
if not os.path.isfile(file_path):
continue
with open(file_path, "r", encoding="utf-8") as f:
content = f.read(2000)
# 检查是否有 frontmatter (title)
title_match = re.search(r"title:\s*(.+)", content)
if not title_match:
continue
# 检查是否已有 ID
id_match = re.search(r"(?:openwebui_id|post_id):\s*([a-z0-9-]+)", content)
if id_match:
print(f" ⚠️ {file} already has ID, will update instead")
plugins.append(
{
"file_path": file_path,
"title": title_match.group(1).strip(),
"post_id": id_match.group(1).strip(),
"is_new": False,
}
)
else:
plugins.append(
{
"file_path": file_path,
"title": title_match.group(1).strip(),
"post_id": None,
"is_new": True,
}
)
return plugins
def main():
parser = argparse.ArgumentParser(description="Publish plugins to OpenWebUI Market")
parser = argparse.ArgumentParser(
description="Publish plugins to OpenWebUI Market",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Update existing plugins (with version check)
python scripts/publish_plugin.py
# Force update all existing plugins
python scripts/publish_plugin.py --force
# Publish new plugins from a specific directory
python scripts/publish_plugin.py --new plugins/actions/summary
# Preview what would be done
python scripts/publish_plugin.py --new plugins/actions/summary --dry-run
""",
)
parser.add_argument(
"--force", action="store_true", help="Force update even if version matches"
)
parser.add_argument(
"--new",
metavar="DIR",
help="Publish new plugins from the specified directory (required for first-time publishing)",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be done without actually publishing",
)
args = parser.parse_args()
try:
@@ -54,35 +133,99 @@ def main():
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
plugins_dir = os.path.join(base_dir, "plugins")
plugins = find_plugins_with_id(plugins_dir)
print(f"Found {len(plugins)} plugins with OpenWebUI ID.\n")
updated = 0
created = 0
skipped = 0
failed = 0
for plugin in plugins:
file_path = plugin["file_path"]
file_name = os.path.basename(file_path)
post_id = plugin["post_id"]
# 处理新插件发布
if args.new:
target_dir = args.new
if not os.path.isabs(target_dir):
target_dir = os.path.join(base_dir, target_dir)
print(f"Processing {file_name} (ID: {post_id})...")
print(f"🆕 Publishing new plugins from: {target_dir}\n")
new_plugins = find_new_plugins_in_dir(target_dir)
success, message = client.publish_plugin_from_file(file_path, force=args.force)
if not new_plugins:
print("No plugins found in the specified directory.")
return
if success:
if "Skipped" in message:
print(f" ⏭️ {message}")
skipped += 1
for plugin in new_plugins:
file_path = plugin["file_path"]
file_name = os.path.basename(file_path)
title = plugin["title"]
is_new = plugin.get("is_new", True)
if is_new:
print(f"🆕 Creating: {file_name} ({title})")
else:
print(f"{message}")
updated += 1
else:
print(f" {message}")
failed += 1
print(f"📦 Updating: {file_name} (ID: {plugin['post_id'][:8]}...)")
if args.dry_run:
print(f" [DRY-RUN] Would {'create' if is_new else 'update'}")
continue
success, message = client.publish_plugin_from_file(
file_path, force=args.force, auto_create=True
)
if success:
if "Created" in message:
print(f" 🎉 {message}")
created += 1
elif "Skipped" in message:
print(f" ⏭️ {message}")
skipped += 1
else:
print(f"{message}")
updated += 1
else:
print(f"{message}")
failed += 1
# 处理已有插件更新
else:
existing_plugins = find_existing_plugins(plugins_dir)
print(f"Found {len(existing_plugins)} existing plugins with OpenWebUI ID.\n")
if not existing_plugins:
print("No existing plugins to update.")
print(
"\n💡 Tip: Use --new <dir> to publish new plugins from a specific directory"
)
return
for plugin in existing_plugins:
file_path = plugin["file_path"]
file_name = os.path.basename(file_path)
post_id = plugin["post_id"]
print(f"📦 {file_name} (ID: {post_id[:8]}...)")
if args.dry_run:
print(f" [DRY-RUN] Would update")
continue
success, message = client.publish_plugin_from_file(
file_path, force=args.force, auto_create=False # 不自动创建,只更新
)
if success:
if "Skipped" in message:
print(f" ⏭️ {message}")
skipped += 1
else:
print(f"{message}")
updated += 1
else:
print(f"{message}")
failed += 1
print(f"\n{'='*50}")
print(f"Finished: {updated} updated, {skipped} skipped, {failed} failed")
print(
f"Finished: {created} created, {updated} updated, {skipped} skipped, {failed} failed"
)
if __name__ == "__main__":