如何在spark数据框中展平结构?
我有一个具有以下结构的dataframe:
|-- data: struct (nullable = true) | |-- id: long (nullable = true) | |-- keyNote: struct (nullable = true) | | |-- key: string (nullable = true) | | |-- note: string (nullable = true) | |-- details: map (nullable = true) | | |-- key: string | | |-- value: string (valueContainsNull = true)
如何展平结构并创建新的数据框:
|-- id: long (nullable = true) |-- keyNote: struct (nullable = true) | |-- key: string (nullable = true) | |-- note: string (nullable = true) |-- details: map (nullable = true) | |-- key: string | |-- value: string (valueContainsNull = true)
有什么像爆炸,但结构?
这应该适用于Spark 1.6或更高版本:
df.select(df.col("data.*"))
要么
df.select(df.col("data.id"), df.col("data.keyNote"), df.col("data.details"))
这是一个正在执行您想要的function的函数,它可以处理包含具有相同名称的列的多个嵌套列:
def flatten_df(nested_df): flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct'] nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct'] flat_df = nested_df.select(flat_cols + [F.col(nc+'.'+c).alias(nc+'_'+c) for nc in nested_cols for c in nested_df.select(nc+'.*').columns]) return flat_df
之前:
root |-- x: string (nullable = true) |-- y: string (nullable = true) |-- foo: struct (nullable = true) | |-- a: float (nullable = true) | |-- b: float (nullable = true) | |-- c: integer (nullable = true) |-- bar: struct (nullable = true) | |-- a: float (nullable = true) | |-- b: float (nullable = true) | |-- c: integer (nullable = true)
后:
root |-- x: string (nullable = true) |-- y: string (nullable = true) |-- foo_a: float (nullable = true) |-- foo_b: float (nullable = true) |-- foo_c: integer (nullable = true) |-- bar_a: float (nullable = true) |-- bar_b: float (nullable = true) |-- bar_c: integer (nullable = true)
一种简单的方法是使用SQL,您可以构建一个SQL查询字符串,将嵌套列别名为平面列。
- 检索dataframe架构(df.schema())
- 将模式转换为SQL(for(field:schema()。fields())….
- 查询“val newDF = sqlContext.sql(”SELECT“+ sqlGenerated +”FROM source“)
Java中的一个例子:
https://gist.github.com/ebuildy/3de0e2855498e5358e4eed1a4f72ea48
(我更喜欢SQL方式,所以你可以在Spark-shell上轻松测试它,它是跨语言的)。
我对stecos的解决方案进行了更广泛的推广,因此可以在两个以上的结构层深度上进行展平:
def flatten_df(nested_df, layers): flat_cols = [] nested_cols = [] flat_df = [] flat_cols.append([c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']) nested_cols.append([c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']) flat_df.append(nested_df.select(flat_cols[0] + [col(nc+'.'+c).alias(nc+'_'+c) for nc in nested_cols[0] for c in nested_df.select(nc+'.*').columns]) ) for i in range(1, layers): print (flat_cols[i-1]) flat_cols.append([c[0] for c in flat_df[i-1].dtypes if c[1][:6] != 'struct']) nested_cols.append([c[0] for c in flat_df[i-1].dtypes if c[1][:6] == 'struct']) flat_df.append(flat_df[i-1].select(flat_cols[i] + [col(nc+'.'+c).alias(nc+'_'+c) for nc in nested_cols[i] for c in flat_df[i-1].select(nc+'.*').columns]) ) return flat_df[-1]
只需打电话:
my_flattened_df = flatten_df(my_df_having_nested_structs, 3)
(第二个参数是要展平的图层级别,在我的例子中是3)
- 我应该将变量保留为瞬态变量吗?
- 使用–jars的spark-submit yarn-cluster不起作用?
- Spark ML Pipeline api保存不起作用
- Java中Spark MLlib中的矩阵运算
- Bluemix Spark与Java
- 线程主java.lang.exceptionininitializerError中的exception当没有hadoop安装spark时
- java + spark:org.apache.spark.SparkException:作业已中止:任务不可序列化:java.io.NotSerializableException
- 多节点hadoop集群中的Apache Spark Sql问题
- 将JavaPairRDD转换为JavaRDD