0

Bunny一直给我一个错误,我认为是从 RabbitMQ 发回的,以下字符串是无效的日期格式:

无效的:

  • ISO8601 时间戳字符串
  • 毫秒整数
  • YYYY-MM-DD 字符串
  • Unix 时间或纪元时间整数Time.now.to_i

有效的:

  • 整数(取回请求的消息和之后的所有内容)
  • first (string) (取回从 first 开始的所有消息)
  • 最后一个(字符串)
  • 下一个(字符串)
iso8601 = '2021-08-28T13:40:31-07:00'

opts = {
  exclusive: false,
  manual_ack: true,
  block: true,
  arguments: {
    'x-stream-offset': iso8601
  }
}

queue.subscribe(opts) do |delivery_info, _properties, payload|
  msg = JSON.parse(payload)
  puts msg

  ch.ack(delivery_info.delivery_tag, false)
end

我收到一个错误,说它是无效的流偏移参数

下面是我正在运行它的 rake 任务的错误消息

Bunny::PreconditionFailed: PRECONDITION_FAILED - invalid arg 'x-stream-offset' for queue 'stream_test' in vhost '/': {invalid_stream_offset_arg,{longstr,<<"2021-08-28T13:40:31-07:00">>}}
/Users/aronlilland/.rvm/gems/ruby-2.4.1/gems/bunny-2.18.0/lib/bunny/channel.rb:2014:in `raise_if_channel_close!'
/Users/aronlilland/.rvm/gems/ruby-2.4.1/gems/bunny-2.18.0/lib/bunny/channel.rb:944:in `basic_consume_with'
/Users/aronlilland/.rvm/gems/ruby-2.4.1/gems/bunny-2.18.0/lib/bunny/queue.rb:191:in `subscribe'
/Users/aronlilland/Documents/dev/bin/random/rabbit_mq_stream/lib/stream/lib/read.rb:39:in `read'
/Users/aronlilland/Documents/dev/bin/random/rabbit_mq_stream/tasks/read.rake:9:in `block in <top (required)>'
/Users/aronlilland/.rvm/gems/ruby-2.4.1/gems/rake-12.3.3/exe/rake:27:in `<top (required)>'
/Users/aronlilland/.rvm/gems/ruby-2.4.1/bin/ruby_executable_hooks:22:in `eval'
/Users/aronlilland/.rvm/gems/ruby-2.4.1/bin/ruby_executable_hooks:22:in `<main>'

--

下面是一个作为流连接到rabbitMQ的工作示例(没有二进制协议) -需要流插件

# frozen_string_literal: true

require 'bunny'
require 'json'
require 'pp'

rabbit_user = 'guest'
rabbit_pass = 'guest'
rabbit_host = 'localhost:5672'

conn = Bunny.new(
  "amqp://#{rabbit_user}:#{rabbit_pass}@#{rabbit_host}",
  client_properties: { connection_name: :stream }
)

conn.start

ch = conn.create_channel(nil, 16)

queue = ch.queue(
  'stream_test',
  durable: true,
  auto_delete: false,
  exclusive: false,
  arguments: {
    'x-queue-type': 'stream',
    'x-max-length-bytes': 500_000_000
  }
)

50000.times do |i|
  queue.publish(JSON.dump({ hello: "world #{i + 1}" }), routing_key: 'stream_test')
  puts "published #{i + 1}"
end

ch.basic_qos(25)

opts = {
  exclusive: false,
  manual_ack: true,
  ## block will make it consume the main IO instead of being a seperate thread
  ## it is not recommended in production
  block: true,
  arguments: {
    'x-stream-offset': 'first'
  }
}

queue.subscribe(opts) do |delivery_info, _properties, payload|
  msg = JSON.parse(payload)
  puts msg

  ch.ack(delivery_info.delivery_tag, false)
end

puts 'done'
sleep 1
conn.close

Java客户端示例显示它应该能够接受时间戳,但似乎我无法发送时间戳,是否有可接受的日期格式?

流功能的 Java 客户端文档说

时间戳 - 一个时间戳值,指定附加到日志的时间点。它将钳制到最接近的偏移量,如果时间戳超出流的范围,它将分别钳制日志的开始或结束。对于 AMQP 0.9.1,使用的时间戳是 POSIX 时间,精度为一秒,即自 1970 年 1 月 1 日 00:00:00 UTC 以来的秒数。

我找不到 POSIX 时间戳的示例

4

1 回答 1

0

在查看了此处的 Erlang RabbitMQ 源代码(未在任何开发人员文档中显示)之后,您似乎可以通过传入{number_of_seconds}s获取秒数或{number_of_minutes}m获取分钟数来获取时间序列数据

因此,时间序列数据可以格式化为字符串 - 看起来它确实接受某种时间格式,但同样不清楚如何让它工作,为了我的需要,字符串值是可以接受的

%% Erlang Source Code for RabbitMQ
source_filters_to_consumer_args(#'v1_0.source'{filter = {map, KVList}}) ->
    Key = {symbol, <<"rabbitmq:stream-offset-spec">>},
    case lists:keyfind(Key, 1, KVList) of
        {_, {timestamp, Ts}} ->
            [{<<"x-stream-offset">>, timestamp, Ts div 1000}]; %% 0.9.1 uses second based timestamps
        {_, {utf8, Spec}} ->
            [{<<"x-stream-offset">>, longstr, Spec}]; %% next, last, first and "10m" etc
        {_, {_, Offset}} when is_integer(Offset) ->
            [{<<"x-stream-offset">>, long, Offset}]; %% integer offset
        _ ->
            []
    end;
source_filters_to_consumer_args(_Source) ->
    [].

Ruby 中的答案是执行以下操作:

## float/decimal values are rejected, only accepts whole numbers,
## so you have to round seconds to the nearest whole number
offset = "10s"

opts = {
  exclusive: false,
  manual_ack: true,
  ## block will make it consume the main IO instead of being a seperate thread
  ## it is not recommended in production
  block: true,
  arguments: {
    'x-stream-offset': offset
  }
}

queue.subscribe(opts) do |delivery_info, _properties, payload|
  msg = JSON.parse(payload)
  puts msg

  ch.ack(delivery_info.delivery_tag, false)
end
于 2021-11-27T19:50:07.350 回答