在Dataflow Generic中进行转换
这与另一个SO问题[此处]( 设置自定义编码器和处理参数化类型 )有关。在解决方法之后,帮助我在变换中使用自定义类型。 但由于我的自定义类型是通用的,我希望甚至使变换类通用,然后可以使用相同的类型参数化自定义类型。 但是当我尝试这样做时,我遇到了无法为类型变量T提供编码器,因为实际类型由于擦除而未知 。 解决方案建议注册一个可以返回类型参数的编码器,但由于类型参数本身是未知的,我想这个exception会抛出,我不知道如何解决这个问题。
static class Processor extends PTransform<PCollection, PCollection<KV<String, Set<CustomType>>>> { private static final long serialVersionUID = 0; @Override public PCollection<KV<String, Set<CustomType>>> apply(PCollection items) { PCollection<KV<String, Set<CustomType>>> partitionedItems = items .apply(ParDo.of(new ParDoFn())); PCollection<KV<String, Set<CustomType>>> combinedItems = partitionedItems .apply(Combine.<String, Set<CustomType>>perKey(new Merger())); } }
这看起来也是由Github问题#57引起的,应该与该问题一起修复。
与此同时,Dataflow实际上包含可以立即解决您的问题的高级function。 从您的代码段看来,整个系统可能看起来像这样:
class CustomType { ... } class Processor extends PTransform, PCollection>>>> { class ParDoFn extends DoFn>>> { … } class Merger extends BinaryCombineFn>> { … } @Override public PCollection>>> apply(PCollection items) { PCollection>>> partitionedItems = items.apply(ParDo.of(new ParDoFn())); PCollection>>> combinedItems = partitionedItems.apply( Combine.>, Set>>perKey( new Merger())); return combinedItems; } } … PCollection input = ... input.apply(new Processor ());
Dataflow通过使用getOutputTypeDescriptor
返回的TypeDescriptor
获取每个DoFn
的输出类型
因为ParDoFn
是Processor
的内部类,所以输出类型描述符只是Set
,即使它被实例化为新的Processor
。
为了获得类型信息,我们需要ParDoFn
静态地知道为T
提供的类型。 这有两个步骤。
1.创建Processor
的匿名子类
PCollection input = ... input.apply(new Processor () {});
这确保了对于此Processor
实例的所有内部类,类型变量T
静态地绑定到String
类型。 在这种情况下,最好将Processor
设为抽象类,以便消费者对其进行子类化。
2.Override ParDoFn的getOutputTypeDescriptor
以针对外部类Processor
解析其类型。
class Processor extends ... { class ParDoFn extends DoFn>>> { @Override protected TypeDescriptor>>> getOutputTypeDescriptor() { return new TypeDescriptor>>>( Processor.this.getClass()) {}; } }
从一开始,代码的完整工作版本如下。 再次注意,当Github问题#57得到解决时,这些都不是必需的。
class CustomType { ... } abstract class Processor extends PTransform, PCollection>>>> { class ParDoFn extends DoFn>>> { ... @Override protected TypeDescriptor>>> getOutputTypeDescriptor() { return new TypeDescriptor>>>( Processor.this.getClass()) {}; } } class Merger extends BinaryCombineFn>> { ... } @Override public PCollection>>> apply(PCollection items) { PCollection>>> partitionedItems = items.apply(ParDo.of(new ParDoFn())); PCollection>>> combinedItems = partitionedItems.apply( Combine.>, Set>>perKey( new Merger())); return combinedItems; } } PCollection input = …; input.apply(new Processor () {});
这不是唯一的解决方案 – 您也可以覆盖Processor.getDefaultOutputCoder
或在中间partitionedItems
集合上显式调用setCoder
– 但它似乎是最常用的。