Files
Fu-Jie_openwebui-extensions/plugins/actions/ACTION_PLUGIN_TEMPLATE_CN.py
2026-01-04 03:17:35 +08:00

419 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
title: [插件名称] (例如: 智能思维导图)
author: [作者姓名]
author_url: [作者主页链接]
funding_url: [赞助链接]
version: 0.1.0
icon_url: [图标 URL 或 Data URI]
description: [简短描述插件的功能]
requirements: [依赖列表, 例如: jinja2, markdown]
"""
from pydantic import BaseModel, Field
from typing import Optional, Dict, Any, List, Callable, Awaitable
import logging
import re
import json
from fastapi import Request
from datetime import datetime
import pytz
# 导入 OpenWebUI 工具函数
from open_webui.utils.chat import generate_chat_completion
from open_webui.models.users import Users
# 设置日志
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# =================================================================
# 常量与提示词 (Constants & Prompts)
# =================================================================
SYSTEM_PROMPT = """
[在此处插入系统提示词]
你是一个有用的助手...
请以 [JSON/Markdown] 格式输出...
"""
USER_PROMPT_TEMPLATE = """
[在此处插入用户提示词模板]
用户上下文:
姓名: {user_name}
时间: {current_date_time_str}
待处理内容:
{content}
"""
# HTML 容器模板 (支持多插件共存与网格布局)
HTML_WRAPPER_TEMPLATE = """
<!-- OPENWEBUI_PLUGIN_OUTPUT -->
<!DOCTYPE html>
<html lang="{user_language}">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<style>
body {
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, sans-serif;
margin: 0;
padding: 10px;
background-color: transparent;
}
#main-container {
display: flex;
flex-wrap: wrap;
gap: 20px;
align-items: flex-start;
width: 100%;
}
.plugin-item {
flex: 1 1 400px; /* 默认宽度,允许伸缩 */
min-width: 300px;
background: white;
border-radius: 12px;
box-shadow: 0 4px 6px rgba(0,0,0,0.05);
overflow: hidden;
border: 1px solid #e5e7eb;
transition: all 0.3s ease;
}
.plugin-item:hover {
box-shadow: 0 10px 15px rgba(0,0,0,0.1);
}
@media (max-width: 768px) {
.plugin-item { flex: 1 1 100%; }
}
/* STYLES_INSERTION_POINT */
</style>
</head>
<body>
<div id="main-container">
<!-- CONTENT_INSERTION_POINT -->
</div>
<!-- SCRIPTS_INSERTION_POINT -->
</body>
</html>
"""
class Action:
class Valves(BaseModel):
SHOW_STATUS: bool = Field(
default=True,
description="是否在聊天界面显示操作状态更新。",
)
MODEL_ID: str = Field(
default="",
description="用于处理的内置 LLM 模型 ID。如果为空则使用当前对话的模型。",
)
MIN_TEXT_LENGTH: int = Field(
default=50,
description="处理所需的最小文本长度(字符数)。",
)
CLEAR_PREVIOUS_HTML: bool = Field(
default=False,
description="是否强制清除旧的插件结果(如果为 True则不合并直接覆盖",
)
MESSAGE_COUNT: int = Field(
default=1,
description="用于生成的最近消息数量。设置为1仅使用最后一条消息更大值可包含更多上下文。",
)
# 根据需要添加其他配置字段
# MAX_TEXT_LENGTH: int = Field(default=2000, description="...")
def __init__(self):
self.valves = self.Valves()
def _get_user_context(self, __user__: Optional[Dict[str, Any]]) -> Dict[str, str]:
"""提取用户上下文信息。"""
if isinstance(__user__, (list, tuple)):
user_data = __user__[0] if __user__ else {}
elif isinstance(__user__, dict):
user_data = __user__
else:
user_data = {}
return {
"user_id": user_data.get("id", "unknown_user"),
"user_name": user_data.get("name", "用户"),
"user_language": user_data.get("language", "zh-CN"),
}
def _get_current_time_context(self) -> Dict[str, str]:
"""获取当前时间上下文。"""
try:
# 默认为特定时区或系统时间
tz = pytz.timezone("Asia/Shanghai") # 根据需要修改
now = datetime.now(tz)
except Exception:
now = datetime.now()
weekday_map = {
"Monday": "星期一",
"Tuesday": "星期二",
"Wednesday": "星期三",
"Thursday": "星期四",
"Friday": "星期五",
"Saturday": "星期六",
"Sunday": "星期日",
}
weekday_en = now.strftime("%A")
return {
"current_date_time_str": now.strftime("%Y年%m月%d%H:%M:%S"),
"current_weekday": weekday_map.get(weekday_en, weekday_en),
"current_year": now.strftime("%Y"),
"current_timezone_str": str(now.tzinfo) if now.tzinfo else "未知时区",
}
def _process_llm_output(self, llm_output: str) -> Any:
"""
处理 LLM 的原始输出。
重写此方法以解析 JSON、提取 Markdown 等。
"""
# 示例: 提取 JSON
# try:
# start = llm_output.find('{')
# end = llm_output.rfind('}') + 1
# if start != -1 and end != -1:
# return json.loads(llm_output[start:end])
# except Exception:
# pass
return llm_output.strip()
def _remove_existing_html(self, content: str) -> str:
"""移除内容中已有的插件生成 HTML 代码块 (通过标记识别)。"""
# 匹配 ```html <!-- OPENWEBUI_PLUGIN_OUTPUT --> ... ``` 模式
# 使用 [\s\S]*? 非贪婪匹配任意字符
pattern = r"```html\s*<!-- OPENWEBUI_PLUGIN_OUTPUT -->[\s\S]*?```"
return re.sub(pattern, "", content).strip()
def _extract_text_content(self, content) -> str:
"""从消息内容中提取文本,支持多模态消息格式。"""
if isinstance(content, str):
return content
elif isinstance(content, list):
# 多模态消息: [{"type": "text", "text": "..."}, {"type": "image_url", ...}]
text_parts = []
for item in content:
if isinstance(item, dict) and item.get("type") == "text":
text_parts.append(item.get("text", ""))
elif isinstance(item, str):
text_parts.append(item)
return "\n".join(text_parts)
return str(content) if content else ""
def _merge_html(
self,
existing_html_code: str,
new_content: str,
new_styles: str = "",
new_scripts: str = "",
user_language: str = "zh-CN",
) -> str:
"""
将新内容合并到现有的 HTML 容器中,或者创建一个新的容器。
"""
# 检查是否存在兼容的容器标记
if (
"<!-- OPENWEBUI_PLUGIN_OUTPUT -->" in existing_html_code
and "<!-- CONTENT_INSERTION_POINT -->" in existing_html_code
):
base_html = existing_html_code
# 移除代码块标记 ```html ... ``` 以便处理
base_html = re.sub(r"^```html\s*", "", base_html)
base_html = re.sub(r"\s*```$", "", base_html)
else:
# 初始化新容器
base_html = HTML_WRAPPER_TEMPLATE.replace("{user_language}", user_language)
# 包装新内容
wrapped_content = f'<div class="plugin-item">\n{new_content}\n</div>'
# 注入样式
if new_styles:
base_html = base_html.replace(
"/* STYLES_INSERTION_POINT */",
f"{new_styles}\n/* STYLES_INSERTION_POINT */",
)
# 注入内容
base_html = base_html.replace(
"<!-- CONTENT_INSERTION_POINT -->",
f"{wrapped_content}\n<!-- CONTENT_INSERTION_POINT -->",
)
# 注入脚本
if new_scripts:
base_html = base_html.replace(
"<!-- SCRIPTS_INSERTION_POINT -->",
f"{new_scripts}\n<!-- SCRIPTS_INSERTION_POINT -->",
)
return base_html.strip()
async def _emit_status(
self,
emitter: Optional[Callable[[Any], Awaitable[None]]],
description: str,
done: bool = False,
):
"""发送状态更新事件。"""
if self.valves.SHOW_STATUS and emitter:
await emitter(
{"type": "status", "data": {"description": description, "done": done}}
)
async def _emit_notification(
self,
emitter: Optional[Callable[[Any], Awaitable[None]]],
content: str,
type: str = "info",
):
"""发送通知事件 (info, success, warning, error)。"""
if emitter:
await emitter(
{"type": "notification", "data": {"type": type, "content": content}}
)
async def _emit_message(
self, emitter: Optional[Callable[[Any], Awaitable[None]]], content: str
):
"""发送消息追加事件 (追加到当前消息)。"""
if emitter:
await emitter({"type": "message", "data": {"content": content}})
async def _emit_replace(
self, emitter: Optional[Callable[[Any], Awaitable[None]]], content: str
):
"""发送消息替换事件 (替换当前消息)。"""
if emitter:
await emitter({"type": "replace", "data": {"content": content}})
async def action(
self,
body: dict,
__user__: Optional[Dict[str, Any]] = None,
__event_emitter__: Optional[Callable[[Any], Awaitable[None]]] = None,
__event_call__: Optional[Callable[[Any], Awaitable[Any]]] = None,
__request__: Optional[Request] = None,
) -> Optional[dict]:
logger.info(f"Action: {__name__} started")
# 1. 上下文设置
user_context = self._get_user_context(__user__)
time_context = self._get_current_time_context()
# 2. 输入验证
messages = body.get("messages", [])
if not messages:
return body # 或者处理错误
# 根据 MESSAGE_COUNT 获取最近 N 条消息
message_count = min(self.valves.MESSAGE_COUNT, len(messages))
recent_messages = messages[-message_count:]
# 聚合选中消息的内容,带标签
aggregated_parts = []
for i, msg in enumerate(recent_messages, 1):
text_content = self._extract_text_content(msg.get("content"))
if text_content:
role = msg.get("role", "unknown")
role_label = (
"用户"
if role == "user"
else "助手" if role == "assistant" else role
)
aggregated_parts.append(f"{text_content}")
if not aggregated_parts:
return body # 或者处理错误
original_content = "\n\n---\n\n".join(aggregated_parts)
if len(original_content) < self.valves.MIN_TEXT_LENGTH:
warning_msg = f"文本过短 ({len(original_content)} 字符)。最少需要: {self.valves.MIN_TEXT_LENGTH}"
await self._emit_notification(__event_emitter__, warning_msg, "warning")
return body # 或者返回失败消息
# 3. 状态通知 (开始)
await self._emit_status(__event_emitter__, "正在处理...", done=False)
try:
# 4. 准备提示词
formatted_prompt = USER_PROMPT_TEMPLATE.format(
user_name=user_context["user_name"],
current_date_time_str=time_context["current_date_time_str"],
content=original_content,
# 添加其他上下文变量
)
# 5. 确定模型
target_model = self.valves.MODEL_ID
if not target_model:
target_model = body.get("model")
# 注意: 这里没有硬编码的回退,依赖于系统/用户上下文
# 6. 调用 LLM
user_obj = Users.get_user_by_id(user_context["user_id"])
payload = {
"model": target_model,
"messages": [
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": formatted_prompt},
],
"stream": False,
# "temperature": 0.5,
}
llm_response = await generate_chat_completion(
__request__, payload, user_obj
)
if not llm_response or "choices" not in llm_response:
raise ValueError("无效的 LLM 响应")
assistant_content = llm_response["choices"][0]["message"]["content"]
# 7. 处理输出
processed_data = self._process_llm_output(assistant_content)
# 8. 生成 HTML/结果
# 示例: 简单的字符串替换
final_html = HTML_TEMPLATE.replace("{result_content}", str(processed_data))
final_html = final_html.replace(
"{user_language}", user_context["user_language"]
)
# 9. 注入结果
if self.valves.CLEAR_PREVIOUS_HTML:
body["messages"][-1]["content"] = self._remove_existing_html(
body["messages"][-1]["content"]
)
html_embed_tag = f"```html\n{final_html}\n```"
body["messages"][-1]["content"] += f"\n\n{html_embed_tag}"
# 10. 状态通知 (成功)
await self._emit_status(__event_emitter__, "处理完成!", done=True)
await self._emit_notification(
__event_emitter__, "操作成功完成。", "success"
)
except Exception as e:
logger.error(f"Action failed: {e}", exc_info=True)
error_msg = f"错误: {str(e)}"
# 将错误附加到聊天中 (可选)
body["messages"][-1]["content"] += f"\n\n❌ **错误**: {error_msg}"
await self._emit_status(__event_emitter__, "处理失败。", done=True)
await self._emit_notification(
__event_emitter__, "操作失败,请检查日志。", "error"
)
return body