How to increase Dataflow read parallelism from Cassandra

I'm trying to export a lot of data (2 TB, 30kkk (30 billion) rows) from Cassandra to BigQuery. All my infrastructure is on GCP. My Cassandra cluster has 4 nodes (4 vCPUs, 26 GB RAM, 2000 GB PD (HDD) each). There is one seed node in the cluster. I need to transform my data before writing to BQ, so I'm using Dataflow. The worker type is n1-highmem-2. Workers and Cassandra instances are in the same zone, europe-west1-c. My limits for Cassandra:

[Image: Cassandra limit settings]

The part of the pipeline code responsible for reading and transforming is located here.

Autoscaling

The problem is that when I don't set --numWorkers, the number of workers autoscales in this way (2 workers on average):

[Image: Number of workers with autoscaling]

Load balancing

When I set --numWorkers=15, the read rate doesn't increase and only 2 of the workers communicate with Cassandra (I can tell this from iftop, and only those workers have a CPU load of about 60%).
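For reference, the worker settings above are passed through standard Dataflow pipeline options. A minimal launcher sketch, assuming flags such as --numWorkers come in on the command line (the class name and option wiring here are illustrative, not the actual launcher code):

    import org.apache.beam.runners.dataflow.DataflowRunner;
    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class CassandraToBigQueryLauncher { // hypothetical class name
      public static void main(String[] args) {
        // Picks up flags like --numWorkers=15 and --project=... from args.
        DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation().as(DataflowPipelineOptions.class);
        options.setRunner(DataflowRunner.class);
        options.setZone("europe-west1-c");            // same zone as the Cassandra nodes
        options.setWorkerMachineType("n1-highmem-2"); // worker type from the setup above

        Pipeline p = Pipeline.create(options);
        // ... CassandraIO read, transform, BigQueryIO write ...
        p.run();
      }
    }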

At the same time, the Cassandra nodes don't have much load (CPU usage is 20-30%). Network and disk usage of the seed node is about 2 times higher than on the other nodes, but not too high, I think:

[Image: Network usage of the seed node]

[Image: Disk usage of the seed node]

For the non-seed nodes:

[Image: Network usage of a non-seed node]

[Image: Disk usage of a non-seed node]

Pipeline launch warnings

I have some warnings when the pipeline is launched:

    WARNING: Size estimation of the source failed:
    org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@7569ea63
    com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried:
      /10.132.9.101:9042 (com.datastax.driver.core.exceptions.TransportException: [/10.132.9.101:9042] Cannot connect),
      /10.132.9.102:9042 (com.datastax.driver.core.exceptions.TransportException: [/10.132.9.102:9042] Cannot connect),
      /10.132.9.103:9042 (com.datastax.driver.core.exceptions.TransportException: [/10.132.9.103:9042] Cannot connect),
      /10.132.9.104:9042 [only showing errors of first 3 hosts, use getErrors() for more details])

My Cassandra cluster is in a GCE private network, and it turns out these queries were made from my local machine, which cannot reach the cluster (I'm launching the pipeline with the Dataflow Eclipse plugin, as described here). These queries estimate the size of the table. Can I specify the size estimation manually, or launch the pipeline from a GCE instance? Or can I just ignore these warnings? Do they affect the read rate?

I tried launching the pipeline from a GCE VM. There were no connection problems. My tables have no varchar columns, but I still get a warning like this (no [varchar <-> java.lang.Long] codec in the DataStax driver):

    WARNING: Can't estimate the size
    com.datastax.driver.core.exceptions.CodecNotFoundException: Codec not found for requested operation: [varchar <-> java.lang.Long]
        at com.datastax.driver.core.CodecRegistry.notFound(CodecRegistry.java:741)
        at com.datastax.driver.core.CodecRegistry.createCodec(CodecRegistry.java:588)
        at com.datastax.driver.core.CodecRegistry.access$500(CodecRegistry.java:137)
        at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:246)
        at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:232)
        at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
        at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
        at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
        at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
        at com.google.common.cache.LocalCache.get(LocalCache.java:4053)
        at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
        at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4986)
        at com.datastax.driver.core.CodecRegistry.lookupCodec(CodecRegistry.java:522)
        at com.datastax.driver.core.CodecRegistry.codecFor(CodecRegistry.java:485)
        at com.datastax.driver.core.CodecRegistry.codecFor(CodecRegistry.java:467)
        at com.datastax.driver.core.AbstractGettableByIndexData.codecFor(AbstractGettableByIndexData.java:69)
        at com.datastax.driver.core.AbstractGettableByIndexData.getLong(AbstractGettableByIndexData.java:152)
        at com.datastax.driver.core.AbstractGettableData.getLong(AbstractGettableData.java:26)
        at com.datastax.driver.core.AbstractGettableData.getLong(AbstractGettableData.java:95)
        at org.apache.beam.sdk.io.cassandra.CassandraServiceImpl.getTokenRanges(CassandraServiceImpl.java:279)
        at org.apache.beam.sdk.io.cassandra.CassandraServiceImpl.getEstimatedSizeBytes(CassandraServiceImpl.java:135)
        at org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource.getEstimatedSizeBytes(CassandraIO.java:308)
        at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$BoundedReadEvaluator.startDynamicSplitThread(BoundedReadEvaluatorFactory.java:166)
        at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$BoundedReadEvaluator.processElement(BoundedReadEvaluatorFactory.java:142)
        at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:146)
        at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:110)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
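The stack trace hints at the cause: getTokenRanges reads Cassandra's system.size_estimates table, whose range_start and range_end columns are text (hence the "varchar" in the message), while the Beam code reads them with getLong(...). A small sketch of inspecting the same estimates with the plain DataStax driver, parsing the token bounds as the text they are (an illustration, not the Beam implementation; the contact point is one of the nodes above and the keyspace name is passed in):

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.ResultSet;
    import com.datastax.driver.core.Row;
    import com.datastax.driver.core.Session;

    public class SizeEstimatesCheck { // hypothetical helper
      public static void main(String[] args) {
        String keyspaceName = args[0]; // keyspace to inspect
        try (Cluster cluster = Cluster.builder()
                .addContactPoint("10.10.10.101").withPort(9042).build();
             Session session = cluster.connect()) {
          // Inspect the per-token-range estimates that size estimation relies on.
          ResultSet rs = session.execute(
              "SELECT range_start, range_end, partitions_count, mean_partition_size "
                  + "FROM system.size_estimates WHERE keyspace_name = ?",
              keyspaceName);
          long totalBytes = 0;
          for (Row row : rs) {
            // range_start/range_end are text columns, so they must be parsed,
            // not read with getLong() as CassandraServiceImpl tries to do.
            long rangeStart = Long.parseLong(row.getString("range_start"));
            long rangeEnd = Long.parseLong(row.getString("range_end"));
            totalBytes += row.getLong("partitions_count") * row.getLong("mean_partition_size");
          }
          System.out.println("Estimated size in bytes: " + totalBytes);
        }
      }
    }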

Pipeline read code

    // Read data from a Cassandra table
    PCollection<Model> pcollection = p.apply(CassandraIO.<Model>read()
        .withHosts(Arrays.asList("10.10.10.101", "10.10.10.102", "10.10.10.103", "10.10.10.104"))
        .withPort(9042)
        .withKeyspace(keyspaceName)
        .withTable(tableName)
        .withEntity(Model.class)
        .withCoder(SerializableCoder.of(Model.class))
        .withConsistencyLevel(CASSA_CONSISTENCY_LEVEL));

    // Transform the PCollection into a KV PCollection keyed by rowName
    PCollection<KV<String, Model>> pcollection_by_rowName = pcollection
        .apply(ParDo.of(new DoFn<Model, KV<String, Model>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            c.output(KV.of(c.element().rowName, c.element()));
          }
        }));
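Note that withEntity(Model.class) relies on the DataStax object mapper, so Model is assumed to be an annotated, serializable POJO along these lines (keyspace, table, and column names here are hypothetical):

    import java.io.Serializable;

    import com.datastax.driver.mapping.annotations.Column;
    import com.datastax.driver.mapping.annotations.PartitionKey;
    import com.datastax.driver.mapping.annotations.Table;

    @Table(keyspace = "my_keyspace", name = "my_table") // hypothetical names
    public class Model implements Serializable {
      @PartitionKey
      @Column(name = "row_name")
      public String rowName; // used as the KV key in the transform above

      // ... other mapped columns ...

      public Model() {} // the mapper needs a no-arg constructor
    }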

Number of splits (Stackdriver logs)

    W  Number of splits is less than 0 (0), fallback to 1
    I  Number of splits is 1
    W  Number of splits is less than 0 (0), fallback to 1
    I  Number of splits is 1
    W  Number of splits is less than 0 (0), fallback to 1
    I  Number of splits is 1

(Since size estimation fails, estimatedSizeBytes is 0, the computed number of splits is 0, and each source falls back to a single split; one split per source would explain why only a couple of workers ever talk to Cassandra.)

What I've tried

No effect:

  1. Setting the read consistency level to ONE
  2. Running nodetool setstreamthroughput 1000 and nodetool setinterdcstreamthroughput 1000
  3. Increasing Cassandra read concurrency (in cassandra.yaml): concurrent_reads: 32
  4. Setting different numbers of workers, from 1 to 40.

Some effect:

  1. I set numSplits = 10 as @jkff proposed. Now I can see in the logs:

    I  Murmur3Partitioner detected, splitting
    W  Can't estimate the size
    W  Can't estimate the size
    W  Number of splits is less than 0 (0), fallback to 10
    I  Number of splits is 10
    W  Number of splits is less than 0 (0), fallback to 10
    I  Number of splits is 10
    I  Splitting source org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@6d83ee93 produced 10 bundles with total serialized response size 20799
    I  Splitting source org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@25d02f5c produced 10 bundles with total serialized response size 19359
    I  Splitting source [0, 1) produced 1 bundles with total serialized response size 1091
    I  Murmur3Partitioner detected, splitting
    W  Can't estimate the size
    I  Splitting source [0, 0) produced 0 bundles with total serialized response size 76
    W  Number of splits is less than 0 (0), fallback to 10
    I  Number of splits is 10
    I  Splitting source org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@2661dcf3 produced 10 bundles with total serialized response size 18527

But I got another exception:

    java.io.IOException: Failed to start reading from source: org.apache.beam.sdk.io.cassandra.Cassandra...
    (5d6339652002918d): java.io.IOException: Failed to start reading from source: org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@5f18c296
        at com.google.cloud.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:582)
        at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:347)
        at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:183)
        at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:148)
        at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:68)
        at com.google.cloud.dataflow.worker.DataflowWorker.executeWork(DataflowWorker.java:336)
        at com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:294)
        at com.google.cloud.dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244)
        at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:135)
        at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:115)
        at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:102)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 1:53 mismatched character 'p' expecting '$'
        at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:58)
        at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:24)
        at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
        at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
        at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:68)
        at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:43)
        at org.apache.beam.sdk.io.cassandra.CassandraServiceImpl$CassandraReaderImpl.start(CassandraServiceImpl.java:80)
        at com.google.cloud.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:579)
        ... 14 more
    Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 1:53 mismatched character 'p' expecting '$'
        at com.datastax.driver.core.Responses$Error.asException(Responses.java:144)
        at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:179)
        at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:186)
        at com.datastax.driver.core.RequestHandler.access$2500(RequestHandler.java:50)
        at com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:817)
        at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:651)
        at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1077)
        at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1000)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:642)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:565)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:479)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:441)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        ... 1 more

Maybe there is a bug here: CassandraServiceImpl.java#L220

And this statement looks like a mistake: CassandraServiceImpl.java#L207
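In other words, the generated split query apparently still contains the literal "$pk" placeholder instead of a real partition key name, and that is what the CQL parser trips over ("mismatched character 'p' expecting '$'": after a "$" it expects the second "$" of a dollar-quoted string). A sketch of the assumed difference, with hypothetical keyspace, table, and key names:

    // What the worker apparently sends: "$pk" was never substituted.
    String broken = "SELECT * FROM ks.table1 WHERE token($pk) >= 1 AND token($pk) < 2";

    // What a valid token-range split query should look like (hypothetical key "id"):
    String valid = "SELECT * FROM ks.table1 WHERE token(id) >= 1 AND token(id) < 2";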

The changes I made to the CassandraIO code

As @jkff proposed, I changed CassandraIO in the way I needed:

    @VisibleForTesting
    protected List<BoundedSource> split(CassandraIO.Read spec,
                                        long desiredBundleSizeBytes,
                                        long estimatedSizeBytes) {
      long numSplits = 1;
      List<BoundedSource> sourceList = new ArrayList();
      if (desiredBundleSizeBytes > 0) {
        numSplits = estimatedSizeBytes / desiredBundleSizeBytes;
      }
      if (numSplits <= 0) {
        LOG.warn("Number of splits is less than 0 ({}), fallback to 10", numSplits);
        numSplits = 10;
      }

      LOG.info("Number of splits is {}", numSplits);

      Long startRange = MIN_TOKEN;
      Long endRange = MAX_TOKEN;
      Long startToken, endToken;

      // Hardcode the partition key per table, because the "$pk" placeholder
      // is never substituted (see the bug linked above).
      String pk = "$pk";
      switch (spec.table()) {
        case "table1":
          pk = "table1_pk";
          break;
        case "table2":
        case "table3":
          pk = "table23_pk";
          break;
      }

      endToken = startRange;
      Long incrementValue = endRange / numSplits - startRange / numSplits;
      String splitQuery;
      if (numSplits == 1) {
        // we have a unique split
        splitQuery = QueryBuilder.select().from(spec.keyspace(), spec.table()).toString();
        sourceList.add(new CassandraIO.CassandraSource(spec, splitQuery));
      } else {
        // we have more than one split
        for (int i = 0; i < numSplits; i++) {
          startToken = endToken;
          endToken = startToken + incrementValue;
          Select.Where builder = QueryBuilder.select().from(spec.keyspace(), spec.table()).where();
          if (i > 0) {
            builder = builder.and(QueryBuilder.gte("token(" + pk + ")", startToken));
          }
          if (i < (numSplits - 1)) {
            builder = builder.and(QueryBuilder.lt("token(" + pk + ")", endToken));
          }
          sourceList.add(new CassandraIO.CassandraSource(spec, builder.toString()));
        }
      }
      return sourceList;
    }
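As a worked example of the token arithmetic above, assuming MIN_TOKEN and MAX_TOKEN are the Murmur3Partitioner bounds (Long.MIN_VALUE and Long.MAX_VALUE):

    long MIN_TOKEN = Long.MIN_VALUE; // -9223372036854775808
    long MAX_TOKEN = Long.MAX_VALUE; //  9223372036854775807
    long numSplits = 10;
    // Dividing each bound separately avoids overflowing (MAX_TOKEN - MIN_TOKEN):
    long incrementValue = MAX_TOKEN / numSplits - MIN_TOKEN / numSplits;
    // incrementValue == 1844674407370955160; ten consecutive ranges of this width,
    // starting at MIN_TOKEN and leaving the last range open-ended, cover the ring.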

I think this should be classified as a bug in CassandraIO. I filed BEAM-3424. You can try building your own version of Beam with the default value of 1 changed to 100 or something like that, while this issue is being fixed.
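Until then, the change is a one-liner in CassandraIO's split logic; a sketch of what the patched fallback could look like (mirroring the method shown above):

    // Where the number of splits is computed from the (possibly failed) size estimate:
    if (numSplits <= 0) {
      LOG.warn("Number of splits is less than 0 ({}), fallback to 100", numSplits);
      numSplits = 100; // was 1: a single split means a single reading worker
    }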

I also filed BEAM-3425 for the bug during size estimation.