Cassandra集群具有差的插入性能和插入稳定性

我必须每个客户每秒存储大约250个数值,每小时大约900k个数字。 它可能不是一整天的录音(可能每天5-10个小时),但我会根据客户端ID和读取日期对数据进行分区。 最大行长约为22-23M,仍然可以控制。 毫无意义,我的计划看起来像这样:

CREATE TABLE measurement ( clientid text, date text, event_time timestamp, value int, PRIMARY KEY ((clientid,date), event_time) ); 

密钥空间的复制因子为2,仅用于测试,snitch是GossipingPropertyFileSnitchNetworkTopologyStrategy 。 我知道复制因子3是更多的生产标准。

接下来,我在公司服务器上创建了一个小型集群,三个裸机虚拟化机器,具有2个CPU x 2核心和16GB RAM以及大量空间。 我和他们在千兆局域网中。 群集基于nodetool运行。

这是我用来测试我的设置的代码:

  Cluster cluster = Cluster.builder() .addContactPoint("192.168.1.100") .addContactPoint("192.168.1.102") .build(); Session session = cluster.connect(); DateTime time = DateTime.now(); BlockingQueue queryQueue = new ArrayBlockingQueue(50, true); try { ExecutorService pool = Executors.newFixedThreadPool(15); //changed the pool size also to throttle inserts String insertQuery = "insert into keyspace.measurement (clientid,date,event_time,value) values (?, ?, ?, ?)"; PreparedStatement preparedStatement = session.prepare(insertQuery); BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED); //tried with unlogged also //generating the entries for (int i = 0; i = 65534) { queryQueue.put(batch); batch = new BatchStatement(); } } queryQueue.put(batch); //the last batch, perhaps shorter than 65535 //storing the data System.out.println("Starting storing"); while (!queryQueue.isEmpty()) { pool.execute(() -> { try { long threadId = Thread.currentThread().getId(); System.out.println("Started: " + threadId); BatchStatement statement = queryQueue.take(); long start2 = System.currentTimeMillis(); session.execute(statement); System.out.println("Finished " + threadId + ": " + (System.currentTimeMillis() - start2)); } catch (Exception ex) { System.out.println(ex.toString()); } }); } pool.shutdown(); pool.awaitTermination(120,TimeUnit.SECONDS); } catch (Exception ex) { System.out.println(ex.toString()); } finally { session.close(); cluster.close(); } 

我通过阅读这里以及其他博客和网站上的post来提出代码。 据我所知,客户端使用多个线程很重要,这就是我这样做的原因。 我也尝试过使用异步操作。

最重要的结果是,无论我使用哪种方法,一个批处理在5-6秒内执行,尽管可能需要10个。如果我只输入一个批次(因此,只有~65k列)或如果我使用愚蠢的单线程应用程序。 老实说,我期待更多。 特别是因为我在笔记本电脑上使用本地实例获得了或多或少相似的性能。

第二个,也许是更重要的问题,是我以不可预测的方式面对的例外。 这两个:

com.datastax.driver.core.exceptions.WriteTimeoutException:在一致性ONE的写查询期间的Cassandra超时(需要1个副本,但只有0确认写入)

com.datastax.driver.core.exceptions.NoHostAvailableException:尝试查询失败的所有主机(已尝试:/192.168.1.102:9042(com.datastax.driver.core.TransportException:[/192.168.1.102:9042]连接已关闭),/ 192.168.1.100:9042(com.datastax.driver.core.TransportException:[/ 192.168.1.100:9042]连接已关闭),/ 192.168.1.101:9042(com.datastax.driver.core .TransportException:[/ 192.168.1.101:9042]连接已关闭))

在底线,我做错了什么? 我应该重新组织我加载数据的方式,还是改变方案。 我尝试减少行长度(所以我有12小时的行),但这没有太大的区别。

==============================更新:

我很粗鲁,忘记粘贴问题解答后我使用的代码示例。 它工作得相当好,但是我继续我的研究与KairosDB和Astyanax的二进制转移。 看起来我可以通过CQL获得更好的性能,虽然KairosDB在超载时可能会遇到一些问题(但我正在研究它)而Astyanax对于我的品味来说有点冗长。 不过,这是代码,我可能在某处错了。

当信号量超过5000时,信号量槽号对性能没有影响,几乎不变。

 String insertQuery = "insert into keyspace.measurement (userid,time_by_hour,time,value) values (?, ?, ?, ?)"; PreparedStatement preparedStatement = session.prepare(insertQuery); Semaphore semaphore = new Semaphore(15000); System.out.println("Starting " + Thread.currentThread().getId()); DateTime time = DateTime.parse("2015-01-05T12:00:00"); //generating the entries long start = System.currentTimeMillis(); for (int i = 0; i < 900000; i++) { BoundStatement statement = preparedStatement.bind("User1", "2015-01-05:" + time.hourOfDay().get(), time.toDate(), 500); //value not important semaphore.acquire(); ResultSetFuture resultSetFuture = session.executeAsync(statement); Futures.addCallback(resultSetFuture, new FutureCallback() { @Override public void onSuccess(@Nullable com.datastax.driver.core.ResultSet resultSet) { semaphore.release(); } @Override public void onFailure(Throwable throwable) { System.out.println("Error: " + throwable.toString()); semaphore.release(); } }); time = time.plus(4); //4ms between each entry } 

使用未记录的批处理有什么结果? 您确定要使用批处理语句吗? https://medium.com/@foundev/cassandra-batch-loading-without-the-batch-keyword-40f00e35e23e