Spark combineByKey with Java lambda expressions

I want to use lambda functions to compute the per-key average of a JavaPairRDD ( pairs ). To that end, I wrote the following code:

    java.util.function.Function<Double, Tuple2> createAcc = x -> new Tuple2(x, 1);
    BiFunction<Tuple2, Double, Tuple2> addAndCount = (Tuple2 x, Double y) -> {
        return new Tuple2(x._1() + y, x._2() + 1);
    };
    BiFunction<Tuple2, Tuple2, Tuple2> combine = (Tuple2 x, Tuple2 y) -> {
        return new Tuple2(x._1() + y._1(), x._2() + y._2());
    };
    JavaPairRDD<Integer, Tuple2> avgCounts = pairs.combineByKey(createAcc, addAndCount, combine);

However, Eclipse shows this error:

 The method combineByKey(Function, Function2, Function2) in the type JavaPairRDD is not applicable for the arguments (Function<Double,Tuple2>, BiFunction<Tuple2,Double,Tuple2>, BiFunction<Tuple2,Tuple2,Tuple2>) 

The combineByKey method expects org.apache.spark.api.java.function.Function2, not java.util.function.BiFunction (and likewise Spark's own Function rather than java.util.function.Function for the first argument). So you write:

    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;

    // All three combiners use Spark's functional interfaces, not java.util.function ones.
    Function<Double, Tuple2<Double, Integer>> createAcc = x -> new Tuple2<>(x, 1);
    Function2<Tuple2<Double, Integer>, Double, Tuple2<Double, Integer>> addAndCount =
        (x, y) -> new Tuple2<>(x._1() + y, x._2() + 1);
    Function2<Tuple2<Double, Integer>, Tuple2<Double, Integer>, Tuple2<Double, Integer>> combine =
        (x, y) -> new Tuple2<>(x._1() + y._1(), x._2() + y._2());
    JavaPairRDD<Integer, Tuple2<Double, Integer>> avgCounts =
        pairs.combineByKey(createAcc, addAndCount, combine);
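The root cause is that java.util.function.BiFunction and Spark's Function2 are unrelated interface types, even though the same lambda text compiles against either. The following sketch illustrates this without Spark, using a hypothetical minimal stand-in for Function2 (the real Spark interface is similar and also declares `throws Exception`):

```java
import java.util.function.BiFunction;

public class Function2Demo {
    // Hypothetical stand-in for org.apache.spark.api.java.function.Function2,
    // kept minimal for illustration.
    interface Function2<T1, T2, R> {
        R call(T1 a, T2 b) throws Exception;
    }

    // The same lambda body targets either functional interface...
    static Integer viaBiFunction(int a, int b) {
        BiFunction<Integer, Integer, Integer> f = (x, y) -> x + y;
        return f.apply(a, b);
    }

    static Integer viaFunction2(int a, int b) throws Exception {
        Function2<Integer, Integer, Integer> f = (x, y) -> x + y;
        return f.call(a, b);
    }

    public static void main(String[] args) throws Exception {
        // ...but the two interface types are unrelated, so a BiFunction value
        // cannot be passed where combineByKey expects a Function2 -- hence
        // the "not applicable for the arguments" compile error above.
        System.out.println(viaBiFunction(2, 3)); // 5
        System.out.println(viaFunction2(2, 3));  // 5
    }
}
```

So the fix is not a cast or adapter: the lambdas themselves are fine, they just need to be declared with Spark's interface types.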

The explanation is in the inline comments:

    List<List<Integer>> intList = new ArrayList<>();
    intList.add(Arrays.asList(1, 2));
    intList.add(Arrays.asList(5, 6));
    intList.add(Arrays.asList(3, 8));
    intList.add(Arrays.asList(5, 7));
    intList.add(Arrays.asList(3, 4));
    System.out.println(intList); // [[1, 2], [5, 6], [3, 8], [5, 7], [3, 4]]

    JavaRDD<List<Integer>> intRdd = jsc.parallelize(intList);
    // Turn each [key, value] list into a (key, value) pair.
    JavaPairRDD<Integer, Integer> intPairRdd =
        intRdd.mapToPair(l -> new Tuple2<>(l.get(0), l.get(1)));

    JavaPairRDD<Integer, Tuple2<Integer, Integer>> key_ValueSumsAndEncounters =
        intPairRdd.combineByKey(
            /* Lambda 1: combiner for a newly encountered key
               - When key k is encountered for the first time, the combiner returns
                 the tuple (value associated with k, 1).
               - The second element is 1 because this is the first encounter.
               - For example, on (k,v) = (1,4) this combiner returns (1,(4,1)). */
            v -> new Tuple2<>(v, 1),
            /* Lambda 2: combiner for subsequent encounters (2nd and later) of a key
               - When key k is encountered again, the combiner returns
                 (previous sum + value associated with k,
                  previous number of encounters + 1).
               - For example, on (1,6) with earlier combiner result (1,(4,1)),
                 this combiner returns (1,(4+6,1+1)) = (1,(10,2)). */
            (c1, v) -> new Tuple2<>(c1._1 + v, c1._2 + 1),
            /* Lambda 3: combiner for merging two combiners from different partitions
               - Returns (sum from combiner1 + sum from combiner2,
                          encounters from combiner1 + encounters from combiner2).
               - For example, with combiner1 = (1,(10,2)) from partition1 and
                 combiner2 = (1,(15,4)) from partition2, this combiner returns
                 (1,(10+15,2+4)) = (1,(25,6)). */
            (c1, c2) -> new Tuple2<>(c1._1 + c2._1, c1._2 + c2._2)
        );
    System.out.println(key_ValueSumsAndEncounters.collect());
    // [(1,(2,1)), (3,(12,2)), (5,(13,2))]

    // kse stands for (key, (sum, encounters))
    JavaRDD<Tuple2<Integer, Double>> key_avg = key_ValueSumsAndEncounters
        .map(kse -> new Tuple2<>(kse._1, (double) kse._2._1 / kse._2._2));
    System.out.println(key_avg.collect());
    // [(1,2.0), (3,6.0), (5,6.5)]
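The same three-step merge logic can be checked without a Spark cluster. The sketch below simulates two hypothetical partitions in plain Java, using `int[]{sum, encounters}` accumulators in place of Tuple2 and `Map.merge` in place of combineByKey (names like `combinePartition` are illustrative, not Spark API):

```java
import java.util.*;

public class CombineByKeySketch {
    // Accumulator convention: acc[0] = running sum, acc[1] = number of encounters.
    static Map<Integer, int[]> combinePartition(List<int[]> pairs) {
        Map<Integer, int[]> accs = new HashMap<>();
        for (int[] kv : pairs) {
            accs.merge(kv[0],
                new int[]{kv[1], 1},                                   // createCombiner: first encounter -> (value, 1)
                (acc, fresh) -> new int[]{acc[0] + fresh[0], acc[1] + 1}); // mergeValue: (sum + value, encounters + 1)
        }
        return accs;
    }

    static Map<Integer, Double> averages(List<int[]> partition1, List<int[]> partition2) {
        Map<Integer, int[]> merged = combinePartition(partition1);
        // mergeCombiners: combine accumulators coming from different partitions.
        combinePartition(partition2).forEach((k, acc) ->
            merged.merge(k, acc, (a, b) -> new int[]{a[0] + b[0], a[1] + b[1]}));
        Map<Integer, Double> avg = new TreeMap<>();
        merged.forEach((k, acc) -> avg.put(k, (double) acc[0] / acc[1]));
        return avg;
    }

    public static void main(String[] args) {
        // Same (key, value) data as above, split across two pretend partitions.
        List<int[]> part1 = Arrays.asList(new int[]{1, 2}, new int[]{5, 6}, new int[]{3, 8});
        List<int[]> part2 = Arrays.asList(new int[]{5, 7}, new int[]{3, 4});
        System.out.println(averages(part1, part2)); // {1=2.0, 3=6.0, 5=6.5}
    }
}
```

This reproduces the per-key averages from the Spark run and makes it easy to step through each of the three combiner roles in a debugger.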