Flink Streaming:如何根据数据将一个数据流输出到不同的输出?

在Apache Flink中,我有一组元组。 让我们假设一个非常简单的Tuple1 。 元组可以在其值字段中具有任意值(例如,’P1’,’P2’等)。 可能值的集合是有限的,但我事先并不知道全集(因此可能存在’P362’)。 我想根据元组内部的值将该元组写入某个输出位置。 所以我希望有以下文件结构:

  • /output/P1
  • /output/P2

在文档中我只发现了写入我事先知道的位置的可能性(例如stream.writeCsv("/output/somewhere") ),但没有办法让数据的内容决定数据实际结束的位置。

我在文档中读到了关于输出拆分的内容,但这似乎没有提供一种方法将输出重定向到我希望拥有它的方式(或者我只是不明白这是如何工作的)。

可以使用Flink API完成,如果是这样,怎么做? 如果没有,是否可能有第三方图书馆可以做到这一点,还是我必须自己构建这样的东西?

更新

按照Matthias的建议,我想出了一个筛选接收函数,它确定输出路径,然后在序列化后将元组写入相应的文件。 我把它放在这里供参考,也许对其他人有用:

 public class SiftingSinkFunction extends RichSinkFunction { private final OutputSelector outputSelector; private final MapFunction serializationFunction; private final String basePath; Map<String, TextOutputFormat> formats = new HashMap(); /** * @param outputSelector the selector which determines into which output(s) a record is written. * @param serializationFunction a function which serializes the record to a string. * @param basePath the base path for writing the records. It will be appended with the output selector. */ public SiftingSinkFunction(OutputSelector outputSelector, MapFunction serializationFunction, String basePath) { this.outputSelector = outputSelector; this.serializationFunction = serializationFunction; this.basePath = basePath; } @Override public void invoke(IT value) throws Exception { // find out where to write. Iterable selection = outputSelector.select(value); for (String s : selection) { // ensure we have a format for this. TextOutputFormat destination = ensureDestinationExists(s); // then serialize and write. destination.writeRecord(serializationFunction.map(value)); } } private TextOutputFormat ensureDestinationExists(String selection) throws IOException { // if we know the destination, we just return the format. if (formats.containsKey(selection)) { return formats.get(selection); } // create a new output format and initialize it from the context. TextOutputFormat format = new TextOutputFormat(new Path(basePath, selection)); StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); format.configure(context.getTaskStubParameters()); format.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks()); // put it into our map. formats.put(selection, format); return format; } @Override public void close() throws IOException { Exception lastException = null; try { for (TextOutputFormat format : formats.values()) { try { format.close(); } catch (Exception e) { lastException = e; format.tryCleanupOnError(); } } } finally { formats.clear(); } if (lastException != null) { throw new IOException("Close failed.", lastException); } } } 

您可以实现自定义接收器。 inheritance以下两者之一:

  • org.apache.flink.streaming.api.functions.sink.SinkFunction
  • org.apache.flink.streaming.api.functions.sink.RichSinkFunction

在你的程序中使用:

 stream.addSink(SinkFunction sinkFunction); 

而不是stream.writeCsv("/output/somewhere")