2012-07-09 55 views
23

我正在设计一个MySQL数据库,它需要处理各种InnoDB表中每秒约600行的插入。我目前的实现使用非批处理准备语句。但是,写入MySQL数据库瓶颈和我的队列大小会随着时间的推移而增加。在Java中插入MySQL语句的性能:批处理模式预处理语句与具有多个值的单个插入

这个实现是用Java写的,我不知道这个版本是不是手。它使用MySQLJava connector。我需要考虑明天切换到JDBC。我假设这些是两个不同的连接器包。

我看了关于这一问题的以下主题:

,并从MySQL网站:

我的问题是:

  • 是否有人在使用插入在批处理模式准备语句与使用单个INSERT语句多个值性能差异的建议和经验。

  • MySQL Java连接器与JDBC之间的性能差异是什么。我应该使用其中一种吗?

  • 这些表格用于归档目的,并且会看到约90%写入约10%读取(甚至更少)。我正在使用InnoDB。这是MyISAM的正确选择吗?

非常感谢您的帮助。

+0

当使用批量插入时,您将在单个事务中执行此操作。在其他情况下,您将每行插入事务。 – 2012-07-09 05:46:57

+0

也许dba.stackexchange对于这个问题会是一个更好的地方。 – 2012-07-09 08:39:05

+0

+1对于您已经完成的研究和努力,尽管这是您的第一篇文章。 – 2012-07-09 12:00:55

回答

27

JDBC仅仅是数据库访问的Java SE标准提供的标准接口,因此您并不真正被绑定到特定的JDBC实现。 MySQL Java连接器(Connector/J)仅用于MySQL数据库的JDBC接口的实现。出于经验,我参与了一个使用MySQL的大量数据的项目,我们大多更喜欢使用MyISAM来生成数据:它可以实现更高的性能,从而减少事务处理量,但总的来说,MyISAM速度更快,但InnoDB更可靠。

大约一年前,我还想知道INSERT语句的性能,并在我的代码架中发现了以下旧的测试代码(对不起,它有点复杂,有点超出了您的问题范围)。下面的代码包含的插入所述测试数据的4种方式的例子:

  • INSERT S;
  • 成批INSERT s;
  • 手动批量INSERT(从不使用它 - 这是危险的);
  • 和最后准备批量INSERT)。

它使用TestNG作为亚军,并使用一些自定义代码的遗产,如:

  • runWithConnection()方法 - 确保在执行回调后连接被关闭或放回连接池(但下面的代码使用了不可靠的结算策略 - 即使没有try/finally以减少代码);
  • IUnsafeIn<T, E extends Throwable> - 接受单个参数但可能抛出E类异常的方法的自定义回调接口,如:void handle(T argument) throws E;
package test; 

import test.IUnsafeIn; 

import java.sql.Connection; 
import java.sql.PreparedStatement; 
import java.sql.SQLException; 

import static java.lang.String.format; 
import static java.lang.String.valueOf; 
import static java.lang.System.currentTimeMillis; 

import core.SqlBaseTest; 
import org.testng.annotations.AfterSuite; 
import org.testng.annotations.BeforeSuite; 
import org.testng.annotations.BeforeTest; 
import org.testng.annotations.Test; 

public final class InsertVsBatchInsertTest extends SqlBaseTest { 

    private static final int ITERATION_COUNT = 3000; 

    private static final String CREATE_TABLE_QUERY = "CREATE TABLE IF NOT EXISTS ttt1 (c1 INTEGER, c2 FLOAT, c3 VARCHAR(5)) ENGINE = InnoDB"; 
    private static final String DROP_TABLE_QUERY = "DROP TABLE ttt1"; 
    private static final String CLEAR_TABLE_QUERY = "DELETE FROM ttt1"; 

    private static void withinTimer(String name, Runnable runnable) { 
     final long start = currentTimeMillis(); 
     runnable.run(); 
     logStdOutF("%20s: %d ms", name, currentTimeMillis() - start); 
    } 

