KafkaConsumer 0.10 Java API错误消息:没有分区的当前分配

我正在使用KafkaConsumer 0.10 Java api。 我想从特定的分区和特定的偏移消耗。 我抬起头,发现有一个搜索方法,但它抛出exception。 任何人都有类似的用例或解决方案?

码:

KafkaConsumer consumer = new KafkaConsumer(consumerProps); consumer.seek(new TopicPartition("mytopic", 1), 4); 

例外

 java.lang.IllegalStateException: No current assignment for partition mytopic-1 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251) at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276) at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135) at xx.xxx.xxx.Test.main(Test.java:182) 

在您可以seek()之前,首先需要subscribe()主题 assign()主题assign()分配给消费者。 另外请记住, subscribe()assign()是惰性的 – 因此,在使用seek()之前,还需要对poll()进行“虚拟调用”。

注意:从Kafka 2.0开始,新的poll(Duration timeout)是异步的,并且不能保证在poll返回时您有完整的poll 。 因此,您可能需要在使用seek()之前检查您的分配,并再次poll以刷新分配。 (参见KIP-266了解详情)

如果使用subscribe() ,则使用组管理:因此,您可以使用相同的group.id启动多个使用者,并且主题的所有分区将自动分配到组内的所有使用者(每个分区将分配给一个小组中的单个消费者)。

如果要读取特定分区,则需要通过assign()使用手动分配。 这允许您进行任何所需的任务。

顺便说一句: KafkaConsumer有一个很长的详细类JavaDoc,包括例子。 值得一读。

如果您不想使用poll()并检索地图记录,并更改偏移本身。 Kafka版本0.11试试这个:

 ... props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("Test_topic1", "Test_topic2")); List partitions =consumer.partitionsFor("Test_topic1").stream().map(part->{TopicPartition tp = new TopicPartition(part.topic(),part.partition()); return tp;}).collect(Collectors.toList()); Field coordinatorField = consumer.getClass().getDeclaredField("coordinator"); coordinatorField.setAccessible(true); ConsumerCoordinator coordinator = (ConsumerCoordinator)coordinatorField.get(consumer); coordinator.poll(new Date().getTime(), 1000);//Watch out for your local date and time settings consumer.seekToBeginning(partitions); //or other seek 

调查协调员活动。 这可确保协调器已知并且消费者已加入该组(如果它正在使用组管理)。 如果启用它们,它还会处理定期偏移提交。