2012-03-13 116 views
3

我有一个Java程序需要将大量较大的行插入到SQL Server数据库中。行数是800k,每个的大小大约是200字节。使用Java线程并行插入到数据库中

当前它们被分成50个批次,然后每个批次都使用一个语句插入。 (我们已经通过JTDS日志记录确认每个批次都使用一次sp_exec调用。)调整批量大小在25到250之间看起来没有任何显着影响,50几乎是最佳的。

我已经尝试将批次分成(比如说)5个组,并使用线程并行处理每个组。这显着更快 - 比5个线程快两倍以上。

我的问题是关于使线程使用健壮。特别是,如果任何批次失败,将会抛出异常。我想让这个异常被捕获并传递给调用者,并且我希望在我们传递它之前100%确定其他线程已经完成(中止或完成)。因为在稍后在程序中恢复异常时,我们不希望意外的行继续到达表中。

这里是我做了什么:

/** Method to insert a single batch. */ 
private void insertBatchPostings(Collection<Posting> postings) throws PostingUpdateException 
{ 
    // insert the batch using a single INSERT invokation 
    // throw a PostingUpdateException if anything goes wrong 
} 

private static final int insertionThreads = 5; 

/** Method to insert a collection of batches in parallel, using the above. */ 
protected void insertBatchPostingsThreaded(Collection<Collection<Posting>> batches) throws PostingUpdateException 
{ 
    ExecutorService pool = Executors.newFixedThreadPool(insertionThreads); 
    Collection<Future> futures = new ArrayList<Future>(batches.size()); 

    for (final Collection<Posting> batch : batches) { 
     Callable c = new Callable() { 
      public Object call() throws PostingUpdateException { 
       insertBatchPostings(batch); 
       return null; 
      }    
     }; 
     /* So we submit each batch to the pool, and keep a note of its Future so we can check it later. */ 
     futures.add(pool.submit(c)); 
    } 

    /* Pool is running, indicate that no further work will be submitted to it. */ 
    pool.shutdown(); 

    /* Check all the futures for problems. */ 
    for (Future f : futures) { 
     try { 
      f.get(); 
     } catch (InterruptedException ex) { 
      throw new PostingUpdateException("Interrupted while processing insert results: " + ex.getMessage(), ex); 
     } catch (ExecutionException ex) { 
      pool.shutdownNow(); 
      throw (PostingUpdateException) ex.getCause(); 
     } 
    } 
} 

通过这个返回我要保证所有的线程都处于休眠状态的时间。

问题

(我想澄清我在问什么。)

  1. 是上面的代码完全健壮,在没有线程插入将继续insertBatchPostingsThreaded后操作回报?
  2. 是否有更好更简单的方法使用Java并发功能来实现这一点?我的代码看起来过于复杂(让我怀疑错过边缘案例)。
  3. 一旦任何一个线程出现故障,最好的方法是让它失效吗?

我不是一个自然的Java程序员,所以我希望最终得到的东西不会宣传这个事实。 :)

+0

Augh。你可以使用泛型来使你的代码更具可读性吗? – 2012-03-13 00:03:12

+0

@Edmund禁用批量插入表索引可提高速度。你必须触发索引重新计算。 – hidralisk 2012-03-13 00:24:49

+0

@Louis - 我从工作计划中逐字拷贝它以确保它是准确的;这是一个传统的应用程序。但我试图将它翻译成现代Java。我认为for循环最让你感到愤怒,但我也翻译了集合类型。 – Edmund 2012-03-13 00:29:22

回答

1

番石榴的Futures.successfulAsList采取期货列表作为输入,并返回一个未来“其价值是包含所有成功输入期货价值的列表”。您可以在生成的Future上调用get(),然后遍历您的原始未来列表以检查是否有任何故障。

+0

我的其他要求(我已经添加到问题中)是,如果有任何失败,池中的其余任务可以被取消或中止,以便快速失败。番石榴有什么可以帮助吗? – Edmund 2012-03-13 00:39:00

+0

啊。我没有看到你想要所有其他线程都失败。然而,使用'ListenableFuture'来添加回调函数来取消所有其他期货并不困难...... – 2012-03-13 01:12:29

+0

ListenableFuture也会调用监听器,监听器又会调用池中的shutdownNow?看看Java源代码,似乎shutdownNow努力取消所有排队的任务,所以它可能已经在我的代码中做到了,但是如果我可以使用Guava中的某些东西来使代码更清洁,那么我就是为了这一点。 – Edmund 2012-03-13 22:20:39