diff --git a/plugins/pipes/github-copilot-sdk/github_copilot_sdk.py b/plugins/pipes/github-copilot-sdk/github_copilot_sdk.py index b139ec0..c1e0d7a 100644 --- a/plugins/pipes/github-copilot-sdk/github_copilot_sdk.py +++ b/plugins/pipes/github-copilot-sdk/github_copilot_sdk.py @@ -504,16 +504,17 @@ class Pipe: description="BYOK Wire API override.", ) - # ==================== Class-Level Caches ==================== - # These caches persist across requests since OpenWebUI may create - # new Pipe instances for each request. - # ============================================================= - _model_cache: List[dict] = [] # Model list cache + _shared_clients: Dict[str, Any] = {} # Map: token_hash -> CopilotClient + _shared_client_lock = asyncio.Lock() # Lock for thread-safe client lifecycle + _model_cache: List[dict] = [] # Model list cache (Memory only fallback) _standard_model_ids: set = set() # Track standard model IDs - _last_byok_config_hash: str = "" # Track BYOK config for cache invalidation - _last_model_cache_time: float = 0 # Timestamp of last model cache refresh + _last_byok_config_hash: str = "" # Track BYOK config (Status only) + _last_model_cache_time: float = 0 # Timestamp _env_setup_done = False # Track if env setup has been completed _last_update_check = 0 # Timestamp of last CLI update check + _discovery_cache: Dict[str, Dict[str, Any]] = ( + {} + ) # Map config_hash -> {"time": float, "models": list} def _is_version_at_least(self, target: str) -> bool: """Check if OpenWebUI version is at least the target version.""" @@ -3918,7 +3919,9 @@ class Pipe: return None return os.path.join(self._get_session_metadata_dir(chat_id), "plan.md") - def _persist_plan_text(self, chat_id: Optional[str], content: Optional[str]) -> None: + def _persist_plan_text( + self, chat_id: Optional[str], content: Optional[str] + ) -> None: """Persist plan text into the chat-specific session metadata directory.""" plan_path = self._get_plan_file_path(chat_id) if not plan_path or not isinstance(content, str): @@ -4908,7 +4911,6 @@ class Pipe: # Setup Python Virtual Environment to strictly protect system python if not os.path.exists(f"{venv_dir}/bin/activate"): - import subprocess import sys subprocess.run( @@ -5423,51 +5425,191 @@ class Pipe: logger.warning(f"[Copilot] Failed to parse UserValves: {e}") return self.UserValves() + def _format_model_item(self, m: Any, source: str = "copilot") -> Optional[dict]: + """Standardize model item into OpenWebUI pipe format.""" + try: + # 1. Resolve ID + mid = m.get("id") if isinstance(m, dict) else getattr(m, "id", "") + if not mid: + return None + + # 2. Extract Multiplier (billing info) + bill = ( + m.get("billing") if isinstance(m, dict) else getattr(m, "billing", {}) + ) + if hasattr(bill, "to_dict"): + bill = bill.to_dict() + mult = float(bill.get("multiplier", 1.0)) if isinstance(bill, dict) else 1.0 + + # 3. Clean ID and build display name + cid = self._clean_model_id(mid) + + # Format name based on source + if source == "byok": + display_name = f"-{cid}" + else: + display_name = f"-{cid} ({mult}x)" if mult > 0 else f"-🔥 {cid} (0x)" + + return { + "id": f"{self.id}-{mid}" if source == "copilot" else mid, + "name": display_name, + "multiplier": mult, + "raw_id": mid, + "source": source, + "provider": ( + self._get_provider_name(m) if source == "copilot" else "BYOK" + ), + } + except Exception as e: + logger.debug(f"[Pipes] Format error for model {m}: {e}") + return None + async def pipes(self, __user__: Optional[dict] = None) -> List[dict]: - """Dynamically fetch and filter model list.""" - if self.valves.DEBUG: - logger.info(f"[Pipes] Called with user context: {bool(__user__)}") - + """Model discovery: Fetches standard and BYOK models with config-isolated caching.""" uv = self._get_user_valves(__user__) - token = uv.GH_TOKEN - - # Determine check interval (24 hours default) - now = datetime.now().timestamp() - needs_setup = not self.__class__._env_setup_done or ( - now - self.__class__._last_update_check > 86400 - ) - - # 1. Environment Setup (Only if needed or not done) - if needs_setup: - self._setup_env(token=token) - self.__class__._last_update_check = now - else: - # Still inject token for BYOK real-time updates - if token: - os.environ["GH_TOKEN"] = os.environ["GITHUB_TOKEN"] = token - - # Get user info for isolation - user_data = ( - __user__[0] if isinstance(__user__, (list, tuple)) else (__user__ or {}) - ) - user_id = user_data.get("id") or user_data.get("user_id") or "default_user" - token = uv.GH_TOKEN or self.valves.GH_TOKEN - # Multiplier filtering: User can constrain, but not exceed global limit - global_max = self.valves.MAX_MULTIPLIER - user_max = uv.MAX_MULTIPLIER - if user_max is not None: - eff_max = min(float(user_max), float(global_max)) + now = datetime.now().timestamp() + cache_ttl = self.valves.MODEL_CACHE_TTL + + # Fingerprint the context so different users/tokens DO NOT evict each other + current_config_str = f"{token}|{uv.BYOK_BASE_URL or self.valves.BYOK_BASE_URL}|{uv.BYOK_API_KEY or self.valves.BYOK_API_KEY}|{self.valves.BYOK_BEARER_TOKEN}" + current_config_hash = hashlib.md5(current_config_str.encode()).hexdigest() + + # Dictionary-based Cache lookup (Solves the flapping bug) + if hasattr(self.__class__, "_discovery_cache"): + cached = self.__class__._discovery_cache.get(current_config_hash) + if cached and cache_ttl > 0 and (now - cached["time"]) <= cache_ttl: + self.__class__._model_cache = cached[ + "models" + ] # Update global for pipeline capability fallbacks + return self._apply_model_filters(cached["models"], uv) + + # 1. Core discovery logic (Always fresh) + results = await asyncio.gather( + self._fetch_standard_models(token, __user__), + self._fetch_byok_models(uv), + return_exceptions=True, + ) + + standard_results = results[0] if not isinstance(results[0], Exception) else [] + byok_results = results[1] if not isinstance(results[1], Exception) else [] + + # Merge all discovered models + all_models = standard_results + byok_results + + # Update local instance cache for validation purposes in _pipe_impl + self.__class__._model_cache = all_models + + # Update Config-isolated dict cache + if not hasattr(self.__class__, "_discovery_cache"): + self.__class__._discovery_cache = {} + + if all_models: + self.__class__._discovery_cache[current_config_hash] = { + "time": now, + "models": all_models, + } else: - eff_max = float(global_max) + # If discovery completely failed, cache for a very short duration (10s) to prevent spam but allow quick recovery + self.__class__._discovery_cache[current_config_hash] = { + "time": now - cache_ttl + 10, + "models": all_models, + } - if self.valves.DEBUG: - logger.info( - f"[Pipes] Multiplier Filter: User={user_max}, Global={global_max}, Effective={eff_max}" + # 2. Return results with real-time user-specific filtering + return self._apply_model_filters(all_models, uv) + + async def _get_client(self, token: str) -> Any: + """Get or create the persistent CopilotClient from the pool based on token.""" + if not token: + raise ValueError("GitHub Token is required to initialize CopilotClient") + + # Use an MD5 hash of the token as the key for the client pool + token_hash = hashlib.md5(token.encode()).hexdigest() + + async with self.__class__._shared_client_lock: + # Check if client exists for this token and is healthy + client = self.__class__._shared_clients.get(token_hash) + if client: + try: + state = client.get_state() + if state == "connected": + return client + if state == "error": + try: + await client.stop() + except: + pass + del self.__class__._shared_clients[token_hash] + except Exception: + del self.__class__._shared_clients[token_hash] + + # Ensure environment discovery is done + if not self.__class__._env_setup_done: + self._setup_env(token=token) + + # Build configuration and start persistent client + client_config = self._build_client_config(user_id=None, chat_id=None) + client_config["github_token"] = token + client_config["auto_start"] = True + + new_client = CopilotClient(client_config) + await new_client.start() + self.__class__._shared_clients[token_hash] = new_client + return new_client + + async def _fetch_standard_models(self, token: str, __user__: dict) -> List[dict]: + """Fetch models using the shared persistent client pool.""" + if not token: + return [] + + try: + client = await self._get_client(token) + raw = await client.list_models() + + models = [] + for m in raw if isinstance(raw, list) else []: + formatted = self._format_model_item(m, source="copilot") + if formatted: + models.append(formatted) + + models.sort(key=lambda x: (x.get("multiplier", 1.0), x.get("raw_id", ""))) + return models + except Exception as e: + logger.error(f"[Pipes] Standard fetch failed: {e}") + return [] + + def _apply_model_filters( + self, models: List[dict], uv: "Pipe.UserValves" + ) -> List[dict]: + """Apply user-defined multiplier and keyword exclusions to the model list.""" + if not models: + # Check if BYOK or GH_TOKEN is configured at all + has_byok_config = (uv.BYOK_BASE_URL or self.valves.BYOK_BASE_URL) and ( + uv.BYOK_API_KEY + or self.valves.BYOK_API_KEY + or uv.BYOK_BEARER_TOKEN + or self.valves.BYOK_BEARER_TOKEN ) + if not (uv.GH_TOKEN or self.valves.GH_TOKEN) and not has_byok_config: + return [ + { + "id": "no_credentials", + "name": "⚠️ No credentials configured. Please set GH_TOKEN or BYOK settings in Valves.", + } + ] + return [{"id": "warming_up", "name": "Waiting for model discovery..."}] + + # Resolve constraints + global_max = getattr(self.valves, "MAX_MULTIPLIER", 1.0) + user_max = getattr(uv, "MAX_MULTIPLIER", None) + eff_max = ( + min(float(user_max), float(global_max)) + if user_max is not None + else float(global_max) + ) - # Keyword filtering: combine global and user keywords ex_kw = [ k.strip().lower() for k in (self.valves.EXCLUDE_KEYWORDS + "," + uv.EXCLUDE_KEYWORDS).split( @@ -5475,189 +5617,31 @@ class Pipe: ) if k.strip() ] - - # --- NEW: CONFIG-AWARE CACHE INVALIDATION --- - # Calculate current config fingerprint to detect changes - current_config_str = f"{token}|{uv.BYOK_BASE_URL or self.valves.BYOK_BASE_URL}|{uv.BYOK_API_KEY or self.valves.BYOK_API_KEY}|{self.valves.BYOK_BEARER_TOKEN}" - current_config_hash = hashlib.md5(current_config_str.encode()).hexdigest() - - # TTL-based cache expiry - cache_ttl = self.valves.MODEL_CACHE_TTL - if ( - self._model_cache - and cache_ttl > 0 - and (now - self.__class__._last_model_cache_time) > cache_ttl - ): - if self.valves.DEBUG: - logger.info( - f"[Pipes] Model cache expired (TTL={cache_ttl}s). Invalidating." - ) - self.__class__._model_cache = [] - - if ( - self._model_cache - and self.__class__._last_byok_config_hash != current_config_hash - ): - if self.valves.DEBUG: - logger.info( - f"[Pipes] Configuration change detected. Invalidating model cache." - ) - self.__class__._model_cache = [] - self.__class__._last_byok_config_hash = current_config_hash - - if not self._model_cache: - # Update the hash when we refresh the cache - self.__class__._last_byok_config_hash = current_config_hash - if self.valves.DEBUG: - logger.info("[Pipes] Refreshing model cache...") - try: - # Use effective token for fetching. - # If COPILOT_CLI_PATH is missing (e.g. env cleared after worker restart), - # force a full re-discovery by resetting _env_setup_done first. - if not os.environ.get("COPILOT_CLI_PATH"): - self.__class__._env_setup_done = False - self._setup_env(token=token) - - # Fetch BYOK models if configured - byok = [] - effective_base_url = uv.BYOK_BASE_URL or self.valves.BYOK_BASE_URL - if effective_base_url and ( - uv.BYOK_API_KEY - or self.valves.BYOK_API_KEY - or uv.BYOK_BEARER_TOKEN - or self.valves.BYOK_BEARER_TOKEN - ): - byok = await self._fetch_byok_models(uv=uv) - - standard = [] - cli_path = os.environ.get("COPILOT_CLI_PATH", "") - cli_ready = bool(cli_path and os.path.exists(cli_path)) - if token and cli_ready: - client_config = { - "cli_path": cli_path, - "cwd": self._get_workspace_dir( - user_id=user_id, chat_id="listing" - ), - } - c = CopilotClient(client_config) - try: - await c.start() - raw = await c.list_models() - for m in raw if isinstance(raw, list) else []: - try: - mid = ( - m.get("id") - if isinstance(m, dict) - else getattr(m, "id", "") - ) - if not mid: - continue - - # Extract multiplier - bill = ( - m.get("billing") - if isinstance(m, dict) - else getattr(m, "billing", {}) - ) - if hasattr(bill, "to_dict"): - bill = bill.to_dict() - mult = ( - float(bill.get("multiplier", 1)) - if isinstance(bill, dict) - else 1.0 - ) - - cid = self._clean_model_id(mid) - standard.append( - { - "id": f"{self.id}-{mid}", - "name": ( - f"-{cid} ({mult}x)" - if mult > 0 - else f"-🔥 {cid} (0x)" - ), - "multiplier": mult, - "raw_id": mid, - "source": "copilot", - "provider": self._get_provider_name(m), - } - ) - except: - pass - standard.sort(key=lambda x: (x["multiplier"], x["raw_id"])) - self._standard_model_ids = {m["raw_id"] for m in standard} - except Exception as e: - logger.error(f"[Pipes] Error listing models: {e}") - finally: - await c.stop() - elif token and self.valves.DEBUG: - logger.info( - "[Pipes] Copilot CLI not ready during listing. Skip standard model probe to avoid blocking startup." - ) - - self._model_cache = standard + byok - self.__class__._last_model_cache_time = now - if not self._model_cache: - has_byok = bool( - (uv.BYOK_BASE_URL or self.valves.BYOK_BASE_URL) - and ( - uv.BYOK_API_KEY - or self.valves.BYOK_API_KEY - or uv.BYOK_BEARER_TOKEN - or self.valves.BYOK_BEARER_TOKEN - ) - ) - if not token and not has_byok: - return [ - { - "id": "no_token", - "name": "⚠️ No credentials configured. Please set GH_TOKEN or BYOK settings in Valves.", - } - ] - return [ - { - "id": "warming_up", - "name": "Copilot CLI is preparing in background. Please retry in a moment.", - } - ] - except Exception as e: - return [{"id": "error", "name": f"Error: {e}"}] - - # Final pass filtering from cache (applied on every request) res = [] - # Use a small epsilon for float comparison to avoid precision issues (e.g. 0.33 vs 0.33000001) epsilon = 0.0001 - for m in self._model_cache: - # 1. Keyword filter + for m in models: mid = (m.get("raw_id") or m.get("id", "")).lower() mname = m.get("name", "").lower() + + # Filter by Keyword if any(kw in mid or kw in mname for kw in ex_kw): continue - # 2. Multiplier filter (only for standard Copilot models) + # Filter by Multiplier (Copilot source only) if m.get("source") == "copilot": - m_mult = float(m.get("multiplier", 0)) - if m_mult > (eff_max + epsilon): - if self.valves.DEBUG: - logger.debug( - f"[Pipes] Filtered {m.get('id')} (Mult: {m_mult} > {eff_max})" - ) + if float(m.get("multiplier", 1.0)) > (eff_max + epsilon): continue res.append(m) - return res if res else [{"id": "none", "name": "No models matched filters"}] - - async def _get_client(self): - """Helper to get or create a CopilotClient instance.""" - client_config = {} - if os.environ.get("COPILOT_CLI_PATH"): - client_config["cli_path"] = os.environ["COPILOT_CLI_PATH"] - - client = CopilotClient(client_config) - await client.start() - return client + return ( + res + if res + else [ + {"id": "none", "name": "No models matched your current Valve filters"} + ] + ) def _setup_env( self, @@ -5666,7 +5650,7 @@ class Pipe: token: str = None, enable_mcp: bool = True, ): - """Setup environment variables and resolve Copilot CLI path from SDK bundle.""" + """Setup environment variables and resolve the deterministic Copilot CLI path.""" # 1. Real-time Token Injection (Always updates on each call) effective_token = token or self.valves.GH_TOKEN @@ -5674,42 +5658,30 @@ class Pipe: os.environ["GH_TOKEN"] = os.environ["GITHUB_TOKEN"] = effective_token if self._env_setup_done: - if debug_enabled: - self._sync_mcp_config( - __event_call__, - debug_enabled, - enable_mcp=enable_mcp, - ) return - os.environ["COPILOT_AUTO_UPDATE"] = "false" + # 2. Deterministic CLI Path Discovery + # We prioritize the bundled CLI from the SDK to ensure version compatibility. + cli_path = "" + try: + from copilot.client import _get_bundled_cli_path - # 2. CLI Path Discovery (priority: env var > PATH > SDK bundle) - cli_path = os.environ.get("COPILOT_CLI_PATH", "") - found = bool(cli_path and os.path.exists(cli_path)) + cli_path = _get_bundled_cli_path() or "" + except ImportError: + pass - if not found: - sys_path = shutil.which("copilot") - if sys_path: - cli_path = sys_path - found = True + # Fallback to environment or system PATH only if bundled path is invalid + if not cli_path or not os.path.exists(cli_path): + cli_path = ( + os.environ.get("COPILOT_CLI_PATH") or shutil.which("copilot") or "" + ) - if not found: - try: - from copilot.client import _get_bundled_cli_path + cli_ready = bool(cli_path and os.path.exists(cli_path)) - bundled_path = _get_bundled_cli_path() - if bundled_path and os.path.exists(bundled_path): - cli_path = bundled_path - found = True - except ImportError: - pass - - # 3. Finalize - cli_ready = found + # 3. Finalize Environment if cli_ready: os.environ["COPILOT_CLI_PATH"] = cli_path - # Add the CLI's parent directory to PATH so subprocesses can invoke `copilot` directly + # Add to PATH for subprocess visibility cli_bin_dir = os.path.dirname(cli_path) current_path = os.environ.get("PATH", "") if cli_bin_dir and cli_bin_dir not in current_path.split(os.pathsep): @@ -5719,7 +5691,7 @@ class Pipe: self.__class__._last_update_check = datetime.now().timestamp() self._emit_debug_log_sync( - f"Environment setup complete. CLI ready={cli_ready}. Path: {cli_path}", + f"Deterministic Env Setup: CLI ready={cli_ready}, Path={cli_path}", __event_call__, debug_enabled=debug_enabled, ) @@ -5831,117 +5803,6 @@ class Pipe: return text_content, attachments - def _sync_copilot_config( - self, reasoning_effort: str, __event_call__=None, debug_enabled: bool = False - ): - """ - Dynamically update config.json if REASONING_EFFORT is set. - This provides a fallback if API injection is ignored by the server. - """ - if not reasoning_effort: - return - - effort = reasoning_effort - - try: - # Target dynamic config path - config_path = os.path.join(self._get_copilot_config_dir(), "config.json") - config_dir = os.path.dirname(config_path) - - # Only proceed if directory exists (avoid creating trash types of files if path is wrong) - if not os.path.exists(config_dir): - return - - data = {} - # Read existing config - if os.path.exists(config_path): - try: - with open(config_path, "r") as f: - data = json.load(f) - except Exception: - data = {} - - # Update if changed - current_val = data.get("reasoning_effort") - if current_val != effort: - data["reasoning_effort"] = effort - try: - with open(config_path, "w") as f: - json.dump(data, f, indent=4) - - self._emit_debug_log_sync( - f"Dynamically updated config.json: reasoning_effort='{effort}'", - __event_call__, - debug_enabled=debug_enabled, - ) - except Exception as e: - self._emit_debug_log_sync( - f"Failed to write config.json: {e}", - __event_call__, - debug_enabled=debug_enabled, - ) - except Exception as e: - self._emit_debug_log_sync( - f"Config sync check failed: {e}", - __event_call__, - debug_enabled=debug_enabled, - ) - - def _sync_mcp_config( - self, - __event_call__=None, - debug_enabled: bool = False, - enable_mcp: bool = True, - ): - """Sync MCP configuration to dynamic config.json.""" - path = os.path.join(self._get_copilot_config_dir(), "config.json") - - # If disabled, we should ensure the config doesn't contain stale MCP info - if not enable_mcp: - if os.path.exists(path): - try: - with open(path, "r") as f: - data = json.load(f) - if "mcp_servers" in data: - del data["mcp_servers"] - with open(path, "w") as f: - json.dump(data, f, indent=4) - self._emit_debug_log_sync( - "MCP disabled: Cleared MCP servers from config.json", - __event_call__, - debug_enabled, - ) - except: - pass - return - - mcp = self._parse_mcp_servers(__event_call__, enable_mcp=enable_mcp) - if not mcp: - return - try: - path = os.path.join(self._get_copilot_config_dir(), "config.json") - os.makedirs(os.path.dirname(path), exist_ok=True) - data = {} - if os.path.exists(path): - try: - with open(path, "r") as f: - data = json.load(f) - except: - pass - if json.dumps(data.get("mcp_servers"), sort_keys=True) != json.dumps( - mcp, sort_keys=True - ): - data["mcp_servers"] = mcp - with open(path, "w") as f: - json.dump(data, f, indent=4) - self._emit_debug_log_sync( - f"Synced {len(mcp)} MCP servers to config.json", - __event_call__, - debug_enabled, - ) - except: - pass - # ==================== Internal Implementation ==================== # _pipe_impl() contains the main request handling logic. # ================================================================ @@ -5993,6 +5854,7 @@ class Pipe: effective_debug = self.valves.DEBUG or user_valves.DEBUG effective_token = user_valves.GH_TOKEN or self.valves.GH_TOKEN + token = effective_token # For compatibility with _get_client(token) # Get Chat ID using improved helper chat_ctx = self._get_chat_context( @@ -6332,26 +6194,21 @@ class Pipe: else: is_byok_model = not has_multiplier and byok_active + # Mode Selection Info await self._emit_debug_log( f"Mode: {'BYOK' if is_byok_model else 'Standard'}, Reasoning: {is_reasoning}, Admin: {is_admin}", __event_call__, debug_enabled=effective_debug, ) - # Ensure we have the latest config (only for standard Copilot models) - if not is_byok_model: - self._sync_copilot_config(effective_reasoning_effort, __event_call__) - # Shared state for delayed HTML embeds (Premium Experience) pending_embeds = [] - # Initialize Client - client = CopilotClient( - self._build_client_config(user_id=user_id, chat_id=chat_id) - ) - should_stop_client = True + # Use Shared Persistent Client Pool (Token-aware) + client = await self._get_client(token) + should_stop_client = False # Never stop the shared singleton pool! try: - await client.start() + # Note: client is already started in _get_client # Initialize custom tools (Handles caching internally) custom_tools = await self._initialize_custom_tools( @@ -7831,7 +7688,7 @@ class Pipe: # We do not destroy session here to allow persistence, # but we must stop the client. await client.stop() - except Exception as e: + except Exception: pass