Finding department averages with Spark groupBy in Java 1.8

I have the dataset below, where the first column is the department and the second column is the salary. I want to calculate the average salary for each department.

    IT 2000000
    HR 2000000
    IT 1950000
    HR 2200000
    Admin 1900000
    IT 1900000
    IT 2200000

I did the following:

    JavaPairRDD<String, Iterable<Integer>> rddY = employees.groupByKey();
    System.out.println("" + rddY.collect());

and got the following output:

    [(IT,[2000000, 1950000, 1900000, 2200000]), (HR,[2000000, 2200000]), (Admin,[1900000])]

What I need is:

  1. I want to calculate the overall average and the per-department average using Spark RDDs.

  2. How can I use the groupBy function in Spark to calculate the average? (A sketch covering both follows below.)
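For reference, here is a minimal sketch of both computations: groupByKey plus mapValues for the per-department average, and a JavaDoubleRDD for the overall average. It assumes the sample data sits in a local text file (`employees.txt`, a hypothetical path) with one `department salary` pair per line:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaDoubleRDD;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    public class DepartmentAverage {

        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("Department Average").setMaster("local[2]");
            JavaSparkContext sc = new JavaSparkContext(conf);

            // "employees.txt" is a hypothetical path; one "department salary" pair per line
            JavaPairRDD<String, Integer> employees = sc.textFile("employees.txt")
                    .mapToPair(line -> {
                        String[] parts = line.split(" ");
                        return new Tuple2<>(parts[0], Integer.parseInt(parts[1]));
                    });

            // Per-department average: group the salaries, then average each group in mapValues
            JavaPairRDD<String, Double> deptAvg = employees.groupByKey().mapValues(values -> {
                long sum = 0;
                int count = 0;
                for (int salary : values) {
                    sum += salary;
                    count++;
                }
                return (double) sum / count;
            });
            System.out.println(deptAvg.collect());

            // Overall average: drop the keys and use JavaDoubleRDD's built-in mean()
            JavaDoubleRDD allSalaries = employees.mapToDouble(tuple -> tuple._2());
            System.out.println("Overall average = " + allSalaries.mean());

            sc.stop();
        }
    }

The answers below reach the per-key average with reduceByKey instead, which avoids shuffling every individual value.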

Below is code that calculates the average per key using Spark's JavaPairRDD. Hope this helps.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFunction;

    import scala.Tuple2;

    public class SparkAverageCalculation {

        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("Average Calculation").setMaster("local[2]");
            JavaSparkContext sc = new JavaSparkContext(conf);
            // input list
            List<Tuple2<String, Integer>> inputList = new ArrayList<Tuple2<String, Integer>>();
            inputList.add(new Tuple2<String, Integer>("a1", 30));
            inputList.add(new Tuple2<String, Integer>("b1", 30));
            inputList.add(new Tuple2<String, Integer>("a1", 40));
            inputList.add(new Tuple2<String, Integer>("a1", 20));
            inputList.add(new Tuple2<String, Integer>("b1", 50));
            // parallelizePairs
            JavaPairRDD<String, Integer> pairRDD = sc.parallelizePairs(inputList);
            // pair each value with a count of 1
            JavaPairRDD<String, Tuple2<Integer, Integer>> valueCount = pairRDD.mapValues(value -> new Tuple2<Integer, Integer>(value, 1));
            // sum the values and counts with reduceByKey
            JavaPairRDD<String, Tuple2<Integer, Integer>> reducedCount = valueCount.reduceByKey((tuple1, tuple2) -> new Tuple2<Integer, Integer>(tuple1._1 + tuple2._1, tuple1._2 + tuple2._2));
            // calculate the average
            JavaPairRDD<String, Integer> averagePair = reducedCount.mapToPair(getAverageByKey);
            // print the average by key
            averagePair.foreach(data -> {
                System.out.println("Key=" + data._1() + " Average=" + data._2());
            });
            // stop sc
            sc.stop();
            sc.close();
        }

        private static PairFunction<Tuple2<String, Tuple2<Integer, Integer>>, String, Integer> getAverageByKey = (tuple) -> {
            Tuple2<Integer, Integer> val = tuple._2;
            int total = val._1;
            int count = val._2;
            Tuple2<String, Integer> averagePair = new Tuple2<String, Integer>(tuple._1, total / count);
            return averagePair;
        };
    }
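A note on the design: pairing each value with a count of 1 and merging the (sum, count) tuples with reduceByKey lets Spark pre-aggregate within each partition before the shuffle, which generally scales better than groupByKey, since groupByKey ships every individual value across the network. Also note that `total / count` is integer division, so the reported averages are truncated; returning a Double instead is a small change if fractional averages matter.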
The same pattern also works for data read from Elasticsearch through JavaEsSpark:

    import java.util.Map;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

    import scala.Tuple2;

    public class ElasticsearchMetricProcessor {

        private static final String ES_HOST_PORT = "localhost:9200";

        private static PairFunction<Tuple2<String, Tuple2<Long, Integer>>, String, Long> getAverageByKey = (tuple) -> {
            Tuple2<Long, Integer> val = tuple._2;
            long total = val._1;
            int count = val._2;
            Tuple2<String, Long> averagePair = new Tuple2<String, Long>(tuple._1, total / count);
            return averagePair;
        };

        public static void main(String args[]) throws InterruptedException {
            System.setProperty("hadoop.home.dir", "C:\\Users\\anki\\metering\\winutils");
            SparkConf sparkConf = new SparkConf().setAppName("StreamingApp").setMaster("local[2]");
            sparkConf.set("es.nodes.wan.only", "false");
            sparkConf.set("es.nodes", ES_HOST_PORT);
            JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
            // read the documents from Elasticsearch and keep only the _source maps
            JavaRDD<Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc.sparkContext(), "portal_analytics/report-execution").values();
            // key by "id", pair each "duration" with a count of 1
            JavaPairRDD<String, Tuple2<Long, Integer>> valueCount = esRDD
                    .mapToPair(x -> new Tuple2<String, Long>(x.get("id").toString(), Long.valueOf(x.get("duration").toString())))
                    .mapValues(value -> new Tuple2<Long, Integer>(value, 1));
            // sum the durations and counts with reduceByKey
            JavaPairRDD<String, Tuple2<Long, Integer>> reducedCount = valueCount.reduceByKey((tuple1, tuple2) -> new Tuple2<Long, Integer>(tuple1._1 + tuple2._1, tuple1._2 + tuple2._2));
            // calculate the average
            JavaPairRDD<String, Long> averagePair = reducedCount.mapToPair(getAverageByKey);
            // print the average by key
            averagePair.foreach(data -> {
                System.out.println("Key=" + data._1() + " Average=" + data._2());
            });
            // stop jsc
            jsc.stop();
            jsc.close();
        }
    }

Elasticsearch Test Data:

    {
      "took": 3,
      "timed_out": false,
      "_shards": { "total": 3, "successful": 3, "failed": 0 },
      "hits": {
        "total": 16,
        "max_score": 1,
        "hits": [
          { "_index": "portal_analytics", "_type": "report-execution", "_id": "AVvS8aPGm2uMcgoWFwdx", "_score": 1,
            "_source": { "type": "report-execution", "id": "a37cacc3-71d5-40f0-a329-a051a3949ced", "date-time": 1475733719123, "tenant": "default", "user": "317f1e761f2faa8da781a4762b9dcc2c5cad209a", "report": "72efd670-bb95-11e5-632f-54ee7539b24c", "duration": 30 } },
          { "_index": "portal_analytics", "_type": "report-execution", "_id": "AVvS8eOcm2uMcgoWFwd3", "_score": 1,
            "_source": { "type": "report-execution", "id": "a37cacc3-71d5-40f0-a329-a051a3949ced", "date-time": 1475733719123, "tenant": "default", "user": "317f1e761f2faa8da781a4762b9dcc2c5cad209a", "report": "72efd670-bb95-11e5-632f-54ee7539b24c", "duration": 30 } },
          { "_index": "portal_analytics", "_type": "report-execution", "_id": "AVvTL5ACm2uMcgoWFweC", "_score": 1,
            "_source": { "type": "report-execution", "id": "b37cacc3-71d5-40f0-a329-a051a3949ced", "date-time": 1475733719123, "tenant": "default", "user": "317f1e761f2faa8da781a4762b9dcc2c5cad209a", "report": "72efd670-bb95-11e5-632f-54ee7539b24c", "duration": 70 } },
          { "_index": "portal_analytics", "_type": "report-execution", "_id": "AVvTL96Xm2uMcgoWFweD", "_score": 1,
            "_source": { "type": "report-execution", "id": "b37cacc3-71d5-40f0-a329-a051a3949ced", "date-time": 1475733719123, "tenant": "default", "user": "317f1e761f2faa8da781a4762b9dcc2c5cad209a", "report": "72efd670-bb95-11e5-632f-54ee7539b24c", "duration": 30 } },
          { "_index": "portal_analytics", "_type": "report-execution", "_id": "AVvTNrKPm2uMcgoWFweF", "_score": 1,
            "_source": { "type": "report-execution", "id": "b37cacc3-71d5-40f0-a329-a051a3949ced", "date-time": 1475733719123, "tenant": "default", "user": "317f1e761f2faa8da781a4762b9dcc2c5cad209a", "report": "72efd670-bb95-11e5-632f-54ee7539b24c", "duration": 30 } },
          { "_index": "portal_analytics", "_type": "report-execution", "_id": "AVvS8dWFm2uMcgoWFwdy", "_score": 1,
            "_source": { "type": "report-execution", "id": "a37cacc3-71d5-40f0-a329-a051a3949ced", "date-time": 1475733719123, "tenant": "default", "user": "317f1e761f2faa8da781a4762b9dcc2c5cad209a", "report": "72efd670-bb95-11e5-632f-54ee7539b24c", "duration": 30 } },
          { "_index": "portal_analytics", "_type": "report-execution", "_id": "AVvS8dlim2uMcgoWFwdz", "_score": 1,
            "_source": { "type": "report-execution", "id": "a37cacc3-71d5-40f0-a329-a051a3949ced", "date-time": 1475733719123, "tenant": "default", "user": "317f1e761f2faa8da781a4762b9dcc2c5cad209a", "report": "72efd670-bb95-11e5-632f-54ee7539b24c", "duration": 30 } },
          { "_index": "portal_analytics", "_type": "report-execution", "_id": "AVvS8d7am2uMcgoWFwd1", "_score": 1,
            "_source": { "type": "report-execution", "id": "a37cacc3-71d5-40f0-a329-a051a3949ced", "date-time": 1475733719123, "tenant": "default", "user": "317f1e761f2faa8da781a4762b9dcc2c5cad209a", "report": "72efd670-bb95-11e5-632f-54ee7539b24c", "duration": 30 } },
          { "_index": "portal_analytics", "_type": "report-execution", "_id": "AVvS8eX0m2uMcgoWFwd4", "_score": 1,
            "_source": { "type": "report-execution", "id": "a37cacc3-71d5-40f0-a329-a051a3949ced", "date-time": 1475733719123, "tenant": "default", "user": "317f1e761f2faa8da781a4762b9dcc2c5cad209a", "report": "72efd670-bb95-11e5-632f-54ee7539b24c", "duration": 30 } },
          { "_index": "portal_analytics", "_type": "report-execution", "_id": "AVvS8nplm2uMcgoWFwd7", "_score": 1,
            "_source": { "type": "report-execution", "id": "a37cacc3-71d5-40f0-a329-a051a3949ced", "date-time": 1475733719123, "tenant": "default", "user": "317f1e761f2faa8da781a4762b9dcc2c5cad209a", "report": "72efd670-bb95-11e5-632f-54ee7539b24c", "duration": 50 } }
        ]
      }
    }

Output

    Key=b37cacc3-71d5-40f0-a329-a051a3949ced Average=50
    Key=a37cacc3-71d5-40f0-a329-a051a3949ced Average=37
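Note the truncation here as well: `total / count` is long integer division, so a true mean of, say, 37.5 is reported as 37.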

The solutions above are in Java; for anyone looking for Scala, try the following, where `data` is an RDD of the `department salary` lines:

    val mapp = data.map(x => x.split(" "))
    val dept = mapp.map(x => (x(0), (x(1).toInt, 1)))
    val avg = dept.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
    val count = avg.mapValues { case (x, y) => x / y }
    count.foreach(println)

Output

    (Admin,1900000)
    (HR,2100000)
    (IT,2012500)