基于蚁群算法解决多模式资源约束项目调度问题(附源代码)

您所在的位置:网站首页 蚁群算法能解决什么问题和困难 基于蚁群算法解决多模式资源约束项目调度问题(附源代码)

基于蚁群算法解决多模式资源约束项目调度问题(附源代码)

#基于蚁群算法解决多模式资源约束项目调度问题(附源代码)| 来源: 网络整理| 查看: 265

                    基于蚁群算法解决多模式资源约束项目调度问题

最近选修了张超勇教授的项目管理学,最后这门课留下了一门大作业,这里我选做是资源约束下的项目调度问题,通过使用元启发式的蚁群算法求解了这个问题,由于时间比较仓促这个算法还有很多可以优化的地方,后面有时间我在将其优化,下面是pose出了这个问题是什么以及我的蚁群算法的实现过程,并附录python源码。

如果不了解什么是蚁群算法可以看这一篇博文:https://blog.csdn.net/weixin_42715356/article/details/83591619

python源码github下载地址:https://github.com/Greatpanc/Scheduling-problems

一、问题综述

资源约束项目调度问题RCPSP(Resource-Constrained Project Scheduling Problem)一般以找出工期最短(或耗费最小)的调度方案为优化目标,项目由一系列相互关联的活动组成,调度决策需要同时满足项目活动之间的时序约束和资源约束,以使管理目标最优。

多模式资源约束项目调度问题MRCPSP(Multi-mode Resource Constrained Project Scheduling Problem)是传统RCPSP的扩展,其考虑项目中每个活动都有多种可供选择的执行模式,每种执行模式对应一定的工期和资源需求,在调度时能够动态地选择活动执行模式及其属性。相对于单模式调度问题,MRCPSP在资源约束条件下,不仅要选择项目中各活动的执行模式,还要确定活动的开工期,是一类带有复杂非线性特征NP-难问题,精确类算法很难求解。

本文首先对于多模式资源约束项目调度问题(RCPSP)提出假设,然后基于假设的前提将RCPSP转换成一个数学问题,并针对这个数学问题,用数学公式和AOA图刻画出来,后面在问题明了的情况下使用元启发式算法中比较经典的蚁群优化算法来实现MRCPSP的优化,最后用实验结果验证算法的正确性与有效性。

对于蚁群算法,本文首先采用基于活动执行模式的一维实数编码方案,并通过这套编码方案将活动网络图构建出来,并定义了一个基于信息素项和启发式项的的轮盘赌算法实现的状态转移策略,RCPSP的解就是蚂蚁在资源约束和时序约束下在该图上按状态转移策略爬走完毕的过程。

二、数学模型建立过程 2.1 模型假设

(1)每种活动均有多种执行模式可供选择,每种执行模式对应不同的工期和资源需求,执行时在该工期内占用资源,执行完毕后释放这些资源。

(2)每个活动只能选择其中一种执行模式执行,而且一旦执行就不能被中断和更改执行模式直至完成;

(3)活动具有时序约束和资源约束;

(4)每个活动所需资源量均小于资源的最大供应量;

(5)对事件和活动进行编号时,确保事件按时序约束非严格递增,即事件j的所有紧前事件编号必须小于j,后继事件编号必须大于j;

2.2 数学问题抽象

MRCPSP是研究在工程项目活动网络图的基础上满足资源和活动时序等约束来安排各活动的执行模式和开始时间,以使项目工期最短的调度方法,其实质是寻求各个活动执行模式和开工时间的最佳组合。

为了更好描述活动时序约束,本文采用AOA(activity-on-arrow)有向活动网络图表示整个项目活动及其时序约束关系,如图1所示。其中结点代表事件,箭线代表活动,方向代表时序约束。一个活动能够执行的前提是其箭头前的事件都被完成了。

为方便求解该问题,引入虚事件0作为所有没有紧前事件的事件的紧前事件,引入虚事件N+1作为没有后续事件的事件的后续事件,且规定虚事件0和虚事件N+1只有一种执行模式、不占用资源和执行时间为0,代表整个项目的初始状态和结束状态。Fm+1为结束事件(N+1)对应的唯一活动(M+1)的最早开始时间,即项目总工期,如图2所示。

