fix(copilot-sdk): implement dict-based isolated cache and optimize session config

- Fix model list flapping bug by using a dictionary-based '_discovery_cache' keyed by config hash, instead of wiping a single global list.

- Optimize performance by removing redundant disk IO 'config.json' syncing ('_sync_mcp_config' and '_sync_copilot_config'); SDK directly accepts params via 'SessionConfig'.

- Remove unused imports and variables flagged by flake8 lint rules.
This commit is contained in:
fujie
2026-03-07 22:13:58 +08:00
parent a777112417
commit f4f7b65792

View File

@@ -504,16 +504,17 @@ class Pipe:
description="BYOK Wire API override.",
)
# ==================== Class-Level Caches ====================
# These caches persist across requests since OpenWebUI may create
# new Pipe instances for each request.
# =============================================================
_model_cache: List[dict] = [] # Model list cache
_shared_clients: Dict[str, Any] = {} # Map: token_hash -> CopilotClient
_shared_client_lock = asyncio.Lock() # Lock for thread-safe client lifecycle
_model_cache: List[dict] = [] # Model list cache (Memory only fallback)
_standard_model_ids: set = set() # Track standard model IDs
_last_byok_config_hash: str = "" # Track BYOK config for cache invalidation
_last_model_cache_time: float = 0 # Timestamp of last model cache refresh
_last_byok_config_hash: str = "" # Track BYOK config (Status only)
_last_model_cache_time: float = 0 # Timestamp
_env_setup_done = False # Track if env setup has been completed
_last_update_check = 0 # Timestamp of last CLI update check
_discovery_cache: Dict[str, Dict[str, Any]] = (
{}
) # Map config_hash -> {"time": float, "models": list}
def _is_version_at_least(self, target: str) -> bool:
"""Check if OpenWebUI version is at least the target version."""
@@ -3918,7 +3919,9 @@ class Pipe:
return None
return os.path.join(self._get_session_metadata_dir(chat_id), "plan.md")
def _persist_plan_text(self, chat_id: Optional[str], content: Optional[str]) -> None:
def _persist_plan_text(
self, chat_id: Optional[str], content: Optional[str]
) -> None:
"""Persist plan text into the chat-specific session metadata directory."""
plan_path = self._get_plan_file_path(chat_id)
if not plan_path or not isinstance(content, str):
@@ -4908,7 +4911,6 @@ class Pipe:
# Setup Python Virtual Environment to strictly protect system python
if not os.path.exists(f"{venv_dir}/bin/activate"):
import subprocess
import sys
subprocess.run(
@@ -5423,51 +5425,191 @@ class Pipe:
logger.warning(f"[Copilot] Failed to parse UserValves: {e}")
return self.UserValves()
def _format_model_item(self, m: Any, source: str = "copilot") -> Optional[dict]:
    """Normalize one raw model entry into the dict shape OpenWebUI pipes expect.

    Accepts either a plain dict or an SDK object exposing ``id``/``billing``
    attributes. Returns None when the entry has no id or cannot be parsed.
    """
    try:
        is_mapping = isinstance(m, dict)

        # Resolve the raw model id from either container shape.
        raw_id = m.get("id") if is_mapping else getattr(m, "id", "")
        if not raw_id:
            return None

        # Billing may arrive as a dict or an SDK object with to_dict().
        billing = m.get("billing") if is_mapping else getattr(m, "billing", {})
        if hasattr(billing, "to_dict"):
            billing = billing.to_dict()
        multiplier = (
            float(billing.get("multiplier", 1.0))
            if isinstance(billing, dict)
            else 1.0
        )

        # Build the display name from the cleaned id; 0x models get a flame.
        short_id = self._clean_model_id(raw_id)
        if source == "byok":
            label = f"-{short_id}"
        elif multiplier > 0:
            label = f"-{short_id} ({multiplier}x)"
        else:
            label = f"-🔥 {short_id} (0x)"

        is_copilot = source == "copilot"
        return {
            "id": f"{self.id}-{raw_id}" if is_copilot else raw_id,
            "name": label,
            "multiplier": multiplier,
            "raw_id": raw_id,
            "source": source,
            "provider": self._get_provider_name(m) if is_copilot else "BYOK",
        }
    except Exception as e:
        logger.debug(f"[Pipes] Format error for model {m}: {e}")
        return None
async def pipes(self, __user__: Optional[dict] = None) -> List[dict]:
"""Dynamically fetch and filter model list."""
if self.valves.DEBUG:
logger.info(f"[Pipes] Called with user context: {bool(__user__)}")
"""Model discovery: Fetches standard and BYOK models with config-isolated caching."""
uv = self._get_user_valves(__user__)
token = uv.GH_TOKEN
# Determine check interval (24 hours default)
now = datetime.now().timestamp()
needs_setup = not self.__class__._env_setup_done or (
now - self.__class__._last_update_check > 86400
)
# 1. Environment Setup (Only if needed or not done)
if needs_setup:
self._setup_env(token=token)
self.__class__._last_update_check = now
else:
# Still inject token for BYOK real-time updates
if token:
os.environ["GH_TOKEN"] = os.environ["GITHUB_TOKEN"] = token
# Get user info for isolation
user_data = (
__user__[0] if isinstance(__user__, (list, tuple)) else (__user__ or {})
)
user_id = user_data.get("id") or user_data.get("user_id") or "default_user"
token = uv.GH_TOKEN or self.valves.GH_TOKEN
# Multiplier filtering: User can constrain, but not exceed global limit
global_max = self.valves.MAX_MULTIPLIER
user_max = uv.MAX_MULTIPLIER
if user_max is not None:
eff_max = min(float(user_max), float(global_max))
now = datetime.now().timestamp()
cache_ttl = self.valves.MODEL_CACHE_TTL
# Fingerprint the context so different users/tokens DO NOT evict each other
current_config_str = f"{token}|{uv.BYOK_BASE_URL or self.valves.BYOK_BASE_URL}|{uv.BYOK_API_KEY or self.valves.BYOK_API_KEY}|{self.valves.BYOK_BEARER_TOKEN}"
current_config_hash = hashlib.md5(current_config_str.encode()).hexdigest()
# Dictionary-based Cache lookup (Solves the flapping bug)
if hasattr(self.__class__, "_discovery_cache"):
cached = self.__class__._discovery_cache.get(current_config_hash)
if cached and cache_ttl > 0 and (now - cached["time"]) <= cache_ttl:
self.__class__._model_cache = cached[
"models"
] # Update global for pipeline capability fallbacks
return self._apply_model_filters(cached["models"], uv)
# 1. Core discovery logic (Always fresh)
results = await asyncio.gather(
self._fetch_standard_models(token, __user__),
self._fetch_byok_models(uv),
return_exceptions=True,
)
standard_results = results[0] if not isinstance(results[0], Exception) else []
byok_results = results[1] if not isinstance(results[1], Exception) else []
# Merge all discovered models
all_models = standard_results + byok_results
# Update local instance cache for validation purposes in _pipe_impl
self.__class__._model_cache = all_models
# Update Config-isolated dict cache
if not hasattr(self.__class__, "_discovery_cache"):
self.__class__._discovery_cache = {}
if all_models:
self.__class__._discovery_cache[current_config_hash] = {
"time": now,
"models": all_models,
}
else:
eff_max = float(global_max)
# If discovery completely failed, cache for a very short duration (10s) to prevent spam but allow quick recovery
self.__class__._discovery_cache[current_config_hash] = {
"time": now - cache_ttl + 10,
"models": all_models,
}
if self.valves.DEBUG:
logger.info(
f"[Pipes] Multiplier Filter: User={user_max}, Global={global_max}, Effective={eff_max}"
# 2. Return results with real-time user-specific filtering
return self._apply_model_filters(all_models, uv)
async def _get_client(self, token: str) -> Any:
    """Get or create a persistent CopilotClient from the token-keyed pool.

    Clients are pooled per token hash so concurrent users with different
    credentials never share (or evict) each other's connections. The whole
    check-and-create sequence runs under the class-level lock to keep the
    pool consistent across concurrent requests.

    Raises:
        ValueError: If no token is supplied.
    """
    if not token:
        raise ValueError("GitHub Token is required to initialize CopilotClient")
    # Key the pool on an MD5 hash so raw secrets are not held as dict keys.
    token_hash = hashlib.md5(token.encode()).hexdigest()
    async with self.__class__._shared_client_lock:
        # Reuse an existing client only if it reports a healthy state.
        client = self.__class__._shared_clients.get(token_hash)
        if client:
            try:
                state = client.get_state()
                if state == "connected":
                    return client
                if state == "error":
                    try:
                        await client.stop()
                    except Exception:
                        # Best-effort teardown; the client is evicted regardless.
                        pass
                    del self.__class__._shared_clients[token_hash]
                # NOTE(review): states other than "connected"/"error" (e.g. a
                # client still starting) fall through and get replaced below
                # without being stopped — presumably intentional; confirm the
                # SDK cleans up abandoned clients.
            except Exception:
                # get_state() itself failed: the client is unusable, drop it.
                del self.__class__._shared_clients[token_hash]
        # Ensure environment discovery ran before spawning a client.
        if not self.__class__._env_setup_done:
            self._setup_env(token=token)
        # Build configuration and start a new persistent client.
        client_config = self._build_client_config(user_id=None, chat_id=None)
        client_config["github_token"] = token
        client_config["auto_start"] = True
        new_client = CopilotClient(client_config)
        await new_client.start()
        self.__class__._shared_clients[token_hash] = new_client
        return new_client
async def _fetch_standard_models(self, token: str, __user__: dict) -> List[dict]:
    """Fetch the standard Copilot model list via the shared client pool.

    Returns an empty list when no token is configured or discovery fails.
    """
    if not token:
        return []
    try:
        client = await self._get_client(token)
        raw = await client.list_models()
        entries = raw if isinstance(raw, list) else []
        # Normalize each entry, dropping anything that fails formatting.
        models = [
            item
            for item in (
                self._format_model_item(entry, source="copilot")
                for entry in entries
            )
            if item
        ]
        # Sort cheapest (lowest multiplier) first, then by raw id.
        models.sort(key=lambda it: (it.get("multiplier", 1.0), it.get("raw_id", "")))
        return models
    except Exception as e:
        logger.error(f"[Pipes] Standard fetch failed: {e}")
        return []
def _apply_model_filters(
self, models: List[dict], uv: "Pipe.UserValves"
) -> List[dict]:
"""Apply user-defined multiplier and keyword exclusions to the model list."""
if not models:
# Check if BYOK or GH_TOKEN is configured at all
has_byok_config = (uv.BYOK_BASE_URL or self.valves.BYOK_BASE_URL) and (
uv.BYOK_API_KEY
or self.valves.BYOK_API_KEY
or uv.BYOK_BEARER_TOKEN
or self.valves.BYOK_BEARER_TOKEN
)
if not (uv.GH_TOKEN or self.valves.GH_TOKEN) and not has_byok_config:
return [
{
"id": "no_credentials",
"name": "⚠️ No credentials configured. Please set GH_TOKEN or BYOK settings in Valves.",
}
]
return [{"id": "warming_up", "name": "Waiting for model discovery..."}]
# Resolve constraints
global_max = getattr(self.valves, "MAX_MULTIPLIER", 1.0)
user_max = getattr(uv, "MAX_MULTIPLIER", None)
eff_max = (
min(float(user_max), float(global_max))
if user_max is not None
else float(global_max)
)
# Keyword filtering: combine global and user keywords
ex_kw = [
k.strip().lower()
for k in (self.valves.EXCLUDE_KEYWORDS + "," + uv.EXCLUDE_KEYWORDS).split(
@@ -5475,189 +5617,31 @@ class Pipe:
)
if k.strip()
]
# --- NEW: CONFIG-AWARE CACHE INVALIDATION ---
# Calculate current config fingerprint to detect changes
current_config_str = f"{token}|{uv.BYOK_BASE_URL or self.valves.BYOK_BASE_URL}|{uv.BYOK_API_KEY or self.valves.BYOK_API_KEY}|{self.valves.BYOK_BEARER_TOKEN}"
current_config_hash = hashlib.md5(current_config_str.encode()).hexdigest()
# TTL-based cache expiry
cache_ttl = self.valves.MODEL_CACHE_TTL
if (
self._model_cache
and cache_ttl > 0
and (now - self.__class__._last_model_cache_time) > cache_ttl
):
if self.valves.DEBUG:
logger.info(
f"[Pipes] Model cache expired (TTL={cache_ttl}s). Invalidating."
)
self.__class__._model_cache = []
if (
self._model_cache
and self.__class__._last_byok_config_hash != current_config_hash
):
if self.valves.DEBUG:
logger.info(
f"[Pipes] Configuration change detected. Invalidating model cache."
)
self.__class__._model_cache = []
self.__class__._last_byok_config_hash = current_config_hash
if not self._model_cache:
# Update the hash when we refresh the cache
self.__class__._last_byok_config_hash = current_config_hash
if self.valves.DEBUG:
logger.info("[Pipes] Refreshing model cache...")
try:
# Use effective token for fetching.
# If COPILOT_CLI_PATH is missing (e.g. env cleared after worker restart),
# force a full re-discovery by resetting _env_setup_done first.
if not os.environ.get("COPILOT_CLI_PATH"):
self.__class__._env_setup_done = False
self._setup_env(token=token)
# Fetch BYOK models if configured
byok = []
effective_base_url = uv.BYOK_BASE_URL or self.valves.BYOK_BASE_URL
if effective_base_url and (
uv.BYOK_API_KEY
or self.valves.BYOK_API_KEY
or uv.BYOK_BEARER_TOKEN
or self.valves.BYOK_BEARER_TOKEN
):
byok = await self._fetch_byok_models(uv=uv)
standard = []
cli_path = os.environ.get("COPILOT_CLI_PATH", "")
cli_ready = bool(cli_path and os.path.exists(cli_path))
if token and cli_ready:
client_config = {
"cli_path": cli_path,
"cwd": self._get_workspace_dir(
user_id=user_id, chat_id="listing"
),
}
c = CopilotClient(client_config)
try:
await c.start()
raw = await c.list_models()
for m in raw if isinstance(raw, list) else []:
try:
mid = (
m.get("id")
if isinstance(m, dict)
else getattr(m, "id", "")
)
if not mid:
continue
# Extract multiplier
bill = (
m.get("billing")
if isinstance(m, dict)
else getattr(m, "billing", {})
)
if hasattr(bill, "to_dict"):
bill = bill.to_dict()
mult = (
float(bill.get("multiplier", 1))
if isinstance(bill, dict)
else 1.0
)
cid = self._clean_model_id(mid)
standard.append(
{
"id": f"{self.id}-{mid}",
"name": (
f"-{cid} ({mult}x)"
if mult > 0
else f"-🔥 {cid} (0x)"
),
"multiplier": mult,
"raw_id": mid,
"source": "copilot",
"provider": self._get_provider_name(m),
}
)
except:
pass
standard.sort(key=lambda x: (x["multiplier"], x["raw_id"]))
self._standard_model_ids = {m["raw_id"] for m in standard}
except Exception as e:
logger.error(f"[Pipes] Error listing models: {e}")
finally:
await c.stop()
elif token and self.valves.DEBUG:
logger.info(
"[Pipes] Copilot CLI not ready during listing. Skip standard model probe to avoid blocking startup."
)
self._model_cache = standard + byok
self.__class__._last_model_cache_time = now
if not self._model_cache:
has_byok = bool(
(uv.BYOK_BASE_URL or self.valves.BYOK_BASE_URL)
and (
uv.BYOK_API_KEY
or self.valves.BYOK_API_KEY
or uv.BYOK_BEARER_TOKEN
or self.valves.BYOK_BEARER_TOKEN
)
)
if not token and not has_byok:
return [
{
"id": "no_token",
"name": "⚠️ No credentials configured. Please set GH_TOKEN or BYOK settings in Valves.",
}
]
return [
{
"id": "warming_up",
"name": "Copilot CLI is preparing in background. Please retry in a moment.",
}
]
except Exception as e:
return [{"id": "error", "name": f"Error: {e}"}]
# Final pass filtering from cache (applied on every request)
res = []
# Use a small epsilon for float comparison to avoid precision issues (e.g. 0.33 vs 0.33000001)
epsilon = 0.0001
for m in self._model_cache:
# 1. Keyword filter
for m in models:
mid = (m.get("raw_id") or m.get("id", "")).lower()
mname = m.get("name", "").lower()
# Filter by Keyword
if any(kw in mid or kw in mname for kw in ex_kw):
continue
# 2. Multiplier filter (only for standard Copilot models)
# Filter by Multiplier (Copilot source only)
if m.get("source") == "copilot":
m_mult = float(m.get("multiplier", 0))
if m_mult > (eff_max + epsilon):
if self.valves.DEBUG:
logger.debug(
f"[Pipes] Filtered {m.get('id')} (Mult: {m_mult} > {eff_max})"
)
if float(m.get("multiplier", 1.0)) > (eff_max + epsilon):
continue
res.append(m)
return res if res else [{"id": "none", "name": "No models matched filters"}]
async def _get_client(self):
"""Helper to get or create a CopilotClient instance."""
client_config = {}
if os.environ.get("COPILOT_CLI_PATH"):
client_config["cli_path"] = os.environ["COPILOT_CLI_PATH"]
client = CopilotClient(client_config)
await client.start()
return client
return (
res
if res
else [
{"id": "none", "name": "No models matched your current Valve filters"}
]
)
def _setup_env(
self,
@@ -5666,7 +5650,7 @@ class Pipe:
token: str = None,
enable_mcp: bool = True,
):
"""Setup environment variables and resolve Copilot CLI path from SDK bundle."""
"""Setup environment variables and resolve the deterministic Copilot CLI path."""
# 1. Real-time Token Injection (Always updates on each call)
effective_token = token or self.valves.GH_TOKEN
@@ -5674,42 +5658,30 @@ class Pipe:
os.environ["GH_TOKEN"] = os.environ["GITHUB_TOKEN"] = effective_token
if self._env_setup_done:
if debug_enabled:
self._sync_mcp_config(
__event_call__,
debug_enabled,
enable_mcp=enable_mcp,
)
return
os.environ["COPILOT_AUTO_UPDATE"] = "false"
# 2. Deterministic CLI Path Discovery
# We prioritize the bundled CLI from the SDK to ensure version compatibility.
cli_path = ""
try:
from copilot.client import _get_bundled_cli_path
# 2. CLI Path Discovery (priority: env var > PATH > SDK bundle)
cli_path = os.environ.get("COPILOT_CLI_PATH", "")
found = bool(cli_path and os.path.exists(cli_path))
cli_path = _get_bundled_cli_path() or ""
except ImportError:
pass
if not found:
sys_path = shutil.which("copilot")
if sys_path:
cli_path = sys_path
found = True
# Fallback to environment or system PATH only if bundled path is invalid
if not cli_path or not os.path.exists(cli_path):
cli_path = (
os.environ.get("COPILOT_CLI_PATH") or shutil.which("copilot") or ""
)
if not found:
try:
from copilot.client import _get_bundled_cli_path
cli_ready = bool(cli_path and os.path.exists(cli_path))
bundled_path = _get_bundled_cli_path()
if bundled_path and os.path.exists(bundled_path):
cli_path = bundled_path
found = True
except ImportError:
pass
# 3. Finalize
cli_ready = found
# 3. Finalize Environment
if cli_ready:
os.environ["COPILOT_CLI_PATH"] = cli_path
# Add the CLI's parent directory to PATH so subprocesses can invoke `copilot` directly
# Add to PATH for subprocess visibility
cli_bin_dir = os.path.dirname(cli_path)
current_path = os.environ.get("PATH", "")
if cli_bin_dir and cli_bin_dir not in current_path.split(os.pathsep):
@@ -5719,7 +5691,7 @@ class Pipe:
self.__class__._last_update_check = datetime.now().timestamp()
self._emit_debug_log_sync(
f"Environment setup complete. CLI ready={cli_ready}. Path: {cli_path}",
f"Deterministic Env Setup: CLI ready={cli_ready}, Path={cli_path}",
__event_call__,
debug_enabled=debug_enabled,
)
@@ -5831,117 +5803,6 @@ class Pipe:
return text_content, attachments
def _sync_copilot_config(
    self, reasoning_effort: str, __event_call__=None, debug_enabled: bool = False
):
    """
    Dynamically update config.json if REASONING_EFFORT is set.
    This provides a fallback if API injection is ignored by the server.

    All failures are best-effort: a bad read falls back to an empty config,
    and write/read errors are surfaced only through the debug log emitter so
    config syncing can never break request handling.
    """
    # No-op when no effort level is configured.
    if not reasoning_effort:
        return
    effort = reasoning_effort
    try:
        # Target dynamic config path
        config_path = os.path.join(self._get_copilot_config_dir(), "config.json")
        config_dir = os.path.dirname(config_path)
        # Only proceed if directory exists (avoid creating trash types of files if path is wrong)
        if not os.path.exists(config_dir):
            return
        data = {}
        # Read existing config; a corrupt file is treated as empty rather than fatal.
        if os.path.exists(config_path):
            try:
                with open(config_path, "r") as f:
                    data = json.load(f)
            except Exception:
                data = {}
        # Update if changed — skip the disk write when the value is already current.
        current_val = data.get("reasoning_effort")
        if current_val != effort:
            data["reasoning_effort"] = effort
            try:
                with open(config_path, "w") as f:
                    json.dump(data, f, indent=4)
                self._emit_debug_log_sync(
                    f"Dynamically updated config.json: reasoning_effort='{effort}'",
                    __event_call__,
                    debug_enabled=debug_enabled,
                )
            except Exception as e:
                # Write failed (permissions, disk); report via debug channel only.
                self._emit_debug_log_sync(
                    f"Failed to write config.json: {e}",
                    __event_call__,
                    debug_enabled=debug_enabled,
                )
    except Exception as e:
        # Outer guard: even path resolution errors must not propagate.
        self._emit_debug_log_sync(
            f"Config sync check failed: {e}",
            __event_call__,
            debug_enabled=debug_enabled,
        )
def _sync_mcp_config(
    self,
    __event_call__=None,
    debug_enabled: bool = False,
    enable_mcp: bool = True,
):
    """Sync MCP server configuration into the dynamic config.json.

    When MCP is disabled, any stale ``mcp_servers`` entry is removed so the
    CLI does not keep connecting to servers the user turned off. All disk
    errors are deliberately best-effort (logged or swallowed) — config
    syncing must never break request handling.
    """
    path = os.path.join(self._get_copilot_config_dir(), "config.json")
    # If disabled, ensure the config doesn't retain stale MCP info.
    if not enable_mcp:
        if os.path.exists(path):
            try:
                with open(path, "r") as f:
                    data = json.load(f)
                if "mcp_servers" in data:
                    del data["mcp_servers"]
                    with open(path, "w") as f:
                        json.dump(data, f, indent=4)
                    self._emit_debug_log_sync(
                        "MCP disabled: Cleared MCP servers from config.json",
                        __event_call__,
                        debug_enabled,
                    )
            except Exception:
                # Was a bare `except:` — narrowed so KeyboardInterrupt/SystemExit
                # are not swallowed. A malformed/locked config is not fatal here.
                pass
        return
    mcp = self._parse_mcp_servers(__event_call__, enable_mcp=enable_mcp)
    if not mcp:
        return
    try:
        # `path` was already resolved above; no need to recompute it.
        os.makedirs(os.path.dirname(path), exist_ok=True)
        data = {}
        if os.path.exists(path):
            try:
                with open(path, "r") as f:
                    data = json.load(f)
            except Exception:
                data = {}
        # Only rewrite the file when the serialized server set actually changed.
        if json.dumps(data.get("mcp_servers"), sort_keys=True) != json.dumps(
            mcp, sort_keys=True
        ):
            data["mcp_servers"] = mcp
            with open(path, "w") as f:
                json.dump(data, f, indent=4)
            self._emit_debug_log_sync(
                f"Synced {len(mcp)} MCP servers to config.json",
                __event_call__,
                debug_enabled,
            )
    except Exception:
        # Best-effort sync; narrowed from a bare `except:`.
        pass
# ==================== Internal Implementation ====================
# _pipe_impl() contains the main request handling logic.
# ================================================================
@@ -5993,6 +5854,7 @@ class Pipe:
effective_debug = self.valves.DEBUG or user_valves.DEBUG
effective_token = user_valves.GH_TOKEN or self.valves.GH_TOKEN
token = effective_token # For compatibility with _get_client(token)
# Get Chat ID using improved helper
chat_ctx = self._get_chat_context(
@@ -6332,26 +6194,21 @@ class Pipe:
else:
is_byok_model = not has_multiplier and byok_active
# Mode Selection Info
await self._emit_debug_log(
f"Mode: {'BYOK' if is_byok_model else 'Standard'}, Reasoning: {is_reasoning}, Admin: {is_admin}",
__event_call__,
debug_enabled=effective_debug,
)
# Ensure we have the latest config (only for standard Copilot models)
if not is_byok_model:
self._sync_copilot_config(effective_reasoning_effort, __event_call__)
# Shared state for delayed HTML embeds (Premium Experience)
pending_embeds = []
# Initialize Client
client = CopilotClient(
self._build_client_config(user_id=user_id, chat_id=chat_id)
)
should_stop_client = True
# Use Shared Persistent Client Pool (Token-aware)
client = await self._get_client(token)
should_stop_client = False # Never stop the shared singleton pool!
try:
await client.start()
# Note: client is already started in _get_client
# Initialize custom tools (Handles caching internally)
custom_tools = await self._initialize_custom_tools(
@@ -7831,7 +7688,7 @@ class Pipe:
# We do not destroy session here to allow persistence,
# but we must stop the client.
await client.stop()
except Exception as e:
except Exception:
pass