21

这是一个关于一个有点复杂的问题的算法问题。基础是这样的:

基于可用时隙保留时隙的调度系统。插槽有一定的标准,我们称之为标签。如果可用时隙的标签集是保留时隙的超集,则保留通过这些标签与可用时隙匹配。

作为一个具体的例子,以这个场景为例:

11:00  12:00  13:00
+--------+
| A, B   |
+--------+
       +--------+
       | C, D   |
       +--------+

在 11:00 到 12:30 之间可以进行标签预订AB从 12:00 到 13:30可以使用,C并且D从大约 12:00 到 12:30 有重叠。

11:00  12:00  13:00
+--------+
| A, B   |
+--------+
       +--------+
       | C, D   |
       +--------+
  xxxxxx
  x A  x
  xxxxxx

这里A已经进行了预订,因此在 11:15-ish 和 12:00-ish-ish 之间没有其他预订AB不能进行其他预订。

简而言之就是这个想法。可用插槽没有特定限制:

  • 一个可用的插槽可以包含任意数量的标签
  • 任意数量的插槽可以随时重叠
  • 插槽是任意长度的
  • 预订可以包含任意数量的标签

系统中唯一需要遵守的规则是:

  • 添加预留时,至少有一个剩余的可用插槽必须与预留中的所有标签匹配

澄清一下:当同时有两个可用的插槽时,例如 tag A,那么当时可以进行两次预订A,但不能再进行了。

我可以使用间隔树的修改实现;作为快速概述:

  • 所有可用的插槽都添加到间隔树中(保留重复/重叠)
  • 所有保留的插槽都被迭代并且:
    • 从树中查询与预订时间匹配的所有可用时隙
    • 与预订标签匹配的第一个被切片,并从树中删除切片

当该过程完成后,剩下的就是剩余的可用槽片,我可以查询是否可以为特定时间进行新的预订并添加它。

数据结构看起来像这样:

{
  type: 'available', 
  begin: 1497857244, 
  end: 1497858244, 
  tags: [{ foo: 'bar' }, { baz: 42 }]
}
{
  type: 'reserved', 
  begin: 1497857345, 
  end: 1497857210, 
  tags: [{ foo: 'bar' }]
}

标签本身就是键值对象,它们的列表就是一个“标签集”。如果有帮助,可以将它们序列化;到目前为止,我使用的是一种 Pythonset类型,这使得比较很容易。时隙开始/结束时间是树中的 UNIX 时间戳。我并不是特别喜欢这些特定的数据结构,如果有用的话可以重构它们。


我面临的问题是这不是没有错误的。每隔一段时间,预订就会潜入与其他预订发生冲突的系统中,我还不知道这到底是怎么发生的。当标签以复杂的方式重叠时,它也不是很聪明,需要计算最佳分布,以便所有预留可以尽可能地适合可用的插槽;事实上,目前在重叠场景中如何将预订与可用时隙进行匹配是不确定的。

我想知道的是:区间树主要用于此目的,但我当前的系统将标签集匹配作为附加维度添加到此系统是笨重且固定的;有没有一种数据结构或算法可以优雅地处理这个问题?

必须支持的操作:

  1. 向系统查询与特定标签集匹配的可用时隙(考虑可能降低可用性但它们本身不是所述标签集的一部分的预留;例如,在上面的示例中查询 的可用性B)。
  2. 确保不能将没有匹配的可用插槽的预订添加到系统中。
4

3 回答 3

8

您的问题可以使用约束规划来解决。在 python 中,这可以使用python-constraint库来实现。

首先,我们需要一种方法来检查两个插槽是否彼此一致。这是一个函数,如果两个插槽共享一个标签并且它们的帧重叠,则返回 true。在python中,这可以使用以下函数来实现

def checkNoOverlap(slot1, slot2):
    shareTags = False
    for tag in slot1['tags']:
        if tag in slot2['tags']:
            shareTags = True
            break    
    if not shareTags: return True
    return not (slot2['begin'] <= slot1['begin'] <= slot2['end'] or 
                slot2['begin'] <= slot1['end'] <= slot2['end'])

