Kafka:编写自定义序列化程序

我正在尝试用Kafka 0.8.1建立一个POC。 我使用自己的java类作为Kafka消息,它有一堆String数据类型。 我不能使用默认的序列化程序类或Kafka库附带的String序列化程序类。 我想我需要编写自己的序列化程序并将其提供给生产者属性。 如果您知道在Kafka中编写示例自定义序列化程序(在java中),请分享。 非常感谢,非常感谢。

编写自定义序列化程序所需的东西是:

  1. 使用为generics指定的对象实现Encoder
    • 需要提供VerifiableProperties构造函数
  2. 覆盖toBytes(...)方法,确保返回一个字节数组
  3. 将序列化程序类注入ProducerConfig

为生产者声明自定义序列化程序

正如您在问题中所述,Kafka提供了一种为生产者声明特定序列化器的方法。 序列化程序类在ProducerConfig实例中设置,该实例用于构造所需的Producer类。

如果您遵循Kafka的Producer Example,您将通过Properties对象构造ProducerConfig 。 构建属性文件时,请确保包括:

 props.put("serializer.class", "path.to.your.CustomSerializer"); 

通过类的路径,您希望Kafka在将消息附加到日志之前用它来序列化消息。

创建Kafka理解的自定义序列化程序

编写Kafka可以正确解释的自定义序列化程序需要实现Kafka提供的Encoder[T] scala类。 在java中实现traits是很奇怪的 ,但是以下方法适用于在我的项目中序列化JSON:

 public class JsonEncoder implements Encoder { private static final Logger logger = Logger.getLogger(JsonEncoder.class); // instantiating ObjectMapper is expensive. In real life, prefer injecting the value. private static final ObjectMapper objectMapper = new ObjectMapper(); public JsonEncoder(VerifiableProperties verifiableProperties) { /* This constructor must be present for successful compile. */ } @Override public byte[] toBytes(Object object) { try { return objectMapper.writeValueAsString(object).getBytes(); } catch (JsonProcessingException e) { logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e); } return "".getBytes(); } } 

您的问题听起来好像您正在使用一个对象(让我们称之为CustomMessage )来添加到您的日志中的所有消息。 如果是这种情况,您的序列化程序可能看起来更像这样:

 package com.project.serializer; public class CustomMessageEncoder implements Encoder { public CustomMessageEncoder(VerifiableProperties verifiableProperties) { /* This constructor must be present for successful compile. */ } @Override public byte[] toBytes(CustomMessage customMessage) { return customMessage.toBytes(); } } 

这将使您的属性配置看起来像这样:

 props.put("serializer.class", "path.to.your.CustomSerializer"); 

您需要实现编码和解码器

 public class JsonEncoder implements Encoder { private static final Logger LOGGER = Logger.getLogger(JsonEncoder.class); public JsonEncoder(VerifiableProperties verifiableProperties) { /* This constructor must be present for successful compile. */ } @Override public byte[] toBytes(Object object) { ObjectMapper objectMapper = new ObjectMapper(); try { return objectMapper.writeValueAsString(object).getBytes(); } catch (JsonProcessingException e) { LOGGER.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e); } return "".getBytes(); } } 

解码器代码

 public class JsonDecoder implements Decoder { private static final Logger LOGGER = Logger.getLogger(JsonEncoder.class); public JsonDecoder(VerifiableProperties verifiableProperties) { /* This constructor must be present for successful compile. */ } @Override public Object fromBytes(byte[] bytes) { ObjectMapper objectMapper = new ObjectMapper(); try { return objectMapper.readValue(bytes, Map.class); } catch (IOException e) { LOGGER.error(String.format("Json processing failed for object: %s", bytes.toString()), e); } return null; } } 

pom入口

  com.fasterxml.jackson.core jackson-databind 2.4.1.3  

在Kafka属性中设置默认编码器

 properties.put("serializer.class","kafka.serializer.DefaultEncoder"); 

作者和读者代码如下

 byte[] bytes = encoder.toBytes(map); KeyedMessage message =new KeyedMessage(this.topic, bytes); JsonDecoder decoder = new JsonDecoder(null); Map map = (Map) decoder.fromBytes(it.next().message());