diff --git a/docs/plugins/filters/async-context-compression.md b/docs/plugins/filters/async-context-compression.md index 7c93faf..0447d91 100644 --- a/docs/plugins/filters/async-context-compression.md +++ b/docs/plugins/filters/async-context-compression.md @@ -1,7 +1,7 @@ # Async Context Compression Filter -v1.1.2 +v1.1.3 Reduces token consumption in long conversations through intelligent summarization while maintaining conversational coherence. @@ -32,6 +32,8 @@ This is especially useful for: - :material-console: **Frontend Debugging**: Debug logs in browser console - :material-alert-circle-check: **Enhanced Error Reporting**: Clear error status notifications - :material-check-all: **Open WebUI v0.7.x Compatibility**: Dynamic DB session handling +- :material-account-convert: **Improved Compatibility**: Summary role changed to `assistant` +- :material-shield-check: **Enhanced Stability**: Resolved race conditions in state management --- diff --git a/docs/plugins/filters/async-context-compression.zh.md b/docs/plugins/filters/async-context-compression.zh.md index 2e20996..d7b7fe4 100644 --- a/docs/plugins/filters/async-context-compression.zh.md +++ b/docs/plugins/filters/async-context-compression.zh.md @@ -1,7 +1,7 @@ # Async Context Compression(异步上下文压缩) Filter -v1.1.2 +v1.1.3 通过智能摘要减少长对话的 token 消耗,同时保持对话连贯。 @@ -32,6 +32,8 @@ Async Context Compression 过滤器通过以下方式帮助管理长对话的 to - :material-console: **前端调试**:支持浏览器控制台日志 - :material-alert-circle-check: **增强错误报告**:清晰的错误状态通知 - :material-check-all: **Open WebUI v0.7.x 兼容性**:动态数据库会话处理 +- :material-account-convert: **兼容性提升**:摘要角色改为 `assistant` +- :material-shield-check: **稳定性增强**:解决状态管理竞态条件 --- diff --git a/docs/plugins/filters/index.md b/docs/plugins/filters/index.md index 0eec3de..5d125a9 100644 --- a/docs/plugins/filters/index.md +++ b/docs/plugins/filters/index.md @@ -22,7 +22,7 @@ Filters act as middleware in the message pipeline: Reduces token consumption in long conversations through intelligent summarization while 
maintaining coherence. - **Version:** 1.1.2 + **Version:** 1.1.3 [:octicons-arrow-right-24: Documentation](async-context-compression.md) diff --git a/docs/plugins/filters/index.zh.md b/docs/plugins/filters/index.zh.md index a8c75a2..70d3ff7 100644 --- a/docs/plugins/filters/index.zh.md +++ b/docs/plugins/filters/index.zh.md @@ -22,7 +22,7 @@ Filter 充当消息管线中的中间件: 通过智能总结减少长对话的 token 消耗,同时保持连贯性。 - **版本:** 1.1.0 + **版本:** 1.1.3 [:octicons-arrow-right-24: 查看文档](async-context-compression.md) diff --git a/plugins/filters/async-context-compression/README.md b/plugins/filters/async-context-compression/README.md index 88d0607..18db77b 100644 --- a/plugins/filters/async-context-compression/README.md +++ b/plugins/filters/async-context-compression/README.md @@ -1,9 +1,14 @@ # Async Context Compression Filter -**Author:** [Fu-Jie](https://github.com/Fu-Jie) | **Version:** 1.1.2 | **License:** MIT +**Author:** [Fu-Jie](https://github.com/Fu-Jie) | **Version:** 1.1.3 | **License:** MIT This filter reduces token consumption in long conversations through intelligent summarization and message compression while keeping conversations coherent. +## What's new in 1.1.3 +- **Improved Compatibility**: Changed summary injection role from `user` to `assistant` for better compatibility across different LLMs. +- **Enhanced Stability**: Fixed a race condition in state management that could cause "inlet state not found" warnings in high-concurrency scenarios. +- **Bug Fixes**: Corrected default model handling to prevent misleading logs when no model is specified. + ## What's new in 1.1.2 - **Open WebUI v0.7.x Compatibility**: Resolved a critical database session binding error affecting Open WebUI v0.7.x users. The plugin now dynamically discovers the database engine and session context, ensuring compatibility across versions. 
@@ -15,12 +20,7 @@ This filter reduces token consumption in long conversations through intelligent - **Frontend Debugging**: Added `show_debug_log` option to print debug info to the browser console (F12). - **Optimized Compression**: Improved token calculation logic to prevent aggressive truncation of history, ensuring more context is retained. -## What's new in 1.1.0 -- Reuses Open WebUI's shared database connection by default (no custom engine or env vars required). -- Token-based thresholds (`compression_threshold_tokens`, `max_context_tokens`) for safer long-context handling. -- Per-model overrides via `model_thresholds` for mixed-model workflows. -- Documentation now mirrors the latest async workflow and retention-first injection. --- diff --git a/plugins/filters/async-context-compression/README_CN.md b/plugins/filters/async-context-compression/README_CN.md index 58875ae..aadb7e2 100644 --- a/plugins/filters/async-context-compression/README_CN.md +++ b/plugins/filters/async-context-compression/README_CN.md @@ -1,11 +1,16 @@ # 异步上下文压缩过滤器 -**作者:** [Fu-Jie](https://github.com/Fu-Jie) | **版本:** 1.1.2 | **许可证:** MIT +**作者:** [Fu-Jie](https://github.com/Fu-Jie) | **版本:** 1.1.3 | **许可证:** MIT > **重要提示**:为了确保所有过滤器的可维护性和易用性,每个过滤器都应附带清晰、完整的文档,以确保其功能、配置和使用方法得到充分说明。 本过滤器通过智能摘要和消息压缩技术,在保持对话连贯性的同时,显著降低长对话的 Token 消耗。 +## 1.1.3 版本更新 +- **兼容性提升**: 将摘要注入角色从 `user` 改为 `assistant`,以提高在不同 LLM 之间的兼容性。 +- **稳定性增强**: 修复了状态管理中的竞态条件,解决了高并发场景下可能出现的“无法获取 inlet 状态”警告。 +- **Bug 修复**: 修正了默认模型处理逻辑,防止在未指定模型时产生误导性日志。 + ## 1.1.2 版本更新 - **Open WebUI v0.7.x 兼容性**: 修复了影响 Open WebUI v0.7.x 用户的严重数据库会话绑定错误。插件现在动态发现数据库引擎和会话上下文,确保跨版本兼容性。 @@ -17,12 +22,7 @@ - **前端调试**: 新增 `show_debug_log` 选项,支持在浏览器控制台 (F12) 打印调试信息。 - **压缩优化**: 优化 Token 计算逻辑,防止历史记录被过度截断,保留更多上下文。 -## 1.1.0 版本更新 -- 默认复用 OpenWebUI 内置数据库连接,无需自建引擎、无需配置 `DATABASE_URL`。 -- 基于 Token 的阈值控制(`compression_threshold_tokens`、`max_context_tokens`),长上下文更安全。 -- 支持 `model_thresholds` 为不同模型设置专属阈值,适合混用多模型场景。 -- 文档同步最新异步工作流与“先保留再注入”策略。 --- diff --git 
a/plugins/filters/async-context-compression/async_context_compression.py b/plugins/filters/async-context-compression/async_context_compression.py index 09355db..2145073 100644 --- a/plugins/filters/async-context-compression/async_context_compression.py +++ b/plugins/filters/async-context-compression/async_context_compression.py @@ -5,7 +5,7 @@ author: Fu-Jie author_url: https://github.com/Fu-Jie funding_url: https://github.com/Fu-Jie/awesome-openwebui description: Reduces token consumption in long conversations while maintaining coherence through intelligent summarization and message compression. -version: 1.1.2 +version: 1.1.3 openwebui_id: b1655bc8-6de9-4cad-8cb5-a6f7829a02ce license: MIT @@ -370,7 +370,6 @@ class Filter: self.valves = self.Valves() self._owui_db = owui_db self._db_engine = owui_engine - self.temp_state = {} # Used to pass temporary data between inlet and outlet self._fallback_session_factory = ( sessionmaker(bind=self._db_engine) if self._db_engine else None ) @@ -638,42 +641,6 @@ class Filter: return "" - def _inject_summary_to_first_message(self, message: dict, summary: str) -> dict: - """Injects the summary into the first message (prepended to content).""" - content = message.get("content", "") - summary_block = f"【Historical Conversation Summary】\n{summary}\n\n---\nBelow is the recent conversation:\n\n" - - # Handle different content types - if isinstance(content, list): # Multimodal content - # Find the first text part and insert the summary before it - new_content = [] - summary_inserted = False - - for part in content: - if ( - isinstance(part, dict) - and part.get("type") == "text" - and not summary_inserted - ): - # Prepend summary to the first text part - new_content.append( - {"type": "text", "text": summary_block + part.get("text", "")} - ) - summary_inserted = True - else: 
new_content.append(part) - - # If no text part, insert at the beginning - if not summary_inserted: - new_content.insert(0, {"type": "text", "text": summary_block}) - - message["content"] = new_content - - elif isinstance(content, str): # Plain text - message["content"] = summary_block + content - - return message - async def _emit_debug_log( self, __event_call__, @@ -803,15 +770,6 @@ class Filter: # Target is to compress up to the (total - keep_last) message target_compressed_count = max(0, len(messages) - self.valves.keep_last) - # [Optimization] Simple state cleanup check - if chat_id in self.temp_state: - await self._log( - f"[Inlet] ⚠️ Overwriting unconsumed old state (Chat ID: {chat_id})", - type="warning", - event_call=__event_call__, - ) - - self.temp_state[chat_id] = target_compressed_count await self._log( f"[Inlet] Recorded target compression progress: {target_compressed_count}", @@ -844,7 +805,7 @@ f"---\n" f"Below is the recent conversation:" ) - summary_msg = {"role": "user", "content": summary_content} + summary_msg = {"role": "assistant", "content": summary_content} # 3. 
Tail messages (Tail) - All messages starting from the last compression point # Note: Must ensure head messages are not duplicated @@ -914,18 +875,29 @@ class Filter: event_call=__event_call__, ) return body - model = body.get("model", "gpt-3.5-turbo") + model = body.get("model") or "" + + # Calculate target compression progress directly + # Assuming body['messages'] in outlet contains the full history (including new response) + messages = body.get("messages", []) + target_compressed_count = max(0, len(messages) - self.valves.keep_last) if self.valves.debug_mode or self.valves.show_debug_log: await self._log( - f"\n{'='*60}\n[Outlet] Chat ID: {chat_id}\n[Outlet] Response complete", + f"\n{'='*60}\n[Outlet] Chat ID: {chat_id}\n[Outlet] Response complete\n[Outlet] Calculated target compression progress: {target_compressed_count} (Messages: {len(messages)})", event_call=__event_call__, ) # Process Token calculation and summary generation asynchronously in the background (do not wait for completion, do not affect output) asyncio.create_task( self._check_and_generate_summary_async( - chat_id, model, body, __user__, __event_emitter__, __event_call__ + chat_id, + model, + body, + __user__, + target_compressed_count, + __event_emitter__, + __event_call__, ) ) @@ -942,6 +914,7 @@ class Filter: model: str, body: dict, user_data: Optional[dict], + target_compressed_count: Optional[int], __event_emitter__: Callable[[Any], Awaitable[None]] = None, __event_call__: Callable[[Any], Awaitable[None]] = None, ): @@ -986,6 +959,7 @@ class Filter: chat_id, body, user_data, + target_compressed_count, __event_emitter__, __event_call__, ) @@ -1015,6 +989,7 @@ class Filter: chat_id: str, body: dict, user_data: Optional[dict], + target_compressed_count: Optional[int], __event_emitter__: Callable[[Any], Awaitable[None]] = None, __event_call__: Callable[[Any], Awaitable[None]] = None, ): @@ -1031,12 +1006,11 @@ class Filter: ) # 1. 
Get target compression progress - # Prioritize getting from temp_state (calculated by inlet). If unavailable (e.g., after restart), assume current is full history. - target_compressed_count = self.temp_state.pop(chat_id, None) + # If target_compressed_count is not passed (should not happen with the new logic), estimate it if target_compressed_count is None: target_compressed_count = max(0, len(messages) - self.valves.keep_last) await self._log( - f"[🤖 Async Summary Task] ⚠️ Could not get inlet state, estimating progress using current message count: {target_compressed_count}", + f"[🤖 Async Summary Task] ⚠️ target_compressed_count is None, estimating: {target_compressed_count}", type="warning", event_call=__event_call__, ) diff --git a/plugins/filters/async-context-compression/async_context_compression_cn.py b/plugins/filters/async-context-compression/async_context_compression_cn.py index 214c504..ce2111c 100644 --- a/plugins/filters/async-context-compression/async_context_compression_cn.py +++ b/plugins/filters/async-context-compression/async_context_compression_cn.py @@ -5,7 +5,7 @@ author: Fu-Jie author_url: https://github.com/Fu-Jie funding_url: https://github.com/Fu-Jie/awesome-openwebui description: 通过智能摘要和消息压缩,降低长对话的 token 消耗,同时保持对话连贯性。 -version: 1.1.2 +version: 1.1.3 openwebui_id: 5c0617cb-a9e4-4bd6-a440-d276534ebd18 license: MIT @@ -290,7 +290,6 @@ class Filter: self.valves = self.Valves() self._db_engine = owui_engine self._SessionLocal = owui_Session - self.temp_state = {} # 用于在 inlet 和 outlet 之间传递临时数据 self._init_database() def _init_database(self): @@ -471,42 +472,6 @@ "max_context_tokens": self.valves.max_context_tokens, } - def _inject_summary_to_first_message(self, message: dict, summary: str) -> dict: - """将摘要注入到第一条消息中(追加到内容前面)""" - content = message.get("content", "") - summary_block = f"【历史对话摘要】\n{summary}\n\n---\n以下是最近的对话:\n\n" - - # 处理不同内容类型 - if isinstance(content, list): # 
多模态内容 - # 查找第一个文本部分并在其前面插入摘要 - new_content = [] - summary_inserted = False - - for part in content: - if ( - isinstance(part, dict) - and part.get("type") == "text" - and not summary_inserted - ): - # 在第一个文本部分前插入摘要 - new_content.append( - {"type": "text", "text": summary_block + part.get("text", "")} - ) - summary_inserted = True - else: - new_content.append(part) - - # 如果没有文本部分,在开头插入 - if not summary_inserted: - new_content.insert(0, {"type": "text", "text": summary_block}) - - message["content"] = new_content - - elif isinstance(content, str): # 纯文本 - message["content"] = summary_block + content - - return message - async def _emit_debug_log( self, __event_call__, @@ -628,15 +593,6 @@ # 目标是压缩到倒数第 keep_last 条之前 target_compressed_count = max(0, len(messages) - self.valves.keep_last) - # [优化] 简单的状态清理检查 - if chat_id in self.temp_state: - await self._log( - f"[Inlet] ⚠️ 覆盖未消费的旧状态 (Chat ID: {chat_id})", - type="warning", - event_call=__event_call__, - ) - - self.temp_state[chat_id] = target_compressed_count await self._log( f"[Inlet] 记录目标压缩进度: {target_compressed_count}", @@ -669,7 +628,7 @@ f"---\n" f"以下是最近的对话:" ) - summary_msg = {"role": "user", "content": summary_content} + summary_msg = {"role": "assistant", "content": summary_content} # 3. 
尾部消息 (Tail) - 从上次压缩点开始的所有消息 # 注意:这里必须确保不重复包含头部消息 @@ -732,18 +691,29 @@ class Filter: 在后台计算 Token 数并触发摘要生成(不阻塞当前响应,不影响内容输出) """ chat_id = __metadata__["chat_id"] - model = body.get("model", "gpt-3.5-turbo") + model = body.get("model") or "" + + # 直接计算目标压缩进度 + # 假设 outlet 中的 body['messages'] 包含完整历史(包括新响应) + messages = body.get("messages", []) + target_compressed_count = max(0, len(messages) - self.valves.keep_last) if self.valves.debug_mode or self.valves.show_debug_log: await self._log( - f"\n{'='*60}\n[Outlet] Chat ID: {chat_id}\n[Outlet] 响应完成", + f"\n{'='*60}\n[Outlet] Chat ID: {chat_id}\n[Outlet] 响应完成\n[Outlet] 计算目标压缩进度: {target_compressed_count} (消息数: {len(messages)})", event_call=__event_call__, ) # 在后台异步处理 Token 计算和摘要生成(不等待完成,不影响输出) asyncio.create_task( self._check_and_generate_summary_async( - chat_id, model, body, __user__, __event_emitter__, __event_call__ + chat_id, + model, + body, + __user__, + target_compressed_count, + __event_emitter__, + __event_call__, ) ) @@ -760,6 +730,7 @@ class Filter: model: str, body: dict, user_data: Optional[dict], + target_compressed_count: Optional[int], __event_emitter__: Callable[[Any], Awaitable[None]] = None, __event_call__: Callable[[Any], Awaitable[None]] = None, ): @@ -804,6 +775,7 @@ class Filter: chat_id, body, user_data, + target_compressed_count, __event_emitter__, __event_call__, ) @@ -833,6 +805,7 @@ class Filter: chat_id: str, body: dict, user_data: Optional[dict], + target_compressed_count: Optional[int], __event_emitter__: Callable[[Any], Awaitable[None]] = None, __event_call__: Callable[[Any], Awaitable[None]] = None, ): @@ -847,12 +820,11 @@ class Filter: await self._log(f"\n[🤖 异步摘要任务] 开始...", event_call=__event_call__) # 1. 
获取目标压缩进度 - # 优先从 temp_state 获取(由 inlet 计算),如果获取不到(例如重启后),则假设当前是完整历史 - target_compressed_count = self.temp_state.pop(chat_id, None) + # 如果未传递 target_compressed_count(新逻辑下不应发生),则进行估算 if target_compressed_count is None: target_compressed_count = max(0, len(messages) - self.valves.keep_last) await self._log( - f"[🤖 异步摘要任务] ⚠️ 无法获取 inlet 状态,使用当前消息数估算进度: {target_compressed_count}", + f"[🤖 异步摘要任务] ⚠️ target_compressed_count 为 None,进行估算: {target_compressed_count}", type="warning", event_call=__event_call__, )
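
Taken together, the Python changes above replace cross-request shared state with explicit argument passing: `outlet` now computes the target compression progress itself and hands it to the background task, instead of `inlet` stashing it in a `temp_state` dict that a concurrent request could overwrite or consume first. A minimal sketch of the pattern, with hypothetical helper names (`compute_target`, `summarize_async`, `build_summary_message`) standing in for the plugin's real methods and `keep_last` mirroring its valve:

```python
import asyncio


def compute_target(messages: list, keep_last: int = 4) -> int:
    # Compress everything except the last `keep_last` messages, matching
    # max(0, len(messages) - self.valves.keep_last) in the diff.
    return max(0, len(messages) - keep_last)


def build_summary_message(summary: str) -> dict:
    # v1.1.3 injects the summary as an `assistant` turn (previously `user`)
    # for broader compatibility across LLM back-ends.
    return {"role": "assistant", "content": summary}


async def summarize_async(chat_id: str, target: int) -> dict:
    # Stand-in for the background summary task: the target arrives as an
    # explicit argument, so no shared-dict lookup can race between requests.
    await asyncio.sleep(0)  # placeholder for the real summarization call
    return build_summary_message(f"{chat_id}: condensed first {target} messages")


async def outlet(chat_id: str, messages: list) -> None:
    # Compute the target here and pass it along; no inlet/outlet hand-off state.
    target = compute_target(messages)
    asyncio.create_task(summarize_async(chat_id, target))
```

Because each task carries its own `target`, overlapping requests can no longer hit the "inlet state not found" path, which survives only as a defensive fallback when the argument is `None`.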