    @BeforeSuite 
    public void createTable() { 
     runWithConnection(new IUnsafeIn<Connection, SQLException>() { 
      @Override 
      public void handle(Connection connection) throws SQLException { 
       final PreparedStatement statement = connection.prepareStatement(CREATE_TABLE_QUERY); 
       statement.execute(); 
       statement.close(); 
      } 
     }); 
    } 

    @AfterSuite 
    public void dropTable() { 
     runWithConnection(new IUnsafeIn<Connection, SQLException>() { 
      @Override 
      public void handle(Connection connection) throws SQLException { 
       final PreparedStatement statement = connection.prepareStatement(DROP_TABLE_QUERY); 
       statement.execute(); 
       statement.close(); 
      } 
     }); 
    } 

    @BeforeTest 
    public void clearTestTable() { 
     runWithConnection(new IUnsafeIn<Connection, SQLException>() { 
      @Override 
      public void handle(Connection connection) throws SQLException { 
       final PreparedStatement statement = connection.prepareStatement(CLEAR_TABLE_QUERY); 
       statement.execute(); 
       statement.close(); 
      } 
     }); 
    } 

    @Test 
    public void run1SingleInserts() { 
     withinTimer("Single inserts", new Runnable() { 
      @Override 
      public void run() { 
       runWithConnection(new IUnsafeIn<Connection, SQLException>() { 
        @Override 
        public void handle(Connection connection) throws SQLException { 
         for (int i = 0; i < ITERATION_COUNT; i++) { 
          final PreparedStatement statement = connection.prepareStatement("INSERT INTO ttt1 (c1, c2, c3) VALUES (?, ?, ?)"); 
          statement.setInt(1, i); 
          statement.setFloat(2, i); 
          statement.setString(3, valueOf(i)); 
          statement.execute(); 
          statement.close(); 
         } 
        } 
       }); 
      } 
     }); 
    } 

    @Test 
    public void run2BatchInsert() { 
     withinTimer("Batch insert", new Runnable() { 
      @Override 
      public void run() { 
       runWithConnection(new IUnsafeIn<Connection, SQLException>() { 
        @Override 
        public void handle(Connection connection) throws SQLException { 
         final PreparedStatement statement = connection.prepareStatement("INSERT INTO ttt1 (c1, c2, c3) VALUES (?, ?, ?)"); 
         for (int i = 0; i < ITERATION_COUNT; i++) { 
          statement.setInt(1, i); 
          statement.setFloat(2, i); 
          statement.setString(3, valueOf(i)); 
          statement.addBatch(); 
         } 
         statement.executeBatch(); 
         statement.close(); 
        } 
       }); 
      } 
     }); 
    } 

    @Test 
    public void run3DirtyBulkInsert() { 
     withinTimer("Dirty bulk insert", new Runnable() { 
      @Override 
      public void run() { 
       runWithConnection(new IUnsafeIn<Connection, SQLException>() { 
        @Override 
        public void handle(Connection connection) throws SQLException { 
         final StringBuilder builder = new StringBuilder("INSERT INTO ttt1 (c1, c2, c3) VALUES "); 
         for (int i = 0; i < ITERATION_COUNT; i++) { 
          if (i != 0) { 
           builder.append(","); 
          } 
          builder.append(format("(%s, %s, '%s')", i, i, i)); 
         } 
         final String query = builder.toString(); 
         final PreparedStatement statement = connection.prepareStatement(query); 
         statement.execute(); 
         statement.close(); 
        } 
       }); 
      } 
     }); 
    } 

