为什么我的Cassandra准备声明数据采集速度如此之慢?

我有一个包含10万个名字的Java列表,我想将它们摄取到运行带有Cassandra 3.10.0的Datastax Enterprise 5.1的3节点Cassandra集群中

我的代码摄取但需要花费很长时间。 我对集群进行了压力测试,每秒可以进行超过25,000次写入。 使用我的摄取代码,我获得了大约200 /秒的可怕性能。

我的Java列表中有100,000个名称,称为myList。 我使用以下预准备语句和会话执行来提取数据。

PreparedStatement prepared = session.prepare("insert into names (id, name) values (?, ?)"); int id = 0; for(int i = 0; i < myList.size(); i++) { id += 1; session.execute(prepared.bind(id, myList.get(i))); } 

我在代码中添加了一个集群监视器,以查看发生了什么。 这是我的监控代码。

  /// Monitoring Status of Cluster final LoadBalancingPolicy loadBalancingPolicy = cluster.getConfiguration().getPolicies().getLoadBalancingPolicy(); ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1); scheduled.scheduleAtFixedRate(() -> { Session.State state = session.getState(); state.getConnectedHosts().forEach((host) -> { HostDistance distance = loadBalancingPolicy.distance(host); int connections = state.getOpenConnections(host); int inFlightQueries = state.getInFlightQueries(host); System.out.printf("%s connections=%d, current load=%d, maxload=%d%n", host, connections, inFlightQueries, connections * poolingOptions.getMaxRequestsPerConnection(distance)); }); }, 5, 5, TimeUnit.SECONDS); 

监视5秒输出显示以下3次迭代:

 /192.168.20.25:9042 connections=1, current load=1, maxload=32768 /192.168.20.26:9042 connections=1, current load=0, maxload=32768 /192.168.20.34:9042 connections=1, current load=0, maxload=32768 /192.168.20.25:9042 connections=1, current load=1, maxload=32768 /192.168.20.26:9042 connections=1, current load=0, maxload=32768 /192.168.20.34:9042 connections=1, current load=0, maxload=32768 /192.168.20.25:9042 connections=1, current load=0, maxload=32768 /192.168.20.26:9042 connections=1, current load=1, maxload=32768 /192.168.20.34:9042 connections=1, current load=0, maxload=32768 

我似乎没有非常有效地利用我的集群。 我不确定我做错了什么,非常感谢任何提示。

谢谢!

使用executeAsync。

异步执行提供的查询。 此方法不会阻止。 一旦查询传递到底层网络堆栈,它就会返回。 特别是,从此方法返回并不保证查询有效或甚至已提交到活动节点。 访问ResultSetFuture时,将抛出与查询失败有关的任何exception。

您正在插入大量数据。 如果您使用executeAsync并且您的集群无法处理这么多数据,它可能会抛出exception。 您可以使用信号量限制executeAsync。

示例:

 PreparedStatement prepared = session.prepare("insert into names (id, name) values (?, ?)"); int numberOfConcurrentQueries = 100; final Semaphore semaphore = new Semaphore(numberOfConcurrentQueries); int id = 0; for(int i = 0; i < myList.size(); i++) { try { id += 1; semaphore.acquire(); ResultSetFuture future = session.executeAsync(prepared.bind(id, myList.get(i))); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(ResultSet result) { semaphore.release(); } @Override public void onFailure(Throwable t) { semaphore.release(); } }); } catch (Exception e) { semaphore.release(); e.printStackTrace(); } } 

资源 :
https://stackoverflow.com/a/30526719/2320144 https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/Session.html#executeAsync-com.datastax.driver .core.Statement-