使用Java更新kafka中特定主题的TTL

更新主题的TTL ,以便记录在主题中保留10天。 我必须为特定主题执行此操作,只需将所有其他主题TTL保持相同,当前配置,我必须使用java执行此操作,因为我正在通过Java将主题推送到kafka 。 我正在设置以下属性以将主题推送到kafka

 Properties props = new Properties(); props.put("bootstrap.servers", KAFKA_SERVERS); props.put("acks", ACKS); props.put("retries", RETRIES); props.put("linger.ms", new Integer(LINGER_MS)); props.put("buffer.memory", new Integer(BUFFER_MEMORY)); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

您可以使用AdminClient执行此操作,按照获取当前配置的代码片段(仅用于测试),然后在名为“ test ”的主题上更新“ retention.ms ”配置。

 Properties props = new Properties(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); AdminClient adminClient = AdminClient.create(props); ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "test"); // get the current topic configuration DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Collections.singleton(resource)); Map config = describeConfigsResult.all().get(); System.out.println(config); // create a new entry for updating the retention.ms value on the same topic ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "50000"); Map updateConfig = new HashMap(); updateConfig.put(resource, new Config(Collections.singleton(retentionEntry))); AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(updateConfig); alterConfigsResult.all(); describeConfigsResult = adminClient.describeConfigs(Collections.singleton(resource)); config = describeConfigsResult.all().get(); System.out.println(config); adminClient.close();