因此,资源约束下的项目调度问题可以抽象成如下数学问题:在资源约束下,将项目描述转换为AOA有向活动网络图,并从虚事件0出发到虚事件N+1结束,将所有活动(边)遍历且只遍历一次所花的时间最短的数学问题。

2.3 模型建立

设项目包含N个事件M个活动,且令(i,j)代表从事件i到事件j的一项活动;P(j)代表事件j的紧前事件集合;S(j)代表事件j的后继事件集合;d{_{ij}}^{k}代表完成活动(i,j)所要花费的工期。

设项目所有活动共享V种可更新资源,其中第v种资源的供应量为R_{v},且令r_{ijv}^{k} 代表活动(i,j)用模式k执行时单位工期所需资源v的数量;

F_{i}代表从初始事件0开始执行到事件i时所耗费的总工期,其中F_{0}=0。

优化目标是在满足资源约束和时序约束前提下,通过确定合理的活动执行顺序、相应的执行模式以及活动的开始时间,使项目的工期最短,即F_{N+1}最小。对应数学表达式为:

                                                                                          f(x)=Min(F_{N+1})                  (1)

约束条件为:

上述各约束式的含义:

式(2)表示资源约束,SA_{t}表示在时期t时正在进行的活动集合;

式(3)表示时序约束,表达的是任意事件j的前趋活动(i,j)必须在事件j的后继活动(j,l) 开始之前完成;

式(4)表示活动(i,j)仅能用一种模式执行;

式(5)定义了活动的时间窗约束,即活动(i,j)必须在其时间窗[ET_{ij}LT_{ij} ]内完成(如果活动(i,j)在时间窗[ET_{ij}LT_{ij} ]中的一个时点t上被完成,则Y_{ij}^{t},否则为0)。

三、蚁群算法在MRCPSP中的设计 3.1 蚁群算法简介

蚁群系统(Ant System或Ant Colony System)是由意大利学者Dorigo、Maniezzo等人于20世纪90年代提出来的。他们在研究蚂蚁觅食的过程中,发现单个蚂蚁的行为比较简单,但是蚁群整体却可以体现一些智能的行为。例如蚁群可以在不同的环境下,寻找最短到达食物源的路径。这是因为蚁群内的蚂蚁可以通过某种信息机制实现信息的传递。后又经进一步研究发现,蚂蚁会在其经过的路径上释放一种可以称之为“信息素”的物质,蚁群内的蚂蚁对“信息素”具有感知能力,它们会沿着“信息素”浓度较高路径行走,而每只路过的蚂蚁都会在路上留下“信息素”,这就形成一种类似正反馈的机制,经过一段时间后,整个蚁群就会沿着最短路径到达食物源了。

蚂蚁找到最短路径要归功于信息素和环境,假设有两条路可从蚁窝通向食物,开始时两条路上的蚂蚁数量差不多:当蚂蚁到达终点之后会立即返回,距离短的路上的蚂蚁往返一次时间短,重复频率快,在单位时间里往返蚂蚁的数目就多,留下的信息素也多,会吸引更多蚂蚁过来,会留下更多信息素。而距离长的路正相反,因此越来越多的蚂蚁聚集到最短路径上来。

在多模式资源约束下的项目调度问题中,蚂蚁是走在一个由活动及其可选模式所构成的图上的,该情况下的项目调度问题的解是由活动的序列及其模式所出现的顺序所构成的一个路径图,其目标是希望所有活动都完成所耗费的时间最短,而蚂蚁的信息素是释放在活动及其对应选择的模式上的,因此在蚁群算法的设计中,最短项目工期所构成的路径中,随着信息素的更新,使得其各个路径点的信息素含量会积累得最高,蚂蚁选择走的概率最大。

3.2 蚁群算法在MRCPSP中的实现流程

 

(1)初始化过程

启发项矩阵:启发项矩阵记录的是每一个活动本身被选择的固有可能性,其值大小与所选贪心策略有关,本文的某活动贪心策略选取为该活动的所需资源的总量与总资源数的比值,式(6)为其计算公式:

其中r_{ij}^{k}代表活动i在k模式(k取0和1)下所需j资源(j取1、2、3)的数量。

