MySQL中的MySQL插入语句的性能:批处理模式预处理语句与具有多个值的单个插入

我正在设计一个MySQL数据库,它需要在各种InnoDB表中每秒处理大约600行插入。 我当前的实现使用非批处理的预准备语句。 但是,写入MySQL数据库的瓶颈和我的队列大小会随着时间的推移而增加。

该实现是用Java编写的,我不知道该版本是否有用。 它使用MySQL的Java连接器 。 我需要考虑明天切换到JDBC 。 我假设这是两个不同的连接器包。

我已经在这个问题上阅读了以下主题:

  • 优化MySQL插入以处理数据流
  • MyISAM与InnoDB
  • 将二进制数据插入MySQL(没有PreparedStatement)

并从mysql网站:

  • http://dev.mysql.com/doc/refman/5.0/en/insert-speed.html

我的问题是:

  • 有人在批处理模式下使用INSERT与预准备语句相比,使用具有多个VALUE的单个INSERT语句,是否有任何关于性能差异的建议或经验。

  • MySQL Java连接器与JDBC之间的性能差异是什么? 我应该使用其中一个吗?

  • 这些表用于存档目的,并且将看到约90%写入〜10%读取(甚至可能更少)。 我正在使用InnoDB。 这是MyISAM的正确选择吗?

预先感谢您的帮助。

JDBC只是提供标准接口的Java SE数据库访问标准,因此您并不真正绑定到特定的JDBC实现。 MySQL Java连接器(Connector / J)仅用于MySQL数据库的JDBC接口的实现。 出于经验,我参与了一个使用MySQL使用大量数据的项目,我们更喜欢使用MyISAM来生成可以生成的数据:它可以实现更高性能的丢失交易,但一般来说,MyISAM更快,但InnoDB更可靠。

我大约一年前就想知道INSERT语句的性能,并在我的代码架中发现了以下旧的测试代码(抱歉,它有点复杂,有点超出了你的问题范围)。 下面的代码包含4种插入测试数据的方法示例:

  • 单个 INSERT ;
  • 批量 INSERT ;
  • 手动批量 INSERT (从不使用它 – 它很危险);
  • 最后准备批量 INSERT )。

