Batching stored procedures in PostgreSQL

I need to save a lot of entities to a database. Saving an entity involves adding rows to several tables, with a key autogenerated by inserting a row into one table being used to insert a row into another table. Such logic made me create and use a stored procedure. Calling this stored procedure for each entity separately (i.e. via statement.execute(...)) works fine, except that there are billions of entities to save. So I tried to do it in batches. However, with batches, executing a batch causes org.postgresql.util.PSQLException to be thrown with the message 'A result was returned when none was expected.'

My stored procedure looks like this:

CREATE OR REPLACE FUNCTION insertSentence(warcinfoID varchar, recordID varchar, sentence varchar, 
    sent_timestamp bigint, sect_ids smallint[]) RETURNS void AS $$ 
DECLARE 
    warcinfoIdId integer := 0; 
    recordIdId integer := 0; 
    sentId integer := 0; 
    id integer := 0; 
BEGIN 
    SELECT warcinfo_id_id INTO warcinfoIdId FROM warcinfo_id WHERE warcinfo_id_value = warcinfoID; 
    IF NOT FOUND THEN 
     INSERT INTO warcinfo_id (warcinfo_id_value) VALUES (warcinfoID) 
      RETURNING warcinfo_id_id INTO STRICT warcinfoIdId; 
    END IF; 
    SELECT record_id_id INTO recordIdId FROM record_id WHERE record_id_value = recordID; 
    IF NOT FOUND THEN 
     INSERT INTO record_id (record_id_value) VALUES (recordID) 
      RETURNING record_id_id INTO STRICT recordIdId; 
    END IF; 
    LOOP 
     SELECT sent_id INTO sentId FROM sentence_text 
      WHERE md5(sent_text) = md5(sentence) AND sent_text = sentence; 
     EXIT WHEN FOUND; 
     BEGIN 
      INSERT INTO sentence_text (sent_text) VALUES (sentence) RETURNING sent_id INTO STRICT sentId; 
     EXCEPTION WHEN unique_violation THEN 
      sentId := 0; 
     END; 
    END LOOP; 
    INSERT INTO sentence_occurrence (warcinfo_id, record_id, sent_id, timestamp, sect_ids) 
     VALUES (warcinfoIdId, recordIdId, sentId, TO_TIMESTAMP(sent_timestamp), sect_ids) 
     RETURNING entry_id INTO STRICT id; 
END; 
$$ LANGUAGE plpgsql; 
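For reference, a single per-entity call (the approach described above that works, but does not scale) would look roughly like this, with made-up sample values:

```sql
-- Hypothetical sample values; the function returns void on success.
SELECT insertSentence(
    '4af93233-3515-43da-8b47-71b0dad99ccc',  -- warcinfoID
    'd5ea8a14-be65-4281-9a87-24dcbdc3f879',  -- recordID
    'Some sentence text',                    -- sentence
    1362484800,                              -- sent_timestamp (epoch seconds)
    '{1,2}'::smallint[]                      -- sect_ids
);
```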

and the Scala code like this:

def partition2DB(iterator: Iterator[(String, String, String, Long, Array[Int])]): Unit = {
  Class.forName(driver)
  val conn = DriverManager.getConnection(connectionString)

  try {
    val statement = conn.createStatement()
    var i = 0
    iterator.foreach(r => {
      i += 1
      statement.addBatch(
        "select insertSentence('%s', '%s', '%s', %d, '{%s}');".format(
          r._1, r._2, r._3.replaceAll("'", "''"), r._4, r._5.mkString(","))
      )
      if (i % 1000 == 0) statement.executeBatch()
    })
    if (i % 1000 != 0) statement.executeBatch()
  } catch {
    case e: SQLException => println("exception caught: " + e.getNextException())
  } finally {
    conn.close()
  }
}

Strangely, even though statement.executeBatch() throws an exception, it saves the entities it processed before that point. So the following workaround makes things work:

def partition2DB(iterator: Iterator[(String, String, String, Long, Array[Int])]): Unit = {
  Class.forName(driver)
  val conn = DriverManager.getConnection(connectionString)

  try {
    var statement = conn.createStatement()
    var i = 0
    iterator.foreach(r => {
      i += 1
      statement.addBatch(
        "select insertSentence('%s', '%s', '%s', %d, '{%s}');".format(
          r._1, r._2, r._3.replaceAll("'", "''"), r._4, r._5.mkString(","))
      )
      if (i % 1000 == 0) {
        i = 0
        try {
          statement.executeBatch()
        } catch {
          case e: SQLException => statement = conn.createStatement()
        }
      }
    })
    if (i % 1000 != 0) {
      try {
        statement.executeBatch()
      } catch {
        case e: SQLException => statement = conn.createStatement()
      }
    }
  } catch {
    case e: SQLException => println("exception caught: " + e.getNextException())
  } finally {
    conn.close()
  }
}

However, I would rather not rely on the undocumented PostgreSQL behavior I am currently using. I have seen that others have run into this problem too.

Can anyone suggest a solution?

Answer


Strangely, even though statement.executeBatch() throws an exception, it saves the entities it processed before that point.

That's because you're not wrapping your batches in a transaction. The JDBC spec doesn't make it clear (IIRC) whether batches should be implicitly wrapped in a transaction if one isn't already in progress, or fired as individual statements, nor whether the implementation should carry on after an error occurs.

To get well-defined behavior (and better performance), wrap the batch in a transaction.
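A minimal sketch of what that means with JDBC (assuming conn and statement from the code above; not the exact final code):

```scala
// Wrap each batch in an explicit transaction.
conn.setAutoCommit(false)          // stop JDBC from committing every statement
try {
  // ... the addBatch(...) calls for one batch go here ...
  statement.executeBatch()
  conn.commit()                    // the whole batch becomes visible atomically
} catch {
  case e: SQLException =>
    conn.rollback()                // on failure, none of the batch's rows survive
    throw e
}
```

With this, a failed batch leaves nothing behind, so there is no partially-saved state to depend on.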

statement.addBatch(
  "select insertSentence('%s', '%s', '%s', %d, '{%s}');".format(
    r._1, r._2, r._3.replaceAll("'", "''"), r._4, r._5.mkString(","))
)

No! Step away from the keyboard! Come on, you're not a PHP programmer :p

You know better than to interpolate strings into SQL. Don't do it. Use PreparedStatement. Besides being safer, it will also be faster, since PgJDBC only has to send one statement for parsing and can then re-use it. PreparedStatement is ideal for JDBC batching.
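A sketch of the parameterized version, assuming the question's tuple shape (untested against the real schema):

```scala
// The same insertSentence call as a PreparedStatement. Parameters are sent
// separately from the SQL text, so no quoting or replaceAll("'", "''") is needed.
val ps = conn.prepareStatement("SELECT insertSentence(?, ?, ?, ?, ?)")
iterator.foreach { r =>
  ps.setString(1, r._1)
  ps.setString(2, r._2)
  ps.setString(3, r._3)
  ps.setLong(4, r._4)
  // sect_ids is smallint[], so build an int2 array from the Array[Int].
  ps.setArray(5, conn.createArrayOf("int2",
    r._5.map(i => Short.box(i.toShort)).asInstanceOf[Array[Object]]))
  ps.addBatch()
}
```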

Now, stepping back a bit...

Saving an entity involves adding rows to different tables with keys autogenerated by inserting a row in one table being used for inserting some row into another table. Such a logic made me create and use a stored procedure.

That's the simple way to write it, but it won't perform wonderfully. You're doing lots of independent operations on different tables, lots of scattered index updates, etc. There's also procedure call overhead, per-query overhead, and more. Each block in PL/pgSQL has a non-trivial cost.

You'll hit problems with this approach at hundreds of thousands, even millions of rows, let alone billions.

Relational databases like to think in sets. If you're really looking at a billion rows, the proc-based approach will not work. You'll want to batch up raw input, insert it into temp tables, then use a series of queries over the temp data to insert it into the destination tables.
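As a sketch of that staging-table idea (the table and column names are assumed from the schema shown later in the thread):

```sql
-- Load each raw batch into an unindexed staging table (e.g. via COPY),
-- then fold it into the destination tables with set-based statements.
CREATE TEMP TABLE staging (
    warcinfo_id_value char(36),
    record_id_value   char(36),
    sent_text         varchar,
    sent_timestamp    bigint,
    sect_ids          smallint[]
);

-- One INSERT ... SELECT adds all missing dimension rows at once,
-- instead of one round-trip and one index probe per entity.
INSERT INTO warcinfo_id (warcinfo_id_value)
SELECT DISTINCT s.warcinfo_id_value
FROM staging s
WHERE NOT EXISTS (
    SELECT 1 FROM warcinfo_id w
    WHERE w.warcinfo_id_value = s.warcinfo_id_value
);
```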

If you're on PostgreSQL 9.5 you'll benefit from INSERT ... ON CONFLICT ... for your upsert-like operations. You'll need to get familiar with INSERT INTO ... SELECT ..., UPDATE ... FROM ..., data-modifying common-table expressions, etc.
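For instance, assuming the batch has been loaded into a temp table called staging (hypothetical, matching the schema shown later), the sentence lookup/insert loop of the procedure collapses into set-based statements like:

```sql
-- 9.5+: insert all new sentences in one pass; duplicates are skipped.
INSERT INTO sentence_text (sent_text)
SELECT DISTINCT sent_text FROM staging
ON CONFLICT (sent_text) DO NOTHING;

-- Or combine the upsert and the fact-table insert with a data-modifying CTE.
-- Rows inserted by the CTE are not visible in the sentence_text scan of the
-- same statement, hence the COALESCE over both sources of sent_id.
WITH ins AS (
    INSERT INTO sentence_text (sent_text)
    SELECT DISTINCT sent_text FROM staging
    ON CONFLICT (sent_text) DO NOTHING
    RETURNING sent_id, sent_text
)
INSERT INTO sentence_occurrence (warcinfo_id, record_id, sent_id, timestamp, sect_ids)
SELECT w.warcinfo_id_id,
       r.record_id_id,
       COALESCE(i.sent_id, t.sent_id),
       TO_TIMESTAMP(s.sent_timestamp),
       s.sect_ids
FROM staging s
JOIN warcinfo_id w ON w.warcinfo_id_value = s.warcinfo_id_value
JOIN record_id r ON r.record_id_value = s.record_id_value
LEFT JOIN ins i ON i.sent_text = s.sent_text
LEFT JOIN sentence_text t ON t.sent_text = s.sent_text;
```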

It hurts to think this way for a while, but it's well worth it; you won't believe the performance you get when you work on sets instead of individual items.

I can't write it all out for you - you haven't shown your raw data or your schema, and you haven't explained the details. Which is fine, because that wasn't your question. It would be too long anyway, and SO isn't a write-my-code-for-me site.


OK, I got rid of the stored procedure to keep batches from failing and, consequently, to stop relying on undocumented behavior when a batch fails. Batches are now wrapped in transactions, and Statement is replaced by PreparedStatement (which, in fact, did not make this particular script any faster). I used INSERT INTO ... SELECT ... and INSERT ... ON CONFLICT ..., so a lot of the logic moved from the stored procedure into the SQL commands.

Now it looks like this:

def partition2DB(iterator: Iterator[(String, String, String, Long, Array[Short])]): Unit = {
  val batchSize = 1000
  val nRetries = 10

  def updStatements(item: (String, String, String, Long, Array[Short]), c: Connection,
      statement1: PreparedStatement, statement2: PreparedStatement,
      statement3: PreparedStatement, statement4: PreparedStatement) = {
    val sentence = if (item._3.length > 2712) item._3.substring(0, 2712) else item._3
    statement1.setString(1, item._1)
    statement2.setString(1, item._2)
    statement3.setString(1, sentence)
    statement4.setString(1, item._1)
    statement4.setString(2, item._2)
    statement4.setString(3, sentence)
    statement4.setString(4, sentence)
    statement4.setLong(5, item._4)
    statement4.setArray(6, c.createArrayOf("int4", item._5.map(new Integer(_)).asInstanceOf[Array[Object]]))
    statement1.addBatch()
    statement2.addBatch()
    statement3.addBatch()
    statement4.addBatch()
  }

  def executeStatements(statement1: PreparedStatement, statement2: PreparedStatement,
      statement3: PreparedStatement, statement4: PreparedStatement) = {
    statement1.executeBatch()
    statement2.executeBatch()
    statement3.executeBatch()
    statement4.executeBatch()
  }

  Class.forName(driver)
  var conn: Connection = null

  try {
    conn = DriverManager.getConnection(connectionString)
    conn.setAutoCommit(false)
    val statement1 = conn.prepareStatement("INSERT INTO warcinfo_id (warcinfo_id_value) VALUES (?) ON CONFLICT (warcinfo_id_value) DO NOTHING;")
    val statement2 = conn.prepareStatement("INSERT INTO record_id (record_id_value) VALUES (?) ON CONFLICT (record_id_value) DO NOTHING;")
    val statement3 = conn.prepareStatement("INSERT INTO sentence_text (sent_text) VALUES (?) ON CONFLICT (sent_text) DO NOTHING;")
    val statement4 = conn.prepareStatement(
      """
        |INSERT INTO sentence_occurrence (warcinfo_id, record_id, sent_id, timestamp, sect_ids) VALUES (
        |  (SELECT warcinfo_id_id FROM warcinfo_id WHERE warcinfo_id_value = ?),
        |  (SELECT record_id_id FROM record_id WHERE record_id_value = ?),
        |  (SELECT sent_id FROM sentence_text WHERE md5(sent_text) = md5(?) AND sent_text = ?),
        |  TO_TIMESTAMP(?),
        |  ?
        |)
      """.stripMargin)
    var i = 0
    val batch = ListBuffer[(String, String, String, Long, Array[Short])]()

    def executeBatch() = {
      var attempts = 0
      while (attempts < nRetries) {
        try {
          for (item <- batch) updStatements(item, conn, statement1, statement2, statement3, statement4)
          executeStatements(statement1, statement2, statement3, statement4)
          conn.commit()
          batch.clear()
          attempts += nRetries  // success: leave the retry loop
        } catch {
          case e: SQLException =>
            attempts += 1
            println("exception caught: " + e.getNextException)
            conn.rollback()
        }
      }
    }

    iterator.foreach(r => {
      i += 1
      batch += r
      if (i % batchSize == 0) executeBatch()
    })
    if (i % batchSize != 0) executeBatch()
  } catch {
    case e: SQLException => println("exception caught: " + e)
  } finally {
    conn.close()
  }
}

This code doesn't look very neat to me, though...

The data is a stream of sentences with their timestamps and some identifiers of the article they come from. So the content of the r variable looks like this: ('4af93233-3515-43da-8b47-71b0dad99ccc', 'd5ea8a14-be65-4281-9a87-24dcbdc3f879', 'The definitive guide is the Internet', 1362484800, [1])

Each item is stored in the table 'sentence_occurrence' and, when needed, in 'warcinfo_id', 'record_id', and 'sentence_text'.

The schema is the following:

statement.executeUpdate(
    """ 
    |CREATE TABLE warcinfo_id (
    | warcinfo_id_id serial PRIMARY KEY, 
    | warcinfo_id_value char(36) UNIQUE NOT NULL 
    |); 
    """.stripMargin) 
statement.executeUpdate(
    """ 
    |CREATE TABLE record_id (
    | record_id_id serial PRIMARY KEY, 
    | record_id_value char(36) UNIQUE NOT NULL 
    |); 
    """.stripMargin) 
statement.executeUpdate(
    """ 
    |CREATE TABLE sentence_text (
    | sent_id serial PRIMARY KEY, 
    | sent_text varchar UNIQUE NOT NULL 
    |); 
    """.stripMargin) 
statement.executeUpdate(
    """ 
    |CREATE TABLE sentence_occurrence (
    | entry_id serial PRIMARY KEY, 
    | warcinfo_id integer NOT NULL, 
    | record_id integer NOT NULL, 
    | sent_id integer NOT NULL, 
    | timestamp timestamp NOT NULL, 
    | sect_ids smallint ARRAY 
    |); 
    """.stripMargin) 

Added after Craig's comment:

Thanks, Craig. What do you mean by operations on sets of inputs? Could you post a link to an example?

Also, I have the following problem. If two batches try to insert the same record into some table at the same time, I get java.sql.BatchUpdateException with a message like "ERROR: deadlock detected. Detail: Process 31959 waits for ShareLock on transaction 24298876; blocked by process 31955. Process 31955 waits for ShareLock on transaction 24298877; blocked by process 31959." What is the correct solution in this case? I can think of retrying the failed attempt until it succeeds or a retry limit is reached, storing duplicates and then generating the final result table with SELECT DISTINCT ..., or playing with isolation levels (e.g. trying "Read uncommitted"). However, they are all risky workarounds (hitting the retry limit, running out of disk space, ending up with wrong data in the database).
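For what it's worth, one standard remedy for this kind of deadlock (not suggested in this thread, so treat it as an assumption) is to give all concurrent writers a consistent lock ordering by sorting each batch by its keys before inserting, using the names from the code above:

```scala
// Two transactions deadlock when they take row locks in opposite orders.
// Sorting each batch by its keys makes every writer acquire locks in the
// same order, so one transaction simply waits for the other instead.
val sorted = batch.sortBy(item => (item._1, item._2, item._3))
for (item <- sorted)
  updStatements(item, conn, statement1, statement2, statement3, statement4)
```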


Good work. You'll get a bigger improvement if the insert operates on sets of inputs rather than being called item by item, but this should already be an improvement. Ideally you'd use PgJDBC's CopyManager to load temp tables, then work from the temp tables.
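The CopyManager suggestion might look roughly like this (a hedged sketch; "staging" is a hypothetical temp table, and the code assumes sent_text contains no tabs or newlines, which real code would have to escape per COPY's text format):

```scala
import java.io.StringReader
import org.postgresql.PGConnection

// Stream a whole batch into a staging temp table with COPY, which is much
// faster than row-by-row INSERTs; set-based SQL then folds it into the
// destination tables.
val copyApi = conn.unwrap(classOf[PGConnection]).getCopyAPI()
val rows = batch.map(r =>
  s"${r._1}\t${r._2}\t${r._3}\t${r._4}\t{${r._5.mkString(",")}}"
).mkString("\n")
copyApi.copyIn(
  "COPY staging (warcinfo_id_value, record_id_value, sent_text, sent_timestamp, sect_ids) FROM STDIN",
  new StringReader(rows))
```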