用spark分析日志文件?
我编写了程序来分析日志,其目的是过滤/打印日志中的错误语句
String master = "local[*]"; String inputFilePath = "C:/test.log"; SparkConf conf = new SparkConf().setAppName(App.class.getName()) .setMaster(master); JavaSparkContext context = new JavaSparkContext(conf); JavaRDD stringRDD = context.textFile(inputFilePath); stringRDD.filter(text -> text.contains("ERROR")) .collect().forEach(result -> System.out.println(result));
但是日志文件正在由不同的进程连续写入。 这是时间线示例
- 在T1,日志文件中存在10行
- 在T2(5秒后),再加入5行
- 在T3(5秒后),再加入7行
现在我的程序应该在5秒后读取文件并仅从新添加的行打印错误语句。 我是否应该手动生成每隔5秒钟继续读取的线程,或者是否有更好的火花方式?
更新: –
基于谷歌我尝试下面但没有帮助
SparkConf conf = new SparkConf().setAppName(App.class.getName()) .setMaster(master); //JavaSparkContext context = new JavaSparkContext(conf); JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(10)); JavaDStream stringRDD = streamingContext.textFileStream(inputFilePath); stringRDD.filter(text -> text.contains("ERROR")).foreachRDD(result -> System.out.println(result));
- 为什么SparkSession为一个动作执行两次?
- SparkSQL并在Java中的DataFrame上爆炸
- Apache Spark – 添加两列
- 使用Java从另一个应用程序部署Apache Spark应用程序,这是最佳实践
- 如果我在Spark中缓存两次相同的RDD会发生什么
- 在google dataproc集群实例中的spark-submit上运行app jar文件
- Spark – foreach Vs foreachPartitions何时使用什么?
- 将JavaRDD转换为DataFrame时出现Spark错误:java.util.Arrays $ ArrayList不是数组模式的有效外部类型
- Apache Spark需要5到6分钟才能从Cassandra中简单计算1亿行