数据流DoFn中的数据存储区查询在云中运行时减慢了管道

我试图通过在DoFn步骤中查询数据存储来增强管道中的数据。 来自Class CustomClass的对象中的字段用于对数据存储表执行查询,返回的值用于增强对象。

代码如下所示:

public class EnhanceWithDataStore extends DoFn { private static Datastore datastore = DatastoreOptions.defaultInstance().service(); private static KeyFactory articleKeyFactory = datastore.newKeyFactory().kind("article"); @Override public void processElement(ProcessContext c) throws Exception { CustomClass event = c.element(); Entity article = datastore.get(articleKeyFactory.newKey(event.getArticleId())); String articleName = ""; try{ articleName = article.getString("articleName"); } catch(Exception e) {} CustomClass enhanced = new CustomClass(event); enhanced.setArticleName(articleName); c.output(enhanced); } 

当它在本地运行时,这很快,但是当它在云中运行时,此步骤会显着减慢管道的速度。 是什么导致了这个? 有没有解决方法或更好的方法来做到这一点?

可以在此处找到管道的图片(​​最后一步是增强步骤): 管道架构

你在这里做的是输入PCollection和数据存储区中的增强function之间的连接。

对于PCollection的每个分区,对数据存储PCollection的调用将是单线程的,因此会产生大量延迟。 我希望DirectPipelineRunnerInProcessPipelineRunner中的这个也很慢。 通过自动调节和动态工作重新平衡,您应该在运行Dataflow服务时看到并行性,除非您的结构有些问题导致我们对其进行优化,因此您可以尝试增加--maxNumWorkers 。 但是你仍然无法从批量操作中受益。

使用DatastoreIO.readFrom(...)后跟CoGroupByKey变换,在管道中表达此连接可能更好。 通过这种方式,Dataflow将对所有增强function进行批量并行读取,并使用高效的GroupByKey机制将它们与事件GroupByKey

 // Here are the two collections you want to join PCollection events = ...; PCollection articles = DatastoreIO.readFrom(...); // Key them both by the common id PCollection> keyedEvents = events.apply(WithKeys.of(event -> event.getArticleId())) PCollection> = articles.apply(WithKeys.of(article -> article.getKey().getId()) // Set up the join by giving tags to each collection TupleTag eventTag = new TupleTag() {}; TupleTag articleTag = new TupleTag() {}; KeyedPCollectionTuple coGbkInput = KeyedPCollectionTuple .of(eventTag, keyedEvents) .and(articleTag, keyedArticles); PCollection enhancedEvents = coGbkInput .apply(CoGroupByKey.create()) .apply(MapElements.via(CoGbkResult joinResult -> { for (CustomClass event : joinResult.getAll(eventTag)) { String articleName; try { articleName = joinResult.getOnly(articleTag).getString("articleName"); } catch(Exception e) { articleName = ""; } CustomClass enhanced = new CustomClass(event); enhanced.setArticleName(articleName); return enhanced; } }); 

如果只有很少的文章可以在内存中存储查找,那么另一种可能性就是使用DatastoreIO.readFrom(...)然后通过View.asMap()它们全部作为地图侧输入View.asMap()并查找它们。本地表。

 // Here are the two collections you want to join PCollection events = ...; PCollection articles = DatastoreIO.readFrom(...); // Key the articles and create a map view PCollectionView> = articleView .apply(WithKeys.of(article -> article.getKey().getId()) .apply(View.asMap()); // Do a lookup join by side input to a ParDo PCollection enhanced = events .apply(ParDo.withSideInputs(articles).of(new DoFn() { @Override public void processElement(ProcessContext c) { Map articleLookup = c.sideInput(articleView); String articleName; try { articleName = articleLookup.get(event.getArticleId()).getString("articleName"); } catch(Exception e) { articleName = ""; } CustomClass enhanced = new CustomClass(event); enhanced.setArticleName(articleName); return enhanced; } }); 

根据您的数据,这些中的任何一个都可能是更好的选择。

经过一些检查后,我设法找出问题所在: 项目位于欧盟(因此,数据存储位于欧盟区域;与AppEningine区域相同),而数据流作业本身(以及工作人员)默认情况下托管在美国 (不覆盖zone-option时)。

性能差异为25-30倍 :~40个元素/秒,而15个工人为~1200个元素/秒。