7

从不会很酷的问题类别中......

我所说的“类似队列的东西”是指支持以下操作:

  • append(entry:Entry) - 将条目添加到队列尾部
  • take(): Entry - 从队列头部删除条目并返回它
  • 提升(entry_id) - 将条目移动到靠近头部的位置;当前占据该位置的条目被移动到旧位置
  • demote(entry_id) - 与promote(entry_id) 相反

可选操作类似于:

  • 提升(entry_id, amount) - 与提升(entry_id) 类似,但您指定职位数
  • 降级(entry_id, amount) - 与promote(entry_id, amount) 相反
  • 当然,如果我们允许 amount 为正数或负数,我们可以将提升/降级方法与单个 move(entry_id, amount) 方法合并

如果可以以分布式方式在队列上执行以下操作(多个客户端与队列交互),那将是理想的:

queue = ...

queue.append( a )
queue.append( b )
queue.append( c )

print queue
"a b c"

queue.promote( b.id )
print queue
"b a c"

queue.demote( a.id )
"b c a"

x = queue.take()
print x
"b"
print queue
"c a"

有没有特别适合这个用例的数据存储?即使多个用户同时修改队列,队列也应始终处于一致状态。

如果不是因为升级/降级/移动要求,就不会有太大问题。

编辑:如果有 Java 和/或 Python 库来完成上述任务,则加分。

解决方案应该非常好地扩展。

4

6 回答 6

9

Redis 支持列表和有序集:http ://redis.io/topics/data-types#lists

它还支持事务和发布/订阅消息。所以,是的,我想说这可以在 redis 上轻松完成。

更新:事实上,大约 80% 已经做了很多次了:http ://www.google.co.uk/search?q=python+redis+queue

其中一些点击可以升级以添加您想要的内容。您必须使用事务来实现提升/降级操作。

可以在服务器端使用 lua 来创建该功能,而不是在客户端代码中使用它。或者,您可以在服务器上围绕 redis 创建一个瘦包装器,它只实现您想要的操作。

于 2012-05-04T01:24:45.360 回答
4

Python:“包括电池”

与其寻找像 RabbitMQ、Redis 或 RDBMS 这样的数据存储,我认为 python 和几个库足以解决这个问题。有些人可能会抱怨这种自己动手的方法是在重新发明轮子,但我更喜欢运行 100 行 Python 代码而不是管理另一个数据存储。

实现优先队列

您定义的操作:追加、获取、提升和降级,描述了一个优先级队列。不幸的是,python 没有内置的优先级队列数据类型。但它确实有一个名为 heapq 的堆库,优先级队列通常以堆的形式实现。这是我实现的满足您要求的优先级队列:

class PQueue:
    """
    Implements a priority queue with append, take, promote, and demote
    operations.
    """
    def __init__(self):
        """
        Initialize empty priority queue.
        self.toll is max(priority) and max(rowid) in the queue
        self.heap is the heap maintained for take command
        self.rows is a mapping from rowid to items
        self.pris is a mapping from priority to items
        """
        self.toll = 0
        self.heap = list()
        self.rows = dict()
        self.pris = dict()

    def append(self, value):
        """
        Append value to our priority queue.
        The new value is added with lowest priority as an item. Items are
        threeple lists consisting of [priority, rowid, value]. The rowid
        is used by the promote/demote commands.
        Returns the new rowid corresponding to the new item.
        """
        self.toll += 1
        item = [self.toll, self.toll, value]
        self.heap.append(item)
        self.rows[self.toll] = item
        self.pris[self.toll] = item
        return self.toll

    def take(self):
        """
        Take the highest priority item out of the queue.
        Returns the value of the item.
        """
        item = heapq.heappop(self.heap)
        del self.pris[item[0]]
        del self.rows[item[1]]
        return item[2]

    def promote(self, rowid):
        """
        Promote an item in the queue.
        The promoted item swaps position with the next highest item.
        Returns the number of affected rows.
        """
        if rowid not in self.rows: return 0
        item = self.rows[rowid]
        item_pri, item_row, item_val = item
        next = item_pri - 1
        if next in self.pris:
            iota = self.pris[next]
            iota_pri, iota_row, iota_val = iota
            iota[1], iota[2] = item_row, item_val
            item[1], item[2] = iota_row, iota_val
            self.rows[item_row] = iota
            self.rows[iota_row] = item
            return 2
        return 0

