用于行类型Spark数据集的编码器

我想在DataSet中为Row类型编写一个编码器,用于我正在进行的地图操作。 基本上,我不明白如何编写编码器。

以下是地图操作的示例:

In the example below, instead of returning Dataset, I would like to return Dataset

 Dataset output = dataset1.flatMap(new FlatMapFunction() { @Override public Iterator call(Row row) throws Exception { ArrayList obj = //some map operation return obj.iterator(); } },Encoders.STRING()); 

据我所知,编码器需要编写如下代码:

  Encoder encoder = new Encoder() { @Override public StructType schema() { return join.schema(); //return null; } @Override public ClassTag clsTag() { return null; } }; 

但是,我不理解编码器中的clsTag(),我试图找到一个可以演示相似内容的运行示例(即行类型的编码器)

编辑 – 这不是所提问题的副本: 尝试将dataframe行映射到更新行时的编码器错误,因为答案谈到在Spark 2.x中使用Spark 1.x(我不这样做),我也在寻找用于Row类的编码器而不是解决错误。 最后,我一直在寻找Java解决方案,而不是Scala。

答案是使用TypeStruct使用RowEncoder和数据集的模式。

以下是使用数据集进行flatmap操作的工作示例:

  StructType structType = new StructType(); structType = structType.add("id1", DataTypes.LongType, false); structType = structType.add("id2", DataTypes.LongType, false); ExpressionEncoder encoder = RowEncoder.apply(structType); Dataset output = join.flatMap(new FlatMapFunction() { @Override public Iterator call(Row row) throws Exception { // a static map operation to demonstrate List data = new ArrayList<>(); data.add(1l); data.add(2l); ArrayList list = new ArrayList<>(); list.add(RowFactory.create(data.toArray())); return list.iterator(); } }, encoder); 

我有同样的问题… Encoders.kryo(Row.class))为我工作。

作为奖励,Apache Spark调优文档引用Kryo它,因为序列化速度更快“通常高达10倍”:

https://spark.apache.org/docs/latest/tuning.html