它使用TestNG作为跑步者,并使用一些自定义代码遗留如:

  • runWithConnection()方法 – 确保在执行回调后连接被关闭或放回连接池(但下面的代码使用的语句不是可靠的策略关闭 – 即使没有try / finally来减少代码);
  • IUnsafeIn – 接受单个参数的方法的自定义回调接口,但可能抛出类型E的exception,如: void handle(T argument) throws E;
 package test; import test.IUnsafeIn; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import static java.lang.String.format; import static java.lang.String.valueOf; import static java.lang.System.currentTimeMillis; import core.SqlBaseTest; import org.testng.annotations.AfterSuite; import org.testng.annotations.BeforeSuite; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; public final class InsertVsBatchInsertTest extends SqlBaseTest { private static final int ITERATION_COUNT = 3000; private static final String CREATE_TABLE_QUERY = "CREATE TABLE IF NOT EXISTS ttt1 (c1 INTEGER, c2 FLOAT, c3 VARCHAR(5)) ENGINE = InnoDB"; private static final String DROP_TABLE_QUERY = "DROP TABLE ttt1"; private static final String CLEAR_TABLE_QUERY = "DELETE FROM ttt1"; private static void withinTimer(String name, Runnable runnable) { final long start = currentTimeMillis(); runnable.run(); logStdOutF("%20s: %d ms", name, currentTimeMillis() - start); } @BeforeSuite public void createTable() { runWithConnection(new IUnsafeIn() { @Override public void handle(Connection connection) throws SQLException { final PreparedStatement statement = connection.prepareStatement(CREATE_TABLE_QUERY); statement.execute(); statement.close(); } }); } @AfterSuite public void dropTable() { runWithConnection(new IUnsafeIn() { @Override public void handle(Connection connection) throws SQLException { final PreparedStatement statement = connection.prepareStatement(DROP_TABLE_QUERY); statement.execute(); statement.close(); } }); } @BeforeTest public void clearTestTable() { runWithConnection(new IUnsafeIn() { @Override public void handle(Connection connection) throws SQLException { final PreparedStatement statement = connection.prepareStatement(CLEAR_TABLE_QUERY); statement.execute(); statement.close(); } }); } @Test public void run1SingleInserts() { withinTimer("Single inserts", new Runnable() { @Override public void run() { runWithConnection(new IUnsafeIn() { @Override public void handle(Connection connection) throws SQLException { for ( int i = 0; i < ITERATION_COUNT; i++ ) { final PreparedStatement statement = connection.prepareStatement("INSERT INTO ttt1 (c1, c2, c3) VALUES (?, ?, ?)"); statement.setInt(1, i); statement.setFloat(2, i); statement.setString(3, valueOf(i)); statement.execute(); statement.close(); } } }); } }); } @Test public void run2BatchInsert() { withinTimer("Batch insert", new Runnable() { @Override public void run() { runWithConnection(new IUnsafeIn() { @Override public void handle(Connection connection) throws SQLException { final PreparedStatement statement = connection.prepareStatement("INSERT INTO ttt1 (c1, c2, c3) VALUES (?, ?, ?)"); for ( int i = 0; i < ITERATION_COUNT; i++ ) { statement.setInt(1, i); statement.setFloat(2, i); statement.setString(3, valueOf(i)); statement.addBatch(); } statement.executeBatch(); statement.close(); } }); } }); } @Test public void run3DirtyBulkInsert() { withinTimer("Dirty bulk insert", new Runnable() { @Override public void run() { runWithConnection(new IUnsafeIn() { @Override public void handle(Connection connection) throws SQLException { final StringBuilder builder = new StringBuilder("INSERT INTO ttt1 (c1, c2, c3) VALUES "); for ( int i = 0; i < ITERATION_COUNT; i++ ) { if ( i != 0 ) { builder.append(","); } builder.append(format("(%s, %s, '%s')", i, i, i)); } final String query = builder.toString(); final PreparedStatement statement = connection.prepareStatement(query); statement.execute(); statement.close(); } }); } }); } @Test public void run4SafeBulkInsert() { withinTimer("Safe bulk insert", new Runnable() { @Override public void run() { runWithConnection(new IUnsafeIn() { private String getInsertPlaceholders(int placeholderCount) { final StringBuilder builder = new StringBuilder("("); for ( int i = 0; i < placeholderCount; i++ ) { if ( i != 0 ) { builder.append(","); } builder.append("?"); } return builder.append(")").toString(); } @SuppressWarnings("AssignmentToForLoopParameter") @Override public void handle(Connection connection) throws SQLException { final int columnCount = 3; final StringBuilder builder = new StringBuilder("INSERT INTO ttt1 (c1, c2, c3) VALUES "); final String placeholders = getInsertPlaceholders(columnCount); for ( int i = 0; i < ITERATION_COUNT; i++ ) { if ( i != 0 ) { builder.append(","); } builder.append(placeholders); } final int maxParameterIndex = ITERATION_COUNT * columnCount; final String query = builder.toString(); final PreparedStatement statement = connection.prepareStatement(query); int valueIndex = 0; for ( int parameterIndex = 1; parameterIndex <= maxParameterIndex; valueIndex++ ) { statement.setObject(parameterIndex++, valueIndex); statement.setObject(parameterIndex++, valueIndex); statement.setObject(parameterIndex++, valueIndex); } statement.execute(); statement.close(); } }); } }); } } 

看看使用@Test注释注释的方法:它们实际上执行INSERT语句。 另请查看CREATE_TABLE_QUERY常量:在源代码中,它使用InnoDB在安装了MySQL 5.5的机器上生成以下结果(MySQL Connector / J 5.1.12):

 InnoDB Single inserts: 74148 ms Batch insert: 84370 ms Dirty bulk insert: 178 ms Safe bulk insert: 118 ms 

如果您将CREATE_TABLE_QUERY InnoDB更改为MyISAM,您会看到显着的性能提升:

 MyISAM Single inserts: 604 ms Batch insert: 447 ms Dirty bulk insert: 63 ms Safe bulk insert: 26 ms 

希望这可以帮助。

UPD:

对于第四种方法,你必须正确地定制mysql.ini[mysqld]部分)中的max_allowed_packet ,使其足够大以支持真正的大数据包。

我知道这个线程很老了,但我想我会提到如果你在使用mysql时将“rewriteBatchedStatements = true”添加到jdbc url,它可以在使用批处理语句时带来巨大的性能提升。

您是否在任何受影响的表上有任何触发器? 如果没有,每秒600次插入看起来不是很多。

来自JDBC的批量插入function将在同一事务中多次发出相同的语句,而多值SQL将在单个语句中挤压所有值。 在多值语句的情况下,您将不得不动态构造插入SQL,这可能是更多代码,更多内存,SQL注入保护机制等方面的开销。首先尝试常规批处理function,对于您的工作负载,它应该不是问题。

如果您没有批量接收数据,请考虑在插入之前对其进行批处理。 我们在单独的线程上使用Queue来实现Producer-Consumer安排。 在此,我们阻止插入直到某个时间过去或队列的大小超过阈值。

如果您希望生产者收到有关成功插入的通知,则需要更多管道。

有时只是在线程上阻塞可以更直接和实用。

 if(System.currentTimeMills()-lastInsertTime>TIME_THRESHOLD || queue.size()>SIZE_THRESHOLD) { lastInsertTime=System.currentTimeMills(); // Insert logic } else { // Do nothing OR sleep for some time OR retry after some time. }