降级命令与提升命令几乎相同,因此为简洁起见,我将省略它。请注意,这仅取决于 python 的列表、字典和 heapq 库。

服务于我们的优先队列

现在使用 PQueue 数据类型,我们希望允许与实例进行分布式交互。一个很棒的库是gevent。尽管 gevent 相对较新且仍处于测试阶段,但它的速度非常快且经过良好测试。使用 gevent,我们可以很容易地设置一个监听 localhost:4040 的套接字服务器。这是我的服务器代码:

pqueue = PQueue()

def pqueue_server(sock, addr):
    text = sock.recv(1024)
    cmds = text.split(' ')
    if cmds[0] == 'append':
        result = pqueue.append(cmds[1])
    elif cmds[0] == 'take':
        result = pqueue.take()
    elif cmds[0] == 'promote':
        result = pqueue.promote(int(cmds[1]))
    elif cmds[0] == 'demote':
        result = pqueue.demote(int(cmds[1]))
    else:
        result = ''
    sock.sendall(str(result))
    print 'Request:', text, '; Response:', str(result)

if args.listen:
    server = StreamServer(('127.0.0.1', 4040), pqueue_server)
    print 'Starting pqueue server on port 4040...'
    server.serve_forever()

在生产中运行之前,您当然希望做一些更好的错误/缓冲区处理。但它适用于快速原型制作。请注意,这不需要围绕 pqueue 对象进行任何锁定。Gevent 实际上并没有并行运行代码,它只是给人一种印象。缺点是更多的内核无济于事,但好处是无锁代码。

别误会,gevent SocketServer 会同时处理多个请求。但它通过协作多任务处理在响应请求之间切换。这意味着您必须让出协程的时间片。虽然 gevents 套接字 I/O 函数旨在让出,但我们的 pqueue 实现却不是。幸运的是,pqueue 非常快地完成了它的任务。

也是一个客户

在进行原型设计时,我发现拥有一个客户也很有用。编写客户端需要一些谷歌搜索,所以我也会分享该代码:

if args.client:
    while True:
        msg = raw_input('> ')
        sock = gsocket.socket(gsocket.AF_INET, gsocket.SOCK_STREAM)
        sock.connect(('127.0.0.1', 4040))
        sock.sendall(msg)
        text = sock.recv(1024)
        sock.close()
        print text

要使用新的数据存储,首先启动服务器,然后启动客户端。在客户提示下,您应该能够:

> append one
1
> append two
2
> append three
3
> promote 2
2
> promote 2
0
> take
two

扩展性非常好

考虑到您对数据存储的考虑,您似乎真的很关心吞吐量和持久性。但是“规模非常好”并不能量化您的需求。所以我决定用一个测试功能对上面的内容进行基准测试。这是测试功能:

def test():
    import time
    import urllib2
    import subprocess

    import random
    random = random.Random(0)

    from progressbar import ProgressBar, Percentage, Bar, ETA
    widgets = [Percentage(), Bar(), ETA()]

    def make_name():
        alphabet = 'abcdefghijklmnopqrstuvwxyz'
        return ''.join(random.choice(alphabet)
                       for rpt in xrange(random.randrange(3, 20)))

    def make_request(cmds):
        sock = gsocket.socket(gsocket.AF_INET, gsocket.SOCK_STREAM)
        sock.connect(('127.0.0.1', 4040))
        sock.sendall(cmds)
        text = sock.recv(1024)
        sock.close()

    print 'Starting server and waiting 3 seconds.'
    subprocess.call('start cmd.exe /c python.exe queue_thing_gevent.py -l',
                    shell=True)
    time.sleep(3)

    tests = []
    def wrap_test(name, limit=10000):
        def wrap(func):
            def wrapped():
                progress = ProgressBar(widgets=widgets)
                for rpt in progress(xrange(limit)):
                    func()
                secs = progress.seconds_elapsed
                print '{0} {1} records in {2:.3f} s at {3:.3f} r/s'.format(
                    name, limit, secs, limit / secs)
            tests.append(wrapped)
            return wrapped
        return wrap

    def direct_append():
        name = make_name()
        pqueue.append(name)

    count = 1000000
    @wrap_test('Loaded', count)
    def direct_append_test(): direct_append()

    def append():
        name = make_name()
        make_request('append ' + name)

    @wrap_test('Appended')
    def append_test(): append()

    ...

    print 'Running speed tests.'
    for tst in tests: tst()