我不确定您是否希望标签完全相同(如 {foo: bar} 等于 {foo: bar})或只有键(如 {foo: bar} 等于 {foo: qux}),但你可以在上面的函数中改变它。

一致性检查

我们可以将 python-constraint 模块用于您请求的两种功能。

第二个功能是最简单的。为了实现这一点,我们可以使用将isConsistent(set)提供的数据结构中的插槽列表作为输入的函数。然后,该函数会将所有插槽提供给 python-constraint 并检查插槽列表是否一致(没有 2 个不应该重叠的插槽,重叠)并返回一致性。

def isConsistent(set):
        #initialize python-constraint context
        problem = Problem()
        #add all slots the context as variables with a singleton domain
        for i in range(len(set)):
            problem.addVariable(i, [set[i]])        
        #add a constraint for each possible pair of slots
        for i in range(len(set)):
            for j in range(len(set)):
                #we don't want slots to be checked against themselves
                if i == j:
                    continue
                #this constraint uses the checkNoOverlap function
                problem.addConstraint(lambda a,b: checkNoOverlap(a, b), (i, j))
        # getSolutions returns all the possible combinations of domain elements
        # because all domains are singleton, this either returns a list with length 1 (consistent) or 0 (inconsistent)
        return not len(problem.getSolutions()) == 0

每当用户想要添加预留槽时,都可以调用此函数。可以将输入槽添加到已经存在的槽列表中,并且可以检查一致性。如果一致,则保留新的插槽。否则,新的插槽重叠并且应该被拒绝。

寻找可用的插槽

这个问题有点棘手。我们可以使用与上述相同的功能,并进行一些重大更改。我们现在不想将新插槽与现有插槽一起添加,而是将所有可能的插槽添加到已经存在的插槽中。然后我们可以检查所有可能的槽与保留槽的一致性,并向约束系统询问一致的组合。

因为如果我们不对其进行任何限制,可能的插槽数将是无限的,所以我们首先需要为程序声明一些参数:

MIN = 149780000 #available time slots can never start earlier then this time
MAX = 149790000 #available time slots can never start later then this time
GRANULARITY = 1*60 #possible time slots are always at least one minut different from each other

我们现在可以继续主函数了。它看起来很像一致性检查,但是我们现在添加了一个变量来发现所有可用的插槽,而不是来自用户的新插槽。

def availableSlots(tags, set):
    #same as above
    problem = Problem()
    for i in range(len(set)):
        problem.addVariable(i, [set[i]])
    #add an extra variable for the available slot is added, with a domain of all possible slots
    problem.addVariable(len(set), generatePossibleSlots(MIN, MAX, GRANULARITY, tags))
    for i in range(len(set) +1):
        for j in range(len(set) +1):
            if i == j:
                continue
            problem.addConstraint(lambda a, b: checkNoOverlap(a, b), (i, j))
    #extract the available time slots from the solution for clean output
    return filterAvailableSlots(problem.getSolutions())

我使用一些辅助函数来保持代码更干净。它们包括在这里。

def filterAvailableSlots(possibleCombinations):
    result = []
    for slots in possibleCombinations:
        for key, slot in slots.items():
            if slot['type'] == 'available':
                result.append(slot)

    return result

def generatePossibleSlots(min, max, granularity, tags):
    possibilities = []
    for i in range(min, max - 1, granularity):
        for j in range(i + 1, max, granularity):
            possibleSlot = {
                              'type': 'available',
                              'begin': i,
                              'end': j,
                              'tags': tags
            }
            possibilities.append(possibleSlot)
    return tuple(possibilities)

您现在可以将函数 getAvailableSlots(tags, set) 与您想要可用插槽和一组已保留插槽的标签一起使用。请注意,此函数确实返回所有一致的可能插槽,因此无需努力找到最大长度或其他优化的位置。

希望这可以帮助!(我让它像你在我的 pycharm 中描述的那样工作)

于 2017-06-20T19:27:10.350 回答
4

