1

我需要建立一个可以编程控制的 Kombu 消费者。我看到的所有示例都只是告诉您使用 ctrl-c 停止程序的琐碎示例。

我的主要应用程序作为 Twisted Thrift 服务运行,我想我可以以某种方式使用 Twisted reactor 来处理我的消费者内部的事件循环,但我不知道如何。

这是我的消费类。start_sumption() 部分很好,只是它是阻塞的,我不能从外部调用 stop_sumption()。

from kombu import BrokerConnection, Exchange, eventloop, Queue, Consumer


class DMS():
    __routing_key = None
    __is_consuming = None
    __message_counter = 0

    def __init__(self, routing_key):
        print 'server: __init__()'
        self.__routing_key = routing_key

    def __handle_message(self, body, message):
        self.__message_counter += 1

        # Print count every 10,000 messsages.
        if (self.__message_counter % 10000) == 0:
            print self.__message_counter

    def start_consuming(self):
        print 'server: start_consuming()'
        self.__is_consuming = True
        exchange = Exchange('raven-exchange', type='topic', durable=False)
        queue = Queue(self.__routing_key, exchange, routing_key=self.__routing_key)

        with BrokerConnection('amqp://guest:guest@10.1.1.121:5672//') as connection:
            with Consumer(connection, queue, callbacks=[self.__handle_message]) as consumer:
                for _ in eventloop(connection):

                    if self.__is_consuming:
                        pass
                    else:
                        break

                consumer.cancel()
            connection.close()

    def stop_consuming(self):
        print 'server: stop_consuming()'
        self.__is_consuming = False
4

1 回答 1

0

The recommended way to route Thrift service calls through a MQ system is via oneway calls, because that is the most natural way to communicate via MQ and MessageBus systems.

struct Foo {
  1: string whoa
  2: i32 counter
}

service Whatever {
    oneway void FooBar(1: Foo someData, 2:i32 moreData)
}

A oneway call is a special form of a Thrift RPC call: As the name suggests, the call goes only in one direction. Neither return values nor exceptions (which are actually return values) are used with oneway. The call does send only the input args and does not wait for any values in return.

In order to establish a bi-di communication, the client needs to implement a similar service, designed to receive incoming answer messages. There are some samples in the Thrift /contrib folder, featuring 0MQ, Rebus and Stomp. Although they do not specifically deal with Python, the main idea should become clear.

于 2014-06-21T01:33:07.863 回答