# Gemini Manifold 插件通用例子 ## 1. 配置层叠(Valves + UserValves) ```python from pydantic import BaseModel class Valves(BaseModel): GEMINI_API_KEY: str | None = None USE_VERTEX_AI: bool = False THINKING_BUDGET: int = 8192 class UserValves(BaseModel): GEMINI_API_KEY: str | None = None THINKING_BUDGET: int | None = None def merge_valves(default: Valves, user: UserValves | None) -> Valves: merged = default.model_dump() if user: for field in user.model_fields: value = getattr(user, field) if value not in (None, ""): merged[field] = value return Valves(**merged) admin_settings = Valves(GEMINI_API_KEY="admin-key", THINKING_BUDGET=8192) user_settings = UserValves(GEMINI_API_KEY="user-key", THINKING_BUDGET=4096) effective = merge_valves(admin_settings, user_settings) print(effective) ``` **场景说明:** 与 `gemini_manifold.py` 中 `Valves`/`UserValves` 合并逻辑一致,适用于需要在 admin 默认与用户覆盖之间做透明优先级控制的插件。 ## 2. 异步事件与进度反馈(EventEmitter + 上传队列) ```python import asyncio from typing import Callable, Awaitable class EventEmitter: """ 抽象事件发射器,将所有前端交互统一到异步通道中。 """ def __init__(self, emit: Callable[[dict], Awaitable[None]] | None = None, hide_successful_status: bool = False): self._emit = emit self.hide_successful_status = hide_successful_status async def emit_status(self, message: str, done: bool = False, hidden: bool = False) -> None: """ 发出状态消息。如果 done=True 且 hide_successful_status=True,则在前端隐藏。 """ if not self._emit: return if done and self.hide_successful_status: hidden = True event = { "type": "status", "data": { "description": message, "done": done, "hidden": hidden } } await self._emit(event) async def emit_toast(self, msg: str, toast_type: str = "info") -> None: """ 发出 toast 通知(弹窗)。 """ if not self._emit: return event = { "type": "notification", "data": { "type": toast_type, "content": msg } } await self._emit(event) async def emit_completion(self, content: str | None = None, done: bool = False, error: str | None = None, usage: dict | None = None) -> None: """ 发出完成事件,可含内容、错误、使用量等信息。 """ if not self._emit: return event = {"type": "chat:completion", "data": {"done": done}} if content is not None: event["data"]["content"] = content if error is not None: event["data"]["error"] = {"detail": error} if usage is not None: event["data"]["usage"] = usage await self._emit(event) class UploadStatusManager: """ 管理并发文件上传的状态,自动追踪注册与完成计数。 """ def __init__(self, emitter: EventEmitter, start_time: float): self.emitter = emitter self.start_time = start_time self.queue = asyncio.Queue() self.total_uploads_expected = 0 self.uploads_completed = 0 self.finalize_received = False self.is_active = False async def run(self) -> None: """ 后台任务,监听队列并发出状态更新。 """ import time while not (self.finalize_received and self.total_uploads_expected == self.uploads_completed): try: msg = await asyncio.wait_for(self.queue.get(), timeout=1.0) except asyncio.TimeoutError: continue msg_type = msg[0] if msg_type == "REGISTER_UPLOAD": self.is_active = True self.total_uploads_expected += 1 await self._emit_progress_update(time.monotonic()) elif msg_type == "COMPLETE_UPLOAD": self.uploads_completed += 1 await self._emit_progress_update(time.monotonic()) elif msg_type == "FINALIZE": self.finalize_received = True self.queue.task_done() async def _emit_progress_update(self, current_time: float) -> None: """发出进度更新到前端。""" if not self.is_active: return elapsed = current_time - self.start_time time_str = f"(+{elapsed:.2f}s)" is_done = (self.total_uploads_expected > 0 and self.uploads_completed == self.total_uploads_expected) if is_done: msg = f"上传完成。{self.uploads_completed} 个文件已处理。{time_str}" else: msg = f"上传中 {self.uploads_completed + 1}/{self.total_uploads_expected}… {time_str}" await self.emitter.emit_status(msg, done=is_done) async def multi_file_upload_workflow( file_list: list[tuple[str, bytes]], emitter: EventEmitter ) -> list[str]: """ 示范多文件并发上传的完整工作流。 """ import time start_time = time.monotonic() status_mgr = UploadStatusManager(emitter, start_time) # 启动后台状态管理器 manager_task = asyncio.create_task(status_mgr.run()) # 为每个文件创建上传任务 async def upload_file(name: str, data: bytes) -> str: await status_mgr.queue.put(("REGISTER_UPLOAD",)) try: await asyncio.sleep(0.5) # 模拟网络延迟 result = f"uploaded-{name}" await emitter.emit_toast(f"✓ 文件 {name} 上传成功", "success") return result except Exception as e: await emitter.emit_toast(f"✗ 文件 {name} 上传失败: {e}", "error") raise finally: await status_mgr.queue.put(("COMPLETE_UPLOAD",)) # 并发执行所有上传 tasks = [upload_file(name, data) for name, data in file_list] results = await asyncio.gather(*tasks, return_exceptions=True) # 通知状态管理器完成 await status_mgr.queue.put(("FINALIZE",)) await manager_task # 汇总结果 success = [r for r in results if not isinstance(r, Exception)] return success # 完整使用示例 async def demo(): async def fake_emit(payload): print(f"→ {payload['type']}: {payload['data']}") emitter = EventEmitter(fake_emit, hide_successful_status=False) files = [ ("doc1.pdf", b"content1"), ("image.jpg", b"content2"), ("data.csv", b"content3"), ] results = await multi_file_upload_workflow(files, emitter) print(f"\n✓ 上传成功: {len(results)} 个文件") asyncio.run(demo()) ``` **完整流程状态显示说明:** 整个异步工作流的状态显示遵循以下链路: ```text 初始化 ↓ 发出"准备请求"状态 → [emit_status] → 前端显示状态条 ↓ 启动后台 UploadStatusManager 任务 ↓ 并发执行多个上传任务 ├─→ 任务1: REGISTER_UPLOAD → [更新计数] → emit_status("上传中 1/3…") ├─→ 任务2: REGISTER_UPLOAD → [更新计数] → emit_status("上传中 2/3…") └─→ 任务3: REGISTER_UPLOAD → [更新计数] → emit_status("上传中 3/3…") ↓ 每个任务完成时 ├─→ emit_toast("✓ 文件上传成功", "success") → 前端弹窗确认 └─→ COMPLETE_UPLOAD → [更新计数] → emit_status("上传中 1/3…") 或 "上传完成" ↓ 所有任务完成 → FINALIZE → 关闭后台管理器 ↓ 发出最终状态 → emit_status("全部完成", done=True) → 前端状态条完成 ``` **关键数据流动:** 1. **EventEmitter** 负责将事件发送到前端 - `emit_status()`: 状态条消息 - `emit_toast()`: 弹窗通知 - `emit_completion()`: 完成事件(含 usage 等) 2. **UploadStatusManager** 后台任务持续监听队列 - 接收 `("REGISTER_UPLOAD",)` → 计数加 1 → 计算进度 → 更新状态显示 - 接收 `("COMPLETE_UPLOAD",)` → 计数加 1 → 重新计算进度 → 更新状态显示 - 接收 `("FINALIZE",)` → 退出循环 → 任务完成 3. **实时计数逻辑** ```python 已完成数 / 总数 = 进度百分比 显示: "上传中 {已完成+1}/{总数}… (+X.XXs)" 当完成数 == 总数: 显示 "上传完成。3 个文件已处理。(+2.50s)" ``` **场景说明:** 完整模拟 `gemini_manifold.py` 中 `EventEmitter` + `UploadStatusManager` 的实战设计。支持多并发任务状态跟踪、自动计数、toast 通知与后台进度汇报。适用于: - 多文件并发上传且需要实时进度反馈的场景 - API 轮询或长流程中持续向前端汇报进展 - 需要自隐藏成功状态但保留错误警告的交互流程 - 复杂的异步任务编排与协调 - 需要细粒度时间戳与计数统计的长流程 **场景说明:** 完整模拟 `gemini_manifold.py` 中 `EventEmitter` + `UploadStatusManager` 的实战设计。支持多并发任务状态跟踪、自动计数、toast 通知与后台进度汇报。适用于: - 多文件并发上传且需要实时进度反馈的场景 - API 轮询或长流程中持续向前端汇报进展 - 需要自隐藏成功状态但保留错误警告的交互流程 - 复杂的异步任务编排与协调 - 需要细粒度时间戳与计数统计的长流程 ## 3. 文件缓存 + 幂等上传(xxHash + deterministic 名称) ```python import xxhash def content_hash(data: bytes) -> str: return xxhash.xxh64(data).hexdigest() cache: dict[str, str] = {} def deterministic_name(hash_val: str) -> str: return f"files/owui-v1-{hash_val}" async def maybe_upload(data: bytes): h = content_hash(data) if h in cache: print("cache hit", cache[h]) return cache[h] name = deterministic_name(h) cache[h] = name print("uploading", name) return name ``` **场景说明:** 简化版 `FilesAPIManager` 热/暖/冷路径,适合需要避免重复上传、并希望后端能通过 deterministic 名称恢复文件状态的场景。 ## 4. 统一响应处理(流式 + 非流式适配) ```python from typing import AsyncGenerator class UnifiedResponseProcessor: async def process_stream( self, response_stream: AsyncGenerator, is_stream: bool = True ) -> AsyncGenerator: """ 处理流式或一次性响应,统一返回 AsyncGenerator。 """ try: async for chunk in response_stream: # 处理单个 chunk processed = await self._process_chunk(chunk) if processed: yield {"choices": [{"delta": processed}]} except Exception as e: yield {"choices": [{"delta": {"content": f"Error: {e}"}}]} finally: yield "data: [DONE]" async def _process_chunk(self, chunk): # 简化处理逻辑 return {"content": str(chunk)} # 使用示例 async def main(): processor = UnifiedResponseProcessor() async def fake_stream(): for i in range(3): yield f"chunk-{i}" async for item in processor.process_stream(fake_stream()): print(item) ``` **场景说明:** 对应 `gemini_manifold.py` 中 `_unified_response_processor` 的核心思想——无论前端是否启用流式,插件内部都用统一的 AsyncGenerator 处理,避免代码分支。适用于需要兼容流式与非流式响应的任何插件。 ## 5. 特殊标签禁用(防止前端解析干扰) ```python import re ZWS = "\u200b" # 零宽空格 SPECIAL_TAGS = ["think", "details", "thinking", "reason"] def disable_special_tags(text: str) -> tuple[str, int]: """ 在特殊标签前插入零宽空格,防止前端 HTML 解析器处理它们。 """ if not text: return "", 0 TAG_REGEX = re.compile( r"<(/?(" + "|".join(re.escape(tag) for tag in SPECIAL_TAGS) + r"))" ) modified, count = TAG_REGEX.subn(rf"<{ZWS}\1", text) return modified, count def enable_special_tags(text: str) -> str: """ 移除零宽空格,恢复原始标签,用于模型理解上下文。 """ if not text: return "" REVERSE_REGEX = re.compile( r"<" + ZWS + r"(/?(" + "|".join(re.escape(tag) for tag in SPECIAL_TAGS) + r"))" ) return REVERSE_REGEX.sub(r"<\1", text) # 使用示例 original = "这是思考内容" disabled, count = disable_special_tags(original) print(f"禁用前: {original}") print(f"禁用后: {disabled}") print(f"修改数: {count}") ``` **场景说明:** 当模型可能生成会被前端 HTML 解析器误触发的标签(如 `` 推理块)时,通过注入零宽空格破坏标签结构,再在需要时恢复。这是 `gemini_manifold.py` 中保护前端的一种防御手段,对任何可能含有模型生成 HTML 的插件都有借鉴价值。 ## 6. 统一异常处理与用户反馈 ```python class PluginException(Exception): """插件统一异常基类。""" pass class APIError(PluginException): """API 调用异常。""" pass class FileUploadError(PluginException): """文件上传异常。""" pass class EventEmitterForErrors: def __init__(self): self.event_queue = [] async def emit_error(self, error_msg: str, is_toast: bool = True): """ 发出错误事件,同时记录日志。 """ event = {"type": "error", "data": {"detail": error_msg}} if is_toast: event["data"]["toast_type"] = "error" self.event_queue.append(event) print(f"[ERROR] {error_msg}") async def call_api_with_fallback(api_key: str, emitter: EventEmitterForErrors): """ 调用 API 并完整处理异常。 """ try: # 模拟 API 调用 if not api_key: raise ValueError("API key 未提供") # 成功处理 return {"status": "ok"} except ValueError as e: await emitter.emit_error(f"参数错误: {e}") raise APIError(f"API 调用失败: {e}") from e except Exception as e: await emitter.emit_error(f"意外错误: {e}", is_toast=True) raise PluginException(f"插件异常: {e}") from e # 使用示例 import asyncio emitter = EventEmitterForErrors() try: result = asyncio.run(call_api_with_fallback("", emitter)) except PluginException as e: print(f"捕获到插件异常: {e}") ``` **场景说明:** 对应 `gemini_manifold.py` 中 `GenaiApiError`、`FilesAPIError` 等定制异常。通过分层异常类和统一的 emit_error 机制,确保所有错误都能被前端看到,同时便于调试和日志分析。 ## 7. 后处理与数据回写(Usage + Grounding) ```python from datetime import datetime class PostProcessor: def __init__(self, request_state): self.state = request_state async def emit_usage(self, prompt_tokens: int, completion_tokens: int): """ 发出 Token 使用情况。 """ total = prompt_tokens + completion_tokens elapsed = datetime.now().timestamp() usage_data = { "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": total, "completion_time": elapsed, } print(f"Usage: {usage_data}") return usage_data async def emit_grounding(self, chat_id: str, message_id: str, grounding_metadata): """ 将 grounding 数据存入应用状态,供 Filter 或后续步骤使用。 """ key = f"grounding_{chat_id}_{message_id}" self.state[key] = grounding_metadata print(f"存储 grounding 数据到 {key}") async def emit_status(self, message: str, done: bool = False): """ 发出最终状态。 """ status_event = { "type": "status", "data": {"description": message, "done": done} } print(f"Status: {status_event}") # 使用示例 async def main(): state = {} # 模拟 request.app.state processor = PostProcessor(state) await processor.emit_usage(prompt_tokens=50, completion_tokens=100) await processor.emit_grounding( chat_id="chat_123", message_id="msg_456", grounding_metadata={"sources": ["source1", "source2"]} ) await processor.emit_status("Response finished", done=True) print("\n最终状态:", state) asyncio.run(main()) ``` **场景说明:** 模拟 `gemini_manifold.py` 中 `_do_post_processing` 的职责——在主响应流完成后,将 usage、grounding、状态等元数据通过独立通道发出。这种分离确保前端能获得完整信息,同时不阻塞流式响应。 ## 8. 日志与数据截断(Loguru + 自动截断) ```python import json from functools import wraps class PluginLogger: def __init__(self, max_payload_length: int = 256): self.max_length = max_payload_length def truncate_payload(self, data: any) -> str: """ 将复杂数据序列化并截断。 """ try: serialized = json.dumps(data, default=str) if len(serialized) > self.max_length: return serialized[:self.max_length] + "[...]" return serialized except Exception as e: return f"" def log_with_payload(self, level: str, message: str, payload: any = None): """ 记录带有 payload 的日志,自动截断。 """ log_line = f"[{level}] {message}" if payload is not None: truncated = self.truncate_payload(payload) log_line += f" - {truncated}" print(log_line) # 使用示例 logger = PluginLogger(max_payload_length=100) logger.log_with_payload("DEBUG", "API Response", payload={"data": "x" * 200, "status": "ok"}) logger.log_with_payload("INFO", "File uploaded", payload={"file_id": "abc123", "size": 1024}) ``` **场景说明:** 对应 `gemini_manifold.py` 中自定义 loguru handler 与 `_truncate_long_strings` 的逻辑。当插件需要调试复杂 API 响应或大型 payload 时,通过自动截断避免日志爆炸,同时保留关键信息。 ## 9. 联网搜索功能与源引用显示 ```python from typing import TypedDict class SearchSource(TypedDict): title: str url: str snippet: str class GroundingMetadata(TypedDict): search_queries: list[str] # 使用的搜索查询 sources: list[SearchSource] # 检索到的源 class SearchableResponseBuilder: """ 当启用联网搜索功能时,构建含有搜索信息的响应。 对应 gemini_manifold.py 中依据 features["google_search_tool"] 的逻辑。 """ def __init__(self, enable_search: bool = False, emitter = None): self.enable_search = enable_search self.emitter = emitter self.grounding_metadata: GroundingMetadata | None = None async def build_response_with_search(self, query: str, use_search: bool = True) -> tuple[str, GroundingMetadata | None]: """ 构建响应,如果启用搜索则收集源信息。 """ if not (self.enable_search and use_search): # 未启用搜索,直接返回响应 return "这是直接回答,无搜索", None # 模拟搜索过程 search_results = await self._perform_search(query) # 构建 grounding 元数据 self.grounding_metadata = { "search_queries": [query], "sources": search_results } # 构建含源引用的响应 response_with_sources = await self._format_response_with_citations( query, search_results ) return response_with_sources, self.grounding_metadata async def _perform_search(self, query: str) -> list[SearchSource]: """ 模拟调用 Google Search API(实际中由 gemini_manifold.py 的 tool 层处理)。 """ # 模拟搜索结果 results = [ { "title": "Open WebUI 官方文档", "url": "https://docs.openwebui.com", "snippet": "Open WebUI 是一个开源的大语言模型管理平台..." }, { "title": "Open WebUI GitHub 仓库", "url": "https://github.com/open-webui/open-webui", "snippet": "开源代码库,包含所有源码和插件..." }, { "title": "Open WebUI 社区论坛", "url": "https://community.openwebui.com", "snippet": "用户交流和问题解答社区..." } ] if self.emitter: await self.emitter.emit_toast( f"✓ 已搜索: '{query}' 找到 {len(results)} 个结果", "success" ) return results async def _format_response_with_citations(self, query: str, sources: list[SearchSource]) -> str: """ 将搜索结果格式化为含有源引用的响应。 """ response = f"关于 '{query}' 的搜索结果:\n\n" for idx, source in enumerate(sources, 1): response += f"[{idx}] **{source['title']}**\n" response += f" URL: {source['url']}\n" response += f" 摘要: {source['snippet']}\n\n" response += "---\n\n根据上述源的信息,可以得出以下结论:\n" response += "Open WebUI 是一个功能丰富的平台,提供了完整的文档、源码和社区支持。" return response def extract_sources_for_frontend(self) -> list[dict]: """ 提取 grounding 元数据中的源,用于前端显示为 'sources' 字段。 对应 gemini_manifold.py 中 emit_completion(sources=...) 的数据。 """ if not self.grounding_metadata: return [] sources_for_ui = [] for source in self.grounding_metadata["sources"]: sources_for_ui.append({ "title": source["title"], "url": source["url"], "description": source["snippet"] }) return sources_for_ui async def demo_search_workflow(): """ 演示启用联网搜索时的完整工作流。 """ class FakeEmitter: async def emit_toast(self, msg, ttype): print(f"[{ttype.upper()}] {msg}") async def emit_status(self, msg, done=False): print(f"[STATUS] {msg} (done={done})") emitter = FakeEmitter() # 创建搜索构建器,启用联网搜索 builder = SearchableResponseBuilder(enable_search=True, emitter=emitter) # 步骤 1: 用户提问 user_query = "Open WebUI 的插件开发最佳实践" await emitter.emit_status(f"处理查询: {user_query}") # 步骤 2: 构建响应并收集源 response_text, grounding = await builder.build_response_with_search(user_query) # 步骤 3: 提取源引用供前端使用 sources_for_ui = builder.extract_sources_for_frontend() # 步骤 4: 构建完整的 completion 事件 completion_event = { "type": "chat:completion", "data": { "content": response_text, "sources": sources_for_ui, # 前端将在消息下方显示这些源 "done": True } } print("\n=== 最终响应 ===") print(f"内容:\n{response_text}") print(f"\n源信息 (供前端显示):") for source in sources_for_ui: print(f" - {source['title']}: {source['url']}") print(f"\n完整事件数据:") import json print(json.dumps(completion_event, ensure_ascii=False, indent=2)) asyncio.run(demo_search_workflow()) ``` **实现细节说明:** 联网搜索功能的完整链路(对应 `gemini_manifold.py`): ```text 1. 前端请求时,features 包含 "google_search_tool": true ↓ 2. Pipe.pipe() 检测到 features["google_search_tool"] ↓ 3. 在 _build_gen_content_config() 中: gen_content_conf.tools.append( types.Tool(google_search=types.GoogleSearch()) ) ↓ 4. 将 config 传给 Google Gemini API ↓ 5. API 自动执行搜索并返回搜索结果 ↓ 6. 获取 response.candidates[0].grounding_metadata ├─ 包含搜索查询 ├─ 包含检索到的源(标题、URL、摘要) └─ 包含段落级的源匹配信息 ↓ 7. 在 _do_post_processing() 中: 将 grounding_metadata 存入 request.app.state 供后续 Filter 使用 ↓ 8. 在响应流中通过 emit_completion(sources=...) 将源引用发送到前端 ↓ 9. 前端在消息下方显示: [1] 源标题 (链接) [2] 源标题 (链接) ... ``` **关键实现要点:** | 步骤 | 职责 | 代码位置 | |------|------|---------| | **检测开关** | 检查 `features["google_search_tool"]` | `_build_gen_content_config()` | | **配置工具** | 将 `google_search` 添加到 tools 列表 | `gen_content_conf.tools.append()` | | **执行搜索** | Google API 自动执行,返回 grounding_metadata | API 响应处理 | | **提取源** | 从 grounding_metadata 提取源信息 | `_do_post_processing()` | | **存储状态** | 将 grounding 存入 `request.app.state` | `_add_grounding_data_to_state()` | | **发送前端** | 通过 `emit_completion(sources=...)` 发送 | `_unified_response_processor()` | | **显示引用** | 前端渲染源链接和摘要 | 前端 UI 逻辑 | **场景说明:** 展示如何在启用联网搜索时收集、处理和展示搜索源。适用于: - 需要集成搜索功能的插件 - 需要展示信息来源的智能应用 - 需要追踪 API 调用的溯源场景 - 需要构建可引用的 LLM 应用 ## 总结与最佳实践 | 哲学 | 核心机制 | 使用场景 | |------|---------|----------| | **配置层叠** | Valves + UserValves 合并 | Admin 全局设置 + 用户按需覆盖 | | **异步反馈** | EventEmitter + Queue | 长流程中持续向前端汇报状态 | | **资源复用** | xxHash + 缓存 + Stateless GET | 避免重复上传,快速恢复 | | **统一处理** | AsyncGenerator + 适配器 | 流式和非流式响应一致处理 | | **安全防护** | 特殊标签注入 ZWS | 防止模型生成的 HTML 破坏前端 | | **异常管理** | 分层异常 + emit_error | 所有错误对前端可见 | | **后处理** | Usage/Grounding 在 response 后 | 非阻塞式补充元数据 | | **日志控制** | 自动截断 + 多级别 | 避免日志爆炸,便于调试 | | **搜索集成** | grounding_metadata 提取 + 源展示 | 联网搜索时收集并显示信息来源 | ## 补充:响应格式与引用解析 ### 一、源(Source)的数据结构 当启用联网搜索时,Google Gemini API 返回的 `grounding_metadata` 包含搜索源信息,对应以下结构: ```python # Google API 返回的 grounding_metadata 格式 { "search_queries": ["用户的搜索查询"], "web_search_results": [ { "uri": "https://example.com/page1", "title": "网页标题", "snippet": "网页摘要文本...", "display_uri": "example.com", }, # ... 更多搜索结果 ], "grounding_supports": [ { "segment": { "start_index": 0, "end_index": 145, "text": "模型回答中被引用的这段文本" }, "supporting_segments": [ { "segment": { "text": "网页中的相关内容" }, "uri": "https://example.com/page1" } ], "confidence_scores": [0.95] } ] } ``` ### 二、引用标记的格式 **API 返回的响应中引用标记格式:** Google Gemini API 在响应文本中自动插入引用标记: ```text 根据搜索结果[1],Open WebUI 是一个开源平台[2]。用户可以通过插件[1][2] 扩展功能。 [1] https://docs.openwebui.com - Open WebUI 官方文档 [2] https://github.com/open-webui/open-webui - GitHub 仓库 ``` **引用标记特征:** - 格式:`[N]` 其中 N 是数字索引(1-based) - 位置:内联在文本中,跟在被引用的短语后 - 对应关系:`[1]` → `web_search_results[0]`,`[2]` → `web_search_results[1]` 等 ### 三、前端显示的 sources 格式 `emit_completion(sources=...)` 发送给前端的数据格式: ```python sources = [ { "title": "Open WebUI 官方文档", "uri": "https://docs.openwebui.com", "snippet": "Open WebUI 是一个开源的大语言模型管理平台...", "display_uri": "docs.openwebui.com", }, { "title": "Open WebUI GitHub 仓库", "uri": "https://github.com/open-webui/open-webui", "snippet": "开源代码库,包含所有源码和插件...", "display_uri": "github.com", } ] ``` **前端如何渲染:** 1. **识别内联引用标记** → 将 `[1]` 链接到 `sources[0]` 2. **在消息下方显示源面板**,通常格式为: ```text [1] Open WebUI 官方文档 (docs.openwebui.com) [2] Open WebUI GitHub 仓库 (github.com) ``` 3. **点击引用标记** → 高亮对应的源,显示摘要 4. **点击源链接** → 在新标签页打开 URL ### 四、完整数据流转 ```text 1. 用户启用搜索功能 (features["google_search_tool"] = true) ↓ 2. Pipe 配置 API:gen_content_conf.tools.append( types.Tool(google_search=types.GoogleSearch()) ) ↓ 3. Google Gemini API 执行搜索,返回: - 文本响应(含内联 [N] 标记) - grounding_metadata(含搜索结果和支撑段落) ↓ 4. gemini_manifold.py _process_part() 处理: - 提取文本响应 - 通过 _disable_special_tags() 处理特殊标签 - 返回结构化 chunk: {"content": "文本[1][2]..."} ↓ 5. _do_post_processing() 后处理: - 提取 candidate.grounding_metadata - 存入 request.app.state[f"grounding_{chat_id}_{message_id}"] - 提取 web_search_results → sources 列表 ↓ 6. emit_completion(content="...", sources=[...]) - 发送完整内容给前端 - 同时发送 sources 列表 ↓ 7. 前端渲染: - 消息体显示文本和 [1][2] 引用标记 - 底部显示 sources 面板 - 用户可交互查看源信息 ``` ### 五、可能需要移除的引用标记 在某些情况下(如用户编辑消息),需要调用 `_remove_citation_markers()` 移除不再有效的引用标记: ```python # 源数据结构(来自 grounding_metadata) source = { "uri": "https://example.com", "title": "Page Title", "metadata": [ { "supports": [ { "segment": { "start_index": 10, "end_index": 50, "text": "这是被引用的文本片段" }, "grounding_chunk_indices": [0, 1] # 对应 [1], [2] } ] } ] } # 方法会找到 "这是被引用的文本片段[1][2]" 并删除 [1][2] cleaned_text = _remove_citation_markers(response_text, [source]) ``` ### 六、关键要点 **✓ 引用的识别规则:** - 文本内联的 `[数字]` 是引用标记 - 必须对应 sources 列表中的同序号元素 - 通常由 API 自动生成和嵌入 **✗ 常见问题:** - 删除源但保留标记 → 前端会显示孤立的 `[N]` - 修改文本后标记位置错误 → 需要重新生成 - 混合多个搜索结果 → 确保索引连续且不重叠 ### 七、Chat/Completions 接口的响应格式 当直接通过 Open WebUI 的 `chat/completions` API 调用 pipe 时,响应应采用以下格式返回引用信息。 **流式响应(streaming=true):** Pipe 返回 `AsyncGenerator[dict]`,每个 dict 按以下顺序发送: ```python # 流式块(多次) { "choices": [ { "delta": { "content": "根据搜索结果[1],Open WebUI..." } } ] } # 完成标记 "data: [DONE]" # 后续元数据通过 event_emitter 事件发送 # 1. emit_status - 状态更新消息 # 2. emit_toast - 弹窗通知(如错误或成功提示) # 3. emit_usage - Token 使用量数据 # 4. emit_completion(sources=[...]) - 发送最终的源信息列表 ``` **关键特性:** - 文本内容通过 `{"choices": [{"delta": {"content": "..."}}]}` 流式返回 - 引用标记 `[1][2]` 直接包含在内容文本中 - 源信息通过 `emit_completion(sources=[...])` 以事件形式发送到前端 - 完成后发送 `"data: [DONE]"` 标记 **非流式响应(streaming=false):** 整个响应通过适配器转换为单次 AsyncGenerator: ```python async def single_item_stream(response): yield response # 输出结果类似流式,但内容全部在一个块中 { "choices": [ { "delta": { "content": "完整的回答文本[1][2]..." } } ] } "data: [DONE]" ``` ### 八、sources 数据的发送方式 #### 方式 1:通过 EventEmitter 事件发送(推荐) ```python await event_emitter.emit_completion( content=None, # 内容已通过 delta 发送 sources=[ { "title": "Open WebUI 官方文档", "uri": "https://docs.openwebui.com", "snippet": "Open WebUI 是一个开源的大语言模型管理平台...", "display_uri": "docs.openwebui.com", }, { "title": "Open WebUI GitHub 仓库", "uri": "https://github.com/open-webui/open-webui", "snippet": "开源代码库,包含所有源码和插件...", "display_uri": "github.com", } ], done=True ) ``` 这会产生事件: ```python { "type": "chat:completion", "data": { "done": True, "sources": [ {"title": "...", "uri": "...", ...}, {"title": "...", "uri": "...", ...} ] } } ``` #### 方式 2:通过应用状态存储(Companion Filter 读取) gemini_manifold 的 `_add_grounding_data_to_state()` 将 grounding_metadata 存入: ```python request.app.state[f"grounding_{chat_id}_{message_id}"] = grounding_metadata_obj ``` Companion Filter 或其他处理组件可以读取这个状态并从中提取源信息。 #### 方式 3:直接在响应文本中(最简单) 如果只需要在文本中显示源链接,可以让 API 返回: ```text 根据搜索结果[1],Open WebUI 是一个开源平台[2]。 [1] https://docs.openwebui.com - Open WebUI 官方文档 [2] https://github.com/open-webui/open-webui - GitHub 仓库 ``` 前端将识别 `[N]` 标记并自动提取为引用。 ### 九、完整的 pipe 返回规范 **Pipe 方法签名:** ```python async def pipe( self, body: dict, # 请求体:模型、消息、流式标志等 __user__: dict, # 用户信息 __request__: Request, # FastAPI Request __event_emitter__: Callable[[Event], Awaitable[None]] | None, # 事件发射器 __metadata__: dict, # 元数据:特性、任务类型等 ) -> AsyncGenerator[dict, None] | str: ... return self._unified_response_processor(...) ``` **返回的 AsyncGenerator 应产生的消息序列:** ```text 1. {"choices": [{"delta": {"content": "流式文本块..."}}]} ← 多次 2. {"choices": [{"delta": {"content": "[1][2]..."}}]} ← 最后的内容块 3. "data: [DONE]" ← 完成标记 4. (事件发送阶段) emit_status, emit_toast, emit_usage, emit_completion(sources=[...]) ``` **事件发送(通过 EventEmitter):** 这些不是 AsyncGenerator 的产出,而是通过 `__event_emitter__` 回调发送: ```python # 在处理过程中发送状态 await event_emitter.emit_status("处理中...", done=False) # 发送错误或成功提示 event_emitter.emit_toast("✓ 完成", "success") # 发送 Token 使用量 await event_emitter.emit_usage({ "prompt_tokens": 100, "completion_tokens": 50, "total_tokens": 150, "completion_time": 2.34 }) # 发送最终的源信息和其他元数据 await event_emitter.emit_completion( sources=[...], usage={...}, done=True ) ``` ### 十、实现 Pipe 时的源处理清单 当你实现一个支持搜索的 pipe 时,确保: **✓ 流式响应部分:** - [ ] 文本包含内联的 `[1]`, `[2]` 等引用标记 - [ ] 每个块通过 `yield {"choices": [{"delta": {"content": "..."}}]}` 返回 - [ ] 最后一块完成后发送 `yield "data: [DONE]"` **✓ 元数据部分:** - [ ] 调用 `emit_status()` 显示处理进度 - [ ] 调用 `emit_toast()` 通知成功或错误 - [ ] 调用 `emit_usage()` 发送 Token 使用量 - [ ] 调用 `emit_completion(sources=[...])` 发送源列表 **✓ 源数据结构:** - [ ] 每个源包含 `title`, `uri`, `snippet`, `display_uri` - [ ] 源的顺序与文本中 `[N]` 的顺序一一对应 - [ ] 使用 `emit_completion(sources=[...], done=True)` 标记完成 **✗ 常见错误:** - [ ] ❌ 只返回文本,不发送源信息 - [ ] ❌ 源数据格式不完整或字段错误 - [ ] ❌ 源顺序与引用标记不匹配 - [ ] ❌ 混合了内容和元数据返回方式 ## 补充:Open WebUI 核心模块详解 开发 Open WebUI Pipe 时,需要调用的五个核心模块及其功能说明: ```python from open_webui.models.chats import Chats from open_webui.models.files import FileForm, Files from open_webui.storage.provider import Storage from open_webui.models.functions import Functions from open_webui.utils.misc import pop_system_message ``` ### 模块 1:`Chats` - 聊天历史管理 **功能:** 访问和管理用户的聊天会话历史记录。 **核心方法:** ```python Chats.get_chat_by_id_and_user_id(id: str, user_id: str) -> Chat | None ``` **使用示例:** ```python # 获取特定用户的特定聊天记录 chat = Chats.get_chat_by_id_and_user_id( id=chat_id, user_id=user_data["id"] ) if chat: # 访问聊天内容和消息历史 chat_content = chat.chat # 获取 ChatObjectDataTD messages_db = chat_content.get("messages", [])[:-1] # 获取消息列表,排除最后的空响应 # 从消息中提取源信息(用于引用过滤) for i, message_db in enumerate(messages_db): sources = message_db.get("sources") # 提取引用源 files = message_db.get("files", []) # 提取文件列表 else: log.warning(f"Chat {chat_id} not found") ``` **关键数据结构:** ```python # Chat 对象包含: { "id": str, "user_id": str, "chat": { "messages": [ { "role": "user|assistant", "content": str, "files": [{"type": "file|image", "id": str, "url": str}], "sources": [{"uri": str, "title": str, ...}] }, ... {} # 最后一条消息为空(待填充的助手响应) ], "title": str } } ``` **使用场景:** - 需要访问历史消息以过滤引用标记 - 需要获取用户上传的文件附件列表 - 需要验证当前请求与数据库消息数量是否匹配 - 需要在处理过程中追踪消息上下文 **注意事项:** - ⚠️ **必须在线程中调用**:这是同步阻塞操作,需要用 `asyncio.to_thread()` 包装 - ⚠️ **返回值可为 None**:聊天不存在时返回 None,需要检查 - ⚠️ **消息数量验证**:请求体消息数必须等于数据库消息数,否则可能表示数据不同步 --- ### 模块 2:`Files` - 文件数据库操作 **功能:** 查询和管理 Open WebUI 文件数据库中的文件元数据。 **核心方法:** ```python # 查询文件 Files.get_file_by_id(file_id: str) -> FileModel | None # 创建新文件记录 Files.insert_new_file(user_id: str, file_form: FileForm) -> FileModel | None # 获取文件 MIME 类型等 FileForm( id: str, filename: str, path: str, meta: dict # 包含 content_type, size, data 等 ) ``` **使用示例:** ```python # 查询已上传的文件 file_model = await asyncio.to_thread(Files.get_file_by_id, file_id) if file_model: # 访问文件元数据 file_path = file_model.path # 磁盘路径或 gs:// 云存储路径 mime_type = file_model.meta.get("content_type") # e.g., "image/png" file_size = file_model.meta.get("size") # 读取文件内容 with open(file_path, "rb") as f: file_bytes = f.read() # 创建新文件记录(如生成图像后) file_item = await asyncio.to_thread( Files.insert_new_file, user_id, FileForm( id=str(uuid.uuid4()), filename="generated-image.png", path="/path/to/file", meta={ "name": "generated-image.png", "content_type": "image/png", "size": len(image_bytes), "data": { "model": model_name, "chat_id": chat_id, "message_id": message_id, } } ) ) ``` **关键数据结构:** ```python class FileModel: id: str user_id: str filename: str path: str # 本地路径或 gs:// URI meta: dict # 文件元数据 created_at: datetime updated_at: datetime meta = { "name": str, # 显示名称 "content_type": str, # MIME 类型 "size": int, # 字节数 "data": { # 自定义元数据 "model": str, "chat_id": str, "message_id": str, } } ``` **使用场景:** - 获取用户上传文件的实际路径和 MIME 类型 - 读取文件内容以上传到 Google Gemini API - 记录生成的图像和其他输出文件 - 追踪文件与生成任务的关联关系 **注意事项:** - ⚠️ **必须在线程中调用**:使用 `asyncio.to_thread()` 包装 - ⚠️ **返回值可为 None**:文件不存在时返回 None - ⚠️ **路径处理**:可能是本地路径或云存储 URI(gs://),读取时需要相应处理 - ⚠️ **元数据字段**:`meta["data"]` 是自定义字段,用于存储业务逻辑相关的上下文 --- ### 模块 3:`Storage` - 文件存储管理 **功能:** 上传和管理文件到 Open WebUI 的存储后端(本地磁盘或云存储如 Google Cloud Storage)。 **核心方法:** ```python Storage.upload_file( file: BinaryIO, # 文件对象 filename: str, # 文件名 tags: dict = {} # 标签 ) -> tuple[bytes, str] # 返回 (文件内容, 存储路径) ``` **使用示例:** ```python import io import uuid # 准备图像数据 image_data = generate_image() # 生成的字节数据 image_id = str(uuid.uuid4()) imagename = f"{image_id}_generated-image.png" image_file = io.BytesIO(image_data) # 上传到存储后端 try: contents, storage_path = await asyncio.to_thread( Storage.upload_file, image_file, imagename, tags={"model": model_name} # 可选标签 ) log.info(f"File uploaded to: {storage_path}") # storage_path 可能是: # - 本地: "/data/uploads/uuid_filename.png" # - 云存储: "gs://bucket/uploads/uuid_filename.png" except Exception as e: log.exception("Upload failed") ``` **关键特性:** ```text 存储层次: ├─ 本地存储:/data/uploads/ 下的文件 └─ 云存储:gs://bucket/ 下的 GCS 文件 自动处理: ├─ 创建目录 ├─ 重命名以避免冲突 ├─ 返回可访问的路径 └─ 支持标签分类 ``` **使用场景:** - 上传模型生成的图像 - 存储处理后的文件 - 在数据库记录前持久化文件 **注意事项:** - ⚠️ **必须在线程中调用**:使用 `asyncio.to_thread()` 包装 - ⚠️ **返回的路径**:取决于配置(本地/云),需要配合 `Files.insert_new_file` 记录 - ⚠️ **文件大小**:确保内存中有足够空间存储文件 - ✓ **与 Files 配合**:通常先 `Storage.upload_file()`,再 `Files.insert_new_file()` --- ### 模块 4:`Functions` - 过滤器/插件管理 **功能:** 查询已安装的过滤器(Filter)的状态和配置,用于检测依赖的 Companion Filter。 **核心方法:** ```python Functions.get_function_by_id(filter_id: str) -> Function | None # Function 对象属性: # - id: str # - name: str # - is_active: bool # 过滤器在 Functions 仪表板中是否启用 # - is_global: bool # 是否对所有模型全局启用 # - models: list[str] # 该过滤器启用的模型列表 ``` **使用示例:** ```python # 检查 Companion Filter 是否安装并启用 def is_feature_available(filter_id: str, metadata: dict) -> tuple[bool, bool]: """ 检查功能是否可用。 返回: (is_available, is_toggled_on) """ # 1. 检查过滤器是否已安装 f = Functions.get_function_by_id(filter_id) if not f: log.warning(f"Filter '{filter_id}' not installed") return (False, False) # 2. 检查过滤器在 Functions 仪表板中是否启用 if not f.is_active: log.warning(f"Filter '{filter_id}' is disabled in Functions dashboard") return (False, False) # 3. 检查过滤器是否为当前模型启用 model_id = metadata.get("model", {}).get("id") model_filters = metadata.get("model", {}).get("info", {}).get("meta", {}).get("filterIds", []) is_enabled = filter_id in model_filters or f.is_global if not is_enabled: log.debug(f"Filter '{filter_id}' not enabled for model '{model_id}'") return (False, False) # 4. 检查用户是否在当前请求中启用了该功能 user_toggled = filter_id in metadata.get("filter_ids", []) return (True, user_toggled) # 使用 is_available, is_enabled = is_feature_available( "gemini_manifold_companion_v1.7.0", metadata ) if is_available and is_enabled: log.info("Companion filter available and enabled") elif is_available: log.debug("Companion filter available but user disabled it") else: log.warning("Companion filter not available") ``` **关键检查流程:** ```text 功能可用性检查链: 1. 安装检查 Functions.get_function_by_id() → None? 返回不可用 2. 启用检查 f.is_active == False? 返回不可用 3. 模型启用检查 filter_id in model_filters or f.is_global? 否则返回不可用 4. 用户切换检查 filter_id in metadata["filter_ids"]? 返回用户是否启用 ``` **使用场景:** - 检测 Companion Filter 是否已安装(用于引用过滤功能) - 检查 URL Context Tool 或其他高级功能的依赖 - 在日志中区分"功能不可用"和"用户未启用" - 决定是否执行相关的处理逻辑 **注意事项:** - ✓ **同步操作**:不需要 `asyncio.to_thread()` - ⚠️ **返回值可为 None**:未安装的过滤器返回 None - ✓ **多层检查**:需要逐层检查安装、启用、配置、用户选择 - 💡 **日志级别**:根据检查阶段使用不同日志级别(warning/debug) --- ### 模块 5:`pop_system_message` - 消息提取工具 **功能:** 从消息列表中提取和分离系统消息。 **功能签名:** ```python pop_system_message( messages: list[Message] ) -> tuple[Message | None, list[Message]] ``` **使用示例:** ```python # 原始消息列表 messages = [ { "role": "system", "content": "You are a helpful assistant..." }, { "role": "user", "content": "What is Python?" }, { "role": "assistant", "content": "Python is a programming language..." } ] # 分离系统消息 system_message, remaining_messages = pop_system_message(messages) # 结果: # system_message = {"role": "system", "content": "You are a helpful assistant..."} # remaining_messages = [ # {"role": "user", "content": "What is Python?"}, # {"role": "assistant", "content": "Python is a programming language..."} # ] # 提取系统提示文本 system_prompt = (system_message or {}).get("content") # 检查是否存在系统消息 if system_prompt: log.debug(f"System prompt found: {system_prompt[:100]}...") else: log.debug("No system prompt provided") ``` **工作流程:** ```text 输入消息列表 ↓ 遍历找第一个 role=="system" 的消息 ↓ 提取该消息 ↓ 返回 (提取的消息, 剩余消息列表) ``` **关键特性:** - 返回元组:`(system_message, remaining_messages)` - `system_message` 为 None 如果不存在系统消息 - `remaining_messages` 不包含系统消息 - 只提取第一个系统消息(如果有多个,后续的被视为普通消息) **使用场景:** - 从 Open WebUI 的请求中提取系统消息 - 将系统消息转换为 `GenerateContentConfig.system_instruction` - 将其余消息作为对话上下文 **注意事项:** - ✓ **返回类型安全**:总是返回 2 元组 - ⚠️ **系统消息可为 None**:需要 `(system_message or {})` 防止错误 - ✓ **消息顺序保留**:`remaining_messages` 中的消息顺序保持原样 - 💡 **使用场景**:几乎所有 Pipe 都需要这个操作来提取系统提示 --- ### 通用使用技巧总结 #### 技巧 1:异步上下文中调用同步 API 这些模块的大部分方法都是同步阻塞的,但 Pipe 运行在异步上下文中: ```python # ❌ 错误:会阻塞事件循环 chat = Chats.get_chat_by_id_and_user_id(chat_id, user_id) # ✓ 正确:在线程池中运行 chat = await asyncio.to_thread( Chats.get_chat_by_id_and_user_id, chat_id, user_id ) ``` #### 技巧 2:链式 None 检查 由于这些 API 经常返回 None,使用链式赋值简化代码: ```python # ❌ 冗长 file_model = await asyncio.to_thread(Files.get_file_by_id, file_id) if file_model is None: return None file_path = file_model.path mime_type = file_model.meta.get("content_type") # ✓ 简洁 if not (file_model := await asyncio.to_thread(Files.get_file_by_id, file_id)): return None file_path = file_model.path mime_type = file_model.meta.get("content_type") ``` #### 技巧 3:错误恢复优先级 不同模块的错误处理优先级: ```python # 1. 功能检查失败 → 返回默认值,继续 if not (f := Functions.get_function_by_id(filter_id)): log.warning("Feature not available") return (False, False) # 2. 数据库查询失败 → 记录警告,但不中断流程 try: chat = await asyncio.to_thread(Chats.get_chat_by_id_and_user_id, ...) except Exception as e: log.exception("Failed to fetch chat history") chat = None # 3. 存储操作失败 → 使用 toast 通知用户,并记录错误 try: path = await asyncio.to_thread(Storage.upload_file, ...) except Exception as e: event_emitter.emit_toast("File upload failed", "error") log.exception("Storage error") raise ``` #### 技巧 4:并发操作优化 多个 API 调用时使用并发: ```python # ❌ 串行:慢 chat = await asyncio.to_thread(Chats.get_chat_by_id_and_user_id, ...) file = await asyncio.to_thread(Files.get_file_by_id, ...) filter_info = Functions.get_function_by_id(...) # ✓ 并发:快 chat, file = await asyncio.gather( asyncio.to_thread(Chats.get_chat_by_id_and_user_id, ...), asyncio.to_thread(Files.get_file_by_id, ...), ) filter_info = Functions.get_function_by_id(...) # 这个本来就是同步的 ``` #### 技巧 5:日志级别选择 根据严重程度选择日志级别: ```python # 配置问题(管理员处理)→ warning if not f.is_active: log.warning(f"Filter '{filter_id}' disabled in dashboard") # 正常功能流程(调试用)→ debug if filter_id not in model_filters: log.debug(f"Filter not in model list: {filter_id}") # 数据不一致(可能的 bug)→ error if len(messages_db) != len(messages_body): log.error("Message count mismatch") # 检查点(流程追踪)→ info if is_toggled_on: log.info(f"Feature '{filter_id}' enabled by user") ``` #### 技巧 6:元数据字段扩展 `Files.meta` 中的 `data` 字段是自定义字段,可存储任意上下文: ```python file_item = await asyncio.to_thread( Files.insert_new_file, user_id, FileForm( id=id, filename="output.json", path=path, meta={ "name": "output.json", "content_type": "application/json", "size": len(contents), "data": { # 自定义字段,存储业务逻辑上下文 "model": model_name, "chat_id": chat_id, "message_id": message_id, "timestamp": time.time(), "processing_time": elapsed_ms, "version": "v1.0", } } ) ) # 后续查询时可以恢复这些信息 if file_model.meta.get("data", {}).get("processing_time"): log.debug(f"File processed in {file_model.meta['data']['processing_time']}ms") ``` #### 技巧 7:条件式功能启用 根据多个条件决定是否启用某项功能: ```python # 检查引用过滤是否可用 companion_available, companion_enabled = is_feature_available( "gemini_manifold_companion", __metadata__ ) # 结合其他条件 can_filter_citations = ( companion_available and # 过滤器已安装 companion_enabled and # 用户启用了该功能 self.messages_db is not None and # 聊天历史可用 len(messages_db) == len(messages) # 消息数量一致 ) if can_filter_citations: # 执行引用过滤逻辑 ... else: # 跳过该功能 log.debug("Citation filtering unavailable") ``` --- ### 实战代码完整示例 ```python import asyncio from open_webui.models.chats import Chats from open_webui.models.files import FileForm, Files from open_webui.storage.provider import Storage from open_webui.models.functions import Functions from open_webui.utils.misc import pop_system_message class MyPipe: async def pipe( self, body: dict, __user__: dict, __request__, __event_emitter__, __metadata__: dict, ): # 1. 提取系统消息 system_message, messages = pop_system_message(body.get("messages", [])) system_prompt = (system_message or {}).get("content") # 2. 并发获取聊天和过滤器信息 chat_data, filter_status = await asyncio.gather( asyncio.to_thread( Chats.get_chat_by_id_and_user_id, __metadata__.get("chat_id", ""), __user__["id"] ), self._check_filter_available("companion_filter_id", __metadata__), return_exceptions=True ) # 3. 处理结果 chat = chat_data if not isinstance(chat_data, Exception) else None is_available, is_enabled = filter_status if not isinstance(filter_status, Exception) else (False, False) # 4. 条件式处理文件 if chat and is_available: for message in chat.chat.get("messages", []): if files := message.get("files", []): for file_ref in files: file_model = await asyncio.to_thread( Files.get_file_by_id, file_ref.get("id") ) if file_model: # 处理文件... pass # 5. 返回结果 async for chunk in self._generate_response(messages, system_prompt): yield chunk @staticmethod def _check_filter_available(filter_id: str, metadata: dict) -> tuple[bool, bool]: f = Functions.get_function_by_id(filter_id) if not f or not f.is_active: return (False, False) is_enabled = filter_id in metadata.get("filter_ids", []) or f.is_global return (True, is_enabled) ``` > 这些示例可直接集成进团队的插件开发指南或代码模板库,新插件可参考对应场景快速实现相关功能。