    @Test 
    public void run4SafeBulkInsert() { 
     withinTimer("Safe bulk insert", new Runnable() { 
      @Override 
      public void run() { 
       runWithConnection(new IUnsafeIn<Connection, SQLException>() { 
        private String getInsertPlaceholders(int placeholderCount) { 
         final StringBuilder builder = new StringBuilder("("); 
         for (int i = 0; i < placeholderCount; i++) { 
          if (i != 0) { 
           builder.append(","); 
          } 
          builder.append("?"); 
         } 
         return builder.append(")").toString(); 
        } 

        @SuppressWarnings("AssignmentToForLoopParameter") 
        @Override 
        public void handle(Connection connection) throws SQLException { 
         final int columnCount = 3; 
         final StringBuilder builder = new StringBuilder("INSERT INTO ttt1 (c1, c2, c3) VALUES "); 
         final String placeholders = getInsertPlaceholders(columnCount); 
         for (int i = 0; i < ITERATION_COUNT; i++) { 
          if (i != 0) { 
           builder.append(","); 
          } 
          builder.append(placeholders); 
         } 
         final int maxParameterIndex = ITERATION_COUNT * columnCount; 
         final String query = builder.toString(); 
         final PreparedStatement statement = connection.prepareStatement(query); 
         int valueIndex = 0; 
         for (int parameterIndex = 1; parameterIndex <= maxParameterIndex; valueIndex++) { 
          statement.setObject(parameterIndex++, valueIndex); 
          statement.setObject(parameterIndex++, valueIndex); 
          statement.setObject(parameterIndex++, valueIndex); 
         } 
         statement.execute(); 
         statement.close(); 
        } 
       }); 
      } 
     }); 
    } 

} 

看看与@Test注解的方法:他们实际上执行INSERT语句。还请大家看看CREATE_TABLE_QUERY不变:在源代码中,它使用InnoDB的生产在我的机器下面的结果与MySQL安装5.5(MySQL的连接器/ J 5.1.12):

InnoDB 
Single inserts: 74148 ms 
Batch insert: 84370 ms 
Dirty bulk insert: 178 ms 
Safe bulk insert: 118 ms 

如果改变CREATE_TABLE_QUERY InnoDB到MyISAM,你会看到显着的性能提升:

MyISAM 
Single inserts: 604 ms 
Batch insert: 447 ms 
Dirty bulk insert: 63 ms 
Safe bulk insert: 26 ms 

希望这会有所帮助。

UPD:

因为你必须正确定制max_allowed_packetmysql.ini(在[mysqld]部分)要大到足以支持真正的大包4的方式。

+0

感谢您的基准测试,这是我可以要求的最直接的答案。 我今天实施了批量准备插入,它的工作就像一个魅力! – Darren 2012-07-10 01:57:52

+0

不客气。 :) – 2012-07-10 05:32:23

+4

任何想法为什么批量插入比InnoDB上的单插入慢? – stracktracer 2012-11-15 23:17:23

1

您是否对任何受影响的表有触发器?如果不是,每秒600个插入看起来不是很多。

来自JDBC的批插入功能将在同一事务中多次发出相同的语句,而多值SQL将在单个语句中挤压所有值。在多值语句的情况下,您将不得不动态地构造插入SQL,这可能是更多代码,更多内存,SQL注入保护机制等方面的开销。首先尝试定期的批处理功能,为您的工作负载,它应该不成问题。

如果您没有批量接收数据,请考虑在插入之前对其进行批处理。 我们在单独的线程上使用队列来实现生产者 - 消费者安排。在此我们阻止插入,直到某个时间过去或者队列的大小超过了阈值。

如果您想要通知生产者插入成功,那么需要更多管道。

有时只是阻塞线程可以更直接和实用。

if(System.currentTimeMills()-lastInsertTime>TIME_THRESHOLD || queue.size()>SIZE_THRESHOLD) { 
    lastInsertTime=System.currentTimeMills(); 
    // Insert logic 
    } else { 
    // Do nothing OR sleep for some time OR retry after some time. 
    } 
+0

谢谢你的建议。我今天做了一些研究,创建了一个基本的生产者 - 消费者关系。我的数据处理器在一个线程中工作,向属于mysql插入线程的队列添加信息。它似乎很好地工作。 我在使用innodb,因为有一些重要的外键关系我会尽量保留。看起来,他们可能并不是真正有必要的事情,所以我可能会在明天切换到myISAM,看看事情进展如何。 – Darren 2012-07-10 02:01:35

8

我知道这个线程很旧,但我只是想我会提到,如果在使用mysql时为jdbc url添加“rewriteBatchedStatements = true”,在使用批处理语句时可以带来巨大的性能提升。