Tag: apache flink

FLINK:如何使用相同的StreamExecutionEnvironment从多个kafka集群中读取

我想从FLINK中的多个KAFKA集群中读取数据。 但结果是kafkaMessageStream只从第一个Kafka读取。 只有当我为Kafka分别有2个流时,我才能从两个Kafka集群中读取,这不是我想要的。 是否可以将多个源连接到单个阅读器。 示例代码 public class KafkaReader implements Reader{ private StreamExecutionEnvironment executionEnvironment ; public StreamExecutionEnvironment getExecutionEnvironment(Properties properties){ executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); executionEnvironment.setRestartStrategy( RestartStrategies.fixedDelayRestart(3, 1500)); executionEnvironment.enableCheckpointing( Integer.parseInt(properties.getProperty(Constants.SSE_CHECKPOINT_INTERVAL,”5000″)), CheckpointingMode.EXACTLY_ONCE); executionEnvironment.getCheckpointConfig().setCheckpointTimeout(60000); //executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //try { // executionEnvironment.setStateBackend(new FsStateBackend(new Path(Constants.SSE_CHECKPOINT_PATH))); // The RocksDBStateBackend or The FsStateBackend //} catch (IOException e) { // LOGGER.error(“Exception during initialization of stateBackend in execution environment”+e.getMessage()); […]

Apache Flink – 在作业中无法识别自定义Java选项

我已将以下行添加到flink-conf.yaml: env.java.opts:“ – Ddy.props.path = / PATH / TO / PROPS / FILE” 当启动jobmanager(jobmanager.sh启动集群)时,我在日志中看到jvm选项确实被识别 2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager – JVM Options: 2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager – -Xms256m 2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager – -Xmx256m 2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager – -XX:MaxPermSize=256m 2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager – -Ddy.props.path=/srv/dy/stream-aggregators/aggregators.conf 2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager – -Dlog.file=/srv/flink-1.2.0/log/flink-flink-jobmanager-0-flinkvm-master.log 2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager […]

Apache光束计数器/度量标准在Flink WebUI中不可用

我正在使用Flink 1.4.1和Beam 2.3.0,并且想知道是否可以在Flink WebUI(或任何地方)中提供指标,如Dataflow WebUI中那样? 我用过像这样的柜台: import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; … Counter elementsRead = Metrics.counter(getClass(), “elements_read”); … elementsRead.inc(); 但我找不到Flink WebUI中任何地方可用的”elements_read”计数(任务指标或累加器)。 我认为在BEAM-773之后这将是直截了当的。

Flink Streaming:如何根据数据将一个数据流输出到不同的输出?

在Apache Flink中,我有一组元组。 让我们假设一个非常简单的Tuple1 。 元组可以在其值字段中具有任意值(例如,’P1’,’P2’等)。 可能值的集合是有限的,但我事先并不知道全集(因此可能存在’P362’)。 我想根据元组内部的值将该元组写入某个输出位置。 所以我希望有以下文件结构: /output/P1 /output/P2 在文档中我只发现了写入我事先知道的位置的可能性(例如stream.writeCsv(“/output/somewhere”) ),但没有办法让数据的内容决定数据实际结束的位置。 我在文档中读到了关于输出拆分的内容,但这似乎没有提供一种方法将输出重定向到我希望拥有它的方式(或者我只是不明白这是如何工作的)。 可以使用Flink API完成,如果是这样,怎么做? 如果没有,是否可能有第三方图书馆可以做到这一点,还是我必须自己构建这样的东西? 更新 按照Matthias的建议,我想出了一个筛选接收函数,它确定输出路径,然后在序列化后将元组写入相应的文件。 我把它放在这里供参考,也许对其他人有用: public class SiftingSinkFunction extends RichSinkFunction { private final OutputSelector outputSelector; private final MapFunction serializationFunction; private final String basePath; Map<String, TextOutputFormat> formats = new HashMap(); /** * @param outputSelector the selector which determines into which output(s) a […]

flink – 使用匕首注射 – 不可序列化?

我使用Flink(最新通过git)从kafka流到cassandra。 为了简化unit testing我通过Dagger添加dependency injection。 ObjectGraph似乎正在正确设置自己,但是’内部对象’被Flink标记为“不可序列化”。 如果我直接包含这些对象,它们会起作用 – 那么区别是什么? 有问题的类实现了MapFunction和@Inject一个用于cassandra的模块和一个用于读取配置文件的模块。 有没有办法建立这个,所以我可以使用后期绑定或Flink使这不可能? 编辑: fwiw – dependency injection(通过匕首)和RichMapFunction不能共存。 Dagger不允许您包含任何在其定义中扩展的对象。 进一步: 通过Dagger Lazy 实例化的对象也不会序列化。 线程“main”中的exceptionorg.apache.flink.api.common.InvalidProgramException:对象com.someapp.SaveMap@2e029d61不可序列化 … 引起:java.io.NotSerializableException:dagger.internal.LazyBinding $ 1

Apache Flink:如何计算DataStream中的事件总数

我有两个原始流,我正在加入这些流,然后我想计算已加入的事件总数和事件没有多少。 我是通过在joinedEventDataStream上使用map来joinedEventDataStream ,如下所示 joinedEventDataStream.map(new RichMapFunction() { @Override public Object map(JoinedEvent joinedEvent) throws Exception { number_of_joined_events += 1; return null; } }); 问题1:这是计算流中事件数量的适当方法吗? 问题2:我注意到有线行为,有些人可能不相信。 问题是,当我在IntelliJ IDE中运行我的Flink程序时,它会向我显示number_of_joined_events正确值,但在我将此程序设置为jar时会显示0 。 所以当我将程序作为jar文件而不是实际计数运行时,我得到number_of_joined_events的初始值。 为什么只在jar文件提交而不在IDE中才会发生这种情况?

如何在kafka 0.9.0中使用multithreading消费者?

卡夫卡的文件给出了以下描述的方法: 每个线程一个消费者:一个简单的选择是为每个线程提供自己的消费者>实例。 我的代码: public class KafkaConsumerRunner implements Runnable { private final AtomicBoolean closed = new AtomicBoolean(false); private final CloudKafkaConsumer consumer; private final String topicName; public KafkaConsumerRunner(CloudKafkaConsumer consumer, String topicName) { this.consumer = consumer; this.topicName = topicName; } @Override public void run() { try { this.consumer.subscribe(topicName); ConsumerRecords records; while (!closed.get()) { synchronized (consumer) { records = […]

Flink中的java.lang.NoSuchMethodError

我试图使用以下方法读取文件: final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet line = env.readTextFile(“file:///pathtofile/myfile.txt”); 我收到以下错误: java.lang.NoSuchMethodError: org.apache.flink.api.common.io.DelimitedInputFormat: method (Lorg/apache/flink/core/fs/Path;)V not found 我使用的是flink 1.3.2版,java版“1.8.0_91”