""" 优化版执行计划 - 支持动态追加步骤 在执行过程中可以接收新的步骤并追加到执行队列 """ import asyncio import json import time from typing import List, Dict, Set, Generator, Any import AgentCoord.RehearsalEngine_V2.Action as Action import AgentCoord.util as util from termcolor import colored from AgentCoord.RehearsalEngine_V2.execution_state import execution_state_manager from AgentCoord.RehearsalEngine_V2.dynamic_execution_manager import dynamic_execution_manager # ==================== 配置参数 ==================== # 最大并发请求数 MAX_CONCURRENT_REQUESTS = 2 # 批次之间的延迟 BATCH_DELAY = 1.0 # 429错误重试次数和延迟 MAX_RETRIES = 3 RETRY_DELAY = 5.0 # ==================== 限流器 ==================== class RateLimiter: """ 异步限流器,控制并发请求数量 """ def __init__(self, max_concurrent: int = MAX_CONCURRENT_REQUESTS): self.semaphore = asyncio.Semaphore(max_concurrent) self.max_concurrent = max_concurrent async def __aenter__(self): await self.semaphore.acquire() return self async def __aexit__(self, *args): self.semaphore.release() # 全局限流器实例 rate_limiter = RateLimiter() def build_action_dependency_graph(TaskProcess: List[Dict]) -> Dict[int, List[int]]: """ 构建动作依赖图 Args: TaskProcess: 任务流程列表 Returns: 依赖映射字典 {action_index: [dependent_action_indices]} """ dependency_map = {i: [] for i in range(len(TaskProcess))} for i, action in enumerate(TaskProcess): important_inputs = action.get('ImportantInput', []) if not important_inputs: continue # 检查是否依赖其他动作的ActionResult for j, prev_action in enumerate(TaskProcess): if i == j: continue # 判断是否依赖前一个动作的结果 if any( inp.startswith('ActionResult:') and inp == f'ActionResult:{prev_action["ID"]}' for inp in important_inputs ): dependency_map[i].append(j) return dependency_map def get_parallel_batches(TaskProcess: List[Dict], dependency_map: Dict[int, List[int]]) -> List[List[int]]: """ 将动作分为多个批次,每批内部可以并行执行 Args: TaskProcess: 任务流程列表 dependency_map: 依赖图 Returns: 批次列表 [[batch1_indices], [batch2_indices], ...] """ batches = [] completed: Set[int] = set() while len(completed) < len(TaskProcess): # 找出所有依赖已满足的动作 ready_to_run = [ i for i in range(len(TaskProcess)) if i not in completed and all(dep in completed for dep in dependency_map[i]) ] if not ready_to_run: # 避免死循环 remaining = [i for i in range(len(TaskProcess)) if i not in completed] if remaining: ready_to_run = remaining[:1] else: break batches.append(ready_to_run) completed.update(ready_to_run) return batches async def execute_single_action_async( ActionInfo: Dict, General_Goal: str, TaskDescription: str, OutputName: str, KeyObjects: Dict, ActionHistory: List, agentName: str, AgentProfile_Dict: Dict, InputName_List: List[str] ) -> Dict: """ 异步执行单个动作 Args: ActionInfo: 动作信息 General_Goal: 总体目标 TaskDescription: 任务描述 OutputName: 输出对象名称 KeyObjects: 关键对象字典 ActionHistory: 动作历史 agentName: 智能体名称 AgentProfile_Dict: 智能体配置字典 InputName_List: 输入名称列表 Returns: 动作执行结果 """ actionType = ActionInfo["ActionType"] # 创建动作实例 if actionType in Action.customAction_Dict: currentAction = Action.customAction_Dict[actionType]( info=ActionInfo, OutputName=OutputName, KeyObjects=KeyObjects, ) else: currentAction = Action.BaseAction( info=ActionInfo, OutputName=OutputName, KeyObjects=KeyObjects, ) # 在线程池中运行,避免阻塞事件循环 loop = asyncio.get_event_loop() ActionInfo_with_Result = await loop.run_in_executor( None, lambda: currentAction.run( General_Goal=General_Goal, TaskDescription=TaskDescription, agentName=agentName, AgentProfile_Dict=AgentProfile_Dict, InputName_List=InputName_List, OutputName=OutputName, KeyObjects=KeyObjects, ActionHistory=ActionHistory, ) ) return ActionInfo_with_Result async def execute_step_async_streaming( stepDescrip: Dict, General_Goal: str, AgentProfile_Dict: Dict, KeyObjects: Dict, step_index: int, total_steps: int, execution_id: str = None, RehearsalLog: List = None # 用于追加日志到历史记录 ) -> Generator[Dict, None, None]: """ 异步执行单个步骤,支持流式返回 Args: stepDescrip: 步骤描述 General_Goal: 总体目标 AgentProfile_Dict: 智能体配置字典 KeyObjects: 关键对象字典 step_index: 步骤索引 total_steps: 总步骤数 execution_id: 执行ID Yields: 执行事件字典 """ # 准备步骤信息 StepName = ( util.camel_case_to_normal(stepDescrip["StepName"]) if util.is_camel_case(stepDescrip["StepName"]) else stepDescrip["StepName"] ) TaskContent = stepDescrip["TaskContent"] InputName_List = ( [ ( util.camel_case_to_normal(obj) if util.is_camel_case(obj) else obj ) for obj in stepDescrip["InputObject_List"] ] if stepDescrip["InputObject_List"] is not None else None ) OutputName = ( util.camel_case_to_normal(stepDescrip["OutputObject"]) if util.is_camel_case(stepDescrip["OutputObject"]) else stepDescrip["OutputObject"] ) Agent_List = stepDescrip["AgentSelection"] TaskProcess = stepDescrip["TaskProcess"] TaskDescription = ( util.converter.generate_template_sentence_for_CollaborationBrief( input_object_list=InputName_List, output_object=OutputName, agent_list=Agent_List, step_task=TaskContent, ) ) # 初始化日志节点 inputObject_Record = [ {InputName: KeyObjects[InputName]} for InputName in InputName_List ] stepLogNode = { "LogNodeType": "step", "NodeId": StepName, "InputName_List": InputName_List, "OutputName": OutputName, "chatLog": [], "inputObject_Record": inputObject_Record, } objectLogNode = { "LogNodeType": "object", "NodeId": OutputName, "content": None, } # 返回步骤开始事件 yield { "type": "step_start", "step_index": step_index, "total_steps": total_steps, "step_name": StepName, "task_description": TaskDescription, } # 构建动作依赖图 dependency_map = build_action_dependency_graph(TaskProcess) batches = get_parallel_batches(TaskProcess, dependency_map) ActionHistory = [] total_actions = len(TaskProcess) completed_actions = 0 # 步骤开始日志 util.print_colored( f"📋 步骤 {step_index + 1}/{total_steps}: {StepName} ({total_actions} 个动作, 分 {len(batches)} 批执行)", text_color="cyan" ) # 分批执行动作 for batch_index, batch_indices in enumerate(batches): # 在每个批次执行前检查暂停状态 should_continue = await execution_state_manager.async_check_pause(execution_id) if not should_continue: return batch_size = len(batch_indices) # 批次执行日志 if batch_size > 1: util.print_colored( f"🚦 批次 {batch_index + 1}/{len(batches)}: 并行执行 {batch_size} 个动作", text_color="blue" ) else: util.print_colored( f"🔄 批次 {batch_index + 1}/{len(batches)}: 串行执行", text_color="yellow" ) # 并行执行当前批次的所有动作 tasks = [ execute_single_action_async( TaskProcess[i], General_Goal=General_Goal, TaskDescription=TaskDescription, OutputName=OutputName, KeyObjects=KeyObjects, ActionHistory=ActionHistory, agentName=TaskProcess[i]["AgentName"], AgentProfile_Dict=AgentProfile_Dict, InputName_List=InputName_List ) for i in batch_indices ] # 等待当前批次完成 batch_results = await asyncio.gather(*tasks) # 逐个返回结果 for i, result in enumerate(batch_results): action_index_in_batch = batch_indices[i] completed_actions += 1 util.print_colored( f"✅ 动作 {completed_actions}/{total_actions} 完成: {result['ActionType']} by {result['AgentName']}", text_color="green" ) ActionHistory.append(result) # 立即返回该动作结果 yield { "type": "action_complete", "step_index": step_index, "step_name": StepName, "action_index": action_index_in_batch, "total_actions": total_actions, "completed_actions": completed_actions, "action_result": result, "batch_info": { "batch_index": batch_index, "batch_size": batch_size, "is_parallel": batch_size > 1 } } # 步骤完成 objectLogNode["content"] = KeyObjects[OutputName] stepLogNode["ActionHistory"] = ActionHistory # 收集该步骤使用的 agent(去重) assigned_agents_in_step = list(set(Agent_List)) if Agent_List else [] # 追加到 RehearsalLog(因为 RehearsalLog 是可变对象,会反映到原列表) if RehearsalLog is not None: RehearsalLog.append(stepLogNode) RehearsalLog.append(objectLogNode) yield { "type": "step_complete", "step_index": step_index, "step_name": StepName, "step_log_node": stepLogNode, "object_log_node": objectLogNode, "assigned_agents": {StepName: assigned_agents_in_step}, # 该步骤使用的 agent } def executePlan_streaming_dynamic( plan: Dict, num_StepToRun: int, RehearsalLog: List, AgentProfile_Dict: Dict, existingKeyObjects: Dict = None, execution_id: str = None ) -> Generator[str, None, None]: """ 动态执行计划,支持在执行过程中追加新步骤 Args: plan: 执行计划 num_StepToRun: 要运行的步骤数 RehearsalLog: 已执行的历史记录 AgentProfile_Dict: 智能体配置 existingKeyObjects: 已存在的KeyObjects execution_id: 执行ID(用于动态追加步骤) Yields: SSE格式的事件字符串 """ # 初始化执行状态 general_goal = plan.get("General Goal", "") # 确保有 execution_id if execution_id is None: import time execution_id = f"{general_goal}_{int(time.time() * 1000)}" execution_state_manager.start_execution(execution_id, general_goal) # 准备执行 KeyObjects = existingKeyObjects.copy() if existingKeyObjects else {} finishedStep_index = -1 for logNode in RehearsalLog: if logNode["LogNodeType"] == "step": finishedStep_index += 1 if logNode["LogNodeType"] == "object": KeyObjects[logNode["NodeId"]] = logNode["content"] # 确定要运行的步骤范围 if num_StepToRun is None: run_to = len(plan["Collaboration Process"]) else: run_to = (finishedStep_index + 1) + num_StepToRun steps_to_run = plan["Collaboration Process"][(finishedStep_index + 1): run_to] # 使用动态执行管理器 if execution_id: # 初始化执行管理器,使用传入的execution_id actual_execution_id = dynamic_execution_manager.start_execution(general_goal, steps_to_run, execution_id) total_steps = len(steps_to_run) # 使用队列实现流式推送 async def produce_events(queue: asyncio.Queue): """异步生产者""" try: step_index = 0 if execution_id: # 动态模式:循环获取下一个步骤 # 等待新步骤的最大次数(避免无限等待) max_empty_wait_cycles = 5 # 最多等待60次,每次等待1秒 empty_wait_count = 0 while True: # 检查暂停状态 should_continue = await execution_state_manager.async_check_pause(execution_id) if not should_continue: util.print_colored("🛑 用户请求停止执行", "red") await queue.put({ "type": "error", "message": "执行已被用户停止" }) break # 获取下一个步骤 stepDescrip = dynamic_execution_manager.get_next_step(execution_id) if stepDescrip is None: # 没有更多步骤了,检查是否应该继续等待 empty_wait_count += 1 # 获取执行信息 execution_info = dynamic_execution_manager.get_execution_info(execution_id) if execution_info: queue_total_steps = execution_info.get("total_steps", 0) completed_steps = execution_info.get("completed_steps", 0) # 如果没有步骤在队列中(queue_total_steps为0),立即退出 if queue_total_steps == 0: break # 如果所有步骤都已完成,等待可能的新步骤 if completed_steps >= queue_total_steps: if empty_wait_count >= max_empty_wait_cycles: # 等待超时,退出执行 break else: # 等待新步骤追加 await asyncio.sleep(1) continue else: # 还有步骤未完成,继续尝试获取 await asyncio.sleep(0.5) empty_wait_count = 0 # 重置等待计数 continue else: # 执行信息不存在,退出 break # 重置等待计数 empty_wait_count = 0 # 获取最新的总步骤数(用于显示) execution_info = dynamic_execution_manager.get_execution_info(execution_id) current_total_steps = execution_info.get("total_steps", total_steps) if execution_info else total_steps # 执行步骤 async for event in execute_step_async_streaming( stepDescrip, plan["General Goal"], AgentProfile_Dict, KeyObjects, step_index, current_total_steps, # 使用动态更新的总步骤数 execution_id, RehearsalLog # 传递 RehearsalLog 用于追加日志 ): if execution_state_manager.is_stopped(execution_id): await queue.put({ "type": "error", "message": "执行已被用户停止" }) return await queue.put(event) # 标记步骤完成 dynamic_execution_manager.mark_step_completed(execution_id) # 更新KeyObjects OutputName = stepDescrip.get("OutputObject", "") if OutputName and OutputName in KeyObjects: # 对象日志节点会在step_complete中发送 pass step_index += 1 else: # 非动态模式:按顺序执行所有步骤 for step_index, stepDescrip in enumerate(steps_to_run): should_continue = await execution_state_manager.async_check_pause(execution_id) if not should_continue: util.print_colored("🛑 用户请求停止执行", "red") await queue.put({ "type": "error", "message": "执行已被用户停止" }) return async for event in execute_step_async_streaming( stepDescrip, plan["General Goal"], AgentProfile_Dict, KeyObjects, step_index, total_steps, execution_id, RehearsalLog # 传递 RehearsalLog 用于追加日志 ): if execution_state_manager.is_stopped(execution_id): await queue.put({ "type": "error", "message": "执行已被用户停止" }) return await queue.put(event) except Exception as e: await queue.put({ "type": "error", "message": f"执行出错: {str(e)}" }) finally: await queue.put(None) # 运行异步任务并实时yield loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: queue = asyncio.Queue(maxsize=10) producer_task = loop.create_task(produce_events(queue)) while True: event = loop.run_until_complete(queue.get()) if event is None: break # 立即转换为SSE格式并发送 event_str = json.dumps(event, ensure_ascii=False) yield f"data: {event_str}\n\n" loop.run_until_complete(producer_task) if not execution_state_manager.is_stopped(execution_id): complete_event = json.dumps({ "type": "execution_complete", "total_steps": total_steps }, ensure_ascii=False) yield f"data: {complete_event}\n\n" finally: # 在关闭事件循环之前先清理执行记录 if execution_id: # 清理执行记录 dynamic_execution_manager.cleanup(execution_id) # 清理执行状态 execution_state_manager.cleanup(execution_id) if 'producer_task' in locals(): if not producer_task.done(): producer_task.cancel() # 确保所有任务都完成后再关闭事件循环 try: pending = asyncio.all_tasks(loop) for task in pending: task.cancel() loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) except Exception: pass # 忽略清理过程中的错误 loop.close() # 保留旧版本函数以保持兼容性 executePlan_streaming = executePlan_streaming_dynamic