diff --git a/.agent/learnings/github-copilot-sdk-stream-finalization.md b/.agent/learnings/github-copilot-sdk-stream-finalization.md new file mode 100644 index 0000000..50ec820 --- /dev/null +++ b/.agent/learnings/github-copilot-sdk-stream-finalization.md @@ -0,0 +1,25 @@ +# GitHub Copilot SDK Stream Finalization + +> Discovered: 2026-03-20 + +## Context +Applies to `plugins/pipes/github-copilot-sdk/github_copilot_sdk.py` when streaming assistant output, handling `pending_embeds`, and waiting for `session.idle`. + +## Finding +Two non-obvious issues can make the pipe feel permanently stuck even when useful work already finished: + +1. If the main `queue.get()` wait uses the full user-configured `TIMEOUT` (for example 300s), watchdog logic, "still working" status updates, and synthetic finalization checks only wake up at that same coarse interval. +2. If `pending_embeds` are flushed only in the `session.idle` branch, any timeout/error/missing-idle path can lose already-prepared embeds even though file publishing itself succeeded. + +## Solution / Pattern +- Keep the *inactivity limit* controlled by `TIMEOUT`, but poll the local stream queue on a short fixed cadence (for example max 5s) so watchdogs and fallback finalization stay responsive. +- Track `assistant.turn_end`; if `session.idle` does not arrive shortly afterward, synthesize finalization instead of waiting for the full inactivity timeout. +- Flush `pending_embeds` exactly once via a shared helper that can run from both normal idle finalization and error/timeout finalization paths. +- For streamed text/reasoning, use conservative overlap trimming: only strip an overlapping prefix when the incoming chunk still contains new suffix content. Do not drop fully repeated chunks blindly, or legitimate repeated text can be corrupted. + +## Gotchas +- RichUI embed success and streamed-text success are separate paths; a file can be published correctly while chat output still hangs or duplicates. +- If `assistant.reasoning_delta` is streamed, the later complete `assistant.reasoning` event must be suppressed just like `assistant.message`, or the thinking block can duplicate. + +## πŸ› οΈ Update 2026-03-21 +- **Fixed Stream Duplication**: Fixed text stream overlays (e.g., `πŸŽ‰ 删 πŸŽ‰ εˆ ι™€ζˆεŠŸ`) when resuming conversation session. Strictly applied `_dedupe_stream_chunk(delta, "message_stream_tail")` inside `assistant.message_delta` event handler to prevent concurrent history re-play or multiple stream delivery bug overlays, solving previous gaps in the deployment pipeline. diff --git a/plugins/pipes/github-copilot-sdk/github_copilot_sdk.py b/plugins/pipes/github-copilot-sdk/github_copilot_sdk.py index 7811bb0..ad14781 100644 --- a/plugins/pipes/github-copilot-sdk/github_copilot_sdk.py +++ b/plugins/pipes/github-copilot-sdk/github_copilot_sdk.py @@ -124,13 +124,30 @@ RICHUI_BRIDGE_SCRIPT = r""" return null; } + function classHasThemeToken(className, token) { + return String(className || '').indexOf(token) !== -1; + } + function applyTheme(theme) { if (theme !== 'dark' && theme !== 'light') { return; } + var isDark = theme === 'dark'; + var shouldMirrorTheme = Boolean(getParentDocument()); + if ( + root.getAttribute('data-openwebui-applied-theme') === theme && + (!shouldMirrorTheme || + (root.getAttribute('data-theme') === theme && + root.classList.contains('dark') === isDark)) && + String(root.style.colorScheme || '') === theme + ) { + return; + } root.setAttribute('data-openwebui-applied-theme', theme); - root.setAttribute('data-theme', theme); - root.classList.toggle('dark', theme === 'dark'); + if (shouldMirrorTheme) { + root.setAttribute('data-theme', theme); + root.classList.toggle('dark', isDark); + } root.style.colorScheme = theme; } @@ -187,66 +204,57 @@ RICHUI_BRIDGE_SCRIPT = r""" return null; } - function getThemeFromDocument(doc, options) { + function getThemeFromDocument(doc) { if (!doc) { return null; } try { - var useHtmlSignals = !options || options.useHtmlSignals !== false; var html = doc.documentElement; var body = doc.body; - var htmlClass = useHtmlSignals && html ? String(html.className || '') : ''; + var htmlClass = html ? String(html.className || '') : ''; var bodyClass = body ? String(body.className || '') : ''; - var htmlDataTheme = - useHtmlSignals && html ? html.getAttribute('data-theme') : ''; - var bodyDataTheme = body ? body.getAttribute('data-theme') : ''; + var htmlDataTheme = html ? html.getAttribute('data-theme') : ''; if ( htmlDataTheme === 'dark' || - bodyDataTheme === 'dark' || - htmlClass.indexOf('dark') !== -1 || - bodyClass.indexOf('dark') !== -1 + classHasThemeToken(bodyClass, 'dark') || + classHasThemeToken(htmlClass, 'dark') ) { return 'dark'; } if ( htmlDataTheme === 'light' || - bodyDataTheme === 'light' || - htmlClass.indexOf('light') !== -1 || - bodyClass.indexOf('light') !== -1 + classHasThemeToken(bodyClass, 'light') || + classHasThemeToken(htmlClass, 'light') ) { return 'light'; } - - var metaTheme = getThemeFromMeta(doc); - if (metaTheme) { - return metaTheme; - } - - var computedTarget = useHtmlSignals ? html || body : body || html; - if (computedTarget) { - var computedScheme = window.getComputedStyle(computedTarget).colorScheme; - if (computedScheme === 'dark' || computedScheme === 'light') { - return computedScheme; - } - } } catch (e) {} return null; } function resolveTheme() { - var parentTheme = getThemeFromDocument(getParentDocument(), { - useHtmlSignals: true, - }); - if (parentTheme) { - return parentTheme; + var parentDoc = getParentDocument(); + var parentMetaTheme = parentDoc ? getThemeFromMeta(parentDoc) : null; + if (parentMetaTheme) { + return parentMetaTheme; } - var localTheme = getThemeFromDocument(document, { - useHtmlSignals: false, - }); - if (localTheme) { - return localTheme; + var parentClassTheme = parentDoc ? getThemeFromDocument(parentDoc) : null; + if (parentClassTheme) { + return parentClassTheme; + } + + if (!parentDoc) { + var localMetaTheme = getThemeFromMeta(document); + if (localMetaTheme) { + return localMetaTheme; + } + + var localClassTheme = getThemeFromDocument(document); + if (localClassTheme) { + return localClassTheme; + } } try { @@ -366,26 +374,26 @@ function measureHeight() { } function attachThemeObservers() { - var parentDoc = getParentDocument(); - if (!window.MutationObserver || !parentDoc || parentThemeObserver) { + var themeDoc = getParentDocument() || document; + if (!window.MutationObserver || !themeDoc || parentThemeObserver) { return; } try { parentThemeObserver = new MutationObserver(syncTheme); - if (parentDoc.documentElement) { - parentThemeObserver.observe(parentDoc.documentElement, { + if (themeDoc.documentElement) { + parentThemeObserver.observe(themeDoc.documentElement, { attributes: true, - attributeFilter: ['class', 'data-theme', 'style'], + attributeFilter: ['class', 'data-theme'], }); } - if (parentDoc.body) { - parentThemeObserver.observe(parentDoc.body, { + if (themeDoc.body) { + parentThemeObserver.observe(themeDoc.body, { attributes: true, - attributeFilter: ['class', 'data-theme', 'style'], + attributeFilter: ['class', 'data-theme'], }); } - if (parentDoc.head) { - parentThemeObserver.observe(parentDoc.head, { + if (themeDoc.head) { + parentThemeObserver.observe(themeDoc.head, { childList: true, subtree: true, attributes: true, @@ -3027,6 +3035,8 @@ class Pipe: __message_id__: Optional[str] = None, ): """Initialize custom tools based on configuration""" + import time + t_start = time.perf_counter() # 1. Determine effective settings (User override > Global) uv = self._get_user_valves(__user__) enable_tools = uv.ENABLE_OPENWEBUI_TOOLS @@ -3057,8 +3067,11 @@ class Pipe: if todo_widget_tool: final_tools.append(todo_widget_tool) + t_base_tools_done = time.perf_counter() + # 3. If all OpenWebUI tool types are disabled, skip loading and return early if not enable_tools and not enable_openapi: + logger.info(f"[Perf] _initialize_custom_tools (fast return): {(time.perf_counter() - t_start)*1000:.2f}ms") return final_tools # 4. Extract chat-level tool selection (P4: user selection from Chat UI) @@ -3067,6 +3080,7 @@ class Pipe: ) # 5. Load OpenWebUI tools dynamically (always fresh, no cache) + t_dyn_tools_start = time.perf_counter() openwebui_tools = await self._load_openwebui_tools( body=body, __user__=__user__, @@ -3085,6 +3099,7 @@ class Pipe: __chat_id__=__chat_id__, __message_id__=__message_id__, ) + t_dyn_tools_done = time.perf_counter() if openwebui_tools: tool_names = [t.name for t in openwebui_tools] @@ -3101,6 +3116,9 @@ class Pipe: final_tools.extend(openwebui_tools) + t_total = time.perf_counter() - t_start + logger.info(f"[Perf] _initialize_custom_tools total: {t_total*1000:.2f}ms (Base tools: {(t_base_tools_done-t_start)*1000:.2f}ms, Dyn Load: {(t_dyn_tools_done-t_dyn_tools_start)*1000:.2f}ms)") + return final_tools def _get_render_todo_widget_tool( @@ -4900,9 +4918,13 @@ class Pipe: __event_call__, ) + import time + t_get_opw_start = time.perf_counter() tools_dict = await get_openwebui_tools( tool_request, tool_ids, user, extra_params ) + t_get_opw_done = time.perf_counter() + logger.info(f"[Perf] get_openwebui_tools ({len(tool_ids)} IDs): {(t_get_opw_done-t_get_opw_start)*1000:.2f}ms") if self.valves.DEBUG: if tools_dict: @@ -5917,7 +5939,8 @@ class Pipe: --blk-c:#dc2626; --blk-b:#fef2f2;--blk-bd:#fecaca; --dot:#6366f1; }} - html.dark{{ + html.dark, + html[data-openwebui-applied-theme="dark"]{{ --bg:#1e1e2e;--bd:#313244;--tx:#cdd6f4;--mu:#6c7086;--row-h:#232336; --done-c:#a6e3a1;--done-b:#1e3a2a;--done-bd:#2d5c3e; --prog-c:#89b4fa;--prog-b:#1a2a4a;--prog-bd:#2a4070; @@ -7641,6 +7664,7 @@ class Pipe: __chat_id__: Optional[str] = None, __message_id__: Optional[str] = None, ) -> Union[str, AsyncGenerator]: + request_start_ts = time.monotonic() # --- PROBE LOG --- if __event_call__: await self._emit_debug_log( @@ -8030,13 +8054,20 @@ class Pipe: # Shared state for delayed HTML embeds (Premium Experience) pending_embeds = [] - # Use Shared Persistent Client Pool (Token-aware) - client = await self._get_client(token) - should_stop_client = False # Never stop the shared singleton pool! + # ==================== REVERT TO 0.9.1 EPHEMERAL CLIENT ==================== + t_before_client = time.monotonic() + client_config = self._build_client_config( + user_id=user_id, chat_id=chat_id, token=effective_token + ) + client_config["github_token"] = effective_token + client = CopilotClient(client_config) + should_stop_client = True try: - # Note: client is already started in _get_client + await client.start() + t_after_client = time.monotonic() # Initialize custom tools (Handles caching internally) + t_before_tools = time.monotonic() custom_tools = await self._initialize_custom_tools( body=body, __user__=__user__, @@ -8054,6 +8085,8 @@ class Pipe: __chat_id__=__chat_id__, __message_id__=__message_id__, ) + t_after_tools = time.monotonic() + if custom_tools: await self._emit_debug_log( f"Enabled {len(custom_tools)} tools (Custom/Built-in)", @@ -8061,9 +8094,21 @@ class Pipe: ) # Check MCP Servers + t_before_mcp = time.monotonic() mcp_servers = self._parse_mcp_servers( __event_call__, enable_mcp=effective_mcp, chat_tool_ids=chat_tool_ids ) + t_after_mcp = time.monotonic() + + # Emit Profiling Log to Frontend + self._emit_debug_log_sync( + f"⏱️ [Profiling] Client.start: {t_after_client - t_before_client:.3f}s | " + f"Tools Init: {t_after_tools - t_before_tools:.3f}s | " + f"MCP Parse: {t_after_mcp - t_before_mcp:.3f}s", + __event_call__, + debug_enabled=True, + ) + mcp_server_names = list(mcp_servers.keys()) if mcp_servers else [] if mcp_server_names: await self._emit_debug_log( @@ -8100,6 +8145,7 @@ class Pipe: provider_config["bearer_token"] = byok_bearer_token pass + if chat_id: try: resolved_cwd = self._get_workspace_dir( @@ -8125,17 +8171,14 @@ class Pipe: # Re-use mapping logic or just pass it through resume_params["reasoning_effort"] = effective_reasoning_effort - mcp_servers = self._parse_mcp_servers( - __event_call__, - enable_mcp=effective_mcp, - chat_tool_ids=chat_tool_ids, - ) + # Use already parsed mcp_servers to avoid redundant heavy IO if mcp_servers: resume_params["mcp_servers"] = mcp_servers # Always None: let CLI built-ins (bash etc.) remain available. resume_params["available_tools"] = None + t_skills_start = time.monotonic() resume_params.update( self._resolve_session_skill_config( resolved_cwd=resolved_cwd, @@ -8144,24 +8187,18 @@ class Pipe: disabled_skills=effective_disabled_skills, ) ) - try: - skill_dirs_dbg = resume_params.get("skill_directories") or [] - if skill_dirs_dbg: - logger.info( - f"[Copilot] resume skill_directories={skill_dirs_dbg}" - ) + t_skills_elapsed = time.monotonic() - t_skills_start + + # Only run heavy IO skill debugging if debug is actually on + if effective_debug: + try: + skill_dirs_dbg = resume_params.get("skill_directories") or [] for sd in skill_dirs_dbg: path = Path(sd) - skill_md_count = sum( - 1 for p in path.glob("*/SKILL.md") if p.is_file() - ) - logger.info( - f"[Copilot] resume skill_dir check: {sd} exists={path.exists()} skill_md_count={skill_md_count}" - ) - except Exception as e: - logger.debug( - f"[Copilot] resume skill directory debug check failed: {e}" - ) + skill_md_count = sum(1 for p in path.glob("*/SKILL.md") if p.is_file()) + logger.info(f"[Copilot] resume skill_dir check: {sd} skill_md_count={skill_md_count}") + except: + pass # Always inject the latest system prompt in 'replace' mode # This handles both custom models and user-defined system messages @@ -8201,7 +8238,16 @@ class Pipe: debug_enabled=effective_debug, ) + t_before_rpc = time.monotonic() session = await client.resume_session(chat_id, resume_params) + t_after_rpc = time.monotonic() + + self._emit_debug_log_sync( + f"⏱️ [Profiling] Skills Resolve: {t_skills_elapsed:.3f}s | RPC resume_session: {t_after_rpc - t_before_rpc:.3f}s", + __event_call__, + debug_enabled=True, + ) + await self._emit_debug_log( f"Successfully resumed session {chat_id} with model {real_model_id}", __event_call__, @@ -8242,7 +8288,15 @@ class Pipe: __event_call__, ) + t_before_rpc2 = time.monotonic() session = await client.create_session(config=session_config) + t_after_rpc2 = time.monotonic() + + self._emit_debug_log_sync( + f"⏱️ [Profiling] RPC create_session: {t_after_rpc2 - t_before_rpc2:.3f}s", + __event_call__, + debug_enabled=True, + ) model_type_label = "BYOK" if is_byok_model else "Copilot" await self._emit_debug_log( @@ -8320,6 +8374,7 @@ class Pipe: debug_enabled=effective_debug, user_lang=user_lang, pending_embeds=pending_embeds, + request_start_ts=request_start_ts, ) else: try: @@ -8367,6 +8422,7 @@ class Pipe: debug_enabled: bool = False, user_lang: str = "en-US", pending_embeds: List[dict] = None, + request_start_ts: float = 0.0, ) -> AsyncGenerator: """ Stream response from Copilot SDK, handling various event types. @@ -8384,8 +8440,13 @@ class Pipe: "idle_reached": False, "session_finalized": False, "turn_started": False, + "turn_ended": False, "turn_started_ts": None, + "turn_end_ts": None, "last_event_ts": stream_start_ts, + "last_error_msg": "", + "embeds_flushed": False, + "reasoning_sent": False, "final_status_desc": self._get_translation( user_lang, "status_task_completed" ), @@ -8396,6 +8457,7 @@ class Pipe: skill_invoked_in_turn = False last_wait_status_ts = 0.0 wait_status_interval = 15.0 + queue_poll_interval = max(1.0, min(float(self.valves.TIMEOUT), 5.0)) IDLE_SENTINEL = object() ERROR_SENTINEL = object() @@ -8428,6 +8490,87 @@ class Pipe: # Try as object attribute return getattr(data, attr, default) + + async def _flush_pending_embeds(flush_source: str) -> List[str]: + if state.get("embeds_flushed"): + return [] + if not pending_embeds: + return [] + + state["embeds_flushed"] = True + artifacts_to_yield: List[str] = [] + embeds_to_process = list(pending_embeds) + pending_embeds.clear() + + self._emit_debug_log_sync( + f"Flushing {len(embeds_to_process)} pending embed(s) via {flush_source}", + __event_call__, + debug_enabled=debug_enabled, + ) + + for embed in embeds_to_process: + if not isinstance(embed, dict): + continue + + embed_type = embed.get("type") + embed_filename = embed.get("filename", "") + embed_content = embed.get("content", "") + if embed_type not in {"richui", "artifacts"}: + continue + + if __event_emitter__: + try: + await __event_emitter__( + { + "type": "status", + "data": { + "description": self._get_translation( + user_lang, + "status_publishing_file", + filename=embed_filename, + ), + "done": True, + }, + } + ) + await __event_emitter__( + { + "type": "notification", + "data": { + "type": "success", + "content": self._get_translation( + user_lang, "publish_success" + ), + }, + } + ) + except Exception as emit_error: + self._emit_debug_log_sync( + f"Embed status/notification emission failed via {flush_source}: {emit_error}", + __event_call__, + debug_enabled=debug_enabled, + ) + + if embed_type == "richui": + if __event_emitter__: + try: + await __event_emitter__( + { + "type": "embeds", + "data": {"embeds": [embed_content]}, + } + ) + except Exception as emit_error: + self._emit_debug_log_sync( + f"RichUI embed emission failed via {flush_source}: {emit_error}", + __event_call__, + debug_enabled=debug_enabled, + ) + elif embed_content: + artifacts_to_yield.append(embed_content) + + return artifacts_to_yield + def handler(event): """ Event handler following official SDK patterns. @@ -8509,7 +8652,27 @@ class Pipe: # === Turn Management Events === if event_type == "assistant.turn_start": state["turn_started"] = True + state["turn_ended"] = False state["turn_started_ts"] = state["last_event_ts"] + state["turn_end_ts"] = None + + # Calculate and log request latency (TTFT) + if request_start_ts > 0: + elapsed = state["last_event_ts"] - request_start_ts + net_elapsed = "" + if "send_start_ts" in state: + net_roundtrip = state["last_event_ts"] - state["send_start_ts"] + net_elapsed = f" | Cloud Network Trip: {net_roundtrip:.3f}s" + self._emit_debug_log_sync( + f"⏱️ [Copilot] Total TTFT: {elapsed:.3f}s{net_elapsed}", + __event_call__, + debug_enabled=True, + ) + + state["message_stream_tail"] = "" + state["reasoning_sent"] = False + state["reasoning_stream_tail"] = "" + state["content_sent"] = False self._emit_debug_log_sync( "Assistant Turn Started", __event_call__, @@ -8558,13 +8721,11 @@ class Pipe: if state["thinking_started"]: queue.put_nowait("\n\n") state["thinking_started"] = False + queue.put_nowait(delta) # === Complete Message Event (Non-streaming response) === elif event_type == "assistant.message": - # Handle complete message (when SDK returns full content instead of deltas) - # IMPORTANT: Skip if we already received delta content to avoid duplication. - # The SDK may emit both delta and full message events. if state["content_sent"]: return content = safe_get_data_attr(event, "content") or safe_get_data_attr( @@ -8572,10 +8733,8 @@ class Pipe: ) if content: state["content_sent"] = True - # Close current status if state.get("last_status_desc"): emit_status(state["last_status_desc"], is_done=True) - if state["thinking_started"]: queue.put_nowait("\n\n") state["thinking_started"] = False @@ -8591,6 +8750,7 @@ class Pipe: if state["content_sent"]: return + state["reasoning_sent"] = True # Use UserValves or Global Valve for thinking visibility if not state["thinking_started"] and show_thinking: queue.put_nowait("\n") @@ -8600,20 +8760,8 @@ class Pipe: # === Complete Reasoning Event (Non-streaming reasoning) === elif event_type == "assistant.reasoning": - # Handle complete reasoning content - reasoning = safe_get_data_attr(event, "content") or safe_get_data_attr( - event, "reasoning" - ) - if reasoning: - # Suppress late-arriving reasoning if content already started - if state["content_sent"]: - return - - if not state["thinking_started"] and show_thinking: - queue.put_nowait("\n") - state["thinking_started"] = True - if state["thinking_started"]: - queue.put_nowait(reasoning) + # ζŒ‰εŒζ ·ζžΆζž„θ¦ζ±‚οΌŒη›΄ζŽ₯ζ‘’εΌƒ reasoning ε…¨εŒ…ε€θ―»γ€‚ + pass # === Skill Invocation Events === elif event_type == "skill.invoked": @@ -8636,9 +8784,6 @@ class Pipe: debug_enabled=debug_enabled, ) - # Make invocation visible in chat stream to avoid "skills loaded but feels unknown" confusion. - queue.put_nowait(f"\n> 🧩 **{skill_status_text}**\n") - # Also send status bubble when possible. emit_status(skill_status_text, is_done=True) @@ -8720,6 +8865,10 @@ class Pipe: } running_tool_calls.add(tool_call_id) + # Use the richer tool status text (which contains filenames/arguments context) + # and ensure it's only printed to the stream ONCE per unique tool run. + queue.put_nowait(f"\n> 🧩 **{tool_status_text}**\n") + # Close thinking tag if open before showing tool if state["thinking_started"]: queue.put_nowait("\n\n") @@ -9051,6 +9200,9 @@ class Pipe: debug_enabled=debug_enabled, ) running_tool_calls.clear() + state["turn_started"] = False + state["turn_ended"] = True + state["turn_end_ts"] = state["last_event_ts"] self._emit_debug_log_sync( "Assistant Turn Ended", @@ -9101,6 +9253,7 @@ class Pipe: # Session finished processing - signal to the generator loop to finalize state["idle_reached"] = True state["turn_started"] = False + state["turn_ended"] = False try: queue.put_nowait(IDLE_SENTINEL) except: @@ -9108,10 +9261,12 @@ class Pipe: elif event_type == "session.error": state["turn_started"] = False + state["turn_ended"] = False # Fallback: clear orphaned tool call tracking on session error running_tool_calls.clear() error_msg = safe_get_data_attr(event, "message", "Unknown Error") state["last_error_msg"] = error_msg + state["final_status_desc"] = error_msg emit_status( self._get_translation( user_lang, "status_session_error", error=error_msg @@ -9148,16 +9303,39 @@ class Pipe: ) ) + # Clear any stale handlers from abandoned generators before subscribing. + # When OpenWebUI cancels a request without awaiting generator.aclose(), the + # old handler stays registered on the session object. Clearing first ensures + # we never have more than one active handler per stream. + try: + with session._event_handlers_lock: + stale_count = len(session._event_handlers) + if stale_count: + session._event_handlers.clear() + self._emit_debug_log_sync( + f"Cleared {stale_count} stale handler(s) before subscribing", + __event_call__, + debug_enabled=debug_enabled, + ) + except Exception: + pass # SDK internals may change; fail silently + unsubscribe = session.on(handler) + # Diagnostic: log handler and queue identity for duplicate-output debugging. + # If you see multiple "[HandlerRegistered]" logs with different handler_id + # values but the same session_id, multiple handlers are registered concurrently. self._emit_debug_log_sync( - f"Subscribed to events. Sending request...", + f"[HandlerRegistered] handler_id={id(handler)}, queue_id={id(queue)}, " + f"session_id={getattr(session, 'session_id', '?')}, " + f"total_handlers={len(getattr(session, '_event_handlers', set()))}", __event_call__, debug_enabled=debug_enabled, ) # Use asyncio.create_task used to prevent session.send from blocking the stream reading # if the SDK implementation waits for completion. + state["send_start_ts"] = time.monotonic() send_task = asyncio.create_task(session.send(send_payload)) def _handle_send_task_done(task: asyncio.Task): @@ -9172,6 +9350,8 @@ class Pipe: if not exc: return error_msg = f"Copilot send failed: {exc}" + state["last_error_msg"] = error_msg + state["final_status_desc"] = error_msg self._emit_debug_log_sync( error_msg, __event_call__, @@ -9236,13 +9416,14 @@ class Pipe: while not done.is_set(): try: chunk = await asyncio.wait_for( - queue.get(), timeout=float(self.valves.TIMEOUT) + queue.get(), timeout=queue_poll_interval ) if chunk is SENTINEL: done.set() break if chunk is IDLE_SENTINEL: + artifact_chunks = [] # --- [FINAL STEP] Emit Rich UI Integrated View & Task Completion --- if __event_emitter__: try: @@ -9283,80 +9464,9 @@ class Pipe: except Exception: pass - # 2. Emit UI components (richui or artifacts type) - _idle_dbg = f"IDLE: pending_embeds count={len(pending_embeds) if pending_embeds else 0}, types={[e.get('type') for e in pending_embeds] if pending_embeds else []}" - logger.info(f"[Copilot] {_idle_dbg}") - if __event_call__: - try: - await __event_call__( - { - "type": "execute", - "data": { - "code": f'console.debug("%c[Copilot IDLE] {_idle_dbg}", "color: #f59e0b;");' - }, - } - ) - except Exception: - pass - if pending_embeds: - for embed in pending_embeds: - _embed_dbg = f"Processing embed type='{embed.get('type')}', filename='{embed.get('filename')}', content_len={len(embed.get('content', ''))}" - logger.info(f"[Copilot] IDLE: {_embed_dbg}") - if __event_call__: - try: - await __event_call__( - { - "type": "execute", - "data": { - "code": f'console.debug("%c[Copilot IDLE] {_embed_dbg}", "color: #f59e0b;");' - }, - } - ) - except Exception: - pass - if embed.get("type") in ["richui", "artifacts"]: - # Status update - await __event_emitter__( - { - "type": "status", - "data": { - "description": self._get_translation( - user_lang, - "status_publishing_file", - filename=embed["filename"], - ), - "done": True, - }, - } - ) - # Success notification - await __event_emitter__( - { - "type": "notification", - "data": { - "type": "success", - "content": self._get_translation( - user_lang, "publish_success" - ), - }, - } - ) - - if embed.get("type") == "richui": - # Standard OpenWebUI Embed Structure: type: "embeds", data: {"embeds": [content]} - await __event_emitter__( - { - "type": "embeds", - "data": { - "embeds": [embed["content"]] - }, - } - ) - elif embed.get("type") == "artifacts": - # Directly yield the markdown block to the response stream. - # This securely appends the code block to the final message, - # tricking OpenWebUI into rendering it as an artifact seamlessly. - yield embed["content"] + artifact_chunks = await _flush_pending_embeds( + "session.idle" + ) # 3. LOCK internal status emission for background tasks # (Stray Task A from tool.execution_complete will now be discarded) @@ -9402,6 +9512,15 @@ class Pipe: __event_call__, debug_enabled=debug_enabled, ) + else: + artifact_chunks = await _flush_pending_embeds( + "session.idle.no_emitter" + ) + + for artifact_chunk in artifact_chunks: + if artifact_chunk: + has_content = True + yield artifact_chunk done.set() break @@ -9409,6 +9528,12 @@ class Pipe: if chunk is ERROR_SENTINEL: # Extract error message if possible or use default error_desc = state.get("last_error_msg", "Error during processing") + artifact_chunks = await _flush_pending_embeds("error") + for artifact_chunk in artifact_chunks: + if artifact_chunk: + has_content = True + yield artifact_chunk + state["session_finalized"] = True if __event_emitter__: try: await __event_emitter__( @@ -9443,7 +9568,28 @@ class Pipe: now_ts = time.monotonic() no_progress_timeout = min(float(self.valves.TIMEOUT), 90.0) - time_since_last_event = now_ts - state.get("last_event_ts", stream_start_ts) + time_since_last_event = now_ts - state.get( + "last_event_ts", stream_start_ts + ) + turn_end_ts = state.get("turn_end_ts") or now_ts + time_since_turn_end = now_ts - turn_end_ts + + if ( + state.get("turn_ended") + and not state["idle_reached"] + and time_since_turn_end >= 3.0 + ): + self._emit_debug_log_sync( + "assistant.turn_end received but session.idle did not arrive in time; synthesizing idle for final flush", + __event_call__, + debug_enabled=debug_enabled, + ) + state["idle_reached"] = True + try: + queue.put_nowait(IDLE_SENTINEL) + except Exception: + pass + continue # --- Primary Stall Detection: no content started yet --- if ( @@ -9467,6 +9613,8 @@ class Pipe: stall_msg = ( f"Copilot stalled after assistant.turn_start. Ping failed ({ping_err}). The request was aborted." ) + state["last_error_msg"] = stall_msg + state["final_status_desc"] = stall_msg self._emit_debug_log_sync( stall_msg, __event_call__, @@ -9518,6 +9666,8 @@ class Pipe: f"(limit: {int(absolute_inactivity_limit)}s). Ping failed ({ping_err}). " f"Force-aborting to prevent permanent hang." ) + state["last_error_msg"] = stall_msg + state["final_status_desc"] = stall_msg self._emit_debug_log_sync( stall_msg, __event_call__, @@ -9678,10 +9828,10 @@ class Pipe: pass unsubscribe() - # In the current architecture, CopilotClient is a persistent singleton - # shared across requests to eliminate the 1-2s startup latency. - # Therefore, we NEVER stop the client here. - # The session remains alive in the SDK for follow-up turns. + try: + await client.stop() + except Exception: + pass # Triggering release after CI fix diff --git a/scripts/deploy_pipe.py b/scripts/deploy_pipe.py index 056df07..1dc9a93 100644 --- a/scripts/deploy_pipe.py +++ b/scripts/deploy_pipe.py @@ -9,7 +9,7 @@ SCRIPT_DIR = Path(__file__).parent ENV_FILE = SCRIPT_DIR / ".env" URL = ( - "http://localhost:3000/api/v1/functions/id/github_copilot_official_sdk_pipe/update" + "http://localhost:3003/api/v1/functions/id/github_copilot_official_sdk_pipe/update" ) FILE_PATH = SCRIPT_DIR.parent / "plugins/pipes/github-copilot-sdk/github_copilot_sdk.py" @@ -103,7 +103,7 @@ def deploy_pipe() -> None: print( f"⚠️ Update failed with status {response.status_code}, attempting to create instead..." ) - CREATE_URL = "http://localhost:3000/api/v1/functions/create" + CREATE_URL = "http://localhost:3003/api/v1/functions/create" res_create = requests.post( CREATE_URL, headers=headers, data=json.dumps(payload) )