Tag: flink streaming

FLINK:如何使用相同的StreamExecutionEnvironment从多个kafka集群中读取

我想从FLINK中的多个KAFKA集群中读取数据。 但结果是kafkaMessageStream只从第一个Kafka读取。 只有当我为Kafka分别有2个流时,我才能从两个Kafka集群中读取,这不是我想要的。 是否可以将多个源连接到单个阅读器。 示例代码 public class KafkaReader implements Reader{ private StreamExecutionEnvironment executionEnvironment ; public StreamExecutionEnvironment getExecutionEnvironment(Properties properties){ executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); executionEnvironment.setRestartStrategy( RestartStrategies.fixedDelayRestart(3, 1500)); executionEnvironment.enableCheckpointing( Integer.parseInt(properties.getProperty(Constants.SSE_CHECKPOINT_INTERVAL,”5000″)), CheckpointingMode.EXACTLY_ONCE); executionEnvironment.getCheckpointConfig().setCheckpointTimeout(60000); //executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //try { // executionEnvironment.setStateBackend(new FsStateBackend(new Path(Constants.SSE_CHECKPOINT_PATH))); // The RocksDBStateBackend or The FsStateBackend //} catch (IOException e) { // LOGGER.error(“Exception during initialization of stateBackend in execution environment”+e.getMessage()); […]

Apache Flink – 在作业中无法识别自定义Java选项

我已将以下行添加到flink-conf.yaml: env.java.opts:“ – Ddy.props.path = / PATH / TO / PROPS / FILE” 当启动jobmanager(jobmanager.sh启动集群)时,我在日志中看到jvm选项确实被识别 2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager – JVM Options: 2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager – -Xms256m 2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager – -Xmx256m 2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager – -XX:MaxPermSize=256m 2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager – -Ddy.props.path=/srv/dy/stream-aggregators/aggregators.conf 2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager – -Dlog.file=/srv/flink-1.2.0/log/flink-flink-jobmanager-0-flinkvm-master.log 2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager […]

Flink Streaming:如何根据数据将一个数据流输出到不同的输出?

在Apache Flink中,我有一组元组。 让我们假设一个非常简单的Tuple1 。 元组可以在其值字段中具有任意值(例如,’P1’,’P2’等)。 可能值的集合是有限的,但我事先并不知道全集(因此可能存在’P362’)。 我想根据元组内部的值将该元组写入某个输出位置。 所以我希望有以下文件结构: /output/P1 /output/P2 在文档中我只发现了写入我事先知道的位置的可能性(例如stream.writeCsv(“/output/somewhere”) ),但没有办法让数据的内容决定数据实际结束的位置。 我在文档中读到了关于输出拆分的内容,但这似乎没有提供一种方法将输出重定向到我希望拥有它的方式(或者我只是不明白这是如何工作的)。 可以使用Flink API完成,如果是这样,怎么做? 如果没有,是否可能有第三方图书馆可以做到这一点,还是我必须自己构建这样的东西? 更新 按照Matthias的建议,我想出了一个筛选接收函数,它确定输出路径,然后在序列化后将元组写入相应的文件。 我把它放在这里供参考,也许对其他人有用: public class SiftingSinkFunction extends RichSinkFunction { private final OutputSelector outputSelector; private final MapFunction serializationFunction; private final String basePath; Map<String, TextOutputFormat> formats = new HashMap(); /** * @param outputSelector the selector which determines into which output(s) a […]

Apache Flink:如何计算DataStream中的事件总数

我有两个原始流,我正在加入这些流,然后我想计算已加入的事件总数和事件没有多少。 我是通过在joinedEventDataStream上使用map来joinedEventDataStream ,如下所示 joinedEventDataStream.map(new RichMapFunction() { @Override public Object map(JoinedEvent joinedEvent) throws Exception { number_of_joined_events += 1; return null; } }); 问题1:这是计算流中事件数量的适当方法吗? 问题2:我注意到有线行为,有些人可能不相信。 问题是,当我在IntelliJ IDE中运行我的Flink程序时,它会向我显示number_of_joined_events正确值,但在我将此程序设置为jar时会显示0 。 所以当我将程序作为jar文件而不是实际计数运行时,我得到number_of_joined_events的初始值。 为什么只在jar文件提交而不在IDE中才会发生这种情况?