This document gives a detailed explanation of OpenManus's training logic and core functions, focusing on how agent trajectories are used for loss computation and training.
OpenManus uses the Proximal Policy Optimization (PPO) algorithm and is built from the following core components:
```
RayPPOTrainer (main trainer)
├── OpenManusAgent (environment interaction)
├── ActorRolloutRefWorker (policy network)
└── CriticWorker (value network)
```
The core of training is implemented in the RayPPOTrainer.fit() method:
```python
# Simplified training loop
for epoch in range(epochs):
    for batch in train_dataloader:
        # 1. Collect trajectories
        trajectories = generation_manager.run_llm_loop(batch)
        # 2. Compute the composite reward
        compute_total_reward(trajectories)
        # 3. Compute advantages
        compute_advantage(batch)
        # 4. Update the critic network
        critic_wg.update_critic(batch)
        # 5. Update the actor network
        actor_rollout_wg.update_actor(batch)
```
Trajectory collection is implemented in OpenManusAgent.run_llm_loop and _run_single_rollout:

```python
# Key logic inside _run_single_rollout
done = False
trajectory = []
while not done:
    # Get the current observation
    observation = client.observe()
    # Generate the LLM response
    response = actor_model.generate(observation)
    # Parse the action from the response
    action = parse_action(response)
    # Take an environment step
    next_obs, reward, done, info = client.step(action)
    # Record the trajectory step
    trajectory.append({"from": "human", "value": next_obs, "reward": reward, "info": info})
```
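The parse_action step above depends on the response format the agent expects. As a minimal sketch, with the tag convention and regex entirely assumed rather than taken from OpenManus, parsing might extract an action from a tagged span of the response:

```python
import re
from typing import Optional

def parse_action(response: str) -> Optional[str]:
    """Extract an action wrapped in <action>...</action> tags.

    Hypothetical format; the real parser depends on the environment's
    action schema. Returns None when no well-formed action is found.
    """
    match = re.search(r"<action>(.*?)</action>", response, re.DOTALL)
    return match.group(1).strip() if match else None
```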
Multiple reward signals are combined through the RewardComposer:

```python
# Called from _convert_rollout_results_to_dataproto
total_score, breakdown = reward_composer.compute_total_reward(
    trajectory=trajectory,
    reward_model_info=reward_model_info,
    env_name=env_name
)
```

The main reward components include (a sketch of how a composer might combine them follows the list):
- GoalReward: reward for completing the main task
- LengthPenalty: penalty on response length
- FormatReward: reward for correctly formatted output
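A minimal sketch of such a composer, matching the compute_total_reward call signature shown above; the constructor, the weighting scheme, and the component callables are illustrative assumptions, not OpenManus's actual implementation:

```python
class RewardComposer:
    """Combines weighted reward components into a single scalar.

    Illustrative sketch: each component is assumed to be a callable
    returning a float for a given trajectory. Real components may also
    consult reward_model_info or env_name.
    """

    def __init__(self, components):
        # components: dict mapping name -> (weight, reward_fn)
        self.components = components

    def compute_total_reward(self, trajectory, reward_model_info=None, env_name=None):
        breakdown = {
            name: weight * fn(trajectory)
            for name, (weight, fn) in self.components.items()
        }
        return sum(breakdown.values()), breakdown
```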
In the _convert_rollout_results_to_dataproto method, the reward is then allocated across individual tokens:
```python
# Reward allocation strategies:
if reward_allocation == "last_token":
    # Assign the whole reward to the last token
    token_level_rewards[0, last_segment_end] = reward_to_distribute
elif reward_allocation == "uniform_positive":
    # Spread a positive reward uniformly; a negative reward still
    # goes to the last token only
    if reward_to_distribute > 0:
        reward_per_token = reward_to_distribute / total_agent_tokens
        for start, end in agent_indices_in_padded:
            token_level_rewards[0, start:end + 1] = reward_per_token
elif reward_allocation == "discounted":
    # Discounted reward, distributed backwards from the last segment
    gamma = config.algorithm_config.get('gamma', 1.0)
    current_reward = reward_to_distribute
    for start, end in reversed(agent_indices_in_padded):
        segment_len = end - start + 1  # tokens in this segment
        # Spread this segment's share evenly within the segment
        token_level_rewards[0, start:end + 1] = current_reward / segment_len
        current_reward *= (gamma ** segment_len)
```
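For intuition, here is a self-contained toy run of the discounted strategy; the gamma, segment boundaries, and reward value are made up for illustration:

```python
import torch

# Two agent segments in a length-10 sequence: tokens 2-4 and 7-9
agent_indices = [(2, 4), (7, 9)]
token_level_rewards = torch.zeros(1, 10)
gamma, current_reward = 0.9, 1.0

for start, end in reversed(agent_indices):
    segment_len = end - start + 1
    token_level_rewards[0, start:end + 1] = current_reward / segment_len
    current_reward *= gamma ** segment_len

# The last segment (tokens 7-9) gets 1.0/3 per token; the earlier
# segment gets (0.9**3)/3 ≈ 0.243 per token.
print(token_level_rewards)
```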
In the compute_advantage function, advantages are computed with Generalized Advantage Estimation (GAE):

```python
if adv_estimator == 'gae':
    advantages, returns = core_algos.compute_gae_advantage_return(
        token_level_rewards=token_level_rewards,
        values=values,
        eos_mask=response_mask,
        gamma=gamma,
        lam=lam
    )
```
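For reference, this is a minimal sketch of the standard GAE recursion that a function like compute_gae_advantage_return performs; it ignores the eos_mask handling and any advantage whitening the real implementation may do:

```python
import torch

def gae_advantage_return(rewards, values, gamma, lam):
    """Backward GAE recursion over token positions.

    delta_t = r_t + gamma * V_{t+1} - V_t
    A_t     = delta_t + gamma * lam * A_{t+1}
    R_t     = A_t + V_t
    Shapes: (batch, seq_len). Assumes V_{T} = 0 past the last token.
    """
    advantages = torch.zeros_like(rewards)
    last_adv, next_value = 0.0, 0.0
    for t in reversed(range(rewards.shape[1])):
        delta = rewards[:, t] + gamma * next_value - values[:, t]
        last_adv = delta + gamma * lam * last_adv
        advantages[:, t] = last_adv
        next_value = values[:, t]
    return advantages, advantages + values
```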
In update_actor, the policy is updated with the PPO objective:

```python
def update_policy(self, data):
    old_log_probs = data.batch['old_log_probs']
    advantages = data.batch['advantages']
    # Log-probabilities under the current policy
    current_log_probs = self.compute_log_prob(data)
    # Policy ratio
    ratio = torch.exp(current_log_probs - old_log_probs)
    # Clipped ratio
    ratio_clipped = torch.clamp(ratio, 1 - clip_eps, 1 + clip_eps)
    # PPO objective
    policy_loss = -torch.min(
        advantages * ratio,
        advantages * ratio_clipped
    ).mean()
    self.optimizer.zero_grad()
    policy_loss.backward()
    self.optimizer.step()
```
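The min over the unclipped and clipped terms gives a pessimistic bound: for a positive advantage, the ratio's contribution is capped at 1 + clip_eps, and for a negative advantage it cannot be shrunk below 1 - clip_eps. A toy check with made-up numbers:

```python
import torch

clip_eps = 0.2
advantages = torch.tensor([2.0, -1.0])
ratio = torch.tensor([1.5, 0.5])  # policy has moved far from the old policy

ratio_clipped = torch.clamp(ratio, 1 - clip_eps, 1 + clip_eps)
objective = torch.min(advantages * ratio, advantages * ratio_clipped)
# tensor([ 2.4000, -0.8000]): the positive-advantage sample is capped
# at 2.0 * 1.2, the negative one at -1.0 * 0.8.
print(objective)
```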
In update_critic, the critic network is updated by minimizing the value loss:

```python
def update_critic(self, data):
    values = self.compute_values(data)
    returns = data.batch['returns']
    # Value loss
    value_loss = F.mse_loss(values, returns)
    self.optimizer.zero_grad()
    value_loss.backward()
    self.optimizer.step()
```

OpenManus uses Ray and FSDP (Fully Sharded Data Parallel) for distributed training:
- ActorRolloutRefWorker: handles forward inference and training of the policy network
- CriticWorker: handles training of the value network
- RayPPOTrainer: coordinates communication and synchronization among workers
FSDP shards model parameters across nodes via ShardingStrategy.FULL_SHARD, making it possible to train larger models.
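A minimal sketch of the wrapping step, using PyTorch's public FSDP API; the process-group setup is simplified, build_model is a hypothetical stand-in, and the actual workers configure FSDP with additional policies:

```python
import torch.distributed as dist
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP, ShardingStrategy

# Assumes the launcher (e.g. torchrun) has set rank/world-size env vars
dist.init_process_group(backend="nccl")
model = build_model()  # hypothetical: returns the actor or critic nn.Module

# FULL_SHARD: parameters, gradients, and optimizer state are all sharded
fsdp_model = FSDP(model, sharding_strategy=ShardingStrategy.FULL_SHARD)
```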
OpenManus's training pipeline integrates several key techniques:
- A PPO-based reinforcement learning framework
- Trajectory-based environment interaction and reward collection
- Composable reward computation with flexible reward-allocation strategies
- A distributed training architecture that supports large models
At its core, the pipeline collects meaningful trajectories from environment interaction and uses well-chosen reward functions and advantage estimation to improve the LLM's decision-making.