How do I pass topics to a Kafka listener dynamically?

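For a few days now, I have been trying to pass topics to a Kafka listener dynamically, rather than using them via keys from the Java DSL. Has anyone here done this before, or can anyone shed some light on the best way to achieve it?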

You cannot "dynamically pass topics to a Kafka listener"; you have to create a listener container programmatically.

You can change the topics dynamically at runtime!

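    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    import org.springframework.kafka.listener.ContainerProperties;
    import org.springframework.kafka.listener.ErrorHandler;
    import org.springframework.stereotype.Component;

    @Component
    public class StoppingErrorHandler implements ErrorHandler {

        @Autowired
        private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

        @Override
        public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
            // Look up the container registered for the listener with id "fence".
            ConcurrentMessageListenerContainer<?, ?> listenerContainer =
                    (ConcurrentMessageListenerContainer<?, ?>) kafkaListenerEndpointRegistry.getListenerContainer("fence");
            ContainerProperties cp = listenerContainer.getContainerProperties();
            // Replace the first topic in place, then restart so the consumer re-subscribes.
            // This only takes effect if getTopics() returns the container's own array rather than a copy.
            String[] topics = cp.getTopics();
            topics[0] = "gaonb";
            listenerContainer.stop();
            listenerContainer.start();
        }
    }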

Here is a working solution:

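    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.IntegerDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ContainerProperties;
    import org.springframework.kafka.listener.KafkaMessageListenerContainer;
    import org.springframework.kafka.listener.MessageListener;

    // Start a listener container without using the "@KafkaListener" annotation
    Map<String, Object> consumerProps = consumerProps("my-srv1:9092", "my-group", "false");
    DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
    ContainerProperties containerProperties = new ContainerProperties("my-topic");
    KafkaMessageListenerContainer<Integer, String> container =
            new KafkaMessageListenerContainer<>(cf, containerProperties);
    final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
    container.setupMessageListener((MessageListener<Integer, String>) record -> {
        log.info("Message received: " + record);
        records.add(record);
    });
    container.start();

    /**
     * Set up test properties for an {@code <Integer, String>} consumer.
     * @param brokersCommaSep the bootstrapServers property (comma separated servers).
     * @param group the group id.
     * @param autoCommit the auto commit.
     * @return the properties.
     */
    public static Map<String, Object> consumerProps(String brokersCommaSep, String group, String autoCommit) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokersCommaSep);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }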
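If the topic name is only known at runtime, one way to organize this is to keep one programmatically created container per topic and start or stop it on demand. The following is only a rough sketch of that bookkeeping, not part of Spring Kafka: `TopicListenerRegistry`, its `Listener` interface, and the `subscribe`/`unsubscribe` methods are hypothetical names invented for illustration, and the factory function is assumed to build and return whatever wraps your real container.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical helper that tracks one running listener per topic. */
final class TopicListenerRegistry {

    /** Minimal stand-in for a listener container's lifecycle (an assumption, not a Spring type). */
    interface Listener {
        void start();
        void stop();
    }

    private final Map<String, Listener> running = new ConcurrentHashMap<>();
    private final Function<String, Listener> factory;

    /** @param factory builds a not-yet-started listener for the given topic name */
    TopicListenerRegistry(Function<String, Listener> factory) {
        this.factory = factory;
    }

    /** Starts a listener for the topic unless one is already running; returns true if one was started. */
    boolean subscribe(String topic) {
        boolean[] created = {false};
        running.computeIfAbsent(topic, t -> {
            Listener listener = factory.apply(t);
            listener.start();
            created[0] = true;
            return listener;
        });
        return created[0];
    }

    /** Stops and forgets the listener for the topic; returns false if none was running. */
    boolean unsubscribe(String topic) {
        Listener listener = running.remove(topic);
        if (listener == null) {
            return false;
        }
        listener.stop();
        return true;
    }

    /** Returns a sorted snapshot of the topics that currently have a running listener. */
    Set<String> activeTopics() {
        return new TreeSet<>(running.keySet());
    }
}
```

In a Spring application, the factory would presumably create a `KafkaMessageListenerContainer` from `new ContainerProperties(topic)` as in the snippet above and delegate `start()` and `stop()` to it.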

Hope this helps.