Spring Boot Kafka consumer – 如何在 Spring Boot 中正确消费 Kafka 消息

我正在开发一个 Spring Boot 应用程序，它需要消费 Kafka 消息。我遇到了一个奇怪的现象：当我使用 kafka-console-producer.sh 发送消息时，我的消费者每隔一条消息才会检测并打印一条。例如，在 Kafka 控制台生产者中，我依次输入 “one” -> Enter -> “two” -> Enter -> “three” -> Enter……而在我的 Spring Boot 消费者中，我只会看到 “two”、“four” 等……

我的 ConsumerConfigFactory.java：

import java.util.Properties;

import javax.annotation.PostConstruct;

import kafka.consumer.ConsumerConfig;

import org.springframework.stereotype.Component;

/**
 * Spring component that holds the {@link ConsumerConfig} used by the Kafka
 * high-level consumer.
 *
 * <p>The configuration is assembled in a {@code @PostConstruct} callback and is
 * then available through {@link #getConsumerConfig()}.
 */
@Component
public class ConsumerConfigFactory {

    /** ZooKeeper connection string passed as {@code zookeeper.connect}. */
    private static final String ZK_CONNECT = "10.211.55.2:2181";

    private ConsumerConfig consumerConfig;

    /** Builds the consumer configuration from the fixed property set below. */
    @PostConstruct
    private void createConsumerConfig() {
        this.consumerConfig = new ConsumerConfig(buildProperties());
    }

    /** Collects the consumer settings into a {@link Properties} instance. */
    private static Properties buildProperties() {
        final Properties settings = new Properties();
        settings.setProperty("zookeeper.connect", ZK_CONNECT);
        settings.setProperty("group.id", "Video-cg-0");
        settings.setProperty("zookeeper.session.timeout.ms", "400");
        settings.setProperty("zookeeper.sync.time.ms", "200");
        settings.setProperty("auto.commit.interval.ms", "100");
        return settings;
    }

    /**
     * @return the configuration created by the {@code @PostConstruct} callback;
     *         {@code null} if that callback has not run yet
     */
    public ConsumerConfig getConsumerConfig() {
        return this.consumerConfig;
    }
}

我的ConsumerThreadPool.java:

 import static kafka.consumer.Consumer.createJavaConsumerConnector; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.annotation.PostConstruct; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.utiltube.kafka.config.ConsumerConfigFactory; @Component public class ConsumerThreadPool { private static final String TOPIC = "test"; private static final Integer NUM_THREADS = 1; @Autowired private ConsumerConfigFactory consumerConfigFactory; private ConsumerConnector consumer; private ExecutorService threadPool; public ConsumerThreadPool() { threadPool = Executors.newFixedThreadPool(NUM_THREADS); } @PostConstruct public void startConsuming() { ConsumerConfig consumerConfig = consumerConfigFactory.getConsumerConfig(); consumer = createJavaConsumerConnector(consumerConfig); consume(); } public void consume() { Map topicCountMap = new HashMap(); topicCountMap.put(TOPIC, NUM_THREADS); Map<String, List<KafkaStream>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream> streams = consumerMap.get(TOPIC); int threadNumber = 0; for (final KafkaStream stream : streams) { threadPool.submit(new VideoConsumer(stream, threadNumber)); threadNumber++; } } } 

我的 VideoConsumer.java：

 import java.io.IOException; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.utiltube.kafka.video.model.Video; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; public class VideoConsumer implements Runnable { private ObjectMapper objectMapper; private KafkaStream kafkaStream; private int threadNumber; public VideoConsumer(KafkaStream kafkaStream, int threadNumber) { this.threadNumber = threadNumber; this.kafkaStream = kafkaStream; this.objectMapper = new ObjectMapper(); } @Override public void run() { ConsumerIterator it = kafkaStream.iterator(); while (it.hasNext()) { byte[] messageData = it.next().message(); try { //String videoFromMessage = objectMapper.readValue(messageData, String.class); //byte[] videoFromMessage = it.next().message(); //System.out.print("got message"); String videoFromMessage = new String(it.next().message()); System.out.print("Thread:" + threadNumber + ".Consuming video: " + videoFromMessage + "\n"); } catch (Exception e) { e.printStackTrace(); } } System.out.println("Shutting down Thread: " + kafkaStream); } } 

为了接收并显示每一条消息，我应该修改什么？