# AgentCoord/backend/server.py
from flask import Flask, request, jsonify, Response, stream_with_context
from flask_socketio import SocketIO, emit, join_room, leave_room
import json
from DataProcess import Add_Collaboration_Brief_FrontEnd
from AgentCoord.RehearsalEngine_V2.ExecutePlan import executePlan
2026-01-21 15:18:15 +08:00
from AgentCoord.RehearsalEngine_V2.ExecutePlan_Optimized import executePlan_streaming
from AgentCoord.PlanEngine.basePlan_Generator import generate_basePlan
from AgentCoord.PlanEngine.fill_stepTask import fill_stepTask
from AgentCoord.PlanEngine.fill_stepTask_TaskProcess import (
fill_stepTask_TaskProcess,
)
from AgentCoord.PlanEngine.branch_PlanOutline import branch_PlanOutline
from AgentCoord.PlanEngine.branch_TaskProcess import branch_TaskProcess
from AgentCoord.PlanEngine.AgentSelectModify import (
AgentSelectModify_init,
AgentSelectModify_addAspect,
)
import os
import yaml
import argparse
from typing import List, Dict
# initialize global variables
yaml_file = os.path.join(os.getcwd(), "config", "config.yaml")
try:
with open(yaml_file, "r", encoding="utf-8") as file:
yaml_data = yaml.safe_load(file)
except Exception:
yaml_data = {}
USE_CACHE_ENV = os.getenv("USE_CACHE")
if USE_CACHE_ENV is None:
    USE_CACHE: bool = yaml_data.get("USE_CACHE", False)
else:
    USE_CACHE = USE_CACHE_ENV.lower() in ["true", "1", "yes"]
AgentBoard = None
AgentProfile_Dict = {}
Request_Cache: Dict[str, object] = {}  # cached responses are JSON-serializable lists/dicts
app = Flask(__name__)
app.config['SECRET_KEY'] = 'agentcoord-secret-key'
socketio = SocketIO(app, cors_allowed_origins="*", async_mode='threading')
#socketio = SocketIO(app, cors_allowed_origins="*", async_mode='eventlet')
def truncate_rehearsal_log(RehearsalLog: List, restart_from_step_index: int) -> List:
    """
    Truncate the RehearsalLog, keeping only the results of steps before the given index.
    Args:
        RehearsalLog: the original log list
        restart_from_step_index: index of the step to re-run from (e.g. 1 means keep step 0 and re-run from step 1)
    Returns:
        The truncated RehearsalLog.
    Example:
        restart_from_step_index = 1
        RehearsalLog = [step0, object0, step1, object1, step2, object2]
        returns [step0, object0]  # only the result of step 0 is kept
    """
    truncated_log = []
    step_count = 0
    for logNode in RehearsalLog:
        if logNode.get("LogNodeType") == "step":
            # Keep only the steps before restart_from_step_index
            if step_count < restart_from_step_index:
                truncated_log.append(logNode)
            step_count += 1
        elif logNode.get("LogNodeType") == "object":
            # Object nodes: keep one if its corresponding step is within the kept range
            # (strategy: keep every object that appears before the truncation point)
            if step_count <= restart_from_step_index:
                truncated_log.append(logNode)
    return truncated_log
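# Illustrative sketch (never called by the server): demonstrates the truncation
# semantics documented above on a minimal, hypothetical RehearsalLog. Only the
# "LogNodeType" key matters to the function; the other keys are made up here.
def _example_truncate_rehearsal_log():
    sample_log = [
        {"LogNodeType": "step", "StepName": "step0"},
        {"LogNodeType": "object", "ObjectName": "object0"},
        {"LogNodeType": "step", "StepName": "step1"},
        {"LogNodeType": "object", "ObjectName": "object1"},
    ]
    # Restarting from step 1 keeps only step0 and its output object.
    kept = truncate_rehearsal_log(sample_log, restart_from_step_index=1)
    assert [node.get("StepName") or node.get("ObjectName") for node in kept] == ["step0", "object0"]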
@app.route("/fill_stepTask_TaskProcess", methods=["post"])
def Handle_fill_stepTask_TaskProcess():
incoming_data = request.get_json()
requestIdentifier = str(
(
"/fill_stepTask_TaskProcess",
incoming_data["General Goal"],
incoming_data["stepTask_lackTaskProcess"],
)
)
if USE_CACHE:
if requestIdentifier in Request_Cache:
return jsonify(Request_Cache[requestIdentifier])
filled_stepTask = fill_stepTask_TaskProcess(
General_Goal=incoming_data["General Goal"],
stepTask=incoming_data["stepTask_lackTaskProcess"],
AgentProfile_Dict=AgentProfile_Dict,
)
filled_stepTask = Add_Collaboration_Brief_FrontEnd(filled_stepTask)
Request_Cache[requestIdentifier] = filled_stepTask
response = jsonify(filled_stepTask)
return response
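# Note on the caching pattern used by the REST handlers in this file: each cache key is
# simply str() of a tuple of the endpoint path plus the request fields it depends on.
# A hypothetical helper equivalent to the inline construction (sketch only, unused):
def _make_cache_key(endpoint: str, *fields) -> str:
    return str((endpoint, *fields))
# e.g. _make_cache_key("/fill_stepTask_TaskProcess", goal, step_task) reproduces the
# requestIdentifier built in Handle_fill_stepTask_TaskProcess above.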
@app.route("/agentSelectModify_init", methods=["post"])
def Handle_agentSelectModify_init():
incoming_data = request.get_json()
requestIdentifier = str(
(
"/agentSelectModify_init",
incoming_data["General Goal"],
incoming_data["stepTask"],
)
)
if USE_CACHE:
if requestIdentifier in Request_Cache:
return jsonify(Request_Cache[requestIdentifier])
scoreTable = AgentSelectModify_init(
stepTask=incoming_data["stepTask"],
General_Goal=incoming_data["General Goal"],
Agent_Board=AgentBoard,
)
Request_Cache[requestIdentifier] = scoreTable
response = jsonify(scoreTable)
return response
@app.route("/agentSelectModify_addAspect", methods=["post"])
def Handle_agentSelectModify_addAspect():
incoming_data = request.get_json()
requestIdentifier = str(
("/agentSelectModify_addAspect", incoming_data["aspectList"])
)
if USE_CACHE:
if requestIdentifier in Request_Cache:
return jsonify(Request_Cache[requestIdentifier])
scoreTable = AgentSelectModify_addAspect(
aspectList=incoming_data["aspectList"], Agent_Board=AgentBoard
)
Request_Cache[requestIdentifier] = scoreTable
response = jsonify(scoreTable)
return response
@app.route("/fill_stepTask", methods=["post"])
def Handle_fill_stepTask():
incoming_data = request.get_json()
requestIdentifier = str(
(
"/fill_stepTask",
incoming_data["General Goal"],
incoming_data["stepTask"],
)
)
if USE_CACHE:
if requestIdentifier in Request_Cache:
return jsonify(Request_Cache[requestIdentifier])
filled_stepTask = fill_stepTask(
General_Goal=incoming_data["General Goal"],
stepTask=incoming_data["stepTask"],
Agent_Board=AgentBoard,
AgentProfile_Dict=AgentProfile_Dict,
)
filled_stepTask = Add_Collaboration_Brief_FrontEnd(filled_stepTask)
Request_Cache[requestIdentifier] = filled_stepTask
response = jsonify(filled_stepTask)
return response
@app.route("/branch_PlanOutline", methods=["post"])
def Handle_branch_PlanOutline():
incoming_data = request.get_json()
requestIdentifier = str(
(
"/branch_PlanOutline",
incoming_data["branch_Number"],
incoming_data["Modification_Requirement"],
incoming_data["Existing_Steps"],
incoming_data["Baseline_Completion"],
incoming_data["Initial Input Object"],
incoming_data["General Goal"],
)
)
if USE_CACHE:
if requestIdentifier in Request_Cache:
return jsonify(Request_Cache[requestIdentifier])
branchList = branch_PlanOutline(
branch_Number=incoming_data["branch_Number"],
Modification_Requirement=incoming_data["Modification_Requirement"],
Existing_Steps=incoming_data["Existing_Steps"],
Baseline_Completion=incoming_data["Baseline_Completion"],
InitialObject_List=incoming_data["Initial Input Object"],
General_Goal=incoming_data["General Goal"],
)
branchList = Add_Collaboration_Brief_FrontEnd(branchList)
Request_Cache[requestIdentifier] = branchList
response = jsonify(branchList)
return response
@app.route("/branch_TaskProcess", methods=["post"])
def Handle_branch_TaskProcess():
incoming_data = request.get_json()
requestIdentifier = str(
(
"/branch_TaskProcess",
incoming_data["branch_Number"],
incoming_data["Modification_Requirement"],
incoming_data["Existing_Steps"],
incoming_data["Baseline_Completion"],
incoming_data["stepTaskExisting"],
incoming_data["General Goal"],
)
)
if USE_CACHE:
if requestIdentifier in Request_Cache:
return jsonify(Request_Cache[requestIdentifier])
branchList = branch_TaskProcess(
branch_Number=incoming_data["branch_Number"],
Modification_Requirement=incoming_data["Modification_Requirement"],
Existing_Steps=incoming_data["Existing_Steps"],
Baseline_Completion=incoming_data["Baseline_Completion"],
stepTaskExisting=incoming_data["stepTaskExisting"],
General_Goal=incoming_data["General Goal"],
AgentProfile_Dict=AgentProfile_Dict,
)
Request_Cache[requestIdentifier] = branchList
response = jsonify(branchList)
return response
@app.route("/generate_basePlan", methods=["post"])
def Handle_generate_basePlan():
incoming_data = request.get_json()
requestIdentifier = str(
(
"/generate_basePlan",
incoming_data["General Goal"],
incoming_data["Initial Input Object"],
)
)
if USE_CACHE:
if requestIdentifier in Request_Cache:
return jsonify(Request_Cache[requestIdentifier])
try:
basePlan = generate_basePlan(
General_Goal=incoming_data["General Goal"],
Agent_Board=AgentBoard,
AgentProfile_Dict=AgentProfile_Dict,
InitialObject_List=incoming_data["Initial Input Object"],
)
except ValueError as e:
return jsonify({"error": str(e)}), 400
except Exception as e:
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
basePlan_withRenderSpec = Add_Collaboration_Brief_FrontEnd(basePlan)
Request_Cache[requestIdentifier] = basePlan_withRenderSpec
response = jsonify(basePlan_withRenderSpec)
return response
@app.route("/executePlan", methods=["post"])
def Handle_executePlan():
incoming_data = request.get_json()
requestIdentifier = str(
(
"/executePlan",
incoming_data["num_StepToRun"],
incoming_data["RehearsalLog"],
incoming_data["plan"],
)
)
if USE_CACHE:
if requestIdentifier in Request_Cache:
return jsonify(Request_Cache[requestIdentifier])
RehearsalLog = executePlan(
incoming_data["plan"],
incoming_data["num_StepToRun"],
incoming_data["RehearsalLog"],
AgentProfile_Dict,
)
Request_Cache[requestIdentifier] = RehearsalLog
response = jsonify(RehearsalLog)
return response
@app.route("/executePlanOptimized", methods=["post"])
def Handle_executePlanOptimized():
"""
优化版流式执行计划阶段1+2步骤级流式 + 动作级智能并行
返回 SSE 每完成一个动作就返回结果
- 无依赖关系的动作并行执行
- 有依赖关系的动作串行执行
支持参数:
plan: 执行计划
num_StepToRun: 要运行的步骤数
RehearsalLog: 已执行的历史记录
existingKeyObjects: 已存在的KeyObjects用于重新执行时传递中间结果
2026-01-21 15:18:15 +08:00
前端使用 EventSource 接收
"""
incoming_data = request.get_json()
def generate():
try:
for chunk in executePlan_streaming(
plan=incoming_data["plan"],
num_StepToRun=incoming_data.get("num_StepToRun"),
RehearsalLog=incoming_data.get("RehearsalLog", []),
AgentProfile_Dict=AgentProfile_Dict,
existingKeyObjects=incoming_data.get("existingKeyObjects"),
):
yield chunk
except Exception as e:
error_event = json.dumps({
"type": "error",
"message": str(e)
}, ensure_ascii=False)
yield f"data: {error_event}\n\n"
return Response(
stream_with_context(generate()),
mimetype="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
}
)
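# Sketch of how a client might consume the /executePlanOptimized SSE stream.
# Illustrative only and never called by the server; it assumes the `requests`
# package is available on the client side (it is not imported above).
def _example_consume_execute_plan_sse(base_url, payload):
    import requests  # client-side dependency, assumed for this sketch

    with requests.post(f"{base_url}/executePlanOptimized", json=payload, stream=True) as resp:
        for raw_line in resp.iter_lines(decode_unicode=True):
            # Each SSE event arrives as a "data: {...}" line followed by a blank line.
            if raw_line and raw_line.startswith("data: "):
                event = json.loads(raw_line[len("data: "):])
                if event.get("type") == "error":
                    raise RuntimeError(event.get("message"))
                print(event)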
@app.route("/_saveRequestCashe", methods=["post"])
def Handle_saveRequestCashe():
with open(
os.path.join(os.getcwd(), "RequestCache", "Request_Cache.json"), "w"
) as json_file:
json.dump(Request_Cache, json_file, indent=4)
response = jsonify(
{"code": 200, "content": "request cashe sucessfully saved"}
)
return response
@app.route("/setAgents", methods=["POST"])
def set_agents():
global AgentBoard, AgentProfile_Dict,yaml_data
AgentBoard = request.json
AgentProfile_Dict = {}
for item in AgentBoard:
name = item["Name"]
if all(item.get(field) for field in ["apiUrl","apiKey","apiModel"]):
agent_config = {
"profile": item["Profile"],
"apiUrl": item["apiUrl"],
"apiKey": item["apiKey"],
"apiModel": item["apiModel"],
"useCustomAPI":True
}
else:
agent_config = {
"profile": item["Profile"],
"apiUrl": yaml_data.get("OPENAI_API_BASE"),
"apiKey": yaml_data.get("OPENAI_API_KEY"),
"apiModel": yaml_data.get("OPENAI_API_MODEL"),
"useCustomAPI":False
}
AgentProfile_Dict[name] = agent_config
return jsonify({"code": 200, "content": "set agentboard successfully"})
def init():
global AgentBoard, AgentProfile_Dict, Request_Cache
# Load Request Cache
try:
with open(
os.path.join(os.getcwd(), "RequestCache", "Request_Cache.json"), "r"
) as json_file:
Request_Cache = json.load(json_file)
print(f"✅ Loaded Request_Cache with {len(Request_Cache)} entries")
except Exception as e:
print(f"⚠️ Failed to load Request_Cache: {e}")
Request_Cache = {}
# Load Agent Board
try:
with open(
os.path.join(os.getcwd(), "AgentRepo", "agentBoard_v1.json"), "r", encoding="utf-8"
) as json_file:
AgentBoard = json.load(json_file)
print(f"✅ Loaded AgentBoard with {len(AgentBoard)} agents")
# Build AgentProfile_Dict
AgentProfile_Dict = {}
for item in AgentBoard:
name = item["Name"]
profile = item["Profile"]
AgentProfile_Dict[name] = profile
print(f"✅ Built AgentProfile_Dict with {len(AgentProfile_Dict)} profiles")
except Exception as e:
print(f"⚠️ Failed to load AgentBoard: {e}")
AgentBoard = []
AgentProfile_Dict = {}
# ==================== WebSocket connection management ====================
@socketio.on('connect')
def handle_connect():
"""客户端连接"""
print(f"✅ WebSocket client connected: {request.sid}")
emit('connected', {'sid': request.sid, 'message': 'WebSocket连接成功'})
@socketio.on('disconnect')
def handle_disconnect():
"""客户端断开连接"""
print(f"❌ WebSocket client disconnected: {request.sid}")
@socketio.on('ping')
def handle_ping():
"""心跳检测"""
emit('pong')
# ==================== WebSocket event handlers ====================
# Note: the handlers below are WebSocket versions of the interfaces and coexist with the REST API.
# Core interfaces are being migrated to WebSocket step by step.
@socketio.on('execute_plan_optimized')
def handle_execute_plan_optimized_ws(data):
"""
WebSocket版本优化版流式执行计划
支持步骤级流式 + 动作级智能并行 + 动态追加步骤 + 从指定步骤重新执行
请求格式
{
"id": "request-id",
"action": "execute_plan_optimized",
"data": {
"plan": {...},
"num_StepToRun": null,
"RehearsalLog": [],
"enable_dynamic": true, # 是否启用动态追加步骤
"restart_from_step_index": 1 # 可选从指定步骤重新执行例如1表示从步骤2重新执行
}
}
"""
request_id = data.get('id')
incoming_data = data.get('data', {})
try:
plan = incoming_data.get("plan")
num_StepToRun = incoming_data.get("num_StepToRun")
RehearsalLog = incoming_data.get("RehearsalLog", [])
enable_dynamic = incoming_data.get("enable_dynamic", False)
        restart_from_step_index = incoming_data.get("restart_from_step_index")  # new: supports restarting from a given step
        # If a restart step index was specified, truncate the RehearsalLog
if restart_from_step_index is not None:
print(f"🔄 从步骤 {restart_from_step_index + 1} 重新执行,正在截断 RehearsalLog...")
RehearsalLog = truncate_rehearsal_log(RehearsalLog, restart_from_step_index)
print(f"✅ RehearsalLog 已截断,保留 {sum(1 for node in RehearsalLog if node.get('LogNodeType') == 'step')} 个步骤的结果")
        # Use the execution_id passed by the frontend if present; otherwise generate a new one
execution_id = incoming_data.get("execution_id")
if not execution_id:
import time
execution_id = f"{plan.get('General Goal', '').replace(' ', '_')}_{int(time.time() * 1000)}"
if enable_dynamic:
            # Dynamic mode: use executePlan_streaming_dynamic
            from AgentCoord.RehearsalEngine_V2.ExecutePlan_Optimized import executePlan_streaming_dynamic
            # Send the execution ID so the client knows which ID is in use
emit('progress', {
'id': request_id,
'status': 'execution_started',
'execution_id': execution_id,
'message': '执行已启动,支持动态追加步骤'
})
for chunk in executePlan_streaming_dynamic(
plan=plan,
num_StepToRun=num_StepToRun,
RehearsalLog=RehearsalLog,
AgentProfile_Dict=AgentProfile_Dict,
execution_id=execution_id
):
emit('progress', {
'id': request_id,
'status': 'streaming',
'data': chunk.replace('data: ', '').replace('\n\n', '')
})
            # Send the completion signal
emit('progress', {
'id': request_id,
'status': 'complete',
'data': None
})
else:
            # Non-dynamic mode: use the original approach
for chunk in executePlan_streaming(
plan=plan,
num_StepToRun=num_StepToRun,
RehearsalLog=RehearsalLog,
AgentProfile_Dict=AgentProfile_Dict,
):
emit('progress', {
'id': request_id,
'status': 'streaming',
'data': chunk.replace('data: ', '').replace('\n\n', '')
})
            # Send the completion signal
emit('progress', {
'id': request_id,
'status': 'complete',
'data': None
})
except Exception as e:
        # Send the error information
emit('progress', {
'id': request_id,
'status': 'error',
'error': str(e)
})
@socketio.on('add_steps_to_execution')
def handle_add_steps_to_execution(data):
"""
WebSocket版本向正在执行的任务追加新步骤
请求格式
{
"id": "request-id",
"action": "add_steps_to_execution",
"data": {
"execution_id": "execution_id",
"new_steps": [...]
}
}
"""
request_id = data.get('id')
incoming_data = data.get('data', {})
try:
from AgentCoord.RehearsalEngine_V2.dynamic_execution_manager import dynamic_execution_manager
execution_id = incoming_data.get('execution_id')
new_steps = incoming_data.get('new_steps', [])
if not execution_id:
emit('response', {
'id': request_id,
'status': 'error',
'error': '缺少execution_id参数'
})
return
        # Append the new steps to the execution queue
added_count = dynamic_execution_manager.add_steps(execution_id, new_steps)
if added_count > 0:
print(f"✅ 成功追加 {added_count} 个步骤到执行队列: {execution_id}")
emit('response', {
'id': request_id,
'status': 'success',
'data': {
'message': f'成功追加 {added_count} 个步骤',
'added_count': added_count
}
})
else:
print(f"⚠️ 无法追加步骤执行ID不存在或已结束: {execution_id}")
emit('response', {
'id': request_id,
'status': 'error',
'error': '执行ID不存在或已结束'
})
except Exception as e:
print(f"❌ 追加步骤失败: {str(e)}")
emit('response', {
'id': request_id,
'status': 'error',
'error': str(e)
})
@socketio.on('generate_base_plan')
def handle_generate_base_plan_ws(data):
"""
WebSocket版本生成基础计划支持流式/分步返回
请求格式
{
"id": "request-id",
"action": "generate_base_plan",
"data": {
"General Goal": "...",
"Initial Input Object": [...]
}
}
流式事件
- progress: {"id": request_id, "status": "streaming", "stage": "generating_outline", "message": "正在生成计划大纲..."}
- progress: {"id": request_id, "status": "streaming", "stage": "processing_steps", "step": 1, "total": 3, "message": "正在处理步骤 1/3..."}
- response: {"id": request_id, "status": "success", "data": basePlan}
"""
request_id = data.get('id')
incoming_data = data.get('data', {})
try:
        # Check the cache
requestIdentifier = str((
"/generate_basePlan",
incoming_data.get("General Goal"),
incoming_data.get("Initial Input Object"),
))
if USE_CACHE and requestIdentifier in Request_Cache:
emit('response', {
'id': request_id,
'status': 'success',
'data': Request_Cache[requestIdentifier]
})
return
        # Stage 1: generate the plan outline
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'generating_outline',
'message': '📋 正在生成计划大纲...'
})
from AgentCoord.PlanEngine.planOutline_Generator import generate_PlanOutline
PlanOutline = generate_PlanOutline(
InitialObject_List=incoming_data.get("Initial Input Object"),
General_Goal=incoming_data.get("General Goal")
)
        # Stage 2: build the base plan, adding steps one by one
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'building_plan',
'total_steps': len(PlanOutline),
'message': f'🔨 正在构建计划,共 {len(PlanOutline)} 个步骤...'
})
basePlan = {
"General Goal": incoming_data.get("General Goal"),
"Initial Input Object": incoming_data.get("Initial Input Object"),
"Collaboration Process": []
}
for idx, stepItem in enumerate(PlanOutline, 1):
            # Add agent-selection and task-process fields
stepItem["AgentSelection"] = []
stepItem["TaskProcess"] = []
stepItem["Collaboration_Brief_frontEnd"] = {
"template": "",
"data": {}
}
basePlan["Collaboration Process"].append(stepItem)
            # Send a progress update
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'adding_step',
'step': idx,
'total': len(PlanOutline),
'step_name': stepItem.get("StepName", ""),
'message': f'✅ 已添加步骤 {idx}/{len(PlanOutline)}: {stepItem.get("StepName", "")}'
})
        # Stage 3: process the render spec
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'rendering',
'message': '🎨 正在处理渲染规范...'
})
basePlan_withRenderSpec = Add_Collaboration_Brief_FrontEnd(basePlan)
        # Cache the result
if USE_CACHE:
Request_Cache[requestIdentifier] = basePlan_withRenderSpec
        # Send the completion signal
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'complete',
'message': '✅ 计划生成完成'
})
        # Return the final result
emit('response', {
'id': request_id,
'status': 'success',
'data': basePlan_withRenderSpec
})
except ValueError as e:
emit('response', {
'id': request_id,
'status': 'error',
'error': str(e)
})
except Exception as e:
emit('response', {
'id': request_id,
'status': 'error',
'error': f"An unexpected error occurred: {str(e)}"
})
@socketio.on('fill_step_task')
def handle_fill_step_task_ws(data):
"""
WebSocket版本填充步骤任务支持流式/分步返回
流式事件
- progress: {"id": request_id, "status": "streaming", "stage": "starting", "message": "开始填充步骤任务..."}
- progress: {"id": request_id, "status": "streaming", "stage": "agent_selection", "message": "正在生成智能体选择..."}
- progress: {"id": request_id, "status": "streaming", "stage": "task_process", "message": "正在生成任务流程..."}
- progress: {"id": request_id, "status": "streaming", "stage": "complete", "message": "任务填充完成"}
- response: {"id": request_id, "status": "success", "data": filled_stepTask}
"""
request_id = data.get('id')
incoming_data = data.get('data', {})
print(f"📥 [WS] 收到 fill_step_task 请求: {request_id}")
try:
        # Check the cache
requestIdentifier = str((
"/fill_stepTask",
incoming_data.get("General Goal"),
incoming_data.get("stepTask"),
))
if USE_CACHE and requestIdentifier in Request_Cache:
print(f"✅ [WS] 使用缓存返回: {request_id}")
emit('response', {
'id': request_id,
'status': 'success',
'data': Request_Cache[requestIdentifier]
})
return
        # Start processing
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'starting',
'message': f'🚀 开始填充步骤任务: {incoming_data.get("stepTask", {}).get("StepName", "")}'
})
print(f"⏳ [WS] 开始处理 fill_step_task...")
        # Stage 1: generate the agent selection
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'agent_selection',
'message': '👥 正在生成智能体选择...'
})
from AgentCoord.PlanEngine.AgentSelection_Generator import generate_AgentSelection
stepTask = incoming_data.get("stepTask")
Current_Task = {
"TaskName": stepTask.get("StepName"),
"InputObject_List": stepTask.get("InputObject_List"),
"OutputObject": stepTask.get("OutputObject"),
"TaskContent": stepTask.get("TaskContent"),
}
AgentSelection = generate_AgentSelection(
General_Goal=incoming_data.get("General Goal"),
Current_Task=Current_Task,
Agent_Board=AgentBoard,
)
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'agent_selection_done',
'message': f'✅ 智能体选择完成: {", ".join(AgentSelection)}'
})
        # Stage 2: generate the task process
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'task_process',
'message': '📝 正在生成任务流程...'
})
import AgentCoord.util as util
from AgentCoord.PlanEngine.taskProcess_Generator import generate_TaskProcess
Current_Task_Description = {
"TaskName": stepTask.get("StepName"),
"AgentInvolved": [
{"Name": name, "Profile": AgentProfile_Dict[name]}
for name in AgentSelection
],
"InputObject_List": stepTask.get("InputObject_List"),
"OutputObject": stepTask.get("OutputObject"),
"CurrentTaskDescription": util.generate_template_sentence_for_CollaborationBrief(
stepTask.get("InputObject_List"),
stepTask.get("OutputObject"),
AgentSelection,
stepTask.get("TaskContent"),
),
}
TaskProcess = generate_TaskProcess(
General_Goal=incoming_data.get("General Goal"),
Current_Task_Description=Current_Task_Description,
)
        # Build the result
stepTask["AgentSelection"] = AgentSelection
stepTask["TaskProcess"] = TaskProcess
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'task_process_done',
'message': f'✅ 任务流程生成完成,共 {len(TaskProcess)} 个动作'
})
        # Stage 3: process the render spec
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'rendering',
'message': '🎨 正在处理渲染规范...'
})
filled_stepTask = Add_Collaboration_Brief_FrontEnd(stepTask)
        # Cache the result
if USE_CACHE:
Request_Cache[requestIdentifier] = filled_stepTask
        # Send the completion signal
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'complete',
'message': '✅ 任务填充完成'
})
        # Return the result
print(f"✅ [WS] fill_step_task 处理完成: {request_id}")
emit('response', {
'id': request_id,
'status': 'success',
'data': filled_stepTask
})
except Exception as e:
print(f"❌ [WS] fill_step_task 处理失败: {request_id}, 错误: {str(e)}")
emit('response', {
'id': request_id,
'status': 'error',
'error': str(e)
})
@socketio.on('fill_step_task_process')
def handle_fill_step_task_process_ws(data):
"""
WebSocket版本填充步骤任务流程支持流式/分步返回
流式事件
- progress: {"id": request_id, "status": "streaming", "stage": "starting", "message": "开始生成任务流程..."}
- progress: {"id": request_id, "status": "streaming", "stage": "generating", "message": "正在生成任务流程..."}
- progress: {"id": request_id, "status": "streaming", "stage": "complete", "message": "任务流程生成完成"}
- response: {"id": request_id, "status": "success", "data": filled_stepTask}
"""
request_id = data.get('id')
incoming_data = data.get('data', {})
try:
        # Check the cache
requestIdentifier = str((
"/fill_stepTask_TaskProcess",
incoming_data.get("General Goal"),
incoming_data.get("stepTask_lackTaskProcess"),
))
if USE_CACHE and requestIdentifier in Request_Cache:
emit('response', {
'id': request_id,
'status': 'success',
'data': Request_Cache[requestIdentifier]
})
return
        # Start processing
stepTask = incoming_data.get("stepTask_lackTaskProcess")
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'starting',
'message': f'🚀 开始生成任务流程: {stepTask.get("StepName", "")}'
})
        # Generate the task process
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'generating',
'message': '📝 正在生成任务流程...'
})
filled_stepTask = fill_stepTask_TaskProcess(
General_Goal=incoming_data.get("General Goal"),
stepTask=stepTask,
AgentProfile_Dict=AgentProfile_Dict,
)
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'generated',
'message': f'✅ 任务流程生成完成,共 {len(filled_stepTask.get("TaskProcess", []))} 个动作'
})
        # Process the render spec
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'rendering',
'message': '🎨 正在处理渲染规范...'
})
filled_stepTask = Add_Collaboration_Brief_FrontEnd(filled_stepTask)
        # Cache the result
if USE_CACHE:
Request_Cache[requestIdentifier] = filled_stepTask
        # Send the completion signal
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'complete',
'message': '✅ 任务流程生成完成'
})
        # Return the result
emit('response', {
'id': request_id,
'status': 'success',
'data': filled_stepTask
})
except Exception as e:
emit('response', {
'id': request_id,
'status': 'error',
'error': str(e)
})
@socketio.on('branch_plan_outline')
def handle_branch_plan_outline_ws(data):
"""
WebSocket版本分支任务大纲支持流式/分步返回
流式事件
- progress: {"id": request_id, "status": "streaming", "stage": "starting", "branch": 1, "total": 3, "message": "正在生成分支 1/3..."}
- progress: {"id": request_id, "status": "streaming", "stage": "complete", "message": "分支大纲生成完成"}
- response: {"id": request_id, "status": "success", "data": branchList}
"""
request_id = data.get('id')
incoming_data = data.get('data', {})
try:
        # Check the cache
requestIdentifier = str((
"/branch_PlanOutline",
incoming_data.get("branch_Number"),
incoming_data.get("Modification_Requirement"),
incoming_data.get("Existing_Steps"),
incoming_data.get("Baseline_Completion"),
incoming_data.get("Initial Input Object"),
incoming_data.get("General Goal"),
))
if USE_CACHE and requestIdentifier in Request_Cache:
emit('response', {
'id': request_id,
'status': 'success',
'data': Request_Cache[requestIdentifier]
})
return
        # Start processing
branch_Number = incoming_data.get("branch_Number")
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'starting',
'total_branches': branch_Number,
'message': f'🚀 开始生成分支大纲,共 {branch_Number} 个分支...'
})
        # Generate outline branches (one at a time)
from AgentCoord.util.converter import read_LLM_Completion
from AgentCoord.PlanEngine.branch_PlanOutline import JSON_PLAN_OUTLINE_BRANCHING
import json
prompt = f"""
## Instruction
Based on "Existing Steps", your task is to comeplete the "Remaining Steps" for the plan for "General Goal".
Note: "Modification Requirement" specifies how to modify the "Baseline Completion" for a better/alternative solution.
**IMPORTANT LANGUAGE REQUIREMENT: You must respond in Chinese (中文) for all content, including StepName, TaskContent, and OutputObject fields.**
## General Goal (Specify the general goal for the plan)
{incoming_data.get("General Goal")}
## Initial Key Object List (Specify the list of initial key objects available for use as the input object of a Step)
{incoming_data.get("Initial Input Object")}
## Existing Steps
{json.dumps(incoming_data.get("Existing_Steps"), indent=4)}
## Baseline Completion
{json.dumps(incoming_data.get("Baseline_Completion"), indent=4)}
## Modification Requirement
{incoming_data.get("Modification_Requirement")}
"""
branch_List = []
for i in range(branch_Number):
            # Send a progress update
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'generating_branch',
'branch': i + 1,
'total': branch_Number,
'message': f'🌿 正在生成分支大纲 {i+1}/{branch_Number}...'
})
messages = [
{
"role": "system",
"content": f" The JSON object must use the schema: {json.dumps(JSON_PLAN_OUTLINE_BRANCHING.model_json_schema(), indent=2)}",
},
{"role": "system", "content": prompt},
]
Remaining_Steps = read_LLM_Completion(messages, useGroq=False)["Remaining Steps"]
branch_List.append(Remaining_Steps)
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'branch_done',
'branch': i + 1,
'total': branch_Number,
'steps_count': len(Remaining_Steps),
'message': f'✅ 分支 {i+1}/{branch_Number} 生成完成,包含 {len(Remaining_Steps)} 个步骤'
})
        # Process the render spec
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'rendering',
'message': '🎨 正在处理渲染规范...'
})
branchList = Add_Collaboration_Brief_FrontEnd(branch_List)
        # Cache the result
if USE_CACHE:
Request_Cache[requestIdentifier] = branchList
        # Send the completion signal
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'complete',
'message': f'✅ 分支大纲生成完成,共 {branch_Number} 个分支'
})
        # Return the result
emit('response', {
'id': request_id,
'status': 'success',
'data': branchList
})
except Exception as e:
emit('response', {
'id': request_id,
'status': 'error',
'error': str(e)
})
@socketio.on('branch_task_process')
def handle_branch_task_process_ws(data):
"""
WebSocket版本分支任务流程支持流式/分步返回
流式事件
- progress: {"id": request_id, "status": "streaming", "stage": "starting", "branch": 1, "total": 3, "message": "正在生成分支任务流程 1/3..."}
- progress: {"id": request_id, "status": "streaming", "stage": "complete", "message": "分支任务流程生成完成"}
- response: {"id": request_id, "status": "success", "data": branchList}
"""
request_id = data.get('id')
incoming_data = data.get('data', {})
try:
        # Check the cache
requestIdentifier = str((
"/branch_TaskProcess",
incoming_data.get("branch_Number"),
incoming_data.get("Modification_Requirement"),
incoming_data.get("Existing_Steps"),
incoming_data.get("Baseline_Completion"),
incoming_data.get("stepTaskExisting"),
incoming_data.get("General Goal"),
))
if USE_CACHE and requestIdentifier in Request_Cache:
emit('response', {
'id': request_id,
'status': 'success',
'data': Request_Cache[requestIdentifier]
})
return
        # Start processing
branch_Number = incoming_data.get("branch_Number")
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'starting',
'total_branches': branch_Number,
'message': f'🚀 开始生成分支任务流程,共 {branch_Number} 个分支...'
})
        # Generate task process branches (one at a time)
from AgentCoord.util.converter import read_LLM_Completion
from AgentCoord.PlanEngine.branch_TaskProcess import (
JSON_TASK_PROCESS_BRANCHING,
ACT_SET,
PROMPT_TASK_PROCESS_BRANCHING
)
import AgentCoord.util as util
import json
stepTaskExisting = incoming_data.get("stepTaskExisting")
Current_Task_Description = {
"TaskName": stepTaskExisting.get("StepName"),
"AgentInvolved": [
{"Name": name, "Profile": AgentProfile_Dict[name]}
for name in stepTaskExisting.get("AgentSelection", [])
],
"InputObject_List": stepTaskExisting.get("InputObject_List"),
"OutputObject": stepTaskExisting.get("OutputObject"),
"CurrentTaskDescription": util.generate_template_sentence_for_CollaborationBrief(
stepTaskExisting.get("InputObject_List"),
stepTaskExisting.get("OutputObject"),
stepTaskExisting.get("AgentSelection"),
stepTaskExisting.get("TaskContent"),
),
}
prompt = PROMPT_TASK_PROCESS_BRANCHING.format(
Modification_Requirement=incoming_data.get("Modification_Requirement"),
Current_Task_Description=json.dumps(Current_Task_Description, indent=4),
Existing_Steps=json.dumps(incoming_data.get("Existing_Steps"), indent=4),
Baseline_Completion=json.dumps(incoming_data.get("Baseline_Completion"), indent=4),
General_Goal=incoming_data.get("General Goal"),
Act_Set=ACT_SET,
)
branch_List = []
for i in range(branch_Number):
            # Send a progress update
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'generating_branch',
'branch': i + 1,
'total': branch_Number,
'message': f'🌿 正在生成分支任务流程 {i+1}/{branch_Number}...'
})
messages = [
{
"role": "system",
"content": f" The JSON object must use the schema: {json.dumps(JSON_TASK_PROCESS_BRANCHING.model_json_schema(), indent=2)}",
},
{"role": "system", "content": prompt},
]
Remaining_Steps = read_LLM_Completion(messages, useGroq=False)["Remaining Steps"]
branch_List.append(Remaining_Steps)
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'branch_done',
'branch': i + 1,
'total': branch_Number,
'actions_count': len(Remaining_Steps),
'message': f'✅ 分支 {i+1}/{branch_Number} 生成完成,包含 {len(Remaining_Steps)} 个动作'
})
        # Cache the result
if USE_CACHE:
Request_Cache[requestIdentifier] = branch_List
        # Send the completion signal
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'complete',
'message': f'✅ 分支任务流程生成完成,共 {branch_Number} 个分支'
})
        # Return the result
emit('response', {
'id': request_id,
'status': 'success',
'data': branch_List
})
except Exception as e:
emit('response', {
'id': request_id,
'status': 'error',
'error': str(e)
})
@socketio.on('agent_select_modify_init')
def handle_agent_select_modify_init_ws(data):
"""
WebSocket版本智能体选择评分初始化支持流式/分步返回
流式事件
- progress: {"id": request_id, "status": "streaming", "stage": "starting", "message": "开始生成能力需求..."}
- progress: {"id": request_id, "status": "streaming", "stage": "requirements", "message": "能力需求: [xxx, yyy, zzz]"}
- progress: {"id": request_id, "status": "streaming", "stage": "scoring", "aspect": 1, "total": 3, "message": "正在评分能力 1/3..."}
- progress: {"id": request_id, "status": "streaming", "stage": "complete", "message": "智能体评分完成"}
- response: {"id": request_id, "status": "success", "data": scoreTable}
"""
request_id = data.get('id')
incoming_data = data.get('data', {})
try:
        # Check the cache
requestIdentifier = str((
"/agentSelectModify_init",
incoming_data.get("General Goal"),
incoming_data.get("stepTask"),
))
if USE_CACHE and requestIdentifier in Request_Cache:
emit('response', {
'id': request_id,
'status': 'success',
'data': Request_Cache[requestIdentifier]
})
return
        # Start processing
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'starting',
'message': '🚀 开始生成智能体能力需求...'
})
from AgentCoord.util.converter import read_LLM_Completion
from AgentCoord.PlanEngine.AgentSelectModify import (
JSON_ABILITY_REQUIREMENT_GENERATION,
PROMPT_ABILITY_REQUIREMENT_GENERATION,
agentAbilityScoring
)
import json
        # Stage 1: generate the ability requirement list
stepTask = incoming_data.get("stepTask")
Current_Task = {
"TaskName": stepTask.get("StepName"),
"InputObject_List": stepTask.get("InputObject_List"),
"OutputObject": stepTask.get("OutputObject"),
"TaskContent": stepTask.get("TaskContent"),
}
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'generating_requirements',
'message': '📋 正在生成能力需求列表...'
})
messages = [
{
"role": "system",
"content": f" The JSON object must use the schema: {json.dumps(JSON_ABILITY_REQUIREMENT_GENERATION.model_json_schema(), indent=2)}",
},
{
"role": "system",
"content": PROMPT_ABILITY_REQUIREMENT_GENERATION.format(
General_Goal=incoming_data.get("General Goal"),
Current_Task=json.dumps(Current_Task, indent=4),
),
},
]
Ability_Requirement_List = read_LLM_Completion(messages)["AbilityRequirement"]
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'requirements_generated',
'requirements': Ability_Requirement_List,
'message': f'✅ 能力需求生成完成: {", ".join(Ability_Requirement_List)}'
})
        # Stage 2: score the agents for each ability requirement
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'scoring',
'total_aspects': len(Ability_Requirement_List),
'message': f'📊 开始为 {len(Ability_Requirement_List)} 个能力需求评分...'
})
scoreTable = agentAbilityScoring(AgentBoard, Ability_Requirement_List)
        # Report scoring progress step by step
for idx, (ability, scores) in enumerate(scoreTable.items(), 1):
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'aspect_scored',
'aspect': idx,
'total': len(Ability_Requirement_List),
'ability': ability,
'message': f'✅ 能力 "{ability}" 评分完成 ({idx}/{len(Ability_Requirement_List)})'
})
        # Cache the result
if USE_CACHE:
Request_Cache[requestIdentifier] = scoreTable
        # Send the completion signal
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'complete',
'message': f'✅ 智能体评分完成,共 {len(Ability_Requirement_List)} 个能力维度'
})
        # Return the result
emit('response', {
'id': request_id,
'status': 'success',
'data': scoreTable
})
except Exception as e:
emit('response', {
'id': request_id,
'status': 'error',
'error': str(e)
})
@socketio.on('agent_select_modify_add_aspect')
def handle_agent_select_modify_add_aspect_ws(data):
"""
WebSocket版本添加新的评估维度支持流式/分步返回
流式事件
- progress: {"id": request_id, "status": "streaming", "stage": "starting", "aspect": "新能力", "message": "开始为新能力评分..."}
- progress: {"id": request_id, "status": "streaming", "stage": "scoring", "message": "正在评分..."}
- progress: {"id": request_id, "status": "streaming", "stage": "complete", "message": "评分完成"}
- response: {"id": request_id, "status": "success", "data": scoreTable}
"""
request_id = data.get('id')
incoming_data = data.get('data', {})
try:
        # Check the cache
aspectList = incoming_data.get("aspectList")
newAspect = aspectList[-1] if aspectList else None
requestIdentifier = str((
"/agentSelectModify_addAspect",
aspectList,
))
if USE_CACHE and requestIdentifier in Request_Cache:
emit('response', {
'id': request_id,
'status': 'success',
'data': Request_Cache[requestIdentifier]
})
return
        # Start processing
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'starting',
'message': f'🚀 开始为新能力维度评分: {newAspect or "Unknown"}'
})
        # Add the new aspect and score it
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'scoring',
'aspect': newAspect,
'message': f'📊 正在为能力 "{newAspect}" 评分...'
})
scoreTable = AgentSelectModify_addAspect(
aspectList=aspectList,
Agent_Board=AgentBoard
)
        # Send the completion signal
emit('progress', {
'id': request_id,
'status': 'streaming',
'stage': 'complete',
'message': f'✅ 能力 "{newAspect}" 评分完成'
})
        # Cache the result
if USE_CACHE:
Request_Cache[requestIdentifier] = scoreTable
        # Return the result
emit('response', {
'id': request_id,
'status': 'success',
'data': scoreTable
})
except Exception as e:
emit('response', {
'id': request_id,
'status': 'error',
'error': str(e)
})
@socketio.on('set_agents')
def handle_set_agents_ws(data):
"""
WebSocket版本设置智能体
"""
request_id = data.get('id')
incoming_data = data.get('data', {})
global AgentBoard, AgentProfile_Dict, yaml_data
try:
AgentBoard = incoming_data
AgentProfile_Dict = {}
for item in AgentBoard:
name = item["Name"]
if all(item.get(field) for field in ["apiUrl", "apiKey", "apiModel"]):
agent_config = {
"profile": item["Profile"],
"apiUrl": item["apiUrl"],
"apiKey": item["apiKey"],
"apiModel": item["apiModel"],
"useCustomAPI": True
}
else:
agent_config = {
"profile": item["Profile"],
"apiUrl": yaml_data.get("OPENAI_API_BASE"),
"apiKey": yaml_data.get("OPENAI_API_KEY"),
"apiModel": yaml_data.get("OPENAI_API_MODEL"),
"useCustomAPI": False
}
AgentProfile_Dict[name] = agent_config
        # Return the result
emit('response', {
'id': request_id,
'status': 'success',
'data': {"code": 200, "content": "set agentboard successfully"}
})
except Exception as e:
emit('response', {
'id': request_id,
'status': 'error',
'error': str(e)
})
@socketio.on('stop_generation')
def handle_stop_generation(data):
"""
WebSocket版本停止生成任务
请求格式
{
"id": "request-id",
"action": "stop_generation",
"data": {
"goal": "任务描述"
}
}
"""
request_id = data.get('id')
incoming_data = data.get('data', {})
try:
goal = incoming_data.get('goal', '')
        # TODO: actual stop logic could be added here,
        # e.g. set a global stop flag and notify all running generation tasks to stop
print(f"🛑 收到停止生成请求: goal={goal}")
        # Return a success response
emit('response', {
'id': request_id,
'status': 'success',
'data': {"message": "已发送停止信号"}
})
except Exception as e:
emit('response', {
'id': request_id,
'status': 'error',
'error': str(e)
})
@socketio.on('pause_execution')
def handle_pause_execution(data):
"""
WebSocket版本暂停任务执行
请求格式
{
"id": "request-id",
"action": "pause_execution",
"data": {
"goal": "任务描述"
}
}
"""
request_id = data.get('id')
incoming_data = data.get('data', {})
try:
from AgentCoord.RehearsalEngine_V2.execution_state import execution_state_manager
goal = incoming_data.get('goal', '')
        # Check that the currently executing task matches
current_goal = execution_state_manager.get_goal()
if current_goal and current_goal != goal:
print(f"⚠️ 任务目标不匹配: 当前={current_goal}, 请求={goal}")
emit('response', {
'id': request_id,
'status': 'error',
'error': '任务目标不匹配'
})
return
        # Ask the execution state manager to pause
success = execution_state_manager.pause_execution()
if success:
print(f"⏸️ [DEBUG] 暂停成功! 当前状态: {execution_state_manager.get_status().value}")
print(f"⏸️ [DEBUG] should_pause: {execution_state_manager._should_pause}")
emit('response', {
'id': request_id,
'status': 'success',
'data': {"message": "已暂停执行,可随时继续"}
})
else:
print(f"⚠️ [DEBUG] 暂停失败,当前状态: {execution_state_manager.get_status().value}")
emit('response', {
'id': request_id,
'status': 'error',
'error': f'无法暂停,当前状态: {execution_state_manager.get_status().value}'
})
except Exception as e:
print(f"❌ 暂停执行失败: {str(e)}")
emit('response', {
'id': request_id,
'status': 'error',
'error': str(e)
})
@socketio.on('resume_execution')
def handle_resume_execution(data):
"""
WebSocket版本恢复任务执行
请求格式
{
"id": "request-id",
"action": "resume_execution",
"data": {
"goal": "任务描述"
}
}
"""
request_id = data.get('id')
incoming_data = data.get('data', {})
try:
from AgentCoord.RehearsalEngine_V2.execution_state import execution_state_manager
goal = incoming_data.get('goal', '')
        # Check that the currently executing task matches
current_goal = execution_state_manager.get_goal()
if current_goal and current_goal != goal:
print(f"⚠️ 任务目标不匹配: 当前={current_goal}, 请求={goal}")
emit('response', {
'id': request_id,
'status': 'error',
'error': '任务目标不匹配'
})
return
        # Ask the execution state manager to resume
success = execution_state_manager.resume_execution()
if success:
print(f"▶️ 已恢复执行: goal={goal}")
emit('response', {
'id': request_id,
'status': 'success',
'data': {"message": "已恢复执行"}
})
else:
print(f"⚠️ 恢复失败,当前状态: {execution_state_manager.get_status()}")
emit('response', {
'id': request_id,
'status': 'error',
'error': f'无法恢复,当前状态: {execution_state_manager.get_status().value}'
})
except Exception as e:
print(f"❌ 恢复执行失败: {str(e)}")
emit('response', {
'id': request_id,
'status': 'error',
'error': str(e)
})
@socketio.on('stop_execution')
def handle_stop_execution(data):
"""
WebSocket版本停止任务执行
请求格式
{
"id": "request-id",
"action": "stop_execution",
"data": {
"goal": "任务描述"
}
}
"""
request_id = data.get('id')
incoming_data = data.get('data', {})
try:
from AgentCoord.RehearsalEngine_V2.execution_state import execution_state_manager
goal = incoming_data.get('goal', '')
        # Check that the currently executing task matches
current_goal = execution_state_manager.get_goal()
if current_goal and current_goal != goal:
print(f"⚠️ 任务目标不匹配: 当前={current_goal}, 请求={goal}")
emit('response', {
'id': request_id,
'status': 'error',
'error': '任务目标不匹配'
})
return
        # Ask the execution state manager to stop
success = execution_state_manager.stop_execution()
if success:
print(f"🛑 [DEBUG] 停止成功! 当前状态: {execution_state_manager.get_status().value}")
print(f"🛑 [DEBUG] should_stop: {execution_state_manager._should_stop}")
emit('response', {
'id': request_id,
'status': 'success',
'data': {"message": "已停止执行"}
})
else:
print(f"⚠️ [DEBUG] 停止失败,当前状态: {execution_state_manager.get_status().value}")
emit('response', {
'id': request_id,
'status': 'error',
'error': f'无法停止,当前状态: {execution_state_manager.get_status().value}'
})
except Exception as e:
print(f"❌ 停止执行失败: {str(e)}")
emit('response', {
'id': request_id,
'status': 'error',
'error': str(e)
})
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="start the backend for AgentCoord"
)
parser.add_argument(
"--port",
type=int,
default=8000,
help="set the port number, 8000 by default.",
)
args = parser.parse_args()
init()
    # Use socketio.run instead of app.run so that WebSocket is supported
socketio.run(app, host="0.0.0.0", port=args.port, debug=True, allow_unsafe_werkzeug=True)
#socketio.run(app, host="0.0.0.0", port=args.port, debug=False, allow_unsafe_werkzeug=True)