在Dataflow Generic中进行转换

这与另一个SO问题[此处]( 设置自定义编码器和处理参数化类型 )有关。在解决方法之后,帮助我在变换中使用自定义类型。 但由于我的自定义类型是通用的,我希望甚至使变换类通用,然后可以使用相同的类型参数化自定义类型。 但是当我尝试这样做时,我遇到了无法为类型变量T提供编码器,因为实际类型由于擦除而未知 。 解决方案建议注册一个可以返回类型参数的编码器,但由于类型参数本身是未知的,我想这个exception会抛出,我不知道如何解决这个问题。

static class Processor extends PTransform<PCollection, PCollection<KV<String, Set<CustomType>>>> { private static final long serialVersionUID = 0; @Override public PCollection<KV<String, Set<CustomType>>> apply(PCollection items) { PCollection<KV<String, Set<CustomType>>> partitionedItems = items .apply(ParDo.of(new ParDoFn())); PCollection<KV<String, Set<CustomType>>> combinedItems = partitionedItems .apply(Combine.<String, Set<CustomType>>perKey(new Merger())); } } 

这看起来也是由Github问题#57引起的,应该与该问题一起修复。

与此同时,Dataflow实际上包含可以立即解决您的问题的高级function。 从您的代码段看来,整个系统可能看起来像这样:

 class CustomType { ... } class Processor extends PTransform, PCollection>>>> { class ParDoFn extends DoFn>>> { … } class Merger extends BinaryCombineFn>> { … } @Override public PCollection>>> apply(PCollection items) { PCollection>>> partitionedItems = items.apply(ParDo.of(new ParDoFn())); PCollection>>> combinedItems = partitionedItems.apply( Combine.>, Set>>perKey( new Merger())); return combinedItems; } } … PCollection input = ... input.apply(new Processor()); 

Dataflow通过使用getOutputTypeDescriptor返回的TypeDescriptor获取每个DoFn的输出类型

因为ParDoFnProcessor的内部类,所以输出类型描述符只是Set> ,即使它被实例化为新的Processor

为了获得类型信息,我们需要ParDoFn静态地知道为T提供的类型。 这有两个步骤。

1.创建Processor的匿名子类

 PCollection input = ... input.apply(new Processor() {}); 

这确保了对于此Processor实例的所有内部类,类型变量T 静态地绑定到String类型。 在这种情况下,最好将Processor设为抽象类,以便消费者对其进行子类化。

2.Override ParDoFn的getOutputTypeDescriptor以针对外部类Processor解析其类型。

 class Processor extends ... { class ParDoFn extends DoFn>>> { @Override protected TypeDescriptor>>> getOutputTypeDescriptor() { return new TypeDescriptor>>>( Processor.this.getClass()) {}; } } 

从一开始,代码的完整工作版本如下。 再次注意,当Github问题#57得到解决时,这些都不是必需的。

 class CustomType { ... } abstract class Processor extends PTransform, PCollection>>>> { class ParDoFn extends DoFn>>> { ... @Override protected TypeDescriptor>>> getOutputTypeDescriptor() { return new TypeDescriptor>>>( Processor.this.getClass()) {}; } } class Merger extends BinaryCombineFn>> { ... } @Override public PCollection>>> apply(PCollection items) { PCollection>>> partitionedItems = items.apply(ParDo.of(new ParDoFn())); PCollection>>> combinedItems = partitionedItems.apply( Combine.>, Set>>perKey( new Merger())); return combinedItems; } } PCollection input = …; input.apply(new Processor() {}); 

这不是唯一的解决方案 – 您也可以覆盖Processor.getDefaultOutputCoder或在中间partitionedItems集合上显式调用setCoder – 但它似乎是最常用的。