fix(github-copilot-sdk): fix duplicate responses and add fine-grained TTFT profiling
This commit is contained in:
25
.agent/learnings/github-copilot-sdk-stream-finalization.md
Normal file
25
.agent/learnings/github-copilot-sdk-stream-finalization.md
Normal file
@@ -0,0 +1,25 @@
|
||||
# GitHub Copilot SDK Stream Finalization
|
||||
|
||||
> Discovered: 2026-03-20
|
||||
|
||||
## Context
|
||||
Applies to `plugins/pipes/github-copilot-sdk/github_copilot_sdk.py` when streaming assistant output, handling `pending_embeds`, and waiting for `session.idle`.
|
||||
|
||||
## Finding
|
||||
Two non-obvious issues can make the pipe feel permanently stuck even when useful work already finished:
|
||||
|
||||
1. If the main `queue.get()` wait uses the full user-configured `TIMEOUT` (for example 300s), watchdog logic, "still working" status updates, and synthetic finalization checks only wake up at that same coarse interval.
|
||||
2. If `pending_embeds` are flushed only in the `session.idle` branch, any timeout/error/missing-idle path can lose already-prepared embeds even though file publishing itself succeeded.
|
||||
|
||||
## Solution / Pattern
|
||||
- Keep the *inactivity limit* controlled by `TIMEOUT`, but poll the local stream queue on a short fixed cadence (for example, at most 5s) so watchdogs and fallback finalization stay responsive.
|
||||
- Track `assistant.turn_end`; if `session.idle` does not arrive shortly afterward, synthesize finalization instead of waiting for the full inactivity timeout.
|
||||
- Flush `pending_embeds` exactly once via a shared helper that can run from both normal idle finalization and error/timeout finalization paths.
|
||||
- For streamed text/reasoning, use conservative overlap trimming: only strip an overlapping prefix when the incoming chunk still contains new suffix content. Do not drop fully repeated chunks blindly, or legitimate repeated text can be corrupted.
|
||||
|
||||
## Gotchas
|
||||
- RichUI embed success and streamed-text success are separate paths; a file can be published correctly while chat output still hangs or duplicates.
|
||||
- If `assistant.reasoning_delta` is streamed, the later complete `assistant.reasoning` event must be suppressed just like `assistant.message`, or the thinking block can duplicate.
|
||||
|
||||
## 🛠️ Update 2026-03-21
|
||||
- **Fixed Stream Duplication**: Fixed duplicated/overlapping text in the stream (e.g., `🎉 删 🎉 删除成功`) when resuming a conversation session. `_dedupe_stream_chunk(delta, "message_stream_tail")` is now applied strictly inside the `assistant.message_delta` event handler, preventing overlaps caused by concurrent history replay or multiple stream deliveries and closing a gap that earlier deployments missed.
|
||||
@@ -124,13 +124,30 @@ RICHUI_BRIDGE_SCRIPT = r"""
|
||||
return null;
|
||||
}
|
||||
|
||||
function classHasThemeToken(className, token) {
|
||||
return String(className || '').indexOf(token) !== -1;
|
||||
}
|
||||
|
||||
function applyTheme(theme) {
|
||||
if (theme !== 'dark' && theme !== 'light') {
|
||||
return;
|
||||
}
|
||||
var isDark = theme === 'dark';
|
||||
var shouldMirrorTheme = Boolean(getParentDocument());
|
||||
if (
|
||||
root.getAttribute('data-openwebui-applied-theme') === theme &&
|
||||
(!shouldMirrorTheme ||
|
||||
(root.getAttribute('data-theme') === theme &&
|
||||
root.classList.contains('dark') === isDark)) &&
|
||||
String(root.style.colorScheme || '') === theme
|
||||
) {
|
||||
return;
|
||||
}
|
||||
root.setAttribute('data-openwebui-applied-theme', theme);
|
||||
root.setAttribute('data-theme', theme);
|
||||
root.classList.toggle('dark', theme === 'dark');
|
||||
if (shouldMirrorTheme) {
|
||||
root.setAttribute('data-theme', theme);
|
||||
root.classList.toggle('dark', isDark);
|
||||
}
|
||||
root.style.colorScheme = theme;
|
||||
}
|
||||
|
||||
@@ -187,66 +204,57 @@ RICHUI_BRIDGE_SCRIPT = r"""
|
||||
return null;
|
||||
}
|
||||
|
||||
function getThemeFromDocument(doc, options) {
|
||||
function getThemeFromDocument(doc) {
|
||||
if (!doc) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
var useHtmlSignals = !options || options.useHtmlSignals !== false;
|
||||
var html = doc.documentElement;
|
||||
var body = doc.body;
|
||||
var htmlClass = useHtmlSignals && html ? String(html.className || '') : '';
|
||||
var htmlClass = html ? String(html.className || '') : '';
|
||||
var bodyClass = body ? String(body.className || '') : '';
|
||||
var htmlDataTheme =
|
||||
useHtmlSignals && html ? html.getAttribute('data-theme') : '';
|
||||
var bodyDataTheme = body ? body.getAttribute('data-theme') : '';
|
||||
var htmlDataTheme = html ? html.getAttribute('data-theme') : '';
|
||||
|
||||
if (
|
||||
htmlDataTheme === 'dark' ||
|
||||
bodyDataTheme === 'dark' ||
|
||||
htmlClass.indexOf('dark') !== -1 ||
|
||||
bodyClass.indexOf('dark') !== -1
|
||||
classHasThemeToken(bodyClass, 'dark') ||
|
||||
classHasThemeToken(htmlClass, 'dark')
|
||||
) {
|
||||
return 'dark';
|
||||
}
|
||||
if (
|
||||
htmlDataTheme === 'light' ||
|
||||
bodyDataTheme === 'light' ||
|
||||
htmlClass.indexOf('light') !== -1 ||
|
||||
bodyClass.indexOf('light') !== -1
|
||||
classHasThemeToken(bodyClass, 'light') ||
|
||||
classHasThemeToken(htmlClass, 'light')
|
||||
) {
|
||||
return 'light';
|
||||
}
|
||||
|
||||
var metaTheme = getThemeFromMeta(doc);
|
||||
if (metaTheme) {
|
||||
return metaTheme;
|
||||
}
|
||||
|
||||
var computedTarget = useHtmlSignals ? html || body : body || html;
|
||||
if (computedTarget) {
|
||||
var computedScheme = window.getComputedStyle(computedTarget).colorScheme;
|
||||
if (computedScheme === 'dark' || computedScheme === 'light') {
|
||||
return computedScheme;
|
||||
}
|
||||
}
|
||||
} catch (e) {}
|
||||
return null;
|
||||
}
|
||||
|
||||
function resolveTheme() {
|
||||
var parentTheme = getThemeFromDocument(getParentDocument(), {
|
||||
useHtmlSignals: true,
|
||||
});
|
||||
if (parentTheme) {
|
||||
return parentTheme;
|
||||
var parentDoc = getParentDocument();
|
||||
var parentMetaTheme = parentDoc ? getThemeFromMeta(parentDoc) : null;
|
||||
if (parentMetaTheme) {
|
||||
return parentMetaTheme;
|
||||
}
|
||||
|
||||
var localTheme = getThemeFromDocument(document, {
|
||||
useHtmlSignals: false,
|
||||
});
|
||||
if (localTheme) {
|
||||
return localTheme;
|
||||
var parentClassTheme = parentDoc ? getThemeFromDocument(parentDoc) : null;
|
||||
if (parentClassTheme) {
|
||||
return parentClassTheme;
|
||||
}
|
||||
|
||||
if (!parentDoc) {
|
||||
var localMetaTheme = getThemeFromMeta(document);
|
||||
if (localMetaTheme) {
|
||||
return localMetaTheme;
|
||||
}
|
||||
|
||||
var localClassTheme = getThemeFromDocument(document);
|
||||
if (localClassTheme) {
|
||||
return localClassTheme;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
@@ -366,26 +374,26 @@ function measureHeight() {
|
||||
}
|
||||
|
||||
function attachThemeObservers() {
|
||||
var parentDoc = getParentDocument();
|
||||
if (!window.MutationObserver || !parentDoc || parentThemeObserver) {
|
||||
var themeDoc = getParentDocument() || document;
|
||||
if (!window.MutationObserver || !themeDoc || parentThemeObserver) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
parentThemeObserver = new MutationObserver(syncTheme);
|
||||
if (parentDoc.documentElement) {
|
||||
parentThemeObserver.observe(parentDoc.documentElement, {
|
||||
if (themeDoc.documentElement) {
|
||||
parentThemeObserver.observe(themeDoc.documentElement, {
|
||||
attributes: true,
|
||||
attributeFilter: ['class', 'data-theme', 'style'],
|
||||
attributeFilter: ['class', 'data-theme'],
|
||||
});
|
||||
}
|
||||
if (parentDoc.body) {
|
||||
parentThemeObserver.observe(parentDoc.body, {
|
||||
if (themeDoc.body) {
|
||||
parentThemeObserver.observe(themeDoc.body, {
|
||||
attributes: true,
|
||||
attributeFilter: ['class', 'data-theme', 'style'],
|
||||
attributeFilter: ['class', 'data-theme'],
|
||||
});
|
||||
}
|
||||
if (parentDoc.head) {
|
||||
parentThemeObserver.observe(parentDoc.head, {
|
||||
if (themeDoc.head) {
|
||||
parentThemeObserver.observe(themeDoc.head, {
|
||||
childList: true,
|
||||
subtree: true,
|
||||
attributes: true,
|
||||
@@ -3027,6 +3035,8 @@ class Pipe:
|
||||
__message_id__: Optional[str] = None,
|
||||
):
|
||||
"""Initialize custom tools based on configuration"""
|
||||
import time
|
||||
t_start = time.perf_counter()
|
||||
# 1. Determine effective settings (User override > Global)
|
||||
uv = self._get_user_valves(__user__)
|
||||
enable_tools = uv.ENABLE_OPENWEBUI_TOOLS
|
||||
@@ -3057,8 +3067,11 @@ class Pipe:
|
||||
if todo_widget_tool:
|
||||
final_tools.append(todo_widget_tool)
|
||||
|
||||
t_base_tools_done = time.perf_counter()
|
||||
|
||||
# 3. If all OpenWebUI tool types are disabled, skip loading and return early
|
||||
if not enable_tools and not enable_openapi:
|
||||
logger.info(f"[Perf] _initialize_custom_tools (fast return): {(time.perf_counter() - t_start)*1000:.2f}ms")
|
||||
return final_tools
|
||||
|
||||
# 4. Extract chat-level tool selection (P4: user selection from Chat UI)
|
||||
@@ -3067,6 +3080,7 @@ class Pipe:
|
||||
)
|
||||
|
||||
# 5. Load OpenWebUI tools dynamically (always fresh, no cache)
|
||||
t_dyn_tools_start = time.perf_counter()
|
||||
openwebui_tools = await self._load_openwebui_tools(
|
||||
body=body,
|
||||
__user__=__user__,
|
||||
@@ -3085,6 +3099,7 @@ class Pipe:
|
||||
__chat_id__=__chat_id__,
|
||||
__message_id__=__message_id__,
|
||||
)
|
||||
t_dyn_tools_done = time.perf_counter()
|
||||
|
||||
if openwebui_tools:
|
||||
tool_names = [t.name for t in openwebui_tools]
|
||||
@@ -3101,6 +3116,9 @@ class Pipe:
|
||||
|
||||
final_tools.extend(openwebui_tools)
|
||||
|
||||
t_total = time.perf_counter() - t_start
|
||||
logger.info(f"[Perf] _initialize_custom_tools total: {t_total*1000:.2f}ms (Base tools: {(t_base_tools_done-t_start)*1000:.2f}ms, Dyn Load: {(t_dyn_tools_done-t_dyn_tools_start)*1000:.2f}ms)")
|
||||
|
||||
return final_tools
|
||||
|
||||
def _get_render_todo_widget_tool(
|
||||
@@ -4900,9 +4918,13 @@ class Pipe:
|
||||
__event_call__,
|
||||
)
|
||||
|
||||
import time
|
||||
t_get_opw_start = time.perf_counter()
|
||||
tools_dict = await get_openwebui_tools(
|
||||
tool_request, tool_ids, user, extra_params
|
||||
)
|
||||
t_get_opw_done = time.perf_counter()
|
||||
logger.info(f"[Perf] get_openwebui_tools ({len(tool_ids)} IDs): {(t_get_opw_done-t_get_opw_start)*1000:.2f}ms")
|
||||
|
||||
if self.valves.DEBUG:
|
||||
if tools_dict:
|
||||
@@ -5917,7 +5939,8 @@ class Pipe:
|
||||
--blk-c:#dc2626; --blk-b:#fef2f2;--blk-bd:#fecaca;
|
||||
--dot:#6366f1;
|
||||
}}
|
||||
html.dark{{
|
||||
html.dark,
|
||||
html[data-openwebui-applied-theme="dark"]{{
|
||||
--bg:#1e1e2e;--bd:#313244;--tx:#cdd6f4;--mu:#6c7086;--row-h:#232336;
|
||||
--done-c:#a6e3a1;--done-b:#1e3a2a;--done-bd:#2d5c3e;
|
||||
--prog-c:#89b4fa;--prog-b:#1a2a4a;--prog-bd:#2a4070;
|
||||
@@ -7641,6 +7664,7 @@ class Pipe:
|
||||
__chat_id__: Optional[str] = None,
|
||||
__message_id__: Optional[str] = None,
|
||||
) -> Union[str, AsyncGenerator]:
|
||||
request_start_ts = time.monotonic()
|
||||
# --- PROBE LOG ---
|
||||
if __event_call__:
|
||||
await self._emit_debug_log(
|
||||
@@ -8030,13 +8054,20 @@ class Pipe:
|
||||
# Shared state for delayed HTML embeds (Premium Experience)
|
||||
pending_embeds = []
|
||||
|
||||
# Use Shared Persistent Client Pool (Token-aware)
|
||||
client = await self._get_client(token)
|
||||
should_stop_client = False # Never stop the shared singleton pool!
|
||||
# ==================== REVERT TO 0.9.1 EPHEMERAL CLIENT ====================
|
||||
t_before_client = time.monotonic()
|
||||
client_config = self._build_client_config(
|
||||
user_id=user_id, chat_id=chat_id, token=effective_token
|
||||
)
|
||||
client_config["github_token"] = effective_token
|
||||
client = CopilotClient(client_config)
|
||||
should_stop_client = True
|
||||
try:
|
||||
# Note: client is already started in _get_client
|
||||
await client.start()
|
||||
t_after_client = time.monotonic()
|
||||
|
||||
# Initialize custom tools (Handles caching internally)
|
||||
t_before_tools = time.monotonic()
|
||||
custom_tools = await self._initialize_custom_tools(
|
||||
body=body,
|
||||
__user__=__user__,
|
||||
@@ -8054,6 +8085,8 @@ class Pipe:
|
||||
__chat_id__=__chat_id__,
|
||||
__message_id__=__message_id__,
|
||||
)
|
||||
t_after_tools = time.monotonic()
|
||||
|
||||
if custom_tools:
|
||||
await self._emit_debug_log(
|
||||
f"Enabled {len(custom_tools)} tools (Custom/Built-in)",
|
||||
@@ -8061,9 +8094,21 @@ class Pipe:
|
||||
)
|
||||
|
||||
# Check MCP Servers
|
||||
t_before_mcp = time.monotonic()
|
||||
mcp_servers = self._parse_mcp_servers(
|
||||
__event_call__, enable_mcp=effective_mcp, chat_tool_ids=chat_tool_ids
|
||||
)
|
||||
t_after_mcp = time.monotonic()
|
||||
|
||||
# Emit Profiling Log to Frontend
|
||||
self._emit_debug_log_sync(
|
||||
f"⏱️ [Profiling] Client.start: {t_after_client - t_before_client:.3f}s | "
|
||||
f"Tools Init: {t_after_tools - t_before_tools:.3f}s | "
|
||||
f"MCP Parse: {t_after_mcp - t_before_mcp:.3f}s",
|
||||
__event_call__,
|
||||
debug_enabled=True,
|
||||
)
|
||||
|
||||
mcp_server_names = list(mcp_servers.keys()) if mcp_servers else []
|
||||
if mcp_server_names:
|
||||
await self._emit_debug_log(
|
||||
@@ -8100,6 +8145,7 @@ class Pipe:
|
||||
provider_config["bearer_token"] = byok_bearer_token
|
||||
pass
|
||||
|
||||
|
||||
if chat_id:
|
||||
try:
|
||||
resolved_cwd = self._get_workspace_dir(
|
||||
@@ -8125,17 +8171,14 @@ class Pipe:
|
||||
# Re-use mapping logic or just pass it through
|
||||
resume_params["reasoning_effort"] = effective_reasoning_effort
|
||||
|
||||
mcp_servers = self._parse_mcp_servers(
|
||||
__event_call__,
|
||||
enable_mcp=effective_mcp,
|
||||
chat_tool_ids=chat_tool_ids,
|
||||
)
|
||||
# Use already parsed mcp_servers to avoid redundant heavy IO
|
||||
if mcp_servers:
|
||||
resume_params["mcp_servers"] = mcp_servers
|
||||
|
||||
# Always None: let CLI built-ins (bash etc.) remain available.
|
||||
resume_params["available_tools"] = None
|
||||
|
||||
t_skills_start = time.monotonic()
|
||||
resume_params.update(
|
||||
self._resolve_session_skill_config(
|
||||
resolved_cwd=resolved_cwd,
|
||||
@@ -8144,24 +8187,18 @@ class Pipe:
|
||||
disabled_skills=effective_disabled_skills,
|
||||
)
|
||||
)
|
||||
try:
|
||||
skill_dirs_dbg = resume_params.get("skill_directories") or []
|
||||
if skill_dirs_dbg:
|
||||
logger.info(
|
||||
f"[Copilot] resume skill_directories={skill_dirs_dbg}"
|
||||
)
|
||||
t_skills_elapsed = time.monotonic() - t_skills_start
|
||||
|
||||
# Only run heavy IO skill debugging if debug is actually on
|
||||
if effective_debug:
|
||||
try:
|
||||
skill_dirs_dbg = resume_params.get("skill_directories") or []
|
||||
for sd in skill_dirs_dbg:
|
||||
path = Path(sd)
|
||||
skill_md_count = sum(
|
||||
1 for p in path.glob("*/SKILL.md") if p.is_file()
|
||||
)
|
||||
logger.info(
|
||||
f"[Copilot] resume skill_dir check: {sd} exists={path.exists()} skill_md_count={skill_md_count}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug(
|
||||
f"[Copilot] resume skill directory debug check failed: {e}"
|
||||
)
|
||||
skill_md_count = sum(1 for p in path.glob("*/SKILL.md") if p.is_file())
|
||||
logger.info(f"[Copilot] resume skill_dir check: {sd} skill_md_count={skill_md_count}")
|
||||
except:
|
||||
pass
|
||||
|
||||
# Always inject the latest system prompt in 'replace' mode
|
||||
# This handles both custom models and user-defined system messages
|
||||
@@ -8201,7 +8238,16 @@ class Pipe:
|
||||
debug_enabled=effective_debug,
|
||||
)
|
||||
|
||||
t_before_rpc = time.monotonic()
|
||||
session = await client.resume_session(chat_id, resume_params)
|
||||
t_after_rpc = time.monotonic()
|
||||
|
||||
self._emit_debug_log_sync(
|
||||
f"⏱️ [Profiling] Skills Resolve: {t_skills_elapsed:.3f}s | RPC resume_session: {t_after_rpc - t_before_rpc:.3f}s",
|
||||
__event_call__,
|
||||
debug_enabled=True,
|
||||
)
|
||||
|
||||
await self._emit_debug_log(
|
||||
f"Successfully resumed session {chat_id} with model {real_model_id}",
|
||||
__event_call__,
|
||||
@@ -8242,7 +8288,15 @@ class Pipe:
|
||||
__event_call__,
|
||||
)
|
||||
|
||||
t_before_rpc2 = time.monotonic()
|
||||
session = await client.create_session(config=session_config)
|
||||
t_after_rpc2 = time.monotonic()
|
||||
|
||||
self._emit_debug_log_sync(
|
||||
f"⏱️ [Profiling] RPC create_session: {t_after_rpc2 - t_before_rpc2:.3f}s",
|
||||
__event_call__,
|
||||
debug_enabled=True,
|
||||
)
|
||||
|
||||
model_type_label = "BYOK" if is_byok_model else "Copilot"
|
||||
await self._emit_debug_log(
|
||||
@@ -8320,6 +8374,7 @@ class Pipe:
|
||||
debug_enabled=effective_debug,
|
||||
user_lang=user_lang,
|
||||
pending_embeds=pending_embeds,
|
||||
request_start_ts=request_start_ts,
|
||||
)
|
||||
else:
|
||||
try:
|
||||
@@ -8367,6 +8422,7 @@ class Pipe:
|
||||
debug_enabled: bool = False,
|
||||
user_lang: str = "en-US",
|
||||
pending_embeds: List[dict] = None,
|
||||
request_start_ts: float = 0.0,
|
||||
) -> AsyncGenerator:
|
||||
"""
|
||||
Stream response from Copilot SDK, handling various event types.
|
||||
@@ -8384,8 +8440,13 @@ class Pipe:
|
||||
"idle_reached": False,
|
||||
"session_finalized": False,
|
||||
"turn_started": False,
|
||||
"turn_ended": False,
|
||||
"turn_started_ts": None,
|
||||
"turn_end_ts": None,
|
||||
"last_event_ts": stream_start_ts,
|
||||
"last_error_msg": "",
|
||||
"embeds_flushed": False,
|
||||
"reasoning_sent": False,
|
||||
"final_status_desc": self._get_translation(
|
||||
user_lang, "status_task_completed"
|
||||
),
|
||||
@@ -8396,6 +8457,7 @@ class Pipe:
|
||||
skill_invoked_in_turn = False
|
||||
last_wait_status_ts = 0.0
|
||||
wait_status_interval = 15.0
|
||||
queue_poll_interval = max(1.0, min(float(self.valves.TIMEOUT), 5.0))
|
||||
|
||||
IDLE_SENTINEL = object()
|
||||
ERROR_SENTINEL = object()
|
||||
@@ -8428,6 +8490,87 @@ class Pipe:
|
||||
# Try as object attribute
|
||||
return getattr(data, attr, default)
|
||||
|
||||
|
||||
async def _flush_pending_embeds(flush_source: str) -> List[str]:
|
||||
if state.get("embeds_flushed"):
|
||||
return []
|
||||
if not pending_embeds:
|
||||
return []
|
||||
|
||||
state["embeds_flushed"] = True
|
||||
artifacts_to_yield: List[str] = []
|
||||
embeds_to_process = list(pending_embeds)
|
||||
pending_embeds.clear()
|
||||
|
||||
self._emit_debug_log_sync(
|
||||
f"Flushing {len(embeds_to_process)} pending embed(s) via {flush_source}",
|
||||
__event_call__,
|
||||
debug_enabled=debug_enabled,
|
||||
)
|
||||
|
||||
for embed in embeds_to_process:
|
||||
if not isinstance(embed, dict):
|
||||
continue
|
||||
|
||||
embed_type = embed.get("type")
|
||||
embed_filename = embed.get("filename", "")
|
||||
embed_content = embed.get("content", "")
|
||||
if embed_type not in {"richui", "artifacts"}:
|
||||
continue
|
||||
|
||||
if __event_emitter__:
|
||||
try:
|
||||
await __event_emitter__(
|
||||
{
|
||||
"type": "status",
|
||||
"data": {
|
||||
"description": self._get_translation(
|
||||
user_lang,
|
||||
"status_publishing_file",
|
||||
filename=embed_filename,
|
||||
),
|
||||
"done": True,
|
||||
},
|
||||
}
|
||||
)
|
||||
await __event_emitter__(
|
||||
{
|
||||
"type": "notification",
|
||||
"data": {
|
||||
"type": "success",
|
||||
"content": self._get_translation(
|
||||
user_lang, "publish_success"
|
||||
),
|
||||
},
|
||||
}
|
||||
)
|
||||
except Exception as emit_error:
|
||||
self._emit_debug_log_sync(
|
||||
f"Embed status/notification emission failed via {flush_source}: {emit_error}",
|
||||
__event_call__,
|
||||
debug_enabled=debug_enabled,
|
||||
)
|
||||
|
||||
if embed_type == "richui":
|
||||
if __event_emitter__:
|
||||
try:
|
||||
await __event_emitter__(
|
||||
{
|
||||
"type": "embeds",
|
||||
"data": {"embeds": [embed_content]},
|
||||
}
|
||||
)
|
||||
except Exception as emit_error:
|
||||
self._emit_debug_log_sync(
|
||||
f"RichUI embed emission failed via {flush_source}: {emit_error}",
|
||||
__event_call__,
|
||||
debug_enabled=debug_enabled,
|
||||
)
|
||||
elif embed_content:
|
||||
artifacts_to_yield.append(embed_content)
|
||||
|
||||
return artifacts_to_yield
|
||||
|
||||
def handler(event):
|
||||
"""
|
||||
Event handler following official SDK patterns.
|
||||
@@ -8509,7 +8652,27 @@ class Pipe:
|
||||
# === Turn Management Events ===
|
||||
if event_type == "assistant.turn_start":
|
||||
state["turn_started"] = True
|
||||
state["turn_ended"] = False
|
||||
state["turn_started_ts"] = state["last_event_ts"]
|
||||
state["turn_end_ts"] = None
|
||||
|
||||
# Calculate and log request latency (TTFT)
|
||||
if request_start_ts > 0:
|
||||
elapsed = state["last_event_ts"] - request_start_ts
|
||||
net_elapsed = ""
|
||||
if "send_start_ts" in state:
|
||||
net_roundtrip = state["last_event_ts"] - state["send_start_ts"]
|
||||
net_elapsed = f" | Cloud Network Trip: {net_roundtrip:.3f}s"
|
||||
self._emit_debug_log_sync(
|
||||
f"⏱️ [Copilot] Total TTFT: {elapsed:.3f}s{net_elapsed}",
|
||||
__event_call__,
|
||||
debug_enabled=True,
|
||||
)
|
||||
|
||||
state["message_stream_tail"] = ""
|
||||
state["reasoning_sent"] = False
|
||||
state["reasoning_stream_tail"] = ""
|
||||
state["content_sent"] = False
|
||||
self._emit_debug_log_sync(
|
||||
"Assistant Turn Started",
|
||||
__event_call__,
|
||||
@@ -8558,13 +8721,11 @@ class Pipe:
|
||||
if state["thinking_started"]:
|
||||
queue.put_nowait("\n</think>\n")
|
||||
state["thinking_started"] = False
|
||||
|
||||
queue.put_nowait(delta)
|
||||
|
||||
# === Complete Message Event (Non-streaming response) ===
|
||||
elif event_type == "assistant.message":
|
||||
# Handle complete message (when SDK returns full content instead of deltas)
|
||||
# IMPORTANT: Skip if we already received delta content to avoid duplication.
|
||||
# The SDK may emit both delta and full message events.
|
||||
if state["content_sent"]:
|
||||
return
|
||||
content = safe_get_data_attr(event, "content") or safe_get_data_attr(
|
||||
@@ -8572,10 +8733,8 @@ class Pipe:
|
||||
)
|
||||
if content:
|
||||
state["content_sent"] = True
|
||||
# Close current status
|
||||
if state.get("last_status_desc"):
|
||||
emit_status(state["last_status_desc"], is_done=True)
|
||||
|
||||
if state["thinking_started"]:
|
||||
queue.put_nowait("\n</think>\n")
|
||||
state["thinking_started"] = False
|
||||
@@ -8591,6 +8750,7 @@ class Pipe:
|
||||
if state["content_sent"]:
|
||||
return
|
||||
|
||||
state["reasoning_sent"] = True
|
||||
# Use UserValves or Global Valve for thinking visibility
|
||||
if not state["thinking_started"] and show_thinking:
|
||||
queue.put_nowait("<think>\n")
|
||||
@@ -8600,20 +8760,8 @@ class Pipe:
|
||||
|
||||
# === Complete Reasoning Event (Non-streaming reasoning) ===
|
||||
elif event_type == "assistant.reasoning":
|
||||
# Handle complete reasoning content
|
||||
reasoning = safe_get_data_attr(event, "content") or safe_get_data_attr(
|
||||
event, "reasoning"
|
||||
)
|
||||
if reasoning:
|
||||
# Suppress late-arriving reasoning if content already started
|
||||
if state["content_sent"]:
|
||||
return
|
||||
|
||||
if not state["thinking_started"] and show_thinking:
|
||||
queue.put_nowait("<think>\n")
|
||||
state["thinking_started"] = True
|
||||
if state["thinking_started"]:
|
||||
queue.put_nowait(reasoning)
|
||||
# 按同样架构要求,直接摒弃 reasoning 全包复读。
|
||||
pass
|
||||
|
||||
# === Skill Invocation Events ===
|
||||
elif event_type == "skill.invoked":
|
||||
@@ -8636,9 +8784,6 @@ class Pipe:
|
||||
debug_enabled=debug_enabled,
|
||||
)
|
||||
|
||||
# Make invocation visible in chat stream to avoid "skills loaded but feels unknown" confusion.
|
||||
queue.put_nowait(f"\n> 🧩 **{skill_status_text}**\n")
|
||||
|
||||
# Also send status bubble when possible.
|
||||
emit_status(skill_status_text, is_done=True)
|
||||
|
||||
@@ -8720,6 +8865,10 @@ class Pipe:
|
||||
}
|
||||
running_tool_calls.add(tool_call_id)
|
||||
|
||||
# Use the richer tool status text (which contains filenames/arguments context)
|
||||
# and ensure it's only printed to the stream ONCE per unique tool run.
|
||||
queue.put_nowait(f"\n> 🧩 **{tool_status_text}**\n")
|
||||
|
||||
# Close thinking tag if open before showing tool
|
||||
if state["thinking_started"]:
|
||||
queue.put_nowait("\n</think>\n")
|
||||
@@ -9051,6 +9200,9 @@ class Pipe:
|
||||
debug_enabled=debug_enabled,
|
||||
)
|
||||
running_tool_calls.clear()
|
||||
state["turn_started"] = False
|
||||
state["turn_ended"] = True
|
||||
state["turn_end_ts"] = state["last_event_ts"]
|
||||
|
||||
self._emit_debug_log_sync(
|
||||
"Assistant Turn Ended",
|
||||
@@ -9101,6 +9253,7 @@ class Pipe:
|
||||
# Session finished processing - signal to the generator loop to finalize
|
||||
state["idle_reached"] = True
|
||||
state["turn_started"] = False
|
||||
state["turn_ended"] = False
|
||||
try:
|
||||
queue.put_nowait(IDLE_SENTINEL)
|
||||
except:
|
||||
@@ -9108,10 +9261,12 @@ class Pipe:
|
||||
|
||||
elif event_type == "session.error":
|
||||
state["turn_started"] = False
|
||||
state["turn_ended"] = False
|
||||
# Fallback: clear orphaned tool call tracking on session error
|
||||
running_tool_calls.clear()
|
||||
error_msg = safe_get_data_attr(event, "message", "Unknown Error")
|
||||
state["last_error_msg"] = error_msg
|
||||
state["final_status_desc"] = error_msg
|
||||
emit_status(
|
||||
self._get_translation(
|
||||
user_lang, "status_session_error", error=error_msg
|
||||
@@ -9148,16 +9303,39 @@ class Pipe:
|
||||
)
|
||||
)
|
||||
|
||||
# Clear any stale handlers from abandoned generators before subscribing.
|
||||
# When OpenWebUI cancels a request without awaiting generator.aclose(), the
|
||||
# old handler stays registered on the session object. Clearing first ensures
|
||||
# we never have more than one active handler per stream.
|
||||
try:
|
||||
with session._event_handlers_lock:
|
||||
stale_count = len(session._event_handlers)
|
||||
if stale_count:
|
||||
session._event_handlers.clear()
|
||||
self._emit_debug_log_sync(
|
||||
f"Cleared {stale_count} stale handler(s) before subscribing",
|
||||
__event_call__,
|
||||
debug_enabled=debug_enabled,
|
||||
)
|
||||
except Exception:
|
||||
pass # SDK internals may change; fail silently
|
||||
|
||||
unsubscribe = session.on(handler)
|
||||
|
||||
# Diagnostic: log handler and queue identity for duplicate-output debugging.
|
||||
# If you see multiple "[HandlerRegistered]" logs with different handler_id
|
||||
# values but the same session_id, multiple handlers are registered concurrently.
|
||||
self._emit_debug_log_sync(
|
||||
f"Subscribed to events. Sending request...",
|
||||
f"[HandlerRegistered] handler_id={id(handler)}, queue_id={id(queue)}, "
|
||||
f"session_id={getattr(session, 'session_id', '?')}, "
|
||||
f"total_handlers={len(getattr(session, '_event_handlers', set()))}",
|
||||
__event_call__,
|
||||
debug_enabled=debug_enabled,
|
||||
)
|
||||
|
||||
# Use asyncio.create_task used to prevent session.send from blocking the stream reading
|
||||
# if the SDK implementation waits for completion.
|
||||
state["send_start_ts"] = time.monotonic()
|
||||
send_task = asyncio.create_task(session.send(send_payload))
|
||||
|
||||
def _handle_send_task_done(task: asyncio.Task):
|
||||
@@ -9172,6 +9350,8 @@ class Pipe:
|
||||
if not exc:
|
||||
return
|
||||
error_msg = f"Copilot send failed: {exc}"
|
||||
state["last_error_msg"] = error_msg
|
||||
state["final_status_desc"] = error_msg
|
||||
self._emit_debug_log_sync(
|
||||
error_msg,
|
||||
__event_call__,
|
||||
@@ -9236,13 +9416,14 @@ class Pipe:
|
||||
while not done.is_set():
|
||||
try:
|
||||
chunk = await asyncio.wait_for(
|
||||
queue.get(), timeout=float(self.valves.TIMEOUT)
|
||||
queue.get(), timeout=queue_poll_interval
|
||||
)
|
||||
if chunk is SENTINEL:
|
||||
done.set()
|
||||
break
|
||||
|
||||
if chunk is IDLE_SENTINEL:
|
||||
artifact_chunks = []
|
||||
# --- [FINAL STEP] Emit Rich UI Integrated View & Task Completion ---
|
||||
if __event_emitter__:
|
||||
try:
|
||||
@@ -9283,80 +9464,9 @@ class Pipe:
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 2. Emit UI components (richui or artifacts type)
|
||||
_idle_dbg = f"IDLE: pending_embeds count={len(pending_embeds) if pending_embeds else 0}, types={[e.get('type') for e in pending_embeds] if pending_embeds else []}"
|
||||
logger.info(f"[Copilot] {_idle_dbg}")
|
||||
if __event_call__:
|
||||
try:
|
||||
await __event_call__(
|
||||
{
|
||||
"type": "execute",
|
||||
"data": {
|
||||
"code": f'console.debug("%c[Copilot IDLE] {_idle_dbg}", "color: #f59e0b;");'
|
||||
},
|
||||
}
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
if pending_embeds:
|
||||
for embed in pending_embeds:
|
||||
_embed_dbg = f"Processing embed type='{embed.get('type')}', filename='{embed.get('filename')}', content_len={len(embed.get('content', ''))}"
|
||||
logger.info(f"[Copilot] IDLE: {_embed_dbg}")
|
||||
if __event_call__:
|
||||
try:
|
||||
await __event_call__(
|
||||
{
|
||||
"type": "execute",
|
||||
"data": {
|
||||
"code": f'console.debug("%c[Copilot IDLE] {_embed_dbg}", "color: #f59e0b;");'
|
||||
},
|
||||
}
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
if embed.get("type") in ["richui", "artifacts"]:
|
||||
# Status update
|
||||
await __event_emitter__(
|
||||
{
|
||||
"type": "status",
|
||||
"data": {
|
||||
"description": self._get_translation(
|
||||
user_lang,
|
||||
"status_publishing_file",
|
||||
filename=embed["filename"],
|
||||
),
|
||||
"done": True,
|
||||
},
|
||||
}
|
||||
)
|
||||
# Success notification
|
||||
await __event_emitter__(
|
||||
{
|
||||
"type": "notification",
|
||||
"data": {
|
||||
"type": "success",
|
||||
"content": self._get_translation(
|
||||
user_lang, "publish_success"
|
||||
),
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
if embed.get("type") == "richui":
|
||||
# Standard OpenWebUI Embed Structure: type: "embeds", data: {"embeds": [content]}
|
||||
await __event_emitter__(
|
||||
{
|
||||
"type": "embeds",
|
||||
"data": {
|
||||
"embeds": [embed["content"]]
|
||||
},
|
||||
}
|
||||
)
|
||||
elif embed.get("type") == "artifacts":
|
||||
# Directly yield the markdown block to the response stream.
|
||||
# This securely appends the code block to the final message,
|
||||
# tricking OpenWebUI into rendering it as an artifact seamlessly.
|
||||
yield embed["content"]
|
||||
artifact_chunks = await _flush_pending_embeds(
|
||||
"session.idle"
|
||||
)
|
||||
|
||||
# 3. LOCK internal status emission for background tasks
|
||||
# (Stray Task A from tool.execution_complete will now be discarded)
|
||||
@@ -9402,6 +9512,15 @@ class Pipe:
|
||||
__event_call__,
|
||||
debug_enabled=debug_enabled,
|
||||
)
|
||||
else:
|
||||
artifact_chunks = await _flush_pending_embeds(
|
||||
"session.idle.no_emitter"
|
||||
)
|
||||
|
||||
for artifact_chunk in artifact_chunks:
|
||||
if artifact_chunk:
|
||||
has_content = True
|
||||
yield artifact_chunk
|
||||
|
||||
done.set()
|
||||
break
|
||||
@@ -9409,6 +9528,12 @@ class Pipe:
|
||||
if chunk is ERROR_SENTINEL:
|
||||
# Extract error message if possible or use default
|
||||
error_desc = state.get("last_error_msg", "Error during processing")
|
||||
artifact_chunks = await _flush_pending_embeds("error")
|
||||
for artifact_chunk in artifact_chunks:
|
||||
if artifact_chunk:
|
||||
has_content = True
|
||||
yield artifact_chunk
|
||||
state["session_finalized"] = True
|
||||
if __event_emitter__:
|
||||
try:
|
||||
await __event_emitter__(
|
||||
@@ -9443,7 +9568,28 @@ class Pipe:
|
||||
|
||||
now_ts = time.monotonic()
|
||||
no_progress_timeout = min(float(self.valves.TIMEOUT), 90.0)
|
||||
time_since_last_event = now_ts - state.get("last_event_ts", stream_start_ts)
|
||||
time_since_last_event = now_ts - state.get(
|
||||
"last_event_ts", stream_start_ts
|
||||
)
|
||||
turn_end_ts = state.get("turn_end_ts") or now_ts
|
||||
time_since_turn_end = now_ts - turn_end_ts
|
||||
|
||||
if (
|
||||
state.get("turn_ended")
|
||||
and not state["idle_reached"]
|
||||
and time_since_turn_end >= 3.0
|
||||
):
|
||||
self._emit_debug_log_sync(
|
||||
"assistant.turn_end received but session.idle did not arrive in time; synthesizing idle for final flush",
|
||||
__event_call__,
|
||||
debug_enabled=debug_enabled,
|
||||
)
|
||||
state["idle_reached"] = True
|
||||
try:
|
||||
queue.put_nowait(IDLE_SENTINEL)
|
||||
except Exception:
|
||||
pass
|
||||
continue
|
||||
|
||||
# --- Primary Stall Detection: no content started yet ---
|
||||
if (
|
||||
@@ -9467,6 +9613,8 @@ class Pipe:
|
||||
stall_msg = (
|
||||
f"Copilot stalled after assistant.turn_start. Ping failed ({ping_err}). The request was aborted."
|
||||
)
|
||||
state["last_error_msg"] = stall_msg
|
||||
state["final_status_desc"] = stall_msg
|
||||
self._emit_debug_log_sync(
|
||||
stall_msg,
|
||||
__event_call__,
|
||||
@@ -9518,6 +9666,8 @@ class Pipe:
|
||||
f"(limit: {int(absolute_inactivity_limit)}s). Ping failed ({ping_err}). "
|
||||
f"Force-aborting to prevent permanent hang."
|
||||
)
|
||||
state["last_error_msg"] = stall_msg
|
||||
state["final_status_desc"] = stall_msg
|
||||
self._emit_debug_log_sync(
|
||||
stall_msg,
|
||||
__event_call__,
|
||||
@@ -9678,10 +9828,10 @@ class Pipe:
|
||||
pass
|
||||
|
||||
unsubscribe()
|
||||
# In the current architecture, CopilotClient is a persistent singleton
|
||||
# shared across requests to eliminate the 1-2s startup latency.
|
||||
# Therefore, we NEVER stop the client here.
|
||||
# The session remains alive in the SDK for follow-up turns.
|
||||
try:
|
||||
await client.stop()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
# Triggering release after CI fix
|
||||
|
||||
@@ -9,7 +9,7 @@ SCRIPT_DIR = Path(__file__).parent
|
||||
ENV_FILE = SCRIPT_DIR / ".env"
|
||||
|
||||
URL = (
|
||||
"http://localhost:3000/api/v1/functions/id/github_copilot_official_sdk_pipe/update"
|
||||
"http://localhost:3003/api/v1/functions/id/github_copilot_official_sdk_pipe/update"
|
||||
)
|
||||
FILE_PATH = SCRIPT_DIR.parent / "plugins/pipes/github-copilot-sdk/github_copilot_sdk.py"
|
||||
|
||||
@@ -103,7 +103,7 @@ def deploy_pipe() -> None:
|
||||
print(
|
||||
f"⚠️ Update failed with status {response.status_code}, attempting to create instead..."
|
||||
)
|
||||
CREATE_URL = "http://localhost:3000/api/v1/functions/create"
|
||||
CREATE_URL = "http://localhost:3003/api/v1/functions/create"
|
||||
res_create = requests.post(
|
||||
CREATE_URL, headers=headers, data=json.dumps(payload)
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user