Tag: 谷歌云,数据流

通过Google DataFlow Transformer查询关系数据库

我想在我的Dataflow Pipeline上实现ParDo Transformer,它基本上根据要处理的每个元素提供的数据查询关系数据库。 我知道用户定义的变换器中的每个属性都必须是可序列化的,但是要使用jdbc查询数据到数据库,我需要创建一个自然不可序列化的对象。 仍然可以在Dataflow Pipeline上下文中执行此操作吗?

数据流DoFn中的数据存储区查询在云中运行时减慢了管道

我试图通过在DoFn步骤中查询数据存储来增强管道中的数据。 来自Class CustomClass的对象中的字段用于对数据存储表执行查询,返回的值用于增强对象。 代码如下所示: public class EnhanceWithDataStore extends DoFn { private static Datastore datastore = DatastoreOptions.defaultInstance().service(); private static KeyFactory articleKeyFactory = datastore.newKeyFactory().kind(“article”); @Override public void processElement(ProcessContext c) throws Exception { CustomClass event = c.element(); Entity article = datastore.get(articleKeyFactory.newKey(event.getArticleId())); String articleName = “”; try{ articleName = article.getString(“articleName”); } catch(Exception e) {} CustomClass enhanced = new CustomClass(event); […]

在Google Cloud Dataflow中使用带有复杂PCollection类型的TextIO.Write

我有一个看起来像这样的PCollection: PCollection<KV<KV, Long>> windowed_counts 我的目标是将其写为文本文件。 我想过用的东西: windowed_counts.apply( TextIO.Write.to( “output” )); 但我很难正确设置Coders。 这是我认为会起作用的: KvCoder kvcoder = KvCoder.of(KvCoder.of(StringUtf8Coder.of(), AvroDeterministicCoder.of(EventSession.class) ), TextualLongCoder.of()); TextIO.Write.Bound io = TextIO.Write.withCoder( kvcoder ); windowed_counts.apply( io.to( “output” )); 其中TextualLongCoder是我自己的AtomicCoder的子类,类似于TextualIntegerCoder。 EventSession类注释为使用AvroDeterministicCoder作为其默认编码器。 但是有了这个,我得到了包含非文本字符等的乱码输出。有人可以建议你如何将这个特定的PCollection写成文本吗? 我确信这里有一些显而易见的东西……

Google Cloud Dataflow BigQueryIO.Write发生未知错误(http代码500)

有人问我谷歌云数据流BigQueryIO.Write发生未知错误(http代码500)吗? 我在4月,5月,6月使用Dataflow处理一些数据,我使用相同的代码处理4月数据(400MB)并写入BigQuery成功,但是当我处理May(60MB)或June(90MB)数据时,它失败了。 4月,5月和6月的数据格式相同。 将作者从BigQuery改为TextIO,工作会成功,所以我认为数据格式是好的。 日志仪表板没有任何错误日志….. 系统只有同样的未知错误 我写的代码在这里: http : //pastie.org/10907947 “执行BigQuery导入作业”后出现错误消息: Workflow failed. Causes: (cc846): S01:Read Files/Read+Window.Into()+AnonymousParDo+BigQueryIO.Write/DataflowPipelineRunner.BatchBigQueryIOWrite/DataflowPipelineRunner.BatchBigQueryIONativeWrite failed., (e19a27451b49ae8d): BigQuery import job “dataflow_job_631261” failed., (e19a745a666): BigQuery creation of import job for table “hi_event_m6” in dataset “TESTSET” in project “lib-ro-123” failed., (e19a2749ae3f): BigQuery execution failed., (e19a2745a618): Error: Message: An internal error occurred and the request could […]

在Dataflow Generic中进行转换

这与另一个SO问题[此处]( 设置自定义编码器和处理参数化类型 )有关。在解决方法之后,帮助我在变换中使用自定义类型。 但由于我的自定义类型是通用的,我希望甚至使变换类通用,然后可以使用相同的类型参数化自定义类型。 但是当我尝试这样做时,我遇到了无法为类型变量T提供编码器,因为实际类型由于擦除而未知 。 解决方案建议注册一个可以返回类型参数的编码器,但由于类型参数本身是未知的,我想这个exception会抛出,我不知道如何解决这个问题。 static class Processor extends PTransform<PCollection, PCollection<KV<String, Set<CustomType>>>> { private static final long serialVersionUID = 0; @Override public PCollection<KV<String, Set<CustomType>>> apply(PCollection items) { PCollection<KV<String, Set<CustomType>>> partitionedItems = items .apply(ParDo.of(new ParDoFn())); PCollection<KV<String, Set<CustomType>>> combinedItems = partitionedItems .apply(Combine.<String, Set<CustomType>>perKey(new Merger())); } }

如何从Cassandra增加Dataflow读取并行性

我试图将大量数据(2 TB,30kkk行)从Cassandra导出到BigQuery。 我的所有基础设施都在GCP上。 我的Cassandra集群有4个节点(4个vCPU,26 GB内存,每个2000 GB PD(HDD))。 集群中有一个种子节点。 我需要在写入BQ之前转换我的数据,所以我使用的是Dataflow。 工人类型是n1-highmem-2 。 工人和Cassandra实例位于同一区域europe-west1-c 。 我对Cassandra的限制: 我负责读取转换的部分管道代码位于此处 。 自动缩放 问题是,当我没有设置–numWorkers ,以这种方式自动调整工人数量(平均2名工人): 负载均衡 当我设置–numWorkers=15 ,读取速率不会增加,只有2名工作人员与Cassandra通信(我可以从iftop告诉它,只有这些工作人员的CPU负载大约为60%)。 同时,Cassandra节点没有很多负载(CPU使用率为20-30%)。 种子节点的网络和磁盘使用率比其他节点大约高2倍,但不是太高,我认为: 对于非种子节点: 管道发射警告 管道启动时我有一些警告: WARNING: Size estimation of the source failed: org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@7569ea63 com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /10.132.9.101:9042 (com.datastax.driver.core.exceptions.TransportException: [/10.132.9.101:9042] Cannot connect), /10.132.9.102:9042 (com.datastax.driver.core.exceptions.TransportException: [/10.132.9.102:9042] Cannot connect), /10.132.9.103:9042 (com.datastax.driver.core.exceptions.TransportException: […]

Google Cloud Dataflow用户定义的MySQL源代码

我正在编写Google Dataflow Pipeline,并且作为源之一,我需要通过查询获得MySQL结果集。 那么几个问题: 从我的管道中提取数据的正确方法是什么?这可以简单地使用JDBC在线完成吗? 在我确实需要实现将MySQL作为源代码的 “用户定义数据格式”的情况下,是否有人知道实现是否已经存在并且我不需要重新发明轮子? (不要误解我的意思,我会喜欢写它,但我想这将是一个非常常见的情况,使用MySQL作为源) 谢谢大家!