我知道这个问题重复了 使用rabbitmq发送消息而不是字符串而是结构的问题
如果使用第一种方法来做到这一点
我有以下跟踪:
java.io.EOFException
at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2304)
at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2773)
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:798)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:298)
at com.mdnaRabbit.worker.data.Data.fromBytes(Data.java:78)
at com.mdnaRabbit.worker.App.main(App.java:41)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
我已经检查并确保该消息在发送者类中完全转换为字节,但消费者无法接收它。
这是我的制片人班:
package com.mdnaRabbit.newt;
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import org.apache.commons.lang.SerializationUtils;
import com.mdnaRabbit.worker.data.Data;
public class App {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main( String[] argv) throws IOException{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
int i = 0;
do {
Data message = getMessage();
byte [] byteMessage = message.getBytes();
//System.out.println(byteMessage);
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, byteMessage);
System.out.println(" [" + (i+1) + "] message Sent" + Data.fromBytes(byteMessage).getBody());
i++;
} while (i<15);
channel.close();
connection.close();
}
private static Data getMessage(){
Data data = new Data();
data.setHeader("header");
data.setDomainId("abc.com");
data.setReceiver("me");
data.setSender("he");
data.setBody("body");
return data;
}
private static String joinStrings(String[] strings, String delimiter){
int length = strings.length;
if (length == 0) return "";
StringBuilder words = new StringBuilder(strings[0]);
for (int i = 1; i < length; i++){
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}
这是我的消费类:
package com.mdnaRabbit.worker;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.mdnaRabbit.worker.data.Data;
import org.apache.commons.lang.SerializationUtils;
public class App {
private static final String TASK_QUEUE_NAME = "task_queue";
private static int i = 0;
public static void main( String[] argv )
throws IOException,
InterruptedException{
ExecutorService threader = Executors.newFixedThreadPool(20);
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection(threader);
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(20);
final QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
try {
while (true) {
try {QueueingConsumer.Delivery delivery = consumer.nextDelivery();
Data message = Data.fromBytes(delivery.getBody());
//Data message = (Data) SerializationUtils.deserialize(delivery.getBody());
System.out.println(" [" + (i++) +"] Received" + message.getBody());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}catch (Exception e){
}
}
} catch (Exception e){
e.printStackTrace();
}
channel.close();
connection.close();
}
}
这是我的数据类:
package com.mdnaRabbit.worker.data;
import java.io.*;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Data implements Serializable{
public String header;
public String body;
public String domainId;
public String sender;
public String receiver;
public void setHeader(String head){
this.header = head;
}
public String getHeader(){
return header;
}
public void setBody(String body){
this.body = body;
}
public String getBody(){
return body;
}
public void setDomainId(String domainId){
this.domainId = domainId;
}
public String getDomainId(){
return domainId;
}
public void setSender(String sender){
this.sender = sender;
}
public String getSender(){
return sender;
}
public String getReceiver(){
return receiver;
}
public void setReceiver(String receiver){
this.receiver = receiver;
}
public byte[] getBytes() {
byte[]bytes;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try{
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(this);
oos.flush();
oos.reset();
bytes = baos.toByteArray();
oos.close();
baos.close();
} catch(IOException e){
bytes = new byte[] {};
Logger.getLogger("bsdlog").log(Level.ALL, "unable to write to output stream" + e);
}
return bytes;
}
public static Data fromBytes(byte[] body) {
Data obj = null;
try {
ByteArrayInputStream bis = new ByteArrayInputStream(body);
ObjectInputStream ois = new ObjectInputStream(bis);
obj = (Data) ois.readObject();
ois.close();
bis.close();
}
catch (IOException e) {
e.printStackTrace();
}
catch (ClassNotFoundException ex) {
ex.printStackTrace();
}
return obj;
}
}
我似乎总是消费者收到消息,因为当我不尝试将其转换为对象并只写
System.out.println(delivery.getBody)
它时显示字节