From ac50cd249a21da9ad36721d71ae6d2b5bab9e671 Mon Sep 17 00:00:00 2001 From: fujie Date: Tue, 27 Jan 2026 04:22:36 +0800 Subject: [PATCH] fix(pipes): sync copilot sdk thinking - Fix thinking visibility by passing user overrides into streaming - Harden UserValves handling for mapping/instance inputs - Update bilingual README with per-user valves and troubleshooting --- .github/copilot-instructions.md | 13 + .github/workflows/release.yml | 46 + plugins/pipes/github-copilot-sdk/README.md | 55 +- plugins/pipes/github-copilot-sdk/README_CN.md | 55 +- .../github-copilot-sdk/github_copilot_sdk.py | 1279 ++++++++++++--- .../github_copilot_sdk_cn.py | 1427 +++++++++++++---- scripts/openwebui_community_client.py | 27 +- 7 files changed, 2298 insertions(+), 604 deletions(-) diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index 9ad7f08..4aa095c 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -789,6 +789,19 @@ Filter 实例是**单例 (Singleton)**。 --- +## 🧪 测试规范 (Testing Standards) + +### 1. Copilot SDK 测试模型 (Copilot SDK Test Models) + +在编写 Copilot SDK 相关的测试脚本时 (如 `test_injection.py`, `test_capabilities.py` 等),**必须**优先使用以下免费/低成本模型之一,严禁使用高昂费用的模型进行常规测试,除非用户明确要求: + +- `gpt-5-mini` (首选 / Preferred) +- `gpt-4.1` + +此规则适用于所有自动化测试脚本和临时验证脚本。 + +--- + ## 🔄 工作流与流程 (Workflow & Process) ### 1. ✅ 开发检查清单 (Development Checklist) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 71b5d1a..05169f7 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -246,6 +246,52 @@ jobs: echo "=== Collected Files ===" find release_plugins -name "*.py" -type f | head -20 + - name: Update plugin icon URLs + run: | + echo "Updating icon_url in plugins to use absolute GitHub URLs..." 
+ # Base URL for raw content using the release tag + REPO_URL="https://raw.githubusercontent.com/${{ github.repository }}/${{ steps.version.outputs.version }}" + + find release_plugins -name "*.py" | while read -r file; do + # $file is like release_plugins/plugins/actions/infographic/infographic.py + # Remove release_plugins/ prefix to get the path in the repo + src_file="${file#release_plugins/}" + src_dir=$(dirname "$src_file") + base_name=$(basename "$src_file" .py) + + # Check if a corresponding png exists in the source repository + png_file="${src_dir}/${base_name}.png" + + if [ -f "$png_file" ]; then + echo "Found icon for $src_file: $png_file" + TARGET_ICON_URL="${REPO_URL}/${png_file}" + + # Use python for safe replacement + python3 -c " + import sys + import re + + file_path = '$file' + icon_url = '$TARGET_ICON_URL' + + try: + with open(file_path, 'r', encoding='utf-8') as f: + content = f.read() + + # Replace icon_url: ... with new url + # Matches 'icon_url: ...' and replaces it + new_content = re.sub(r'^icon_url:.*$', f'icon_url: {icon_url}', content, flags=re.MULTILINE) + + with open(file_path, 'w', encoding='utf-8') as f: + f.write(new_content) + print(f'Successfully updated icon_url in {file_path}') + except Exception as e: + print(f'Error updating {file_path}: {e}', file=sys.stderr) + sys.exit(1) + " + fi + done + - name: Debug Filenames run: | python3 -c "import sys; print(f'Filesystem encoding: {sys.getfilesystemencoding()}')" diff --git a/plugins/pipes/github-copilot-sdk/README.md b/plugins/pipes/github-copilot-sdk/README.md index 44ade3b..2a1f815 100644 --- a/plugins/pipes/github-copilot-sdk/README.md +++ b/plugins/pipes/github-copilot-sdk/README.md @@ -1,26 +1,25 @@ # GitHub Copilot SDK Pipe for OpenWebUI -**Author:** [Fu-Jie](https://github.com/Fu-Jie/awesome-openwebui) | **Version:** 0.1.1 | **Project:** [Awesome OpenWebUI](https://github.com/Fu-Jie/awesome-openwebui) | **License:** MIT +**Author:** 
[Fu-Jie](https://github.com/Fu-Jie/awesome-openwebui) | **Version:** 0.2.3 | **Project:** [Awesome OpenWebUI](https://github.com/Fu-Jie/awesome-openwebui) | **License:** MIT This is an advanced Pipe function for [OpenWebUI](https://github.com/open-webui/open-webui) that allows you to use GitHub Copilot models (such as `gpt-5`, `gpt-5-mini`, `claude-sonnet-4.5`) directly within OpenWebUI. It is built upon the official [GitHub Copilot SDK for Python](https://github.com/github/copilot-sdk), providing a native integration experience. -## 🚀 What's New (v0.1.1) +## 🚀 What's New (v0.2.3) -* **♾️ Infinite Sessions**: Automatic context compaction for long-running conversations. No more context limit errors! -* **🧠 Thinking Process**: Real-time display of model reasoning/thinking process (for supported models). -* **📂 Workspace Control**: Restricted workspace directory for secure file operations. -* **🔍 Model Filtering**: Exclude specific models using keywords (e.g., `codex`, `haiku`). -* **💾 Session Persistence**: Improved session resume logic using OpenWebUI chat ID mapping. +* **🧩 Per-user Overrides**: Added user-level overrides for `REASONING_EFFORT`, `CLI_PATH`, `DEBUG`, `SHOW_THINKING`, and `MODEL_ID`. +* **🧠 Thinking Output Reliability**: Thinking visibility now respects the user setting and is correctly passed into streaming. +* **📝 Formatting Enforcement**: Added automatic formatting hints to ensure outputs are well-structured (paragraphs, lists) and addressed "tight output" issues. ## ✨ Core Features * **🚀 Official SDK Integration**: Built on the official SDK for stability and reliability. +* **🛠️ Custom Tools Support**: Example tools included (random number). Easy to extend with your own tools. * **💬 Multi-turn Conversation**: Automatically concatenates history context so Copilot understands your previous messages. * **🌊 Streaming Output**: Supports typewriter effect for fast responses. 
* **🖼️ Multimodal Support**: Supports image uploads, automatically converting them to attachments for Copilot (requires model support). * **🛠️ Zero-config Installation**: Automatically detects and downloads the GitHub Copilot CLI, ready to use out of the box. * **🔑 Secure Authentication**: Supports Fine-grained Personal Access Tokens for minimized permissions. -* **🐛 Debug Mode**: Built-in detailed log output for easy connection troubleshooting. +* **🐛 Debug Mode**: Built-in detailed log output (browser console) for easy troubleshooting. * **⚠️ Single Node Only**: Due to local session storage, this plugin currently supports single-node OpenWebUI deployment or multi-node with sticky sessions enabled. ## 📦 Installation & Usage @@ -42,16 +41,48 @@ Find "GitHub Copilot" in the function list and click the **⚙️ (Valves)** ico | **GH_TOKEN** | **(Required)** Your GitHub Token. | - | | **MODEL_ID** | The model name to use. | `gpt-5-mini` | | **CLI_PATH** | Path to the Copilot CLI. Will download automatically if not found. | `/usr/local/bin/copilot` | -| **DEBUG** | Whether to enable debug logs (output to chat). | `True` | -| **SHOW_THINKING** | Show model reasoning/thinking process. | `True` | +| **DEBUG** | Whether to enable debug logs (output to browser console). | `False` | +| **LOG_LEVEL** | Copilot CLI log level: none, error, warning, info, debug, all. | `error` | +| **SHOW_THINKING** | Show model reasoning/thinking process (requires streaming + model support). | `True` | +| **SHOW_WORKSPACE_INFO** | Show session workspace path and summary in debug mode. | `True` | | **EXCLUDE_KEYWORDS** | Exclude models containing these keywords (comma separated). | - | | **WORKSPACE_DIR** | Restricted workspace directory for file operations. | - | | **INFINITE_SESSION** | Enable Infinite Sessions (automatic context compaction). | `True` | | **COMPACTION_THRESHOLD** | Background compaction threshold (0.0-1.0). | `0.8` | | **BUFFER_THRESHOLD** | Buffer exhaustion threshold (0.0-1.0). 
| `0.95` | | **TIMEOUT** | Timeout for each stream chunk (seconds). | `300` | +| **CUSTOM_ENV_VARS** | Custom environment variables (JSON format). | - | +| **REASONING_EFFORT** | Reasoning effort level: low, medium, high. `xhigh` is supported for gpt-5.2-codex. | `medium` | +| **ENFORCE_FORMATTING** | Add formatting instructions to system prompt for better readability. | `True` | +| **ENABLE_TOOLS** | Enable custom tools (example: random number). | `False` | +| **AVAILABLE_TOOLS** | Available tools: 'all' or comma-separated list. | `all` | -### 3. Get GH_TOKEN +#### User Valves (per-user overrides) + +These optional settings can be set per user (overrides global Valves): + +| Parameter | Description | Default | +| :--- | :--- | :--- | +| **REASONING_EFFORT** | Reasoning effort level (low/medium/high/xhigh). | - | +| **CLI_PATH** | Custom path to Copilot CLI. | - | +| **DEBUG** | Enable technical debug logs. | `False` | +| **SHOW_THINKING** | Show model reasoning/thinking process (requires streaming + model support). | `True` | +| **MODEL_ID** | Custom model ID. | - | + +### 3. Using Custom Tools (🆕 Optional) + +This pipe includes **1 example tool** to demonstrate tool calling: + +* **🎲 generate_random_number**: Generate random integers + +**To enable:** + +1. Set `ENABLE_TOOLS: true` in Valves +2. Try: "Give me a random number" + +**📚 For detailed usage and creating your own tools, see [TOOLS_USAGE.md](TOOLS_USAGE.md)** + +### 4. Get GH_TOKEN For security, it is recommended to use a **Fine-grained Personal Access Token**: @@ -76,6 +107,8 @@ This Pipe will automatically attempt to install the following dependencies: * Check if `GH_TOKEN` is correct and has `Copilot Requests` permission. * **Images not recognized**: * Ensure `MODEL_ID` is a model that supports multimodal input. +* **Thinking not shown**: + * Ensure **streaming is enabled** and the selected model supports reasoning output. 
* **CLI Installation Failed**: * Ensure the OpenWebUI container has internet access. * You can manually download the CLI and specify `CLI_PATH` in Valves. diff --git a/plugins/pipes/github-copilot-sdk/README_CN.md b/plugins/pipes/github-copilot-sdk/README_CN.md index 1d175c5..9aee8ad 100644 --- a/plugins/pipes/github-copilot-sdk/README_CN.md +++ b/plugins/pipes/github-copilot-sdk/README_CN.md @@ -1,26 +1,25 @@ # GitHub Copilot SDK 官方管道 -**作者:** [Fu-Jie](https://github.com/Fu-Jie/awesome-openwebui) | **版本:** 0.1.1 | **项目:** [Awesome OpenWebUI](https://github.com/Fu-Jie/awesome-openwebui) | **许可证:** MIT +**作者:** [Fu-Jie](https://github.com/Fu-Jie/awesome-openwebui) | **版本:** 0.2.3 | **项目:** [Awesome OpenWebUI](https://github.com/Fu-Jie/awesome-openwebui) | **许可证:** MIT 这是一个用于 [OpenWebUI](https://github.com/open-webui/open-webui) 的高级 Pipe 函数,允许你直接在 OpenWebUI 中使用 GitHub Copilot 模型(如 `gpt-5`, `gpt-5-mini`, `claude-sonnet-4.5`)。它基于官方 [GitHub Copilot SDK for Python](https://github.com/github/copilot-sdk) 构建,提供了原生级的集成体验。 -## 🚀 最新特性 (v0.1.1) +## 🚀 最新特性 (v0.2.3) -* **♾️ 无限会话 (Infinite Sessions)**:支持长对话的自动上下文压缩,告别上下文超限错误! 
-* **🧠 思考过程展示**:实时显示模型的推理/思考过程(需模型支持)。 -* **📂 工作目录控制**:支持设置受限工作目录,确保文件操作安全。 -* **🔍 模型过滤**:支持通过关键词排除特定模型(如 `codex`, `haiku`)。 -* **💾 会话持久化**: 改进的会话恢复逻辑,直接关联 OpenWebUI 聊天 ID,连接更稳定。 +* **🧩 用户级覆盖**:新增 `REASONING_EFFORT`、`CLI_PATH`、`DEBUG`、`SHOW_THINKING`、`MODEL_ID` 的用户级覆盖。 +* **🧠 思考输出可靠性**:思考显示会遵循用户设置,并正确传递到流式输出中。 +* **📝 格式化输出增强**:自动优化输出格式(短句、段落、列表),并解决了在某些界面下显示过于紧凑的问题。 ## ✨ 核心特性 * **🚀 官方 SDK 集成**:基于官方 SDK,稳定可靠。 +* **🛠️ 自定义工具支持**:内置示例工具(随机数)。易于扩展自定义工具。 * **💬 多轮对话支持**:自动拼接历史上下文,Copilot 能理解你的前文。 * **🌊 流式输出 (Streaming)**:支持打字机效果,响应迅速。 * **🖼️ 多模态支持**:支持上传图片,自动转换为附件发送给 Copilot(需模型支持)。 * **🛠️ 零配置安装**:自动检测并下载 GitHub Copilot CLI,开箱即用。 * **🔑 安全认证**:支持 Fine-grained Personal Access Tokens,权限最小化。 -* **🐛 调试模式**:内置详细的日志输出,方便排查连接问题。 +* **🐛 调试模式**:内置详细的日志输出(浏览器控制台),方便排查问题。 * **⚠️ 仅支持单节点**:由于会话状态存储在本地,本插件目前仅支持 OpenWebUI 单节点部署,或开启了会话粘性 (Sticky Session) 的多节点集群。 ## 📦 安装与使用 @@ -42,16 +41,48 @@ | **GH_TOKEN** | **(必填)** 你的 GitHub Token。 | - | | **MODEL_ID** | 使用的模型名称。 | `gpt-5-mini` | | **CLI_PATH** | Copilot CLI 的路径。如果未找到会自动下载。 | `/usr/local/bin/copilot` | -| **DEBUG** | 是否开启调试日志(输出到对话框)。 | `True` | -| **SHOW_THINKING** | 是否显示模型推理/思考过程。 | `True` | +| **DEBUG** | 是否开启调试日志(输出到浏览器控制台)。 | `False` | +| **LOG_LEVEL** | Copilot CLI 日志级别: none, error, warning, info, debug, all。 | `error` | +| **SHOW_THINKING** | 是否显示模型推理/思考过程(需开启流式 + 模型支持)。 | `True` | +| **SHOW_WORKSPACE_INFO** | 在调试模式下显示会话工作空间路径和摘要。 | `True` | | **EXCLUDE_KEYWORDS** | 排除包含这些关键词的模型 (逗号分隔)。 | - | | **WORKSPACE_DIR** | 文件操作的受限工作目录。 | - | | **INFINITE_SESSION** | 启用无限会话 (自动上下文压缩)。 | `True` | | **COMPACTION_THRESHOLD** | 后台压缩阈值 (0.0-1.0)。 | `0.8` | | **BUFFER_THRESHOLD** | 缓冲耗尽阈值 (0.0-1.0)。 | `0.95` | | **TIMEOUT** | 流式数据块超时时间 (秒)。 | `300` | +| **CUSTOM_ENV_VARS** | 自定义环境变量 (JSON 格式)。 | - | +| **ENABLE_TOOLS** | 启用自定义工具 (示例:随机数)。 | `False` | +| **AVAILABLE_TOOLS** | 可用工具: 'all' 或逗号分隔列表。 | `all` | +| **REASONING_EFFORT** | 推理强度级别:low, medium, high。`gpt-5.2-codex`额外支持`xhigh`。 | `medium` | +| **ENFORCE_FORMATTING** | 
是否强制添加格式化指导,以提高输出可读性。 | `True` | -### 3. 获取 GH_TOKEN +#### 用户 Valves(按用户覆盖) + +以下设置可按用户单独配置(覆盖全局 Valves): + +| 参数 | 说明 | 默认值 | +| :--- | :--- | :--- | +| **REASONING_EFFORT** | 推理强度级别(low/medium/high/xhigh)。 | - | +| **CLI_PATH** | 自定义 Copilot CLI 路径。 | - | +| **DEBUG** | 是否启用技术调试日志。 | `False` | +| **SHOW_THINKING** | 是否显示思考过程(需开启流式 + 模型支持)。 | `True` | +| **MODEL_ID** | 自定义模型 ID。 | - | + +### 3. 使用自定义工具 (🆕 可选) + +本 Pipe 内置了 **1 个示例工具**来展示工具调用功能: + +* **🎲 generate_random_number**:生成随机整数 + +**启用方法:** + +1. 在 Valves 中设置 `ENABLE_TOOLS: true` +2. 尝试问:“给我一个随机数” + +**📚 详细使用说明和创建自定义工具,请参阅 [TOOLS_USAGE.md](TOOLS_USAGE.md)** + +### 4. 获取 GH_TOKEN 为了安全起见,推荐使用 **Fine-grained Personal Access Token**: @@ -79,3 +110,5 @@ * **CLI 安装失败**: * 确保 OpenWebUI 容器有外网访问权限。 * 你可以手动下载 CLI 并挂载到容器中,然后在 Valves 中指定 `CLI_PATH`。 +* **看不到思考过程**: + * 确认已开启**流式输出**,且所选模型支持推理输出。 diff --git a/plugins/pipes/github-copilot-sdk/github_copilot_sdk.py b/plugins/pipes/github-copilot-sdk/github_copilot_sdk.py index 0d948b9..a1f5e59 100644 --- a/plugins/pipes/github-copilot-sdk/github_copilot_sdk.py +++ b/plugins/pipes/github-copilot-sdk/github_copilot_sdk.py @@ -4,13 +4,12 @@ author: Fu-Jie author_url: https://github.com/Fu-Jie/awesome-openwebui funding_url: https://github.com/open-webui openwebui_id: ce96f7b4-12fc-4ac3-9a01-875713e69359 -description: Integrate GitHub Copilot SDK. Supports dynamic models, multi-turn conversation, streaming, multimodal input, and infinite sessions (context compaction). -version: 0.1.1 +description: Integrate GitHub Copilot SDK. Supports dynamic models, multi-turn conversation, streaming, multimodal input, infinite sessions, and frontend debug logging. 
+version: 0.2.3 requirements: github-copilot-sdk """ import os -import time import json import base64 import tempfile @@ -18,19 +17,29 @@ import asyncio import logging import shutil import subprocess -import sys from typing import Optional, Union, AsyncGenerator, List, Any, Dict from pydantic import BaseModel, Field -from datetime import datetime, timezone -import contextlib + +# Import copilot SDK modules +from copilot import CopilotClient, define_tool # Setup logger logger = logging.getLogger(__name__) -# Global client storage -_SHARED_CLIENT = None -_SHARED_TOKEN = "" -_CLIENT_LOCK = asyncio.Lock() + +class RandomNumberParams(BaseModel): + min: int = Field(description="Minimum value (inclusive)") + max: int = Field(description="Maximum value (inclusive)") + + +@define_tool(description="Generate a random integer within a specified range.") +async def generate_random_number(params: RandomNumberParams) -> str: + import random + + if params.min >= params.max: + raise ValueError("min must be less than max") + number = random.randint(params.min, params.max) + return f"Generated random number: {number}" class Pipe: @@ -40,7 +49,7 @@ class Pipe: description="GitHub Fine-grained Token (Requires 'Copilot Requests' permission)", ) MODEL_ID: str = Field( - default="claude-sonnet-4.5", + default="gpt-5-mini", description="Default Copilot model name (used when dynamic fetching fails)", ) CLI_PATH: str = Field( @@ -51,10 +60,18 @@ class Pipe: default=False, description="Enable technical debug logs (connection info, etc.)", ) + LOG_LEVEL: str = Field( + default="error", + description="Copilot CLI log level: none, error, warning, info, debug, all", + ) SHOW_THINKING: bool = Field( default=True, description="Show model reasoning/thinking process", ) + SHOW_WORKSPACE_INFO: bool = Field( + default=True, + description="Show session workspace path and summary in debug mode", + ) EXCLUDE_KEYWORDS: str = Field( default="", description="Exclude models containing these keywords (comma 
separated, e.g.: codex, haiku)", @@ -79,6 +96,48 @@ class Pipe: default=300, description="Timeout for each stream chunk (seconds)", ) + CUSTOM_ENV_VARS: str = Field( + default="", + description='Custom environment variables (JSON format, e.g., {"VAR": "value"})', + ) + ENABLE_TOOLS: bool = Field( + default=False, + description="Enable custom tools (example: random number)", + ) + AVAILABLE_TOOLS: str = Field( + default="all", + description="Available tools: 'all' or comma-separated list (e.g., 'generate_random_number')", + ) + REASONING_EFFORT: str = Field( + default="medium", + description="Reasoning effort level: low, medium, high. (gpt-5.2-codex also supports xhigh)", + ) + ENFORCE_FORMATTING: bool = Field( + default=True, + description="Add formatting instructions to system prompt for better readability (paragraphs, line breaks, structure).", + ) + + class UserValves(BaseModel): + REASONING_EFFORT: str = Field( + default="", + description="Reasoning effort level (low, medium, high, xhigh). Leave empty to use global setting.", + ) + CLI_PATH: str = Field( + default="", + description="Custom path to Copilot CLI. Leave empty to use global setting.", + ) + DEBUG: bool = Field( + default=False, + description="Enable technical debug logs (connection info, etc.)", + ) + SHOW_THINKING: bool = Field( + default=True, + description="Show model reasoning/thinking process", + ) + MODEL_ID: str = Field( + default="", + description="Custom model ID (e.g. gpt-4o). Leave empty to use global default.", + ) def __init__(self): self.type = "pipe" @@ -95,17 +154,334 @@ class Pipe: except: pass - def _emit_debug_log(self, message: str): - """Emit debug log to frontend if DEBUG valve is enabled.""" - if self.valves.DEBUG: - print(f"[Copilot Pipe] {message}") + # ==================== Fixed System Entry ==================== + # pipe() is the stable entry point used by OpenWebUI to handle requests. + # Keep this section near the top for quick navigation and maintenance. 
+ # ============================================================= + async def pipe( + self, + body: dict, + __metadata__: Optional[dict] = None, + __user__: Optional[dict] = None, + __event_emitter__=None, + __event_call__=None, + ) -> Union[str, AsyncGenerator]: + return await self._pipe_impl( + body, + __metadata__=__metadata__, + __user__=__user__, + __event_emitter__=__event_emitter__, + __event_call__=__event_call__, + ) + + # ==================== Functional Areas ==================== + # 1) Tool registration: define tools and register in _initialize_custom_tools + # 2) Debug logging: _emit_debug_log / _emit_debug_log_sync + # 3) Prompt/session: _extract_system_prompt / _build_session_config / _build_prompt + # 4) Runtime flow: pipe() for request, stream_response() for streaming + # ============================================================ + # ==================== Custom Tool Examples ==================== + # Tool registration: Add @define_tool decorated functions at module level, + # then register them in _initialize_custom_tools() -> all_tools dict. + def _initialize_custom_tools(self): + """Initialize custom tools based on configuration""" + if not self.valves.ENABLE_TOOLS: + return [] + + # Define all available tools (register new tools here) + all_tools = { + "generate_random_number": generate_random_number, + } + + # Filter based on configuration + if self.valves.AVAILABLE_TOOLS == "all": + return list(all_tools.values()) + + # Only enable specified tools + enabled = [t.strip() for t in self.valves.AVAILABLE_TOOLS.split(",")] + return [all_tools[name] for name in enabled if name in all_tools] + + async def _emit_debug_log(self, message: str, __event_call__=None): + """Emit debug log to frontend (console) when DEBUG is enabled.""" + # Check user config first if available (will need to be passed down or stored) + # For now we only check global valves in this helper, but in pipe implementation we respect user setting. 
+ # This helper might need refactoring to accept user_debug_setting + if not self.valves.DEBUG: + return + + logger.debug(f"[Copilot Pipe] {message}") + + if not __event_call__: + return + + try: + js_code = f""" + (async function() {{ + console.debug("%c[Copilot Pipe] " + {json.dumps(message, ensure_ascii=False)}, "color: #3b82f6;"); + }})(); + """ + await __event_call__({"type": "execute", "data": {"code": js_code}}) + except Exception as e: + logger.debug(f"[Copilot Pipe] Frontend debug log failed: {e}") + + def _emit_debug_log_sync(self, message: str, __event_call__=None): + """Sync wrapper for debug logging in non-async contexts.""" + if not self.valves.DEBUG: + return + + try: + loop = asyncio.get_running_loop() + except RuntimeError: + logger.debug(f"[Copilot Pipe] {message}") + return + + loop.create_task(self._emit_debug_log(message, __event_call__)) + + def _extract_text_from_content(self, content) -> str: + """Extract text content from various message content formats.""" + if isinstance(content, str): + return content + elif isinstance(content, list): + text_parts = [] + for item in content: + if isinstance(item, dict) and item.get("type") == "text": + text_parts.append(item.get("text", "")) + return " ".join(text_parts) + return "" + + def _apply_formatting_hint(self, prompt: str) -> str: + """Append a lightweight formatting hint to the user prompt when enabled.""" + if not self.valves.ENFORCE_FORMATTING: + return prompt + + if not prompt: + return prompt + + if "[Formatting Guidelines]" in prompt or "[Formatting Request]" in prompt: + return prompt + + formatting_hint = ( + "\n\n[Formatting Request]\n" + "Please format your response with clear paragraph breaks, short sentences, " + "and bullet lists when appropriate." 
+ ) + return f"{prompt}{formatting_hint}" + + def _dedupe_preserve_order(self, items: List[str]) -> List[str]: + """Deduplicate while preserving order.""" + seen = set() + result = [] + for item in items: + if not item or item in seen: + continue + seen.add(item) + result.append(item) + return result + + def _collect_model_ids( + self, body: dict, request_model: str, real_model_id: str + ) -> List[str]: + """Collect possible model IDs from request/metadata/body params.""" + model_ids: List[str] = [] + if request_model: + model_ids.append(request_model) + if request_model.startswith(f"{self.id}-"): + model_ids.append(request_model[len(f"{self.id}-") :]) + if real_model_id: + model_ids.append(real_model_id) + + metadata = body.get("metadata", {}) + if isinstance(metadata, dict): + meta_model = metadata.get("model") + meta_model_id = metadata.get("model_id") + if isinstance(meta_model, str): + model_ids.append(meta_model) + if isinstance(meta_model_id, str): + model_ids.append(meta_model_id) + + body_params = body.get("params", {}) + if isinstance(body_params, dict): + for key in ("model", "model_id", "modelId"): + val = body_params.get(key) + if isinstance(val, str): + model_ids.append(val) + + return self._dedupe_preserve_order(model_ids) + + async def _extract_system_prompt( + self, + body: dict, + messages: List[dict], + request_model: str, + real_model_id: str, + __event_call__=None, + ) -> tuple[Optional[str], str]: + """Extract system prompt from metadata/model DB/body/messages.""" + system_prompt_content: Optional[str] = None + system_prompt_source = "" + + # 1) metadata.model.params.system + metadata = body.get("metadata", {}) + if isinstance(metadata, dict): + meta_model = metadata.get("model") + if isinstance(meta_model, dict): + meta_params = meta_model.get("params") + if isinstance(meta_params, dict) and meta_params.get("system"): + system_prompt_content = meta_params.get("system") + system_prompt_source = "metadata.model.params" + await 
self._emit_debug_log( + f"Extracted system prompt from metadata.model.params (length: {len(system_prompt_content)})", + __event_call__, + ) + + # 2) model DB lookup + if not system_prompt_content: + try: + from open_webui.models.models import Models + + model_ids_to_try = self._collect_model_ids( + body, request_model, real_model_id + ) + for mid in model_ids_to_try: + model_record = Models.get_model_by_id(mid) + if model_record and hasattr(model_record, "params"): + params = model_record.params + if isinstance(params, dict): + system_prompt_content = params.get("system") + if system_prompt_content: + system_prompt_source = f"model_db:{mid}" + await self._emit_debug_log( + f"Extracted system prompt from model DB (length: {len(system_prompt_content)})", + __event_call__, + ) + break + except Exception as e: + await self._emit_debug_log( + f"Failed to extract system prompt from model DB: {e}", + __event_call__, + ) + + # 3) body.params.system + if not system_prompt_content: + body_params = body.get("params", {}) + if isinstance(body_params, dict): + system_prompt_content = body_params.get("system") + if system_prompt_content: + system_prompt_source = "body_params" + await self._emit_debug_log( + f"Extracted system prompt from body.params (length: {len(system_prompt_content)})", + __event_call__, + ) + + # 4) messages (role=system) + if not system_prompt_content: + for msg in messages: + if msg.get("role") == "system": + system_prompt_content = self._extract_text_from_content( + msg.get("content", "") + ) + if system_prompt_content: + system_prompt_source = "messages_system" + await self._emit_debug_log( + f"Extracted system prompt from messages (length: {len(system_prompt_content)})", + __event_call__, + ) + break + + return system_prompt_content, system_prompt_source + + def _build_client_config(self, body: dict) -> dict: + """Build CopilotClient config from valves and request body.""" + cwd = self.valves.WORKSPACE_DIR if self.valves.WORKSPACE_DIR else os.getcwd() + 
client_config = {} + if os.environ.get("COPILOT_CLI_PATH"): + client_config["cli_path"] = os.environ["COPILOT_CLI_PATH"] + client_config["cwd"] = cwd + + if self.valves.LOG_LEVEL: + client_config["log_level"] = self.valves.LOG_LEVEL + + if self.valves.CUSTOM_ENV_VARS: + try: + custom_env = json.loads(self.valves.CUSTOM_ENV_VARS) + if isinstance(custom_env, dict): + client_config["env"] = custom_env + except: + pass + + return client_config + + def _build_session_config( + self, + chat_id: Optional[str], + real_model_id: str, + custom_tools: List[Any], + system_prompt_content: Optional[str], + is_streaming: bool, + reasoning_effort: str = "", + ): + """Build SessionConfig for Copilot SDK.""" + from copilot.types import SessionConfig, InfiniteSessionConfig + + infinite_session_config = None + if self.valves.INFINITE_SESSION: + infinite_session_config = InfiniteSessionConfig( + enabled=True, + background_compaction_threshold=self.valves.COMPACTION_THRESHOLD, + buffer_exhaustion_threshold=self.valves.BUFFER_THRESHOLD, + ) + + system_message_config = None + if system_prompt_content or self.valves.ENFORCE_FORMATTING: + # Build system message content + system_parts = [] + + if system_prompt_content: + system_parts.append(system_prompt_content) + + if self.valves.ENFORCE_FORMATTING: + formatting_instruction = ( + "\n\n[Formatting Guidelines]\n" + "When providing explanations or descriptions:\n" + "- Use clear paragraph breaks (double line breaks)\n" + "- Break long sentences into multiple shorter ones\n" + "- Use bullet points or numbered lists for multiple items\n" + "- Add headings (##, ###) for major sections\n" + "- Ensure proper spacing between different topics" + ) + system_parts.append(formatting_instruction) + logger.info( + f"[ENFORCE_FORMATTING] Added formatting instructions to system prompt" + ) + + if system_parts: + system_message_config = { + "mode": "append", + "content": "\n".join(system_parts), + } + + # Prepare session config parameters + session_params = 
{ + "session_id": chat_id if chat_id else None, + "model": real_model_id, + "streaming": is_streaming, + "tools": custom_tools, + "system_message": system_message_config, + "infinite_sessions": infinite_session_config, + } + + # Add reasoning_effort if not default (medium) + if reasoning_effort and reasoning_effort.lower() != "medium": + session_params["reasoning_effort"] = reasoning_effort.lower() + + return SessionConfig(**session_params) def _get_user_context(self): """Helper to get user context (placeholder for future use).""" return {} def _get_chat_context( - self, body: dict, __metadata__: Optional[dict] = None + self, body: dict, __metadata__: Optional[dict] = None, __event_call__=None ) -> Dict[str, str]: """ Highly reliable chat context extraction logic. @@ -136,12 +512,15 @@ class Pipe: # Debug: Log ID source if chat_id: - self._emit_debug_log(f"Extracted ChatID: {chat_id} (Source: {source})") + self._emit_debug_log_sync( + f"Extracted ChatID: {chat_id} (Source: {source})", __event_call__ + ) else: # If still not found, log body keys for troubleshooting keys = list(body.keys()) if isinstance(body, dict) else "not a dict" - self._emit_debug_log( - f"Warning: Failed to extract ChatID. Body keys: {keys}" + self._emit_debug_log_sync( + f"Warning: Failed to extract ChatID. 
Body keys: {keys}", + __event_call__, ) return { @@ -154,14 +533,12 @@ class Pipe: if self._model_cache: return self._model_cache - self._emit_debug_log("Fetching model list dynamically...") + await self._emit_debug_log("Fetching model list dynamically...") try: self._setup_env() if not self.valves.GH_TOKEN: return [{"id": f"{self.id}-error", "name": "Error: GH_TOKEN not set"}] - from copilot import CopilotClient - client_config = {} if os.environ.get("COPILOT_CLI_PATH"): client_config["cli_path"] = os.environ["COPILOT_CLI_PATH"] @@ -242,12 +619,12 @@ class Pipe: {"id": m["id"], "name": m["name"]} for m in models_with_info ] - self._emit_debug_log( + await self._emit_debug_log( f"Successfully fetched {len(self._model_cache)} models (filtered)" ) return self._model_cache except Exception as e: - self._emit_debug_log(f"Failed to fetch model list: {e}") + await self._emit_debug_log(f"Failed to fetch model list: {e}") # Return default model on failure return [ { @@ -258,7 +635,7 @@ class Pipe: finally: await client.stop() except Exception as e: - self._emit_debug_log(f"Pipes Error: {e}") + await self._emit_debug_log(f"Pipes Error: {e}") return [ { "id": f"{self.id}-{self.valves.MODEL_ID}", @@ -268,8 +645,6 @@ class Pipe: async def _get_client(self): """Helper to get or create a CopilotClient instance.""" - from copilot import CopilotClient - client_config = {} if os.environ.get("COPILOT_CLI_PATH"): client_config["cli_path"] = os.environ["COPILOT_CLI_PATH"] @@ -278,7 +653,7 @@ class Pipe: await client.start() return client - def _setup_env(self): + def _setup_env(self, __event_call__=None): cli_path = self.valves.CLI_PATH found = False @@ -310,11 +685,44 @@ class Pipe: if cli_dir not in os.environ["PATH"]: os.environ["PATH"] = f"{cli_dir}:{os.environ['PATH']}" + if self.valves.DEBUG: + self._emit_debug_log_sync( + f"Copilot CLI found at: {cli_path}", __event_call__ + ) + try: + # Try to get version to confirm it's executable + ver = ( + subprocess.check_output( + 
[cli_path, "--version"], stderr=subprocess.STDOUT + ) + .decode() + .strip() + ) + self._emit_debug_log_sync( + f"Copilot CLI Version: {ver}", __event_call__ + ) + except Exception as e: + self._emit_debug_log_sync( + f"Warning: Copilot CLI found but failed to run: {e}", + __event_call__, + ) + else: + if self.valves.DEBUG: + self._emit_debug_log_sync( + "Error: Copilot CLI NOT found. Agent capabilities will be disabled.", + __event_call__, + ) + if self.valves.GH_TOKEN: os.environ["GH_TOKEN"] = self.valves.GH_TOKEN os.environ["GITHUB_TOKEN"] = self.valves.GH_TOKEN + else: + if self.valves.DEBUG: + self._emit_debug_log_sync( + "Warning: GH_TOKEN is not set.", __event_call__ + ) - def _process_images(self, messages): + def _process_images(self, messages, __event_call__=None): attachments = [] text_content = "" if not messages: @@ -343,17 +751,90 @@ class Pipe: "display_name": file_name, } ) - self._emit_debug_log(f"Image processed: {file_path}") + self._emit_debug_log_sync( + f"Image processed: {file_path}", __event_call__ + ) except Exception as e: - self._emit_debug_log(f"Image error: {e}") + self._emit_debug_log_sync( + f"Image error: {e}", __event_call__ + ) else: text_content = str(content) return text_content, attachments - async def pipe( - self, body: dict, __metadata__: Optional[dict] = None + def _sync_copilot_config(self, reasoning_effort: str, __event_call__=None): + """ + Dynamically update ~/.copilot/config.json if REASONING_EFFORT is set. + This provides a fallback if API injection is ignored by the server. 
+ """ + if not reasoning_effort: + return + + effort = reasoning_effort + + # Check model support for xhigh + # Only gpt-5.2-codex supports xhigh currently + if effort == "xhigh": + if ( + "gpt-5.2-codex" + not in self._collect_model_ids( + body={}, + request_model=self.valves.MODEL_ID, + real_model_id=self.valves.MODEL_ID, + )[0].lower() + ): + # Fallback to high if not supported + effort = "high" + + try: + # Target standard path ~/.copilot/config.json + config_path = os.path.expanduser("~/.copilot/config.json") + config_dir = os.path.dirname(config_path) + + # Only proceed if directory exists (avoid creating trash types of files if path is wrong) + if not os.path.exists(config_dir): + return + + data = {} + # Read existing config + if os.path.exists(config_path): + try: + with open(config_path, "r") as f: + data = json.load(f) + except Exception: + data = {} + + # Update if changed + current_val = data.get("reasoning_effort") + if current_val != effort: + data["reasoning_effort"] = effort + try: + with open(config_path, "w") as f: + json.dump(data, f, indent=4) + + self._emit_debug_log_sync( + f"Dynamically updated ~/.copilot/config.json: reasoning_effort='{effort}'", + __event_call__, + ) + except Exception as e: + self._emit_debug_log_sync( + f"Failed to write config.json: {e}", __event_call__ + ) + except Exception as e: + self._emit_debug_log_sync(f"Config sync check failed: {e}", __event_call__) + + # ==================== Internal Implementation ==================== + # _pipe_impl() contains the main request handling logic. + # ================================================================ + async def _pipe_impl( + self, + body: dict, + __metadata__: Optional[dict] = None, + __user__: Optional[dict] = None, + __event_emitter__=None, + __event_call__=None, ) -> Union[str, AsyncGenerator]: - self._setup_env() + self._setup_env(__event_call__) if not self.valves.GH_TOKEN: return "Error: Please configure GH_TOKEN in Valves." 
@@ -361,277 +842,540 @@ class Pipe: request_model = body.get("model", "") real_model_id = self.valves.MODEL_ID # Default value + # Determine effective reasoning effort and debug setting + if __user__: + raw_valves = __user__.get("valves", {}) + if isinstance(raw_valves, self.UserValves): + user_valves = raw_valves + elif isinstance(raw_valves, dict): + user_valves = self.UserValves(**raw_valves) + else: + user_valves = self.UserValves() + else: + user_valves = self.UserValves() + effective_reasoning_effort = ( + user_valves.REASONING_EFFORT + if user_valves.REASONING_EFFORT + else self.valves.REASONING_EFFORT + ) + # Apply DEBUG user setting override if set to True (if False, respect global) + # Actually user setting should probably override strictly. + # But boolean fields in UserValves default to False, so we can't distinguish "not set" from "off" easily without Optional[bool] + # Let's assume if user sets DEBUG=True, it wins. + if user_valves.DEBUG: + self.valves.DEBUG = True + + # Apply SHOW_THINKING user setting (prefer user override when provided) + show_thinking = ( + user_valves.SHOW_THINKING + if user_valves.SHOW_THINKING is not None + else self.valves.SHOW_THINKING + ) + if request_model.startswith(f"{self.id}-"): real_model_id = request_model[len(f"{self.id}-") :] - self._emit_debug_log(f"Using selected model: {real_model_id}") + await self._emit_debug_log( + f"Using selected model: {real_model_id}", __event_call__ + ) messages = body.get("messages", []) if not messages: return "No messages." 
# Get Chat ID using improved helper - chat_ctx = self._get_chat_context(body, __metadata__) + chat_ctx = self._get_chat_context(body, __metadata__, __event_call__) chat_id = chat_ctx.get("chat_id") - is_streaming = body.get("stream", False) - self._emit_debug_log(f"Request Streaming: {is_streaming}") + # Extract system prompt from multiple sources + system_prompt_content, system_prompt_source = await self._extract_system_prompt( + body, messages, request_model, real_model_id, __event_call__ + ) - last_text, attachments = self._process_images(messages) + if system_prompt_content: + preview = system_prompt_content[:60].replace("\n", " ") + await self._emit_debug_log( + f"System prompt confirmed (source: {system_prompt_source}, length: {len(system_prompt_content)}, preview: {preview})", + __event_call__, + ) + + is_streaming = body.get("stream", False) + await self._emit_debug_log(f"Request Streaming: {is_streaming}", __event_call__) + + last_text, attachments = self._process_images(messages, __event_call__) # Determine prompt strategy # If we have a chat_id, we try to resume session. # If resumed, we assume the session has history, so we only send the last message. - # If new session, we send full history (or at least the last few turns if we want to be safe, but let's send full for now). + # If new session, we send full (accumulated) messages. - # However, to be robust against history edits in OpenWebUI, we might want to always send full history? - # Copilot SDK `create_session` doesn't take history. `session.send` appends. - # If we resume, we append. - # If user edited history, the session state is stale. - # For now, we implement "Resume if possible, else Create". 
- - prompt = "" - is_new_session = True + # Ensure we have the latest config + self._sync_copilot_config(effective_reasoning_effort, __event_call__) + # Initialize Client + client = CopilotClient(self._build_client_config(body)) + should_stop_client = True try: - client = await self._get_client() - session = None + await client.start() - if chat_id: - try: - # Try to resume session using chat_id as session_id - session = await client.resume_session(chat_id) - self._emit_debug_log(f"Resumed session using ChatID: {chat_id}") - is_new_session = False - except Exception: - # Resume failed, session might not exist on disk - self._emit_debug_log( - f"Session {chat_id} not found or expired, creating new." - ) - session = None - - if session is None: - # Create new session - from copilot.types import SessionConfig, InfiniteSessionConfig - - # Infinite Session Config - infinite_session_config = None - if self.valves.INFINITE_SESSION: - infinite_session_config = InfiniteSessionConfig( - enabled=True, - background_compaction_threshold=self.valves.COMPACTION_THRESHOLD, - buffer_exhaustion_threshold=self.valves.BUFFER_THRESHOLD, - ) - - session_config = SessionConfig( - session_id=( - chat_id if chat_id else None - ), # Use chat_id as session_id - model=real_model_id, - streaming=body.get("stream", False), - infinite_sessions=infinite_session_config, + # Initialize custom tools + custom_tools = self._initialize_custom_tools() + if custom_tools: + tool_names = [t.name for t in custom_tools] + await self._emit_debug_log( + f"Enabled {len(custom_tools)} custom tools: {tool_names}", + __event_call__, ) - session = await client.create_session(config=session_config) + # Create or Resume Session + session = None + if chat_id: + try: + session = await client.resume_session(chat_id) + await self._emit_debug_log( + f"Resumed session: {chat_id} (Note: Formatting guidelines only apply to NEW sessions. 
Create a new chat to use updated formatting.)", + __event_call__, + ) - new_sid = getattr(session, "session_id", getattr(session, "id", None)) - self._emit_debug_log(f"Created new session: {new_sid}") + # Show workspace info if available + if self.valves.DEBUG and self.valves.SHOW_WORKSPACE_INFO: + if session.workspace_path: + await self._emit_debug_log( + f"Session workspace: {session.workspace_path}", + __event_call__, + ) - # Construct prompt - if is_new_session: - # For new session, send full conversation history - full_conversation = [] - for msg in messages[:-1]: - role = msg.get("role", "user").upper() - content = msg.get("content", "") - if isinstance(content, list): - content = " ".join( - [ - c.get("text", "") - for c in content - if c.get("type") == "text" - ] + is_new_session = False + except Exception as e: + await self._emit_debug_log( + f"Session {chat_id} not found ({str(e)}), creating new.", + __event_call__, + ) + + if session is None: + session_config = self._build_session_config( + chat_id, + real_model_id, + custom_tools, + system_prompt_content, + is_streaming, + ) + if system_prompt_content or self.valves.ENFORCE_FORMATTING: + # Build preview of what's being sent + preview_parts = [] + if system_prompt_content: + preview_parts.append( + f"custom_prompt: {system_prompt_content[:100]}..." 
) - full_conversation.append(f"{role}: {content}") - full_conversation.append(f"User: {last_text}") - prompt = "\n\n".join(full_conversation) - else: - # For resumed session, only send the last message - prompt = last_text + if self.valves.ENFORCE_FORMATTING: + preview_parts.append("formatting_guidelines: enabled") + + if isinstance(session_config, dict): + system_config = session_config.get("system_message", {}) + else: + system_config = getattr(session_config, "system_message", None) + + if isinstance(system_config, dict): + full_content = system_config.get("content", "") + else: + full_content = "" + + await self._emit_debug_log( + f"System message config - {', '.join(preview_parts)} (total length: {len(full_content)} chars)", + __event_call__, + ) + session = await client.create_session(config=session_config) + await self._emit_debug_log( + f"Created new session with model: {real_model_id}", + __event_call__, + ) + + # Show workspace info for new sessions + if self.valves.DEBUG and self.valves.SHOW_WORKSPACE_INFO: + if session.workspace_path: + await self._emit_debug_log( + f"Session workspace: {session.workspace_path}", + __event_call__, + ) + + # Construct Prompt (session-based: send only latest user input) + prompt = self._apply_formatting_hint(last_text) + + await self._emit_debug_log( + f"Sending prompt ({len(prompt)} chars) to Agent...", + __event_call__, + ) send_payload = {"prompt": prompt, "mode": "immediate"} if attachments: send_payload["attachments"] = attachments if body.get("stream", False): - # Determine session status message for UI init_msg = "" if self.valves.DEBUG: - if is_new_session: - new_sid = getattr( - session, "session_id", getattr(session, "id", "unknown") - ) - init_msg = f"> [Debug] Created new session: {new_sid}\n" - else: - init_msg = ( - f"> [Debug] Resumed session using ChatID: {chat_id}\n" - ) + init_msg = f"> [Debug] Agent working in: {os.getcwd()}\n" - return self.stream_response(client, session, send_payload, init_msg) + # 
Transfer client ownership to stream_response + should_stop_client = False + return self.stream_response( + client, + session, + send_payload, + init_msg, + __event_call__, + reasoning_effort=effective_reasoning_effort, + show_thinking=show_thinking, + ) else: try: response = await session.send_and_wait(send_payload) return response.data.content if response else "Empty response." finally: - # Destroy session object to free memory, but KEEP data on disk - await session.destroy() - + # Cleanup: destroy session if no chat_id (temporary session) + if not chat_id: + try: + await session.destroy() + except Exception as cleanup_error: + await self._emit_debug_log( + f"Session cleanup warning: {cleanup_error}", + __event_call__, + ) except Exception as e: - self._emit_debug_log(f"Request Error: {e}") + await self._emit_debug_log(f"Request Error: {e}", __event_call__) return f"Error: {str(e)}" + finally: + # Cleanup client if not transferred to stream + if should_stop_client: + try: + await client.stop() + except Exception as e: + await self._emit_debug_log( + f"Client cleanup warning: {e}", __event_call__ + ) async def stream_response( - self, client, session, send_payload, init_message: str = "" + self, + client, + session, + send_payload, + init_message: str = "", + __event_call__=None, + reasoning_effort: str = "", + show_thinking: bool = True, ) -> AsyncGenerator: + """ + Stream response from Copilot SDK, handling various event types. + Follows official SDK patterns for event handling and streaming. 
+ """ + from copilot.generated.session_events import SessionEventType + queue = asyncio.Queue() done = asyncio.Event() - self.thinking_started = False + SENTINEL = object() + # Use local state to handle concurrency and tracking + state = {"thinking_started": False, "content_sent": False} has_content = False # Track if any content has been yielded + active_tools = {} # Map tool_call_id to tool_name - def get_event_data(event, attr, default=""): - if hasattr(event, "data"): - data = event.data - if data is None: - return default - if isinstance(data, (str, int, float, bool)): - return str(data) if attr == "value" else default + def get_event_type(event) -> str: + """Extract event type as string, handling both enum and string types.""" + if hasattr(event, "type"): + event_type = event.type + # Handle SessionEventType enum + if hasattr(event_type, "value"): + return event_type.value + return str(event_type) + return "unknown" - if isinstance(data, dict): - val = data.get(attr) - if val is None: - alt_attr = attr.replace("_", "") if "_" in attr else attr - val = data.get(alt_attr) - if val is None and "_" not in attr: - # Try snake_case if camelCase failed - import re + def safe_get_data_attr(event, attr: str, default=None): + """ + Safely extract attribute from event.data. + Handles both dict access and object attribute access. 
+ """ + if not hasattr(event, "data") or event.data is None: + return default - snake_attr = re.sub(r"(?\n") + state["thinking_started"] = False + queue.put_nowait(delta) + + # === Complete Message Event (Non-streaming response) === + elif event_type == "assistant.message": + # Handle complete message (when SDK returns full content instead of deltas) + content = safe_get_data_attr(event, "content") or safe_get_data_attr( + event, "message" + ) + if content: + state["content_sent"] = True + if state["thinking_started"]: + queue.put_nowait("\n\n") + state["thinking_started"] = False + queue.put_nowait(content) + + # === Reasoning Delta Events (Chain-of-thought streaming) === + elif event_type == "assistant.reasoning_delta": + delta = safe_get_data_attr( + event, "delta_content" + ) or safe_get_data_attr(event, "deltaContent") + if delta: + # Suppress late-arriving reasoning if content already started + if state["content_sent"]: + return + + # Use UserValves or Global Valve for thinking visibility + if not state["thinking_started"] and show_thinking: + queue.put_nowait("\n") + state["thinking_started"] = True + if state["thinking_started"]: + queue.put_nowait(delta) + + # === Complete Reasoning Event (Non-streaming reasoning) === + elif event_type == "assistant.reasoning": + # Handle complete reasoning content + reasoning = safe_get_data_attr(event, "content") or safe_get_data_attr( + event, "reasoning" + ) + if reasoning: + # Suppress late-arriving reasoning if content already started + if state["content_sent"]: + return + + if not state["thinking_started"] and show_thinking: + queue.put_nowait("\n") + state["thinking_started"] = True + if state["thinking_started"]: + queue.put_nowait(reasoning) + + # === Tool Execution Events === + elif event_type == "tool.execution_start": + tool_name = ( + safe_get_data_attr(event, "name") + or safe_get_data_attr(event, "tool_name") + or "Unknown Tool" + ) + tool_call_id = safe_get_data_attr(event, "tool_call_id", "") + + # Get 
tool arguments + tool_args = {} try: - data_str = str(event.data) if hasattr(event, "data") else "no data" - self._emit_debug_log(f"Tool Event [{event_type}]: {data_str}") + args_obj = safe_get_data_attr(event, "arguments") + if isinstance(args_obj, dict): + tool_args = args_obj + elif isinstance(args_obj, str): + tool_args = json.loads(args_obj) except: pass - self._emit_debug_log(f"Event: {event_type}") + if tool_call_id: + active_tools[tool_call_id] = { + "name": tool_name, + "arguments": tool_args, + } - # Handle message content (delta or full) - if event_type in [ - "assistant.message_delta", - "assistant.message.delta", - "assistant.message", - ]: - # Log full message event for troubleshooting why there's no delta - if event_type == "assistant.message": - self._emit_debug_log( - f"Received full message event (non-Delta): {get_event_data(event, 'content')[:50]}..." - ) + # Close thinking tag if open before showing tool + if state["thinking_started"]: + queue.put_nowait("\n\n") + state["thinking_started"] = False - delta = ( - get_event_data(event, "delta_content") - or get_event_data(event, "deltaContent") - or get_event_data(event, "content") - or get_event_data(event, "text") - ) - if delta: - if self.thinking_started: - queue.put_nowait("\n\n") - self.thinking_started = False - queue.put_nowait(delta) + # Display tool call with improved formatting + if tool_args: + tool_args_json = json.dumps(tool_args, indent=2, ensure_ascii=False) + tool_display = f"\n\n
\n🔧 Executing Tool: {tool_name}\n\n**Parameters:**\n\n```json\n{tool_args_json}\n```\n\n
\n\n" + else: + tool_display = f"\n\n
\n🔧 Executing Tool: {tool_name}\n\n*No parameters*\n\n
\n\n" - elif event_type in [ - "assistant.reasoning_delta", - "assistant.reasoning.delta", - "assistant.reasoning", - ]: - delta = ( - get_event_data(event, "delta_content") - or get_event_data(event, "deltaContent") - or get_event_data(event, "content") - or get_event_data(event, "text") - ) - if delta: - if not self.thinking_started and self.valves.SHOW_THINKING: - queue.put_nowait("\n") - self.thinking_started = True - if self.thinking_started: - queue.put_nowait(delta) + queue.put_nowait(tool_display) - elif event_type == "tool.execution_start": - # Try multiple possible fields for tool name/description - tool_name = ( - get_event_data(event, "toolName") - or get_event_data(event, "name") - or get_event_data(event, "description") - or get_event_data(event, "tool_name") - or "Unknown Tool" - ) - if not self.thinking_started and self.valves.SHOW_THINKING: - queue.put_nowait("\n") - self.thinking_started = True - if self.thinking_started: - queue.put_nowait(f"\nRunning Tool: {tool_name}...\n") - self._emit_debug_log(f"Tool Start: {tool_name}") + self._emit_debug_log_sync(f"Tool Start: {tool_name}", __event_call__) elif event_type == "tool.execution_complete": - if self.thinking_started: - queue.put_nowait("Tool Completed.\n") - self._emit_debug_log("Tool Complete") + tool_call_id = safe_get_data_attr(event, "tool_call_id", "") + tool_info = active_tools.get(tool_call_id) - elif event_type == "session.compaction_start": - self._emit_debug_log("Session Compaction Started") + # Handle both old string format and new dict format + if isinstance(tool_info, str): + tool_name = tool_info + elif isinstance(tool_info, dict): + tool_name = tool_info.get("name", "Unknown Tool") + else: + tool_name = "Unknown Tool" + + # Try to get result content + result_content = "" + result_type = "success" + try: + result_obj = safe_get_data_attr(event, "result") + if hasattr(result_obj, "content"): + result_content = result_obj.content + elif isinstance(result_obj, dict): + result_content 
= result_obj.get("content", "") + result_type = result_obj.get("result_type", "success") + if not result_content: + # Try to serialize the entire dict if no content field + result_content = json.dumps( + result_obj, indent=2, ensure_ascii=False + ) + except Exception as e: + self._emit_debug_log_sync( + f"Error extracting result: {e}", __event_call__ + ) + result_type = "failure" + result_content = f"Error: {str(e)}" + + # Display tool result with improved formatting + if result_content: + status_icon = "✅" if result_type == "success" else "❌" + + # Try to detect content type for better formatting + is_json = False + try: + json_obj = ( + json.loads(result_content) + if isinstance(result_content, str) + else result_content + ) + if isinstance(json_obj, (dict, list)): + result_content = json.dumps( + json_obj, indent=2, ensure_ascii=False + ) + is_json = True + except: + pass + + # Format based on content type + if is_json: + # JSON content: use code block with syntax highlighting + result_display = f"\n
\n{status_icon} Tool Result: {tool_name}\n\n```json\n{result_content}\n```\n\n
\n\n" + else: + # Plain text: use text code block to preserve formatting and add line breaks + result_display = f"\n
\n{status_icon} Tool Result: {tool_name}\n\n```text\n{result_content}\n```\n\n
\n\n" + + queue.put_nowait(result_display) + + elif event_type == "tool.execution_progress": + # Tool execution progress update (for long-running tools) + tool_call_id = safe_get_data_attr(event, "tool_call_id", "") + tool_info = active_tools.get(tool_call_id) + tool_name = ( + tool_info.get("name", "Unknown Tool") + if isinstance(tool_info, dict) + else "Unknown Tool" + ) + + progress = safe_get_data_attr(event, "progress", 0) + message = safe_get_data_attr(event, "message", "") + + if message: + progress_display = f"\n> 🔄 **{tool_name}**: {message}\n" + queue.put_nowait(progress_display) + + self._emit_debug_log_sync( + f"Tool Progress: {tool_name} - {progress}%", __event_call__ + ) + + elif event_type == "tool.execution_partial_result": + # Streaming tool results (for tools that output incrementally) + tool_call_id = safe_get_data_attr(event, "tool_call_id", "") + tool_info = active_tools.get(tool_call_id) + tool_name = ( + tool_info.get("name", "Unknown Tool") + if isinstance(tool_info, dict) + else "Unknown Tool" + ) + + partial_content = safe_get_data_attr(event, "content", "") + if partial_content: + queue.put_nowait(partial_content) + + self._emit_debug_log_sync( + f"Tool Partial Result: {tool_name}", __event_call__ + ) + + # === Usage Statistics Events === + elif event_type == "assistant.usage": + # Token usage for current assistant turn + if self.valves.DEBUG: + input_tokens = safe_get_data_attr(event, "input_tokens", 0) + output_tokens = safe_get_data_attr(event, "output_tokens", 0) + total_tokens = safe_get_data_attr(event, "total_tokens", 0) + pass + + elif event_type == "session.usage_info": + # Cumulative session usage information + pass elif event_type == "session.compaction_complete": - self._emit_debug_log("Session Compaction Completed") + self._emit_debug_log_sync( + "Session Compaction Completed", __event_call__ + ) elif event_type == "session.idle": + # Session finished processing - signal completion done.set() + try: + 
queue.put_nowait(SENTINEL) + except: + pass + elif event_type == "session.error": - msg = get_event_data(event, "message", "Unknown Error") - queue.put_nowait(f"\n[Error: {msg}]") + error_msg = safe_get_data_attr(event, "message", "Unknown Error") + queue.put_nowait(f"\n[Error: {error_msg}]") done.set() + try: + queue.put_nowait(SENTINEL) + except: + pass unsubscribe = session.on(handler) - await session.send(send_payload) - if self.valves.DEBUG: - yield "\n" - if init_message: - yield init_message - yield "> [Debug] Connection established, waiting for response...\n" - self.thinking_started = True + self._emit_debug_log_sync( + f"Subscribed to events. Sending request...", __event_call__ + ) + + # Use asyncio.create_task used to prevent session.send from blocking the stream reading + # if the SDK implementation waits for completion. + send_task = asyncio.create_task(session.send(send_payload)) + self._emit_debug_log_sync(f"Prompt sent (async task started)", __event_call__) + + # Safe initial yield with error handling + try: + if self.valves.DEBUG: + yield "\n" + if init_message: + yield init_message + + if reasoning_effort and reasoning_effort != "off": + yield f"> [Debug] Reasoning Effort injected: {reasoning_effort.upper()}\n" + + yield "> [Debug] Connection established, waiting for response...\n" + state["thinking_started"] = True + except Exception as e: + # If initial yield fails, log but continue processing + self._emit_debug_log_sync(f"Initial yield warning: {e}", __event_call__) try: while not done.is_set(): @@ -639,52 +1383,67 @@ class Pipe: chunk = await asyncio.wait_for( queue.get(), timeout=float(self.valves.TIMEOUT) ) + if chunk is SENTINEL: + break if chunk: has_content = True - yield chunk + try: + yield chunk + except Exception as yield_error: + # Connection closed by client, stop gracefully + self._emit_debug_log_sync( + f"Yield error (client disconnected?): {yield_error}", + __event_call__, + ) + break except asyncio.TimeoutError: if done.is_set(): 
break - if self.thinking_started: - yield f"> [Debug] Waiting for response ({self.valves.TIMEOUT}s exceeded)...\n" + if state["thinking_started"]: + try: + yield f"> [Debug] Waiting for response ({self.valves.TIMEOUT}s exceeded)...\n" + except: + # If yield fails during timeout, connection is gone + break continue while not queue.empty(): chunk = queue.get_nowait() + if chunk is SENTINEL: + break if chunk: has_content = True - yield chunk + try: + yield chunk + except: + # Connection closed, stop yielding + break - if self.thinking_started: - yield "\n\n" - has_content = True + if state["thinking_started"]: + try: + yield "\n\n" + has_content = True + except: + pass # Connection closed # Core fix: If no content was yielded, return a fallback message to prevent OpenWebUI error if not has_content: - yield "⚠️ Copilot returned no content. Please check if the Model ID is correct or enable DEBUG mode in Valves for details." + try: + yield "⚠️ Copilot returned no content. Please check if the Model ID is correct or enable DEBUG mode in Valves for details." + except: + pass # Connection already closed except Exception as e: - yield f"\n[Stream Error: {str(e)}]" + try: + yield f"\n[Stream Error: {str(e)}]" + except: + pass # Connection already closed finally: unsubscribe() - # Only destroy session if it's not cached - # We can't easily check chat_id here without passing it, - # but stream_response is called within the scope where we decide persistence. - # Wait, stream_response takes session as arg. - # We need to know if we should destroy it. - # Let's assume if it's in _SESSIONS, we don't destroy it. - # But checking _SESSIONS here is race-prone or complex. - # Simplified: The caller (pipe) handles destruction logic? - # No, stream_response is a generator, pipe returns it. - # So pipe function exits before stream finishes. - # We need to handle destruction here. 
- pass - - # TODO: Proper session cleanup for streaming - # For now, we rely on the fact that if we mapped it, we keep it. - # If we didn't map it (no chat_id), we should destroy it. - # But we don't have chat_id here. - # Let's modify stream_response signature or just leave it open for GC? - # CopilotSession doesn't auto-close. - # Let's add a flag to stream_response. - pass + # Cleanup client and session + try: + # We do not destroy session here to allow persistence, + # but we must stop the client. + await client.stop() + except Exception as e: + pass diff --git a/plugins/pipes/github-copilot-sdk/github_copilot_sdk_cn.py b/plugins/pipes/github-copilot-sdk/github_copilot_sdk_cn.py index 4aff184..8f59abe 100644 --- a/plugins/pipes/github-copilot-sdk/github_copilot_sdk_cn.py +++ b/plugins/pipes/github-copilot-sdk/github_copilot_sdk_cn.py @@ -3,8 +3,9 @@ title: GitHub Copilot Official SDK Pipe author: Fu-Jie author_url: https://github.com/Fu-Jie/awesome-openwebui funding_url: https://github.com/open-webui -description: 集成 GitHub Copilot SDK。支持动态模型、多轮对话、流式输出、多模态输入及无限会话(上下文自动压缩)。 -version: 0.1.1 +openwebui_id: ce96f7b4-12fc-4ac3-9a01-875713e69359 +description: 集成 GitHub Copilot SDK。支持动态模型、多轮对话、流式输出、多模态输入、无限会话及前端调试日志。 +version: 0.2.3 requirements: github-copilot-sdk """ @@ -23,121 +24,37 @@ from pydantic import BaseModel, Field from datetime import datetime, timezone import contextlib +# 导入 Copilot SDK 模块 +from copilot import CopilotClient, define_tool + # Setup logger logger = logging.getLogger(__name__) -# Open WebUI internal database (re-use shared connection) -try: - from open_webui.internal import db as owui_db -except ModuleNotFoundError: - owui_db = None + +class RandomNumberParams(BaseModel): + min: int = Field(description="最小值(包含)") + max: int = Field(description="最大值(包含)") -def _discover_owui_engine(db_module: Any) -> Optional[Engine]: - """Discover the Open WebUI SQLAlchemy engine via provided db module helpers.""" - if db_module is None: - return None 
+@define_tool(description="在指定范围内生成随机整数。") +async def generate_random_number(params: RandomNumberParams) -> str: + import random - db_context = getattr(db_module, "get_db_context", None) or getattr( - db_module, "get_db", None - ) - if callable(db_context): - try: - with db_context() as session: - try: - return session.get_bind() - except AttributeError: - return getattr(session, "bind", None) or getattr( - session, "engine", None - ) - except Exception as exc: - logger.error(f"[DB Discover] get_db_context failed: {exc}") - - for attr in ("engine", "ENGINE", "bind", "BIND"): - candidate = getattr(db_module, attr, None) - if candidate is not None: - return candidate - - return None - - -def _discover_owui_schema(db_module: Any) -> Optional[str]: - """Discover the Open WebUI database schema name if configured.""" - if db_module is None: - return None - - try: - base = getattr(db_module, "Base", None) - metadata = getattr(base, "metadata", None) if base is not None else None - candidate = getattr(metadata, "schema", None) if metadata is not None else None - if isinstance(candidate, str) and candidate.strip(): - return candidate.strip() - except Exception as exc: - logger.error(f"[DB Discover] Base metadata schema lookup failed: {exc}") - - try: - metadata_obj = getattr(db_module, "metadata_obj", None) - candidate = ( - getattr(metadata_obj, "schema", None) if metadata_obj is not None else None - ) - if isinstance(candidate, str) and candidate.strip(): - return candidate.strip() - except Exception as exc: - logger.error(f"[DB Discover] metadata_obj schema lookup failed: {exc}") - - try: - from open_webui import env as owui_env - - candidate = getattr(owui_env, "DATABASE_SCHEMA", None) - if isinstance(candidate, str) and candidate.strip(): - return candidate.strip() - except Exception as exc: - logger.error(f"[DB Discover] env schema lookup failed: {exc}") - - return None - - -owui_engine = _discover_owui_engine(owui_db) -owui_schema = _discover_owui_schema(owui_db) 
-owui_Base = getattr(owui_db, "Base", None) if owui_db is not None else None -if owui_Base is None: - owui_Base = declarative_base() - - -class CopilotSessionMap(owui_Base): - """Copilot Session Mapping Table""" - - __tablename__ = "copilot_session_map" - __table_args__ = ( - {"extend_existing": True, "schema": owui_schema} - if owui_schema - else {"extend_existing": True} - ) - - id = Column(Integer, primary_key=True, autoincrement=True) - chat_id = Column(String(255), unique=True, nullable=False, index=True) - copilot_session_id = Column(String(255), nullable=False) - updated_at = Column( - DateTime, - default=lambda: datetime.now(timezone.utc), - onupdate=lambda: datetime.now(timezone.utc), - ) - - -# 全局客户端存储 -_SHARED_CLIENT = None -_SHARED_TOKEN = "" -_CLIENT_LOCK = asyncio.Lock() + if params.min >= params.max: + raise ValueError("min 必须小于 max") + number = random.randint(params.min, params.max) + return f"生成的随机数: {number}" class Pipe: class Valves(BaseModel): GH_TOKEN: str = Field( - default="", description="GitHub 细粒度令牌 (需开启 'Copilot Requests' 权限)" + default="", + description="GitHub 细粒度 Token(需要 Copilot Requests 权限)", ) MODEL_ID: str = Field( - default="claude-sonnet-4.5", - description="默认使用的 Copilot 模型名称 (当无法动态获取时使用)", + default="gpt-5-mini", + description="默认 Copilot 模型名(动态获取失败时使用)", ) CLI_PATH: str = Field( default="/usr/local/bin/copilot", @@ -145,23 +62,31 @@ class Pipe: ) DEBUG: bool = Field( default=False, - description="开启技术调试日志 (连接信息等)", + description="启用技术调试日志(连接信息等)", + ) + LOG_LEVEL: str = Field( + default="error", + description="Copilot CLI 日志级别:none, error, warning, info, debug, all", ) SHOW_THINKING: bool = Field( default=True, description="显示模型推理/思考过程", ) + SHOW_WORKSPACE_INFO: bool = Field( + default=True, + description="调试模式下显示会话工作空间路径与摘要", + ) EXCLUDE_KEYWORDS: str = Field( default="", - description="排除包含这些关键词的模型 (逗号分隔,例如: codex, haiku)", + description="排除包含这些关键词的模型(逗号分隔,如:codex, haiku)", ) WORKSPACE_DIR: str = Field( default="", - 
description="文件操作的受限工作目录。如果为空,允许访问当前进程目录。", + description="文件操作的受限工作区目录;为空则使用当前进程目录", ) INFINITE_SESSION: bool = Field( default=True, - description="启用无限会话 (自动上下文压缩)", + description="启用无限会话(自动上下文压缩)", ) COMPACTION_THRESHOLD: float = Field( default=0.8, @@ -169,15 +94,58 @@ class Pipe: ) BUFFER_THRESHOLD: float = Field( default=0.95, - description="背景压缩缓冲区阈值 (0.0-1.0)", + description="缓冲区耗尽阈值 (0.0-1.0)", ) TIMEOUT: int = Field( default=300, - description="流式数据块超时时间 (秒)", + description="每个流式分块超时(秒)", + ) + CUSTOM_ENV_VARS: str = Field( + default="", + description='自定义环境变量(JSON 格式,例如 {"VAR": "value"})', + ) + ENABLE_TOOLS: bool = Field( + default=False, + description="启用自定义工具(例如:随机数)", + ) + AVAILABLE_TOOLS: str = Field( + default="all", + description="可用工具:'all' 或逗号分隔列表(例如:'generate_random_number')", + ) + REASONING_EFFORT: str = Field( + default="medium", + description="推理强度级别: low, medium, high. (gpt-5.2-codex 额外支持 xhigh)", + ) + ENFORCE_FORMATTING: bool = Field( + default=True, + description="在系统提示词中添加格式化指导,以提高输出的可读性(段落、换行、结构)。", + ) + + class UserValves(BaseModel): + REASONING_EFFORT: str = Field( + default="", + description="推理强度级别 (low, medium, high, xhigh)。留空以使用全局设置。", + ) + CLI_PATH: str = Field( + default="", + description="自定义 Copilot CLI 路径。留空以使用全局设置。", + ) + DEBUG: bool = Field( + default=False, + description="启用技术调试日志(连接信息等)", + ) + SHOW_THINKING: bool = Field( + default=True, + description="显示模型推理/思考过程", + ) + MODEL_ID: str = Field( + default="", + description="自定义模型 ID (例如 gpt-4o)。留空以使用全局默认值。", ) def __init__(self): self.type = "pipe" + self.id = "copilotsdk" self.name = "copilotsdk" self.valves = self.Valves() self.temp_dir = tempfile.mkdtemp(prefix="copilot_images_") @@ -190,17 +158,468 @@ class Pipe: except: pass - def _emit_debug_log(self, message: str): - """Emit debug log to frontend if DEBUG valve is enabled.""" - if self.valves.DEBUG: - print(f"[Copilot Pipe] {message}") + # ==================== 系统固定入口 ==================== + # pipe() 是 
OpenWebUI 调用的稳定入口。 + # 将该部分放在前面,便于快速定位与维护。 + # ====================================================== + async def pipe( + self, + body: dict, + __metadata__: Optional[dict] = None, + __user__: Optional[dict] = None, + __event_emitter__=None, + __event_call__=None, + ) -> Union[str, AsyncGenerator]: + return await self._pipe_impl( + body, + __metadata__=__metadata__, + __user__=__user__, + __event_emitter__=__event_emitter__, + __event_call__=__event_call__, + ) + + # ==================== 功能性分区说明 ==================== + # 1) 工具注册:定义工具并在 _initialize_custom_tools 中注册 + # 2) 调试日志:_emit_debug_log / _emit_debug_log_sync + # 3) 提示词/会话:_extract_system_prompt / _build_session_config / _build_prompt + # 4) 运行流程:pipe() 负责请求,stream_response() 负责流式输出 + # ====================================================== + # ==================== 自定义工具示例 ==================== + # 工具注册:在模块级别添加 @define_tool 装饰的函数, + # 然后在 _initialize_custom_tools() 的 all_tools 字典中注册。 + + def _extract_text_from_content(self, content) -> str: + """从各种消息内容格式中提取文本内容""" + if isinstance(content, str): + return content + elif isinstance(content, list): + text_parts = [] + for item in content: + if isinstance(item, dict) and item.get("type") == "text": + text_parts.append(item.get("text", "")) + return " ".join(text_parts) + return "" + + def _apply_formatting_hint(self, prompt: str) -> str: + """在启用格式化时,向用户提示词追加轻量格式化要求。""" + if not self.valves.ENFORCE_FORMATTING: + return prompt + + if not prompt: + return prompt + + if "[格式化指南]" in prompt or "[格式化要求]" in prompt: + return prompt + + formatting_hint = ( + "\n\n[格式化要求]\n" "请使用清晰的段落与换行,必要时使用项目符号列表。" + ) + return f"{prompt}{formatting_hint}" + + def _dedupe_preserve_order(self, items: List[str]) -> List[str]: + """去重保序""" + seen = set() + result = [] + for item in items: + if not item or item in seen: + continue + seen.add(item) + result.append(item) + return result + + def _collect_model_ids( + self, body: dict, request_model: str, real_model_id: str + ) -> List[str]: + 
"""收集可能的模型 ID(来自请求/metadata/body params)""" + model_ids: List[str] = [] + if request_model: + model_ids.append(request_model) + if request_model.startswith(f"{self.id}-"): + model_ids.append(request_model[len(f"{self.id}-") :]) + if real_model_id: + model_ids.append(real_model_id) + + metadata = body.get("metadata", {}) + if isinstance(metadata, dict): + meta_model = metadata.get("model") + meta_model_id = metadata.get("model_id") + if isinstance(meta_model, str): + model_ids.append(meta_model) + if isinstance(meta_model_id, str): + model_ids.append(meta_model_id) + + body_params = body.get("params", {}) + if isinstance(body_params, dict): + for key in ("model", "model_id", "modelId"): + val = body_params.get(key) + if isinstance(val, str): + model_ids.append(val) + + return self._dedupe_preserve_order(model_ids) + + async def _extract_system_prompt( + self, + body: dict, + messages: List[dict], + request_model: str, + real_model_id: str, + __event_call__=None, + ) -> tuple[Optional[str], str]: + """从 metadata/模型 DB/body/messages 提取系统提示词""" + system_prompt_content: Optional[str] = None + system_prompt_source = "" + + # 1) metadata.model.params.system + metadata = body.get("metadata", {}) + if isinstance(metadata, dict): + meta_model = metadata.get("model") + if isinstance(meta_model, dict): + meta_params = meta_model.get("params") + if isinstance(meta_params, dict) and meta_params.get("system"): + system_prompt_content = meta_params.get("system") + system_prompt_source = "metadata.model.params" + await self._emit_debug_log( + f"从 metadata.model.params 提取系统提示词(长度: {len(system_prompt_content)})", + __event_call__, + ) + + # 2) 模型 DB + if not system_prompt_content: + try: + from open_webui.models.models import Models + + model_ids_to_try = self._collect_model_ids( + body, request_model, real_model_id + ) + for mid in model_ids_to_try: + model_record = Models.get_model_by_id(mid) + if model_record and hasattr(model_record, "params"): + params = model_record.params + if 
isinstance(params, dict): + system_prompt_content = params.get("system") + if system_prompt_content: + system_prompt_source = f"model_db:{mid}" + await self._emit_debug_log( + f"从模型数据库提取系统提示词(长度: {len(system_prompt_content)})", + __event_call__, + ) + break + except Exception as e: + await self._emit_debug_log( + f"从模型数据库提取系统提示词失败: {e}", + __event_call__, + ) + + # 3) body.params.system + if not system_prompt_content: + body_params = body.get("params", {}) + if isinstance(body_params, dict): + system_prompt_content = body_params.get("system") + if system_prompt_content: + system_prompt_source = "body_params" + await self._emit_debug_log( + f"从 body.params 提取系统提示词(长度: {len(system_prompt_content)})", + __event_call__, + ) + + # 4) messages (role=system) + if not system_prompt_content: + for msg in messages: + if msg.get("role") == "system": + system_prompt_content = self._extract_text_from_content( + msg.get("content", "") + ) + if system_prompt_content: + system_prompt_source = "messages_system" + await self._emit_debug_log( + f"从消息中提取系统提示词(长度: {len(system_prompt_content)})", + __event_call__, + ) + break + + return system_prompt_content, system_prompt_source + + def _build_client_config(self, body: dict) -> dict: + """根据 Valves 和请求构建 CopilotClient 配置""" + cwd = self.valves.WORKSPACE_DIR if self.valves.WORKSPACE_DIR else os.getcwd() + client_config = {} + if os.environ.get("COPILOT_CLI_PATH"): + client_config["cli_path"] = os.environ["COPILOT_CLI_PATH"] + client_config["cwd"] = cwd + + if self.valves.LOG_LEVEL: + client_config["log_level"] = self.valves.LOG_LEVEL + + if self.valves.CUSTOM_ENV_VARS: + try: + custom_env = json.loads(self.valves.CUSTOM_ENV_VARS) + if isinstance(custom_env, dict): + client_config["env"] = custom_env + except: + pass + + return client_config + + def _build_session_config( + self, + chat_id: Optional[str], + real_model_id: str, + custom_tools: List[Any], + system_prompt_content: Optional[str], + is_streaming: bool, + reasoning_effort: str 
= "", + ): + """构建 Copilot SDK 的 SessionConfig""" + from copilot.types import SessionConfig, InfiniteSessionConfig + + infinite_session_config = None + if self.valves.INFINITE_SESSION: + infinite_session_config = InfiniteSessionConfig( + enabled=True, + background_compaction_threshold=self.valves.COMPACTION_THRESHOLD, + buffer_exhaustion_threshold=self.valves.BUFFER_THRESHOLD, + ) + + system_message_config = None + if system_prompt_content or self.valves.ENFORCE_FORMATTING: + # 构建系统消息内容 + system_parts = [] + + if system_prompt_content: + system_parts.append(system_prompt_content) + + if self.valves.ENFORCE_FORMATTING: + formatting_instruction = ( + "\n\n[格式化指南]\n" + "在提供解释或描述时:\n" + "- 使用清晰的段落分隔(双换行)\n" + "- 将长句拆分为多个短句\n" + "- 对多个要点使用项目符号或编号列表\n" + "- 为主要部分添加标题(##、###)\n" + "- 确保不同主题之间有适当的间距" + ) + system_parts.append(formatting_instruction) + logger.info(f"[ENFORCE_FORMATTING] 已添加格式化指导到系统提示词") + + if system_parts: + system_message_config = { + "mode": "append", + "content": "\n".join(system_parts), + } + + # 准备会话配置参数 + session_params = { + "session_id": chat_id if chat_id else None, + "model": real_model_id, + "streaming": is_streaming, + "tools": custom_tools, + "system_message": system_message_config, + "infinite_sessions": infinite_session_config, + } + + # 如果不是默认值(medium),添加 reasoning_effort + if reasoning_effort and reasoning_effort.lower() != "medium": + session_params["reasoning_effort"] = reasoning_effort.lower() + + return SessionConfig(**session_params) + + def _dedupe_preserve_order(self, items: List[str]) -> List[str]: + """去重保序""" + seen = set() + result = [] + for item in items: + if not item or item in seen: + continue + seen.add(item) + result.append(item) + return result + + def _collect_model_ids( + self, body: dict, request_model: str, real_model_id: str + ) -> List[str]: + """收集可能的模型 ID(来自请求/metadata/body params)""" + model_ids: List[str] = [] + if request_model: + model_ids.append(request_model) + if request_model.startswith(f"{self.id}-"): + 
model_ids.append(request_model[len(f"{self.id}-") :]) + if real_model_id: + model_ids.append(real_model_id) + + metadata = body.get("metadata", {}) + if isinstance(metadata, dict): + meta_model = metadata.get("model") + meta_model_id = metadata.get("model_id") + if isinstance(meta_model, str): + model_ids.append(meta_model) + if isinstance(meta_model_id, str): + model_ids.append(meta_model_id) + + body_params = body.get("params", {}) + if isinstance(body_params, dict): + for key in ("model", "model_id", "modelId"): + val = body_params.get(key) + if isinstance(val, str): + model_ids.append(val) + + return self._dedupe_preserve_order(model_ids) + + async def _extract_system_prompt( + self, + body: dict, + messages: List[dict], + request_model: str, + real_model_id: str, + __event_call__=None, + ) -> tuple[Optional[str], str]: + """从 metadata/模型 DB/body/messages 提取系统提示词""" + system_prompt_content: Optional[str] = None + system_prompt_source = "" + + # 1) metadata.model.params.system + metadata = body.get("metadata", {}) + if isinstance(metadata, dict): + meta_model = metadata.get("model") + if isinstance(meta_model, dict): + meta_params = meta_model.get("params") + if isinstance(meta_params, dict) and meta_params.get("system"): + system_prompt_content = meta_params.get("system") + system_prompt_source = "metadata.model.params" + await self._emit_debug_log( + f"从 metadata.model.params 提取系统提示词(长度: {len(system_prompt_content)})", + __event_call__, + ) + + # 2) 模型 DB + if not system_prompt_content: + try: + from open_webui.models.models import Models + + model_ids_to_try = self._collect_model_ids( + body, request_model, real_model_id + ) + for mid in model_ids_to_try: + model_record = Models.get_model_by_id(mid) + if model_record and hasattr(model_record, "params"): + params = model_record.params + if isinstance(params, dict): + system_prompt_content = params.get("system") + if system_prompt_content: + system_prompt_source = f"model_db:{mid}" + await self._emit_debug_log( 
+ f"从模型数据库提取系统提示词(长度: {len(system_prompt_content)})", + __event_call__, + ) + break + except Exception as e: + await self._emit_debug_log( + f"从模型数据库提取系统提示词失败: {e}", + __event_call__, + ) + + # 3) body.params.system + if not system_prompt_content: + body_params = body.get("params", {}) + if isinstance(body_params, dict): + system_prompt_content = body_params.get("system") + if system_prompt_content: + system_prompt_source = "body_params" + await self._emit_debug_log( + f"从 body.params 提取系统提示词(长度: {len(system_prompt_content)})", + __event_call__, + ) + + # 4) messages (role=system) + if not system_prompt_content: + for msg in messages: + if msg.get("role") == "system": + system_prompt_content = self._extract_text_from_content( + msg.get("content", "") + ) + if system_prompt_content: + system_prompt_source = "messages_system" + await self._emit_debug_log( + f"从消息中提取系统提示词(长度: {len(system_prompt_content)})", + __event_call__, + ) + break + + return system_prompt_content, system_prompt_source + + def _initialize_custom_tools(self): + """根据配置初始化自定义工具""" + if not self.valves.ENABLE_TOOLS: + return [] + + # 定义所有可用工具(在此注册新工具) + all_tools = { + "generate_random_number": generate_random_number, + } + + # 根据配置过滤 + if self.valves.AVAILABLE_TOOLS == "all": + return list(all_tools.values()) + + # 仅启用指定的工具 + enabled = [t.strip() for t in self.valves.AVAILABLE_TOOLS.split(",")] + return [all_tools[name] for name in enabled if name in all_tools] + + async def _emit_debug_log(self, message: str, __event_call__=None): + """在 DEBUG 开启时将日志输出到前端控制台。""" + if not self.valves.DEBUG: + return + + logger.debug(f"[Copilot Pipe] {message}") + + if not __event_call__: + return + + try: + js_code = f""" + (async function() {{ + console.debug("%c[Copilot Pipe] " + {json.dumps(message, ensure_ascii=False)}, "color: #3b82f6;"); + }})(); + """ + await __event_call__({"type": "execute", "data": {"code": js_code}}) + except Exception as e: + logger.debug(f"[Copilot Pipe] 前端调试日志失败: {e}") + + def 
_emit_debug_log_sync(self, message: str, __event_call__=None): + """在非异步上下文中输出调试日志。""" + if not self.valves.DEBUG: + return + + try: + loop = asyncio.get_running_loop() + except RuntimeError: + logger.debug(f"[Copilot Pipe] {message}") + return + + loop.create_task(self._emit_debug_log(message, __event_call__)) + + async def _emit_native_message(self, message_data: dict, __event_call__=None): + """发送原生 OpenAI 格式消息事件用于工具调用/结果。""" + if not __event_call__: + return + + try: + await __event_call__({"type": "message", "data": message_data}) + await self._emit_debug_log( + f"已发送原生消息: {message_data.get('role')} - {list(message_data.keys())}", + __event_call__, + ) + except Exception as e: + logger.warning(f"发送原生消息失败: {e}") + await self._emit_debug_log( + f"原生消息发送失败: {e}。回退到文本显示。", __event_call__ + ) def _get_user_context(self): - """Helper to get user context (placeholder for future use).""" + """获取用户上下文(占位,预留)。""" return {} def _get_chat_context( - self, body: dict, __metadata__: Optional[dict] = None + self, body: dict, __metadata__: Optional[dict] = None, __event_call__=None ) -> Dict[str, str]: """ 高度可靠的聊天上下文提取逻辑。 @@ -231,11 +650,15 @@ class Pipe: # 调试:记录 ID 来源 if chat_id: - self._emit_debug_log(f"提取到 ChatID: {chat_id} (来源: {source})") + self._emit_debug_log_sync( + f"提取到 ChatID: {chat_id} (来源: {source})", __event_call__ + ) else: # 如果还是没找到,记录一下 body 的键,方便排查 keys = list(body.keys()) if isinstance(body, dict) else "not a dict" - self._emit_debug_log(f"警告: 未能提取到 ChatID。Body 键: {keys}") + self._emit_debug_log_sync( + f"警告: 未能提取到 ChatID。Body 键: {keys}", __event_call__ + ) return { "chat_id": str(chat_id).strip(), @@ -247,14 +670,12 @@ class Pipe: if self._model_cache: return self._model_cache - self._emit_debug_log("正在动态获取模型列表...") + await self._emit_debug_log("正在动态获取模型列表...") try: self._setup_env() if not self.valves.GH_TOKEN: return [{"id": f"{self.id}-error", "name": "Error: GH_TOKEN not set"}] - from copilot import CopilotClient - client_config = {} if 
os.environ.get("COPILOT_CLI_PATH"): client_config["cli_path"] = os.environ["COPILOT_CLI_PATH"] @@ -335,12 +756,12 @@ class Pipe: {"id": m["id"], "name": m["name"]} for m in models_with_info ] - self._emit_debug_log( + await self._emit_debug_log( f"成功获取 {len(self._model_cache)} 个模型 (已过滤)" ) return self._model_cache except Exception as e: - self._emit_debug_log(f"获取模型列表失败: {e}") + await self._emit_debug_log(f"获取模型列表失败: {e}") # 失败时返回默认模型 return [ { @@ -351,7 +772,7 @@ class Pipe: finally: await client.stop() except Exception as e: - self._emit_debug_log(f"Pipes Error: {e}") + await self._emit_debug_log(f"Pipes Error: {e}") return [ { "id": f"{self.id}-{self.valves.MODEL_ID}", @@ -361,17 +782,32 @@ class Pipe: async def _get_client(self): """Helper to get or create a CopilotClient instance.""" - from copilot import CopilotClient + # 确定工作空间目录 + cwd = self.valves.WORKSPACE_DIR if self.valves.WORKSPACE_DIR else os.getcwd() client_config = {} if os.environ.get("COPILOT_CLI_PATH"): client_config["cli_path"] = os.environ["COPILOT_CLI_PATH"] + client_config["cwd"] = cwd + + # 设置日志级别 + if self.valves.LOG_LEVEL: + client_config["log_level"] = self.valves.LOG_LEVEL + + # 添加自定义环境变量 + if self.valves.CUSTOM_ENV_VARS: + try: + custom_env = json.loads(self.valves.CUSTOM_ENV_VARS) + if isinstance(custom_env, dict): + client_config["env"] = custom_env + except: + pass # 静默失败,因为这是同步方法且不应影响主流程 client = CopilotClient(client_config) await client.start() return client - def _setup_env(self): + def _setup_env(self, __event_call__=None): cli_path = self.valves.CLI_PATH found = False @@ -403,11 +839,19 @@ class Pipe: if cli_dir not in os.environ["PATH"]: os.environ["PATH"] = f"{cli_dir}:{os.environ['PATH']}" + if self.valves.DEBUG: + self._emit_debug_log_sync( + f"Copilot CLI 已定位: {cli_path}", __event_call__ + ) + if self.valves.GH_TOKEN: os.environ["GH_TOKEN"] = self.valves.GH_TOKEN os.environ["GITHUB_TOKEN"] = self.valves.GH_TOKEN + else: + if self.valves.DEBUG: + 
self._emit_debug_log_sync("Warning: GH_TOKEN 未设置。", __event_call__) - def _process_images(self, messages): + def _process_images(self, messages, __event_call__=None): attachments = [] text_content = "" if not messages: @@ -436,17 +880,83 @@ class Pipe: "display_name": file_name, } ) - self._emit_debug_log(f"Image processed: {file_path}") + self._emit_debug_log_sync( + f"Image processed: {file_path}", __event_call__ + ) except Exception as e: - self._emit_debug_log(f"Image error: {e}") + self._emit_debug_log_sync( + f"Image error: {e}", __event_call__ + ) else: text_content = str(content) return text_content, attachments - async def pipe( - self, body: dict, __metadata__: Optional[dict] = None + def _sync_copilot_config(self, reasoning_effort: str, __event_call__=None): + """ + 动态更新 ~/.copilot/config.json 中的 reasoning_effort 设置。 + 如果 API 注入被忽略,这提供了最后的保障。 + """ + if not reasoning_effort: + return + + effort = reasoning_effort + + # 检查模型对 xhigh 的支持 + # 目前仅 gpt-5.2-codex 支持 xhigh + # 在 _sync_copilot_config 中很难获得准确的当前模型 ID, + # 因此这里我们放宽限制,允许写入 xhigh。 + # 如果模型不支持,Copilot CLI 可能会忽略或降级处理,但这比在这里硬编码判断更安全, + # 因为获取当前请求的 body 需要修改函数签名。 + + try: + # 目标标准路径 ~/.copilot/config.json + config_path = os.path.expanduser("~/.copilot/config.json") + config_dir = os.path.dirname(config_path) + + # 仅在目录存在时执行(避免在错误环境创建垃圾文件) + if not os.path.exists(config_dir): + return + + data = {} + # 读取现有配置 + if os.path.exists(config_path): + try: + with open(config_path, "r") as f: + data = json.load(f) + except Exception: + data = {} + + # 如果值有变化则更新 + current_val = data.get("reasoning_effort") + if current_val != effort: + data["reasoning_effort"] = effort + try: + with open(config_path, "w") as f: + json.dump(data, f, indent=4) + + self._emit_debug_log_sync( + f"已动态更新 ~/.copilot/config.json: reasoning_effort='{effort}'", + __event_call__, + ) + except Exception as e: + self._emit_debug_log_sync( + f"写入 config.json 失败: {e}", __event_call__ + ) + except Exception as e: + 
self._emit_debug_log_sync(f"配置同步检查失败: {e}", __event_call__) + + # ==================== 内部实现 ==================== + # _pipe_impl() 包含主请求处理逻辑。 + # ================================================ + async def _pipe_impl( + self, + body: dict, + __metadata__: Optional[dict] = None, + __user__: Optional[dict] = None, + __event_emitter__=None, + __event_call__=None, ) -> Union[str, AsyncGenerator]: - self._setup_env() + self._setup_env(__event_call__) if not self.valves.GH_TOKEN: return "Error: 请在 Valves 中配置 GH_TOKEN。" @@ -454,97 +964,159 @@ class Pipe: request_model = body.get("model", "") real_model_id = self.valves.MODEL_ID # 默认值 + # 确定有效的推理强度和调试设置 + if __user__: + raw_valves = __user__.get("valves", {}) + if isinstance(raw_valves, self.UserValves): + user_valves = raw_valves + elif isinstance(raw_valves, dict): + user_valves = self.UserValves(**raw_valves) + else: + user_valves = self.UserValves() + else: + user_valves = self.UserValves() + effective_reasoning_effort = ( + user_valves.REASONING_EFFORT + if user_valves.REASONING_EFFORT + else self.valves.REASONING_EFFORT + ) + # 如果用户启用了 DEBUG,则覆盖全局设置 + if user_valves.DEBUG: + self.valves.DEBUG = True + + # 处理 SHOW_THINKING(优先使用用户设置) + show_thinking = ( + user_valves.SHOW_THINKING + if user_valves.SHOW_THINKING is not None + else self.valves.SHOW_THINKING + ) + if request_model.startswith(f"{self.id}-"): real_model_id = request_model[len(f"{self.id}-") :] - self._emit_debug_log(f"使用选择的模型: {real_model_id}") + await self._emit_debug_log( + f"使用选择的模型: {real_model_id}", __event_call__ + ) messages = body.get("messages", []) if not messages: return "No messages." 
# 使用改进的助手获取 Chat ID - chat_ctx = self._get_chat_context(body, __metadata__) + chat_ctx = self._get_chat_context(body, __metadata__, __event_call__) chat_id = chat_ctx.get("chat_id") + # 从多个来源提取系统提示词 + system_prompt_content, system_prompt_source = await self._extract_system_prompt( + body, messages, request_model, real_model_id, __event_call__ + ) + + if system_prompt_content: + preview = system_prompt_content[:60].replace("\n", " ") + await self._emit_debug_log( + f"系统提示词已确认(来源: {system_prompt_source}, 长度: {len(system_prompt_content)}, 预览: {preview})", + __event_call__, + ) + is_streaming = body.get("stream", False) - self._emit_debug_log(f"请求流式传输: {is_streaming}") - - last_text, attachments = self._process_images(messages) - - # 确定 Prompt 策略 - # 如果有 chat_id,尝试恢复会话。 - # 如果恢复成功,假设会话已有历史,只发送最后一条消息。 - # 如果是新会话,发送完整历史。 - - prompt = "" - is_new_session = True + await self._emit_debug_log(f"请求流式传输: {is_streaming}", __event_call__) + client = CopilotClient(self._build_client_config(body)) + should_stop_client = True try: - client = await self._get_client() + await client.start() + + # 初始化自定义工具 + custom_tools = self._initialize_custom_tools() + if custom_tools: + tool_names = [t.name for t in custom_tools] + await self._emit_debug_log( + f"已启用 {len(custom_tools)} 个自定义工具: {tool_names}", + __event_call__, + ) + session = None if chat_id: try: # 尝试直接使用 chat_id 作为 session_id 恢复会话 session = await client.resume_session(chat_id) - self._emit_debug_log(f"已通过 ChatID 恢复会话: {chat_id}") + await self._emit_debug_log( + f"已通过 ChatID 恢复会话: {chat_id}", __event_call__ + ) + + # 显示工作空间信息(如果可用) + if self.valves.DEBUG and self.valves.SHOW_WORKSPACE_INFO: + if session.workspace_path: + await self._emit_debug_log( + f"会话工作空间: {session.workspace_path}", + __event_call__, + ) + is_new_session = False - except Exception: + except Exception as e: # 恢复失败,磁盘上可能不存在该会话 - self._emit_debug_log( - f"会话 {chat_id} 不存在或已过期,将创建新会话。" + reasoning_effort = (effective_reasoning_effort,) + await 
self._emit_debug_log( + f"会话 {chat_id} 不存在或已过期 ({str(e)}),将创建新会话。", + __event_call__, ) session = None if session is None: - # 创建新会话 - from copilot.types import SessionConfig, InfiniteSessionConfig - - # 无限会话配置 - infinite_session_config = None - if self.valves.INFINITE_SESSION: - infinite_session_config = InfiniteSessionConfig( - enabled=True, - background_compaction_threshold=self.valves.COMPACTION_THRESHOLD, - buffer_exhaustion_threshold=self.valves.BUFFER_THRESHOLD, + session_config = self._build_session_config( + chat_id, + real_model_id, + custom_tools, + system_prompt_content, + is_streaming, + ) + if system_prompt_content: + await self._emit_debug_log( + f"配置系统消息(模式: append)", + __event_call__, ) - session_config = SessionConfig( - session_id=( - chat_id if chat_id else None - ), # 使用 chat_id 作为 session_id - model=real_model_id, - streaming=body.get("stream", False), - infinite_sessions=infinite_session_config, - ) + # 显示系统配置预览 + if system_prompt_content or self.valves.ENFORCE_FORMATTING: + preview_parts = [] + if system_prompt_content: + preview_parts.append( + f"自定义提示词: {system_prompt_content[:100]}..." 
+ ) + if self.valves.ENFORCE_FORMATTING: + preview_parts.append("格式化指导: 已启用") + + if isinstance(session_config, dict): + system_config = session_config.get("system_message", {}) + else: + system_config = getattr(session_config, "system_message", None) + + if isinstance(system_config, dict): + full_content = system_config.get("content", "") + else: + full_content = "" + + await self._emit_debug_log( + f"系统消息配置 - {', '.join(preview_parts)} (总长度: {len(full_content)} 字符)", + __event_call__, + ) session = await client.create_session(config=session_config) # 获取新会话 ID new_sid = getattr(session, "session_id", getattr(session, "id", None)) - self._emit_debug_log(f"创建了新会话: {new_sid}") + await self._emit_debug_log(f"创建了新会话: {new_sid}", __event_call__) - # 构建 Prompt - if is_new_session: - # 新会话,发送完整历史 - full_conversation = [] - for msg in messages[:-1]: - role = msg.get("role", "user").upper() - content = msg.get("content", "") - if isinstance(content, list): - content = " ".join( - [ - c.get("text", "") - for c in content - if c.get("type") == "text" - ] + # 显示新会话的工作空间信息 + if self.valves.DEBUG and self.valves.SHOW_WORKSPACE_INFO: + if session.workspace_path: + await self._emit_debug_log( + f"会话工作空间: {session.workspace_path}", + __event_call__, ) - full_conversation.append(f"{role}: {content}") - full_conversation.append(f"User: {last_text}") - prompt = "\n\n".join(full_conversation) - else: - # 恢复的会话,只发送最后一条消息 - prompt = last_text + + # 构建 Prompt(基于会话:仅发送最新用户输入) + prompt = self._apply_formatting_hint(last_text) send_payload = {"prompt": prompt, "mode": "immediate"} if attachments: @@ -562,162 +1134,358 @@ class Pipe: else: init_msg = f"> [Debug] 已通过 ChatID 恢复会话: {chat_id}\n" - return self.stream_response(client, session, send_payload, init_msg) + return self.stream_response( + client, + session, + send_payload, + init_msg, + __event_call__, + reasoning_effort=effective_reasoning_effort, + show_thinking=show_thinking, + ) else: try: response = await 
session.send_and_wait(send_payload) return response.data.content if response else "Empty response." finally: - # 销毁会话对象以释放内存,但保留磁盘数据 - await session.destroy() - + # 清理:如果没有 chat_id(临时会话),销毁会话 + if not chat_id: + try: + await session.destroy() + except Exception as cleanup_error: + await self._emit_debug_log( + f"会话清理警告: {cleanup_error}", + __event_call__, + ) except Exception as e: - self._emit_debug_log(f"请求错误: {e}") + await self._emit_debug_log(f"请求错误: {e}", __event_call__) return f"Error: {str(e)}" - - async def stream_response( - self, client, session, send_payload, init_message: str = "" - ) -> AsyncGenerator: - queue = asyncio.Queue() - done = asyncio.Event() - self.thinking_started = False - has_content = False # 追踪是否已经输出了内容 - - def get_event_data(event, attr, default=""): - if hasattr(event, "data"): - data = event.data - if data is None: - return default - if isinstance(data, (str, int, float, bool)): - return str(data) if attr == "value" else default - - if isinstance(data, dict): - val = data.get(attr) - if val is None: - alt_attr = attr.replace("_", "") if "_" in attr else attr - val = data.get(alt_attr) - if val is None and "_" not in attr: - # 尝试将 camelCase 转换为 snake_case - import re - - snake_attr = re.sub(r"(? AsyncGenerator: + """ + 从 Copilot SDK 流式传输响应,处理各种事件类型。 + 遵循官方 SDK 模式进行事件处理和流式传输。 + """ + from copilot.generated.session_events import SessionEventType - # 处理消息内容 (增量或全量) - if event_type in [ - "assistant.message_delta", - "assistant.message.delta", - "assistant.message", - ]: - # 记录全量消息事件的特殊日志,帮助排查为什么没有 delta - if event_type == "assistant.message": - self._emit_debug_log( - f"收到全量消息事件 (非 Delta): {get_event_data(event, 'content')[:50]}..." 
- ) + queue = asyncio.Queue() + done = asyncio.Event() + SENTINEL = object() + # 使用本地状态来处理并发和跟踪 + state = {"thinking_started": False, "content_sent": False} + has_content = False # 追踪是否已经输出了内容 + active_tools = {} # 映射 tool_call_id 到工具名称 - delta = ( - get_event_data(event, "delta_content") - or get_event_data(event, "deltaContent") - or get_event_data(event, "content") - or get_event_data(event, "text") - ) + def get_event_type(event) -> str: + """提取事件类型为字符串,处理枚举和字符串类型。""" + if hasattr(event, "type"): + event_type = event.type + # 处理 SessionEventType 枚举 + if hasattr(event_type, "value"): + return event_type.value + return str(event_type) + return "unknown" + + def safe_get_data_attr(event, attr: str, default=None): + """ + 安全地从 event.data 提取属性。 + 同时处理字典访问和对象属性访问。 + """ + if not hasattr(event, "data") or event.data is None: + return default + + data = event.data + + # 首先尝试作为字典 + if isinstance(data, dict): + return data.get(attr, default) + + # 尝试作为对象属性 + return getattr(data, attr, default) + + def handler(event): + """ + 事件处理器,遵循官方 SDK 模式。 + 处理流式增量、推理、工具事件和会话状态。 + """ + event_type = get_event_type(event) + + # === 消息增量事件(主要流式内容)=== + if event_type == "assistant.message_delta": + # 官方:Python SDK 使用 event.data.delta_content + delta = safe_get_data_attr( + event, "delta_content" + ) or safe_get_data_attr(event, "deltaContent") if delta: - if self.thinking_started: + state["content_sent"] = True + if state["thinking_started"]: queue.put_nowait("\n
\n") - self.thinking_started = False + state["thinking_started"] = False queue.put_nowait(delta) - elif event_type in [ - "assistant.reasoning_delta", - "assistant.reasoning.delta", - "assistant.reasoning", - ]: - delta = ( - get_event_data(event, "delta_content") - or get_event_data(event, "deltaContent") - or get_event_data(event, "content") - or get_event_data(event, "text") + # === 完整消息事件(非流式响应) === + elif event_type == "assistant.message": + # 处理完整消息(当 SDK 返回完整内容而不是增量时) + content = safe_get_data_attr(event, "content") or safe_get_data_attr( + event, "message" ) + if content: + state["content_sent"] = True + if state["thinking_started"]: + queue.put_nowait("\n
\n") + state["thinking_started"] = False + queue.put_nowait(content) + + # === 推理增量事件(思维链流式传输)=== + elif event_type == "assistant.reasoning_delta": + delta = safe_get_data_attr( + event, "delta_content" + ) or safe_get_data_attr(event, "deltaContent") if delta: - if not self.thinking_started and self.valves.SHOW_THINKING: + # 如果内容已经开始,抑制迟到的推理 + if state["content_sent"]: + return + + if not state["thinking_started"] and show_thinking: queue.put_nowait("\n") - self.thinking_started = True - if self.thinking_started: + state["thinking_started"] = True + if state["thinking_started"]: queue.put_nowait(delta) - elif event_type == "tool.execution_start": - # 尝试多个可能的字段来获取工具名称或描述 - tool_name = ( - get_event_data(event, "toolName") - or get_event_data(event, "name") - or get_event_data(event, "description") - or get_event_data(event, "tool_name") - or "Unknown Tool" + # === 完整推理事件(非流式推理) === + elif event_type == "assistant.reasoning": + # 处理完整推理内容 + reasoning = safe_get_data_attr(event, "content") or safe_get_data_attr( + event, "reasoning" ) - if not self.thinking_started and self.valves.SHOW_THINKING: - queue.put_nowait("\n") - self.thinking_started = True - if self.thinking_started: - queue.put_nowait(f"\n正在运行工具: {tool_name}...\n") - self._emit_debug_log(f"Tool Start: {tool_name}") + if reasoning: + # 如果内容已经开始,抑制延迟到达的推理 + if state["content_sent"]: + return + + if not state["thinking_started"] and show_thinking: + queue.put_nowait("\n") + state["thinking_started"] = True + if state["thinking_started"]: + queue.put_nowait(reasoning) + + # === 工具执行事件 === + elif event_type == "tool.execution_start": + tool_name = ( + safe_get_data_attr(event, "name") + or safe_get_data_attr(event, "tool_name") + or "未知工具" + ) + tool_call_id = safe_get_data_attr(event, "tool_call_id", "") + + # 获取工具参数 + tool_args = {} + try: + args_obj = safe_get_data_attr(event, "arguments") + if isinstance(args_obj, dict): + tool_args = args_obj + elif isinstance(args_obj, str): + tool_args = 
json.loads(args_obj) + except: + pass + + if tool_call_id: + active_tools[tool_call_id] = { + "name": tool_name, + "arguments": tool_args, + } + + # 在显示工具前关闭思考标签 + if state["thinking_started"]: + queue.put_nowait("\n\n") + state["thinking_started"] = False + + # 使用改进的格式展示工具调用 + if tool_args: + tool_args_json = json.dumps(tool_args, indent=2, ensure_ascii=False) + tool_display = f"\n\n
<details>\n<summary>🔧 执行工具: {tool_name}</summary>\n\n**参数:**\n\n```json\n{tool_args_json}\n```\n\n</details>
\n\n" + else: + tool_display = f"\n\n
<details>\n<summary>🔧 执行工具: {tool_name}</summary>\n\n*无参数*\n\n</details>
\n\n" + + queue.put_nowait(tool_display) + + self._emit_debug_log_sync(f"工具开始: {tool_name}", __event_call__) elif event_type == "tool.execution_complete": - if self.thinking_started: - queue.put_nowait("工具运行完成。\n") - self._emit_debug_log("Tool Complete") + tool_call_id = safe_get_data_attr(event, "tool_call_id", "") + tool_info = active_tools.get(tool_call_id) + # 处理旧的字符串格式和新的字典格式 + if isinstance(tool_info, str): + tool_name = tool_info + elif isinstance(tool_info, dict): + tool_name = tool_info.get("name", "未知工具") + else: + tool_name = "未知工具" + + # 尝试获取结果内容 + result_content = "" + result_type = "success" + try: + result_obj = safe_get_data_attr(event, "result") + if hasattr(result_obj, "content"): + result_content = result_obj.content + elif isinstance(result_obj, dict): + result_content = result_obj.get("content", "") + result_type = result_obj.get("result_type", "success") + if not result_content: + # 如果没有 content 字段,尝试序列化整个字典 + result_content = json.dumps( + result_obj, indent=2, ensure_ascii=False + ) + except Exception as e: + self._emit_debug_log_sync(f"提取结果时出错: {e}", __event_call__) + result_type = "failure" + result_content = f"错误: {str(e)}" + + # 使用改进的格式展示工具结果 + if result_content: + status_icon = "✅" if result_type == "success" else "❌" + + # 尝试检测内容类型以便更好地格式化 + is_json = False + try: + json_obj = ( + json.loads(result_content) + if isinstance(result_content, str) + else result_content + ) + if isinstance(json_obj, (dict, list)): + result_content = json.dumps( + json_obj, indent=2, ensure_ascii=False + ) + is_json = True + except: + pass + + # 根据内容类型格式化 + if is_json: + # JSON 内容:使用代码块和语法高亮 + result_display = f"\n
\n{status_icon} 执行结果: {tool_name}\n\n```json\n{result_content}\n```\n\n
\n\n" + else: + # 纯文本:保留格式,不使用代码块 + result_display = f"\n
\n{status_icon} 执行结果: {tool_name}\n\n{result_content}\n\n
\n\n" + + queue.put_nowait(result_display) + + elif event_type == "tool.execution_progress": + # 工具执行进度更新(用于长时间运行的工具) + tool_call_id = safe_get_data_attr(event, "tool_call_id", "") + tool_info = active_tools.get(tool_call_id) + tool_name = ( + tool_info.get("name", "未知工具") + if isinstance(tool_info, dict) + else "未知工具" + ) + + progress = safe_get_data_attr(event, "progress", 0) + message = safe_get_data_attr(event, "message", "") + + if message: + progress_display = f"\n> 🔄 **{tool_name}**: {message}\n" + queue.put_nowait(progress_display) + + self._emit_debug_log_sync( + f"工具进度: {tool_name} - {progress}%", __event_call__ + ) + + elif event_type == "tool.execution_partial_result": + # 流式工具结果(用于增量输出的工具) + tool_call_id = safe_get_data_attr(event, "tool_call_id", "") + tool_info = active_tools.get(tool_call_id) + tool_name = ( + tool_info.get("name", "未知工具") + if isinstance(tool_info, dict) + else "未知工具" + ) + + partial_content = safe_get_data_attr(event, "content", "") + if partial_content: + queue.put_nowait(partial_content) + + self._emit_debug_log_sync(f"工具部分结果: {tool_name}", __event_call__) + + # === 使用统计事件 === + elif event_type == "assistant.usage": + # 当前助手回合的 token 使用量 + if self.valves.DEBUG: + input_tokens = safe_get_data_attr(event, "input_tokens", 0) + output_tokens = safe_get_data_attr(event, "output_tokens", 0) + total_tokens = safe_get_data_attr(event, "total_tokens", 0) + pass + + elif event_type == "session.usage_info": + # 会话累计使用信息 + pass + + # === 会话状态事件 === elif event_type == "session.compaction_start": - self._emit_debug_log("会话压缩开始") + self._emit_debug_log_sync("会话压缩已开始", __event_call__) elif event_type == "session.compaction_complete": - self._emit_debug_log("会话压缩完成") + self._emit_debug_log_sync("会话压缩已完成", __event_call__) elif event_type == "session.idle": + # 会话处理完成 - 发出完成信号 done.set() + try: + queue.put_nowait(SENTINEL) + except: + pass + elif event_type == "session.error": - msg = get_event_data(event, "message", "Unknown Error") - 
queue.put_nowait(f"\n[Error: {msg}]") + error_msg = safe_get_data_attr(event, "message", "未知错误") + queue.put_nowait(f"\n[错误: {error_msg}]") done.set() + try: + queue.put_nowait(SENTINEL) + except: + pass unsubscribe = session.on(handler) - await session.send(send_payload) - if self.valves.DEBUG: - yield "\n" - if init_message: - yield init_message - yield "> [Debug] 连接已建立,等待响应...\n" - self.thinking_started = True + self._emit_debug_log_sync(f"已订阅事件。正在发送请求...", __event_call__) + + # 使用 asyncio.create_task 防止 session.send 阻塞流读取 + # 如果 SDK 实现等待完成。 + send_task = asyncio.create_task(session.send(send_payload)) + self._emit_debug_log_sync(f"Prompt 已发送 (异步任务已启动)", __event_call__) + + # 安全的初始 yield,带错误处理 + try: + if self.valves.DEBUG: + yield "\n" + if init_message: + yield init_message + + if reasoning_effort and reasoning_effort != "off": + yield f"> [Debug] 已注入推理强度 (Reasoning Effort): {reasoning_effort.upper()}\n" + + yield "> [Debug] 连接已建立,等待响应...\n" + self.thinking_started = True + except Exception as e: + # 如果初始 yield 失败,记录但继续处理 + self._emit_debug_log_sync(f"初始 yield 警告: {e}", __event_call__) try: while not done.is_set(): @@ -725,32 +1493,61 @@ class Pipe: chunk = await asyncio.wait_for( queue.get(), timeout=float(self.valves.TIMEOUT) ) + if chunk is SENTINEL: + break if chunk: has_content = True - yield chunk + try: + yield chunk + except Exception as yield_error: + # 客户端关闭连接,优雅停止 + self._emit_debug_log_sync( + f"Yield 错误(客户端断开连接?): {yield_error}", + __event_call__, + ) + break except asyncio.TimeoutError: if done.is_set(): break if self.thinking_started: - yield f"> [Debug] 等待响应中 (已超过 {self.valves.TIMEOUT} 秒)...\n" + try: + yield f"> [Debug] 等待响应中 (已超过 {self.valves.TIMEOUT} 秒)...\n" + except: + # 如果超时期间 yield 失败,连接已断开 + break continue while not queue.empty(): chunk = queue.get_nowait() + if chunk is SENTINEL: + break if chunk: has_content = True - yield chunk + try: + yield chunk + except: + # 连接关闭,停止 yielding + break if self.thinking_started: - yield "\n\n" - 
has_content = True + try: + yield "\n\n" + has_content = True + except: + pass # 连接已关闭 # 核心修复:如果整个过程没有任何输出,返回一个提示,防止 OpenWebUI 报错 if not has_content: - yield "⚠️ Copilot 未返回任何内容。请检查模型 ID 是否正确,或尝试在 Valves 中开启 DEBUG 模式查看详细日志。" + try: + yield "⚠️ Copilot 未返回任何内容。请检查模型 ID 是否正确,或尝试在 Valves 中开启 DEBUG 模式查看详细日志。" + except: + pass # 连接已关闭 except Exception as e: - yield f"\n[Stream Error: {str(e)}]" + try: + yield f"\n[Stream Error: {str(e)}]" + except: + pass # 连接已关闭 finally: unsubscribe() # 销毁会话对象以释放内存,但保留磁盘数据 diff --git a/scripts/openwebui_community_client.py b/scripts/openwebui_community_client.py index 110a9cc..0f34b8e 100644 --- a/scripts/openwebui_community_client.py +++ b/scripts/openwebui_community_client.py @@ -473,10 +473,27 @@ class OpenWebUICommunityClient: # 查找 README readme_content = self._find_readme(file_path) + # 获取远程帖子信息(提前获取,用于判断是否需要上传图片) + remote_post = None + if post_id: + remote_post = self.get_post(post_id) + # 查找并上传图片 media_urls = None image_path = self._find_image(file_path) - if image_path: + + # 决定是否上传图片 + should_upload_image = True + if remote_post: + remote_media = remote_post.get("media", []) + if remote_media and len(remote_media) > 0: + # 远程已有图片,跳过上传以避免覆盖(防止出现空白图片问题) + print( + f" ℹ️ Remote post already has images. Skipping auto-upload to preserve existing media." + ) + should_upload_image = False + + if image_path and should_upload_image: print(f" Found image: {os.path.basename(image_path)}") image_url = self.upload_image(image_path) if image_url: @@ -500,7 +517,8 @@ class OpenWebUICommunityClient: post_id = existing_post.get("id") print(f" Found existing post: {title} (ID: {post_id})") self._inject_id_to_file(file_path, post_id) - # post_id 已设置,后续将进入更新流程 + # post_id 已设置,重新获取 remote_post 以便后续版本检查 + remote_post = self.get_post(post_id) else: # 2. 
如果没找到,且允许自动创建,则创建 @@ -522,11 +540,6 @@ class OpenWebUICommunityClient: return True, f"Created new post (ID: {new_post_id})" return False, "Failed to create new post" - # 获取远程帖子信息(只需获取一次) - remote_post = None - if post_id: - remote_post = self.get_post(post_id) - # 版本检查(仅对更新有效) if not force and local_version and remote_post: remote_version = (