0

我正在使用诸如https://github.com/google/or-tools/blob/master/examples/python/task_allocation_sat.py之类的 or-tools 解决方案将任务分配到每个任务具有周期性且每个时间槽具有容量的时隙(最多可以在时隙内放置多少任务)。现在我想用基于任务持续时间的约束替换容量约束,其中每个任务都有持续时间,每个槽都有最大持续时间,所以每个时间槽只能有与其在最大持续时间限制下一样多的任务。但我不明白如何建立约束,即“检查插槽中任务的持续时间总和,它应该小于 max_slot_duration”。

def main():
    available = [
        [1, 0, 0, 0, 0, 1, 0],
        [1, 1, 1, 1, 1, 0, 0],
        [0, 1, 1, 1, 1, 0, 0],
        [0, 0, 1, 1, 1, 0, 0],
        [0, 0, 1, 1, 1, 0, 0],
    ]

    periodicity = [
        2, 2, 1, 1, 3
    ]

    capacity = 3

    max_slot_duration = 124

    task_durations = [
        15, 20, 30, 50, 10
    ]

    ntasks = len(available)
    nslots = len(available[0])

    all_tasks = range(ntasks)
    all_slots = range(nslots)

    model = cp_model.CpModel()
    assign = {}
    for task in all_tasks:
        for slot in all_slots:
            assign[(task, slot)] = model.NewBoolVar('x[%i][%i]' % (task, slot))
    count = model.NewIntVar(0, nslots, 'count')
    slot_used = [model.NewBoolVar('slot_used[%i]' % s) for s in all_slots]

    for task in all_tasks:
        model.Add(
            sum(assign[(task, slot)] for slot in all_slots if available[task][slot] == 1) == periodicity[task])

    for slot in all_slots:
        model.Add(
            sum(assign[(task, slot)] for task in all_tasks
                if available[task][slot] == 1) <= capacity)
        for task in all_tasks:
            if available[task][slot] == 1:
                model.AddImplication(slot_used[slot].Not(),
                                     assign[(task, slot)].Not())
            else:
                model.Add(assign[(task, slot)] == 0)

    model.Add(count == sum(slot_used))

    model.Minimize(count)

    solver = cp_model.CpSolver()
    solver.parameters.log_search_progress = True
    solver.parameters.num_search_workers = 6
    solution_printer = TaskAssigningSolutionPrinter(all_tasks, all_slots, assign)
    status = solver.Solve(model, solution_printer)
    print(solution_printer.get_solution())

和解决方案打印机

class TaskAssigningSolutionPrinter(cp_model.CpSolverSolutionCallback):
    def __init__(self, tasks, slots, assign):
        cp_model.CpSolverSolutionCallback.__init__(self)
        self.__tasks = tasks
        self.__slots = slots
        self.__assign = assign
        self.__solutions = {}
        self.__solution_count = 0

    def on_solution_callback(self):
        self.__solutions[self.__solution_count] = {}
        for slot in self.__slots:
            self.__solutions[self.__solution_count][slot] = []
            for task in self.__tasks:
                if self.Value(self.__assign[(task, slot)]) == 1:
                    self.__solutions[self.__solution_count][slot].append(task)
        self.__solution_count += 1

    def get_solution(self):
        return self.__solutions
4

1 回答 1

0

感谢 Stradivari (Xiang) 用户 or-tools 找到了不和谐通道解决方案。我们在这里只需要总和约束。

for slot in all_slots:
  model.Add(sum(assign[(task, slot)] * task_durations[task] for task in all_tasks if
                      available[task][slot] == 1) <= max_slot_duration * workers[slot])
于 2021-05-14T05:57:14.873 回答