基准测试结果

我对笔记本电脑上运行的服务器进行了 6 次测试。我认为结果非常好。这是输出:

Starting server and waiting 3 seconds.
Running speed tests.
100%|############################################################|Time: 0:00:21
Loaded 1000000 records in 21.770 s at 45934.773 r/s
100%|############################################################|Time: 0:00:06
Appended 10000 records in 6.825 s at 1465.201 r/s
100%|############################################################|Time: 0:00:06
Promoted 10000 records in 6.270 s at 1594.896 r/s
100%|############################################################|Time: 0:00:05
Demoted 10000 records in 5.686 s at 1758.706 r/s
100%|############################################################|Time: 0:00:05
Took 10000 records in 5.950 s at 1680.672 r/s
100%|############################################################|Time: 0:00:07
Mixed load processed 10000 records in 7.410 s at 1349.528 r/s

最终边界:耐用性

最后,耐用性是我没有完全原型化的唯一问题。但我也不认为这有那么难。在我们的优先级队列中,项目的堆(列表)包含我们需要将数据类型保存到磁盘的所有信息。由于使用 gevent,我们还可以以多处理方式生成函数,我想象使用这样的函数:

def save_heap(heap, toll):
    name = 'heap-{0}.txt'.format(toll)
    with open(name, 'w') as temp:
        for val in heap:
            temp.write(str(val))
            gevent.sleep(0)

并向我们​​的优先级队列添加保存功能:

def save(self):
    heap_copy = tuple(self.heap)
    toll = self.toll
    gevent.spawn(save_heap, heap_copy, toll)

您现在可以复制 Redis 的分叉模型,并每隔几分钟将数据存储写入磁盘。如果您需要更高的耐用性,请将上述内容与将命令记录到磁盘的系统相结合。这些是 Redis 使用的 AFP 和 RDB 持久化方法。

于 2012-05-12T03:00:36.820 回答
2

RabbitMQ 有什么问题?这听起来完全符合您的需要。

我们在生产环境中也广泛使用 Redis,但它没有队列通常具有的一些功能,例如将任务设置为完成,或者在某些 TTL 未完成时重新发送任务。另一方面,它确实具有队列所没有的其他功能,例如它是一个通用存储,而且速度非常快。

于 2012-05-07T08:16:08.917 回答
2

Websphere MQ几乎可以完成所有这些工作。

提升/降级几乎是可能的,方法是从队列中删除消息并以更高/更低的优先级将其放回,或者使用“CORRELID”作为序列号。

于 2012-05-04T01:43:13.747 回答
1

使用Redisson它以Redis提供的分布式方法实现熟悉ListQueue、、、、 java 接口。示例:BlockingQueueDequeDeque

Redisson redisson = Redisson.create();

RDeque<SomeObject> queue = redisson.getDeque("anyDeque");
queue.addFirst(new SomeObject());
queue.addLast(new SomeObject());
SomeObject obj = queue.removeFirst();
SomeObject someObj = queue.removeLast();

redisson.shutdown();

其他样品:

https://github.com/mrniko/redisson/wiki/7.-distributed-collections/#77-list
https://github.com/mrniko/redisson/wiki/7.-distributed-collections/#78-queue https://github.com/mrniko/redisson/wiki/7.-distributed-collections/#710-blocking-queue

于 2015-08-05T14:11:52.257 回答
0

如果您出于某种原因决定使用 SQL 数据库作为后端,我不会使用 MySQL,因为它需要轮询(当然也不会因为许多其他原因而使用它),但是 PostgreSQL 支持LISTEN/NOTIFY用于向其他客户端发出信号,以便他们不必轮询更改。但是,它会同时向所有侦听客户端发出信号,因此您仍然需要一种机制来选择获胜的侦听器。

作为旁注,我不确定提升/降级机制是否有用;最好在插入时适当安排作业......

于 2012-05-10T07:21:29.537 回答