- Fix thinking visibility by passing user overrides into streaming - Harden UserValves handling for mapping/instance inputs - Update bilingual README with per-user valves and troubleshooting
1450 lines
57 KiB
Python
1450 lines
57 KiB
Python
"""
|
|
title: GitHub Copilot Official SDK Pipe
|
|
author: Fu-Jie
|
|
author_url: https://github.com/Fu-Jie/awesome-openwebui
|
|
funding_url: https://github.com/open-webui
|
|
openwebui_id: ce96f7b4-12fc-4ac3-9a01-875713e69359
|
|
description: Integrate GitHub Copilot SDK. Supports dynamic models, multi-turn conversation, streaming, multimodal input, infinite sessions, and frontend debug logging.
|
|
version: 0.2.3
|
|
requirements: github-copilot-sdk
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import base64
|
|
import tempfile
|
|
import asyncio
|
|
import logging
|
|
import shutil
|
|
import subprocess
|
|
from typing import Optional, Union, AsyncGenerator, List, Any, Dict
|
|
from pydantic import BaseModel, Field
|
|
|
|
# Import copilot SDK modules
|
|
from copilot import CopilotClient, define_tool
|
|
|
|
# Setup logger
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class RandomNumberParams(BaseModel):
    """Argument schema for the ``generate_random_number`` tool."""

    # Lower end of the range; the field description marks it as inclusive.
    min: int = Field(description="Minimum value (inclusive)")
    # Upper end of the range; the field description marks it as inclusive.
    max: int = Field(description="Maximum value (inclusive)")
|
|
|
|
|
|
@define_tool(description="Generate a random integer within a specified range.")
async def generate_random_number(params: RandomNumberParams) -> str:
    """Return a sentence containing a random integer from ``[min, max]``.

    Args:
        params: Range bounds supplied by the tool caller.

    Returns:
        A human-readable string embedding the generated number.

    Raises:
        ValueError: If ``params.min`` is not strictly below ``params.max``.
    """
    import random

    low, high = params.min, params.max
    if not low < high:
        raise ValueError("min must be less than max")

    return f"Generated random number: {random.randint(low, high)}"
|
|
|
|
|
|
class Pipe:
|
|
class Valves(BaseModel):
    """Global (admin-level) settings for the pipe.

    Each field is a pydantic ``Field`` whose description documents its
    meaning; the comments below only group related settings.
    """

    # --- Authentication / CLI location ---
    GH_TOKEN: str = Field(
        "",
        description="GitHub Fine-grained Token (Requires 'Copilot Requests' permission)",
    )
    MODEL_ID: str = Field(
        "gpt-5-mini",
        description="Default Copilot model name (used when dynamic fetching fails)",
    )
    CLI_PATH: str = Field("/usr/local/bin/copilot", description="Path to Copilot CLI")

    # --- Diagnostics / display ---
    DEBUG: bool = Field(
        False,
        description="Enable technical debug logs (connection info, etc.)",
    )
    LOG_LEVEL: str = Field(
        "error",
        description="Copilot CLI log level: none, error, warning, info, debug, all",
    )
    SHOW_THINKING: bool = Field(
        True,
        description="Show model reasoning/thinking process",
    )
    SHOW_WORKSPACE_INFO: bool = Field(
        True,
        description="Show session workspace path and summary in debug mode",
    )

    # --- Model listing / workspace ---
    EXCLUDE_KEYWORDS: str = Field(
        "",
        description="Exclude models containing these keywords (comma separated, e.g.: codex, haiku)",
    )
    WORKSPACE_DIR: str = Field(
        "",
        description="Restricted workspace directory for file operations. If empty, allows access to the current process directory.",
    )

    # --- Session behaviour ---
    INFINITE_SESSION: bool = Field(
        True,
        description="Enable Infinite Sessions (automatic context compaction)",
    )
    COMPACTION_THRESHOLD: float = Field(
        0.8,
        description="Background compaction threshold (0.0-1.0)",
    )
    BUFFER_THRESHOLD: float = Field(
        0.95,
        description="Buffer exhaustion threshold (0.0-1.0)",
    )
    TIMEOUT: int = Field(
        300,
        description="Timeout for each stream chunk (seconds)",
    )
    CUSTOM_ENV_VARS: str = Field(
        "",
        description='Custom environment variables (JSON format, e.g., {"VAR": "value"})',
    )

    # --- Tools / reasoning / formatting ---
    ENABLE_TOOLS: bool = Field(
        False,
        description="Enable custom tools (example: random number)",
    )
    AVAILABLE_TOOLS: str = Field(
        "all",
        description="Available tools: 'all' or comma-separated list (e.g., 'generate_random_number')",
    )
    REASONING_EFFORT: str = Field(
        "medium",
        description="Reasoning effort level: low, medium, high. (gpt-5.2-codex also supports xhigh)",
    )
    ENFORCE_FORMATTING: bool = Field(
        True,
        description="Add formatting instructions to system prompt for better readability (paragraphs, line breaks, structure).",
    )
|
|
|
|
class UserValves(BaseModel):
    """Per-user overrides layered on top of the global ``Valves``.

    Empty strings mean "use the global setting". The boolean fields have
    concrete defaults, so "unset" cannot be told apart from an explicit value.
    """

    REASONING_EFFORT: str = Field(
        "",
        description="Reasoning effort level (low, medium, high, xhigh). Leave empty to use global setting.",
    )
    CLI_PATH: str = Field(
        "",
        description="Custom path to Copilot CLI. Leave empty to use global setting.",
    )
    DEBUG: bool = Field(
        False,
        description="Enable technical debug logs (connection info, etc.)",
    )
    SHOW_THINKING: bool = Field(
        True,
        description="Show model reasoning/thinking process",
    )
    MODEL_ID: str = Field(
        "",
        description="Custom model ID (e.g. gpt-4o). Leave empty to use global default.",
    )
|
|
|
|
def __init__(self):
|
|
self.type = "pipe"
|
|
self.id = "copilotsdk"
|
|
self.name = "copilotsdk"
|
|
self.valves = self.Valves()
|
|
self.temp_dir = tempfile.mkdtemp(prefix="copilot_images_")
|
|
self.thinking_started = False
|
|
self._model_cache = [] # Model list cache
|
|
|
|
def __del__(self):
|
|
try:
|
|
shutil.rmtree(self.temp_dir)
|
|
except:
|
|
pass
|
|
|
|
# ==================== Fixed System Entry ====================
|
|
# pipe() is the stable entry point used by OpenWebUI to handle requests.
|
|
# Keep this section near the top for quick navigation and maintenance.
|
|
# =============================================================
|
|
async def pipe(
|
|
self,
|
|
body: dict,
|
|
__metadata__: Optional[dict] = None,
|
|
__user__: Optional[dict] = None,
|
|
__event_emitter__=None,
|
|
__event_call__=None,
|
|
) -> Union[str, AsyncGenerator]:
|
|
return await self._pipe_impl(
|
|
body,
|
|
__metadata__=__metadata__,
|
|
__user__=__user__,
|
|
__event_emitter__=__event_emitter__,
|
|
__event_call__=__event_call__,
|
|
)
|
|
|
|
# ==================== Functional Areas ====================
|
|
# 1) Tool registration: define tools and register in _initialize_custom_tools
|
|
# 2) Debug logging: _emit_debug_log / _emit_debug_log_sync
|
|
# 3) Prompt/session: _extract_system_prompt / _build_session_config / _build_prompt
|
|
# 4) Runtime flow: pipe() for request, stream_response() for streaming
|
|
# ============================================================
|
|
# ==================== Custom Tool Examples ====================
|
|
# Tool registration: Add @define_tool decorated functions at module level,
|
|
# then register them in _initialize_custom_tools() -> all_tools dict.
|
|
def _initialize_custom_tools(self):
|
|
"""Initialize custom tools based on configuration"""
|
|
if not self.valves.ENABLE_TOOLS:
|
|
return []
|
|
|
|
# Define all available tools (register new tools here)
|
|
all_tools = {
|
|
"generate_random_number": generate_random_number,
|
|
}
|
|
|
|
# Filter based on configuration
|
|
if self.valves.AVAILABLE_TOOLS == "all":
|
|
return list(all_tools.values())
|
|
|
|
# Only enable specified tools
|
|
enabled = [t.strip() for t in self.valves.AVAILABLE_TOOLS.split(",")]
|
|
return [all_tools[name] for name in enabled if name in all_tools]
|
|
|
|
async def _emit_debug_log(self, message: str, __event_call__=None):
|
|
"""Emit debug log to frontend (console) when DEBUG is enabled."""
|
|
# Check user config first if available (will need to be passed down or stored)
|
|
# For now we only check global valves in this helper, but in pipe implementation we respect user setting.
|
|
# This helper might need refactoring to accept user_debug_setting
|
|
if not self.valves.DEBUG:
|
|
return
|
|
|
|
logger.debug(f"[Copilot Pipe] {message}")
|
|
|
|
if not __event_call__:
|
|
return
|
|
|
|
try:
|
|
js_code = f"""
|
|
(async function() {{
|
|
console.debug("%c[Copilot Pipe] " + {json.dumps(message, ensure_ascii=False)}, "color: #3b82f6;");
|
|
}})();
|
|
"""
|
|
await __event_call__({"type": "execute", "data": {"code": js_code}})
|
|
except Exception as e:
|
|
logger.debug(f"[Copilot Pipe] Frontend debug log failed: {e}")
|
|
|
|
def _emit_debug_log_sync(self, message: str, __event_call__=None):
|
|
"""Sync wrapper for debug logging in non-async contexts."""
|
|
if not self.valves.DEBUG:
|
|
return
|
|
|
|
try:
|
|
loop = asyncio.get_running_loop()
|
|
except RuntimeError:
|
|
logger.debug(f"[Copilot Pipe] {message}")
|
|
return
|
|
|
|
loop.create_task(self._emit_debug_log(message, __event_call__))
|
|
|
|
def _extract_text_from_content(self, content) -> str:
|
|
"""Extract text content from various message content formats."""
|
|
if isinstance(content, str):
|
|
return content
|
|
elif isinstance(content, list):
|
|
text_parts = []
|
|
for item in content:
|
|
if isinstance(item, dict) and item.get("type") == "text":
|
|
text_parts.append(item.get("text", ""))
|
|
return " ".join(text_parts)
|
|
return ""
|
|
|
|
def _apply_formatting_hint(self, prompt: str) -> str:
|
|
"""Append a lightweight formatting hint to the user prompt when enabled."""
|
|
if not self.valves.ENFORCE_FORMATTING:
|
|
return prompt
|
|
|
|
if not prompt:
|
|
return prompt
|
|
|
|
if "[Formatting Guidelines]" in prompt or "[Formatting Request]" in prompt:
|
|
return prompt
|
|
|
|
formatting_hint = (
|
|
"\n\n[Formatting Request]\n"
|
|
"Please format your response with clear paragraph breaks, short sentences, "
|
|
"and bullet lists when appropriate."
|
|
)
|
|
return f"{prompt}{formatting_hint}"
|
|
|
|
def _dedupe_preserve_order(self, items: List[str]) -> List[str]:
|
|
"""Deduplicate while preserving order."""
|
|
seen = set()
|
|
result = []
|
|
for item in items:
|
|
if not item or item in seen:
|
|
continue
|
|
seen.add(item)
|
|
result.append(item)
|
|
return result
|
|
|
|
def _collect_model_ids(
|
|
self, body: dict, request_model: str, real_model_id: str
|
|
) -> List[str]:
|
|
"""Collect possible model IDs from request/metadata/body params."""
|
|
model_ids: List[str] = []
|
|
if request_model:
|
|
model_ids.append(request_model)
|
|
if request_model.startswith(f"{self.id}-"):
|
|
model_ids.append(request_model[len(f"{self.id}-") :])
|
|
if real_model_id:
|
|
model_ids.append(real_model_id)
|
|
|
|
metadata = body.get("metadata", {})
|
|
if isinstance(metadata, dict):
|
|
meta_model = metadata.get("model")
|
|
meta_model_id = metadata.get("model_id")
|
|
if isinstance(meta_model, str):
|
|
model_ids.append(meta_model)
|
|
if isinstance(meta_model_id, str):
|
|
model_ids.append(meta_model_id)
|
|
|
|
body_params = body.get("params", {})
|
|
if isinstance(body_params, dict):
|
|
for key in ("model", "model_id", "modelId"):
|
|
val = body_params.get(key)
|
|
if isinstance(val, str):
|
|
model_ids.append(val)
|
|
|
|
return self._dedupe_preserve_order(model_ids)
|
|
|
|
async def _extract_system_prompt(
|
|
self,
|
|
body: dict,
|
|
messages: List[dict],
|
|
request_model: str,
|
|
real_model_id: str,
|
|
__event_call__=None,
|
|
) -> tuple[Optional[str], str]:
|
|
"""Extract system prompt from metadata/model DB/body/messages."""
|
|
system_prompt_content: Optional[str] = None
|
|
system_prompt_source = ""
|
|
|
|
# 1) metadata.model.params.system
|
|
metadata = body.get("metadata", {})
|
|
if isinstance(metadata, dict):
|
|
meta_model = metadata.get("model")
|
|
if isinstance(meta_model, dict):
|
|
meta_params = meta_model.get("params")
|
|
if isinstance(meta_params, dict) and meta_params.get("system"):
|
|
system_prompt_content = meta_params.get("system")
|
|
system_prompt_source = "metadata.model.params"
|
|
await self._emit_debug_log(
|
|
f"Extracted system prompt from metadata.model.params (length: {len(system_prompt_content)})",
|
|
__event_call__,
|
|
)
|
|
|
|
# 2) model DB lookup
|
|
if not system_prompt_content:
|
|
try:
|
|
from open_webui.models.models import Models
|
|
|
|
model_ids_to_try = self._collect_model_ids(
|
|
body, request_model, real_model_id
|
|
)
|
|
for mid in model_ids_to_try:
|
|
model_record = Models.get_model_by_id(mid)
|
|
if model_record and hasattr(model_record, "params"):
|
|
params = model_record.params
|
|
if isinstance(params, dict):
|
|
system_prompt_content = params.get("system")
|
|
if system_prompt_content:
|
|
system_prompt_source = f"model_db:{mid}"
|
|
await self._emit_debug_log(
|
|
f"Extracted system prompt from model DB (length: {len(system_prompt_content)})",
|
|
__event_call__,
|
|
)
|
|
break
|
|
except Exception as e:
|
|
await self._emit_debug_log(
|
|
f"Failed to extract system prompt from model DB: {e}",
|
|
__event_call__,
|
|
)
|
|
|
|
# 3) body.params.system
|
|
if not system_prompt_content:
|
|
body_params = body.get("params", {})
|
|
if isinstance(body_params, dict):
|
|
system_prompt_content = body_params.get("system")
|
|
if system_prompt_content:
|
|
system_prompt_source = "body_params"
|
|
await self._emit_debug_log(
|
|
f"Extracted system prompt from body.params (length: {len(system_prompt_content)})",
|
|
__event_call__,
|
|
)
|
|
|
|
# 4) messages (role=system)
|
|
if not system_prompt_content:
|
|
for msg in messages:
|
|
if msg.get("role") == "system":
|
|
system_prompt_content = self._extract_text_from_content(
|
|
msg.get("content", "")
|
|
)
|
|
if system_prompt_content:
|
|
system_prompt_source = "messages_system"
|
|
await self._emit_debug_log(
|
|
f"Extracted system prompt from messages (length: {len(system_prompt_content)})",
|
|
__event_call__,
|
|
)
|
|
break
|
|
|
|
return system_prompt_content, system_prompt_source
|
|
|
|
def _build_client_config(self, body: dict) -> dict:
|
|
"""Build CopilotClient config from valves and request body."""
|
|
cwd = self.valves.WORKSPACE_DIR if self.valves.WORKSPACE_DIR else os.getcwd()
|
|
client_config = {}
|
|
if os.environ.get("COPILOT_CLI_PATH"):
|
|
client_config["cli_path"] = os.environ["COPILOT_CLI_PATH"]
|
|
client_config["cwd"] = cwd
|
|
|
|
if self.valves.LOG_LEVEL:
|
|
client_config["log_level"] = self.valves.LOG_LEVEL
|
|
|
|
if self.valves.CUSTOM_ENV_VARS:
|
|
try:
|
|
custom_env = json.loads(self.valves.CUSTOM_ENV_VARS)
|
|
if isinstance(custom_env, dict):
|
|
client_config["env"] = custom_env
|
|
except:
|
|
pass
|
|
|
|
return client_config
|
|
|
|
def _build_session_config(
    self,
    chat_id: Optional[str],
    real_model_id: str,
    custom_tools: List[Any],
    system_prompt_content: Optional[str],
    is_streaming: bool,
    reasoning_effort: str = "",
):
    """Assemble the ``SessionConfig`` used to create a Copilot session.

    Args:
        chat_id: Session id to use; a falsy value becomes ``None``.
        real_model_id: Model name passed to the SDK.
        custom_tools: Tool objects to register on the session.
        system_prompt_content: Optional custom system prompt text.
        is_streaming: Whether the session should stream.
        reasoning_effort: Optional effort level; only forwarded when it is
            non-empty and not ``medium`` (compared case-insensitively).

    Returns:
        A ``copilot.types.SessionConfig`` built from the values above.
    """
    from copilot.types import SessionConfig, InfiniteSessionConfig

    valves = self.valves

    infinite_sessions = (
        InfiniteSessionConfig(
            enabled=True,
            background_compaction_threshold=valves.COMPACTION_THRESHOLD,
            buffer_exhaustion_threshold=valves.BUFFER_THRESHOLD,
        )
        if valves.INFINITE_SESSION
        else None
    )

    # System message = optional custom prompt + optional formatting rules.
    sections = []
    if system_prompt_content:
        sections.append(system_prompt_content)
    if valves.ENFORCE_FORMATTING:
        guideline_lines = [
            "\n\n[Formatting Guidelines]\n",
            "When providing explanations or descriptions:\n",
            "- Use clear paragraph breaks (double line breaks)\n",
            "- Break long sentences into multiple shorter ones\n",
            "- Use bullet points or numbered lists for multiple items\n",
            "- Add headings (##, ###) for major sections\n",
            "- Ensure proper spacing between different topics",
        ]
        sections.append("".join(guideline_lines))
        logger.info(
            "[ENFORCE_FORMATTING] Added formatting instructions to system prompt"
        )

    system_message = None
    if sections:
        system_message = {"mode": "append", "content": "\n".join(sections)}

    config_kwargs = {
        "session_id": chat_id or None,
        "model": real_model_id,
        "streaming": is_streaming,
        "tools": custom_tools,
        "system_message": system_message,
        "infinite_sessions": infinite_sessions,
    }

    # "medium" is treated as the default and therefore not sent.
    if reasoning_effort:
        normalized_effort = reasoning_effort.lower()
        if normalized_effort != "medium":
            config_kwargs["reasoning_effort"] = normalized_effort

    return SessionConfig(**config_kwargs)
|
|
|
|
def _get_user_context(self):
|
|
"""Helper to get user context (placeholder for future use)."""
|
|
return {}
|
|
|
|
def _get_chat_context(
|
|
self, body: dict, __metadata__: Optional[dict] = None, __event_call__=None
|
|
) -> Dict[str, str]:
|
|
"""
|
|
Highly reliable chat context extraction logic.
|
|
Priority: __metadata__ > body['chat_id'] > body['metadata']['chat_id']
|
|
"""
|
|
chat_id = ""
|
|
source = "none"
|
|
|
|
# 1. Prioritize __metadata__ (most reliable source injected by OpenWebUI)
|
|
if __metadata__ and isinstance(__metadata__, dict):
|
|
chat_id = __metadata__.get("chat_id", "")
|
|
if chat_id:
|
|
source = "__metadata__"
|
|
|
|
# 2. Then try body root
|
|
if not chat_id and isinstance(body, dict):
|
|
chat_id = body.get("chat_id", "")
|
|
if chat_id:
|
|
source = "body_root"
|
|
|
|
# 3. Finally try body.metadata
|
|
if not chat_id and isinstance(body, dict):
|
|
body_metadata = body.get("metadata", {})
|
|
if isinstance(body_metadata, dict):
|
|
chat_id = body_metadata.get("chat_id", "")
|
|
if chat_id:
|
|
source = "body_metadata"
|
|
|
|
# Debug: Log ID source
|
|
if chat_id:
|
|
self._emit_debug_log_sync(
|
|
f"Extracted ChatID: {chat_id} (Source: {source})", __event_call__
|
|
)
|
|
else:
|
|
# If still not found, log body keys for troubleshooting
|
|
keys = list(body.keys()) if isinstance(body, dict) else "not a dict"
|
|
self._emit_debug_log_sync(
|
|
f"Warning: Failed to extract ChatID. Body keys: {keys}",
|
|
__event_call__,
|
|
)
|
|
|
|
return {
|
|
"chat_id": str(chat_id).strip(),
|
|
}
|
|
|
|
async def pipes(self) -> List[dict]:
|
|
"""Dynamically fetch model list"""
|
|
# Return cache if available
|
|
if self._model_cache:
|
|
return self._model_cache
|
|
|
|
await self._emit_debug_log("Fetching model list dynamically...")
|
|
try:
|
|
self._setup_env()
|
|
if not self.valves.GH_TOKEN:
|
|
return [{"id": f"{self.id}-error", "name": "Error: GH_TOKEN not set"}]
|
|
|
|
client_config = {}
|
|
if os.environ.get("COPILOT_CLI_PATH"):
|
|
client_config["cli_path"] = os.environ["COPILOT_CLI_PATH"]
|
|
|
|
client = CopilotClient(client_config)
|
|
try:
|
|
await client.start()
|
|
models = await client.list_models()
|
|
|
|
# Update cache
|
|
self._model_cache = []
|
|
exclude_list = [
|
|
k.strip().lower()
|
|
for k in self.valves.EXCLUDE_KEYWORDS.split(",")
|
|
if k.strip()
|
|
]
|
|
|
|
models_with_info = []
|
|
for m in models:
|
|
# Compatible with dict and object access
|
|
m_id = (
|
|
m.get("id") if isinstance(m, dict) else getattr(m, "id", str(m))
|
|
)
|
|
m_name = (
|
|
m.get("name")
|
|
if isinstance(m, dict)
|
|
else getattr(m, "name", m_id)
|
|
)
|
|
m_policy = (
|
|
m.get("policy")
|
|
if isinstance(m, dict)
|
|
else getattr(m, "policy", {})
|
|
)
|
|
m_billing = (
|
|
m.get("billing")
|
|
if isinstance(m, dict)
|
|
else getattr(m, "billing", {})
|
|
)
|
|
|
|
# Check policy state
|
|
state = (
|
|
m_policy.get("state")
|
|
if isinstance(m_policy, dict)
|
|
else getattr(m_policy, "state", "enabled")
|
|
)
|
|
if state == "disabled":
|
|
continue
|
|
|
|
# Filtering logic
|
|
if any(kw in m_id.lower() for kw in exclude_list):
|
|
continue
|
|
|
|
# Get multiplier
|
|
multiplier = (
|
|
m_billing.get("multiplier", 1)
|
|
if isinstance(m_billing, dict)
|
|
else getattr(m_billing, "multiplier", 1)
|
|
)
|
|
|
|
# Format display name
|
|
if multiplier == 0:
|
|
display_name = f"-🔥 {m_id} (unlimited)"
|
|
else:
|
|
display_name = f"-{m_id} ({multiplier}x)"
|
|
|
|
models_with_info.append(
|
|
{
|
|
"id": f"{self.id}-{m_id}",
|
|
"name": display_name,
|
|
"multiplier": multiplier,
|
|
"raw_id": m_id,
|
|
}
|
|
)
|
|
|
|
# Sort: multiplier ascending, then raw_id ascending
|
|
models_with_info.sort(key=lambda x: (x["multiplier"], x["raw_id"]))
|
|
self._model_cache = [
|
|
{"id": m["id"], "name": m["name"]} for m in models_with_info
|
|
]
|
|
|
|
await self._emit_debug_log(
|
|
f"Successfully fetched {len(self._model_cache)} models (filtered)"
|
|
)
|
|
return self._model_cache
|
|
except Exception as e:
|
|
await self._emit_debug_log(f"Failed to fetch model list: {e}")
|
|
# Return default model on failure
|
|
return [
|
|
{
|
|
"id": f"{self.id}-{self.valves.MODEL_ID}",
|
|
"name": f"GitHub Copilot ({self.valves.MODEL_ID})",
|
|
}
|
|
]
|
|
finally:
|
|
await client.stop()
|
|
except Exception as e:
|
|
await self._emit_debug_log(f"Pipes Error: {e}")
|
|
return [
|
|
{
|
|
"id": f"{self.id}-{self.valves.MODEL_ID}",
|
|
"name": f"GitHub Copilot ({self.valves.MODEL_ID})",
|
|
}
|
|
]
|
|
|
|
async def _get_client(self):
    """Create and start a new ``CopilotClient``.

    The CLI path is taken from the ``COPILOT_CLI_PATH`` environment variable
    when it is set to a non-empty value.

    Returns:
        The started client instance.
    """
    cli_path = os.environ.get("COPILOT_CLI_PATH")
    settings = {"cli_path": cli_path} if cli_path else {}

    started_client = CopilotClient(settings)
    await started_client.start()
    return started_client
|
|
|
|
def _setup_env(self, __event_call__=None):
|
|
cli_path = self.valves.CLI_PATH
|
|
found = False
|
|
|
|
if os.path.exists(cli_path):
|
|
found = True
|
|
|
|
if not found:
|
|
sys_path = shutil.which("copilot")
|
|
if sys_path:
|
|
cli_path = sys_path
|
|
found = True
|
|
|
|
if not found:
|
|
try:
|
|
subprocess.run(
|
|
"curl -fsSL https://gh.io/copilot-install | bash",
|
|
shell=True,
|
|
check=True,
|
|
)
|
|
if os.path.exists(self.valves.CLI_PATH):
|
|
cli_path = self.valves.CLI_PATH
|
|
found = True
|
|
except:
|
|
pass
|
|
|
|
if found:
|
|
os.environ["COPILOT_CLI_PATH"] = cli_path
|
|
cli_dir = os.path.dirname(cli_path)
|
|
if cli_dir not in os.environ["PATH"]:
|
|
os.environ["PATH"] = f"{cli_dir}:{os.environ['PATH']}"
|
|
|
|
if self.valves.DEBUG:
|
|
self._emit_debug_log_sync(
|
|
f"Copilot CLI found at: {cli_path}", __event_call__
|
|
)
|
|
try:
|
|
# Try to get version to confirm it's executable
|
|
ver = (
|
|
subprocess.check_output(
|
|
[cli_path, "--version"], stderr=subprocess.STDOUT
|
|
)
|
|
.decode()
|
|
.strip()
|
|
)
|
|
self._emit_debug_log_sync(
|
|
f"Copilot CLI Version: {ver}", __event_call__
|
|
)
|
|
except Exception as e:
|
|
self._emit_debug_log_sync(
|
|
f"Warning: Copilot CLI found but failed to run: {e}",
|
|
__event_call__,
|
|
)
|
|
else:
|
|
if self.valves.DEBUG:
|
|
self._emit_debug_log_sync(
|
|
"Error: Copilot CLI NOT found. Agent capabilities will be disabled.",
|
|
__event_call__,
|
|
)
|
|
|
|
if self.valves.GH_TOKEN:
|
|
os.environ["GH_TOKEN"] = self.valves.GH_TOKEN
|
|
os.environ["GITHUB_TOKEN"] = self.valves.GH_TOKEN
|
|
else:
|
|
if self.valves.DEBUG:
|
|
self._emit_debug_log_sync(
|
|
"Warning: GH_TOKEN is not set.", __event_call__
|
|
)
|
|
|
|
def _process_images(self, messages, __event_call__=None):
|
|
attachments = []
|
|
text_content = ""
|
|
if not messages:
|
|
return "", []
|
|
last_msg = messages[-1]
|
|
content = last_msg.get("content", "")
|
|
|
|
if isinstance(content, list):
|
|
for item in content:
|
|
if item.get("type") == "text":
|
|
text_content += item.get("text", "")
|
|
elif item.get("type") == "image_url":
|
|
image_url = item.get("image_url", {}).get("url", "")
|
|
if image_url.startswith("data:image"):
|
|
try:
|
|
header, encoded = image_url.split(",", 1)
|
|
ext = header.split(";")[0].split("/")[-1]
|
|
file_name = f"image_{len(attachments)}.{ext}"
|
|
file_path = os.path.join(self.temp_dir, file_name)
|
|
with open(file_path, "wb") as f:
|
|
f.write(base64.b64decode(encoded))
|
|
attachments.append(
|
|
{
|
|
"type": "file",
|
|
"path": file_path,
|
|
"display_name": file_name,
|
|
}
|
|
)
|
|
self._emit_debug_log_sync(
|
|
f"Image processed: {file_path}", __event_call__
|
|
)
|
|
except Exception as e:
|
|
self._emit_debug_log_sync(
|
|
f"Image error: {e}", __event_call__
|
|
)
|
|
else:
|
|
text_content = str(content)
|
|
return text_content, attachments
|
|
|
|
def _sync_copilot_config(self, reasoning_effort: str, __event_call__=None):
|
|
"""
|
|
Dynamically update ~/.copilot/config.json if REASONING_EFFORT is set.
|
|
This provides a fallback if API injection is ignored by the server.
|
|
"""
|
|
if not reasoning_effort:
|
|
return
|
|
|
|
effort = reasoning_effort
|
|
|
|
# Check model support for xhigh
|
|
# Only gpt-5.2-codex supports xhigh currently
|
|
if effort == "xhigh":
|
|
if (
|
|
"gpt-5.2-codex"
|
|
not in self._collect_model_ids(
|
|
body={},
|
|
request_model=self.valves.MODEL_ID,
|
|
real_model_id=self.valves.MODEL_ID,
|
|
)[0].lower()
|
|
):
|
|
# Fallback to high if not supported
|
|
effort = "high"
|
|
|
|
try:
|
|
# Target standard path ~/.copilot/config.json
|
|
config_path = os.path.expanduser("~/.copilot/config.json")
|
|
config_dir = os.path.dirname(config_path)
|
|
|
|
# Only proceed if directory exists (avoid creating trash types of files if path is wrong)
|
|
if not os.path.exists(config_dir):
|
|
return
|
|
|
|
data = {}
|
|
# Read existing config
|
|
if os.path.exists(config_path):
|
|
try:
|
|
with open(config_path, "r") as f:
|
|
data = json.load(f)
|
|
except Exception:
|
|
data = {}
|
|
|
|
# Update if changed
|
|
current_val = data.get("reasoning_effort")
|
|
if current_val != effort:
|
|
data["reasoning_effort"] = effort
|
|
try:
|
|
with open(config_path, "w") as f:
|
|
json.dump(data, f, indent=4)
|
|
|
|
self._emit_debug_log_sync(
|
|
f"Dynamically updated ~/.copilot/config.json: reasoning_effort='{effort}'",
|
|
__event_call__,
|
|
)
|
|
except Exception as e:
|
|
self._emit_debug_log_sync(
|
|
f"Failed to write config.json: {e}", __event_call__
|
|
)
|
|
except Exception as e:
|
|
self._emit_debug_log_sync(f"Config sync check failed: {e}", __event_call__)
|
|
|
|
# ==================== Internal Implementation ====================
|
|
# _pipe_impl() contains the main request handling logic.
|
|
# ================================================================
|
|
async def _pipe_impl(
|
|
self,
|
|
body: dict,
|
|
__metadata__: Optional[dict] = None,
|
|
__user__: Optional[dict] = None,
|
|
__event_emitter__=None,
|
|
__event_call__=None,
|
|
) -> Union[str, AsyncGenerator]:
|
|
self._setup_env(__event_call__)
|
|
if not self.valves.GH_TOKEN:
|
|
return "Error: Please configure GH_TOKEN in Valves."
|
|
|
|
# Parse user selected model
|
|
request_model = body.get("model", "")
|
|
real_model_id = self.valves.MODEL_ID # Default value
|
|
|
|
# Determine effective reasoning effort and debug setting
|
|
if __user__:
|
|
raw_valves = __user__.get("valves", {})
|
|
if isinstance(raw_valves, self.UserValves):
|
|
user_valves = raw_valves
|
|
elif isinstance(raw_valves, dict):
|
|
user_valves = self.UserValves(**raw_valves)
|
|
else:
|
|
user_valves = self.UserValves()
|
|
else:
|
|
user_valves = self.UserValves()
|
|
effective_reasoning_effort = (
|
|
user_valves.REASONING_EFFORT
|
|
if user_valves.REASONING_EFFORT
|
|
else self.valves.REASONING_EFFORT
|
|
)
|
|
# Apply DEBUG user setting override if set to True (if False, respect global)
|
|
# Actually user setting should probably override strictly.
|
|
# But boolean fields in UserValves default to False, so we can't distinguish "not set" from "off" easily without Optional[bool]
|
|
# Let's assume if user sets DEBUG=True, it wins.
|
|
if user_valves.DEBUG:
|
|
self.valves.DEBUG = True
|
|
|
|
# Apply SHOW_THINKING user setting (prefer user override when provided)
|
|
show_thinking = (
|
|
user_valves.SHOW_THINKING
|
|
if user_valves.SHOW_THINKING is not None
|
|
else self.valves.SHOW_THINKING
|
|
)
|
|
|
|
if request_model.startswith(f"{self.id}-"):
|
|
real_model_id = request_model[len(f"{self.id}-") :]
|
|
await self._emit_debug_log(
|
|
f"Using selected model: {real_model_id}", __event_call__
|
|
)
|
|
|
|
messages = body.get("messages", [])
|
|
if not messages:
|
|
return "No messages."
|
|
|
|
# Get Chat ID using improved helper
|
|
chat_ctx = self._get_chat_context(body, __metadata__, __event_call__)
|
|
chat_id = chat_ctx.get("chat_id")
|
|
|
|
# Extract system prompt from multiple sources
|
|
system_prompt_content, system_prompt_source = await self._extract_system_prompt(
|
|
body, messages, request_model, real_model_id, __event_call__
|
|
)
|
|
|
|
if system_prompt_content:
|
|
preview = system_prompt_content[:60].replace("\n", " ")
|
|
await self._emit_debug_log(
|
|
f"System prompt confirmed (source: {system_prompt_source}, length: {len(system_prompt_content)}, preview: {preview})",
|
|
__event_call__,
|
|
)
|
|
|
|
is_streaming = body.get("stream", False)
|
|
await self._emit_debug_log(f"Request Streaming: {is_streaming}", __event_call__)
|
|
|
|
last_text, attachments = self._process_images(messages, __event_call__)
|
|
|
|
# Determine prompt strategy
|
|
# If we have a chat_id, we try to resume session.
|
|
# If resumed, we assume the session has history, so we only send the last message.
|
|
# If new session, we send full (accumulated) messages.
|
|
|
|
# Ensure we have the latest config
|
|
self._sync_copilot_config(effective_reasoning_effort, __event_call__)
|
|
|
|
# Initialize Client
|
|
client = CopilotClient(self._build_client_config(body))
|
|
should_stop_client = True
|
|
try:
|
|
await client.start()
|
|
|
|
# Initialize custom tools
|
|
custom_tools = self._initialize_custom_tools()
|
|
if custom_tools:
|
|
tool_names = [t.name for t in custom_tools]
|
|
await self._emit_debug_log(
|
|
f"Enabled {len(custom_tools)} custom tools: {tool_names}",
|
|
__event_call__,
|
|
)
|
|
|
|
# Create or Resume Session
|
|
session = None
|
|
if chat_id:
|
|
try:
|
|
session = await client.resume_session(chat_id)
|
|
await self._emit_debug_log(
|
|
f"Resumed session: {chat_id} (Note: Formatting guidelines only apply to NEW sessions. Create a new chat to use updated formatting.)",
|
|
__event_call__,
|
|
)
|
|
|
|
# Show workspace info if available
|
|
if self.valves.DEBUG and self.valves.SHOW_WORKSPACE_INFO:
|
|
if session.workspace_path:
|
|
await self._emit_debug_log(
|
|
f"Session workspace: {session.workspace_path}",
|
|
__event_call__,
|
|
)
|
|
|
|
is_new_session = False
|
|
except Exception as e:
|
|
await self._emit_debug_log(
|
|
f"Session {chat_id} not found ({str(e)}), creating new.",
|
|
__event_call__,
|
|
)
|
|
|
|
if session is None:
|
|
session_config = self._build_session_config(
|
|
chat_id,
|
|
real_model_id,
|
|
custom_tools,
|
|
system_prompt_content,
|
|
is_streaming,
|
|
)
|
|
if system_prompt_content or self.valves.ENFORCE_FORMATTING:
|
|
# Build preview of what's being sent
|
|
preview_parts = []
|
|
if system_prompt_content:
|
|
preview_parts.append(
|
|
f"custom_prompt: {system_prompt_content[:100]}..."
|
|
)
|
|
if self.valves.ENFORCE_FORMATTING:
|
|
preview_parts.append("formatting_guidelines: enabled")
|
|
|
|
if isinstance(session_config, dict):
|
|
system_config = session_config.get("system_message", {})
|
|
else:
|
|
system_config = getattr(session_config, "system_message", None)
|
|
|
|
if isinstance(system_config, dict):
|
|
full_content = system_config.get("content", "")
|
|
else:
|
|
full_content = ""
|
|
|
|
await self._emit_debug_log(
|
|
f"System message config - {', '.join(preview_parts)} (total length: {len(full_content)} chars)",
|
|
__event_call__,
|
|
)
|
|
session = await client.create_session(config=session_config)
|
|
await self._emit_debug_log(
|
|
f"Created new session with model: {real_model_id}",
|
|
__event_call__,
|
|
)
|
|
|
|
# Show workspace info for new sessions
|
|
if self.valves.DEBUG and self.valves.SHOW_WORKSPACE_INFO:
|
|
if session.workspace_path:
|
|
await self._emit_debug_log(
|
|
f"Session workspace: {session.workspace_path}",
|
|
__event_call__,
|
|
)
|
|
|
|
# Construct Prompt (session-based: send only latest user input)
|
|
prompt = self._apply_formatting_hint(last_text)
|
|
|
|
await self._emit_debug_log(
|
|
f"Sending prompt ({len(prompt)} chars) to Agent...",
|
|
__event_call__,
|
|
)
|
|
|
|
send_payload = {"prompt": prompt, "mode": "immediate"}
|
|
if attachments:
|
|
send_payload["attachments"] = attachments
|
|
|
|
if body.get("stream", False):
|
|
init_msg = ""
|
|
if self.valves.DEBUG:
|
|
init_msg = f"> [Debug] Agent working in: {os.getcwd()}\n"
|
|
|
|
# Transfer client ownership to stream_response
|
|
should_stop_client = False
|
|
return self.stream_response(
|
|
client,
|
|
session,
|
|
send_payload,
|
|
init_msg,
|
|
__event_call__,
|
|
reasoning_effort=effective_reasoning_effort,
|
|
show_thinking=show_thinking,
|
|
)
|
|
else:
|
|
try:
|
|
response = await session.send_and_wait(send_payload)
|
|
return response.data.content if response else "Empty response."
|
|
finally:
|
|
# Cleanup: destroy session if no chat_id (temporary session)
|
|
if not chat_id:
|
|
try:
|
|
await session.destroy()
|
|
except Exception as cleanup_error:
|
|
await self._emit_debug_log(
|
|
f"Session cleanup warning: {cleanup_error}",
|
|
__event_call__,
|
|
)
|
|
except Exception as e:
|
|
await self._emit_debug_log(f"Request Error: {e}", __event_call__)
|
|
return f"Error: {str(e)}"
|
|
finally:
|
|
# Cleanup client if not transferred to stream
|
|
if should_stop_client:
|
|
try:
|
|
await client.stop()
|
|
except Exception as e:
|
|
await self._emit_debug_log(
|
|
f"Client cleanup warning: {e}", __event_call__
|
|
)
|
|
|
|
async def stream_response(
    self,
    client,
    session,
    send_payload,
    init_message: str = "",
    __event_call__=None,
    reasoning_effort: str = "",
    show_thinking: bool = True,
) -> AsyncGenerator:
    """
    Stream a Copilot SDK session response as text chunks.

    Subscribes ``handler`` to the session, sends ``send_payload`` in a
    background task, and yields whatever the handler queues (answer text,
    ``<think>`` blocks, tool call/result ``<details>`` sections, errors)
    until a ``session.idle`` or ``session.error`` event arrives.

    This generator owns ``client``: it is always stopped on exit, including
    when the consumer closes or cancels the stream early. The session is not
    destroyed here.

    Args:
        client: Client object exposing an awaitable ``stop()``.
        session: Session exposing ``on(handler)`` (returns an unsubscribe
            callable) and an awaitable ``send(payload)``.
        send_payload: Payload forwarded unchanged to ``session.send``.
        init_message: Extra debug text yielded first when DEBUG is enabled.
        __event_call__: Passed through to ``self._emit_debug_log_sync``.
        reasoning_effort: When non-empty and not ``"off"``, echoed in the
            debug preamble (DEBUG only).
        show_thinking: Whether reasoning events may open a ``<think>`` block.

    Yields:
        Text chunks in arrival order.
    """
    queue = asyncio.Queue()
    done = asyncio.Event()
    SENTINEL = object()
    # Shared between the event handler and the consumer loop below.
    state = {"thinking_started": False, "content_sent": False}
    has_content = False  # Track if any content has been yielded
    active_tools = {}  # Map tool_call_id to {"name": ..., "arguments": ...}

    def get_event_type(event) -> str:
        """Extract event type as string, handling both enum and string types."""
        if hasattr(event, "type"):
            event_type = event.type
            # Enum-like objects expose the raw string through ``.value``.
            if hasattr(event_type, "value"):
                return event_type.value
            return str(event_type)
        return "unknown"

    def safe_get_data_attr(event, attr: str, default=None):
        """Read ``attr`` from ``event.data`` whether data is a dict or an object."""
        if not hasattr(event, "data") or event.data is None:
            return default
        data = event.data
        if isinstance(data, dict):
            return data.get(attr, default)
        return getattr(data, attr, default)

    def close_thinking() -> None:
        """Queue the closing think tag if a thinking block is currently open."""
        if state["thinking_started"]:
            queue.put_nowait("\n</think>\n")
            state["thinking_started"] = False

    def finish() -> None:
        """Mark the stream as finished and wake the consumer loop."""
        done.set()
        # The queue is unbounded, so put_nowait cannot raise QueueFull.
        queue.put_nowait(SENTINEL)

    def emit_content(text) -> None:
        """Queue answer text, closing any open thinking block first."""
        if not text:
            return
        state["content_sent"] = True
        close_thinking()
        queue.put_nowait(text)

    def emit_reasoning(text) -> None:
        """Queue reasoning text unless answer content has already started."""
        # Suppress late-arriving reasoning if content already started.
        if not text or state["content_sent"]:
            return
        if not state["thinking_started"] and show_thinking:
            queue.put_nowait("<think>\n")
            state["thinking_started"] = True
        # When thinking is hidden and no block is open, the text is dropped.
        if state["thinking_started"]:
            queue.put_nowait(text)

    def tool_name_for(tool_call_id) -> str:
        """Look up the display name recorded at tool.execution_start."""
        tool_info = active_tools.get(tool_call_id)
        if isinstance(tool_info, dict):
            return tool_info.get("name", "Unknown Tool")
        return "Unknown Tool"

    def handler(event):
        """
        Session event callback.

        Translates streaming deltas, reasoning, tool events and session
        state changes into chunks on ``queue``.
        """
        event_type = get_event_type(event)

        # === Message events (streamed deltas) ===
        if event_type == "assistant.message_delta":
            emit_content(
                safe_get_data_attr(event, "delta_content")
                or safe_get_data_attr(event, "deltaContent")
            )

        # === Complete message (full content instead of deltas) ===
        elif event_type == "assistant.message":
            emit_content(
                safe_get_data_attr(event, "content")
                or safe_get_data_attr(event, "message")
            )

        # === Reasoning events (streamed deltas) ===
        elif event_type == "assistant.reasoning_delta":
            emit_reasoning(
                safe_get_data_attr(event, "delta_content")
                or safe_get_data_attr(event, "deltaContent")
            )

        # === Complete reasoning ===
        elif event_type == "assistant.reasoning":
            emit_reasoning(
                safe_get_data_attr(event, "content")
                or safe_get_data_attr(event, "reasoning")
            )

        # === Tool execution events ===
        elif event_type == "tool.execution_start":
            tool_name = (
                safe_get_data_attr(event, "name")
                or safe_get_data_attr(event, "tool_name")
                or "Unknown Tool"
            )
            tool_call_id = safe_get_data_attr(event, "tool_call_id", "")

            # Arguments are best-effort: unparsable input is shown as none.
            tool_args = {}
            try:
                args_obj = safe_get_data_attr(event, "arguments")
                if isinstance(args_obj, dict):
                    tool_args = args_obj
                elif isinstance(args_obj, str):
                    tool_args = json.loads(args_obj)
            except Exception:
                pass

            if tool_call_id:
                active_tools[tool_call_id] = {
                    "name": tool_name,
                    "arguments": tool_args,
                }

            # Close thinking tag if open before showing the tool call.
            close_thinking()

            if tool_args:
                tool_args_json = json.dumps(tool_args, indent=2, ensure_ascii=False)
                tool_display = f"\n\n<details>\n<summary>🔧 Executing Tool: {tool_name}</summary>\n\n**Parameters:**\n\n```json\n{tool_args_json}\n```\n\n</details>\n\n"
            else:
                tool_display = f"\n\n<details>\n<summary>🔧 Executing Tool: {tool_name}</summary>\n\n*No parameters*\n\n</details>\n\n"

            queue.put_nowait(tool_display)
            self._emit_debug_log_sync(f"Tool Start: {tool_name}", __event_call__)

        elif event_type == "tool.execution_complete":
            tool_name = tool_name_for(safe_get_data_attr(event, "tool_call_id", ""))

            result_content = ""
            result_type = "success"
            try:
                result_obj = safe_get_data_attr(event, "result")
                if hasattr(result_obj, "content"):
                    result_content = result_obj.content
                elif isinstance(result_obj, dict):
                    result_content = result_obj.get("content", "")
                    result_type = result_obj.get("result_type", "success")
                    if not result_content:
                        # No content field: show the whole dict instead.
                        result_content = json.dumps(
                            result_obj, indent=2, ensure_ascii=False
                        )
            except Exception as e:
                self._emit_debug_log_sync(
                    f"Error extracting result: {e}", __event_call__
                )
                result_type = "failure"
                result_content = f"Error: {str(e)}"

            if result_content:
                status_icon = "✅" if result_type == "success" else "❌"

                # Pretty-print JSON objects/arrays; anything else is shown
                # verbatim in a text block.
                is_json = False
                try:
                    json_obj = (
                        json.loads(result_content)
                        if isinstance(result_content, str)
                        else result_content
                    )
                    if isinstance(json_obj, (dict, list)):
                        result_content = json.dumps(
                            json_obj, indent=2, ensure_ascii=False
                        )
                        is_json = True
                except Exception:
                    pass

                if is_json:
                    result_display = f"\n<details>\n<summary>{status_icon} Tool Result: {tool_name}</summary>\n\n```json\n{result_content}\n```\n\n</details>\n\n"
                else:
                    result_display = f"\n<details>\n<summary>{status_icon} Tool Result: {tool_name}</summary>\n\n```text\n{result_content}\n```\n\n</details>\n\n"

                queue.put_nowait(result_display)

        elif event_type == "tool.execution_progress":
            # Progress update for long-running tools.
            tool_name = tool_name_for(safe_get_data_attr(event, "tool_call_id", ""))
            progress = safe_get_data_attr(event, "progress", 0)
            message = safe_get_data_attr(event, "message", "")

            if message:
                queue.put_nowait(f"\n> 🔄 **{tool_name}**: {message}\n")

            self._emit_debug_log_sync(
                f"Tool Progress: {tool_name} - {progress}%", __event_call__
            )

        elif event_type == "tool.execution_partial_result":
            # Incremental tool output is forwarded as-is.
            tool_name = tool_name_for(safe_get_data_attr(event, "tool_call_id", ""))
            partial_content = safe_get_data_attr(event, "content", "")
            if partial_content:
                queue.put_nowait(partial_content)

            self._emit_debug_log_sync(
                f"Tool Partial Result: {tool_name}", __event_call__
            )

        # === Usage statistics: recognised but intentionally not surfaced ===
        elif event_type in ("assistant.usage", "session.usage_info"):
            pass

        elif event_type == "session.compaction_complete":
            self._emit_debug_log_sync("Session Compaction Completed", __event_call__)

        elif event_type == "session.idle":
            # Session finished processing - signal completion.
            finish()

        elif event_type == "session.error":
            error_msg = safe_get_data_attr(event, "message", "Unknown Error")
            # Close an open thinking block so the error is shown as answer
            # text rather than inside the reasoning section.
            close_thinking()
            queue.put_nowait(f"\n[Error: {error_msg}]")
            finish()

    def on_send_done(task) -> None:
        """Surface a failed session.send instead of waiting forever for events."""
        if task.cancelled():
            return
        error = task.exception()  # Also marks the exception as retrieved.
        if error is None or done.is_set():
            return
        close_thinking()
        queue.put_nowait(f"\n[Error: {error}]")
        finish()

    unsubscribe = None
    send_task = None
    # Everything after this point is covered by the cleanup in ``finally`` so
    # the client is stopped even if the consumer goes away during the very
    # first yield.
    try:
        unsubscribe = session.on(handler)

        self._emit_debug_log_sync(
            "Subscribed to events. Sending request...", __event_call__
        )

        # Run session.send as a task so it cannot block reading the stream
        # if the SDK implementation waits for completion.
        send_task = asyncio.create_task(session.send(send_payload))
        send_task.add_done_callback(on_send_done)
        self._emit_debug_log_sync("Prompt sent (async task started)", __event_call__)

        # Debug preamble, wrapped in a thinking block.
        try:
            if self.valves.DEBUG:
                # Mark the block open BEFORE the first yield: events may be
                # handled while this generator is suspended, and the handler
                # must know that "<think>" has already been emitted.
                state["thinking_started"] = True
                yield "<think>\n"
                if init_message:
                    yield init_message

                if reasoning_effort and reasoning_effort != "off":
                    yield f"> [Debug] Reasoning Effort injected: {reasoning_effort.upper()}\n"

                yield "> [Debug] Connection established, waiting for response...\n"
        except Exception as e:
            # If initial yield fails, log but continue processing.
            self._emit_debug_log_sync(f"Initial yield warning: {e}", __event_call__)

        # NOTE: only ``Exception`` is caught around yields below.
        # GeneratorExit / CancelledError must propagate so the generator
        # stops immediately and the ``finally`` cleanup runs.
        try:
            while not done.is_set():
                try:
                    chunk = await asyncio.wait_for(
                        queue.get(), timeout=float(self.valves.TIMEOUT)
                    )
                    if chunk is SENTINEL:
                        break
                    if chunk:
                        has_content = True
                        try:
                            yield chunk
                        except Exception as yield_error:
                            self._emit_debug_log_sync(
                                f"Yield error (client disconnected?): {yield_error}",
                                __event_call__,
                            )
                            break
                except asyncio.TimeoutError:
                    if done.is_set():
                        break
                    if state["thinking_started"]:
                        try:
                            yield f"> [Debug] Waiting for response ({self.valves.TIMEOUT}s exceeded)...\n"
                        except Exception:
                            break
                    continue

            # Flush chunks queued before completion was signalled.
            while not queue.empty():
                chunk = queue.get_nowait()
                if chunk is SENTINEL:
                    break
                if chunk:
                    has_content = True
                    try:
                        yield chunk
                    except Exception:
                        break

            if state["thinking_started"]:
                try:
                    yield "\n</think>\n"
                    has_content = True
                except Exception:
                    pass

            # If nothing was yielded, emit a fallback message so the
            # frontend does not receive an empty response.
            if not has_content:
                try:
                    yield "⚠️ Copilot returned no content. Please check if the Model ID is correct or enable DEBUG mode in Valves for details."
                except Exception:
                    pass

        except Exception as e:
            try:
                yield f"\n[Stream Error: {str(e)}]"
            except Exception:
                pass
    finally:
        # A send still in flight is pointless once the stream is closing.
        if send_task is not None and not send_task.done():
            send_task.cancel()
        if unsubscribe is not None:
            try:
                unsubscribe()
            except Exception as e:
                logging.getLogger(__name__).warning(
                    "Event unsubscribe warning: %s", e
                )
        # The session is not destroyed here to allow persistence, but the
        # client must be stopped.
        try:
            await client.stop()
        except Exception as e:
            logging.getLogger(__name__).warning("Client cleanup warning: %s", e)
|