我正在使用 Google OR-Tools 来解决技术人员处理机器修理任务的问题。机器一开始都是坏的,目标是修理机器并最小化总损失输出(即机器的累积停机时间)。任务之间有一个旅行时间,并且必须有车辆在任务之间运送技术人员。车辆比技术人员少,因此可能存在技术人员必须等待车辆空闲后再前往下一项任务的情况。例如:三名技术人员、八项任务、两辆车。
技术人员可以处理任何任务,并且任务持续时间是预先知道的。
我在下面制作了一个代码(基于带有旅行时间的作业车间),它以 2 个单位的固定旅行时间围绕任务安排技术人员。
但是,我不知道如何对可用的运输约束进行建模。(目前,技术人员可以在完成任务后立即移动)。谁能让我开始了解如何添加此功能?
我的想法是:
- 这实际上是一个车辆路线问题(而不是一个
cp_sat
)并且任务是一种时间窗口吗?或者它是部分车辆路线和部分调度 - 如果是这样,我会使用什么求解器? - 我是否需要一个
trans_for_tech
变量,=1
当车辆在使用时加上一个约束,trans_for_tech<=1
以避免车辆被重复预订。然后这个变量会=1
在电路的弧线被使用时(即它的布尔文字为真)。 - 车辆本身是否真的执行“任务”(即运输),这些“任务”(即运输)需要通过开始/结束/无重叠条件进行建模,
NewOptionalInterval
以便在使用电路的弧线时启用它们。这些的开始/结束实际上是任务间隔的补充吗?然后,从概念上讲,将有两组任务(技术人员工作和车辆运输),这些任务将被安排以最大限度地减少损失的时间。 - 由于运送技术人员的车辆访问不同的任务地点,因此这些车辆也有旅行时间,但我的想法是暂时忽略这一点,只是将车辆建模为可在任务中的任何地方立即可用但受到限制的资源,因此它不能t 同时在两个地方(即变量有 a
sum<=1
或 aAddBoolOr
或类似的东西)。
下面是我的代码,用于技术人员准备好后移动的情况(相当于每个人都有自己的交通工具)。
from ortools.sat.python import cp_model
import pandas as pd
import time
# number technicians
n_techs = 2
#different scenarios for testing {machine:duration to fix}
#this set of tasks runs very quickly
durations={0:3, 1:1, 2:1, 3:2, 4:1}
#this scenario of tasks takes a couple minutes to finish
# durations={0:3, 1:1, 2:1, 3:2, 4:1, 5:2, 6:3, 7:4, 8:1}
#travel times between tasks (constant for now)
travel_time=2
model = cp_model.CpModel()
n_tasks=len(durations)
all_techs=range(n_techs)
all_tasks=range(n_tasks)
#horizon is sum of all durations and max travel time * number of tasks (conservative)
horizon=sum(durations.values())+n_tasks*travel_time
#boolean variable for each combination of technician and task. if True then technician works on this task
tech_for_task={}
for tech in all_techs:
for task in all_tasks:
tech_for_task[(tech,task)]=model.NewBoolVar('tech_{0}_on_task_{1}'.format(tech,task))
#condition to that only one tech works on each task
for task in all_tasks:
model.Add(sum([tech_for_task[(tech,task)] for tech in all_techs])==1)
#start,end and interval for each task
starts = {}
ends = {}
intervals = {}
for tech in all_techs:
for task in all_tasks:
starts[(tech,task)] = model.NewIntVar(0, horizon, 'tech_{0}_task_{1}_start'.format(tech,task))
ends[(tech,task)] = model.NewIntVar(0, horizon, 'tech_{0}_task_{1}_end'.format(tech,task))
#this optional interval is only live when a technician is working on a task (tech_for_task=True)
intervals[(tech,task)]=model.NewOptionalIntervalVar(starts[(tech,task)],
durations[task],
ends[(tech,task)],
tech_for_task[(tech,task)],
'tech_{0}_task_{1}_opt_interval'.format(tech,task))
#the tasks for each technician should not overlap
for tech in all_techs:
model.AddNoOverlap([intervals[(tech,task)] for task in all_tasks])
# Transition times and transition costs using a circuit constraint
for tech in all_techs:
arcs = []
for i in all_tasks:
tti=(tech,i)
# Initial arc from the dummy node (0) to a task.
start_lit = model.NewBoolVar('')
arcs.append([0, i + 1, start_lit])
# If this task is the first, set start to 0.
model.Add(starts[tti] == 0).OnlyEnforceIf(start_lit)
# Final arc from an arc to the dummy node.
arcs.append([i + 1, 0, model.NewBoolVar('')])
# Self arc if the task is not performed.
arcs.append([i + 1, i + 1, tech_for_task[tti].Not()])
for j in all_tasks:
if i==j:
continue
else:
ttj=(tech,j)
#the indexing of the arcs[i+1,j+1,lit] is one more than the nodes due to the dummy node (0)
lit = model.NewBoolVar('{0} follows {1}'.format(j,i))
arcs.append([i + 1, j + 1, lit])
model.AddImplication(lit, tech_for_task[tti]) # if lit=True --> tech_for_task[tti]=True
model.AddImplication(lit, tech_for_task[ttj]) # if lit=True --> tech_for_task[ttj]=True
# add reified transition to link the literals with the times of the tasks
model.Add(starts[ttj] == ends[tti] + travel_time).OnlyEnforceIf(lit)
if arcs:
model.AddCircuit(arcs)
# minimize sum of end time points of tasks. ends=0 when tech_for_task=False so not cause problem
obj_fun=[]
for tech in all_techs:
for task in all_tasks:
obj_fun.append(ends[(tech,task)])
model.Minimize(sum(obj_fun))
start = time.time()
solver = cp_model.CpSolver()
status = solver.Solve(model)
end = time.time()
#if problem solved then put in dataframe to show results easily
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
print('solved ok')
c_starts={}
c_ends={}
c_tech_for_task={}
c_tech={}
c_task={}
for tech in all_techs:
for task in all_tasks:
tt=(tech,task)
c_starts[tt]=solver.Value(starts[tt])
c_ends[tt]=solver.Value(ends[tt])
c_tech_for_task[tt]=solver.Value(tech_for_task[tt])
c_tech[tt]=tech
c_task[tt]=task
df=pd.DataFrame()
df['task']=c_task.values()
df['tech']=c_tech.values()
df['start']=c_starts.values()
df['end']=c_ends.values()
df['tech_for_task']=c_tech_for_task.values()
#sort chronologically
df=df.sort_values(by='start').reset_index(drop=True)
df['duration']=df['task'].map(durations)
#get only the tasks which were done by a tech (tech_for_task=0 are not meaningful)
df=df[df.tech_for_task==1]
for tech in all_techs:
df1=df[df.tech==tech]
print()
print('Tasks for tech {0}'.format(tech))
print(df1)
elif status==cp_model.MODEL_INVALID:
print('Model invalid :-(')
print('Total time is:',end - start)