Delay before the consumer starts receiving messages in Apache Kafka

I am using Kafka 0.8.0 and trying to implement the scenario below.

JCA API (acts as producer and sends data) ——> Consumer ——> HBase

As soon as I fetch data with the JCA client, I send each message to the consumer. For example, once the producer sends message no. 1, I want to fetch that same message from the consumer and "put" it into HBase. But my consumer only starts fetching messages after some random number of messages have already been sent. I want to synchronize the producer and the consumer so that both of them start working together.

I have used:

1 broker

1 single topic

1 single producer and a high-level consumer

Can anyone suggest what I need to do to achieve this?

EDIT:

Adding some relevant code snippets.

Consumer.java

    public class Consumer extends Thread {
        private final ConsumerConnector consumer;
        private final String topic;
        PrintWriter pw = null;
        int t = 0;
        StringDecoder kd = new StringDecoder(null);
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        Map<String, List<KafkaStream>> consumerMap;
        KafkaStream stream;
        ConsumerIterator it;

        public Consumer(String topic) {
            consumer = kafka.consumer.Consumer
                    .createJavaConsumerConnector(createConsumerConfig());
            this.topic = topic;
            topicCountMap.put(topic, new Integer(1));
            consumerMap = consumer.createMessageStreams(topicCountMap, kd,
                    new Serializer(new VerifiableProperties()));
            stream = consumerMap.get(topic).get(0);
            it = stream.iterator();
        }

        private static ConsumerConfig createConsumerConfig() {
            Properties props = new Properties();
            props.put("zookeeper.connect", KafkaProperties.zkConnect);
            props.put("group.id", KafkaProperties.groupId);
            props.put("zookeeper.session.timeout.ms", "400");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");
            props.put("fetch.size", "1024");
            return new ConsumerConfig(props);
        }

        synchronized public void run() {
            while (it.hasNext()) {
                t = (it.next().message()).getChannelid();
                System.out.println("In Consumer received msg" + t);
            }
        }
    }

Producer.java

    public class Producer {
        public final kafka.javaapi.producer.Producer<String, Signal> producer;
        private final String topic;
        private final Properties props = new Properties();

        public Producer(String topic) {
            props.put("serializer.class", "org.bigdata.kafka.Serializer");
            props.put("key.serializer.class", "kafka.serializer.StringEncoder");
            props.put("metadata.broker.list", "localhost:9092");
            // Use the random partitioner. The key type is not needed, so it is
            // just set to String; the message is a user-defined object (Signal).
            producer = new kafka.javaapi.producer.Producer<String, Signal>(
                    new ProducerConfig(props));
            this.topic = topic;
        }
    }

KafkaProperties.java

    public interface KafkaProperties {
        final static String zkConnect = "127.0.0.1:2181";
        final static String groupId = "group1";
        final static String topic = "test00";
        final static String kafkaServerURL = "localhost";
        final static int kafkaServerPort = 9092;
        final static int kafkaProducerBufferSize = 64 * 1024;
        final static int connectionTimeOut = 100000;
        final static int reconnectInterval = 10000;
        final static String clientId = "SimpleConsumerDemoClient";
    }

This is how the consumer behaves for the first 10 messages: it does not print the messages it receives, but from message 11 onward it starts working normally.

    producer sending msg1
    producer sending msg2
    producer sending msg3
    producer sending msg4
    producer sending msg5
    producer sending msg6
    producer sending msg7
    producer sending msg8
    producer sending msg9
    producer sending msg10
    producer sending msg11
    producer sending msg12
    In Consumer received msg12
    producer sending msg13
    In Consumer received msg13
    producer sending msg14
    In Consumer received msg14
    producer sending msg15
    In Consumer received msg15
    producer sending msg16
    In Consumer received msg16
    producer sending msg17
    In Consumer received msg17
    producer sending msg18
    In Consumer received msg18
    producer sending msg19
    In Consumer received msg19
    producer sending msg20
    In Consumer received msg20
    producer sending msg21
    In Consumer received msg21

EDIT: Adding the listener function in which the producer sends messages to the consumer. I am using the default producer configuration and have not overridden it.

    public synchronized void onValueChanged(final MonitorEvent event_) {
        // Get the value from the DBR
        try {
            final DBR dbr = event_.getDBR();
            final String[] val = (String[]) dbr.getValue();
            producer1.producer.send(new KeyedMessage<String, Signal>(
                    KafkaProperties.topic, new Signal(messageNo)));
            System.out.println("producer sending msg" + messageNo);
            messageNo++;
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

1. Try adding `props.put("request.required.acks", "1")` to the producer configuration. By default the producer does not wait for acknowledgments, so message delivery is not guaranteed. If you start the broker just before your test, the producer may begin sending before the broker is fully initialized, and the first few messages can be lost.
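As a sketch, the producer properties with acknowledgments enabled might look like the following. The property keys are the Kafka 0.8 producer config names from the question; the class and method names here are illustrative only, and plain `java.util.Properties` is used so the snippet stands alone without the Kafka jars:

```java
import java.util.Properties;

public class ProducerConfigSketch {

    // Builds 0.8-style producer properties, mirroring the question's
    // Producer.java but with request.required.acks added.
    static Properties buildProps() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "org.bigdata.kafka.Serializer");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // Wait for the partition leader to acknowledge each message instead
        // of fire-and-forget (the 0.8 default of "0").
        props.put("request.required.acks", "1");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps().getProperty("request.required.acks"));
    }
}
```

These properties would then be passed to `new ProducerConfig(props)` exactly as in the question's constructor.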

2. Try adding `props.put("auto.offset.reset", "smallest")` to the consumer configuration. It is the equivalent of the `--from-beginning` option of kafka-console-consumer.sh. If your consumer starts later than the producer and there is no offset data saved in ZooKeeper, then by default it will start consuming only new messages (see the consumer configs section in the documentation).
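Likewise, a sketch of the consumer side, reusing the properties from the question's `createConsumerConfig()` with the offset reset policy added (`ConsumerConfigSketch` is an illustrative name; plain `java.util.Properties` keeps it self-contained):

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    // Builds the high-level consumer properties from the question, plus
    // auto.offset.reset so a fresh group starts at the earliest offset.
    static Properties buildProps() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "127.0.0.1:2181");
        props.put("group.id", "group1");
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        // When no committed offset exists in ZooKeeper, begin from the
        // earliest available message rather than only new ones ("largest").
        props.put("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps().getProperty("auto.offset.reset"));
    }
}
```

Note that `auto.offset.reset` only applies when the consumer group has no committed offset; an existing group will resume from its saved position regardless.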

This may be caused by the partitioning rather than by the consumer. Check whether the topic was created with only a single partition; then you will not miss any messages in the consumer.
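One way to check the partition count is from the command line. As a sketch (script names and flags vary across 0.8.x releases: 0.8.0 ships kafka-create-topic.sh and kafka-list-topic.sh, while the `kafka-topics.sh` form shown below is from later 0.8.x versions), and assuming ZooKeeper on localhost:2181:

```shell
# Create the topic with a single partition.
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic test00

# Inspect an existing topic's partition count.
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test00
```

With a single partition there is only one stream for the high-level consumer to read, which matches the `topicCountMap.put(topic, new Integer(1))` in the question's Consumer.java.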