如何在kafka 0.9.0中使用multithreading消费者?

卡夫卡的文件给出了以下描述的方法:

每个线程一个消费者:一个简单的选择是为每个线程提供自己的消费者>实例。

我的代码:

public class KafkaConsumerRunner implements Runnable { private final AtomicBoolean closed = new AtomicBoolean(false); private final CloudKafkaConsumer consumer; private final String topicName; public KafkaConsumerRunner(CloudKafkaConsumer consumer, String topicName) { this.consumer = consumer; this.topicName = topicName; } @Override public void run() { try { this.consumer.subscribe(topicName); ConsumerRecords records; while (!closed.get()) { synchronized (consumer) { records = consumer.poll(100); } for (ConsumerRecord tmp : records) { System.out.println(tmp.value()); } } } catch (WakeupException e) { // Ignore exception if closing System.out.println(e); //if (!closed.get()) throw e; } } // Shutdown hook which can be called from a separate thread public void shutdown() { closed.set(true); consumer.wakeup(); } public static void main(String[] args) { CloudKafkaConsumer kafkaConsumer = KafkaConsumerBuilder.builder() .withBootstrapServers("172.31.1.159:9092") .withGroupId("test") .build(); ExecutorService executorService = Executors.newFixedThreadPool(5); executorService.execute(new KafkaConsumerRunner(kafkaConsumer, "log")); executorService.execute(new KafkaConsumerRunner(kafkaConsumer, "log.info")); executorService.shutdown(); } } 

但它不起作用并引发exception:

java.util.ConcurrentModificationException:KafkaConsumer对multithreading访问不安全

此外,我阅读了Flink(一个用于分布式流和批处理数据处理的开源平台)的来源。 使用multithreading消费者的Flink与我的类似。

 long pollTimeout = Long.parseLong(flinkKafkaConsumer.properties.getProperty(KEY_POLL_TIMEOUT, Long.toString(DEFAULT_POLL_TIMEOUT))); pollLoop: while (running) { ConsumerRecords records; //noinspection SynchronizeOnNonFinalField synchronized (flinkKafkaConsumer.consumer) { try { records = flinkKafkaConsumer.consumer.poll(pollTimeout); } catch (WakeupException we) { if (running) { throw we; } // leave loop continue; } } 

flink代码的multithreading

怎么了?

Kafka消费者不是线程安全的 。 正如您在问题中指出的那样,该文件表明了这一点

一个简单的选择是为每个线程提供自己的消费者实例

但是在您的代码中,您拥有由不同KafkaConsumerRunner实例包装的相同的使用者实例。 因此,多个线程正在访问同一个消费者实例。 卡夫卡文件明确指出

Kafka消费者不是线程安全的。 所有网络I / O都发生在进行调用的应用程序的线程中。 用户有责任确保正确同步multithreading访问。 未同步的访问将导致ConcurrentModificationException。

这正是你收到的例外。

它会在您订阅的电话上抛出exception。 this.consumer.subscribe(topicName);

将该块移动到同步块中,如下所示:

 @Override public void run() { try { synchronized (consumer) { this.consumer.subscribe(topicName); } ConsumerRecords records; while (!closed.get()) { synchronized (consumer) { records = consumer.poll(100); } for (ConsumerRecord tmp : records) { System.out.println(tmp.value()); } } } catch (WakeupException e) { // Ignore exception if closing System.out.println(e); //if (!closed.get()) throw e; } } 

也许不是你的情况,但如果你是对多个主题的数据进行合金处理,那么你可以从同一个消费者的多个主题中读取数据。 如果没有,则最好创建消耗每个主题的单独作业。