Sending an object with RabbitMQ

I realize this question duplicates "Using rabbitmq to send a message not string but struct".

If I do it the first way described there, I get the following stack trace:

    java.io.EOFException
        at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2304)
        at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2773)
        at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:798)
        at java.io.ObjectInputStream.<init>(ObjectInputStream.java:298)
        at com.mdnaRabbit.worker.data.Data.fromBytes(Data.java:78)
        at com.mdnaRabbit.worker.App.main(App.java:41)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)

I checked and made sure that the message is converted to bytes in the sender class, but the consumer cannot receive it.

Here is my producer class:

    package com.mdnaRabbit.newt;

    import java.io.IOException;

    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.MessageProperties;
    import org.apache.commons.lang.SerializationUtils;

    import com.mdnaRabbit.worker.data.Data;

    public class App {

        private static final String TASK_QUEUE_NAME = "task_queue";

        public static void main(String[] argv) throws IOException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

            int i = 0;
            do {
                Data message = getMessage();
                byte[] byteMessage = message.getBytes();
                //System.out.println(byteMessage);
                channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, byteMessage);
                System.out.println(" [" + (i+1) + "] message Sent" + Data.fromBytes(byteMessage).getBody());
                i++;
            } while (i < 15);

            channel.close();
            connection.close();
        }

        private static Data getMessage() {
            Data data = new Data();
            data.setHeader("header");
            data.setDomainId("abc.com");
            data.setReceiver("me");
            data.setSender("he");
            data.setBody("body");
            return data;
        }

        private static String joinStrings(String[] strings, String delimiter) {
            int length = strings.length;
            if (length == 0) return "";
            StringBuilder words = new StringBuilder(strings[0]);
            for (int i = 1; i < length; i++) {
                words.append(delimiter).append(strings[i]);
            }
            return words.toString();
        }
    }

Here is my consumer class:

    package com.mdnaRabbit.worker;

    import java.io.IOException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.QueueingConsumer;

    import com.mdnaRabbit.worker.data.Data;
    import org.apache.commons.lang.SerializationUtils;

    public class App {

        private static final String TASK_QUEUE_NAME = "task_queue";
        private static int i = 0;

        public static void main(String[] argv) throws IOException, InterruptedException {
            ExecutorService threader = Executors.newFixedThreadPool(20);
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection(threader);
            final Channel channel = connection.createChannel();

            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            channel.basicQos(20);

            final QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

            try {
                while (true) {
                    try {
                        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                        Data message = Data.fromBytes(delivery.getBody());
                        //Data message = (Data) SerializationUtils.deserialize(delivery.getBody());
                        System.out.println(" [" + (i++) + "] Received" + message.getBody());
                        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    } catch (Exception e) {
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

            channel.close();
            connection.close();
        }
    }

Here is my Data class:

    package com.mdnaRabbit.worker.data;

    import java.io.*;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    public class Data implements Serializable {

        public String header;
        public String body;
        public String domainId;
        public String sender;
        public String receiver;

        public void setHeader(String head) {
            this.header = head;
        }

        public String getHeader() {
            return header;
        }

        public void setBody(String body) {
            this.body = body;
        }

        public String getBody() {
            return body;
        }

        public void setDomainId(String domainId) {
            this.domainId = domainId;
        }

        public String getDomainId() {
            return domainId;
        }

        public void setSender(String sender) {
            this.sender = sender;
        }

        public String getSender() {
            return sender;
        }

        public String getReceiver() {
            return receiver;
        }

        public void setReceiver(String receiver) {
            this.receiver = receiver;
        }

        public byte[] getBytes() {
            byte[] bytes;
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try {
                ObjectOutputStream oos = new ObjectOutputStream(baos);
                oos.writeObject(this);
                oos.flush();
                oos.reset();
                bytes = baos.toByteArray();
                oos.close();
                baos.close();
            } catch (IOException e) {
                bytes = new byte[] {};
                Logger.getLogger("bsdlog").log(Level.ALL, "unable to write to output stream" + e);
            }
            return bytes;
        }

        public static Data fromBytes(byte[] body) {
            Data obj = null;
            try {
                ByteArrayInputStream bis = new ByteArrayInputStream(body);
                ObjectInputStream ois = new ObjectInputStream(bis);
                obj = (Data) ois.readObject();
                ois.close();
                bis.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (ClassNotFoundException ex) {
                ex.printStackTrace();
            }
            return obj;
        }
    }

The consumer does seem to receive the message, because when I don't try to convert it to an object and just write System.out.println(delivery.getBody()), it prints bytes.

It looks like the byte array you receive is empty. This happens because of:

    } catch(IOException e){
        bytes = new byte[] {};
    }

When an exception is thrown, this code does not warn you that something went wrong; it just sends an empty array. You should at least log the error.
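As a rough sketch of the alternative, the helper below lets the serialization failure propagate instead of replacing it with an empty array, and it also shows why an empty payload would produce the EOFException in your trace: the ObjectInputStream constructor reads a stream header, and an empty array has none. The class name SerializationSketch and the methods toBytes and fromBytes are hypothetical, not part of your code or of the RabbitMQ client.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical helper for illustration only.
final class SerializationSketch {

    // Serializes a value and lets any IOException reach the caller,
    // so a failed serialization can never be published as an empty message.
    static byte[] toBytes(Serializable value) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(value);
        }
        return baos.toByteArray();
    }

    // Deserializes a payload; failures are propagated rather than swallowed.
    static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // A normal round trip.
        System.out.println(fromBytes(toBytes("body")));

        // An empty payload fails while the constructor reads the stream header.
        try {
            fromBytes(new byte[0]);
        } catch (EOFException e) {
            System.out.println("empty payload: " + e);
        }
    }
}
```

With something like this, the producer fails before calling basicPublish rather than queueing a message the consumer cannot read.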

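The exception is probably thrown because you are trying to serialize a class that is not serializable. To make a class serializable, you have to add "implements Serializable" to its declaration: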

 public class Data implements Serializable {
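To check whether this is the cause, a small test like the following can help. It is only a sketch: PlainData and SerializableData are hypothetical one-field stand-ins for your Data class, and serializedSize is a made-up helper. Writing an object whose class does not implement Serializable makes ObjectOutputStream throw NotSerializableException, which is a subclass of IOException and would therefore be caught by the catch block shown above.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical demo class for illustration only.
final class SerializableCheck {

    // Stand-in without the marker interface.
    static class PlainData {
        String body = "body";
    }

    // Stand-in with the marker interface.
    static class SerializableData implements Serializable {
        private static final long serialVersionUID = 1L;
        String body = "body";
    }

    // Returns the serialized size in bytes, or -1 if the object is not serializable.
    static int serializedSize(Object value) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(value);
        } catch (NotSerializableException e) {
            return -1;
        }
        return baos.toByteArray().length;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("PlainData: " + serializedSize(new PlainData()));
        System.out.println("SerializableData: " + serializedSize(new SerializableData()));
    }
}
```

Every non-transient field of the class must be serializable as well; the String fields in Data are.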