-
-
- 用户: {user_name}
- 时间: {current_date_time_str}
-
-
-
-
-
-
-
-
-
-
-
-
-
-"""
-
-# =================================================================
-# JavaScript 渲染脚本
-# =================================================================
-
-SCRIPT_TEMPLATE_INFOGRAPHIC = """
-
-
-"""
-
-
-class Action:
- class Valves(BaseModel):
- SHOW_STATUS: bool = Field(
- default=True, description="是否在聊天界面显示操作状态更新。"
- )
- MODEL_ID: str = Field(
- default="",
- description="用于文本分析的内置LLM模型ID。如果为空,则使用当前对话的模型。",
- )
- MIN_TEXT_LENGTH: int = Field(
- default=100,
- description="进行信息图分析所需的最小文本长度(字符数)。",
- )
- CLEAR_PREVIOUS_HTML: bool = Field(
- default=False,
- description="是否强制清除旧的插件结果(如果为 True,则不合并,直接覆盖)。",
- )
- MESSAGE_COUNT: int = Field(
- default=1,
- description="用于生成的最近消息数量。设置为1仅使用最后一条消息,更大值可包含更多上下文。",
- )
- OUTPUT_MODE: str = Field(
- default="image",
- description="输出模式:'html' 为交互式HTML,'image' 将嵌入为Markdown图片(默认)。",
- )
- SHOW_DEBUG_LOG: bool = Field(
- default=False,
- description="是否在浏览器控制台打印调试日志。",
- )
-
- def __init__(self):
- self.valves = self.Valves()
- self.weekday_map = {
- "Monday": "星期一",
- "Tuesday": "星期二",
- "Wednesday": "星期三",
- "Thursday": "星期四",
- "Friday": "星期五",
- "Saturday": "星期六",
- "Sunday": "星期日",
- }
-
- async def _get_user_context(
- self,
- __user__: Optional[Dict[str, Any]],
- __event_call__: Optional[Callable[[Any], Awaitable[None]]] = None,
- ) -> Dict[str, str]:
- """安全提取用户上下文信息。"""
- if isinstance(__user__, (list, tuple)):
- user_data = __user__[0] if __user__ else {}
- elif isinstance(__user__, dict):
- user_data = __user__
- else:
- user_data = {}
-
- user_id = user_data.get("id", "unknown_user")
- user_name = user_data.get("name", "用户")
- user_language = user_data.get("language", "zh-CN")
-
- if __event_call__:
- try:
- js_code = """
- return (
- localStorage.getItem('locale') ||
- localStorage.getItem('language') ||
- navigator.language ||
- 'zh-CN'
- );
- """
- frontend_lang = await __event_call__(
- {"type": "execute", "data": {"code": js_code}}
- )
- if frontend_lang and isinstance(frontend_lang, str):
- user_language = frontend_lang
- except Exception as e:
- pass
-
- return {
- "user_id": user_id,
- "user_name": user_name,
- "user_language": user_language,
- }
-
- def _get_chat_context(
- self, body: dict, __metadata__: Optional[dict] = None
- ) -> Dict[str, str]:
- """
- 统一提取聊天上下文信息 (chat_id, message_id)。
- 优先从 body 中提取,其次从 metadata 中提取。
- """
- chat_id = ""
- message_id = ""
-
- # 1. 尝试从 body 获取
- if isinstance(body, dict):
- chat_id = body.get("chat_id", "")
- message_id = body.get("id", "") # message_id 在 body 中通常是 id
-
- # 再次检查 body.metadata
- if not chat_id or not message_id:
- body_metadata = body.get("metadata", {})
- if isinstance(body_metadata, dict):
- if not chat_id:
- chat_id = body_metadata.get("chat_id", "")
- if not message_id:
- message_id = body_metadata.get("message_id", "")
-
- # 2. 尝试从 __metadata__ 获取 (作为补充)
- if __metadata__ and isinstance(__metadata__, dict):
- if not chat_id:
- chat_id = __metadata__.get("chat_id", "")
- if not message_id:
- message_id = __metadata__.get("message_id", "")
-
- return {
- "chat_id": str(chat_id).strip(),
- "message_id": str(message_id).strip(),
- }
-
- def _extract_infographic_syntax(self, llm_output: str) -> str:
- """提取LLM输出中的infographic语法"""
- # 1. 优先匹配 ```infographic
- match = re.search(r"```infographic\s*(.*?)\s*```", llm_output, re.DOTALL)
- if match:
- return match.group(1).strip().replace("", "<\\/script>")
-
- # 2. 其次匹配 ```mermaid (有时 LLM 会混淆)
- match = re.search(r"```mermaid\s*(.*?)\s*```", llm_output, re.DOTALL)
- if match:
- content = match.group(1).strip()
- # 简单检查是否包含 infographic 关键字
- if "infographic" in content or "data" in content:
- return content.replace("", "<\\/script>")
-
- # 3. 再次匹配通用 ``` (无语言标记)
- match = re.search(r"```\s*(.*?)\s*```", llm_output, re.DOTALL)
- if match:
- content = match.group(1).strip()
- # 简单的启发式检查
- if "infographic" in content or "data" in content:
- return content.replace("", "<\\/script>")
-
- # 4. 兜底:如果看起来像直接输出了语法(以 infographic 或 list-grid 等开头)
- cleaned_output = llm_output.strip()
- first_line = cleaned_output.split("\n")[0].lower()
- if (
- first_line.startswith("infographic")
- or first_line.startswith("list-")
- or first_line.startswith("tree-")
- or first_line.startswith("mindmap")
- ):
- return cleaned_output.replace("", "<\\/script>")
-
- logger.warning("LLM输出未严格遵循预期格式,将整个输出作为语法处理。")
- return cleaned_output.replace("", "<\\/script>")
-
- async def _emit_status(self, emitter, description: str, done: bool = False):
- """发送状态更新事件"""
- if self.valves.SHOW_STATUS and emitter:
- await emitter(
- {"type": "status", "data": {"description": description, "done": done}}
- )
-
- async def _emit_notification(self, emitter, content: str, ntype: str = "info"):
- """发送通知事件 (info/success/warning/error)"""
- if emitter:
- await emitter(
- {"type": "notification", "data": {"type": ntype, "content": content}}
- )
-
- async def _emit_debug_log(self, emitter, title: str, data: dict):
- """在浏览器控制台打印结构化调试日志"""
- if not self.valves.SHOW_DEBUG_LOG or not emitter:
- return
-
- try:
- js_code = f"""
- (async function() {{
- console.group("🛠️ {title}");
- console.log({json.dumps(data, ensure_ascii=False)});
- console.groupEnd();
- }})();
- """
-
- await emitter({"type": "execute", "data": {"code": js_code}})
- except Exception as e:
- print(f"Error emitting debug log: {e}")
-
- def _remove_existing_html(self, content: str) -> str:
- """移除内容中已有的插件生成 HTML 代码块"""
- pattern = r"```html\s*[\s\S]*?```"
- return re.sub(pattern, "", content).strip()
-
- def _extract_text_content(self, content) -> str:
- """从消息内容中提取文本,支持多模态消息格式"""
- if isinstance(content, str):
- return content
- elif isinstance(content, list):
- # 多模态消息: [{"type": "text", "text": "..."}, {"type": "image_url", ...}]
- text_parts = []
- for item in content:
- if isinstance(item, dict) and item.get("type") == "text":
- text_parts.append(item.get("text", ""))
- elif isinstance(item, str):
- text_parts.append(item)
- return "\n".join(text_parts)
- return str(content) if content else ""
-
- def _merge_html(
- self,
- existing_html_code: str,
- new_content: str,
- new_styles: str = "",
- new_scripts: str = "",
- user_language: str = "zh-CN",
- ) -> str:
- """将新内容合并到现有的 HTML 容器中,或者创建一个新的容器"""
- if (
- "" in existing_html_code
- and "" in existing_html_code
- ):
- base_html = existing_html_code
- base_html = re.sub(r"^```html\s*", "", base_html)
- base_html = re.sub(r"\s*```$", "", base_html)
- else:
- base_html = HTML_WRAPPER_TEMPLATE.replace("{user_language}", user_language)
-
- wrapped_content = f'