OneFlow: 启动 Session

您所在的位置:网站首页 启动session函数 OneFlow: 启动 Session

OneFlow: 启动 Session

2024-01-22 04:26| 来源: 网络整理| 查看: 265

前言

前面在初始化 Session 的时候,通过 CurJobAddOp 将 Op 加入到计算图当中,实际上只是将 Op 加入到 Job 里面,而 Job 只是一个 Protobuf Message 罢了。如果用户定义了多个 Job,那么这些 Job 就会构成一个 JobSet。用户将算子添加完之后,就会调用 Complete 对计算图 (其实就是 Job) 进行优化改写。接下来就是启动 Session,启动 Session 的时候进行了什么重要的事情呢?这篇文章就来分析一下。

结论:启动 Session 的时候将逻辑上的 Job 编译为物理上的 Plan,启动 Runtime 去执行 Plan。 流程回顾

在 Session 初始化的时候,我们可以看到先调用了 InitLazyGlobalSession,然后调用 compiler.Compile 将 Op 逐个加入计算图,接下来就是启动 Session,调用 StartLazyGlobalSession。StartLazyGlobalSession 背后做了什么操作呢?

# python/oneflow/compatible/single_client/framework/session_util.py: 183 def Init(self): assert self.status_ is SessionStatus.OPEN self.status_ = SessionStatus.RUNNING if not oneflow._oneflow_internal.IsEnvInited(): flow.env.init() _TryCompleteConfigProto(self.config_proto) self.resource_ = self.config_proto.resource if not oneflow._oneflow_internal.EagerExecutionEnabled(): c_api_util.InitLazyGlobalSession(self.config_proto) for (job_name, func_desc) in self.job_name2function_desc_.items(): compiler.Compile(self, func_desc, self.config_proto) self.existed_module_names_ = set() self.job_name2var_name2var_blob_ = dict() assert len(self.job_name2function_desc_.items()) > 0 oneflow._oneflow_internal.StartLazyGlobalSession() self.inter_user_job_info_ = c_api_util.GetInterUserJobInfo() self.UpdateInfo4InterfaceOp() if not config_util.api_legacy_model_io_enabled(): check_point_v2.Init() else: self.eager_config_proto_ctx_ = oneflow._oneflow_internal.LogicalConfigProtoContext( str(self.config_proto) ) return self StartLazyGlobalSession

回想一下,我们进入这个方法之前的状态,我们有一个 JobBuildAndInferCtxMgr,里面存有 JobSet。用户定义一个 Job,就 JobSet 就多一个 Job。这个 Job 的状态是什么样子的呢?这个 Job 是用户定义的 Job 函数转化过来的,并且经过了 CurJobBuildAndInferCtx_Complete 优化改写了。

StartLazyGlobalSession 启动 Session,背后做了什么呢?带着问题单步调试跟踪进去看一看。

