是否存在查看相邻元素的RDD转换函数?

有人知道在转换过程中是否有办法查看已排序的RDD中的相邻元素? 我知道我可以收集然后执行下面示例中的操作,但是它有点破坏了分布式系统的目的,我试图利用它分布的事实。

例:

RDD of(string name,int val)映射到RDD(string name,int val,int diff)

这样:

name | val becomes -> name | val | diff (current - prior) a | 3 a | 3 | 3 b | 6 b | 6 | 3 c | 4 c | 4 | -2 d | 20 d | 20 | 16 

最有效的方法可能是最简单的方法是将RDD转换为dataframe并使用滞后:

 case class NameValue(name: String, value: Int) val rdd = sc.parallelize( NameValue("a", 3) :: NameValue("b", 6) :: NameValue("c", 4) :: NameValue("d", 20) :: Nil) val df = sqlContext.createDataFrame(rdd) df.registerTempTable("df") sqlContext.sql("""SELECT name, value, value - lag(value) OVER (ORDER BY name, value) lag FROM df""").show 

不幸的是,此时没有PARTITION BY子句的窗口函数会将所有数据移动到单个分区,因此如果您有大型数据集,则它特别有用。

使用低级操作,您可以使用zipWithIndex然后使用flatMapgroupByKey

 case class NameValueWithLag(name: String, value: Int, lag: Int) val cnt = rdd.count() - 1 rdd. zipWithIndex. flatMap{case (x, i) => (0 to 1).map(lag => (i - lag, (i, x)))}. groupByKey. filter{ case (k, v) => k != cnt}. values. map(vals => { val sorted = vals.toArray.sortBy(_._1).map(_._2) if (sorted.length == 1) { NameValueWithLag(sorted(0).name, sorted(0).value, sorted(0).value) } else { NameValueWithLag( sorted(1).name, sorted(1).value, sorted(1).value - sorted(0).value ) } }) 

编辑:

如果您不介意使用开发人员API,可以尝试RDDFunctions.sliding但需要手动处理

 import org.apache.spark.mllib.rdd.RDDFunctions._ val first = rdd.first match { case NameValue(name, value) => NameValueWithLag(name, value, value) } sc.parallelize(Seq(first)).union(rdd .sliding(2) .map(a => NameValueWithLag(a(1).name, a(1).value, a(1).value - a(0).value)))