How do I flatten a struct in a Spark dataframe?

I have a dataframe with the following structure:

     |-- data: struct (nullable = true)
     |    |-- id: long (nullable = true)
     |    |-- keyNote: struct (nullable = true)
     |    |    |-- key: string (nullable = true)
     |    |    |-- note: string (nullable = true)
     |    |-- details: map (nullable = true)
     |    |    |-- key: string
     |    |    |-- value: string (valueContainsNull = true)

How can I flatten the struct and create a new dataframe with this structure:

     |-- id: long (nullable = true)
     |-- keyNote: struct (nullable = true)
     |    |-- key: string (nullable = true)
     |    |-- note: string (nullable = true)
     |-- details: map (nullable = true)
     |    |-- key: string
     |    |-- value: string (valueContainsNull = true)

Is there anything like explode, but for structs?
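For reference, a minimal PySpark sketch (not from the original post; the sample values are made up) that builds a dataframe with roughly this schema, so the answers below can be tested:

    from pyspark.sql import Row, SparkSession

    spark = SparkSession.builder.getOrCreate()

    # One row whose single column "data" is a struct containing a long,
    # a nested struct and a map, mirroring the schema in the question.
    df = spark.createDataFrame([
        Row(data=Row(id=1,
                     keyNote=Row(key="k1", note="a note"),
                     details={"colour": "red", "size": "M"}))
    ])
    df.printSchema()  # field order inside the struct may differ by Spark version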

This should work in Spark 1.6 or later:

 df.select(df.col("data.*")) 

or

 df.select(df.col("data.id"), df.col("data.keyNote"), df.col("data.details")) 

Here is a function that does what you want; it can also handle multiple nested columns that contain columns with the same name:

    import pyspark.sql.functions as F

    def flatten_df(nested_df):
        # Top-level columns that are not structs stay as they are.
        flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
        # Struct columns get expanded one level, prefixed with the parent name.
        nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']

        flat_df = nested_df.select(
            flat_cols +
            [F.col(nc + '.' + c).alias(nc + '_' + c)
             for nc in nested_cols
             for c in nested_df.select(nc + '.*').columns])
        return flat_df

Before:

    root
     |-- x: string (nullable = true)
     |-- y: string (nullable = true)
     |-- foo: struct (nullable = true)
     |    |-- a: float (nullable = true)
     |    |-- b: float (nullable = true)
     |    |-- c: integer (nullable = true)
     |-- bar: struct (nullable = true)
     |    |-- a: float (nullable = true)
     |    |-- b: float (nullable = true)
     |    |-- c: integer (nullable = true)

After:

    root
     |-- x: string (nullable = true)
     |-- y: string (nullable = true)
     |-- foo_a: float (nullable = true)
     |-- foo_b: float (nullable = true)
     |-- foo_c: integer (nullable = true)
     |-- bar_a: float (nullable = true)
     |-- bar_b: float (nullable = true)
     |-- bar_c: integer (nullable = true)
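A minimal usage sketch, assuming df is a dataframe with the "before" schema above:

    flat = flatten_df(df)
    flat.printSchema()  # x, y, foo_a ... bar_c as top-level columns
    flat.show()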

An easy way is to use SQL: you can build a SQL query string that aliases the nested columns as flat ones (a PySpark sketch of this approach follows below).

  1. Retrieve the dataframe schema (df.schema())
  2. Convert the schema into SQL (for (field : schema().fields()) ...)
  3. Run the query: val newDF = sqlContext.sql("SELECT " + sqlGenerated + " FROM source")

An example in Java:

https://gist.github.com/ebuildy/3de0e2855498e5358e4eed1a4f72ea48

(I prefer the SQL way because you can test it easily in the Spark shell, and it is cross-language.)
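A minimal PySpark sketch of the same idea (the Java gist linked above follows the same pattern; "source" is just a hypothetical view name, and only one level of structs is handled here):

    from pyspark.sql.types import StructType

    df.createOrReplaceTempView("source")

    # Build "data.id AS data_id, data.keyNote AS data_keyNote, ..." from the schema.
    select_parts = []
    for field in df.schema.fields:
        if isinstance(field.dataType, StructType):
            select_parts += ["{0}.{1} AS {0}_{1}".format(field.name, child.name)
                             for child in field.dataType.fields]
        else:
            select_parts.append(field.name)

    new_df = spark.sql("SELECT " + ", ".join(select_parts) + " FROM source")
    new_df.printSchema()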

I generalized stecos's solution a bit further so the flattening works on structs nested more than two levels deep:

    from pyspark.sql.functions import col

    def flatten_df(nested_df, layers):
        flat_cols = []
        nested_cols = []
        flat_df = []

        # First pass: split top-level columns into plain and struct columns.
        flat_cols.append([c[0] for c in nested_df.dtypes if c[1][:6] != 'struct'])
        nested_cols.append([c[0] for c in nested_df.dtypes if c[1][:6] == 'struct'])

        flat_df.append(nested_df.select(
            flat_cols[0] +
            [col(nc + '.' + c).alias(nc + '_' + c)
             for nc in nested_cols[0]
             for c in nested_df.select(nc + '.*').columns]))

        # Each further pass flattens one more level of nesting.
        for i in range(1, layers):
            print(flat_cols[i - 1])
            flat_cols.append([c[0] for c in flat_df[i - 1].dtypes if c[1][:6] != 'struct'])
            nested_cols.append([c[0] for c in flat_df[i - 1].dtypes if c[1][:6] == 'struct'])

            flat_df.append(flat_df[i - 1].select(
                flat_cols[i] +
                [col(nc + '.' + c).alias(nc + '_' + c)
                 for nc in nested_cols[i]
                 for c in flat_df[i - 1].select(nc + '.*').columns]))

        return flat_df[-1]

Just call it with:

 my_flattened_df = flatten_df(my_df_having_nested_structs, 3) 

(the second parameter is the number of layers to flatten; in my case it is 3)
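For comparison, a recursive sketch of the same idea (not from the answers above) that keeps flattening until no struct columns remain, so the layer count does not need to be passed in:

    from pyspark.sql.functions import col

    def flatten_df_recursive(nested_df):
        struct_cols = [c for c, t in nested_df.dtypes if t.startswith('struct')]
        if not struct_cols:          # nothing left to flatten
            return nested_df
        flat_cols = [c for c, t in nested_df.dtypes if not t.startswith('struct')]
        flattened = nested_df.select(
            flat_cols +
            [col(nc + '.' + c).alias(nc + '_' + c)
             for nc in struct_cols
             for c in nested_df.select(nc + '.*').columns])
        return flatten_df_recursive(flattened)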