Kafka消费者(0.8.2.2)可以批量阅读消息吗?
根据我的理解,Kafka消费者按顺序从指定的分区读取消息…
我们计划拥有多个Kafka使用者(Java),它们具有相同的组I ..如果它从指定的分区顺序读取,那么我们如何实现高吞吐量..例如,生产者发布消息,如每秒40个。消费者流程每秒消息1 ..虽然我们可以有多个消费者,但不能有40 rt ??? 如我错了请纠正我…
在我们的情况下,消费者只有在消息成功处理后才能提交偏移..这些消息将被重新处理…有没有更好的解决方案???
根据您的问题澄清。
Kafka Consumer可以一次读取多条消息。 但是Kafka Consumer并不真正读取消息,说消费者读取一定数量的字节然后根据各个消息的大小来确定将读取多少消息更为正确。 通过Kafka Consumer Configs读取,您不能指定要获取的消息数,您可以指定消费者可以获取的最大/最小数据大小。 但是,在该范围内的许多消息是您将获得多少消息。 正如您所指出的那样,您将始终按顺序获取消息。
相关消费者配置(0.9.0.0及更高版本)
- fetch.min.bytes
- max.partition.fetch.bytes
UPDATE
在评论中使用您的示例,“我的理解是,如果我在配置中指定读取10个字节,并且如果每个消息是2个字节,则消费者一次读取5个消息。” 那是真实的。 你的下一个声明,“这意味着这5条消息的偏移在分区中是随机的”,这是错误的。 阅读顺序并不意味着一个接一个,它只是意味着它们仍然是有序的。 您可以批量处理项目并使其保持顺序/有序。 请看以下示例。
在Kafka日志中,如果有10条消息(每2个字节)具有以下偏移量,则为[0,1,2,3,5,5,7,8,9]。
如果您读取10个字节,您将获得一个包含偏移量[0,1,2,3,4]的消息的批处理。
如果您读取6个字节,您将获得一个包含偏移量[0,1,2]的消息的批处理。
如果您读取6个字节,然后再读取6个字节,您将获得包含消息[0,1,2]和[3,4,5]的两个批处理。
如果你读取8个字节,然后是4个字节,你将得到两个包含消息[0,1,2,3]和[4,5]的批次。
更新:澄清提交
我不是100%确定如何工作,我主要是在Storm环境下与Kafka合作。 提供的KafkaSpout会自动提交Kafka消息。
但是看看0.9.0.1 Consumer API ,我建议你这样做。 似乎有三种与本讨论相关的方法。
- 轮询(长时间超时)
- commitSync()
- commitSync(java.util.Map偏移量)
poll方法检索消息,可能只有1,可能是20,对于你的例子,可以说3条消息被返回[0,1,2]。 你现在有这三条消息。 现在由您决定如何处理它们。 您可以处理它们0 => 1 => 2,1 => 0 => 2,2 => 0 => 1,它只是取决于。 但是你处理它们,在处理之后你会想要提交告诉Kafka服务器你已完成这些消息。
使用commitSync()提交上次轮询时返回的所有内容,在这种情况下,它将提交偏移量[0,1,2]。
另一方面,如果选择使用commitSync(java.util.Map偏移),则可以手动指定要提交的偏移量。 如果您按顺序处理它们,则可以处理偏移量0然后提交它,处理偏移量1然后提交它,最后处理偏移量2并提交。
总而言之,Kafka为您提供了处理消息如何渴望的自由,您可以根据自己的选择顺序或完全随机地处理它们。
要实现并行性,这似乎是您所要求的,您使用主题分区(您在N个部分上分割主题,称为分区)。 然后,在使用者中,您生成多个线程以从这些分区中使用。
在Producer端,您将消息发布到随机分区(默认),或者向Kafka提供一些消息属性来计算散列(如果需要排序),这可以确保具有相同散列的所有msgs都转到同一个分区。
编辑(偏移提交请求的示例):
这就是我做到的。 所有未提供的方法都是非必要的。
/** * Commits the provided offset for the current client (ie unique topic/partition/clientName combination) * * @param offset * @return {@code true} or {@code false}, depending on whether commit succeeded * @throws Exception */ public static boolean commitOffset(String topic, int partition, String clientName, SimpleConsumer consumer, long offset) throws Exception { try { TopicAndPartition tap = new TopicAndPartition(topic, partition); OffsetAndMetadata offsetMetaAndErr = new OffsetAndMetadata(offset, OffsetAndMetadata.NoMetadata(), -1L); Map mapForCommitOffset = new HashMap<>(1); mapForCommitOffset.put(tap, offsetMetaAndErr); kafka.javaapi.OffsetCommitRequest offsetCommitReq = new kafka.javaapi.OffsetCommitRequest( ConsumerContext.getMainIndexingConsumerGroupId(), mapForCommitOffset, 1, clientName, ConsumerContext.getOffsetStorageType()); OffsetCommitResponse offsetCommitResp = consumer.commitOffsets(offsetCommitReq); Short errCode = (Short) offsetCommitResp.errors().get(tap); if (errCode != 0) { processKafkaOffsetCommitError(tap, offsetCommitResp, BrokerInfo.of(consumer.host())); ErrorMapping.maybeThrowException(errCode); } LOG.debug("Successfully committed offset [{}].", offset); } catch (Exception e) { LOG.error("Error while committing offset [" + offset + "].", e); throw e; } return true; }
您可以批量使用消息并以批处理方式处理它们。 batch.max.wait.ms(property)消费者将等待这段时间并轮询新消息
- KafkaProducer未成功将消息发送到队列中
- 在Kafka Streams中反序列化POJO
- Apache Kafka:无法更新Metadata / java.nio.channels.ClosedChannelException
- Kafka – 使用高级消费者实现延迟队列
- kafka.consumer.SimpleConsumer:由于套接字错误而重新连接:java.nio.channels.ClosedChannelException
- Kafka使用者 – 消费者进程和线程与主题分区的关系是什么
- 连接到Apache Kafka多节点群集中的Zookeeper
- KafkaAvroDeserializer不返回SpecificRecord但返回GenericRecord
- 如何在kafka中创建自定义序列化程序?