From 7085e794a3d3c5a4e18c783ff1b857fef7913f97 Mon Sep 17 00:00:00 2001 From: fujie Date: Sat, 10 Jan 2026 18:47:06 +0800 Subject: [PATCH] Update Async Context Compression to v1.1.1: Add frontend debug logging and optimize token calculation --- .../async_context_compression.py | 375 ++++++++++++----- .../async_context_compression_cn.py | 379 +++++++++++++----- 2 files changed, 542 insertions(+), 212 deletions(-) diff --git a/plugins/filters/async-context-compression/async_context_compression.py b/plugins/filters/async-context-compression/async_context_compression.py index a1db95c..26c13db 100644 --- a/plugins/filters/async-context-compression/async_context_compression.py +++ b/plugins/filters/async-context-compression/async_context_compression.py @@ -5,7 +5,7 @@ author: Fu-Jie author_url: https://github.com/Fu-Jie funding_url: https://github.com/Fu-Jie/awesome-openwebui description: Reduces token consumption in long conversations while maintaining coherence through intelligent summarization and message compression. -version: 1.1.0 +version: 1.1.1 openwebui_id: b1655bc8-6de9-4cad-8cb5-a6f7829a02ce license: MIT @@ -139,6 +139,10 @@ debug_mode Default: true Description: Prints detailed debug information to the log. Recommended to set to `false` in production. +show_debug_log + Default: false + Description: Print debug logs to browser console (F12). Useful for frontend debugging. + 🔧 Deployment ═══════════════════════════════════════════════════════ @@ -355,6 +359,9 @@ class Filter: debug_mode: bool = Field( default=True, description="Enable detailed logging for debugging." ) + show_debug_log: bool = Field( + default=False, description="Print debug logs to browser console (F12)" + ) def _save_summary(self, chat_id: str, summary: str, compressed_count: int): """Saves the summary to the database.""" @@ -516,12 +523,109 @@ class Filter: return message + async def _emit_debug_log( + self, + __event_call__, + chat_id: str, + original_count: int, + compressed_count: int, + summary_length: int, + kept_first: int, + kept_last: int, + ): + """Emit debug log to browser console via JS execution""" + if not self.valves.show_debug_log or not __event_call__: + return + + try: + # Prepare data for JS + log_data = { + "chatId": chat_id, + "originalCount": original_count, + "compressedCount": compressed_count, + "summaryLength": summary_length, + "keptFirst": kept_first, + "keptLast": kept_last, + "ratio": ( + f"{(1 - compressed_count/original_count)*100:.1f}%" + if original_count > 0 + else "0%" + ), + } + + # Construct JS code + js_code = f""" + (async function() {{ + console.group("🗜️ Async Context Compression Debug"); + console.log("Chat ID:", {json.dumps(chat_id)}); + console.log("Messages:", {original_count} + " -> " + {compressed_count}); + console.log("Compression Ratio:", {json.dumps(log_data['ratio'])}); + console.log("Summary Length:", {summary_length} + " chars"); + console.log("Configuration:", {{ + "Keep First": {kept_first}, + "Keep Last": {kept_last} + }}); + console.groupEnd(); + }})(); + """ + + await __event_call__( + { + "type": "execute", + "data": {"code": js_code}, + } + ) + except Exception as e: + print(f"Error emitting debug log: {e}") + + async def _log(self, message: str, type: str = "info", event_call=None): + """Unified logging to both backend (print) and frontend (console.log)""" + # Backend logging + if self.valves.debug_mode: + print(message) + + # Frontend logging + if self.valves.show_debug_log and event_call: + try: + css = "color: #3b82f6;" # Blue default + if type == 
"error": + css = "color: #ef4444; font-weight: bold;" # Red + elif type == "warning": + css = "color: #f59e0b;" # Orange + elif type == "success": + css = "color: #10b981; font-weight: bold;" # Green + + # Clean message for frontend: remove separators and extra newlines + lines = message.split("\n") + # Keep lines that don't start with lots of equals or hyphens + filtered_lines = [ + line + for line in lines + if not line.strip().startswith("====") + and not line.strip().startswith("----") + ] + clean_message = "\n".join(filtered_lines).strip() + + if not clean_message: + return + + # Escape quotes in message for JS string + safe_message = clean_message.replace('"', '\\"').replace("\n", "\\n") + + js_code = f""" + console.log("%c[Compression] {safe_message}", "{css}"); + """ + await event_call({"type": "execute", "data": {"code": js_code}}) + except Exception as e: + print(f"Failed to emit log to frontend: {e}") + async def inlet( self, body: dict, __user__: Optional[dict] = None, __metadata__: dict = None, __event_emitter__: Callable[[Any], Awaitable[None]] = None, + __event_call__: Callable[[Any], Awaitable[None]] = None, ) -> dict: """ Executed before sending to the LLM. @@ -530,10 +634,11 @@ class Filter: messages = body.get("messages", []) chat_id = __metadata__["chat_id"] - if self.valves.debug_mode: - print(f"\n{'='*60}") - print(f"[Inlet] Chat ID: {chat_id}") - print(f"[Inlet] Received {len(messages)} messages") + if self.valves.debug_mode or self.valves.show_debug_log: + await self._log( + f"\n{'='*60}\n[Inlet] Chat ID: {chat_id}\n[Inlet] Received {len(messages)} messages", + event_call=__event_call__, + ) # Record the target compression progress for the original messages, for use in outlet # Target is to compress up to the (total - keep_last) message @@ -541,17 +646,18 @@ class Filter: # [Optimization] Simple state cleanup check if chat_id in self.temp_state: - if self.valves.debug_mode: - print( - f"[Inlet] ⚠️ Overwriting unconsumed old state (Chat ID: {chat_id})" - ) + await self._log( + f"[Inlet] ⚠️ Overwriting unconsumed old state (Chat ID: {chat_id})", + type="warning", + event_call=__event_call__, + ) self.temp_state[chat_id] = target_compressed_count - if self.valves.debug_mode: - print( - f"[Inlet] Recorded target compression progress: {target_compressed_count}" - ) + await self._log( + f"[Inlet] Recorded target compression progress: {target_compressed_count}", + event_call=__event_call__, + ) # Load summary record summary_record = await asyncio.to_thread(self._load_summary_record, chat_id) @@ -600,19 +706,32 @@ class Filter: } ) - if self.valves.debug_mode: - print( - f"[Inlet] Applied summary: Head({len(head_messages)}) + Summary + Tail({len(tail_messages)})" - ) + await self._log( + f"[Inlet] Applied summary: Head({len(head_messages)}) + Summary + Tail({len(tail_messages)})", + type="success", + event_call=__event_call__, + ) + + # Emit debug log to frontend (Keep the structured log as well) + await self._emit_debug_log( + __event_call__, + chat_id, + len(messages), + len(final_messages), + len(summary_record.summary), + self.valves.keep_first, + self.valves.keep_last, + ) else: # No summary, use original messages final_messages = messages body["messages"] = final_messages - if self.valves.debug_mode: - print(f"[Inlet] Final send: {len(body['messages'])} messages") - print(f"{'='*60}\n") + await self._log( + f"[Inlet] Final send: {len(body['messages'])} messages\n{'='*60}\n", + event_call=__event_call__, + ) return body @@ -622,6 +741,7 @@ class Filter: __user__: 
Optional[dict] = None, __metadata__: dict = None, __event_emitter__: Callable[[Any], Awaitable[None]] = None, + __event_call__: Callable[[Any], Awaitable[None]] = None, ) -> dict: """ Executed after the LLM response is complete. @@ -630,21 +750,23 @@ class Filter: chat_id = __metadata__["chat_id"] model = body.get("model", "gpt-3.5-turbo") - if self.valves.debug_mode: - print(f"\n{'='*60}") - print(f"[Outlet] Chat ID: {chat_id}") - print(f"[Outlet] Response complete") + if self.valves.debug_mode or self.valves.show_debug_log: + await self._log( + f"\n{'='*60}\n[Outlet] Chat ID: {chat_id}\n[Outlet] Response complete", + event_call=__event_call__, + ) # Process Token calculation and summary generation asynchronously in the background (do not wait for completion, do not affect output) asyncio.create_task( self._check_and_generate_summary_async( - chat_id, model, body, __user__, __event_emitter__ + chat_id, model, body, __user__, __event_emitter__, __event_call__ ) ) - if self.valves.debug_mode: - print(f"[Outlet] Background processing started") - print(f"{'='*60}\n") + await self._log( + f"[Outlet] Background processing started\n{'='*60}\n", + event_call=__event_call__, + ) return body @@ -655,6 +777,7 @@ class Filter: body: dict, user_data: Optional[dict], __event_emitter__: Callable[[Any], Awaitable[None]] = None, + __event_call__: Callable[[Any], Awaitable[None]] = None, ): """ Background processing: Calculates Token count and generates summary (does not block response). @@ -668,36 +791,50 @@ class Filter: "compression_threshold_tokens", self.valves.compression_threshold_tokens ) - if self.valves.debug_mode: - print(f"\n[🔍 Background Calculation] Starting Token count...") + await self._log( + f"\n[🔍 Background Calculation] Starting Token count...", + event_call=__event_call__, + ) # Calculate Token count in a background thread current_tokens = await asyncio.to_thread( self._calculate_messages_tokens, messages ) - if self.valves.debug_mode: - print(f"[🔍 Background Calculation] Token count: {current_tokens}") + await self._log( + f"[🔍 Background Calculation] Token count: {current_tokens}", + event_call=__event_call__, + ) # Check if compression is needed if current_tokens >= compression_threshold_tokens: - if self.valves.debug_mode: - print( - f"[🔍 Background Calculation] ⚡ Compression threshold triggered (Token: {current_tokens} >= {compression_threshold_tokens})" - ) + await self._log( + f"[🔍 Background Calculation] ⚡ Compression threshold triggered (Token: {current_tokens} >= {compression_threshold_tokens})", + type="warning", + event_call=__event_call__, + ) # Proceed to generate summary await self._generate_summary_async( - messages, chat_id, body, user_data, __event_emitter__ + messages, + chat_id, + body, + user_data, + __event_emitter__, + __event_call__, ) else: - if self.valves.debug_mode: - print( - f"[🔍 Background Calculation] Compression threshold not reached (Token: {current_tokens} < {compression_threshold_tokens})" - ) + await self._log( + f"[🔍 Background Calculation] Compression threshold not reached (Token: {current_tokens} < {compression_threshold_tokens})", + event_call=__event_call__, + ) except Exception as e: - print(f"[🔍 Background Calculation] ❌ Error: {str(e)}") + await self._log( + f"[🔍 Background Calculation] ❌ Error: {str(e)}", + type="error", + event_call=__event_call__, + ) async def _generate_summary_async( self, @@ -706,6 +843,7 @@ class Filter: body: dict, user_data: Optional[dict], __event_emitter__: Callable[[Any], Awaitable[None]] = None, + 
__event_call__: Callable[[Any], Awaitable[None]] = None, ): """ Generates summary asynchronously (runs in background, does not block response). @@ -715,18 +853,20 @@ class Filter: 3. Generate summary for the remaining middle messages. """ try: - if self.valves.debug_mode: - print(f"\n[🤖 Async Summary Task] Starting...") + await self._log( + f"\n[🤖 Async Summary Task] Starting...", event_call=__event_call__ + ) # 1. Get target compression progress # Prioritize getting from temp_state (calculated by inlet). If unavailable (e.g., after restart), assume current is full history. target_compressed_count = self.temp_state.pop(chat_id, None) if target_compressed_count is None: target_compressed_count = max(0, len(messages) - self.valves.keep_last) - if self.valves.debug_mode: - print( - f"[🤖 Async Summary Task] ⚠️ Could not get inlet state, estimating progress using current message count: {target_compressed_count}" - ) + await self._log( + f"[🤖 Async Summary Task] ⚠️ Could not get inlet state, estimating progress using current message count: {target_compressed_count}", + type="warning", + event_call=__event_call__, + ) # 2. Determine the range of messages to compress (Middle) start_index = self.valves.keep_first @@ -736,18 +876,18 @@ class Filter: # Ensure indices are valid if start_index >= end_index: - if self.valves.debug_mode: - print( - f"[🤖 Async Summary Task] Middle messages empty (Start: {start_index}, End: {end_index}), skipping" - ) + await self._log( + f"[🤖 Async Summary Task] Middle messages empty (Start: {start_index}, End: {end_index}), skipping", + event_call=__event_call__, + ) return middle_messages = messages[start_index:end_index] - if self.valves.debug_mode: - print( - f"[🤖 Async Summary Task] Middle messages to process: {len(middle_messages)}" - ) + await self._log( + f"[🤖 Async Summary Task] Middle messages to process: {len(middle_messages)}", + event_call=__event_call__, + ) # 3. Check Token limit and truncate (Max Context Truncation) # [Optimization] Use the summary model's (if any) threshold to decide how many middle messages can be processed @@ -762,22 +902,26 @@ class Filter: "max_context_tokens", self.valves.max_context_tokens ) - if self.valves.debug_mode: - print( - f"[🤖 Async Summary Task] Using max limit for model {summary_model_id}: {max_context_tokens} Tokens" - ) - - # Calculate current total Tokens (using summary model for counting) - total_tokens = await asyncio.to_thread( - self._calculate_messages_tokens, messages + await self._log( + f"[🤖 Async Summary Task] Using max limit for model {summary_model_id}: {max_context_tokens} Tokens", + event_call=__event_call__, ) - if total_tokens > max_context_tokens: - excess_tokens = total_tokens - max_context_tokens - if self.valves.debug_mode: - print( - f"[🤖 Async Summary Task] ⚠️ Total Tokens ({total_tokens}) exceed summary model limit ({max_context_tokens}), need to remove approx {excess_tokens} Tokens" - ) + # Calculate tokens for middle messages only (plus buffer for prompt) + # We only send middle_messages to the summary model, so we shouldn't count the full history against its limit. 
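+            # Illustrative arithmetic (hypothetical numbers, not defaults): with
+            # max_context_tokens = 8192 and middle_tokens = 7000, the estimate
+            # below is 7000 + 2000 = 9000 tokens, so excess_tokens = 808 and
+            # roughly that many tokens get trimmed from the head of middle_messages.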
+ middle_tokens = await asyncio.to_thread( + self._calculate_messages_tokens, middle_messages + ) + # Add buffer for prompt and output (approx 2000 tokens) + estimated_input_tokens = middle_tokens + 2000 + + if estimated_input_tokens > max_context_tokens: + excess_tokens = estimated_input_tokens - max_context_tokens + await self._log( + f"[🤖 Async Summary Task] ⚠️ Middle messages ({middle_tokens} Tokens) + Buffer exceed summary model limit ({max_context_tokens}), need to remove approx {excess_tokens} Tokens", + type="warning", + event_call=__event_call__, + ) # Remove from the head of middle_messages removed_tokens = 0 @@ -785,20 +929,22 @@ class Filter: while removed_tokens < excess_tokens and middle_messages: msg_to_remove = middle_messages.pop(0) - msg_tokens = self._count_tokens(str(msg_to_remove.get("content", ""))) + msg_tokens = self._count_tokens( + str(msg_to_remove.get("content", "")) + ) removed_tokens += msg_tokens removed_count += 1 - if self.valves.debug_mode: - print( - f"[🤖 Async Summary Task] Removed {removed_count} messages, totaling {removed_tokens} Tokens" - ) + await self._log( + f"[🤖 Async Summary Task] Removed {removed_count} messages, totaling {removed_tokens} Tokens", + event_call=__event_call__, + ) if not middle_messages: - if self.valves.debug_mode: - print( - f"[🤖 Async Summary Task] Middle messages empty after truncation, skipping summary generation" - ) + await self._log( + f"[🤖 Async Summary Task] Middle messages empty after truncation, skipping summary generation", + event_call=__event_call__, + ) return # 4. Build conversation text @@ -820,14 +966,14 @@ class Filter: ) new_summary = await self._call_summary_llm( - None, conversation_text, body, user_data + None, conversation_text, body, user_data, __event_call__ ) # 6. Save new summary - if self.valves.debug_mode: - print( - "[Optimization] Saving summary in a background thread to avoid blocking the event loop." - ) + await self._log( + "[Optimization] Saving summary in a background thread to avoid blocking the event loop.", + event_call=__event_call__, + ) await asyncio.to_thread( self._save_summary, chat_id, new_summary, target_compressed_count @@ -845,16 +991,22 @@ class Filter: } ) - if self.valves.debug_mode: - print( - f"[🤖 Async Summary Task] ✅ Complete! New summary length: {len(new_summary)} characters" - ) - print( - f"[🤖 Async Summary Task] Progress update: Compressed up to original message {target_compressed_count}" - ) + await self._log( + f"[🤖 Async Summary Task] ✅ Complete! New summary length: {len(new_summary)} characters", + type="success", + event_call=__event_call__, + ) + await self._log( + f"[🤖 Async Summary Task] Progress update: Compressed up to original message {target_compressed_count}", + event_call=__event_call__, + ) except Exception as e: - print(f"[🤖 Async Summary Task] ❌ Error: {str(e)}") + await self._log( + f"[🤖 Async Summary Task] ❌ Error: {str(e)}", + type="error", + event_call=__event_call__, + ) import traceback traceback.print_exc() @@ -891,12 +1043,15 @@ class Filter: new_conversation_text: str, body: dict, user_data: dict, + __event_call__: Callable[[Any], Awaitable[None]] = None, ) -> str: """ Calls the LLM to generate a summary using Open WebUI's built-in method. 
""" - if self.valves.debug_mode: - print(f"[🤖 LLM Call] Using Open WebUI's built-in method") + await self._log( + f"[🤖 LLM Call] Using Open WebUI's built-in method", + event_call=__event_call__, + ) # Build summary prompt (Optimized) summary_prompt = f""" @@ -935,8 +1090,7 @@ Based on the content above, generate the summary: # Determine the model to use model = self.valves.summary_model or body.get("model", "") - if self.valves.debug_mode: - print(f"[🤖 LLM Call] Model: {model}") + await self._log(f"[🤖 LLM Call] Model: {model}", event_call=__event_call__) # Build payload payload = { @@ -954,18 +1108,19 @@ Based on the content above, generate the summary: raise ValueError("Could not get user ID") # [Optimization] Get user object in a background thread to avoid blocking the event loop. - if self.valves.debug_mode: - print( - "[Optimization] Getting user object in a background thread to avoid blocking the event loop." - ) + await self._log( + "[Optimization] Getting user object in a background thread to avoid blocking the event loop.", + event_call=__event_call__, + ) user = await asyncio.to_thread(Users.get_user_by_id, user_id) if not user: raise ValueError(f"Could not find user: {user_id}") - if self.valves.debug_mode: - print(f"[🤖 LLM Call] User: {user.email}") - print(f"[🤖 LLM Call] Sending request...") + await self._log( + f"[🤖 LLM Call] User: {user.email}\n[🤖 LLM Call] Sending request...", + event_call=__event_call__, + ) # Create Request object request = Request(scope={"type": "http", "app": webui_app}) @@ -978,8 +1133,11 @@ Based on the content above, generate the summary: summary = response["choices"][0]["message"]["content"].strip() - if self.valves.debug_mode: - print(f"[🤖 LLM Call] ✅ Successfully received summary") + await self._log( + f"[🤖 LLM Call] ✅ Successfully received summary", + type="success", + event_call=__event_call__, + ) return summary @@ -991,7 +1149,10 @@ Based on the content above, generate the summary: "If this is a pipeline (Pipe) model or an incompatible model, please specify a compatible summary model (e.g., 'gemini-2.5-flash') in the configuration." 
) - if self.valves.debug_mode: - print(f"[🤖 LLM Call] ❌ {error_message}") + await self._log( + f"[🤖 LLM Call] ❌ {error_message}", + type="error", + event_call=__event_call__, + ) raise Exception(error_message) diff --git a/plugins/filters/async-context-compression/async_context_compression_cn.py b/plugins/filters/async-context-compression/async_context_compression_cn.py index 8b3e526..8d596ae 100644 --- a/plugins/filters/async-context-compression/async_context_compression_cn.py +++ b/plugins/filters/async-context-compression/async_context_compression_cn.py @@ -5,7 +5,7 @@ author: Fu-Jie author_url: https://github.com/Fu-Jie funding_url: https://github.com/Fu-Jie/awesome-openwebui description: 通过智能摘要和消息压缩,降低长对话的 token 消耗,同时保持对话连贯性。 -version: 1.1.0 +version: 1.1.1 openwebui_id: 5c0617cb-a9e4-4bd6-a440-d276534ebd18 license: MIT @@ -138,6 +138,10 @@ debug_mode (调试模式) 默认: true 说明: 在日志中打印详细的调试信息。生产环境建议设为 `false`。 +show_debug_log (前端调试日志) + 默认: false + 说明: 在浏览器控制台打印调试日志 (F12)。便于前端调试。 + 🔧 部署配置 ═══════════════════════════════════════════════════════ @@ -345,6 +349,9 @@ class Filter: default=0.1, ge=0.0, le=2.0, description="摘要生成的温度参数" ) debug_mode: bool = Field(default=True, description="调试模式,打印详细日志") + show_debug_log: bool = Field( + default=False, description="在浏览器控制台打印调试日志 (F12)" + ) def _save_summary(self, chat_id: str, summary: str, compressed_count: int): """保存摘要到数据库""" @@ -426,9 +433,7 @@ class Filter: # 回退策略:粗略估算 (1 token ≈ 4 chars) return len(text) // 4 - def _calculate_messages_tokens( - self, messages: List[Dict] - ) -> int: + def _calculate_messages_tokens(self, messages: List[Dict]) -> int: """计算消息列表的总 Token 数""" total_tokens = 0 for msg in messages: @@ -502,12 +507,109 @@ class Filter: return message + async def _emit_debug_log( + self, + __event_call__, + chat_id: str, + original_count: int, + compressed_count: int, + summary_length: int, + kept_first: int, + kept_last: int, + ): + """Emit debug log to browser console via JS execution""" + if not self.valves.show_debug_log or not __event_call__: + return + + try: + # Prepare data for JS + log_data = { + "chatId": chat_id, + "originalCount": original_count, + "compressedCount": compressed_count, + "summaryLength": summary_length, + "keptFirst": kept_first, + "keptLast": kept_last, + "ratio": ( + f"{(1 - compressed_count/original_count)*100:.1f}%" + if original_count > 0 + else "0%" + ), + } + + # Construct JS code + js_code = f""" + (async function() {{ + console.group("🗜️ Async Context Compression Debug"); + console.log("Chat ID:", {json.dumps(chat_id)}); + console.log("Messages:", {original_count} + " -> " + {compressed_count}); + console.log("Compression Ratio:", {json.dumps(log_data['ratio'])}); + console.log("Summary Length:", {summary_length} + " chars"); + console.log("Configuration:", {{ + "Keep First": {kept_first}, + "Keep Last": {kept_last} + }}); + console.groupEnd(); + }})(); + """ + + await __event_call__( + { + "type": "execute", + "data": {"code": js_code}, + } + ) + except Exception as e: + print(f"Error emitting debug log: {e}") + + async def _log(self, message: str, type: str = "info", event_call=None): + """统一日志输出到后端 (print) 和前端 (console.log)""" + # 后端日志 + if self.valves.debug_mode: + print(message) + + # 前端日志 + if self.valves.show_debug_log and event_call: + try: + css = "color: #3b82f6;" # 默认蓝色 + if type == "error": + css = "color: #ef4444; font-weight: bold;" # 红色 + elif type == "warning": + css = "color: #f59e0b;" # 橙色 + elif type == "success": + css = "color: #10b981; font-weight: bold;" # 绿色 + + # 
清理前端消息:移除分隔符和多余换行
+                lines = message.split("\n")
+                # 保留不是 "====" / "----" 分隔线的行
+                filtered_lines = [
+                    line
+                    for line in lines
+                    if not line.strip().startswith("====")
+                    and not line.strip().startswith("----")
+                ]
+                clean_message = "\n".join(filtered_lines).strip()
+
+                if not clean_message:
+                    return
+
+                # 先转义反斜杠,再转义引号和换行符,保证生成合法的 JS 字符串;
+                # 同时将 "%" 翻倍,避免被 console.log 当作格式占位符
+                safe_message = (
+                    clean_message.replace("\\", "\\\\")
+                    .replace('"', '\\"')
+                    .replace("\n", "\\n")
+                    .replace("%", "%%")
+                )
+
+                js_code = f"""
+                console.log("%c[压缩] {safe_message}", "{css}");
+                """
+                await event_call({"type": "execute", "data": {"code": js_code}})
+            except Exception as e:
+                print(f"发送前端日志失败: {e}")
+
     async def inlet(
         self,
         body: dict,
         __user__: Optional[dict] = None,
         __metadata__: dict = None,
         __event_emitter__: Callable[[Any], Awaitable[None]] = None,
+        __event_call__: Callable[[Any], Awaitable[None]] = None,
     ) -> dict:
         """
         在发送到 LLM 之前执行

         messages = body.get("messages", [])
         chat_id = __metadata__["chat_id"]

-        if self.valves.debug_mode:
-            print(f"\n{'='*60}")
-            print(f"[Inlet] Chat ID: {chat_id}")
-            print(f"[Inlet] 收到 {len(messages)} 条消息")
+        if self.valves.debug_mode or self.valves.show_debug_log:
+            await self._log(
+                f"\n{'='*60}\n[Inlet] Chat ID: {chat_id}\n[Inlet] 收到 {len(messages)} 条消息",
+                event_call=__event_call__,
+            )

         # 记录原始消息的目标压缩进度,供 outlet 使用
         # 目标是压缩到倒数第 keep_last 条之前

         # [优化] 简单的状态清理检查
         if chat_id in self.temp_state:
-            if self.valves.debug_mode:
-                print(f"[Inlet] ⚠️ 覆盖未消费的旧状态 (Chat ID: {chat_id})")
+            await self._log(
+                f"[Inlet] ⚠️ 覆盖未消费的旧状态 (Chat ID: {chat_id})",
+                type="warning",
+                event_call=__event_call__,
+            )

         self.temp_state[chat_id] = target_compressed_count

-        if self.valves.debug_mode:
-            print(f"[Inlet] 记录目标压缩进度: {target_compressed_count}")
+        await self._log(
+            f"[Inlet] 记录目标压缩进度: {target_compressed_count}",
+            event_call=__event_call__,
+        )

         # 加载摘要记录
         summary_record = await asyncio.to_thread(self._load_summary_record, chat_id)

                 }
             )

-            if self.valves.debug_mode:
-                print(
-                    f"[Inlet] 应用摘要: Head({len(head_messages)}) + Summary + Tail({len(tail_messages)})"
-                )
+            await self._log(
+                f"[Inlet] 应用摘要: Head({len(head_messages)}) + Summary + Tail({len(tail_messages)})",
+                type="success",
+                event_call=__event_call__,
+            )
+
+            # 同时向前端控制台输出结构化调试日志
+            await self._emit_debug_log(
+                __event_call__,
+                chat_id,
+                len(messages),
+                len(final_messages),
+                len(summary_record.summary),
+                self.valves.keep_first,
+                self.valves.keep_last,
+            )
         else:
             # 没有摘要,使用原始消息
             final_messages = messages

         body["messages"] = final_messages

-        if self.valves.debug_mode:
-            print(f"[Inlet] 最终发送: {len(body['messages'])} 条消息")
-            print(f"{'='*60}\n")
+        await self._log(
+            f"[Inlet] 最终发送: {len(body['messages'])} 条消息\n{'='*60}\n",
+            event_call=__event_call__,
+        )

         return body

     async def outlet(
         self,
         body: dict,
         __user__: Optional[dict] = None,
         __metadata__: dict = None,
         __event_emitter__: Callable[[Any], Awaitable[None]] = None,
+        __event_call__: Callable[[Any], Awaitable[None]] = None,
     ) -> dict:
         """
         在 LLM 响应完成后执行

         chat_id = __metadata__["chat_id"]
         model = body.get("model", "gpt-3.5-turbo")

-        if self.valves.debug_mode:
-            print(f"\n{'='*60}")
-            print(f"[Outlet] Chat ID: {chat_id}")
-            print(f"[Outlet] 响应完成")
+        if self.valves.debug_mode or self.valves.show_debug_log:
+            await self._log(
+                f"\n{'='*60}\n[Outlet] Chat ID: {chat_id}\n[Outlet] 响应完成",
+                event_call=__event_call__,
+            )

         # 在后台异步处理 Token 计算和摘要生成(不等待完成,不影响输出)
         asyncio.create_task(
             self._check_and_generate_summary_async(
-                chat_id, 
model, body, __user__, __event_emitter__ + chat_id, model, body, __user__, __event_emitter__, __event_call__ ) ) - if self.valves.debug_mode: - print(f"[Outlet] 后台处理已启动") - print(f"{'='*60}\n") + await self._log( + f"[Outlet] 后台处理已启动\n{'='*60}\n", + event_call=__event_call__, + ) return body @@ -637,6 +761,7 @@ class Filter: body: dict, user_data: Optional[dict], __event_emitter__: Callable[[Any], Awaitable[None]] = None, + __event_call__: Callable[[Any], Awaitable[None]] = None, ): """ 后台处理:计算 Token 数并生成摘要(不阻塞响应) @@ -650,36 +775,50 @@ class Filter: "compression_threshold_tokens", self.valves.compression_threshold_tokens ) - if self.valves.debug_mode: - print(f"\n[🔍 后台计算] 开始 Token 计数...") + await self._log( + f"\n[🔍 后台计算] 开始 Token 计数...", + event_call=__event_call__, + ) # 在后台线程中计算 Token 数 current_tokens = await asyncio.to_thread( self._calculate_messages_tokens, messages ) - if self.valves.debug_mode: - print(f"[🔍 后台计算] Token 数: {current_tokens}") + await self._log( + f"[🔍 后台计算] Token 数: {current_tokens}", + event_call=__event_call__, + ) # 检查是否需要压缩 if current_tokens >= compression_threshold_tokens: - if self.valves.debug_mode: - print( - f"[🔍 后台计算] ⚡ 触发压缩阈值 (Token: {current_tokens} >= {compression_threshold_tokens})" - ) + await self._log( + f"[🔍 后台计算] ⚡ 触发压缩阈值 (Token: {current_tokens} >= {compression_threshold_tokens})", + type="warning", + event_call=__event_call__, + ) # 继续生成摘要 await self._generate_summary_async( - messages, chat_id, body, user_data, __event_emitter__ + messages, + chat_id, + body, + user_data, + __event_emitter__, + __event_call__, ) else: - if self.valves.debug_mode: - print( - f"[🔍 后台计算] 未触发压缩阈值 (Token: {current_tokens} < {compression_threshold_tokens})" - ) + await self._log( + f"[🔍 后台计算] 未触发压缩阈值 (Token: {current_tokens} < {compression_threshold_tokens})", + event_call=__event_call__, + ) except Exception as e: - print(f"[🔍 后台计算] ❌ 错误: {str(e)}") + await self._log( + f"[🔍 后台计算] ❌ 错误: {str(e)}", + type="error", + event_call=__event_call__, + ) async def _generate_summary_async( self, @@ -688,6 +827,7 @@ class Filter: body: dict, user_data: Optional[dict], __event_emitter__: Callable[[Any], Awaitable[None]] = None, + __event_call__: Callable[[Any], Awaitable[None]] = None, ): """ 异步生成摘要(后台执行,不阻塞响应) @@ -697,18 +837,18 @@ class Filter: 3. 对剩余的中间消息生成摘要。 """ try: - if self.valves.debug_mode: - print(f"\n[🤖 异步摘要任务] 开始...") + await self._log(f"\n[🤖 异步摘要任务] 开始...", event_call=__event_call__) # 1. 获取目标压缩进度 # 优先从 temp_state 获取(由 inlet 计算),如果获取不到(例如重启后),则假设当前是完整历史 target_compressed_count = self.temp_state.pop(chat_id, None) if target_compressed_count is None: target_compressed_count = max(0, len(messages) - self.valves.keep_last) - if self.valves.debug_mode: - print( - f"[🤖 异步摘要任务] ⚠️ 无法获取 inlet 状态,使用当前消息数估算进度: {target_compressed_count}" - ) + await self._log( + f"[🤖 异步摘要任务] ⚠️ 无法获取 inlet 状态,使用当前消息数估算进度: {target_compressed_count}", + type="warning", + event_call=__event_call__, + ) # 2. 
确定待压缩的消息范围 (Middle) start_index = self.valves.keep_first @@ -718,16 +858,18 @@ class Filter: # 确保索引有效 if start_index >= end_index: - if self.valves.debug_mode: - print( - f"[🤖 异步摘要任务] 中间消息为空 (Start: {start_index}, End: {end_index}),跳过" - ) + await self._log( + f"[🤖 异步摘要任务] 中间消息为空 (Start: {start_index}, End: {end_index}),跳过", + event_call=__event_call__, + ) return middle_messages = messages[start_index:end_index] - if self.valves.debug_mode: - print(f"[🤖 异步摘要任务] 待处理中间消息: {len(middle_messages)} 条") + await self._log( + f"[🤖 异步摘要任务] 待处理中间消息: {len(middle_messages)} 条", + event_call=__event_call__, + ) # 3. 检查 Token 上限并截断 (Max Context Truncation) # [优化] 使用摘要模型(如果有)的阈值来决定能处理多少中间消息 @@ -740,22 +882,26 @@ class Filter: "max_context_tokens", self.valves.max_context_tokens ) - if self.valves.debug_mode: - print( - f"[🤖 异步摘要任务] 使用模型 {summary_model_id} 的上限: {max_context_tokens} Tokens" - ) - - # 计算当前总 Token (使用摘要模型进行计数) - total_tokens = await asyncio.to_thread( - self._calculate_messages_tokens, messages + await self._log( + f"[🤖 异步摘要任务] 使用模型 {summary_model_id} 的上限: {max_context_tokens} Tokens", + event_call=__event_call__, ) - if total_tokens > max_context_tokens: - excess_tokens = total_tokens - max_context_tokens - if self.valves.debug_mode: - print( - f"[🤖 异步摘要任务] ⚠️ 总 Token ({total_tokens}) 超过摘要模型上限 ({max_context_tokens}),需要移除约 {excess_tokens} Token" - ) + # 计算中间消息的 Token (加上提示词的缓冲) + # 我们只把 middle_messages 发送给摘要模型,所以不应该把完整历史计入限制 + middle_tokens = await asyncio.to_thread( + self._calculate_messages_tokens, middle_messages + ) + # 增加提示词和输出的缓冲 (约 2000 Tokens) + estimated_input_tokens = middle_tokens + 2000 + + if estimated_input_tokens > max_context_tokens: + excess_tokens = estimated_input_tokens - max_context_tokens + await self._log( + f"[🤖 异步摘要任务] ⚠️ 中间消息 ({middle_tokens} Tokens) + 缓冲超过摘要模型上限 ({max_context_tokens}),需要移除约 {excess_tokens} Token", + type="warning", + event_call=__event_call__, + ) # 从 middle_messages 头部开始移除 removed_tokens = 0 @@ -769,14 +915,16 @@ class Filter: removed_tokens += msg_tokens removed_count += 1 - if self.valves.debug_mode: - print( - f"[🤖 异步摘要任务] 已移除 {removed_count} 条消息,共 {removed_tokens} Token" - ) + await self._log( + f"[🤖 异步摘要任务] 已移除 {removed_count} 条消息,共 {removed_tokens} Token", + event_call=__event_call__, + ) if not middle_messages: - if self.valves.debug_mode: - print(f"[🤖 异步摘要任务] 截断后中间消息为空,跳过摘要生成") + await self._log( + f"[🤖 异步摘要任务] 截断后中间消息为空,跳过摘要生成", + event_call=__event_call__, + ) return # 4. 构建对话文本 @@ -798,12 +946,14 @@ class Filter: ) new_summary = await self._call_summary_llm( - None, conversation_text, body, user_data + None, conversation_text, body, user_data, __event_call__ ) # 6. 
保存新摘要
-            if self.valves.debug_mode:
-                print("[优化] 正在后台线程中保存摘要,以避免阻塞事件循环。")
+            await self._log(
+                "[优化] 在后台线程中保存摘要以避免阻塞事件循环。",
+                event_call=__event_call__,
+            )

             await asyncio.to_thread(
                 self._save_summary, chat_id, new_summary, target_compressed_count
             )

                 await __event_emitter__(
                     {
                         "type": "status",
                         "data": {
                             "description": f"上下文摘要已更新 (已压缩 {len(middle_messages)} 条消息)",
                             "done": True,
                         },
                     }
                 )

-            if self.valves.debug_mode:
-                print(f"[🤖 异步摘要任务] ✅ 完成!新摘要长度: {len(new_summary)} 字符")
-                print(
-                    f"[🤖 异步摘要任务] 进度更新: 已压缩至原始第 {target_compressed_count} 条消息"
-                )
+            await self._log(
+                f"[🤖 异步摘要任务] ✅ 完成!新摘要长度: {len(new_summary)} 字符",
+                type="success",
+                event_call=__event_call__,
+            )
+            await self._log(
+                f"[🤖 异步摘要任务] 进度更新: 已压缩至原始第 {target_compressed_count} 条消息",
+                event_call=__event_call__,
+            )

         except Exception as e:
-            print(f"[🤖 异步摘要任务] ❌ 错误: {str(e)}")
+            await self._log(
+                f"[🤖 异步摘要任务] ❌ 错误: {str(e)}",
+                type="error",
+                event_call=__event_call__,
+            )
             import traceback

             traceback.print_exc()

     def _format_messages_for_summary(self, messages: list) -> str:
         """格式化消息用于摘要"""
         formatted = []
         for i, msg in enumerate(messages, 1):
             role = msg.get("role", "unknown")
             content = msg.get("content", "")

             # 处理多模态内容
             if isinstance(content, list):
                 text_parts = []
                 for part in content:
                     if part.get("type") == "text":
                         text_parts.append(part.get("text", ""))
                 content = " ".join(text_parts)

             # 处理角色名称
             role_name = {"user": "用户", "assistant": "助手"}.get(role, role)

             # 限制每条消息的长度,避免过长
             if len(content) > 500:
                 content = content[:500] + "..."

@@ -865,12 +1023,15 @@
         new_conversation_text: str,
         body: dict,
         user_data: dict,
+        __event_call__: Callable[[Any], Awaitable[None]] = None,
     ) -> str:
         """
         使用 Open WebUI 内置方法调用 LLM 生成摘要
         """
-        if self.valves.debug_mode:
-            print(f"[🤖 LLM 调用] 使用 Open WebUI 内置方法")
+        await self._log(
+            f"[🤖 LLM 调用] 使用 Open WebUI 内置方法",
+            event_call=__event_call__,
+        )

         # 构建摘要提示词 (优化版)
         summary_prompt = f"""

         # 确定使用的模型
         model = self.valves.summary_model or body.get("model", "")

-        if self.valves.debug_mode:
-            print(f"[🤖 LLM 调用] 模型: {model}")
+        await self._log(f"[🤖 LLM 调用] 模型: {model}", event_call=__event_call__)

         # 构建 payload
         payload = {

         if not user_id:
             raise ValueError("无法获取用户 ID")

-        # [优化] 在后台线程中获取用户对象,以避免阻塞事件循环
-        if self.valves.debug_mode:
-            print("[优化] 正在后台线程中获取用户对象,以避免阻塞事件循环。")
+        # [优化] 在后台线程中获取用户对象以避免阻塞事件循环
+        await self._log(
+            "[优化] 在后台线程中获取用户对象以避免阻塞事件循环。",
+            event_call=__event_call__,
+        )

         user = await asyncio.to_thread(Users.get_user_by_id, user_id)

         if not user:
             raise ValueError(f"无法找到用户: {user_id}")

-        if self.valves.debug_mode:
-            print(f"[🤖 LLM 调用] 用户: {user.email}")
-            print(f"[🤖 LLM 调用] 发送请求...")
+        await self._log(
+            f"[🤖 LLM 调用] 用户: {user.email}\n[🤖 LLM 调用] 发送请求...",
+            event_call=__event_call__,
+        )

         # 创建 Request 对象
         request = Request(scope={"type": "http", "app": webui_app})

             summary = response["choices"][0]["message"]["content"].strip()

-            if self.valves.debug_mode:
-                print(f"[🤖 LLM 调用] ✅ 成功获取摘要")
+            await self._log(
+                f"[🤖 LLM 调用] ✅ 成功获取摘要",
+                type="success",
+                event_call=__event_call__,
+            )

             return summary

         except Exception as e:
             error_message = f"调用 LLM ({model}) 生成摘要时发生错误: {str(e)}"
             if not self.valves.summary_model:
                 error_message += (
                     "\n[提示] 您没有指定摘要模型 (summary_model),因此尝试使用当前对话的模型。"
                     "如果这是一个流水线(Pipe)模型或不兼容的模型,请在配置中指定一个兼容的摘要模型(如 'gemini-2.5-flash')。"
                 )
-            if self.valves.debug_mode:
-                print(f"[🤖 LLM 调用] ❌ {error_message}")
+            await self._log(
+                f"[🤖 LLM 调用] ❌ {error_message}",
+                type="error",
+                event_call=__event_call__,
+            )

             raise Exception(error_message)
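
A minimal, self-contained sketch of the frontend-logging path both files rely on. The {"type": "execute", "data": {"code": ...}} event shape matches what `_log` and `_emit_debug_log` emit above; the `emit` callable and the demo message are hypothetical stand-ins for Open WebUI's `__event_call__`. Building the embedded JS string with json.dumps is an alternative to the manual escaping in `_log`: it covers backslashes, quotes, and newlines in one step.

    import asyncio
    import json


    async def console_log(emit, message: str, css: str = "color: #3b82f6;") -> None:
        """Render a styled console.log in the browser via an 'execute' event."""
        # json.dumps emits a double-quoted JS string literal, so quotes,
        # backslashes, and newlines in `message` cannot break the script.
        # console.log still interprets %c/%s/%d in its first argument, so
        # literal percent signs are doubled before formatting.
        text = json.dumps("%c[Compression] " + message.replace("%", "%%"))
        js_code = f"console.log({text}, {json.dumps(css)});"
        await emit({"type": "execute", "data": {"code": js_code}})


    async def _demo() -> None:
        async def fake_emit(event: dict) -> None:
            # Hypothetical stand-in for __event_call__; just print the JS.
            print(event["data"]["code"])

        await console_log(fake_emit, 'ratio 33.3%, path "C:\\tmp"\nsecond line')


    if __name__ == "__main__":
        asyncio.run(_demo())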