Finding the department average with Spark groupBy in Java 1.8
I have the dataset below, where the first column is the department and the second column is the salary. I want to calculate the average salary per department.
IT 2000000
HR 2000000
IT 1950000
HR 2200000
Admin 1900000
IT 1900000
IT 2200000
I did the following:
JavaPairRDD<String, Iterable<Integer>> rddY = employees.groupByKey();
System.out.println("" + rddY.collect());
and got the following output:
[(IT,[2000000, 1950000, 1900000, 2200000]), (HR,[2000000, 2200000]), (Admin,[1900000])]
What I need is:
- I want to calculate the total average and the department-wise average using Spark RDDs.
- How can the groupBy function be used in Spark to calculate the average?
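For reference, the groupByKey result from the question can be turned into averages directly with mapValues over each group's Iterable. A minimal sketch, assuming employees is the JavaPairRDD<String, Integer> behind the snippet above (the names grouped and deptAvg are illustrative):

JavaPairRDD<String, Iterable<Integer>> grouped = employees.groupByKey();
JavaPairRDD<String, Double> deptAvg = grouped.mapValues(values -> {
    long sum = 0L;  // running total of salaries in this department
    int count = 0;  // number of salary records in this department
    for (Integer salary : values) {
        sum += salary;
        count++;
    }
    return sum / (double) count;  // department average
});
deptAvg.foreach(pair -> System.out.println(pair._1() + " -> " + pair._2()));

That said, reduceByKey, used in the answer below, is generally preferred over groupByKey for aggregations, because it combines partial sums on the map side instead of shuffling every value.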
Below is code that calculates the average by key using Spark's JavaPairRDD. Hope this helps.
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class SparkAverageCalculation {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Average Calculation").setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // input list
        List<Tuple2<String, Integer>> inputList = new ArrayList<Tuple2<String, Integer>>();
        inputList.add(new Tuple2<String, Integer>("a1", 30));
        inputList.add(new Tuple2<String, Integer>("b1", 30));
        inputList.add(new Tuple2<String, Integer>("a1", 40));
        inputList.add(new Tuple2<String, Integer>("a1", 20));
        inputList.add(new Tuple2<String, Integer>("b1", 50));
        // parallelizePairs
        JavaPairRDD<String, Integer> pairRDD = sc.parallelizePairs(inputList);
        // pair each value with a count of 1
        JavaPairRDD<String, Tuple2<Integer, Integer>> valueCount = pairRDD.mapValues(value -> new Tuple2<Integer, Integer>(value, 1));
        // add values and counts with reduceByKey
        JavaPairRDD<String, Tuple2<Integer, Integer>> reducedCount = valueCount.reduceByKey((tuple1, tuple2) -> new Tuple2<Integer, Integer>(tuple1._1 + tuple2._1, tuple1._2 + tuple2._2));
        // calculate average
        JavaPairRDD<String, Integer> averagePair = reducedCount.mapToPair(getAverageByKey);
        // print average by key
        averagePair.foreach(data -> {
            System.out.println("Key=" + data._1() + " Average=" + data._2());
        });
        // stop sc
        sc.stop();
        sc.close();
    }

    private static PairFunction<Tuple2<String, Tuple2<Integer, Integer>>, String, Integer> getAverageByKey = (tuple) -> {
        Tuple2<Integer, Integer> val = tuple._2;
        int total = val._1;
        int count = val._2;
        Tuple2<String, Integer> averagePair = new Tuple2<String, Integer>(tuple._1, total / count);
        return averagePair;
    };
}
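The question also asks for the total average across all records. A minimal sketch of that, reusing pairRDD from the code above and adding import org.apache.spark.api.java.JavaDoubleRDD:

// drop the keys and keep only the values as a JavaDoubleRDD
JavaDoubleRDD salaries = pairRDD.mapToDouble(tuple -> tuple._2());
// mean() computes the arithmetic mean over the whole RDD
double totalAverage = salaries.mean();
System.out.println("Total Average=" + totalAverage);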
The same sum-and-count approach also works in a Spark Streaming job that reads metrics from Elasticsearch; the snippet below is truncated:

import java.util.Map;

import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

import scala.Tuple2;

public class ElasticsearchMetricProcessor {

    private static final String ES_HOST_PORT = "localhost:9200";

    private static PairFunction<Tuple2<String, Tuple2<Long, Integer>>, String, Long> getAverageByKey = (tuple) -> {
        Tuple2<Long, Integer> val = tuple._2;
        long total = val._1;
        int count = val._2;
        Tuple2<String, Long> averagePair = new Tuple2<String, Long>(tuple._1, total / count);
        return averagePair;
    };

    public static void main(String args[]) throws InterruptedException {
        System.setProperty("hadoop.home.dir", "C:\\Users\\anki\\metering\\winutils");
        SparkConf sparkConf = new SparkConf().setAppName("StreamingApp").setMaster("local[2]");
        sparkConf.set("es.nodes.wan.only", "false");
        sparkConf.set("es.nodes", ES_HOST_PORT);
        JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
        JavaRDD
Output
Key=b37cacc3-71d5-40f0-a329-a051a3949ced Average=50
Key=a37cacc3-71d5-40f0-a329-a051a3949ced Average=37
The above solutions are in Java; anyone looking for Scala can try the following. It uses the same pattern: pair each salary with a count of 1, add values and counts with reduceByKey, then divide the sum by the count.
// data is the RDD[String] of "department salary" lines
val mapp = data.map(x => x.split(" "))
// (department, (salary, 1))
val dept = mapp.map(x => (x(0), (x(1).toInt, 1)))
// sum the salaries and the counts per department
val avg = dept.reduceByKey((x, y) => ((x._1 + y._1), (x._2 + y._2)))
// divide total salary by record count
val count = avg.mapValues { case (x, y) => x / y }
count.foreach(println)
Output
(Admin,1900000)
(HR,2100000)
(IT,2012500)
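As a quick check against the input data: HR = (2000000 + 2200000) / 2 = 2100000, IT = (2000000 + 1950000 + 1900000 + 2200000) / 4 = 8050000 / 4 = 2012500, and Admin has a single record, 1900000.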