Files
Fu-Jie_openwebui-extensions/plugins/tools/openwebui-skills-manager/openwebui_skills_manager.py

1711 lines
74 KiB
Python
Raw Normal View History

"""
title: OpenWebUI Skills Manager Tool
author: Fu-Jie
author_url: https://github.com/Fu-Jie/openwebui-extensions
funding_url: https://github.com/open-webui
version: 0.3.0
openwebui_id: b4bce8e4-08e7-4f90-bea7-dc31d463a0bb
requirements:
description: Standalone OpenWebUI tool for managing native Workspace Skills (list/show/install/create/update/delete) for any model.
"""
import asyncio
import json
import logging
import re
import tempfile
import tarfile
import uuid
import zipfile
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Optional, Dict, Any, List, Tuple
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
try:
from open_webui.models.skills import Skills, SkillForm, SkillMeta
except Exception:
Skills = None
SkillForm = None
SkillMeta = None
BASE_TRANSLATIONS = {
"status_listing": "Listing your skills...",
"status_showing": "Reading skill details...",
"status_installing": "Installing skill from URL...",
"status_installing_batch": "Installing {total} skill(s)...",
"status_discovering_skills": "Discovering skills in {url}...",
"status_detecting_repo_root": "Detected GitHub repo root: {url}. Auto-converting to discovery mode...",
"status_batch_duplicates_removed": "Removed {count} duplicate URL(s) from batch.",
"status_duplicate_skill_name": "Warning: Duplicate skill name '{name}' - {action} multiple times.",
"status_creating": "Creating skill...",
"status_updating": "Updating skill...",
"status_deleting": "Deleting skill...",
"status_done": "Done.",
"status_list_done": "Found {count} skills ({active_count} active).",
"status_show_done": "Loaded skill: {name}.",
"status_install_done": "Installed skill: {name}.",
"status_install_overwrite_done": "Installed by updating existing skill: {name}.",
"status_create_done": "Created skill: {name}.",
"status_create_overwrite_done": "Updated existing skill: {name}.",
"status_update_done": "Updated skill: {name}.",
"status_delete_done": "Deleted skill: {name}.",
"status_install_batch_done": "Batch install completed: {succeeded} succeeded, {failed} failed.",
"err_unavailable": "OpenWebUI Skills model is unavailable in this runtime.",
"err_user_required": "User context is required.",
"err_name_required": "Skill name is required.",
"err_not_found": "Skill not found.",
"err_no_update_fields": "No update fields provided.",
"err_url_required": "Skill URL is required.",
"err_install_fetch": "Failed to fetch skill content from URL.",
"err_install_parse": "Failed to parse skill package/content.",
"err_invalid_url": "Invalid URL. Only http(s) URLs are supported.",
"err_untrusted_domain": "Domain not in whitelist. Trusted domains: {domains}",
"msg_created": "Skill created successfully.",
"msg_updated": "Skill updated successfully.",
"msg_deleted": "Skill deleted successfully.",
"msg_installed": "Skill installed successfully.",
}
TRANSLATIONS = {
"en-US": BASE_TRANSLATIONS,
"zh-CN": {
"status_listing": "正在列出你的技能...",
"status_showing": "正在读取技能详情...",
"status_installing": "正在从 URL 安装技能...",
"status_installing_batch": "正在安装 {total} 个技能...",
"status_discovering_skills": "正在从 {url} 发现技能...",
"status_detecting_repo_root": "检测到 GitHub repo 根目录:{url}。自动转换为发现模式...",
"status_batch_duplicates_removed": "已从批量队列中移除 {count} 个重复 URL。",
"status_duplicate_skill_name": "警告:技能名称 '{name}' 重复 - 多次 {action}",
"status_creating": "正在创建技能...",
"status_updating": "正在更新技能...",
"status_deleting": "正在删除技能...",
"status_done": "已完成。",
"status_list_done": "已找到 {count} 个技能(启用 {active_count} 个)。",
"status_show_done": "已加载技能:{name}",
"status_install_done": "技能安装完成:{name}",
"status_install_overwrite_done": "已通过覆盖更新完成安装:{name}",
"status_create_done": "技能创建完成:{name}",
"status_create_overwrite_done": "已更新同名技能:{name}",
"status_update_done": "技能更新完成:{name}",
"status_delete_done": "技能删除完成:{name}",
"status_install_batch_done": "批量安装完成:成功 {succeeded} 个,失败 {failed} 个。",
"err_unavailable": "当前运行环境不可用 OpenWebUI Skills 模型。",
"err_user_required": "需要用户上下文。",
"err_name_required": "技能名称不能为空。",
"err_not_found": "未找到技能。",
"err_no_update_fields": "未提供可更新字段。",
"err_url_required": "技能 URL 不能为空。",
"err_install_fetch": "从 URL 获取技能内容失败。",
"err_install_parse": "解析技能包或内容失败。",
"err_invalid_url": "URL 无效,仅支持 http(s) 地址。",
"err_untrusted_domain": "域名不在白名单中。授信域名:{domains}",
"msg_created": "技能创建成功。",
"msg_updated": "技能更新成功。",
"msg_deleted": "技能删除成功。",
"msg_installed": "技能安装成功。",
},
"zh-TW": {
"status_listing": "正在列出你的技能...",
"status_showing": "正在讀取技能詳情...",
"status_installing": "正在從 URL 安裝技能...",
"status_installing_batch": "正在安裝 {total} 個技能...",
"status_discovering_skills": "正在從 {url} 發現技能...",
"status_detecting_repo_root": "偵測到 GitHub repo 根目錄:{url}。自動轉換為發現模式...",
"status_batch_duplicates_removed": "已從批次佇列中移除 {count} 個重複 URL。",
"status_duplicate_skill_name": "警告:技能名稱 '{name}' 重複 - 多次 {action}",
"status_creating": "正在建立技能...",
"status_updating": "正在更新技能...",
"status_deleting": "正在刪除技能...",
"status_done": "已完成。",
"status_list_done": "已找到 {count} 個技能(啟用 {active_count} 個)。",
"status_show_done": "已載入技能:{name}",
"status_install_done": "技能安裝完成:{name}",
"status_install_overwrite_done": "已透過覆蓋更新完成安裝:{name}",
"status_create_done": "技能建立完成:{name}",
"status_create_overwrite_done": "已更新同名技能:{name}",
"status_update_done": "技能更新完成:{name}",
"status_delete_done": "技能刪除完成:{name}",
"status_install_batch_done": "批次安裝完成:成功 {succeeded} 個,失敗 {failed} 個。",
"err_unavailable": "目前執行環境不可用 OpenWebUI Skills 模型。",
"err_user_required": "需要使用者上下文。",
"err_name_required": "技能名稱不能為空。",
"err_not_found": "未找到技能。",
"err_no_update_fields": "未提供可更新欄位。",
"err_url_required": "技能 URL 不能為空。",
"err_install_fetch": "從 URL 取得技能內容失敗。",
"err_install_parse": "解析技能包或內容失敗。",
"err_invalid_url": "URL 無效,僅支援 http(s) 位址。",
"msg_created": "技能建立成功。",
"msg_updated": "技能更新成功。",
"msg_deleted": "技能刪除成功。",
"msg_installed": "技能安裝成功。",
},
"zh-HK": {
"status_listing": "正在列出你的技能...",
"status_showing": "正在讀取技能詳情...",
"status_installing": "正在從 URL 安裝技能...",
"status_installing_batch": "正在安裝 {total} 個技能...",
"status_discovering_skills": "正在從 {url} 發現技能...",
"status_detecting_repo_root": "偵測到 GitHub repo 根目錄:{url}。自動轉換為發現模式...",
"status_batch_duplicates_removed": "已從批次佇列中移除 {count} 個重複 URL。",
"status_duplicate_skill_name": "警告:技能名稱 '{name}' 重複 - 多次 {action}",
"status_creating": "正在建立技能...",
"status_updating": "正在更新技能...",
"status_deleting": "正在刪除技能...",
"status_done": "已完成。",
"status_list_done": "已找到 {count} 個技能(啟用 {active_count} 個)。",
"status_show_done": "已載入技能:{name}",
"status_install_done": "技能安裝完成:{name}",
"status_install_overwrite_done": "已透過覆蓋更新完成安裝:{name}",
"status_create_done": "技能建立完成:{name}",
"status_create_overwrite_done": "已更新同名技能:{name}",
"status_update_done": "技能更新完成:{name}",
"status_delete_done": "技能刪除完成:{name}",
"status_install_batch_done": "批次安裝完成:成功 {succeeded} 個,失敗 {failed} 個。",
"err_unavailable": "目前執行環境不可用 OpenWebUI Skills 模型。",
"err_user_required": "需要使用者上下文。",
"err_name_required": "技能名稱不能為空。",
"err_not_found": "未找到技能。",
"err_no_update_fields": "未提供可更新欄位。",
"err_url_required": "技能 URL 不能為空。",
"err_install_fetch": "從 URL 取得技能內容失敗。",
"err_install_parse": "解析技能包或內容失敗。",
"err_invalid_url": "URL 無效,僅支援 http(s) 位址。",
"msg_created": "技能建立成功。",
"msg_updated": "技能更新成功。",
"msg_deleted": "技能刪除成功。",
"msg_installed": "技能安裝成功。",
},
"ja-JP": {
"status_listing": "スキル一覧を取得しています...",
"status_showing": "スキル詳細を読み込み中...",
"status_installing": "URL からスキルをインストール中...",
"status_installing_batch": "{total} 件のスキルをインストール中...",
"status_discovering_skills": "{url} からスキルを検出中...",
"status_detecting_repo_root": "GitHub リポジトリルートを検出しました: {url}。自動検出モードに変換しています...",
"status_batch_duplicates_removed": "バッチから {count} 個の重複 URL を削除しました。",
"status_duplicate_skill_name": "警告: スキル名 '{name}' の重複 - {action} が複数回実行されました。",
"status_creating": "スキルを作成中...",
"status_updating": "スキルを更新中...",
"status_deleting": "スキルを削除中...",
"status_done": "完了しました。",
"status_list_done": "{count} 件のスキルが見つかりました(有効: {active_count} 件)。",
"status_show_done": "スキルを読み込みました: {name}",
"status_install_done": "スキルをインストールしました: {name}",
"status_install_overwrite_done": "既存スキルを更新してインストールしました: {name}",
"status_create_done": "スキルを作成しました: {name}",
"status_create_overwrite_done": "同名スキルを更新しました: {name}",
"status_update_done": "スキルを更新しました: {name}",
"status_delete_done": "スキルを削除しました: {name}",
"status_install_batch_done": "一括インストール完了: 成功 {succeeded} 件、失敗 {failed} 件。",
"err_unavailable": "この実行環境では OpenWebUI Skills モデルを利用できません。",
"err_user_required": "ユーザーコンテキストが必要です。",
"err_name_required": "スキル名は必須です。",
"err_not_found": "スキルが見つかりません。",
"err_no_update_fields": "更新する項目が指定されていません。",
"err_url_required": "スキル URL は必須です。",
"err_install_fetch": "URL からスキル内容の取得に失敗しました。",
"err_install_parse": "スキルパッケージ/内容の解析に失敗しました。",
"err_invalid_url": "無効な URL です。http(s) のみサポートします。",
"msg_created": "スキルを作成しました。",
"msg_updated": "スキルを更新しました。",
"msg_deleted": "スキルを削除しました。",
"msg_installed": "スキルをインストールしました。",
},
"ko-KR": {
"status_listing": "스킬 목록을 불러오는 중...",
"status_showing": "스킬 상세 정보를 읽는 중...",
"status_installing": "URL에서 스킬 설치 중...",
"status_installing_batch": "스킬 {total}개를 설치하는 중...",
"status_discovering_skills": "{url}에서 스킬 발견 중...",
"status_detecting_repo_root": "GitHub 저장소 루트 검출: {url}. 자동 발견 모드로 변환 중...",
"status_batch_duplicates_removed": "배치에서 {count}개의 중복 URL을 제거했습니다.",
"status_duplicate_skill_name": "경고: 스킬 이름 '{name}'이 중복됨 - {action}이 여러 번 실행됨.",
"status_creating": "스킬 생성 중...",
"status_updating": "스킬 업데이트 중...",
"status_deleting": "스킬 삭제 중...",
"status_done": "완료되었습니다.",
"status_list_done": "스킬 {count}개를 찾았습니다(활성 {active_count}개).",
"status_show_done": "스킬을 불러왔습니다: {name}.",
"status_install_done": "스킬 설치 완료: {name}.",
"status_install_overwrite_done": "기존 스킬을 업데이트하여 설치 완료: {name}.",
"status_create_done": "스킬 생성 완료: {name}.",
"status_create_overwrite_done": "동일 이름 스킬 업데이트 완료: {name}.",
"status_update_done": "스킬 업데이트 완료: {name}.",
"status_delete_done": "스킬 삭제 완료: {name}.",
"status_install_batch_done": "일괄 설치 완료: 성공 {succeeded}개, 실패 {failed}개.",
"err_unavailable": "현재 런타임에서 OpenWebUI Skills 모델을 사용할 수 없습니다.",
"err_user_required": "사용자 컨텍스트가 필요합니다.",
"err_name_required": "스킬 이름은 필수입니다.",
"err_not_found": "스킬을 찾을 수 없습니다.",
"err_no_update_fields": "업데이트할 필드가 제공되지 않았습니다.",
"err_url_required": "스킬 URL이 필요합니다.",
"err_install_fetch": "URL에서 스킬 내용을 가져오지 못했습니다.",
"err_install_parse": "스킬 패키지/내용 파싱에 실패했습니다.",
"err_invalid_url": "잘못된 URL입니다. http(s)만 지원됩니다.",
"msg_created": "스킬이 생성되었습니다.",
"msg_updated": "스킬이 업데이트되었습니다.",
"msg_deleted": "스킬이 삭제되었습니다.",
"msg_installed": "스킬이 설치되었습니다.",
},
"fr-FR": {
"status_listing": "Liste des skills en cours...",
"status_showing": "Lecture des détails du skill...",
"status_installing": "Installation du skill depuis l'URL...",
"status_installing_batch": "Installation de {total} skill(s)...",
"status_discovering_skills": "Découverte de skills dans {url}...",
"status_detecting_repo_root": "Racine du dépôt GitHub détectée: {url}. Conversion en mode découverte automatique...",
"status_batch_duplicates_removed": "{count} URL en doublon(s) supprimée(s) du lot.",
"status_duplicate_skill_name": "Attention: Nom du skill '{name}' en doublon - {action} plusieurs fois.",
"status_creating": "Création du skill...",
"status_updating": "Mise à jour du skill...",
"status_deleting": "Suppression du skill...",
"status_done": "Terminé.",
"status_list_done": "{count} skills trouvés ({active_count} actifs).",
"status_show_done": "Skill chargé : {name}.",
"status_install_done": "Skill installé : {name}.",
"status_install_overwrite_done": "Skill installé en mettant à jour l'existant : {name}.",
"status_create_done": "Skill créé : {name}.",
"status_create_overwrite_done": "Skill existant mis à jour : {name}.",
"status_update_done": "Skill mis à jour : {name}.",
"status_delete_done": "Skill supprimé : {name}.",
"status_install_batch_done": "Installation en lot terminée : {succeeded} réussies, {failed} échouées.",
"err_unavailable": "Le modèle OpenWebUI Skills n'est pas disponible dans cet environnement.",
"err_user_required": "Le contexte utilisateur est requis.",
"err_name_required": "Le nom du skill est requis.",
"err_not_found": "Skill introuvable.",
"err_no_update_fields": "Aucun champ à mettre à jour n'a été fourni.",
"err_url_required": "L'URL du skill est requise.",
"err_install_fetch": "Échec de récupération du contenu du skill depuis l'URL.",
"err_install_parse": "Échec de l'analyse du package/contenu du skill.",
"err_invalid_url": "URL invalide. Seules les URL http(s) sont prises en charge.",
"msg_created": "Skill créé avec succès.",
"msg_updated": "Skill mis à jour avec succès.",
"msg_deleted": "Skill supprimé avec succès.",
"msg_installed": "Skill installé avec succès.",
},
"de-DE": {
"status_listing": "Deine Skills werden aufgelistet...",
"status_showing": "Skill-Details werden gelesen...",
"status_installing": "Skill wird von URL installiert...",
"status_installing_batch": "{total} Skill(s) werden installiert...",
"status_discovering_skills": "Suche nach Skills in {url}...",
"status_creating": "Skill wird erstellt...",
"status_updating": "Skill wird aktualisiert...",
"status_deleting": "Skill wird gelöscht...",
"status_done": "Fertig.",
"status_list_done": "{count} Skills gefunden ({active_count} aktiv).",
"status_show_done": "Skill geladen: {name}.",
"status_install_done": "Skill installiert: {name}.",
"status_install_overwrite_done": "Skill durch Aktualisierung installiert: {name}.",
"status_create_done": "Skill erstellt: {name}.",
"status_create_overwrite_done": "Bestehender Skill aktualisiert: {name}.",
"status_update_done": "Skill aktualisiert: {name}.",
"status_delete_done": "Skill gelöscht: {name}.",
"status_install_batch_done": "Batch-Installation abgeschlossen: {succeeded} erfolgreich, {failed} fehlgeschlagen.",
"err_unavailable": "Das OpenWebUI-Skills-Modell ist in dieser Laufzeit nicht verfügbar.",
"err_user_required": "Benutzerkontext ist erforderlich.",
"err_name_required": "Skill-Name ist erforderlich.",
"err_not_found": "Skill nicht gefunden.",
"err_no_update_fields": "Keine zu aktualisierenden Felder angegeben.",
"err_url_required": "Skill-URL ist erforderlich.",
"err_install_fetch": "Skill-Inhalt konnte nicht von URL geladen werden.",
"err_install_parse": "Skill-Paket/Inhalt konnte nicht geparst werden.",
"err_invalid_url": "Ungültige URL. Nur http(s)-URLs werden unterstützt.",
"msg_created": "Skill erfolgreich erstellt.",
"msg_updated": "Skill erfolgreich aktualisiert.",
"msg_deleted": "Skill erfolgreich gelöscht.",
"msg_installed": "Skill erfolgreich installiert.",
},
"es-ES": {
"status_listing": "Listando tus skills...",
"status_showing": "Leyendo detalles del skill...",
"status_installing": "Instalando skill desde URL...",
"status_installing_batch": "Instalando {total} skill(s)...",
"status_discovering_skills": "Descubriendo skills en {url}...",
"status_creating": "Creando skill...",
"status_updating": "Actualizando skill...",
"status_deleting": "Eliminando skill...",
"status_done": "Hecho.",
"status_list_done": "Se encontraron {count} skills ({active_count} activos).",
"status_show_done": "Skill cargado: {name}.",
"status_install_done": "Skill instalado: {name}.",
"status_install_overwrite_done": "Skill instalado actualizando el existente: {name}.",
"status_create_done": "Skill creado: {name}.",
"status_create_overwrite_done": "Skill existente actualizado: {name}.",
"status_update_done": "Skill actualizado: {name}.",
"status_delete_done": "Skill eliminado: {name}.",
"status_install_batch_done": "Instalación por lotes completada: {succeeded} correctas, {failed} fallidas.",
"err_unavailable": "El modelo OpenWebUI Skills no está disponible en este entorno.",
"err_user_required": "Se requiere contexto de usuario.",
"err_name_required": "Se requiere el nombre del skill.",
"err_not_found": "Skill no encontrado.",
"err_no_update_fields": "No se proporcionaron campos para actualizar.",
"err_url_required": "Se requiere la URL del skill.",
"err_install_fetch": "No se pudo obtener el contenido del skill desde la URL.",
"err_install_parse": "No se pudo analizar el paquete/contenido del skill.",
"err_invalid_url": "URL inválida. Solo se admiten URLs http(s).",
"msg_created": "Skill creado correctamente.",
"msg_updated": "Skill actualizado correctamente.",
"msg_deleted": "Skill eliminado correctamente.",
"msg_installed": "Skill instalado correctamente.",
},
"it-IT": {
"status_listing": "Elenco delle skill in corso...",
"status_showing": "Lettura dei dettagli della skill...",
"status_installing": "Installazione della skill da URL...",
"status_installing_batch": "Installazione di {total} skill in corso...",
"status_discovering_skills": "Scoperta di skills in {url}...",
"status_creating": "Creazione della skill...",
"status_updating": "Aggiornamento della skill...",
"status_deleting": "Eliminazione della skill...",
"status_done": "Fatto.",
"status_list_done": "Trovate {count} skill ({active_count} attive).",
"status_show_done": "Skill caricata: {name}.",
"status_install_done": "Skill installata: {name}.",
"status_install_overwrite_done": "Skill installata aggiornando l'esistente: {name}.",
"status_create_done": "Skill creata: {name}.",
"status_create_overwrite_done": "Skill esistente aggiornata: {name}.",
"status_update_done": "Skill aggiornata: {name}.",
"status_delete_done": "Skill eliminata: {name}.",
"status_install_batch_done": "Installazione batch completata: {succeeded} riuscite, {failed} non riuscite.",
"err_unavailable": "Il modello OpenWebUI Skills non è disponibile in questo runtime.",
"err_user_required": "È richiesto il contesto utente.",
"err_name_required": "Il nome della skill è obbligatorio.",
"err_not_found": "Skill non trovata.",
"err_no_update_fields": "Nessun campo da aggiornare fornito.",
"err_url_required": "L'URL della skill è obbligatoria.",
"err_install_fetch": "Impossibile recuperare il contenuto della skill dall'URL.",
"err_install_parse": "Impossibile analizzare il pacchetto/contenuto della skill.",
"err_invalid_url": "URL non valido. Sono supportati solo URL http(s).",
"msg_created": "Skill creata con successo.",
"msg_updated": "Skill aggiornata con successo.",
"msg_deleted": "Skill eliminata con successo.",
"msg_installed": "Skill installata con successo.",
},
"vi-VN": {
"status_listing": "Đang liệt kê kỹ năng của bạn...",
"status_showing": "Đang đọc chi tiết kỹ năng...",
"status_installing": "Đang cài đặt kỹ năng từ URL...",
"status_installing_batch": "Đang cài đặt {total} kỹ năng...",
"status_discovering_skills": "Đang phát hiện kỹ năng trong {url}...",
"status_creating": "Đang tạo kỹ năng...",
"status_updating": "Đang cập nhật kỹ năng...",
"status_deleting": "Đang xóa kỹ năng...",
"status_done": "Hoàn tất.",
"status_list_done": "Đã tìm thấy {count} kỹ năng ({active_count} đang bật).",
"status_show_done": "Đã tải kỹ năng: {name}.",
"status_install_done": "Cài đặt kỹ năng hoàn tất: {name}.",
"status_install_overwrite_done": "Đã cài đặt bằng cách cập nhật kỹ năng hiện có: {name}.",
"status_create_done": "Tạo kỹ năng hoàn tất: {name}.",
"status_create_overwrite_done": "Đã cập nhật kỹ năng cùng tên: {name}.",
"status_update_done": "Cập nhật kỹ năng hoàn tất: {name}.",
"status_delete_done": "Xóa kỹ năng hoàn tất: {name}.",
"status_install_batch_done": "Cài đặt hàng loạt hoàn tất: thành công {succeeded}, thất bại {failed}.",
"err_unavailable": "Mô hình OpenWebUI Skills không khả dụng trong môi trường hiện tại.",
"err_user_required": "Cần có ngữ cảnh người dùng.",
"err_name_required": "Tên kỹ năng là bắt buộc.",
"err_not_found": "Không tìm thấy kỹ năng.",
"err_no_update_fields": "Không có trường nào để cập nhật.",
"err_url_required": "URL kỹ năng là bắt buộc.",
"err_install_fetch": "Không thể tải nội dung kỹ năng từ URL.",
"err_install_parse": "Không thể phân tích gói/nội dung kỹ năng.",
"err_invalid_url": "URL không hợp lệ. Chỉ hỗ trợ URL http(s).",
"msg_created": "Tạo kỹ năng thành công.",
"msg_updated": "Cập nhật kỹ năng thành công.",
"msg_deleted": "Xóa kỹ năng thành công.",
"msg_installed": "Cài đặt kỹ năng thành công.",
},
"id-ID": {
"status_listing": "Sedang menampilkan daftar skill Anda...",
"status_showing": "Sedang membaca detail skill...",
"status_installing": "Sedang memasang skill dari URL...",
"status_installing_batch": "Sedang memasang {total} skill...",
"status_discovering_skills": "Sedang mencari skill di {url}...",
"status_creating": "Sedang membuat skill...",
"status_updating": "Sedang memperbarui skill...",
"status_deleting": "Sedang menghapus skill...",
"status_done": "Selesai.",
"status_list_done": "Ditemukan {count} skill ({active_count} aktif).",
"status_show_done": "Skill dimuat: {name}.",
"status_install_done": "Skill berhasil dipasang: {name}.",
"status_install_overwrite_done": "Skill dipasang dengan memperbarui skill yang ada: {name}.",
"status_create_done": "Skill berhasil dibuat: {name}.",
"status_create_overwrite_done": "Skill dengan nama sama berhasil diperbarui: {name}.",
"status_update_done": "Skill berhasil diperbarui: {name}.",
"status_delete_done": "Skill berhasil dihapus: {name}.",
"status_install_batch_done": "Pemasangan batch selesai: {succeeded} berhasil, {failed} gagal.",
"err_unavailable": "Model OpenWebUI Skills tidak tersedia di runtime ini.",
"err_user_required": "Konteks pengguna diperlukan.",
"err_name_required": "Nama skill wajib diisi.",
"err_not_found": "Skill tidak ditemukan.",
"err_no_update_fields": "Tidak ada field pembaruan yang diberikan.",
"err_url_required": "URL skill wajib diisi.",
"err_install_fetch": "Gagal mengambil konten skill dari URL.",
"err_install_parse": "Gagal mem-parsing paket/konten skill.",
"err_invalid_url": "URL tidak valid. Hanya URL http(s) yang didukung.",
"msg_created": "Skill berhasil dibuat.",
"msg_updated": "Skill berhasil diperbarui.",
"msg_deleted": "Skill berhasil dihapus.",
"msg_installed": "Skill berhasil dipasang.",
},
}
FALLBACK_MAP = {
"zh": "zh-CN",
"zh-TW": "zh-TW",
"zh-HK": "zh-HK",
"en": "en-US",
"ja": "ja-JP",
"ko": "ko-KR",
"fr": "fr-FR",
"de": "de-DE",
"es": "es-ES",
"it": "it-IT",
"vi": "vi-VN",
"id": "id-ID",
}
def _resolve_language(user_language: str) -> str:
"""Normalize user language code to a supported translation key."""
value = str(user_language or "").strip()
if not value:
return "en-US"
normalized = value.replace("_", "-")
if normalized in TRANSLATIONS:
return normalized
lower_to_lang = {k.lower(): k for k in TRANSLATIONS.keys()}
if normalized.lower() in lower_to_lang:
return lower_to_lang[normalized.lower()]
if normalized in FALLBACK_MAP:
return FALLBACK_MAP[normalized]
lower_fallback = {k.lower(): v for k, v in FALLBACK_MAP.items()}
if normalized.lower() in lower_fallback:
return lower_fallback[normalized.lower()]
base = normalized.split("-")[0].lower()
return lower_fallback.get(base, "en-US")
def _t(lang: str, key: str, **kwargs) -> str:
"""Return translated text for key with safe formatting."""
lang_key = _resolve_language(lang)
text = TRANSLATIONS.get(lang_key, TRANSLATIONS["en-US"]).get(
key, TRANSLATIONS["en-US"].get(key, key)
)
if kwargs:
try:
text = text.format(**kwargs)
except KeyError:
pass
return text
async def _get_user_context(
__user__: Optional[dict],
__event_call__: Optional[Any] = None,
__request__: Optional[Any] = None,
) -> Dict[str, str]:
"""Extract robust user context with frontend language fallback."""
if isinstance(__user__, (list, tuple)):
user_data = __user__[0] if __user__ else {}
elif isinstance(__user__, dict):
user_data = __user__
else:
user_data = {}
user_language = user_data.get("language", "en-US")
if __request__ and hasattr(__request__, "headers"):
accept_lang = __request__.headers.get("accept-language", "")
if accept_lang:
user_language = accept_lang.split(",")[0].split(";")[0]
if __event_call__:
try:
js_code = """
try {
return (
document.documentElement.lang ||
localStorage.getItem('locale') ||
localStorage.getItem('language') ||
navigator.language ||
'en-US'
);
} catch (e) {
return 'en-US';
}
"""
frontend_lang = await asyncio.wait_for(
__event_call__({"type": "execute", "data": {"code": js_code}}),
timeout=2.0,
)
if frontend_lang and isinstance(frontend_lang, str):
user_language = frontend_lang
except Exception as e:
logger.warning(f"Failed to retrieve frontend language: {e}")
return {
"user_id": str(user_data.get("id", "")).strip(),
"user_name": user_data.get("name", "User"),
"user_language": user_language,
}
async def _emit_notification(
emitter: Optional[Any],
content: str,
ntype: str = "info",
):
"""Emit notification event (info, success, warning, error)."""
if emitter:
await emitter(
{"type": "notification", "data": {"type": ntype, "content": content}}
)
async def _emit_notification(
emitter: Optional[Any],
content: str,
ntype: str = "info",
):
"""Emit notification event (info, success, warning, error)."""
if emitter:
await emitter(
{"type": "notification", "data": {"type": ntype, "content": content}}
)
async def _emit_status(
valves,
emitter: Optional[Any],
description: str,
done: bool = False,
):
"""Emit status event to OpenWebUI status bar when enabled."""
if valves.SHOW_STATUS and emitter:
await emitter(
{
"type": "status",
"data": {"description": description, "done": done},
}
)
def _require_skills_model():
"""Ensure OpenWebUI Skills model APIs are available."""
if Skills is None or SkillForm is None or SkillMeta is None:
raise RuntimeError("skills_model_unavailable")
def _user_skills(user_id: str, access: str = "read") -> List[Any]:
"""Load user-scoped skills using OpenWebUI Skills model."""
return Skills.get_skills_by_user_id(user_id, access) or []
def _find_skill(
user_id: str,
skill_id: str = "",
name: str = "",
) -> Optional[Any]:
"""Find a skill by id or case-insensitive name within user scope."""
skills = _user_skills(user_id, "read")
target_id = (skill_id or "").strip()
target_name = (name or "").strip().lower()
for skill in skills:
sid = str(getattr(skill, "id", "") or "")
sname = str(getattr(skill, "name", "") or "")
if target_id and sid == target_id:
return skill
if target_name and sname.lower() == target_name:
return skill
return None
def _extract_folder_name_from_url(url: str) -> str:
"""Extract folder name from GitHub URL path.
Examples:
- https://github.com/.../tree/main/skills/xlsx -> xlsx
- https://github.com/.../blob/main/skills/SKILL.md -> skills
- https://raw.githubusercontent.com/.../main/skills/SKILL.md -> skills
"""
try:
# Remove query string and fragments
path = url.split("?")[0].split("#")[0]
# Get last path component
parts = path.rstrip("/").split("/")
if parts:
last = parts[-1]
# Skip if it's a file extension
if "." not in last or last.startswith("."):
return last
# Return parent directory if it's a filename
if len(parts) > 1:
return parts[-2]
except Exception:
pass
return ""
async def _discover_skills_from_github_directory(
valves, url: str, lang: str
) -> List[str]:
"""
Discover all skill subdirectories from a GitHub tree URL.
Uses GitHub Git Trees API to find all SKILL.md files recursively.
Example: https://github.com/anthropics/skills/tree/main/skills
Returns: List of individual skill tree URLs for each directory containing SKILL.md
"""
skill_urls = []
match = re.match(r"https://github\.com/([^/]+)/([^/]+)/tree/([^/]+)(/.*)?\Z", url)
if not match:
return skill_urls
owner = match.group(1)
repo = match.group(2)
branch = match.group(3)
target_path = (match.group(4) or "").strip("/")
try:
# Use recursive git trees API to find all SKILL.md files in the repository
api_url = f"https://api.github.com/repos/{owner}/{repo}/git/trees/{branch}?recursive=1"
response_bytes = await _fetch_bytes(valves, api_url)
data = json.loads(response_bytes.decode("utf-8"))
if "tree" in data:
for item in data["tree"]:
item_path = item.get("path", "")
# Check for SKILL.md paths (case-insensitive for convenience)
if not item_path.lower().endswith("skill.md"):
continue
# If a specific target path was provided (like /skills), we only discover skills inside it
if target_path:
# Must be exactly the target_path/SKILL.md or inside the target_path/ directory
if not (item_path.startswith(f"{target_path}/") or item_path == f"{target_path}/SKILL.md"):
continue
# Get the directory containing SKILL.md
if "/" in item_path:
skill_dir = item_path.rsplit("/", 1)[0]
skill_url = f"https://github.com/{owner}/{repo}/tree/{branch}/{skill_dir}"
else:
skill_url = f"https://github.com/{owner}/{repo}/tree/{branch}"
# De-duplicate
if skill_url not in skill_urls:
skill_urls.append(skill_url)
skill_urls.sort()
except Exception as e:
logger.warning(f"Failed to discover skills from GitHub directory {url}: {e}")
return skill_urls
def _is_github_repo_root(url: str) -> bool:
"""Check if URL is a GitHub repo root (e.g., https://github.com/owner/repo)."""
match = re.match(r"^https://github\.com/([^/]+)/([^/]+)/?$", url)
return match is not None
def _normalize_github_repo_url(url: str) -> str:
"""Convert GitHub repo root URL to tree discovery URL (assuming main/master branch)."""
match = re.match(r"^https://github\.com/([^/]+)/([^/]+)/?$", url)
if match:
owner = match.group(1)
repo = match.group(2)
# Try main branch first, API will handle if it doesn't exist
return f"https://github.com/{owner}/{repo}/tree/main"
return url
def _resolve_github_tree_urls(url: str) -> List[str]:
"""For GitHub tree URLs, resolve to direct file URL.
Example: https://github.com/anthropics/skills/tree/main/skills/xlsx
Returns: [
https://raw.githubusercontent.com/anthropics/skills/main/skills/xlsx/SKILL.md,
]
"""
urls = []
match = re.match(r"https://github\.com/([^/]+)/([^/]+)/tree/([^/]+)(/.*)?\Z", url)
if match:
owner = match.group(1)
repo = match.group(2)
branch = match.group(3)
path = match.group(4) or ""
base = f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}{path}"
# Only look for SKILL.md
urls.append(f"{base}/SKILL.md")
return urls
def _normalize_url(url: str) -> str:
"""Normalize supported URLs (GitHub blob -> raw, tree -> try direct files first)."""
value = (url or "").strip()
if not value.startswith("http://") and not value.startswith("https://"):
raise ValueError("invalid_url")
# Handle GitHub blob URLs -> convert to raw
if "github.com" in value and "/blob/" in value:
value = value.replace("github.com", "raw.githubusercontent.com")
value = value.replace("/blob/", "/")
# Note: GitHub tree URLs are handled separately in install_skill
# via _resolve_github_tree_urls()
return value
def _is_safe_url(valves, url: str) -> Tuple[bool, Optional[str]]:
"""
Validate that URL is safe for downloading from trusted domains.
Checks:
1. URL must use http/https scheme
2. Hostname must be in the trusted domains whitelist
Returns: Tuple of (is_safe: bool, error_message: Optional[str])
"""
try:
parsed = urllib.parse.urlparse(url)
hostname = (parsed.hostname or "").strip()
if not hostname:
return False, "URL is malformed: missing hostname"
# Check scheme: only http/https allowed
if parsed.scheme not in ("http", "https"):
return False, f"URL scheme not allowed: {parsed.scheme}"
# Domain whitelist check (enforced)
trusted_domains = [
d.strip().lower()
for d in (valves.TRUSTED_DOMAINS or "").split(",")
if d.strip()
]
if not trusted_domains:
return False, "No trusted domains configured."
hostname_lower = hostname.lower()
# Check if hostname matches any trusted domain (exact or subdomain)
is_trusted = False
for trusted_domain in trusted_domains:
if hostname_lower == trusted_domain or hostname_lower.endswith(
"." + trusted_domain
):
is_trusted = True
break
if not is_trusted:
return (
False,
f"Domain '{hostname}' not in whitelist. Allowed: {', '.join(trusted_domains)}",
)
return True, None
except Exception as e:
return False, f"URL validation error: {e}"
async def _fetch_bytes(valves, url: str) -> bytes:
"""Fetch bytes from URL with timeout guard and SSRF protection."""
# Validate URL safety before fetching
is_safe, error_message = _is_safe_url(valves, url)
if not is_safe:
raise ValueError(error_message or "Unsafe URL")
def _sync_fetch(target: str) -> bytes:
with urllib.request.urlopen(
target, timeout=valves.INSTALL_FETCH_TIMEOUT
) as resp:
return resp.read()
return await asyncio.wait_for(
asyncio.to_thread(_sync_fetch, url),
timeout=valves.INSTALL_FETCH_TIMEOUT + 1.0,
)
def _parse_skill_md_meta(content: str, fallback_name: str) -> Tuple[str, str, str]:
"""Parse markdown skill content into (name, description, body)."""
fm_match = re.match(r"^---\s*\n(.*?)\n---\s*\n", content, re.DOTALL)
if fm_match:
fm_text = fm_match.group(1)
body = content[fm_match.end() :].strip()
name = fallback_name
description = ""
for line in fm_text.split("\n"):
m_name = re.match(r"^name:\s*(.+)$", line)
if m_name:
name = m_name.group(1).strip().strip("\"'")
m_desc = re.match(r"^description:\s*(.+)$", line)
if m_desc:
description = m_desc.group(1).strip().strip("\"'")
return name, description, body
h1_match = re.search(r"^#\s+(.+)$", content.strip(), re.MULTILINE)
name = h1_match.group(1).strip() if h1_match else fallback_name
return name, "", content.strip()
def _append_source_url_to_content(content: str, url: str, lang: str = "en-US") -> str:
"""
Append installation source URL information to skill content.
Adds a reference link at the bottom of the content.
"""
if not content or not url:
return content
# Remove any existing source references (to prevent duplication when updating)
content = re.sub(
r"\n*---\n+\*\*Installation Source.*?\*\*:.*?\n+---\n*$",
"",
content,
flags=re.DOTALL | re.IGNORECASE,
)
# Determine the appropriate language for the label
source_label = {
"en-US": "Installation Source",
"zh-CN": "安装源",
"zh-TW": "安裝來源",
"zh-HK": "安裝來源",
"ja-JP": "インストールソース",
"ko-KR": "설치 소스",
"fr-FR": "Source d'installation",
"de-DE": "Installationsquelle",
"es-ES": "Fuente de instalación",
}.get(lang, "Installation Source")
reference_text = {
"en-US": "For additional related files or documentation, you can reference the installation source below:",
"zh-CN": "如需获取相关文件或文档,可以参考下面的安装源:",
"zh-TW": "如需獲取相關檔案或文件,可以參考下面的安裝來源:",
"zh-HK": "如需獲取相關檔案或文件,可以參考下面的安裝來源:",
"ja-JP": "関連ファイルまたはドキュメントについては、以下のインストールソースを参照できます:",
"ko-KR": "관련 파일 또는 문서를 확인하려면 아래 설치 소스를 참조할 수 있습니다:",
"fr-FR": "Pour obtenir des fichiers ou des documents connexes, vous pouvez vous reporter à la source d'installation ci-dessous :",
"de-DE": "Für zusätzliche verwandte Dateien oder Dokumentation können Sie die folgende Installationsquelle referenzieren:",
"es-ES": "Para archivos o documentación relacionados, puede consultar la siguiente fuente de instalación:",
}.get(
lang,
"For additional related files or documentation, you can reference the installation source below:",
)
# Append source URL with reference
source_block = (
f"\n\n---\n**{source_label}**: [{url}]({url})\n\n*{reference_text}*\n---"
)
return content + source_block
def _safe_extract_zip(zip_path: Path, extract_dir: Path) -> None:
"""
Safely extract a ZIP file, validating member paths to prevent path traversal.
"""
with zipfile.ZipFile(zip_path, "r") as zf:
for member in zf.namelist():
# Check for path traversal attempts
member_path = Path(extract_dir) / member
try:
# Ensure the resolved path is within extract_dir
member_path.resolve().relative_to(extract_dir.resolve())
except ValueError:
# Path is outside extract_dir (traversal attempt)
logger.warning(f"Skipping unsafe ZIP member: {member}")
continue
# Extract the member
zf.extract(member, extract_dir)
def _safe_extract_tar(tar_path: Path, extract_dir: Path) -> None:
"""
Safely extract a TAR file, validating member paths to prevent path traversal.
"""
with tarfile.open(tar_path, "r:*") as tf:
for member in tf.getmembers():
# Check for path traversal attempts
member_path = Path(extract_dir) / member.name
try:
# Ensure the resolved path is within extract_dir
member_path.resolve().relative_to(extract_dir.resolve())
except ValueError:
# Path is outside extract_dir (traversal attempt)
logger.warning(f"Skipping unsafe TAR member: {member.name}")
continue
# Extract the member
tf.extract(member, extract_dir)
def _extract_skill_from_archive(payload: bytes) -> Tuple[str, str, str]:
"""Extract SKILL.md from zip/tar archives with path traversal protection."""
with tempfile.TemporaryDirectory(prefix="owui-skill-") as tmp:
root = Path(tmp)
archive_path = root / "pkg"
archive_path.write_bytes(payload)
extract_dir = root / "extract"
extract_dir.mkdir(parents=True, exist_ok=True)
extracted = False
try:
_safe_extract_zip(archive_path, extract_dir)
extracted = True
except Exception as e:
logger.debug(f"Failed to extract as ZIP: {e}")
pass
if not extracted:
try:
_safe_extract_tar(archive_path, extract_dir)
extracted = True
except Exception as e:
logger.debug(f"Failed to extract as TAR: {e}")
pass
if not extracted:
raise ValueError("install_parse")
# Only look for SKILL.md
candidates = list(extract_dir.rglob("SKILL.md"))
if not candidates:
raise ValueError("install_parse")
chosen = candidates[0]
text = chosen.read_text(encoding="utf-8", errors="ignore")
fallback_name = chosen.parent.name or "installed-skill"
return _parse_skill_md_meta(text, fallback_name)
async def _install_single_skill(
valves,
url: str,
name: str,
user_id: str,
lang: str,
overwrite: bool,
__event_emitter__: Optional[Any] = None,
) -> Dict[str, Any]:
"""Internal method to install a single skill from URL."""
try:
if not (url or "").strip():
raise ValueError(_t(lang, "err_url_required"))
# Extract potential folder name from URL before normalization
url_folder = _extract_folder_name_from_url(url).strip()
parsed_name = ""
parsed_desc = ""
parsed_body = ""
payload = None
# Special handling for GitHub tree URLs
if "github.com" in url and "/tree/" in url:
fallback_file_urls = _resolve_github_tree_urls(url)
# Try to fetch SKILL.md directly from the tree path
for file_url in fallback_file_urls:
try:
payload = await _fetch_bytes(valves, file_url)
if payload:
break
except Exception:
continue
if payload:
# Successfully fetched direct file
text = payload.decode("utf-8", errors="ignore")
fallback = url_folder or "installed-skill"
parsed_name, parsed_desc, parsed_body = _parse_skill_md_meta(
text, fallback
)
else:
# No direct file found at this GitHub tree URL path
raise ValueError(f"Could not find SKILL.md in {url}")
else:
# Handle other URL types (blob, direct markdown, archives)
normalized = _normalize_url(url)
payload = await _fetch_bytes(valves, normalized)
if normalized.lower().endswith((".zip", ".tar", ".tar.gz", ".tgz")):
parsed_name, parsed_desc, parsed_body = _extract_skill_from_archive(
payload
)
else:
text = payload.decode("utf-8", errors="ignore")
# Use extracted folder name as fallback
fallback = url_folder or "installed-skill"
parsed_name, parsed_desc, parsed_body = _parse_skill_md_meta(
text, fallback
)
final_name = (name or parsed_name or url_folder or "installed-skill").strip()
final_desc = (parsed_desc or final_name).strip()
final_content = (parsed_body or final_desc).strip()
# Append installation source URL to the skill content
final_content = _append_source_url_to_content(final_content, url, lang)
if not final_name:
raise ValueError(_t(lang, "err_name_required"))
existing = _find_skill(user_id=user_id, name=final_name)
# install_skill always overwrites by default (overwrite=True);
# ALLOW_OVERWRITE_ON_CREATE valve also controls this.
allow_overwrite = overwrite or valves.ALLOW_OVERWRITE_ON_CREATE
if existing:
sid = str(getattr(existing, "id", "") or "")
if not allow_overwrite:
# Should not normally reach here since install defaults overwrite=True
return {
"error": f"Skill already exists: {final_name}",
"hint": "Pass overwrite=true to replace the existing skill.",
}
updated = Skills.update_skill_by_id(
sid,
{
"name": final_name,
"description": final_desc,
"content": final_content,
"is_active": True,
},
)
await _emit_status(valves, __event_emitter__, _t(lang, "status_install_overwrite_done", name=final_name),
done=True,
)
return {
"success": True,
"action": "updated",
"id": str(getattr(updated, "id", "") or sid),
"name": final_name,
"source_url": url,
}
new_skill = Skills.insert_new_skill(
user_id=user_id,
form_data=SkillForm(
id=str(uuid.uuid4()),
name=final_name,
description=final_desc,
content=final_content,
meta=SkillMeta(),
is_active=True,
),
)
await _emit_status(valves, __event_emitter__, _t(lang, "status_install_done", name=final_name),
done=True,
)
return {
"success": True,
"action": "installed",
"id": str(getattr(new_skill, "id", "") or ""),
"name": final_name,
"source_url": url,
}
except Exception as e:
key = None
if str(e) in {"invalid_url", "install_parse"}:
key = "err_invalid_url" if str(e) == "invalid_url" else "err_install_parse"
msg = (
_t(lang, key)
if key
else (
_t(lang, "err_unavailable")
if str(e) == "skills_model_unavailable"
else str(e)
)
)
logger.error(f"_install_single_skill failed for {url}: {msg}", exc_info=True)
return {"error": msg, "url": url}
class Tools:
"""OpenWebUI native tools for simple skill lifecycle management."""
class Valves(BaseModel):
"""Configurable plugin valves."""
SHOW_STATUS: bool = Field(
default=True,
description="Whether to show operation status updates.",
)
ALLOW_OVERWRITE_ON_CREATE: bool = Field(
default=True,
description="Allow create_skill/install_skill to overwrite same-name skill by default.",
)
INSTALL_FETCH_TIMEOUT: float = Field(
default=12.0,
description="Timeout in seconds for URL fetch when installing a skill.",
)
TRUSTED_DOMAINS: str = Field(
default="github.com,huggingface.co,githubusercontent.com",
description="Comma-separated list of primary trusted domains for skill downloads (always enforced). URLs with domains matching or containing these primary domains (including subdomains) are allowed. E.g., 'github.com' allows github.com and *.github.com.",
)
def __init__(self):
"""Initialize plugin valves."""
self.valves = self.Valves()
async def list_skills(
self,
include_content: bool = False,
__user__: Optional[dict] = None,
__event_emitter__: Optional[Any] = None,
__event_call__: Optional[Any] = None,
__request__: Optional[Any] = None,
) -> Dict[str, Any]:
"""List current user's OpenWebUI skills."""
user_ctx = await _get_user_context(__user__, __event_call__, __request__)
lang = user_ctx["user_language"]
user_id = user_ctx["user_id"]
try:
_require_skills_model()
if not user_id:
raise ValueError(_t(lang, "err_user_required"))
await _emit_status(self.valves, __event_emitter__, _t(lang, "status_listing"))
skills = _user_skills(user_id, "read")
rows = []
for skill in skills:
row = {
"id": str(getattr(skill, "id", "") or ""),
"name": getattr(skill, "name", ""),
"description": getattr(skill, "description", ""),
"is_active": bool(getattr(skill, "is_active", True)),
"updated_at": str(getattr(skill, "updated_at", "") or ""),
}
if include_content:
row["content"] = getattr(skill, "content", "")
rows.append(row)
rows.sort(key=lambda x: (x.get("name") or "").lower())
active_count = sum(1 for row in rows if row.get("is_active"))
await _emit_status(self.valves, __event_emitter__, _t(
lang,
"status_list_done",
count=len(rows),
active_count=active_count,
),
done=True,
)
return {"count": len(rows), "skills": rows}
except Exception as e:
msg = (
_t(lang, "err_unavailable")
if str(e) == "skills_model_unavailable"
else str(e)
)
await _emit_status(self.valves, __event_emitter__, msg, done=True)
return {"error": msg}
async def show_skill(
self,
skill_id: str = "",
name: str = "",
include_content: bool = True,
__user__: Optional[dict] = None,
__event_emitter__: Optional[Any] = None,
__event_call__: Optional[Any] = None,
__request__: Optional[Any] = None,
) -> Dict[str, Any]:
"""Show one skill by id or name."""
user_ctx = await _get_user_context(__user__, __event_call__, __request__)
lang = user_ctx["user_language"]
user_id = user_ctx["user_id"]
try:
_require_skills_model()
if not user_id:
raise ValueError(_t(lang, "err_user_required"))
await _emit_status(self.valves, __event_emitter__, _t(lang, "status_showing"))
skill = _find_skill(user_id=user_id, skill_id=skill_id, name=name)
if not skill:
raise ValueError(_t(lang, "err_not_found"))
result = {
"id": str(getattr(skill, "id", "") or ""),
"name": getattr(skill, "name", ""),
"description": getattr(skill, "description", ""),
"is_active": bool(getattr(skill, "is_active", True)),
"updated_at": str(getattr(skill, "updated_at", "") or ""),
}
if include_content:
result["content"] = getattr(skill, "content", "")
skill_name = result.get("name") or result.get("id") or "unknown"
await _emit_status(self.valves, __event_emitter__, _t(lang, "status_show_done", name=skill_name),
done=True,
)
return result
except Exception as e:
msg = (
_t(lang, "err_unavailable")
if str(e) == "skills_model_unavailable"
else str(e)
)
await _emit_status(self.valves, __event_emitter__, msg, done=True)
return {"error": msg}
async def install_skill(
self,
url: str,
name: str = "",
overwrite: bool = True,
__user__: Optional[dict] = None,
__event_emitter__: Optional[Any] = None,
__event_call__: Optional[Any] = None,
__request__: Optional[Any] = None,
) -> Dict[str, Any]:
"""Install one or more skills from URL(s), with support for GitHub directory auto-discovery.
Args:
url: A single URL string OR a JSON array of URL strings for batch install.
Examples:
Single: "https://github.com/owner/repo/tree/main/skills/xlsx"
Directory: "https://github.com/owner/repo/tree/main/skills"
Batch: ["https://github.com/owner/repo/tree/main/skills/xlsx",
"https://github.com/owner/repo/tree/main/skills/csv"]
name: Optional custom name for the skill (single install only).
overwrite: If True (default), overwrites any existing skill with the same name.
Auto-Discovery Feature:
If a GitHub tree URL points to a directory that contains multiple skill subdirectories,
this tool will automatically discover all subdirectories and batch install them.
Example: "https://github.com/anthropics/skills/tree/main/skills" will auto-discover
all skill folders under /skills and install them all at once.
Supported URL formats:
- GitHub tree URL: https://github.com/owner/repo/tree/branch/path/to/skill
- GitHub skill directory (auto-discovery): https://github.com/owner/repo/tree/branch/path
- GitHub blob URL: https://github.com/owner/repo/blob/branch/path/SKILL.md
- Raw markdown URL: https://raw.githubusercontent.com/.../SKILL.md
- Archive URL: https://example.com/skill.zip (must contain SKILL.md)
"""
user_ctx = await _get_user_context(__user__, __event_call__, __request__)
lang = user_ctx["user_language"]
user_id = user_ctx["user_id"]
try:
_require_skills_model()
if not user_id:
raise ValueError(_t(lang, "err_user_required"))
# Stage 1: Check for directory auto-discovery (GitHub URLs)
if isinstance(url, str) and "github.com" in url:
# Auto-convert repo root URL to tree discovery URL
if _is_github_repo_root(url):
await _emit_status(self.valves, __event_emitter__, _t(lang, "status_detecting_repo_root", url=url[-50:]),
)
url = _normalize_github_repo_url(url)
# If URL contains /tree/, auto-discover all skill subdirectories
if "/tree/" in url:
await _emit_status(self.valves, __event_emitter__, _t(lang, "status_discovering_skills", url=(url or "")[-50:]),
)
discover_fn = _discover_skills_from_github_directory
discovered = []
if callable(discover_fn):
discovered = await discover_fn(self.valves, url, lang)
else:
logger.warning(
"_discover_skills_from_github_directory is unavailable on current Tools instance."
)
if discovered:
# Auto-discovered subdirectories, treat as batch
url = discovered
# Stage 2: Check if url is a list/tuple (batch mode)
if isinstance(url, (list, tuple)):
urls = list(url)
if not urls:
raise ValueError(_t(lang, "err_url_required"))
# Deduplicate URLs while preserving order
seen_urls = set()
unique_urls = []
duplicates_removed = 0
for url_item in urls:
url_str = str(url_item).strip()
if url_str not in seen_urls:
unique_urls.append(url_str)
seen_urls.add(url_str)
else:
duplicates_removed += 1
# Notify if duplicates were found
if duplicates_removed > 0:
await _emit_notification(
__event_emitter__,
_t(
lang,
"status_batch_duplicates_removed",
count=duplicates_removed,
),
ntype="info",
)
await _emit_status(self.valves, __event_emitter__, _t(lang, "status_installing_batch", total=len(unique_urls)),
)
results = []
installed_names = {} # Track installed skill names to detect duplicates
for idx, single_url in enumerate(unique_urls, 1):
result = await _install_single_skill(
self.valves,
url=single_url,
name="", # Batch mode doesn't support per-item names
user_id=user_id,
lang=lang,
overwrite=overwrite,
__event_emitter__=__event_emitter__,
)
# Track installed name to detect duplicates
if result.get("success"):
installed_name = result.get("name", "").lower()
if installed_name in installed_names:
# Duplicate skill name detected
prev_url = installed_names[installed_name]
logger.warning(
f"Duplicate skill name detected: '{result.get('name')}' "
f"from {single_url} (previously from {prev_url})"
)
await _emit_notification(
__event_emitter__,
_t(
lang,
"status_duplicate_skill_name",
name=result.get("name"),
action=result.get("action", "installed"),
),
ntype="warning",
)
else:
installed_names[installed_name] = single_url
results.append(result)
# Summary
success_count = sum(1 for r in results if r.get("success"))
error_count = len(results) - success_count
await _emit_status(self.valves, __event_emitter__, _t(
lang,
"status_install_batch_done",
succeeded=success_count,
failed=error_count,
),
done=True,
)
return {
"batch": True,
"total": len(results),
"succeeded": success_count,
"failed": error_count,
"results": results,
}
else:
# Single mode
if not (url or "").strip():
raise ValueError(_t(lang, "err_url_required"))
await _emit_status(self.valves, __event_emitter__, _t(lang, "status_installing"))
result = await _install_single_skill(
self.valves,
url=str(url).strip(),
name=name,
user_id=user_id,
lang=lang,
overwrite=overwrite,
__event_emitter__=__event_emitter__,
)
return result
except Exception as e:
key = None
if str(e) in {"invalid_url", "install_parse"}:
key = (
"err_invalid_url"
if str(e) == "invalid_url"
else "err_install_parse"
)
msg = (
_t(lang, key)
if key
else (
_t(lang, "err_unavailable")
if str(e) == "skills_model_unavailable"
else str(e)
)
)
await _emit_status(self.valves, __event_emitter__, msg, done=True)
logger.error(f"install_skill failed: {msg}", exc_info=True)
return {"error": msg}
async def create_skill(
self,
name: str,
description: str = "",
content: str = "",
overwrite: bool = True,
__user__: Optional[dict] = None,
__event_emitter__: Optional[Any] = None,
__event_call__: Optional[Any] = None,
__request__: Optional[Any] = None,
) -> Dict[str, Any]:
"""Create a new skill, or update same-name skill when overwrite is enabled."""
user_ctx = await _get_user_context(__user__, __event_call__, __request__)
lang = user_ctx["user_language"]
user_id = user_ctx["user_id"]
try:
_require_skills_model()
if not user_id:
raise ValueError(_t(lang, "err_user_required"))
skill_name = (name or "").strip()
if not skill_name:
raise ValueError(_t(lang, "err_name_required"))
await _emit_status(self.valves, __event_emitter__, _t(lang, "status_creating"))
existing = _find_skill(user_id=user_id, name=skill_name)
allow_overwrite = overwrite or self.valves.ALLOW_OVERWRITE_ON_CREATE
final_description = (description or skill_name).strip()
final_content = (content or final_description).strip()
if existing:
if not allow_overwrite:
return {
"error": f"Skill already exists: {skill_name}",
"hint": "Use overwrite=true to update existing skill.",
}
sid = str(getattr(existing, "id", "") or "")
updated = Skills.update_skill_by_id(
sid,
{
"name": skill_name,
"description": final_description,
"content": final_content,
"is_active": True,
},
)
await _emit_status(self.valves, __event_emitter__, _t(lang, "status_create_overwrite_done", name=skill_name),
done=True,
)
return {
"success": True,
"action": "updated",
"id": str(getattr(updated, "id", "") or sid),
"name": skill_name,
}
new_skill = Skills.insert_new_skill(
user_id=user_id,
form_data=SkillForm(
id=str(uuid.uuid4()),
name=skill_name,
description=final_description,
content=final_content,
meta=SkillMeta(),
is_active=True,
),
)
await _emit_status(self.valves, __event_emitter__, _t(lang, "status_create_done", name=skill_name),
done=True,
)
return {
"success": True,
"action": "created",
"id": str(getattr(new_skill, "id", "") or ""),
"name": skill_name,
}
except Exception as e:
msg = (
_t(lang, "err_unavailable")
if str(e) == "skills_model_unavailable"
else str(e)
)
await _emit_status(self.valves, __event_emitter__, msg, done=True)
logger.error(f"create_skill failed: {msg}", exc_info=True)
return {"error": msg}
async def update_skill(
self,
skill_id: str = "",
name: str = "",
new_name: str = "",
description: str = "",
content: str = "",
is_active: Optional[bool] = None,
__user__: Optional[dict] = None,
__event_emitter__: Optional[Any] = None,
__event_call__: Optional[Any] = None,
__request__: Optional[Any] = None,
) -> Dict[str, Any]:
"""Modify an existing skill by updating one or more fields.
Locate skill by `skill_id` or `name` (case-insensitive). Update any of:
- `new_name`: Rename the skill (checked for name uniqueness)
- `description`: Update skill description
- `content`: Modify skill code/content
- `is_active`: Enable or disable the skill
Returns updated skill info and list of modified fields.
"""
user_ctx = await _get_user_context(__user__, __event_call__, __request__)
lang = user_ctx["user_language"]
user_id = user_ctx["user_id"]
try:
_require_skills_model()
if not user_id:
raise ValueError(_t(lang, "err_user_required"))
await _emit_status(self.valves, __event_emitter__, _t(lang, "status_updating"))
skill = _find_skill(user_id=user_id, skill_id=skill_id, name=name)
if not skill:
raise ValueError(_t(lang, "err_not_found"))
# Get skill ID early for collision detection
sid = str(getattr(skill, "id", "") or "")
updates: Dict[str, Any] = {}
if new_name.strip():
# Check for name collision with other skills
new_name_clean = new_name.strip()
# Check if another skill already has this name (case-insensitive)
for other_skill in _user_skills(user_id, "read"):
other_id = str(getattr(other_skill, "id", "") or "")
other_name = str(getattr(other_skill, "name", "") or "")
# Skip the current skill being updated
if other_id == sid:
continue
if other_name.lower() == new_name_clean.lower():
return {
"error": f'Another skill already has the name "{new_name_clean}".',
"hint": "Choose a different name or delete the conflicting skill first.",
}
updates["name"] = new_name_clean
if description.strip():
updates["description"] = description.strip()
if content.strip():
updates["content"] = content.strip()
if is_active is not None:
updates["is_active"] = bool(is_active)
if not updates:
raise ValueError(_t(lang, "err_no_update_fields"))
updated = Skills.update_skill_by_id(sid, updates)
updated_name = str(
getattr(updated, "name", "")
or updates.get("name")
or getattr(skill, "name", "")
or sid
)
await _emit_status(self.valves, __event_emitter__, _t(lang, "status_update_done", name=updated_name),
done=True,
)
return {
"success": True,
"id": str(getattr(updated, "id", "") or sid),
"name": str(
getattr(updated, "name", "")
or updates.get("name")
or getattr(skill, "name", "")
),
"updated_fields": list(updates.keys()),
}
except Exception as e:
msg = (
_t(lang, "err_unavailable")
if str(e) == "skills_model_unavailable"
else str(e)
)
await _emit_status(self.valves, __event_emitter__, msg, done=True)
return {"error": msg}
async def delete_skill(
self,
skill_id: str = "",
name: str = "",
__user__: Optional[dict] = None,
__event_emitter__: Optional[Any] = None,
__event_call__: Optional[Any] = None,
__request__: Optional[Any] = None,
) -> Dict[str, Any]:
"""Delete one skill by id or name."""
user_ctx = await _get_user_context(__user__, __event_call__, __request__)
lang = user_ctx["user_language"]
user_id = user_ctx["user_id"]
try:
_require_skills_model()
if not user_id:
raise ValueError(_t(lang, "err_user_required"))
await _emit_status(self.valves, __event_emitter__, _t(lang, "status_deleting"))
skill = _find_skill(user_id=user_id, skill_id=skill_id, name=name)
if not skill:
raise ValueError(_t(lang, "err_not_found"))
sid = str(getattr(skill, "id", "") or "")
sname = str(getattr(skill, "name", "") or "")
Skills.delete_skill_by_id(sid)
deleted_name = sname or sid or "unknown"
await _emit_status(self.valves, __event_emitter__, _t(lang, "status_delete_done", name=deleted_name),
done=True,
)
return {
"success": True,
"id": sid,
"name": sname,
}
except Exception as e:
msg = (
_t(lang, "err_unavailable")
if str(e) == "skills_model_unavailable"
else str(e)
)
await _emit_status(self.valves, __event_emitter__, msg, done=True)
return {"error": msg}