在 StartLazyGlobalSession 中获取 JobSet,从 JobBuildAndInferCtxMgr 直接拿到。这个细节很重要,JobSet 是承接上一个部分的线索。其实第二篇文章分析 Python 端构图的时候,没有深入 CurJobAddOp 去,因为里面涉及到了 SBP 的推导等。JobSet 是一个 Protobuf message,它的成员是可重复的 Job。JobSet 有 LazyJobBuildAndInferCtxMgr 进行管理,在打开一个 JobBuildAndInferCtx 的时候,会在 JobSet 中新增一个 Job,然后将 Job 传给 JobBuildAndInferCtx。 StartLazyGlobalSession 中最重要的操作是创建一个全局的 Oneflow 对象,然后使用 JobSet 去初始化这个 Oneflow 对象。JobSet 会这个过程中编译成 Plan,然后启动 Runtime。 // oneflow/api/python/session/session.h: 88 inline Maybe StartLazyGlobalSession() { CHECK_NOTNULL_OR_RETURN(Global::Get()) job_set(); if (Global::Get()->enable_debug_mode()) { TeePersistentLogStream::Create("job_set.prototxt")->Write(job_set); } if (job_set.job().empty()) { return Error::JobSetEmptyError() PushKV("session_job_set", job_set); Global::New(job_set.inter_job_reuse_mem_strategy()); Global::New(); JUST(Global::Get()->Init(job_set)); return Maybe::Ok(); } // oneflow/core/job/job_build_and_infer_ctx_mgr.h: 38 class JobBuildAndInferCtxMgr { public: // ... const JobSet& job_set() const { return job_set_; } // ... } 在 Oneflow 全局对象在初始化的过程中,调用 CompileJobsAndPushMergedPlan 将 Job 编译为 MergedPlan。如果不是 Master 节点,那么不会进行编译,会调用 PullPlan 从 Master 拉取 Plan。最后使用 Plan 初始化 Runtime。 // oneflow/core/job/oneflow.cpp: 1005 Maybe Oneflow::Init(const oneflow::JobSet& job_set) { OF_PROFILER_RANGE_GUARD("Oneflow::Init"); // Runtime OF_PROFILER_RANGE_PUSH("CompileJobsAndPushMergedPlan"); JUST(CompileJobsAndPushMergedPlan(job_set.job())); OF_PROFILER_RANGE_POP(); // CompileJobsAndPushMergedPlan double start = GetCurTime(); PullPlan("merged_plan", &plan_); LOG(INFO) job_conf(), i); if (job_desc.Bool("__is_user_function__")) { function_jobs.push_back(jobs.at(i)); } } HashMap push_op_name2parallel_blob_conf; FilterOpName2ParallelBlobConf({OperatorConf::kInputConf}, function_jobs, &push_op_name2parallel_blob_conf); HashMap pull_op_name2parallel_blob_conf; FilterOpName2ParallelBlobConf({OperatorConf::kReturnConf}, function_jobs, &pull_op_name2parallel_blob_conf); for (const auto& pair : push_op_name2parallel_blob_conf) { auto push_job = std::make_shared(); MakePushJob(std::string("System-Push-") + pair.first, pair.first, pair.second, push_job.get()); jobs.emplace_back(push_job); } for (const auto& pair : pull_op_name2parallel_blob_conf) { auto pull_job = std::make_shared(); MakePullJob(std::string("System-Pull-") + pair.first, pair.first, pair.second, pull_job.get()); jobs.emplace_back(pull_job); } std::vector sub_plans(jobs.size()); FOR_RANGE(int64_t, i, 0, jobs.size()) { AddJobName2JobId(jobs.at(i)->job_conf().job_name(), i); auto scope = std::make_unique(jobs.at(i)->job_conf(), i); JUST(CompileCurJobOnMaster(jobs.at(i).get(), &sub_plans.at(i), true)); } MergeSubPlan(&plan, std::move(sub_plans)); InterJobMemSharingUtil::MergeMemReusedChunkBetweenUserJobs(function_jobs, &plan); InterJobMemSharingUtil::MergeMemSharedInterfaceMemBlockBetweenJobs(jobs, &plan); PlanUtil::SetForceInplaceMemBlock(&plan); FinishGlobalCriticalSectionDesc(plan, jobs.size()); Plan main_plan; std::vector identity_tick_op_names; { Job main_job; std::vector lock_back_edges; JUST(MakeMainJob(&main_job, &identity_tick_op_names, &lock_back_edges)); AddJobName2JobId(main_job.job_conf().job_name(), jobs.size()); JUST(CompileMainJob(&main_job, lock_back_edges, jobs.size(), &main_plan)); } LinkMainPlan(&plan, std::move(main_plan), identity_tick_op_names); PlanUtil::CleanUselessMemBlockAndCheckValid(&plan); PlanUtil::DumpCtrlRegstInfoToPlan(&plan); if (Global::Get()->enable_debug_mode()) { TeePersistentLogStream::Create("merged_plan")->Write(plan); PlanUtil::ToDotFile(plan, "/dot/merged_plan.dot"); } return Maybe::Ok(); } 启动

编译完成之后,就可以启动 Runtime 了。

启动 Runtime 主要做几件事情:

所有需要 Plan 的全局对象,调用 AddPlan 将 Plan 传给他们 分解 Plan 的 Task,每个 task 一个 actor,根据 task 上的 job_id 信息,创建 actor 的大小 构建 RuntimeCtx,调用 HandoutTasks 分发 task,并且发送 ActorCmd::kConstructActor 启动 Actor。 向所有 source_tasks 发送 ActorCmd::kStart 启动 actor。 // oneflow/core/job/runtime.cpp: 60 Runtime::Runtime(const Plan& plan, const HashMap& variable_op_name2eager_blob) { { // NOTE(chengcheng): All runtime Global objects AddPlan Global::Get()->AddPlan(plan, variable_op_name2eager_blob); Global::Get()->AddPlan(plan); Global::Get()->AddPlan(plan); collective_boxing_executor_plan_token_ = Global::Get()->AddPlan(plan); } std::vector source_tasks; std::vector other_tasks; int64_t this_machine_task_num = 0; for (const TaskProto& task : plan.task()) { if (task.machine_id() != GlobalProcessCtx::Rank()) { continue; } if (!HasNonCtrlConsumedRegstDescId(task)) { source_tasks.push_back(&task); } else { other_tasks.push_back(&task); } auto it = job_id2actor_size_.find(task.job_id()); if (it == job_id2actor_size_.end()) { auto emplace_ret_pair = job_id2actor_size_.emplace(task.job_id(), 0); CHECK(emplace_ret_pair.second); it = emplace_ret_pair.first; } it->second++; this_machine_task_num++; } RuntimeCtx* runtime_ctx = Global::Get(); runtime_ctx->NewCounter("constructing_actor_cnt", this_machine_task_num); HandoutTasks(source_tasks); HandoutTasks(other_tasks); runtime_ctx->WaitUntilCntEqualZero("constructing_actor_cnt"); LOG(INFO) GetThrd(task->thrd_id())->AddTask(*task); } SendCmdMsg(tasks, ActorCmd::kConstructActor); } 总结

总结一下 StartLazyGlobalSession,在进入这个方法之前,已经有 JobSet 了,这个 JobSet 是经过 CurJobBuildAndInferCtx_Complete 优化改写了。接下来进入 StartLazyGlobalSession,它会添加更多的 Job 用于模型 IO,用于推送输入、拉取输出,编译连接成 MergedPlan。有了 MergedPlan 之后,就可以带着这个 Plan 启动运行时,启动 Actor。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3