How do I create a custom serializer in Kafka?

Only a few serializers are available out of the box, for example:

    org.apache.kafka.common.serialization.StringSerializer

How can we create our own custom serializer?

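Here is an example of using your own serializer/deserializer for the Kafka message value. The same approach works for the Kafka message key.

We want to send a serialized version of MyMessage as the Kafka value and deserialize it back into a MyMessage object on the consumer side.

Serializing MyMessage on the producer side.

You should create a serializer class that implements org.apache.kafka.common.serialization.Serializer.

The serialize() method does the work: it receives your object and returns its serialized form as a byte array.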

    import java.io.IOException;
    import java.util.Map;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Serializer;

    public class MyValueSerializer implements Serializer<MyMessage> {
        private boolean isKey;

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            this.isKey = isKey;
        }

        @Override
        public byte[] serialize(String topic, MyMessage message) {
            if (message == null) {
                return null;
            }
            try {
                // (serialize your MyMessage object into bytes)
                return bytes; // the byte[] produced by the step above
            } catch (IOException | RuntimeException e) {
                throw new SerializationException("Error serializing value", e);
            }
        }

        @Override
        public void close() {
        }
    }

    // Using the serializer on the producer side
    final IntegerSerializer keySerializer = new IntegerSerializer();
    final MyValueSerializer myValueSerializer = new MyValueSerializer();
    final KafkaProducer<Integer, MyMessage> producer =
            new KafkaProducer<>(props, keySerializer, myValueSerializer);

    int messageNo = 1;
    int kafkaKey = messageNo;
    MyMessage kafkaValue = new MyMessage();
    ProducerRecord<Integer, MyMessage> producerRecord =
            new ProducerRecord<>(topic, kafkaKey, kafkaValue);
    producer.send(producerRecord, new DemoCallBack(logTag, startTime, messageNo, strValue));

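Deserializing MyMessage on the consumer side.

You should create a deserializer class that implements org.apache.kafka.common.serialization.Deserializer.

The deserialize() method does the work: it receives the serialized value as a byte array and returns your object.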

    import java.io.IOException;
    import java.util.Map;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;

    public class MyValueDeserializer implements Deserializer<MyMessage> {
        private boolean isKey;

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            this.isKey = isKey;
        }

        @Override
        public MyMessage deserialize(String topic, byte[] value) {
            if (value == null) {
                return null;
            }
            try {
                // (deserialize value into your MyMessage object)
                MyMessage message = new MyMessage();
                return message;
            } catch (IOException | RuntimeException e) {
                throw new SerializationException("Error deserializing value", e);
            }
        }

        @Override
        public void close() {
        }
    }

Then use it like this:

    final IntegerDeserializer keyDeserializer = new IntegerDeserializer();
    final MyValueDeserializer myValueDeserializer = new MyValueDeserializer();
    final KafkaConsumer<Integer, MyMessage> consumer =
            new KafkaConsumer<>(props, keyDeserializer, myValueDeserializer);

    // On newer clients, use consumer.poll(Duration.ofMillis(1000)) instead.
    ConsumerRecords<Integer, MyMessage> records = consumer.poll(1000);
    for (ConsumerRecord<Integer, MyMessage> record : records) {
        int kafkaKey = record.key();
        MyMessage kafkaValue = record.value();
        ...
    }
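
The serializer and deserializer above leave the actual byte conversion as placeholders. As one possible way to fill them in, here is a minimal sketch that writes the fields with `DataOutputStream` and reads them back with `DataInputStream`, using only the JDK. The class name `MyMessageCodec` and the fields `id` and `text` are assumptions made for illustration; the answer does not show what MyMessage actually contains.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class MyMessageCodec {

    // Hypothetical message type; the real MyMessage fields are not shown in the answer.
    public static class MyMessage {
        public final int id;
        public final String text;

        public MyMessage(int id, String text) {
            this.id = id;
            this.text = text;
        }
    }

    // Candidate body for serialize(): write each field in a fixed order.
    public static byte[] toBytes(MyMessage message) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(message.id);
            out.writeUTF(message.text);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Candidate body for deserialize(): read the fields back in the same order.
    public static MyMessage fromBytes(byte[] value) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(value))) {
            int id = in.readInt();
            String text = in.readUTF();
            return new MyMessage(id, text);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        MyMessage original = new MyMessage(1, "hello");
        MyMessage copy = fromBytes(toBytes(original));
        System.out.println(copy.id + " " + copy.text); // prints "1 hello"
    }
}
```

Inside a real Serializer/Deserializer you would call helpers like these from serialize() and deserialize(). The field order is part of the wire format here, so both sides must agree on it; a schema-based format such as JSON or Avro is usually easier to evolve.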

You have to create your own serializer that implements the `Serializer` interface (org.apache.kafka.common.serialization.Serializer), and then set the producer option `key.serializer` / `value.serializer` to it.
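
As a rough sketch of that configuration step, the producer properties can name the serializer classes by their fully qualified names. The class name `com.example.MyValueSerializer`, the wrapper class `ProducerConfigSketch`, and the broker address are placeholders assumed for illustration.

```java
import java.util.Properties;

public class ProducerConfigSketch {

    // Builds producer properties that name the serializer classes to use.
    public static Properties producerProps() {
        Properties props = new Properties();
        // Placeholder broker address.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        // Hypothetical fully qualified name of your custom serializer.
        props.put("value.serializer", "com.example.MyValueSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps();
        System.out.println(props.getProperty("value.serializer"));
    }
}
```

These properties would then be passed to `new KafkaProducer<>(props)`. Because Kafka instantiates the configured class by reflection, the serializer needs a public no-argument constructor.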

No words, only code.

  1. Some object that you send to Kafka

         import lombok.AllArgsConstructor;
         import lombok.Data;
         import lombok.NoArgsConstructor;
         import lombok.ToString;

         @Data
         @AllArgsConstructor
         @NoArgsConstructor
         @ToString
         public class TestDto {
             private String name;
             private String version;
         }

  2. Create the Serializer, which will be used by the Producer

         import com.fasterxml.jackson.core.JsonProcessingException;
         import com.fasterxml.jackson.databind.ObjectMapper;
         import java.util.Map;
         import lombok.extern.slf4j.Slf4j;
         import org.apache.kafka.common.serialization.Serializer;

         @Slf4j
         public class KafkaValueSerializer implements Serializer<TestDto> {
             private final ObjectMapper objectMapper = new ObjectMapper();

             @Override
             public void configure(Map<String, ?> configs, boolean isKey) {
             }

             @Override
             public byte[] serialize(String topic, TestDto data) {
                 try {
                     // Jackson writes the DTO as UTF-8 encoded JSON.
                     return objectMapper.writeValueAsBytes(data);
                 } catch (JsonProcessingException e) {
                     log.error("Unable to serialize object {}", data, e);
                     return null;
                 }
             }

             @Override
             public void close() {
             }
         }

  3. For the consumer, don't forget the Deserializer

         import com.fasterxml.jackson.databind.ObjectMapper;
         import java.util.Map;
         import lombok.extern.slf4j.Slf4j;
         import org.apache.kafka.common.serialization.Deserializer;

         @Slf4j
         public class KafkaValueDeserializer implements Deserializer<TestDto> {
             private final ObjectMapper objectMapper = new ObjectMapper();

             @Override
             public void configure(Map<String, ?> configs, boolean isKey) {
             }

             @Override
             public TestDto deserialize(String topic, byte[] data) {
                 if (data == null) {
                     return null; // nothing to decode
                 }
                 try {
                     // Jackson detects the JSON encoding (UTF-8 here) from the bytes.
                     return objectMapper.readValue(data, TestDto.class);
                 } catch (Exception e) {
                     log.error("Unable to deserialize message {}", data, e);
                     return null;
                 }
             }

             @Override
             public void close() {
             }
         }

  4. Finally, add the serializer/deserializer to application.yml

         spring:
           kafka:
             bootstrap-servers: 192.168.192.168:9092
             producer:
               value-serializer: com.package.service.kafka.KafkaValueSerializer
             consumer:
               group-id: groupId
               value-deserializer: com.package.service.kafka.KafkaValueDeserializer

That's all. There is no need for any extra configuration classes or dancing with a tambourine :)

  1. Send

         // Injected by Spring
         KafkaTemplate<String, TestDto> kafkaTemplate;

         TestDto test = new TestDto("test name", "test-version");
         kafkaTemplate.send(topic, test);

  2. Receive

         @KafkaListener(topics = "${ktp-agent.kafka.request-topic}", groupId = "${spring.kafka.consumer.group-id}")
         public void listen(TestDto message) {
             log.info("Received message '{}' from Kafka.", message);
         }