信息素矩阵:信息素含量矩阵记录的是蚂蚁在各个路径上留下的信息素的值,本文各个活动信息素含量初始化所采用的策略为品质因子与活动数的比值,式(7)为其计算公式:

其中Q为品质因子,代表的是蚂蚁在走完一条路径后留下的信息素总量,activity_num为总活动数。

(2)状态转移策略

状态转移策略是蚂蚁在候选活动列表中选择下一个路径所采取的方式,本文主要采取轮盘赌策略:

蚂蚁通过当前活动的启发项和信息素强度信息综合考虑决定下一个活动选取的概率,式(8)为其计算公式:

其中,\alpha是信息素重要程度因子,\beta是启发函数重要程度因子。

然后根据所求的各个活动出现概率制作出一个轮盘赌机器,并随机生成一个0到1的小数,选取这个小数对应轮盘赌机器上活动,这样既保证了信息素强度大的活动优先被选取,还能够增大蚂蚁搜索的随机性。

(3)信息素更新

信息素更新主要有信息素挥发和蚂蚁再次留下新的信息素两部分组成,本文对于信息素更新分为两部分:

第一部分是对信息素进行局部更新,蚂蚁每选好要执行的活动及其模式后就对该活动进行更新,式(9)为其更新公式:

第二部分是精英蚂蚁策略,在所有蚂蚁走完后,选取所消耗工期最短的那只蚂蚁,极大的增强其走过的路径上的信息素含量,使得接下来的蚂蚁有更高的概率从该路径上走,其公式如式(10)所示:

(4)完成某项活动的最小开始时间计算方式

完成某项活动的最小开始时间必须满足时序约束和资源约束:时序约束是指该活动的最早开始时间必须要大于该活动的后序事件最早被完成的时间;资源约束是指当前资源池内的资源数满足该活动被执行所需要的资源数,如果不满足则需等待其他活动执行完释放资源后执行,即当前时间应该更新为其他活动释放资源的时刻。

(5)活动列表更新方式

活动等待列表更新方式包括两部分,第一部分为某活动被执行后从活动列表中删除,第二部分为某活动的后续事件被执行完后,应该对其其紧后事件的所有紧前事件进行判断,如果均被执行完,则将该活动的后续事件到其紧后事件对应的活动添加到活动等待列表当中。

四、算法测试与结果分析 4.1测试数据说明

本算法采用了一个10事件14活动的项目调度数据集来验证ACS算法在解决该类问题上的可行性和有效性。

该项目需要三种可更新资源R1、R2和R3,单位工期限量分别为8、16和20,除虚事件对应活动外,项目中每个活动均有两种可执行模式:通常模式(N)和通过增加资源缩短工期的压缩模式(C),不同执行模式下各活动的工期(T)及所需资源如表各1所示。

表格 1 不同模式下各活动的具体参数

活动

通常模式(N)

压缩模式(C)

活动

通常模式(N)

压缩模式(C)

工期

资源

R1,R2,R3

工期

资源

R1,R2,R3

工期

资源

R1,R2,R3

工期

资源

R1,R2,R3

1

5

(3,2,6)

3

(4,5,8)

8

8

(3,2,6)

6

(5,4,8)

2

3

(2,3,4)

2

(3,4,6)

9

10

(7,6,4)

7

(10,8,7)

3

6

(3,2,7)

4

(4,4,9)

10

6

(3,2,5)

5

(3,4,6)

4

5

(2,3,10)

4

(3,4,12)

11

5

(3,4,8)

3

(4,5,10)

5

8

(6,5,5)

6

(8,7,6)

12

4

(2,3,7)

3

(4,4,9)

6

5

(2,3,8)

3

(3,4,9)

13

9

(2,3,9)

5

(4,5,10)

7

9

(5,4,4)

6

(9,6,6)

14

12

(3,2,10)

8

(5,4,12)

表示该项目的时序约束的AOA图如图4所示。

该项目的最佳调度总工期为30天,其调度方案如表格2所示:

表格 2最佳项目调度方案

执行顺序

1-3-6-2-7-4-5-10-8-11-14-9-13-12

执行模式

C-C-C-C-N-N-C-N-N-N-C-C-C-N

时间

0-4-5-5-7-7-13-13-14-19-21-27-30

