User: {user_name}
Time: {current_date_time_str}
"""
# =================================================================
# JavaScript Rendering Script
# =================================================================
# NOTE(review): as written here the template contains only a single newline.
# Presumably the JavaScript rendering markup belongs inside this literal —
# confirm that its content was not stripped before relying on it.
SCRIPT_TEMPLATE_INFOGRAPHIC = """
"""
class Action:
class Valves(BaseModel):
    # Settings for this action. Each field carries its own default and a
    # human-readable description; comments are used instead of a class
    # docstring so the generated model schema stays exactly as it was.

    # --- Chat feedback -------------------------------------------------
    SHOW_STATUS: bool = Field(
        default=True,
        description="Show operation status updates in chat interface.",
    )

    # --- Analysis input ------------------------------------------------
    MODEL_ID: str = Field(
        default="",
        description="Built-in LLM model ID for text analysis. If empty, uses current conversation model.",
    )
    MIN_TEXT_LENGTH: int = Field(
        default=100,
        description="Minimum text length (characters) required for infographic analysis.",
    )

    # --- Result handling -----------------------------------------------
    CLEAR_PREVIOUS_HTML: bool = Field(
        default=False,
        description="Force clear old plugin results (if True, overwrite instead of merge).",
    )
    MESSAGE_COUNT: int = Field(
        default=1,
        description="Number of recent messages to use for generation. Set to 1 for just the last message, or higher for more context.",
    )
    OUTPUT_MODE: str = Field(
        default="image",
        description="Output mode: 'html' for interactive HTML, or 'image' to embed as Markdown image (default).",
    )

    # --- Diagnostics ---------------------------------------------------
    SHOW_DEBUG_LOG: bool = Field(
        default=False,
        description="Whether to print debug logs in the browser console.",
    )
def __init__(self):
    """Create the valve settings and the regional-variant fallback table."""
    self.valves = self.Valves()

    # Regional variants that have no entry of their own in TRANSLATIONS,
    # grouped by the supported locale that should be used in their place.
    variants_by_locale = {
        "es-ES": ("es-AR", "es-MX"),
        "fr-FR": ("fr-CA",),
        "en-US": ("en-CA", "en-GB", "en-AU"),
        "de-DE": ("de-AT",),
    }
    self.fallback_map = {
        variant: locale
        for locale, variants in variants_by_locale.items()
        for variant in variants
    }
async def _get_user_context(
self,
__user__: Optional[Dict[str, Any]],
__event_call__: Optional[Callable[[Any], Awaitable[None]]] = None,
__request__: Optional[Request] = None,
) -> Dict[str, str]:
"""Extract basic user context with safe fallbacks."""
if isinstance(__user__, (list, tuple)):
user_data = __user__[0] if __user__ else {}
elif isinstance(__user__, dict):
user_data = __user__
else:
user_data = {}
user_id = user_data.get("id", "unknown_user")
user_name = user_data.get("name", "User")
# Default from profile
user_language = user_data.get("language", "en-US")
user_theme = "light"
# Level 1 Fallback: Accept-Language from __request__ headers
if (
__request__
and hasattr(__request__, "headers")
and "accept-language" in __request__.headers
):
raw_lang = __request__.headers.get("accept-language", "")
if raw_lang:
user_language = raw_lang.split(",")[0].split(";")[0]
# Priority: Document Lang > LocalStorage (Frontend) > Browser > Request Header > Profile
if __event_call__:
try:
js_code = """
try {
const html = document.documentElement;
const body = document.body;
const htmlClass = html ? html.className : '';
const bodyClass = body ? body.className : '';
const htmlDataTheme = html ? html.getAttribute('data-theme') : '';
let theme = 'light';
// 1. Check parent document's html/body class or data-theme
if (htmlDataTheme === 'dark' || bodyClass.includes('dark') || htmlClass.includes('dark')) {
theme = 'dark';
} else if (htmlDataTheme === 'light' || bodyClass.includes('light') || htmlClass.includes('light')) {
theme = 'light';
} else {
// 2. Check meta theme-color luma
const metas = document.querySelectorAll('meta[name="theme-color"]');
let foundMeta = false;
if (metas.length > 0) {
const color = metas[metas.length - 1].content.trim();
const m = color.match(/^#?([0-9a-f]{6})$/i);
if (m) {
const hex = m[1];
const r = parseInt(hex.slice(0, 2), 16);
const g = parseInt(hex.slice(2, 4), 16);
const b = parseInt(hex.slice(4, 6), 16);
const luma = (0.2126 * r + 0.7152 * g + 0.0722 * b) / 255;
theme = luma < 0.5 ? 'dark' : 'light';
foundMeta = true;
}
}
// 3. Check system preference
if (!foundMeta && window.matchMedia && window.matchMedia('(prefers-color-scheme: dark)').matches) {
theme = 'dark';
}
}
const lang = document.documentElement.lang ||
localStorage.getItem('locale') ||
localStorage.getItem('language') ||
navigator.language ||
'en-US';
return JSON.stringify({ lang, theme });
} catch (e) {
return JSON.stringify({ lang: 'en-US', theme: 'light' });
}
"""
# Use asyncio.wait_for to prevent hanging if frontend fails to callback
frontend_res_str = await asyncio.wait_for(
__event_call__({"type": "execute", "data": {"code": js_code}}),
timeout=2.0,
)
if frontend_res_str and isinstance(frontend_res_str, str):
try:
import json
frontend_res = json.loads(frontend_res_str)
user_language = frontend_res.get("lang", user_language)
user_theme = frontend_res.get("theme", user_theme)
except Exception:
user_language = frontend_res_str
except Exception as e:
logger.warning(f"Failed to retrieve frontend language/theme: {e}")
return {
"user_id": user_id,
"user_name": user_name,
"user_language": user_language,
"user_theme": user_theme,
}
def _resolve_language(self, lang: str) -> str:
    """Resolve the best matching language code from the TRANSLATIONS dict.

    Matching is tried in this order:
      1. the tag itself (exact, then ignoring case and ``_`` vs ``-``),
      2. the instance's ``fallback_map`` entry for the tag, if any,
      3. the first TRANSLATIONS key sharing the tag's base language; this
         also applies to bare codes such as ``"fr"``, which browsers and
         ``Accept-Language`` headers commonly send,
      4. ``"en-US"``.

    Args:
        lang: Requested language tag. ``None``, non-strings and blank
            strings resolve to ``"en-US"`` instead of raising.

    Returns:
        A key of TRANSLATIONS, or ``"en-US"`` when nothing matches.
    """
    if not isinstance(lang, str) or not lang.strip():
        return "en-US"

    if lang in TRANSLATIONS:
        return lang

    # Language tags compare case-insensitively and some sources emit
    # "en_US"; normalise before the remaining lookups.
    # NOTE(review): assumes TRANSLATIONS keys are strings — confirm.
    wanted = lang.strip().replace("_", "-").lower()
    supported_by_lower = {code.lower(): code for code in TRANSLATIONS}
    if wanted in supported_by_lower:
        return supported_by_lower[wanted]

    # getattr keeps this usable on instances created without __init__.
    fallback_map = getattr(self, "fallback_map", None) or {}
    for variant, target in fallback_map.items():
        if variant.lower() == wanted and target in TRANSLATIONS:
            return target

    base_lang = wanted.split("-")[0]
    for lowered, supported_lang in supported_by_lower.items():
        if lowered == base_lang or lowered.startswith(base_lang + "-"):
            return supported_lang

    return "en-US"
def _get_translation(self, lang: str, key: str, **kwargs) -> str:
    """Look up ``key`` for ``lang`` and optionally fill in placeholders.

    The language is first resolved through ``_resolve_language``. A key
    missing from that language falls back to the ``en-US`` entry and, as a
    last resort, to the key itself. When keyword arguments are supplied
    they are applied with ``str.format``; if formatting fails, a warning is
    logged and the unformatted text is returned.
    """
    resolved = self._resolve_language(lang)
    table = TRANSLATIONS.get(resolved, TRANSLATIONS["en-US"])
    english_or_key = TRANSLATIONS["en-US"].get(key, key)
    text = table.get(key, english_or_key)

    if not kwargs:
        return text

    try:
        return text.format(**kwargs)
    except Exception as e:
        logger.warning(f"Translation formatting failed for {key}: {e}")
        return text
def _get_chat_context(
self, body: dict, __metadata__: Optional[dict] = None
) -> Dict[str, str]:
"""
Unified extraction of chat context information (chat_id, message_id).
Prioritizes extraction from body, then metadata.
"""
chat_id = ""
message_id = ""
# 1. Try to get from body
if isinstance(body, dict):
chat_id = body.get("chat_id", "")
message_id = body.get("id", "") # message_id is usually 'id' in body
# Check body.metadata as fallback
if not chat_id or not message_id:
body_metadata = body.get("metadata", {})
if isinstance(body_metadata, dict):
if not chat_id:
chat_id = body_metadata.get("chat_id", "")
if not message_id:
message_id = body_metadata.get("message_id", "")
# 2. Try to get from __metadata__ (as supplement)
if __metadata__ and isinstance(__metadata__, dict):
if not chat_id:
chat_id = __metadata__.get("chat_id", "")
if not message_id:
message_id = __metadata__.get("message_id", "")
return {
"chat_id": str(chat_id).strip(),
"message_id": str(message_id).strip(),
}
def _extract_infographic_syntax(self, llm_output: str) -> str:
"""Extract infographic syntax from LLM output"""
match = re.search(r"```infographic\s*(.*?)\s*```", llm_output, re.DOTALL)
if match:
extracted_content = match.group(1).strip()
else:
logger.warning(
"LLM output did not follow expected format, treating entire output as syntax."
)
extracted_content = llm_output.strip()
return extracted_content.replace("", "<\\/script>")
async def _emit_status(self, emitter, description: str, done: bool = False):
"""Send status update event"""
if self.valves.SHOW_STATUS and emitter:
await emitter(
{"type": "status", "data": {"description": description, "done": done}}
)
async def _emit_notification(self, emitter, content: str, ntype: str = "info"):
"""Send notification event (info/success/warning/error)"""
if emitter:
await emitter(
{"type": "notification", "data": {"type": ntype, "content": content}}
)
async def _emit_debug_log(self, emitter, title: str, data: dict):
"""Print structured debug logs in the browser console"""
if not self.valves.SHOW_DEBUG_LOG or not emitter:
return
try:
js_code = f"""
(async function() {{
console.group("🛠️ {title}");
console.log({json.dumps(data, ensure_ascii=False)});
console.groupEnd();
}})();
"""
await emitter({"type": "execute", "data": {"code": js_code}})
except Exception as e:
print(f"Error emitting debug log: {e}")
def _remove_existing_html(self, content: str) -> str:
"""Remove existing plugin-generated HTML code blocks from content"""
pattern = r"```html\s*[\s\S]*?```"
return re.sub(pattern, "", content).strip()
def _extract_text_content(self, content) -> str:
"""Extract text from message content, supporting multimodal message formats"""
if isinstance(content, str):
return content
elif isinstance(content, list):
# Multimodal message: [{"type": "text", "text": "..."}, {"type": "image_url", ...}]
text_parts = []
for item in content:
if isinstance(item, dict) and item.get("type") == "text":
text_parts.append(item.get("text", ""))
elif isinstance(item, str):
text_parts.append(item)
return "\n".join(text_parts)
return str(content) if content else ""
def _merge_html(
self,
existing_html_code: str,
new_content: str,
new_styles: str = "",
new_scripts: str = "",
user_language: str = "en",
) -> str:
"""Merge new content into existing HTML container or create a new one"""
if (
"" in existing_html_code
and "" in existing_html_code
):
base_html = existing_html_code
base_html = re.sub(r"^```html\s*", "", base_html)
base_html = re.sub(r"\s*```$", "", base_html)
else:
base_html = HTML_WRAPPER_TEMPLATE.replace("{user_language}", user_language)
wrapped_content = f'