Flink: how to read from multiple Kafka clusters using the same StreamExecutionEnvironment

I want to read data from multiple Kafka clusters in Flink.

But the result is that kafkaMessageStream only reads from the first Kafka cluster.

I can read from both Kafka clusters only when I create two separate streams, one per cluster, which is not what I want.

Is it possible to connect multiple sources to a single reader?

Sample code:

    public class KafkaReader implements Reader {

        private StreamExecutionEnvironment executionEnvironment;

        public StreamExecutionEnvironment getExecutionEnvironment(Properties properties) {
            executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setRestartStrategy(
                    RestartStrategies.fixedDelayRestart(3, 1500));
            executionEnvironment.enableCheckpointing(
                    Integer.parseInt(properties.getProperty(Constants.SSE_CHECKPOINT_INTERVAL, "5000")),
                    CheckpointingMode.EXACTLY_ONCE);
            executionEnvironment.getCheckpointConfig().setCheckpointTimeout(60000);
            //executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
            //try {
            //    executionEnvironment.setStateBackend(new FsStateBackend(new Path(Constants.SSE_CHECKPOINT_PATH))); // The RocksDBStateBackend or the FsStateBackend
            //} catch (IOException e) {
            //    LOGGER.error("Exception during initialization of stateBackend in execution environment" + e.getMessage());
            //}
            return executionEnvironment;
        }

        public DataStream readFromMultiKafka(Properties properties_k1, Properties properties_k2,
                                             DeserializationSchema deserializationSchema) {
            DataStream kafkaMessageStream = executionEnvironment.addSource(new FlinkKafkaConsumer08(
                    properties_k1.getProperty(Constants.TOPIC), deserializationSchema, properties_k1));
            // Note: the second source is added to the environment, but its stream is discarded
            executionEnvironment.addSource(new FlinkKafkaConsumer08(
                    properties_k2.getProperty(Constants.TOPIC), deserializationSchema, properties_k2));
            return kafkaMessageStream;
        }

        public DataStream readFromKafka(Properties properties, DeserializationSchema deserializationSchema) {
            DataStream kafkaMessageStream = executionEnvironment.addSource(new FlinkKafkaConsumer08(
                    properties.getProperty(Constants.TOPIC), deserializationSchema, properties));
            return kafkaMessageStream;
        }

    }

My call:

    public static void main(String[] args) throws Exception {
        Properties pk1 = new Properties();
        pk1.setProperty(Constants.TOPIC, "flink_test");
        pk1.setProperty("zookeeper.connect", "localhost:2181");
        pk1.setProperty("group.id", "1");
        pk1.setProperty("bootstrap.servers", "localhost:9092");

        Properties pk2 = new Properties();
        pk2.setProperty(Constants.TOPIC, "flink_test");
        pk2.setProperty("zookeeper.connect", "localhost:2182");
        pk2.setProperty("group.id", "1");
        pk2.setProperty("bootstrap.servers", "localhost:9093");

        Reader reader = new KafkaReader();

        // Does not work: only the first cluster is read
        StreamExecutionEnvironment environment = reader.getExecutionEnvironment(pk1);
        DataStream dataStream1 = reader.readFromMultiKafka(pk1, pk2, new SimpleStringSchema());
        DataStream transform = new TsvTransformer().transform(dataStream1);
        transform.print();

        // Works, but needs two separate streams:
        StreamExecutionEnvironment environment = reader.getExecutionEnvironment(pk1);
        DataStream dataStream1 = reader.readFromKafka(pk1, new SimpleStringSchema());
        DataStream dataStream2 = reader.readFromKafka(pk2, new SimpleStringSchema());
        DataStream<Tuple2> transform1 = dataStream1.flatMap(new LineSplitter()).keyBy(0)
                .timeWindow(Time.seconds(5)).sum(1).setParallelism(5);
        DataStream<Tuple2> transform2 = dataStream2.flatMap(new LineSplitter()).keyBy(0)
                .timeWindow(Time.seconds(5)).sum(1).setParallelism(5);
        transform1.print();
        transform2.print();

        environment.execute("Kafka Reader");
    }

To solve this, I suggest creating a separate FlinkKafkaConsumer instance per cluster (which is what you are already doing) and then unioning the resulting streams:

    StreamExecutionEnvironment environment = reader.getExecutionEnvironment(pk1);
    DataStream dataStream1 = reader.readFromKafka(pk1, new SimpleStringSchema());
    DataStream dataStream2 = reader.readFromKafka(pk2, new SimpleStringSchema());
    DataStream finalStream = dataStream1.union(dataStream2);
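Applied inside the reader itself, a sketch of how readFromMultiKafka could return a single unified stream is below. This is an assumption about how you might restructure your class, not tested code: it reuses the Constants, FlinkKafkaConsumer08, and executionEnvironment field from your question, and relies only on DataStream.union, which is a real Flink API.

```java
// Sketch: variant of readFromMultiKafka that unions the two sources
// before returning, so a single downstream pipeline sees both clusters.
// Assumes the surrounding KafkaReader class from the question.
public DataStream<String> readFromMultiKafka(Properties properties_k1, Properties properties_k2,
                                             DeserializationSchema<String> deserializationSchema) {
    DataStream<String> stream1 = executionEnvironment.addSource(new FlinkKafkaConsumer08<>(
            properties_k1.getProperty(Constants.TOPIC), deserializationSchema, properties_k1));
    DataStream<String> stream2 = executionEnvironment.addSource(new FlinkKafkaConsumer08<>(
            properties_k2.getProperty(Constants.TOPIC), deserializationSchema, properties_k2));
    // union merges both streams; records from either cluster flow to the same operators
    return stream1.union(stream2);
}
```

With this, your original "does not work" call site stays unchanged: the transformer applied to the returned stream now processes records from both clusters.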