基于DAG的任务依赖调度

您所在的位置:网站首页 任务讲解 基于DAG的任务依赖调度

基于DAG的任务依赖调度

2023-08-09 01:04| 来源: 网络整理| 查看: 265

DAG叫做有向无环图(Directed Acyclic Graph, DAG),读过数据结构的应该都有点印象。

DAG可以用来实现依赖管理,比如:执行A任务之前,必须先完成B任务,因为A任务的输入是B任务的输出。

当整个依赖关系继续变多的时候,就催生出了依赖管理的需求,而A对B的依赖可以描述为图中的一条边。

项目地址

https://github.com/owenliang/task_schedule

DAG

掌握DAG是实现依赖调度的先决条件,DAG有3部分内容需要掌握:

DAG数据结构 DAG拓扑排序(基于入度) DAG逆拓扑序列(基于出度)

推荐读一下这篇博客温习DAG数据结构,拓扑排序的含义与算法:【图论】有向无环图的拓扑排序。

任务依赖

A任务依赖B与C任务,那么就需要建立一条A->B的边、一条A->C的边。

我们需要先执行那些依赖已经执行OK或者压根没有依赖的任务,在这里也就是先执行B和C,最后执行A。

所以,我们需要基于”出度=0″这个条件作为执行条件,任何满足”出度=0″的任务均可以立即执行;在上述例子中,B和C没有依赖其他任务,它们从一开始出度就为0,所以可以立即执行。

任务调度

每次从DAG中找出”出度=0″且”尚未执行”的任务列表,逐个或者并发的执行它们;每当1个任务完成时,就从DAG图中删除这个任务,并发现更多”出度=0″的任务,放入待执行的任务集合中去。

调度过程,就是不断执行依赖完备的任务,并不断发现更多依赖完备的新任务的过程。

构建DAG

C++ 12345678910     /*     *     *             J O B 1     *          /    \     \     *        V       V     V     *      JOB2     JOB3   JOB5     *         \     /     *         V   V     *         JOB4     */

这样一个依赖关系,JOB1依赖JOB2、JOB3、JOB5;JOB2依赖JOB4;JOB3依赖JOB4;

我们通过代码控制,建立这个DAG图:

C++ 123456789101112131415161718192021222324252627282930     // JOB5: 没有依赖    std::vector job5Deps;    graph.addTask("job5", job5Deps);     // JOB4: 没有依赖    std::vector job4Deps;    graph.addTask("job4", job4Deps);     // JOB3: 依赖JOB4    std::vector job3Deps;    job3Deps.push_back("job4");    graph.addTask("job3", job3Deps);     // JOB2: 依赖JOB4    std::vector job2Deps;    job2Deps.push_back("job4");    graph.addTask("job2", job2Deps);     // JOB1: 依赖JOB2和JOB3和JOB5    std::vector job1Deps;    job1Deps.push_back("job2");    job1Deps.push_back("job3");    job1Deps.push_back("job5");    graph.addTask("job1", job1Deps);     // 初始化调度    if (!graph.initGraph()) {        std::cerr C++ 12345678910111213141516171819202122232425262728293031   // 串行执行这些任务    std::vector todo;    do {        todo.clear();         // 获取当前待办任务        graph.getTodoTasks(&todo);        if (!todo.size()) { // 没有待办任务, 结束调度            break;        }         // 输出可以并行处理的待办任务        std::cout

这个过程会打印每一个任务当前依赖哪些任务,以及被哪些任务依赖,方便我调试代码。

TaskNode表示DAG中的一个节点,而TaskNode.outEdge表示这个节点指向了哪些节点,TaskNode.inEdge表示这个节点被哪些节点指向。

上述2个集合随着任务的完成被更新,从而可以随时获知最新的任务依赖关系。

任务调度日志

下面是完整执行日志,展现了整个调度过程中,每个任务依赖与被依赖的关系,以及任务之间的并行关系:

C++ 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 ---------------任务名:job1是否完成:NO(当前)依赖这些任务: job2  job3  job5 (当前)被这些任务依赖:任务名:job2是否完成:NO(当前)依赖这些任务: job4 (当前)被这些任务依赖: job1 任务名:job3是否完成:NO(当前)依赖这些任务: job4 (当前)被这些任务依赖: job1 任务名:job4是否完成:NO(当前)依赖这些任务:(当前)被这些任务依赖: job2  job3 任务名:job5是否完成:NO(当前)依赖这些任务:(当前)被这些任务依赖: job1 ---------------job4 被执行---------------任务名:job1是否完成:NO(当前)依赖这些任务: job2  job3  job5 (当前)被这些任务依赖:任务名:job2是否完成:NO(当前)依赖这些任务:(当前)被这些任务依赖: job1 任务名:job3是否完成:NO(当前)依赖这些任务:(当前)被这些任务依赖: job1 任务名:job4是否完成:YES(当前)依赖这些任务:(当前)被这些任务依赖: job2  job3 任务名:job5是否完成:NO(当前)依赖这些任务:(当前)被这些任务依赖: job1 --------------->>>>>>>>>job5 被执行---------------任务名:job1是否完成:NO(当前)依赖这些任务: job2  job3 (当前)被这些任务依赖:任务名:job2是否完成:NO(当前)依赖这些任务:(当前)被这些任务依赖: job1 任务名:job3是否完成:NO(当前)依赖这些任务:(当前)被这些任务依赖: job1 任务名:job4是否完成:YES(当前)依赖这些任务:(当前)被这些任务依赖: job2  job3 任务名:job5是否完成:YES(当前)依赖这些任务:(当前)被这些任务依赖: job1 ---------------job2 被执行---------------任务名:job1是否完成:NO(当前)依赖这些任务: job3 (当前)被这些任务依赖:任务名:job2是否完成:YES(当前)依赖这些任务:(当前)被这些任务依赖: job1 任务名:job3是否完成:NO(当前)依赖这些任务:(当前)被这些任务依赖: job1 任务名:job4是否完成:YES(当前)依赖这些任务:(当前)被这些任务依赖: job3 任务名:job5是否完成:YES(当前)依赖这些任务:(当前)被这些任务依赖: job1 --------------->>>>>>>>>job3 被执行---------------任务名:job1是否完成:NO(当前)依赖这些任务:(当前)被这些任务依赖:任务名:job2是否完成:YES(当前)依赖这些任务:(当前)被这些任务依赖: job1 任务名:job3是否完成:YES(当前)依赖这些任务:(当前)被这些任务依赖: job1 任务名:job4是否完成:YES(当前)依赖这些任务:(当前)被这些任务依赖:任务名:job5是否完成:YES(当前)依赖这些任务:(当前)被这些任务依赖: job1 ---------------job1 被执行---------------任务名:job1是否完成:YES(当前)依赖这些任务:(当前)被这些任务依赖:任务名:job2是否完成:YES(当前)依赖这些任务:(当前)被这些任务依赖:任务名:job3是否完成:YES(当前)依赖这些任务:(当前)被这些任务依赖:任务名:job4是否完成:YES(当前)依赖这些任务:(当前)被这些任务依赖:任务名:job5是否完成:YES(当前)依赖这些任务:(当前)被这些任务依赖:---------------

最后

基于该demo,可以演化出一个任务调度系统。

用户向系统提交一个DAG拓扑,大概长这样:

拓扑名:hadoop挖掘任务

任务列表:

JOB1 依赖:JOB2,JOB3 JOB2: 依赖:JOB3

系统根据上述配置,创建对应的DAG图,并启动线程/协程完成调度。

 

如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~



【本文地址】


今日新闻


推荐新闻


    CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3