Files
Fu-Jie_openwebui-extensions/docs/examples/gemini_manifold_plugin_examples.md

1839 lines
56 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Gemini Manifold 插件通用例子
## 1. 配置层叠Valves + UserValves
```python
from pydantic import BaseModel
class Valves(BaseModel):
GEMINI_API_KEY: str | None = None
USE_VERTEX_AI: bool = False
THINKING_BUDGET: int = 8192
class UserValves(BaseModel):
GEMINI_API_KEY: str | None = None
THINKING_BUDGET: int | None = None
def merge_valves(default: Valves, user: UserValves | None) -> Valves:
merged = default.model_dump()
if user:
for field in user.model_fields:
value = getattr(user, field)
if value not in (None, ""):
merged[field] = value
return Valves(**merged)
admin_settings = Valves(GEMINI_API_KEY="admin-key", THINKING_BUDGET=8192)
user_settings = UserValves(GEMINI_API_KEY="user-key", THINKING_BUDGET=4096)
effective = merge_valves(admin_settings, user_settings)
print(effective)
```
**场景说明:**`gemini_manifold.py``Valves`/`UserValves` 合并逻辑一致,适用于需要在 admin 默认与用户覆盖之间做透明优先级控制的插件。
## 2. 异步事件与进度反馈EventEmitter + 上传队列)
```python
import asyncio
from typing import Callable, Awaitable
class EventEmitter:
"""
抽象事件发射器,将所有前端交互统一到异步通道中。
"""
def __init__(self, emit: Callable[[dict], Awaitable[None]] | None = None,
hide_successful_status: bool = False):
self._emit = emit
self.hide_successful_status = hide_successful_status
async def emit_status(self, message: str, done: bool = False, hidden: bool = False) -> None:
"""
发出状态消息。如果 done=True 且 hide_successful_status=True则在前端隐藏。
"""
if not self._emit:
return
if done and self.hide_successful_status:
hidden = True
event = {
"type": "status",
"data": {
"description": message,
"done": done,
"hidden": hidden
}
}
await self._emit(event)
async def emit_toast(self, msg: str, toast_type: str = "info") -> None:
"""
发出 toast 通知(弹窗)。
"""
if not self._emit:
return
event = {
"type": "notification",
"data": {
"type": toast_type,
"content": msg
}
}
await self._emit(event)
async def emit_completion(self, content: str | None = None, done: bool = False,
error: str | None = None, usage: dict | None = None) -> None:
"""
发出完成事件,可含内容、错误、使用量等信息。
"""
if not self._emit:
return
event = {"type": "chat:completion", "data": {"done": done}}
if content is not None:
event["data"]["content"] = content
if error is not None:
event["data"]["error"] = {"detail": error}
if usage is not None:
event["data"]["usage"] = usage
await self._emit(event)
class UploadStatusManager:
"""
管理并发文件上传的状态,自动追踪注册与完成计数。
"""
def __init__(self, emitter: EventEmitter, start_time: float):
self.emitter = emitter
self.start_time = start_time
self.queue = asyncio.Queue()
self.total_uploads_expected = 0
self.uploads_completed = 0
self.finalize_received = False
self.is_active = False
async def run(self) -> None:
"""
后台任务,监听队列并发出状态更新。
"""
import time
while not (self.finalize_received and
self.total_uploads_expected == self.uploads_completed):
try:
msg = await asyncio.wait_for(self.queue.get(), timeout=1.0)
except asyncio.TimeoutError:
continue
msg_type = msg[0]
if msg_type == "REGISTER_UPLOAD":
self.is_active = True
self.total_uploads_expected += 1
await self._emit_progress_update(time.monotonic())
elif msg_type == "COMPLETE_UPLOAD":
self.uploads_completed += 1
await self._emit_progress_update(time.monotonic())
elif msg_type == "FINALIZE":
self.finalize_received = True
self.queue.task_done()
async def _emit_progress_update(self, current_time: float) -> None:
"""发出进度更新到前端。"""
if not self.is_active:
return
elapsed = current_time - self.start_time
time_str = f"(+{elapsed:.2f}s)"
is_done = (self.total_uploads_expected > 0 and
self.uploads_completed == self.total_uploads_expected)
if is_done:
msg = f"上传完成。{self.uploads_completed} 个文件已处理。{time_str}"
else:
msg = f"上传中 {self.uploads_completed + 1}/{self.total_uploads_expected}{time_str}"
await self.emitter.emit_status(msg, done=is_done)
async def multi_file_upload_workflow(
file_list: list[tuple[str, bytes]],
emitter: EventEmitter
) -> list[str]:
"""
示范多文件并发上传的完整工作流。
"""
import time
start_time = time.monotonic()
status_mgr = UploadStatusManager(emitter, start_time)
# 启动后台状态管理器
manager_task = asyncio.create_task(status_mgr.run())
# 为每个文件创建上传任务
async def upload_file(name: str, data: bytes) -> str:
await status_mgr.queue.put(("REGISTER_UPLOAD",))
try:
await asyncio.sleep(0.5) # 模拟网络延迟
result = f"uploaded-{name}"
await emitter.emit_toast(f"✓ 文件 {name} 上传成功", "success")
return result
except Exception as e:
await emitter.emit_toast(f"✗ 文件 {name} 上传失败: {e}", "error")
raise
finally:
await status_mgr.queue.put(("COMPLETE_UPLOAD",))
# 并发执行所有上传
tasks = [upload_file(name, data) for name, data in file_list]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 通知状态管理器完成
await status_mgr.queue.put(("FINALIZE",))
await manager_task
# 汇总结果
success = [r for r in results if not isinstance(r, Exception)]
return success
# 完整使用示例
async def demo():
async def fake_emit(payload):
print(f"{payload['type']}: {payload['data']}")
emitter = EventEmitter(fake_emit, hide_successful_status=False)
files = [
("doc1.pdf", b"content1"),
("image.jpg", b"content2"),
("data.csv", b"content3"),
]
results = await multi_file_upload_workflow(files, emitter)
print(f"\n✓ 上传成功: {len(results)} 个文件")
asyncio.run(demo())
```
**完整流程状态显示说明:**
整个异步工作流的状态显示遵循以下链路:
```text
初始化
发出"准备请求"状态 → [emit_status] → 前端显示状态条
启动后台 UploadStatusManager 任务
并发执行多个上传任务
├─→ 任务1: REGISTER_UPLOAD → [更新计数] → emit_status("上传中 1/3…")
├─→ 任务2: REGISTER_UPLOAD → [更新计数] → emit_status("上传中 2/3…")
└─→ 任务3: REGISTER_UPLOAD → [更新计数] → emit_status("上传中 3/3…")
每个任务完成时
├─→ emit_toast("✓ 文件上传成功", "success") → 前端弹窗确认
└─→ COMPLETE_UPLOAD → [更新计数] → emit_status("上传中 1/3…") 或 "上传完成"
所有任务完成 → FINALIZE → 关闭后台管理器
发出最终状态 → emit_status("全部完成", done=True) → 前端状态条完成
```
**关键数据流动:**
1. **EventEmitter** 负责将事件发送到前端
- `emit_status()`: 状态条消息
- `emit_toast()`: 弹窗通知
- `emit_completion()`: 完成事件(含 usage 等)
2. **UploadStatusManager** 后台任务持续监听队列
- 接收 `("REGISTER_UPLOAD",)` → 计数加 1 → 计算进度 → 更新状态显示
- 接收 `("COMPLETE_UPLOAD",)` → 计数加 1 → 重新计算进度 → 更新状态显示
- 接收 `("FINALIZE",)` → 退出循环 → 任务完成
3. **实时计数逻辑**
```python
已完成数 / 总数 = 进度百分比
显示: "上传中 {已完成+1}/{总数}… (+X.XXs)"
当完成数 == 总数: 显示 "上传完成。3 个文件已处理。(+2.50s)"
```
**场景说明:** 完整模拟 `gemini_manifold.py``EventEmitter` + `UploadStatusManager` 的实战设计。支持多并发任务状态跟踪、自动计数、toast 通知与后台进度汇报。适用于:
- 多文件并发上传且需要实时进度反馈的场景
- API 轮询或长流程中持续向前端汇报进展
- 需要自隐藏成功状态但保留错误警告的交互流程
- 复杂的异步任务编排与协调
- 需要细粒度时间戳与计数统计的长流程
**场景说明:** 完整模拟 `gemini_manifold.py``EventEmitter` + `UploadStatusManager` 的实战设计。支持多并发任务状态跟踪、自动计数、toast 通知与后台进度汇报。适用于:
- 多文件并发上传且需要实时进度反馈的场景
- API 轮询或长流程中持续向前端汇报进展
- 需要自隐藏成功状态但保留错误警告的交互流程
- 复杂的异步任务编排与协调
- 需要细粒度时间戳与计数统计的长流程
## 3. 文件缓存 + 幂等上传xxHash + deterministic 名称)
```python
import xxhash
def content_hash(data: bytes) -> str:
return xxhash.xxh64(data).hexdigest()
cache: dict[str, str] = {}
def deterministic_name(hash_val: str) -> str:
return f"files/owui-v1-{hash_val}"
async def maybe_upload(data: bytes):
h = content_hash(data)
if h in cache:
print("cache hit", cache[h])
return cache[h]
name = deterministic_name(h)
cache[h] = name
print("uploading", name)
return name
```
**场景说明** 简化版 `FilesAPIManager` //冷路径适合需要避免重复上传并希望后端能通过 deterministic 名称恢复文件状态的场景
## 4. 统一响应处理(流式 + 非流式适配)
```python
from typing import AsyncGenerator
class UnifiedResponseProcessor:
async def process_stream(
self, response_stream: AsyncGenerator, is_stream: bool = True
) -> AsyncGenerator:
"""
处理流式或一次性响应,统一返回 AsyncGenerator。
"""
try:
async for chunk in response_stream:
# 处理单个 chunk
processed = await self._process_chunk(chunk)
if processed:
yield {"choices": [{"delta": processed}]}
except Exception as e:
yield {"choices": [{"delta": {"content": f"Error: {e}"}}]}
finally:
yield "data: [DONE]"
async def _process_chunk(self, chunk):
# 简化处理逻辑
return {"content": str(chunk)}
# 使用示例
async def main():
processor = UnifiedResponseProcessor()
async def fake_stream():
for i in range(3):
yield f"chunk-{i}"
async for item in processor.process_stream(fake_stream()):
print(item)
```
**场景说明:** 对应 `gemini_manifold.py``_unified_response_processor` 的核心思想——无论前端是否启用流式,插件内部都用统一的 AsyncGenerator 处理,避免代码分支。适用于需要兼容流式与非流式响应的任何插件。
## 5. 特殊标签禁用(防止前端解析干扰)
```python
import re
ZWS = "\u200b" # 零宽空格
SPECIAL_TAGS = ["think", "details", "thinking", "reason"]
def disable_special_tags(text: str) -> tuple[str, int]:
"""
在特殊标签前插入零宽空格,防止前端 HTML 解析器处理它们。
"""
if not text:
return "", 0
TAG_REGEX = re.compile(
r"<(/?(" + "|".join(re.escape(tag) for tag in SPECIAL_TAGS) + r"))"
)
modified, count = TAG_REGEX.subn(rf"<{ZWS}\1", text)
return modified, count
def enable_special_tags(text: str) -> str:
"""
移除零宽空格,恢复原始标签,用于模型理解上下文。
"""
if not text:
return ""
REVERSE_REGEX = re.compile(
r"<" + ZWS + r"(/?(" + "|".join(re.escape(tag) for tag in SPECIAL_TAGS) + r"))"
)
return REVERSE_REGEX.sub(r"<\1", text)
# 使用示例
original = "<think>这是思考内容</think>"
disabled, count = disable_special_tags(original)
print(f"禁用前: {original}")
print(f"禁用后: {disabled}")
print(f"修改数: {count}")
```
**场景说明:** 当模型可能生成会被前端 HTML 解析器误触发的标签(如 `<think>` 推理块)时,通过注入零宽空格破坏标签结构,再在需要时恢复。这是 `gemini_manifold.py` 中保护前端的一种防御手段,对任何可能含有模型生成 HTML 的插件都有借鉴价值。
## 6. 统一异常处理与用户反馈
```python
class PluginException(Exception):
"""插件统一异常基类。"""
pass
class APIError(PluginException):
"""API 调用异常。"""
pass
class FileUploadError(PluginException):
"""文件上传异常。"""
pass
class EventEmitterForErrors:
def __init__(self):
self.event_queue = []
async def emit_error(self, error_msg: str, is_toast: bool = True):
"""
发出错误事件,同时记录日志。
"""
event = {"type": "error", "data": {"detail": error_msg}}
if is_toast:
event["data"]["toast_type"] = "error"
self.event_queue.append(event)
print(f"[ERROR] {error_msg}")
async def call_api_with_fallback(api_key: str, emitter: EventEmitterForErrors):
"""
调用 API 并完整处理异常。
"""
try:
# 模拟 API 调用
if not api_key:
raise ValueError("API key 未提供")
# 成功处理
return {"status": "ok"}
except ValueError as e:
await emitter.emit_error(f"参数错误: {e}")
raise APIError(f"API 调用失败: {e}") from e
except Exception as e:
await emitter.emit_error(f"意外错误: {e}", is_toast=True)
raise PluginException(f"插件异常: {e}") from e
# 使用示例
import asyncio
emitter = EventEmitterForErrors()
try:
result = asyncio.run(call_api_with_fallback("", emitter))
except PluginException as e:
print(f"捕获到插件异常: {e}")
```
**场景说明:** 对应 `gemini_manifold.py``GenaiApiError``FilesAPIError` 等定制异常。通过分层异常类和统一的 emit_error 机制,确保所有错误都能被前端看到,同时便于调试和日志分析。
## 7. 后处理与数据回写Usage + Grounding
```python
from datetime import datetime
class PostProcessor:
def __init__(self, request_state):
self.state = request_state
async def emit_usage(self, prompt_tokens: int, completion_tokens: int):
"""
发出 Token 使用情况。
"""
total = prompt_tokens + completion_tokens
elapsed = datetime.now().timestamp()
usage_data = {
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": total,
"completion_time": elapsed,
}
print(f"Usage: {usage_data}")
return usage_data
async def emit_grounding(self, chat_id: str, message_id: str, grounding_metadata):
"""
将 grounding 数据存入应用状态,供 Filter 或后续步骤使用。
"""
key = f"grounding_{chat_id}_{message_id}"
self.state[key] = grounding_metadata
print(f"存储 grounding 数据到 {key}")
async def emit_status(self, message: str, done: bool = False):
"""
发出最终状态。
"""
status_event = {
"type": "status",
"data": {"description": message, "done": done}
}
print(f"Status: {status_event}")
# 使用示例
async def main():
state = {} # 模拟 request.app.state
processor = PostProcessor(state)
await processor.emit_usage(prompt_tokens=50, completion_tokens=100)
await processor.emit_grounding(
chat_id="chat_123",
message_id="msg_456",
grounding_metadata={"sources": ["source1", "source2"]}
)
await processor.emit_status("Response finished", done=True)
print("\n最终状态:", state)
asyncio.run(main())
```
**场景说明:** 模拟 `gemini_manifold.py``_do_post_processing` 的职责——在主响应流完成后,将 usage、grounding、状态等元数据通过独立通道发出。这种分离确保前端能获得完整信息同时不阻塞流式响应。
## 8. 日志与数据截断Loguru + 自动截断)
```python
import json
from functools import wraps
class PluginLogger:
def __init__(self, max_payload_length: int = 256):
self.max_length = max_payload_length
def truncate_payload(self, data: any) -> str:
"""
将复杂数据序列化并截断。
"""
try:
serialized = json.dumps(data, default=str)
if len(serialized) > self.max_length:
return serialized[:self.max_length] + "[...]"
return serialized
except Exception as e:
return f"<Serialization Error: {e}>"
def log_with_payload(self, level: str, message: str, payload: any = None):
"""
记录带有 payload 的日志,自动截断。
"""
log_line = f"[{level}] {message}"
if payload is not None:
truncated = self.truncate_payload(payload)
log_line += f" - {truncated}"
print(log_line)
# 使用示例
logger = PluginLogger(max_payload_length=100)
logger.log_with_payload("DEBUG", "API Response",
payload={"data": "x" * 200, "status": "ok"})
logger.log_with_payload("INFO", "File uploaded",
payload={"file_id": "abc123", "size": 1024})
```
**场景说明:** 对应 `gemini_manifold.py` 中自定义 loguru handler 与 `_truncate_long_strings` 的逻辑。当插件需要调试复杂 API 响应或大型 payload 时,通过自动截断避免日志爆炸,同时保留关键信息。
## 9. 联网搜索功能与源引用显示
```python
from typing import TypedDict
class SearchSource(TypedDict):
title: str
url: str
snippet: str
class GroundingMetadata(TypedDict):
search_queries: list[str] # 使用的搜索查询
sources: list[SearchSource] # 检索到的源
class SearchableResponseBuilder:
"""
当启用联网搜索功能时,构建含有搜索信息的响应。
对应 gemini_manifold.py 中依据 features["google_search_tool"] 的逻辑。
"""
def __init__(self, enable_search: bool = False, emitter = None):
self.enable_search = enable_search
self.emitter = emitter
self.grounding_metadata: GroundingMetadata | None = None
async def build_response_with_search(self,
query: str,
use_search: bool = True) -> tuple[str, GroundingMetadata | None]:
"""
构建响应,如果启用搜索则收集源信息。
"""
if not (self.enable_search and use_search):
# 未启用搜索,直接返回响应
return "这是直接回答,无搜索", None
# 模拟搜索过程
search_results = await self._perform_search(query)
# 构建 grounding 元数据
self.grounding_metadata = {
"search_queries": [query],
"sources": search_results
}
# 构建含源引用的响应
response_with_sources = await self._format_response_with_citations(
query, search_results
)
return response_with_sources, self.grounding_metadata
async def _perform_search(self, query: str) -> list[SearchSource]:
"""
模拟调用 Google Search API实际中由 gemini_manifold.py 的 tool 层处理)。
"""
# 模拟搜索结果
results = [
{
"title": "Open WebUI 官方文档",
"url": "https://docs.openwebui.com",
"snippet": "Open WebUI 是一个开源的大语言模型管理平台..."
},
{
"title": "Open WebUI GitHub 仓库",
"url": "https://github.com/open-webui/open-webui",
"snippet": "开源代码库,包含所有源码和插件..."
},
{
"title": "Open WebUI 社区论坛",
"url": "https://community.openwebui.com",
"snippet": "用户交流和问题解答社区..."
}
]
if self.emitter:
await self.emitter.emit_toast(
f"✓ 已搜索: '{query}' 找到 {len(results)} 个结果",
"success"
)
return results
async def _format_response_with_citations(self,
query: str,
sources: list[SearchSource]) -> str:
"""
将搜索结果格式化为含有源引用的响应。
"""
response = f"关于 '{query}' 的搜索结果:\n\n"
for idx, source in enumerate(sources, 1):
response += f"[{idx}] **{source['title']}**\n"
response += f" URL: {source['url']}\n"
response += f" 摘要: {source['snippet']}\n\n"
response += "---\n\n根据上述源的信息,可以得出以下结论:\n"
response += "Open WebUI 是一个功能丰富的平台,提供了完整的文档、源码和社区支持。"
return response
def extract_sources_for_frontend(self) -> list[dict]:
"""
提取 grounding 元数据中的源,用于前端显示为 'sources' 字段。
对应 gemini_manifold.py 中 emit_completion(sources=...) 的数据。
"""
if not self.grounding_metadata:
return []
sources_for_ui = []
for source in self.grounding_metadata["sources"]:
sources_for_ui.append({
"title": source["title"],
"url": source["url"],
"description": source["snippet"]
})
return sources_for_ui
async def demo_search_workflow():
"""
演示启用联网搜索时的完整工作流。
"""
class FakeEmitter:
async def emit_toast(self, msg, ttype):
print(f"[{ttype.upper()}] {msg}")
async def emit_status(self, msg, done=False):
print(f"[STATUS] {msg} (done={done})")
emitter = FakeEmitter()
# 创建搜索构建器,启用联网搜索
builder = SearchableResponseBuilder(enable_search=True, emitter=emitter)
# 步骤 1: 用户提问
user_query = "Open WebUI 的插件开发最佳实践"
await emitter.emit_status(f"处理查询: {user_query}")
# 步骤 2: 构建响应并收集源
response_text, grounding = await builder.build_response_with_search(user_query)
# 步骤 3: 提取源引用供前端使用
sources_for_ui = builder.extract_sources_for_frontend()
# 步骤 4: 构建完整的 completion 事件
completion_event = {
"type": "chat:completion",
"data": {
"content": response_text,
"sources": sources_for_ui, # 前端将在消息下方显示这些源
"done": True
}
}
print("\n=== 最终响应 ===")
print(f"内容:\n{response_text}")
print(f"\n源信息 (供前端显示):")
for source in sources_for_ui:
print(f" - {source['title']}: {source['url']}")
print(f"\n完整事件数据:")
import json
print(json.dumps(completion_event, ensure_ascii=False, indent=2))
asyncio.run(demo_search_workflow())
```
**实现细节说明:**
联网搜索功能的完整链路(对应 `gemini_manifold.py`
```text
1. 前端请求时features 包含 "google_search_tool": true
2. Pipe.pipe() 检测到 features["google_search_tool"]
3. 在 _build_gen_content_config() 中:
gen_content_conf.tools.append(
types.Tool(google_search=types.GoogleSearch())
)
4. 将 config 传给 Google Gemini API
5. API 自动执行搜索并返回搜索结果
6. 获取 response.candidates[0].grounding_metadata
├─ 包含搜索查询
├─ 包含检索到的源标题、URL、摘要
└─ 包含段落级的源匹配信息
7. 在 _do_post_processing() 中:
将 grounding_metadata 存入 request.app.state
供后续 Filter 使用
8. 在响应流中通过 emit_completion(sources=...)
将源引用发送到前端
9. 前端在消息下方显示:
[1] 源标题 (链接)
[2] 源标题 (链接)
...
```
**关键实现要点:**
| 步骤 | 职责 | 代码位置 |
|------|------|---------|
| **检测开关** | 检查 `features["google_search_tool"]` | `_build_gen_content_config()` |
| **配置工具** | 将 `google_search` 添加到 tools 列表 | `gen_content_conf.tools.append()` |
| **执行搜索** | Google API 自动执行,返回 grounding_metadata | API 响应处理 |
| **提取源** | 从 grounding_metadata 提取源信息 | `_do_post_processing()` |
| **存储状态** | 将 grounding 存入 `request.app.state` | `_add_grounding_data_to_state()` |
| **发送前端** | 通过 `emit_completion(sources=...)` 发送 | `_unified_response_processor()` |
| **显示引用** | 前端渲染源链接和摘要 | 前端 UI 逻辑 |
**场景说明:** 展示如何在启用联网搜索时收集、处理和展示搜索源。适用于:
- 需要集成搜索功能的插件
- 需要展示信息来源的智能应用
- 需要追踪 API 调用的溯源场景
- 需要构建可引用的 LLM 应用
## 总结与最佳实践
| 哲学 | 核心机制 | 使用场景 |
|------|---------|----------|
| **配置层叠** | Valves + UserValves 合并 | Admin 全局设置 + 用户按需覆盖 |
| **异步反馈** | EventEmitter + Queue | 长流程中持续向前端汇报状态 |
| **资源复用** | xxHash + 缓存 + Stateless GET | 避免重复上传,快速恢复 |
| **统一处理** | AsyncGenerator + 适配器 | 流式和非流式响应一致处理 |
| **安全防护** | 特殊标签注入 ZWS | 防止模型生成的 HTML 破坏前端 |
| **异常管理** | 分层异常 + emit_error | 所有错误对前端可见 |
| **后处理** | Usage/Grounding 在 response 后 | 非阻塞式补充元数据 |
| **日志控制** | 自动截断 + 多级别 | 避免日志爆炸,便于调试 |
| **搜索集成** | grounding_metadata 提取 + 源展示 | 联网搜索时收集并显示信息来源 |
## 补充:响应格式与引用解析
### 一、源Source的数据结构
当启用联网搜索时Google Gemini API 返回的 `grounding_metadata` 包含搜索源信息,对应以下结构:
```python
# Google API 返回的 grounding_metadata 格式
{
"search_queries": ["用户的搜索查询"],
"web_search_results": [
{
"uri": "https://example.com/page1",
"title": "网页标题",
"snippet": "网页摘要文本...",
"display_uri": "example.com",
},
# ... 更多搜索结果
],
"grounding_supports": [
{
"segment": {
"start_index": 0,
"end_index": 145,
"text": "模型回答中被引用的这段文本"
},
"supporting_segments": [
{
"segment": {
"text": "网页中的相关内容"
},
"uri": "https://example.com/page1"
}
],
"confidence_scores": [0.95]
}
]
}
```
### 二、引用标记的格式
**API 返回的响应中引用标记格式:**
Google Gemini API 在响应文本中自动插入引用标记:
```text
根据搜索结果[1]Open WebUI 是一个开源平台[2]。用户可以通过插件[1][2]
扩展功能。
[1] https://docs.openwebui.com - Open WebUI 官方文档
[2] https://github.com/open-webui/open-webui - GitHub 仓库
```
**引用标记特征:**
- 格式:`[N]` 其中 N 是数字索引1-based
- 位置:内联在文本中,跟在被引用的短语后
- 对应关系:`[1]``web_search_results[0]``[2]``web_search_results[1]`
### 三、前端显示的 sources 格式
`emit_completion(sources=...)` 发送给前端的数据格式:
```python
sources = [
{
"title": "Open WebUI 官方文档",
"uri": "https://docs.openwebui.com",
"snippet": "Open WebUI 是一个开源的大语言模型管理平台...",
"display_uri": "docs.openwebui.com",
},
{
"title": "Open WebUI GitHub 仓库",
"uri": "https://github.com/open-webui/open-webui",
"snippet": "开源代码库,包含所有源码和插件...",
"display_uri": "github.com",
}
]
```
**前端如何渲染:**
1. **识别内联引用标记** → 将 `[1]` 链接到 `sources[0]`
2. **在消息下方显示源面板**,通常格式为:
```text
[1] Open WebUI 官方文档 (docs.openwebui.com)
[2] Open WebUI GitHub 仓库 (github.com)
```
3. **点击引用标记** → 高亮对应的源,显示摘要
4. **点击源链接** → 在新标签页打开 URL
### 四、完整数据流转
```text
1. 用户启用搜索功能 (features["google_search_tool"] = true)
2. Pipe 配置 APIgen_content_conf.tools.append(
types.Tool(google_search=types.GoogleSearch())
)
3. Google Gemini API 执行搜索,返回:
- 文本响应(含内联 [N] 标记)
- grounding_metadata含搜索结果和支撑段落
4. gemini_manifold.py _process_part() 处理:
- 提取文本响应
- 通过 _disable_special_tags() 处理特殊标签
- 返回结构化 chunk: {"content": "文本[1][2]..."}
5. _do_post_processing() 后处理:
- 提取 candidate.grounding_metadata
- 存入 request.app.state[f"grounding_{chat_id}_{message_id}"]
- 提取 web_search_results → sources 列表
6. emit_completion(content="...", sources=[...])
- 发送完整内容给前端
- 同时发送 sources 列表
7. 前端渲染:
- 消息体显示文本和 [1][2] 引用标记
- 底部显示 sources 面板
- 用户可交互查看源信息
```
### 五、可能需要移除的引用标记
在某些情况下(如用户编辑消息),需要调用 `_remove_citation_markers()` 移除不再有效的引用标记:
```python
# 源数据结构(来自 grounding_metadata
source = {
"uri": "https://example.com",
"title": "Page Title",
"metadata": [
{
"supports": [
{
"segment": {
"start_index": 10,
"end_index": 50,
"text": "这是被引用的文本片段"
},
"grounding_chunk_indices": [0, 1] # 对应 [1], [2]
}
]
}
]
}
# 方法会找到 "这是被引用的文本片段[1][2]" 并删除 [1][2]
cleaned_text = _remove_citation_markers(response_text, [source])
```
### 六、关键要点
**✓ 引用的识别规则:**
- 文本内联的 `[数字]` 是引用标记
- 必须对应 sources 列表中的同序号元素
- 通常由 API 自动生成和嵌入
**✗ 常见问题:**
- 删除源但保留标记 → 前端会显示孤立的 `[N]`
- 修改文本后标记位置错误 → 需要重新生成
- 混合多个搜索结果 → 确保索引连续且不重叠
### 七、Chat/Completions 接口的响应格式
当直接通过 Open WebUI 的 `chat/completions` API 调用 pipe 时,响应应采用以下格式返回引用信息。
**流式响应streaming=true**
Pipe 返回 `AsyncGenerator[dict]`,每个 dict 按以下顺序发送:
```python
# 流式块(多次)
{
"choices": [
{
"delta": {
"content": "根据搜索结果[1]Open WebUI..."
}
}
]
}
# 完成标记
"data: [DONE]"
# 后续元数据通过 event_emitter 事件发送
# 1. emit_status - 状态更新消息
# 2. emit_toast - 弹窗通知(如错误或成功提示)
# 3. emit_usage - Token 使用量数据
# 4. emit_completion(sources=[...]) - 发送最终的源信息列表
```
**关键特性:**
- 文本内容通过 `{"choices": [{"delta": {"content": "..."}}]}` 流式返回
- 引用标记 `[1][2]` 直接包含在内容文本中
- 源信息通过 `emit_completion(sources=[...])` 以事件形式发送到前端
- 完成后发送 `"data: [DONE]"` 标记
**非流式响应streaming=false**
整个响应通过适配器转换为单次 AsyncGenerator
```python
async def single_item_stream(response):
yield response
# 输出结果类似流式,但内容全部在一个块中
{
"choices": [
{
"delta": {
"content": "完整的回答文本[1][2]..."
}
}
]
}
"data: [DONE]"
```
### 八、sources 数据的发送方式
#### 方式 1通过 EventEmitter 事件发送(推荐)
```python
await event_emitter.emit_completion(
content=None, # 内容已通过 delta 发送
sources=[
{
"title": "Open WebUI 官方文档",
"uri": "https://docs.openwebui.com",
"snippet": "Open WebUI 是一个开源的大语言模型管理平台...",
"display_uri": "docs.openwebui.com",
},
{
"title": "Open WebUI GitHub 仓库",
"uri": "https://github.com/open-webui/open-webui",
"snippet": "开源代码库,包含所有源码和插件...",
"display_uri": "github.com",
}
],
done=True
)
```
这会产生事件:
```python
{
"type": "chat:completion",
"data": {
"done": True,
"sources": [
{"title": "...", "uri": "...", ...},
{"title": "...", "uri": "...", ...}
]
}
}
```
#### 方式 2通过应用状态存储Companion Filter 读取)
gemini_manifold 的 `_add_grounding_data_to_state()` 将 grounding_metadata 存入:
```python
request.app.state[f"grounding_{chat_id}_{message_id}"] = grounding_metadata_obj
```
Companion Filter 或其他处理组件可以读取这个状态并从中提取源信息。
#### 方式 3直接在响应文本中最简单
如果只需要在文本中显示源链接,可以让 API 返回:
```text
根据搜索结果[1]Open WebUI 是一个开源平台[2]。
[1] https://docs.openwebui.com - Open WebUI 官方文档
[2] https://github.com/open-webui/open-webui - GitHub 仓库
```
前端将识别 `[N]` 标记并自动提取为引用。
### 九、完整的 pipe 返回规范
**Pipe 方法签名:**
```python
async def pipe(
self,
body: dict, # 请求体:模型、消息、流式标志等
__user__: dict, # 用户信息
__request__: Request, # FastAPI Request
__event_emitter__: Callable[[Event], Awaitable[None]] | None, # 事件发射器
__metadata__: dict, # 元数据:特性、任务类型等
) -> AsyncGenerator[dict, None] | str:
...
return self._unified_response_processor(...)
```
**返回的 AsyncGenerator 应产生的消息序列:**
```text
1. {"choices": [{"delta": {"content": "流式文本块..."}}]} ← 多次
2. {"choices": [{"delta": {"content": "[1][2]..."}}]} ← 最后的内容块
3. "data: [DONE]" ← 完成标记
4. (事件发送阶段) emit_status, emit_toast, emit_usage, emit_completion(sources=[...])
```
**事件发送(通过 EventEmitter**
这些不是 AsyncGenerator 的产出,而是通过 `__event_emitter__` 回调发送:
```python
# 在处理过程中发送状态
await event_emitter.emit_status("处理中...", done=False)
# 发送错误或成功提示
event_emitter.emit_toast("✓ 完成", "success")
# 发送 Token 使用量
await event_emitter.emit_usage({
"prompt_tokens": 100,
"completion_tokens": 50,
"total_tokens": 150,
"completion_time": 2.34
})
# 发送最终的源信息和其他元数据
await event_emitter.emit_completion(
sources=[...],
usage={...},
done=True
)
```
### 十、实现 Pipe 时的源处理清单
当你实现一个支持搜索的 pipe 时,确保:
**✓ 流式响应部分:**
- [ ] 文本包含内联的 `[1]`, `[2]` 等引用标记
- [ ] 每个块通过 `yield {"choices": [{"delta": {"content": "..."}}]}` 返回
- [ ] 最后一块完成后发送 `yield "data: [DONE]"`
**✓ 元数据部分:**
- [ ] 调用 `emit_status()` 显示处理进度
- [ ] 调用 `emit_toast()` 通知成功或错误
- [ ] 调用 `emit_usage()` 发送 Token 使用量
- [ ] 调用 `emit_completion(sources=[...])` 发送源列表
**✓ 源数据结构:**
- [ ] 每个源包含 `title`, `uri`, `snippet`, `display_uri`
- [ ] 源的顺序与文本中 `[N]` 的顺序一一对应
- [ ] 使用 `emit_completion(sources=[...], done=True)` 标记完成
**✗ 常见错误:**
- [ ] ❌ 只返回文本,不发送源信息
- [ ] ❌ 源数据格式不完整或字段错误
- [ ] ❌ 源顺序与引用标记不匹配
- [ ] ❌ 混合了内容和元数据返回方式
## 补充Open WebUI 核心模块详解
开发 Open WebUI Pipe 时,需要调用的五个核心模块及其功能说明:
```python
from open_webui.models.chats import Chats
from open_webui.models.files import FileForm, Files
from open_webui.storage.provider import Storage
from open_webui.models.functions import Functions
from open_webui.utils.misc import pop_system_message
```
### 模块 1`Chats` - 聊天历史管理
**功能:** 访问和管理用户的聊天会话历史记录。
**核心方法:**
```python
Chats.get_chat_by_id_and_user_id(id: str, user_id: str) -> Chat | None
```
**使用示例:**
```python
# 获取特定用户的特定聊天记录
chat = Chats.get_chat_by_id_and_user_id(
id=chat_id,
user_id=user_data["id"]
)
if chat:
# 访问聊天内容和消息历史
chat_content = chat.chat # 获取 ChatObjectDataTD
messages_db = chat_content.get("messages", [])[:-1] # 获取消息列表,排除最后的空响应
# 从消息中提取源信息(用于引用过滤)
for i, message_db in enumerate(messages_db):
sources = message_db.get("sources") # 提取引用源
files = message_db.get("files", []) # 提取文件列表
else:
log.warning(f"Chat {chat_id} not found")
```
**关键数据结构:**
```python
# Chat 对象包含:
{
"id": str,
"user_id": str,
"chat": {
"messages": [
{
"role": "user|assistant",
"content": str,
"files": [{"type": "file|image", "id": str, "url": str}],
"sources": [{"uri": str, "title": str, ...}]
},
...
{} # 最后一条消息为空(待填充的助手响应)
],
"title": str
}
}
```
**使用场景:**
- 需要访问历史消息以过滤引用标记
- 需要获取用户上传的文件附件列表
- 需要验证当前请求与数据库消息数量是否匹配
- 需要在处理过程中追踪消息上下文
**注意事项:**
- ⚠️ **必须在线程中调用**:这是同步阻塞操作,需要用 `asyncio.to_thread()` 包装
- ⚠️ **返回值可为 None**:聊天不存在时返回 None需要检查
- ⚠️ **消息数量验证**:请求体消息数必须等于数据库消息数,否则可能表示数据不同步
---
### 模块 2`Files` - 文件数据库操作
**功能:** 查询和管理 Open WebUI 文件数据库中的文件元数据。
**核心方法:**
```python
# 查询文件
Files.get_file_by_id(file_id: str) -> FileModel | None
# 创建新文件记录
Files.insert_new_file(user_id: str, file_form: FileForm) -> FileModel | None
# 获取文件 MIME 类型等
FileForm(
id: str,
filename: str,
path: str,
meta: dict # 包含 content_type, size, data 等
)
```
**使用示例:**
```python
# 查询已上传的文件
file_model = await asyncio.to_thread(Files.get_file_by_id, file_id)
if file_model:
# 访问文件元数据
file_path = file_model.path # 磁盘路径或 gs:// 云存储路径
mime_type = file_model.meta.get("content_type") # e.g., "image/png"
file_size = file_model.meta.get("size")
# 读取文件内容
with open(file_path, "rb") as f:
file_bytes = f.read()
# 创建新文件记录(如生成图像后)
file_item = await asyncio.to_thread(
Files.insert_new_file,
user_id,
FileForm(
id=str(uuid.uuid4()),
filename="generated-image.png",
path="/path/to/file",
meta={
"name": "generated-image.png",
"content_type": "image/png",
"size": len(image_bytes),
"data": {
"model": model_name,
"chat_id": chat_id,
"message_id": message_id,
}
}
)
)
```
**关键数据结构:**
```python
class FileModel:
id: str
user_id: str
filename: str
path: str # 本地路径或 gs:// URI
meta: dict # 文件元数据
created_at: datetime
updated_at: datetime
meta = {
"name": str, # 显示名称
"content_type": str, # MIME 类型
"size": int, # 字节数
"data": { # 自定义元数据
"model": str,
"chat_id": str,
"message_id": str,
}
}
```
**使用场景:**
- 获取用户上传文件的实际路径和 MIME 类型
- 读取文件内容以上传到 Google Gemini API
- 记录生成的图像和其他输出文件
- 追踪文件与生成任务的关联关系
**注意事项:**
- ⚠️ **必须在线程中调用**:使用 `asyncio.to_thread()` 包装
- ⚠️ **返回值可为 None**:文件不存在时返回 None
- ⚠️ **路径处理**:可能是本地路径或云存储 URIgs://),读取时需要相应处理
- ⚠️ **元数据字段**`meta["data"]` 是自定义字段,用于存储业务逻辑相关的上下文
---
### 模块 3`Storage` - 文件存储管理
**功能:** 上传和管理文件到 Open WebUI 的存储后端(本地磁盘或云存储如 Google Cloud Storage
**核心方法:**
```python
Storage.upload_file(
file: BinaryIO, # 文件对象
filename: str, # 文件名
tags: dict = {} # 标签
) -> tuple[bytes, str] # 返回 (文件内容, 存储路径)
```
**使用示例:**
```python
import io
import uuid
# 准备图像数据
image_data = generate_image() # 生成的字节数据
image_id = str(uuid.uuid4())
imagename = f"{image_id}_generated-image.png"
image_file = io.BytesIO(image_data)
# 上传到存储后端
try:
contents, storage_path = await asyncio.to_thread(
Storage.upload_file,
image_file,
imagename,
tags={"model": model_name} # 可选标签
)
log.info(f"File uploaded to: {storage_path}")
# storage_path 可能是:
# - 本地: "/data/uploads/uuid_filename.png"
# - 云存储: "gs://bucket/uploads/uuid_filename.png"
except Exception as e:
log.exception("Upload failed")
```
**关键特性:**
```text
存储层次:
├─ 本地存储:/data/uploads/ 下的文件
└─ 云存储gs://bucket/ 下的 GCS 文件
自动处理:
├─ 创建目录
├─ 重命名以避免冲突
├─ 返回可访问的路径
└─ 支持标签分类
```
**使用场景:**
- 上传模型生成的图像
- 存储处理后的文件
- 在数据库记录前持久化文件
**注意事项:**
- ⚠️ **必须在线程中调用**:使用 `asyncio.to_thread()` 包装
- ⚠️ **返回的路径**:取决于配置(本地/云),需要配合 `Files.insert_new_file` 记录
- ⚠️ **文件大小**:确保内存中有足够空间存储文件
- ✓ **与 Files 配合**:通常先 `Storage.upload_file()`,再 `Files.insert_new_file()`
---
### 模块 4`Functions` - 过滤器/插件管理
**功能:** 查询已安装的过滤器Filter的状态和配置用于检测依赖的 Companion Filter。
**核心方法:**
```python
Functions.get_function_by_id(filter_id: str) -> Function | None
# Function 对象属性:
# - id: str
# - name: str
# - is_active: bool # 过滤器在 Functions 仪表板中是否启用
# - is_global: bool # 是否对所有模型全局启用
# - models: list[str] # 该过滤器启用的模型列表
```
**使用示例:**
```python
# 检查 Companion Filter 是否安装并启用
def is_feature_available(filter_id: str, metadata: dict) -> tuple[bool, bool]:
"""
检查功能是否可用。
返回: (is_available, is_toggled_on)
"""
# 1. 检查过滤器是否已安装
f = Functions.get_function_by_id(filter_id)
if not f:
log.warning(f"Filter '{filter_id}' not installed")
return (False, False)
# 2. 检查过滤器在 Functions 仪表板中是否启用
if not f.is_active:
log.warning(f"Filter '{filter_id}' is disabled in Functions dashboard")
return (False, False)
# 3. 检查过滤器是否为当前模型启用
model_id = metadata.get("model", {}).get("id")
model_filters = metadata.get("model", {}).get("info", {}).get("meta", {}).get("filterIds", [])
is_enabled = filter_id in model_filters or f.is_global
if not is_enabled:
log.debug(f"Filter '{filter_id}' not enabled for model '{model_id}'")
return (False, False)
# 4. 检查用户是否在当前请求中启用了该功能
user_toggled = filter_id in metadata.get("filter_ids", [])
return (True, user_toggled)
# 使用
is_available, is_enabled = is_feature_available(
"gemini_manifold_companion_v1.7.0",
metadata
)
if is_available and is_enabled:
log.info("Companion filter available and enabled")
elif is_available:
log.debug("Companion filter available but user disabled it")
else:
log.warning("Companion filter not available")
```
**关键检查流程:**
```text
功能可用性检查链:
1. 安装检查
Functions.get_function_by_id() → None? 返回不可用
2. 启用检查
f.is_active == False? 返回不可用
3. 模型启用检查
filter_id in model_filters or f.is_global?
否则返回不可用
4. 用户切换检查
filter_id in metadata["filter_ids"]?
返回用户是否启用
```
**使用场景:**
- 检测 Companion Filter 是否已安装(用于引用过滤功能)
- 检查 URL Context Tool 或其他高级功能的依赖
- 在日志中区分"功能不可用"和"用户未启用"
- 决定是否执行相关的处理逻辑
**注意事项:**
- ✓ **同步操作**:不需要 `asyncio.to_thread()`
- ⚠️ **返回值可为 None**:未安装的过滤器返回 None
- ✓ **多层检查**:需要逐层检查安装、启用、配置、用户选择
- 💡 **日志级别**根据检查阶段使用不同日志级别warning/debug
---
### 模块 5`pop_system_message` - 消息提取工具
**功能:** 从消息列表中提取和分离系统消息。
**功能签名:**
```python
pop_system_message(
messages: list[Message]
) -> tuple[Message | None, list[Message]]
```
**使用示例:**
```python
# 原始消息列表
messages = [
{
"role": "system",
"content": "You are a helpful assistant..."
},
{
"role": "user",
"content": "What is Python?"
},
{
"role": "assistant",
"content": "Python is a programming language..."
}
]
# 分离系统消息
system_message, remaining_messages = pop_system_message(messages)
# 结果:
# system_message = {"role": "system", "content": "You are a helpful assistant..."}
# remaining_messages = [
# {"role": "user", "content": "What is Python?"},
# {"role": "assistant", "content": "Python is a programming language..."}
# ]
# 提取系统提示文本
system_prompt = (system_message or {}).get("content")
# 检查是否存在系统消息
if system_prompt:
log.debug(f"System prompt found: {system_prompt[:100]}...")
else:
log.debug("No system prompt provided")
```
**工作流程:**
```text
输入消息列表
遍历找第一个 role=="system" 的消息
提取该消息
返回 (提取的消息, 剩余消息列表)
```
**关键特性:**
- 返回元组:`(system_message, remaining_messages)`
- `system_message` 为 None 如果不存在系统消息
- `remaining_messages` 不包含系统消息
- 只提取第一个系统消息(如果有多个,后续的被视为普通消息)
**使用场景:**
- 从 Open WebUI 的请求中提取系统消息
- 将系统消息转换为 `GenerateContentConfig.system_instruction`
- 将其余消息作为对话上下文
**注意事项:**
- ✓ **返回类型安全**:总是返回 2 元组
- ⚠️ **系统消息可为 None**:需要 `(system_message or {})` 防止错误
- ✓ **消息顺序保留**`remaining_messages` 中的消息顺序保持原样
- 💡 **使用场景**:几乎所有 Pipe 都需要这个操作来提取系统提示
---
### 通用使用技巧总结
#### 技巧 1异步上下文中调用同步 API
这些模块的大部分方法都是同步阻塞的,但 Pipe 运行在异步上下文中:
```python
# ❌ 错误:会阻塞事件循环
chat = Chats.get_chat_by_id_and_user_id(chat_id, user_id)
# ✓ 正确:在线程池中运行
chat = await asyncio.to_thread(
Chats.get_chat_by_id_and_user_id,
chat_id,
user_id
)
```
#### 技巧 2链式 None 检查
由于这些 API 经常返回 None使用链式赋值简化代码
```python
# ❌ 冗长
file_model = await asyncio.to_thread(Files.get_file_by_id, file_id)
if file_model is None:
return None
file_path = file_model.path
mime_type = file_model.meta.get("content_type")
# ✓ 简洁
if not (file_model := await asyncio.to_thread(Files.get_file_by_id, file_id)):
return None
file_path = file_model.path
mime_type = file_model.meta.get("content_type")
```
#### 技巧 3错误恢复优先级
不同模块的错误处理优先级:
```python
# 1. 功能检查失败 → 返回默认值,继续
if not (f := Functions.get_function_by_id(filter_id)):
log.warning("Feature not available")
return (False, False)
# 2. 数据库查询失败 → 记录警告,但不中断流程
try:
chat = await asyncio.to_thread(Chats.get_chat_by_id_and_user_id, ...)
except Exception as e:
log.exception("Failed to fetch chat history")
chat = None
# 3. 存储操作失败 → 使用 toast 通知用户,并记录错误
try:
path = await asyncio.to_thread(Storage.upload_file, ...)
except Exception as e:
event_emitter.emit_toast("File upload failed", "error")
log.exception("Storage error")
raise
```
#### 技巧 4并发操作优化
多个 API 调用时使用并发:
```python
# ❌ 串行:慢
chat = await asyncio.to_thread(Chats.get_chat_by_id_and_user_id, ...)
file = await asyncio.to_thread(Files.get_file_by_id, ...)
filter_info = Functions.get_function_by_id(...)
# ✓ 并发:快
chat, file = await asyncio.gather(
asyncio.to_thread(Chats.get_chat_by_id_and_user_id, ...),
asyncio.to_thread(Files.get_file_by_id, ...),
)
filter_info = Functions.get_function_by_id(...) # 这个本来就是同步的
```
#### 技巧 5日志级别选择
根据严重程度选择日志级别:
```python
# 配置问题(管理员处理)→ warning
if not f.is_active:
log.warning(f"Filter '{filter_id}' disabled in dashboard")
# 正常功能流程(调试用)→ debug
if filter_id not in model_filters:
log.debug(f"Filter not in model list: {filter_id}")
# 数据不一致(可能的 bug→ error
if len(messages_db) != len(messages_body):
log.error("Message count mismatch")
# 检查点(流程追踪)→ info
if is_toggled_on:
log.info(f"Feature '{filter_id}' enabled by user")
```
#### 技巧 6元数据字段扩展
`Files.meta` 中的 `data` 字段是自定义字段,可存储任意上下文:
```python
file_item = await asyncio.to_thread(
Files.insert_new_file,
user_id,
FileForm(
id=id,
filename="output.json",
path=path,
meta={
"name": "output.json",
"content_type": "application/json",
"size": len(contents),
"data": { # 自定义字段,存储业务逻辑上下文
"model": model_name,
"chat_id": chat_id,
"message_id": message_id,
"timestamp": time.time(),
"processing_time": elapsed_ms,
"version": "v1.0",
}
}
)
)
# 后续查询时可以恢复这些信息
if file_model.meta.get("data", {}).get("processing_time"):
log.debug(f"File processed in {file_model.meta['data']['processing_time']}ms")
```
#### 技巧 7条件式功能启用
根据多个条件决定是否启用某项功能:
```python
# 检查引用过滤是否可用
companion_available, companion_enabled = is_feature_available(
"gemini_manifold_companion",
__metadata__
)
# 结合其他条件
can_filter_citations = (
companion_available and # 过滤器已安装
companion_enabled and # 用户启用了该功能
self.messages_db is not None and # 聊天历史可用
len(messages_db) == len(messages) # 消息数量一致
)
if can_filter_citations:
# 执行引用过滤逻辑
...
else:
# 跳过该功能
log.debug("Citation filtering unavailable")
```
---
### 实战代码完整示例
```python
import asyncio
from open_webui.models.chats import Chats
from open_webui.models.files import FileForm, Files
from open_webui.storage.provider import Storage
from open_webui.models.functions import Functions
from open_webui.utils.misc import pop_system_message
class MyPipe:
async def pipe(
self,
body: dict,
__user__: dict,
__request__,
__event_emitter__,
__metadata__: dict,
):
# 1. 提取系统消息
system_message, messages = pop_system_message(body.get("messages", []))
system_prompt = (system_message or {}).get("content")
# 2. 并发获取聊天和过滤器信息
chat_data, filter_status = await asyncio.gather(
asyncio.to_thread(
Chats.get_chat_by_id_and_user_id,
__metadata__.get("chat_id", ""),
__user__["id"]
),
self._check_filter_available("companion_filter_id", __metadata__),
return_exceptions=True
)
# 3. 处理结果
chat = chat_data if not isinstance(chat_data, Exception) else None
is_available, is_enabled = filter_status if not isinstance(filter_status, Exception) else (False, False)
# 4. 条件式处理文件
if chat and is_available:
for message in chat.chat.get("messages", []):
if files := message.get("files", []):
for file_ref in files:
file_model = await asyncio.to_thread(
Files.get_file_by_id,
file_ref.get("id")
)
if file_model:
# 处理文件...
pass
# 5. 返回结果
async for chunk in self._generate_response(messages, system_prompt):
yield chunk
@staticmethod
def _check_filter_available(filter_id: str, metadata: dict) -> tuple[bool, bool]:
f = Functions.get_function_by_id(filter_id)
if not f or not f.is_active:
return (False, False)
is_enabled = filter_id in metadata.get("filter_ids", []) or f.is_global
return (True, is_enabled)
```
> 这些示例可直接集成进团队的插件开发指南或代码模板库,新插件可参考对应场景快速实现相关功能。