如何使Spark Streaming计算unit testing中文件中的单词?

我已经在Java中成功构建了一个非常简单的Spark Streaming应用程序,该应用程序基于Scala中的HdfsCount示例 。

当我将此应用程序提交给我的本地Spark时,它会等待将文件写入给定目录,当我创建该文件时,它会成功打印出单词数。 我按Ctrl + C终止应用程序。

现在我已经尝试为这个function创建一个非常基本的unit testing,但是在测试中我无法打印相同的信息,即单词的数量。

我错过了什么?

下面是unit testing文件,之后我还包含了显示countWords方法的代码片段:

StarterAppTest.java

import com.google.common.io.Files; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.junit.*; import java.io.*; public class StarterAppTest { JavaStreamingContext ssc; File tempDir; @Before public void setUp() { ssc = new JavaStreamingContext("local", "test", new Duration(3000)); tempDir = Files.createTempDir(); tempDir.deleteOnExit(); } @After public void tearDown() { ssc.stop(); ssc = null; } @Test public void testInitialization() { Assert.assertNotNull(ssc.sc()); } @Test public void testCountWords() { StarterApp starterApp = new StarterApp(); try { JavaDStream lines = ssc.textFileStream(tempDir.getAbsolutePath()); JavaPairDStream wordCounts = starterApp.countWords(lines); ssc.start(); File tmpFile = new File(tempDir.getAbsolutePath(), "tmp.txt"); PrintWriter writer = new PrintWriter(tmpFile, "UTF-8"); writer.println("8-Dec-2014: Emre Emre Emre Ergin Ergin Ergin"); writer.close(); System.err.println("===== Word Counts ======="); wordCounts.print(); System.err.println("===== Word Counts ======="); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } Assert.assertTrue(true); } } 

这个测试编译并开始运行,Spark Streaming在控制台上打印了很多诊断消息,但是对wordCounts.print()的调用不会打印任何东西,而在StarterApp.java本身,它们会打印。

我也尝试过添加ssc.awaitTermination();ssc.start()但在这方面没有任何改变。 之后我还试图在这个Spark Streaming应用程序正在检查的目录中手动创建一个新文件,但这次它给出了一个错误。

为了完整性,下面是wordCounts方法:

 public JavaPairDStream countWords(JavaDStream lines) { JavaDStream words = lines.flatMap(new FlatMapFunction() { @Override public Iterable call(String x) { return Lists.newArrayList(SPACE.split(x)); } }); JavaPairDStream wordCounts = words.mapToPair( new PairFunction() { @Override public Tuple2 call(String s) { return new Tuple2(s, 1); } }).reduceByKey((i1, i2) -> i1 + i2); return wordCounts; } 

几个指针:

  • 为SparkStreaming上下文提供至少2个内核。 1表示Streaming,1表示Spark处理。 “本地” – >“本地[2]”
  • 您的流间隔时间为3000毫秒,因此您的程序中的某个位置需要等待 – 至少 – 该时间以期望输出。
  • Spark Streaming需要一些时间来设置侦听器。 在发出ssc.start后立即创建该文件。 文件系统监听器已经到位,无法保证。 我会在ssc.start之后做一些sleep(xx)

在Streaming中,所有关于正确的时机。