4.2算法参数项设置 ant_num = 10 # 蚂蚁个数 alpha = 0.1 # 信息素重要程度因子 beta = 1 # 启发函数重要程度因子 rho = 0.2 # 信息素的挥发速度 Q = 1 # 品质因子 itermax = 1000 # 最大迭代次数

 

4.3运行结果分析及改进建议

通过执行算法10次,统计结果见附录A,项目最短工期32天,平均工期为33.1天,其统计结果表格3所示。

表格 3ACS算法运行结果统计

最短时间

算法求得的次数

31

0

32

1

33

7

34

2

由此可见,该算法虽然没有得到最优解,但所求解比较稳定能够全局收敛且误差不太大,只有3.1天,此结果在工程应用中能够接受。

为了进一步减少误差,可以从蚂蚁探索过程中结合其他算法(如遗传算法)的方式增加蚂蚁向其他路径探索能力和改进信息素更新规则如根据前项活动所选模式来决定当前活动及模式的选择。

五、MRCPSP问题蚁群算法源码(Python) import numpy as np import copy """ datasets=[] #datasets[i,j,k],i取活动编号,j代表:0——通常模式,k代表:1——R1 # 1——压缩模式 2——R2 # 3——R3 # 4——工期 eventlist=[] #event[i]代表为第i个事件节点,EventNode对象数组 activitylist #activitylist[i,j],i取活动编号,j代表:0——该活动的前置事件 1——该活动的后置事件 event2activity #event2activity[i,j] i取前置事件id号,j取后置事件id号,其值为对应的活动号 """ """ 事件类:属性 —— its_id :事件的id号 tc:时间约束,说明完成该事件开始时间必须大于该值 predev[i]:紧前事件集合 nextdev[i][j]:j=0第i个紧后事件对应的id号,j=1是执行该事件所对应的活动 """ class EventNode: def __init__(self,its_id=0): self.its_id =its_id self.tc=0 #时间约束 self.predev =[] self.nextdev=[] """ 函数名:getData() 函数功能: 从外界读取项目数据并处理 输入 无 输出 1 datasets:datasets[i,j,k],不同模式下各活动的具体参数矩阵,具体说明见数据说明部分 2 eventlist:事件对象列表,eventlist[event_id]代表event_id事件的对象 3 activitylist:activitylist[i,j]记录i活动的前后置事件的事件号,具体说明见数据说明部分 4 activity_num:总活动数 5 event_num:总事件数 6 event2activity:event2activity[i,j],通过前(i)后(j)置事件id号索取活动编号矩阵 7 R:R[i],i资源的总量 其他说明:无 """ def getData(): datafile=open("data.txt") dataflame=datafile.read() datafile.close() data_temp=dataflame.split("\n") event_num=int(data_temp[0].split("%*%")[0])+2 activity_num=int(data_temp[0].split("%*%")[1])+2 R=[] R.append(int(data_temp[0].split("%*%")[2])+1) R.append(int(data_temp[0].split("%*%")[3])+1) R.append(int(data_temp[0].split("%*%")[4])+1) eventlist=construct_graph(event_num) datasets=np.zeros((activity_num,2,4),dtype=np.int) activitylist=np.zeros((activity_num,2),dtype=np.int) event2activity=np.zeros((activity_num,activity_num),dtype=np.int) for temp in data_temp[2:]: data=temp.split("/") data1=data[0].split(" ") i=int(data1[0]) datasets[i,0,0]=int(data1[1]) # 工期 datasets[i,0,1]=int(data1[2]) # R1资源 datasets[i,0,2]=int(data1[3]) # R2资源 datasets[i,0,3]=int(data1[4]) # R3资源 datasets[i,1,0]=int(data1[5]) # 工期 datasets[i,1,1]=int(data1[6]) # R1资源 datasets[i,1,2]=int(data1[7]) # R2资源 datasets[i,1,3]=int(data1[8]) # R3资源 data2=data[1].split("-") cur_node=int(data2[0]) next_node=int(data2[1]) eventlist[cur_node].nextdev.append([next_node,i]) eventlist[next_node].predev.append(cur_node) event2activity[cur_node,next_node]=i activitylist[i,0]=cur_node activitylist[i,1]=next_node return datasets,eventlist,activitylist,activity_num,event_num,event2activity,R """ 函数名:get_etatable(activity_num) 函数功能: 根据总活动数初始化启发项矩阵 输入 1 activity_num:总活动数 输出 1 etatable:启发项矩阵,etatable[i,j]代表i活动j模式(j=0-正常模式,j=1-压缩模式)的启发值 其他说明:无 """ def get_etatable(activity_num): etatable=np.zeros([activity_num,2]) for i in range(activity_num): etatable[i,0]=(datasets[i,0,1]+datasets[i,0,2]+datasets[i,0,3])/(R[0]+R[1]+R[2]) etatable[i,1]=(datasets[i,1,1]+datasets[i,1,2]+datasets[i,1,3])/(R[0]+R[1]+R[2]) return etatable """ 函数名:init_pheromonetable(activity_num) 函数功能: 根据总活动数初始化信息素矩阵 输入 1 activity_num:总活动数 输出 1 pheromonetable:信息素矩阵,pheromonetable[i,j]代表 i活动j模式(j=0-正常模式,j=1-压缩模式)的信息素含量 其他说明:无 """ def init_pheromonetable(activity_num): pheromonetable=np.ones((activity_num,2))*Q/activity_num return pheromonetable """ 函数名:construct_graph(event_num) 函数功能: 根据事件总数生成对象列表 输入 1 event_num:总事件数 输出 1 eventlist:事件对象列表,eventlist[event_id]代表event_id事件的对象 其他说明:无 """ def construct_graph(event_num): eventlist=[] for i in range(event_num): eventnode=EventNode(i) eventlist.append(eventnode) return eventlist """ 函数名:check_predev(eventnode) 函数功能: 检测是否有紧前事件 输入 1 eventnode:事件对象 输出 1 True:有紧前事件 False:无紧前事件 其他说明:无 """ def check_predev(eventnode): return True if eventnode.predev else False """ 函数名:check_nextdev(eventnode) 函数功能: 检测是否有紧后事件 输入 1 eventnode:事件对象 输出 1 True:有紧后事件 False:无紧后事件 其他说明:无 """ def check_nextdev(eventnode): return True if eventnode.nextdev else False """ 函数名:check_finish(eventlist_finish) 函数功能: 检查是不是所有事件(不包括结束虚事件)都完成 输入 1 eventlist_finish:当前完成的活动列表 输出 1 True:是 False:否 其他说明:无 """ def check_finish(eventlist_finish): for i in range(event_num-2): if i not in eventlist_finish: return False if event_num-2 in eventlist_finish: return False return True """ 函数名:parameters_init() 函数功能: 蚂蚁开始走的过程中的参数初始化函数 输入 1 无 输出 1 无 其他说明: 对当前事件cur_time=0, R1当前资源量=R1资源总量 R2当前资源量=R2资源总量 R3当前资源量=R3资源总量 """ def parameters_init(): cur_time=0 res_time=[] eventlist_finish=[] eventlist_finish.append(0) activity_waited=[] activity_waited.append(0) activitylist_finish=[] R1=R[0] R2=R[1] R3=R[2] return cur_time,res_time,eventlist_finish,\ activity_waited,activitylist_finish,R1,R2,R3 """ 函数名: timeseries_constraint(next_eventnode) 函数功能: 时序约束,当前活动的开始时间应该要满足后续事件能够执行的最早时间 函数执行完后当前时间满足了资源约束 输入 1 activity:活动id号 其他说明:无 """ def timeseries_constraint(activity): next_eventnode=activitylist[activity,1] global cur_time if eventlist[next_eventnode].tc>cur_time: cur_time=eventlist[next_eventnode].tc time_ahead(cur_time) """ 函数名:resource_constraint(activity,mode) 函数功能: 当前活动的开始时间应该要满足当前资源能够支持其执行, 函数执行完后当前时间满足了资源约束 输入 1 activity:活动id号 输入 2 mode:模式类型,mode=0代表通常模式,mode=1代表压缩模式 其他说明: 对当前事件cur_time=0, R1当前资源量=R1资源总量 R2当前资源量=R2资源总量 R3当前资源量=R3资源总量 """ def resource_constraint(activity,mode): global R1,R2,R3,cur_time while True: if(R1>=datasets[activity,mode,1] and R2>=datasets[activity,mode,2] and R3>=datasets[activity,mode,3]): R1-=datasets[activity,mode,1] R2-=datasets[activity,mode,2] R3-=datasets[activity,mode,3] break elif res_time: cur_time=min([i[0] for i in res_time]) time_ahead(cur_time) else: print([R1,R2,R3,activity]) exit() """ 函数名:update_list(activity,mode) 函数功能: 更新活动列表和已完成时间列表信息, 输入 1 activity:活动id号 输入 2 mode:模式类型,mode=0代表通常模式,mode=1代表压缩模式 其他说明: 无 """ def update_list(activity,mode): next_eventnode=activitylist[activity,1] activity_waited.remove(activity) if len(activity_waited)==0 and check_finish(eventlist_finish): eventlist_finish.append(event_num-2) activity_waited.append(activity_num-1) elif next_eventnode!=event_num-2 and next_eventnode not in eventlist_finish: eventlist_finish.append(next_eventnode) for event in eventlist[next_eventnode].nextdev: eventnode=eventlist[event[0]] flag=True for i in eventnode.predev: if i not in eventlist_finish: flag=False if flag: for i in eventnode.predev: activity_waited.append(event2activity[i,eventnode.its_id]) res_time.append([cur_time+datasets[activity,mode,0],activity,mode]) activitylist_finish.append([activity,mode,cur_time]) """ 函数名:go_a_step(activity,mode) 函数功能: 蚂蚁按活动activity的模式mode行进一步 输入 1 activity:活动id号 输入 2 mode:模式类型,mode=0代表通常模式,mode=1代表压缩模式 输出 1 无 其他说明: 时序约束部分:当前活动的开始时间应该要满足后续事件能够执行的最早时间 资源约束部分:当前活动的开始时间应该要满足当前资源能够支持其执行 更新事件过程:当前活动执行完后,在当前活动等待列列表中删去该活动, 并把符合约束的后续活动加入到当前活动等待列表中 """ def go_a_step(activity,mode): global R1,R2,R3,cur_time #时序约束,执行后当前时间满足时序约束 timeseries_constraint(activity) #资源约束,执行后当前时间满足资源约束 resource_constraint(activity,mode) #更新活动列表和已完成时间列表信息 update_list(activity,mode) """ 函数名:time_ahead(time) 函数功能: 删去当前资源时间结束列表中时间小于time的项,并更新当前资源量 输入 1 time:当前时间 输出 1 无 其他说明:无 """ def time_ahead(time): global R1,R2,R3 for res in res_time: if res[0]0)[0][0] # 下一个活动 pheromonetable[activity_waited[int(k/2)],k%2]=\ (1-rho)*pheromonetable[activity_waited[int(k/2)],k%2]+rho*Q/activity_num go_a_step(activity_waited[int(k/2)],k%2) # 将该只蚂蚁探求的结果保存 activitylist_result.append(activitylist_finish) # 对该代蚂蚁的最佳值与当前最佳值进行比较并更新当前最佳值及路径 activitylist_result=np.array(activitylist_result) aver_time[iter] = activitylist_result[:,-1,2].mean() if iter == 0: time_best[iter] = activitylist_result[:,-1,2].min() activitylist_best[iter] = activitylist_result[activitylist_result[:,-1,2].argmin()].copy() else: if activitylist_result[:,-1,2].min() > time_best[iter-1]: time_best[iter] = time_best[iter-1] activitylist_best[iter] = activitylist_best[iter-1].copy() else: time_best[iter] = activitylist_result[:,-1,2].min() activitylist_best[iter] = activitylist_result[activitylist_result[:,-1,2].argmin()].copy() # 将该代蚂蚁所探求的所有解保存下来 result_record.append(activitylist_result) # 更新信息素 changepheromonetable = np.zeros((activity_num,2)) for j in range(activity_num-1): activity=activitylist_best[iter][j,0] mode=activitylist_best[iter][j,1] time=activitylist_best[iter][j,2] changepheromonetable[activity,mode]+=Q/activitylist_best[iter][-1,2] pheromonetable = pheromonetable + changepheromonetable # 将蚂蚁增加一代 iter+=1 # 结果输出 print(time_best[itermax-1]) print(activitylist_best[itermax-1])

 

 

 

 

 

 

 

 

 

 

 

 



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3