这是一个解决方案,我将包括下面的所有代码。

1.创建一个slots表和一个reservations表

示例表

2. 创建一个保留 x 槽的矩阵

根据保留槽组合是否可能,由真值或假值填充

示例布尔组合矩阵

3. 找出允许最多保留槽组合的最佳映射

注意:我当前的解决方案对于非常大的数组的扩展性很差,因为它涉及遍历列表的所有可能排列,其中大小 = 插槽数。我已经发布了另一个问题,看看是否有人能找到更好的方法。但是,这个解决方案是准确的,可以优化

在此处输入图像描述

Python代码源

第1部分

from IPython.display import display
import pandas as pd
import datetime

available_data = [
    ['SlotA', datetime.time(11, 0, 0), datetime.time(12, 30, 0), set(list('ABD'))],
    ['SlotB',datetime.time(12, 0, 0), datetime.time(13, 30, 0), set(list('C'))],
    ['SlotC',datetime.time(12, 0, 0), datetime.time(13, 30, 0), set(list('ABCD'))],
    ['SlotD',datetime.time(12, 0, 0), datetime.time(13, 30, 0), set(list('AD'))],
]

reservation_data = [
    ['ReservationA', datetime.time(11, 15, 0), datetime.time(12, 15, 0), set(list('AD'))],
    ['ReservationB', datetime.time(11, 15, 0), datetime.time(12, 15, 0), set(list('A'))],
    ['ReservationC', datetime.time(12, 0, 0), datetime.time(12, 15, 0), set(list('C'))],
    ['ReservationD', datetime.time(12, 0, 0), datetime.time(12, 15, 0), set(list('C'))],
    ['ReservationE', datetime.time(12, 0, 0), datetime.time(12, 15, 0), set(list('D'))]
]

reservations = pd.DataFrame(data=reservation_data, columns=['reservations', 'begin', 'end', 'tags']).set_index('reservations')
slots = pd.DataFrame(data=available_data, columns=['slots', 'begin', 'end', 'tags']).set_index('slots')

display(slots)
display(reservations)

第2部分

def is_possible_combination(r):
    return (r['begin'] >= slots['begin']) & (r['end'] <= slots['end']) & (r['tags'] <= slots['tags'])

solution_matrix = reservations.apply(is_possible_combination, axis=1).astype(int)
display(solution_matrix)

第 3 部分

import numpy as np
from itertools import permutations

# add dummy columns to make the matrix square if it is not
sqr_matrix = solution_matrix
if sqr_matrix.shape[0] > sqr_matrix.shape[1]:
    # uhoh, there are more reservations than slots... this can't be good
    for i in range(sqr_matrix.shape[0] - sqr_matrix.shape[1]):
        sqr_matrix.loc[:,'FakeSlot' + str(i)] = [1] * sqr_matrix.shape[0]
elif sqr_matrix.shape[0] < sqr_matrix.shape[1]:
    # there are more slots than customers, why doesn't anyone like us?
    for i in range(sqr_matrix.shape[0] - sqr_matrix.shape[1]):
        sqr_matrix.loc['FakeCustomer' + str(i)] = [1] * sqr_matrix.shape[1]

# we only want the values now
A = solution_matrix.values.astype(int)

# make an identity matrix (the perfect map)
imatrix = np.diag([1]*A.shape[0])

# randomly swap columns on the identity matrix until they match. 
n = A.shape[0]

# this will hold the map that works the best
best_map_so_far = np.zeros([1,1])

for column_order in permutations(range(n)):
    # this is an identity matrix with the columns swapped according to the permutation
    imatrix = np.zeros(A.shape)
    for row, column in enumerate(column_order):
        imatrix[row,column] = 1

    # is this map better than the previous best?
    if sum(sum(imatrix * A)) > sum(sum(best_map_so_far)):
        best_map_so_far = imatrix

    # could it be? a perfect map??
    if sum(sum(imatrix * A)) == n:
        break

if sum(sum(imatrix * A)) != n:
    print('a perfect map was not found')

