2

我想首先说我不仅是 python 的新手,而且是一般的编程新手。考虑到这一点,这是我的问题。我正在尝试将一些数据发送到我的 Rabbitmq 服务器。下面是我的 Python 代码。json_data 只是一个保存一些 json 格式数据的变量。

with Connection("amqp://username:password@test_server.local:5672/#/") as conn:
    channel = conn.channel()
    producer = Producer(channel, exchange = "test_exchange", serializer="json")

    producer.publish(json_data)

print "Message sent"

这会产生以下错误:

Traceback (most recent call last): File "test.py", line 43, in <module> producer = Producer(channel, exchange = "test_exchange", serializer="json") File "/Library/Python/2.7/site-packages/kombu/messaging.py", line 83, in __init__ self.revive(self._channel) File "/Library/Python/2.7/site-packages/kombu/messaging.py", line 210, in revive self.exchange = self.exchange(channel) TypeError: 'str' object is not callable

任何帮助,将不胜感激。谢谢。

4

2 回答 2

1

把头撞在桌子上后,我发现 Python 不喜欢交换的调用。然后我的同事告诉我,如果我在发布者中包含交换和路由密钥,它可能会更好。所以现在我的新代码看起来像这样。

# Creates the exchange and queue and binds them
# If already created on the server side these steps are not needed
exchange = Exchange("test_exchange", "direct", durable=True)
queue = Queue("test_q", exchange = exchange, routing_key = "test")

# The last foreword slash is the virtual host
with Connection("amqp://username:password@hostname:5672/", "/") as conn:
    channel = conn.channel()
    producer = Producer(channel, exchange = exchange, serializer="json")

    for key, value in json_data.items():
        producer.publish(exchange = exchange, routing_key = "test", body = {key:value})

print "Message sent!"
于 2013-08-22T16:51:33.953 回答
0

我正在回答这个问题 - 因为你的答案没有确定真正的问题,即 TypeError: 'str' object is not callable at self.exchange = self.exchange(channel)- 表明你正在将错误的类型传递给exchange参数,即将一个 ' str' 传递给exchange

producer = Producer(channel, exchange = "test_exchange", serializer="json")

解决方案:无论您在哪里将值传递给“交换”参数,它都必须是一个Exchange对象。

您的代码必须是:

with Connection("amqp://username:password@test_server.local:5672/#/") as conn:
    channel = conn.channel()
    exchange = Exchange(name='inbound') # the minimal declaration
    producer = Producer(channel, exchange=exchange, serializer="json")

    producer.publish(json_data)

print "Message sent"

有关 Exchange 参数文档的完整列表。


我在队列声明中遇到了同样的错误,即

queue = Queue(name=queue_name, exchange='host_tasks', routing_key=binding_key)
bound_queue = queue(channel) # only once bound, we can call declare(), purge(), delete() on exchange

所以宣布了一个交易所,这样的thah

exchange = Exchange('host_tasks', 'direct', durable=True)
queue = Queue(name=queue_name, exchange=exchange, routing_key=binding_key)
bound_queue = queue(channel)

不要忘记导入Exchange

于 2016-04-29T13:09:20.887 回答