用spark分析日志文件?

我编写了程序来分析日志,其目的是过滤/打印日志中的错误语句

String master = "local[*]"; String inputFilePath = "C:/test.log"; SparkConf conf = new SparkConf().setAppName(App.class.getName()) .setMaster(master); JavaSparkContext context = new JavaSparkContext(conf); JavaRDD stringRDD = context.textFile(inputFilePath); stringRDD.filter(text -> text.contains("ERROR")) .collect().forEach(result -> System.out.println(result)); 

但是日志文件正在由不同的进程连续写入。 这是时间线示例

  1. 在T1,日志文件中存在10行
  2. 在T2(5秒后),再加入5行
  3. 在T3(5秒后),再加入7行

现在我的程序应该在5秒后读取文件并仅从新添加的行打印错误语句。 我是否应该手动生成每隔5秒钟继续读取的线程,或者是否有更好的火花方式?

更新: –

基于谷歌我尝试下面但没有帮助

 SparkConf conf = new SparkConf().setAppName(App.class.getName()) .setMaster(master); //JavaSparkContext context = new JavaSparkContext(conf); JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(10)); JavaDStream stringRDD = streamingContext.textFileStream(inputFilePath); stringRDD.filter(text -> text.contains("ERROR")).foreachRDD(result -> System.out.println(result));