output = pd.DataFrame(A*imatrix, columns=solution_matrix.columns, index=solution_matrix.index, dtype=int)
display(output)
于 2017-06-24T06:30:02.790 回答
0

Arnetinker建议的方法都有帮助,但最终还不够。我想出了一种混合方法,可以很好地解决它。

主要的问题是,它是一个三维的问题,很难一次性解决所有的维度。这不仅仅是匹配时间重叠或标签重叠,而是匹配时间片与标签重叠。根据时间甚至标签将插槽与其他插槽匹配很简单,但是将已经匹配的可用性插槽与另一个时间的另一个预订匹配则非常复杂。这意味着,在这种情况下,一个可用性可以涵盖不同时间的两个预订:

+---------+
| A, B    |
+---------+
xxxxx xxxxx
x A x x A x
xxxxx xxxxx

试图将其融入基于约束的编程中需要极其复杂的约束关系,这很难管理。我的解决方案是简化问题……</p>

删除一维

它不是一次解决所有维度,而是极大地简化了问题,在很大程度上消除了时间维度。我通过使用现有的区间树并根据需要对其进行切片来做到这一点:

def __init__(self, slots):
    self.tree = IntervalTree(slots)

def timeslot_is_available(self, start: datetime, end: datetime, attributes: set):
    candidate = Slot(start.timestamp(), end.timestamp(), dict(type=SlotType.RESERVED, attributes=attributes))
    slots = list(self.tree[start.timestamp():end.timestamp()])
    return self.model_is_consistent(slots + [candidate])

为了查询一个特定的槽是否可用,我只取那个特定时间相关的槽(self.tree[..:..]),这将计算的复杂性降低到一个局部子集:

  |      |             +-+ = availability
+-|------|-+           xxx = reservation
  |  +---|------+
xx|x  xxx|x
  |  xxxx|
  |      |

然后我确认那个窄片内的一致性:

@staticmethod
def model_is_consistent(slots):
    def can_handle(r):
        return lambda a: r.attributes <= a.attributes and a.contains_interval(r)

    av = [s for s in slots if s.type == SlotType.AVAILABLE]
    rs = [s for s in slots if s.type == SlotType.RESERVED]

    p = Problem()
    p.addConstraint(AllDifferentConstraint())
    p.addVariables(range(len(rs)), av)

    for i, r in enumerate(rs):
        p.addConstraint(can_handle(r), (i,))

    return p.getSolution() is not None

(我在这里省略了一些优化和其他代码。)

这部分是 Arne 和 tinker 建议的混合方法。它使用 tinker 建议的矩阵算法,使用基于约束的编程来查找匹配槽。基本上:如果有任何解决方案可以将所有预留分配到不同的可用时隙,则此时间片处于一致状态。由于我传入了所需的预留槽,如果模型仍然一致,包括该槽,这意味着预留该槽是安全的。

如果在这个狭窄的窗口内有两个短期预订可分配给相同的可用性,这仍然是有问题的,但是这种可能性很低,结果仅仅是可用性查询的假阴性;误报会更成问题。

寻找可用的插槽

找到所有可用的插槽是一个更复杂的问题,因此再次进行一些简化是必要的。首先,只能查询模型以获取特定标签集的可用性(没有“给我所有全局可用槽”),其次只能以特定粒度(所需槽长度)查询。这非常适合我的特定用例,我只需要为用户提供他们可以保留的插槽列表,例如9:15-9:30、9:30-9:45 等。通过重用上面的代码,这使得算法非常简单:

def free_slots(self, start: datetime, end: datetime, attributes: set, granularity: timedelta):
    slots = []
    while start < end:
        slot_end = start + granularity
        if self.timeslot_is_available(start, slot_end, attributes):
            slots.append((start, slot_end))
        start += granularity

    return slots

换句话说,它只是在给定的时间间隔内遍历所有可能的插槽,并逐字检查该插槽是否可用。这是一个蛮力的解决方案,但效果很好。

于 2017-10-18T13:59:59.213 回答