Deserializing a POJO in Kafka Streams

My Kafka topic contains messages in this format:

    user1,subject1,80|user1,subject2,90
    user2,subject1,70|user2,subject2,100

and so on.

I created a User POJO as follows.

    class User implements Serializable {

        private static final long serialVersionUID = -253687203767610477L;

        private String userId;
        private String subject;
        private String marks;

        public User(String userId, String subject, String marks) {
            super();
            this.userId = userId;
            this.subject = subject;
            this.marks = marks;
        }

        public String getUserId() {
            return userId;
        }

        public void setUserId(String userId) {
            this.userId = userId;
        }

        public String getSubject() {
            return subject;
        }

        public void setSubject(String subject) {
            this.subject = subject;
        }

        public String getMarks() {
            return marks;
        }

        public void setMarks(String marks) {
            this.marks = marks;
        }
    }

I have also configured the default key and value Serdes:

    streamProperties.put(
        StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
        Serdes.String().getClass());
    streamProperties.put(
        StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
        Serdes.String().getClass());

I am trying to count the records per userId as follows. I also need the User object to perform some other operations.

    KTable<String, Long> wordCount = streamInput
        .flatMap(new KeyValueMapper<String, String, Iterable<KeyValue<String, User>>>() {
            @Override
            public Iterable<KeyValue<String, User>> apply(String key, String value) {
                // split() takes a regex, so the pipe is escaped to match a literal '|'
                String[] userObjects = value.split("\\|");
                List<KeyValue<String, User>> userList = new LinkedList<>();
                for (String userObject : userObjects) {
                    String[] userData = userObject.split(",");
                    userList.add(KeyValue.pair(userData[0],
                        new User(userData[0], userData[1], userData[2])));
                }
                return userList;
            }
        })
        .groupByKey()
        .count();
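A side note on the parsing step, as a minimal sketch: `String.split` takes a regular expression, and an unescaped `|` is regex alternation between two empty patterns, so it matches between every character instead of at the pipe. Escaping it as `\\|` splits on the literal character. `SplitDemo` and `splitRecords` are hypothetical names used only for this illustration.

```java
import java.util.Arrays;
import java.util.List;

class SplitDemo {

    // Splits one message into its per-record chunks on a literal '|'.
    static List<String> splitRecords(String message) {
        return Arrays.asList(message.split("\\|"));
    }

    public static void main(String[] args) {
        String message = "user1,subject1,80|user1,subject2,90";

        // Unescaped pipe: the regex matches the empty string, so the
        // message is broken into many single-character pieces.
        System.out.println("unescaped pieces: " + message.split("|").length);

        // Escaped pipe: one chunk per user record.
        System.out.println("escaped pieces:   " + splitRecords(message));
    }
}
```

Each chunk returned by `splitRecords` can then be split on `,` to get the user ID, subject, and marks.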

I get the following error:

 Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.example.testing.dao.User). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters. 

I think I need to provide the correct Serde for the User class.

Can someone guide me?
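One possible direction, sketched under assumptions: a Serde is essentially a pair of functions that turn a value into bytes and back. Since `User` already implements `Serializable`, plain Java serialization can do that conversion. The `UserBytes` helper below is hypothetical (it is not part of Kafka), and the `User` class is a trimmed copy of the one above. The sketch uses only the JDK so it can be run on its own.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical helper: the byte conversion a custom Serde for User could delegate to.
class UserBytes {

    // User -> bytes; null is passed through unchanged.
    static byte[] toBytes(User user) {
        if (user == null) {
            return null;
        }
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(user);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // bytes -> User; null is passed through unchanged.
    static User fromBytes(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (User) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        User restored = fromBytes(toBytes(new User("user1", "subject1", "80")));
        System.out.println(restored.getUserId() + " " + restored.getSubject() + " " + restored.getMarks());
    }
}

// Trimmed copy of the User POJO from the question (getters only).
class User implements Serializable {
    private static final long serialVersionUID = -253687203767610477L;

    private final String userId;
    private final String subject;
    private final String marks;

    User(String userId, String subject, String marks) {
        this.userId = userId;
        this.subject = subject;
        this.marks = marks;
    }

    String getUserId() { return userId; }
    String getSubject() { return subject; }
    String getMarks() { return marks; }
}
```

To plug this into Kafka Streams, the two methods would be wrapped in a Kafka `Serializer<User>` and `Deserializer<User>`, combined with `Serdes.serdeFrom(serializer, deserializer)`, and the resulting Serde passed explicitly to `groupByKey` instead of relying on the default String Serde. On Kafka 2.1 and later that is typically `groupByKey(Grouped.with(Serdes.String(), userSerde))`; older versions used `Serialized.with(...)`. Java serialization is only one choice here; JSON or Avro Serdes are common alternatives.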