Bunny
一直给我一个错误,我认为是从 RabbitMQ 发回的,以下字符串是无效的日期格式:
无效的:
- ISO8601 时间戳字符串
- 毫秒整数
- YYYY-MM-DD 字符串
- Unix 时间或纪元时间整数
Time.now.to_i
有效的:
- 整数(取回请求的消息和之后的所有内容)
- first (string) (取回从 first 开始的所有消息)
- 最后一个(字符串)
- 下一个(字符串)
iso8601 = '2021-08-28T13:40:31-07:00'
opts = {
exclusive: false,
manual_ack: true,
block: true,
arguments: {
'x-stream-offset': iso8601
}
}
queue.subscribe(opts) do |delivery_info, _properties, payload|
msg = JSON.parse(payload)
puts msg
ch.ack(delivery_info.delivery_tag, false)
end
我收到一个错误,说它是无效的流偏移参数
下面是我正在运行它的 rake 任务的错误消息
Bunny::PreconditionFailed: PRECONDITION_FAILED - invalid arg 'x-stream-offset' for queue 'stream_test' in vhost '/': {invalid_stream_offset_arg,{longstr,<<"2021-08-28T13:40:31-07:00">>}}
/Users/aronlilland/.rvm/gems/ruby-2.4.1/gems/bunny-2.18.0/lib/bunny/channel.rb:2014:in `raise_if_channel_close!'
/Users/aronlilland/.rvm/gems/ruby-2.4.1/gems/bunny-2.18.0/lib/bunny/channel.rb:944:in `basic_consume_with'
/Users/aronlilland/.rvm/gems/ruby-2.4.1/gems/bunny-2.18.0/lib/bunny/queue.rb:191:in `subscribe'
/Users/aronlilland/Documents/dev/bin/random/rabbit_mq_stream/lib/stream/lib/read.rb:39:in `read'
/Users/aronlilland/Documents/dev/bin/random/rabbit_mq_stream/tasks/read.rake:9:in `block in <top (required)>'
/Users/aronlilland/.rvm/gems/ruby-2.4.1/gems/rake-12.3.3/exe/rake:27:in `<top (required)>'
/Users/aronlilland/.rvm/gems/ruby-2.4.1/bin/ruby_executable_hooks:22:in `eval'
/Users/aronlilland/.rvm/gems/ruby-2.4.1/bin/ruby_executable_hooks:22:in `<main>'
--
下面是一个作为流连接到rabbitMQ的工作示例(没有二进制协议) -需要流插件
# frozen_string_literal: true
require 'bunny'
require 'json'
require 'pp'
rabbit_user = 'guest'
rabbit_pass = 'guest'
rabbit_host = 'localhost:5672'
conn = Bunny.new(
"amqp://#{rabbit_user}:#{rabbit_pass}@#{rabbit_host}",
client_properties: { connection_name: :stream }
)
conn.start
ch = conn.create_channel(nil, 16)
queue = ch.queue(
'stream_test',
durable: true,
auto_delete: false,
exclusive: false,
arguments: {
'x-queue-type': 'stream',
'x-max-length-bytes': 500_000_000
}
)
50000.times do |i|
queue.publish(JSON.dump({ hello: "world #{i + 1}" }), routing_key: 'stream_test')
puts "published #{i + 1}"
end
ch.basic_qos(25)
opts = {
exclusive: false,
manual_ack: true,
## block will make it consume the main IO instead of being a seperate thread
## it is not recommended in production
block: true,
arguments: {
'x-stream-offset': 'first'
}
}
queue.subscribe(opts) do |delivery_info, _properties, payload|
msg = JSON.parse(payload)
puts msg
ch.ack(delivery_info.delivery_tag, false)
end
puts 'done'
sleep 1
conn.close
Java客户端示例显示它应该能够接受时间戳,但似乎我无法发送时间戳,是否有可接受的日期格式?
流功能的 Java 客户端文档说
时间戳 - 一个时间戳值,指定附加到日志的时间点。它将钳制到最接近的偏移量,如果时间戳超出流的范围,它将分别钳制日志的开始或结束。对于 AMQP 0.9.1,使用的时间戳是 POSIX 时间,精度为一秒,即自 1970 年 1 月 1 日 00:00:00 UTC 以来的秒数。
我找不到 POSIX 时间戳的示例