Tag: apache kafka

Kafka Resiliency – 集团协调员

据我了解,其中一位经纪人被选为负责消费者再平衡的集团协调员。 Discovered coordinator host:9092 (id: 2147483646 rack: null) for group good_group 我有3个节点,复制因子为3和3个分区。 一切都很好,当我在非协调节点上杀死kafka时,消费者仍在接收消息。 但是当我用协调器杀死那个特定节点时,重新平衡没有发生,我的java消费者应用程序没有收到任何消息。 2018-05-29 16:34:22.668 INFO AbstractCoordinator:555 – Discovered coordinator host:9092 (id: 2147483646 rack: null) for group good_group. 2018-05-29 16:34:22.689 INFO AbstractCoordinator:600 – Marking the coordinator host:9092 (id: 2147483646 rack: null) dead for group good_group 2018-05-29 16:34:22.801 INFO AbstractCoordinator:555 – Discovered coordinator host:9092 (id: […]

Kafka 0.9.0.1 Java消费者陷入awaitMetadataUpdate()

我正在尝试使用Java API v0.9.0.1让一个简单的Kafka Consumer工作。 我正在使用的kafka服务器是一个docker容器,也运行版本0.9.0.1。 以下是消费者代码: public class Consumer { public static void main(String[] args) throws IOException { KafkaConsumer consumer; try (InputStream props = Resources.getResource(“consumer.props”).openStream()) { Properties properties = new Properties(); properties.load(props); consumer = new KafkaConsumer(properties); } consumer.subscribe(Arrays.asList(“messages”)); try { while (true) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) System.out.println(“Message received: ” […]

Kafka Java Producer与kerberos

在kerberosed环境中向kafka主题发送消息时收到错误。 我们在hdp 2.3上有集群 我跟着这个http://henning.kropponline.de/2016/02/21/secure-kafka-java-producer-with-kerberos/ 但是对于发送消息,我必须首先明确地执行kinit,然后才能将消息发送到kafka主题。 我试图通过java类编织,但这也行不通。 PFB代码: package com.ct.test.kafka; import java.util.Date; import java.util.Properties; import java.util.Random; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class TestProducer { public static void main(String[] args) { String principalName = “ctadmin”; String keyTabPath = “/etc/security/keytabs/ctadmin.keytab”; boolean authStatus = CTSecurityUtil.loginUserFromKeytab(principalName, keyTabPath); if (!authStatus) { System.out.println(“Authntication fails, try something else ” + authStatus); […]

Spring引导application.yml中的Spring Kafka SSL设置

我正在尝试使用Kafka客户端设置Spring Boot应用程序以使用SSL。 我将我的keystore.jks和truststore.jks存储在文件系统(在docker容器上)上,因为: https : //github.com/spring-projects/spring-kafka/issues/710 这是我的application.yml: spring: kafka: ssl: key-password: pass keystore-location: /tmp/kafka.client.keystore.jks keystore-password: pass truststore-location: /tmp/kafka.client.truststore.jks truststore-password: pass 但是,当我启动应用程序(在docker容器中)时,它说: Caused by: java.lang.IllegalStateException: Resource ‘class path resource [tmp/kafka.client.keystore.jks]’ must be on a file system [..] Caused by: java.io.FileNotFoundException: class path resource [tmp/kafka.client.keystore.jks] cannot be resolved to URL because it does not exist 我检查了容器,/ tmp中有.jks。 […]

java.lang.IllegalArgumentException:尚未正确配置Jetty ALPN / NPN

获取java.lang.IllegalArgumentException: Jetty ALPN/NPN has not been properly configured ,同时使用gRPC(google pub/sub)发布/使用来自Kafka的消息。

kafka.consumer.SimpleConsumer:由于套接字错误而重新连接:java.nio.channels.ClosedChannelException

我正在为kafka运行一个简单的消费者,例如: int timeout = 80000; int bufferSize = 64*1024; consumer = new SimpleConsumer(host, port,timeout, bufferSize, clientName); 这运行好几个小时,但我稍后在kafka.consumer.SimpleConsumer上得到一个例外:由于套接字错误重新连接: java.nio.channels.ClosedChannelException 和消费者停止……以前有人遇到过这个问题吗?

Kafka KStream – 使用带窗口的AbstractProcessor

我希望将来自KStream的窗口批输出组合在一起并将它们写入辅助存储。 我期待看到.punctuate()大约每30秒调用一次。 我得到的是保存在这里 。 (原始文件长达数千行) 总结 – .punctuate()看似随机然后重复调用。 它似乎不符合通过ProcessorContext.schedule()设置的值。 编辑: 另一次运行相同的代码大约每四分钟调用一次.punctuate() 。 这次我没有看到疯狂的重复值。 来源没有变化 – 只是结果不同。 使用以下代码: 主要 StreamsConfig streamsConfig = new StreamsConfig(config); KStreamBuilder kStreamBuilder = new KStreamBuilder(); KStream lines = kStreamBuilder.stream(TOPIC); lines.process(new BPS2()); KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig); kafkaStreams.start(); 处理器 public class BP2 extends AbstractProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(BP2.class); […]

在使用Java创建之前检查kafka中是否存在主题

我试图通过使用以下方法在kafka 0.8.2中创建一个主题: AdminUtils.createTopic(zkClient, myTopic, 2, 1, properties); 如果我在本地多次运行代码进行测试,则会因为主题已经创建而失败。 有没有办法在创建主题之前检查主题是否存在? TopicCommand api似乎没有为listTopics或describeTopic返回任何内容。

Kafka Consumer挂在java的.hasNext

我在java中有一个简单的Kafka Consumer,代码如下 public void run() { ConsumerIterator it = m_stream.iterator(); while (it.hasNext()&& !done){ try { System.out.println(“Parsing data”); byte[] data = it.next().message(); System.out.println(“Found data: “+data); values.add(data); // array list } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } done = true; } 发布消息时,会成功读取数据,但是当它返回到检查it.hasNext()时,它会保持挂起状态并且永远不会返回。 什么可能拖延这个? m_stream是一个获得如下的KafkaStream: Map topicCountMap = new HashMap(); topicCountMap.put(topic, new Integer(a_numThreads)); Map<String, List<KafkaStream>> consumerMap = […]

Apache Kafka中消费者消费消息的延迟

我正在使用Kafka 0.8.0并试图实现下面提到的场景。 JCA API(作为生产者并发送数据)—–> Consumer ——> HBase 一旦我使用JCA客户端获取数据,我就会将每条消息发送给消费者。 例如,一旦生产者发送消息no.1,我想从消费者那里获取相同的信息并在HBase中“put”。 但是我的消费者在一些随机的消息之后开始获取消息。 我想让生产者和消费者同步,以便他们两个开始一起工作。 我用过: 1经纪人 1个单一主题 1个单一生产者和高级消费者 任何人都可以建议我需要做些什么才能达到同样的目的? 编辑: 添加一些相关的代码段。 Consumer.java public class Consumer extends Thread { private final ConsumerConnector consumer; private final String topic; PrintWriter pw = null; int t = 0; StringDecoder kd = new StringDecoder(null); Map topicCountMap = new HashMap(); Map<String, List<KafkaStream>> consumerMap; KafkaStream […]