2

我正在尝试从 Java 接收 Azure 队列消息。我能够在 java 中使用 BrokeredMessage 接收消息。(我是java新手)。

我的问题是 message.getBody() 也返回了一些标题信息(不仅仅是我需要的消息)。

I get     string3http://schemas.mi@string3http://schemas.microsoft.com/2003/10/Serialization/?? message appended with my body in front of it. How can I get rid of this header information.

而且我还注意到我分两批收到消息。(不是一次全部)

My first batch of message.getBody()returns below message
'@string3http://schemas.mi'
My Second batch of message.getBody()returns below message
@crosoft.com/2003/10/Serialization/?? + My actual message.

我的总消息大小小于 500B,但我已将字节大小设置为 4096。因此,由于大小问题,不是夹板。

这是我使用的接收器代码。

ReceiveMessageOptions opts = ReceiveMessageOptions.DEFAULT;
opts.setReceiveMode(ReceiveMode.PEEK_LOCK);

while (true)
{
ReceiveQueueMessageResult resultQM =
service.receiveQueueMessage("MyqueueName", opts);
BrokeredMessage message = resultQM.getValue();
if (message != null && message.getMessageId() != null)
{

byte[] b = new byte[4096];
String s = null;
int numRead = message.getBody().read(b);
while (-1 != numRead)
{
s = new String(b);
s = s.trim();
System.out.print(s);
numRead = message.getBody().read(b);
}
}

}

这是 System.out.print(s); 的总输出。(但正如我之前提到的,分两批)

total output
string3http://schemas.microsoft.com/2003/10/Serialization/??+ My actual message.

任何帮助深表感谢!

4

2 回答 2

0

我想你从这里得到了上面的样本。在继续编写代码之前,我们需要了解接收模式锁定类型,它们是PeekLockReceiveAndDelete

接收并删除:

使用 ReceiveAndDelete 模式时,receive 是一个single-shot operation- 即当 Service Bus 接收到对队列中消息的读取请求时,它会将消息标记为 is consumed and returns it to the application。ReceiveAndDelete 模式(这是默认模式)是最简单的模式,最适用于应用程序可以容忍在发生故障时不处理消息的场景。要理解这一点,请考虑消费者发出接收请求然后在处理它之前崩溃的场景。因为服务总线会将消息标记为已使用,所以当应用程序重新启动并再次开始使用消息时,它将错过崩溃前已使用的消息。

偷看

在 PeekLock 模式下,receive 变为two-stage operation,这使得支持不能容忍丢失消息的应用程序成为可能。当服务总线收到请求时,它会找到下一条要使用的消息locks it to prevent other consumers receiving it,然后将其返回给应用程序。在应用程序完成处理消息(或将其可靠地存储以供将来处理)后,它通过对接收到的消息调用 Delete 来完成接收过程的第二阶段。当服务总线看到删除调用时,它会将消息标记为已使用并将其从队列中删除。

您正在尝试使用 PeekLock 模式,在这种模式下,您需要显式调用deleteMessage(message)以将消息标记为已使用,除非您没有调用此方法,即使它看起来像已使用,但实际上并未使用。它仍在队列中。

我认为您提到的标头不是实际的标头,是队列中的初始消息,实际上根本没有消耗

您可以像下面这样更改您的代码并尝试

if (message != null && message.getMessageId() != null)
{

byte[] b = new byte[4096];
String s = null;
int numRead = message.getBody().read(b);
while (-1 != numRead)
{
s = new String(b);
s = s.trim();
System.out.print(s);
numRead = message.getBody().read(b);
}
//Add the below to ack that you are consumed the message
service.deleteMessage(message);
}    
}
于 2018-08-08T04:49:50.680 回答
0

谢谢,Jayendaran 的回复。将模式类型从 PeekLock 更改为 ReceiveAndDelete 并不能解决问题。

但是,我找到了解决方案在发送之前将消息蒸熟可以解决问题。

而不是这个

BrokeredMessage message = new BrokeredMessage(encryptData);
message.ContentType = "application/json";
testQueueSender.Send(message);

这样做可以解决问题

byte[] bytes = Encoding.UTF8.GetBytes(encryptData);
MemoryStream stream = new MemoryStream(bytes, writable: false);
BrokeredMessage message = new BrokeredMessage(stream);
message.ContentType = "application/json";
testQueueSender.Send(message);

但由于某种原因,我仍然分两部分收到消息。第一部分一直是固定的 27 个字节。但我在.NET 中没有这个问题。

问候

于 2018-08-08T11:28:31.230 回答