2013-04-04 121 views
3

我想在cassandra中插入约5000万行(〜30列),目前只有1个节点。向cassandra插入大量数据

我从另一个数据源查询我的数据并存储在一个表对象中。我迭代通过分别解析每一行然后将其添加到增变器。目前,我一次插入100行,100万行需要40分钟!我如何加快这个过程? (我也尝试过client.batch_mutate(),但它似乎有重置连接错误块数大小的插入数千)2)。

通过搜索我看到多线程可能有所帮助。但我找不到任何例子,有人可以链接我吗?谢谢 !!

我当前的代码:

 List<String> colNames = new ArrayList<String>(); 
     List<String> colValues = new ArrayList<String>(); 
     SomeTable result = Query(...); // this contains my result set of 1M rows initially 

     for (Iterator itr = result.getRecordIterator(); itr.hasNext();) { 
       String colName =..... 
       String colValue = ..... 

      int colCount = colNames.size(); // 100 * 30 

      for (int i = 0; i < colCount; i++) { 
       //add row keys and columns to mutator 
       mutator.addInsertion(String.valueOf(rowCounter), "data", HFactory.createStringColumn(colNames.get(i), colValues.get(i))); 
      } 
      rowCounter++; 

      //insert rows of block size 100 
      if (rowCounter % 100==0) { 

       mutator.execute(); 
       //clear data 
       colNames = new ArrayList<String>(); 
       colValues = new ArrayList<String>(); 
       mutator = HFactory.createMutator(keyspace, stringSerializer); 
      } 

     } 

回答

2

多线程将有很大的帮助,是的。目前,您正在Cassandra中使用一个连接,这意味着您只能在Cassandra中使用单个线程。您需要使用多个连接,这需要客户端中有多个线程。

一种方法是使用Java ThreadPoolExecutor并将mutator.execute()包装在可运行的环境中,然后在线程池中执行它。小心处理异常。您还应该使用BlockingQueue来限制排队的突变数量,以防您读取源的速度比Cassandra可以插入的速度快。

用这个,在Hector中设置你的连接池大小为10,你的插入应该快得多。

如果您不清楚Cassandra的注意事项,请注意,Cassandra不适用于单节点操作。我假设你打算扩展和添加复制。如果不是的话,那么您可能会找到更高性能,更简单的替代解决方案来满足您的需求。使用多个节点时,多个连接和线程变得尤为重要,因此您的插入速率可以扩展。

+0

感谢您的回答!所以我需要多个节点,如果我想让我的客户端多线程?我不知道多线程,我不知道你是否知道在线的任何好的多线程cassandra例子? 是的,我现在正在测试,稍后会扩展到更多节点。 @Richard – 2013-04-04 13:09:55

+0

不,您可以为每个节点建立多个连接,这就是让您的客户端成为多线程所需的全部内容。我不知道Cassandra的例子,但ThreadPoolExecutor的javadoc是很好的http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html,并且有关于Java的教程在此处进行线程化http://docs.oracle.com/javase/tutorial/essential/concurrency/index.html – Richard 2013-04-04 13:15:19

+0

再次感谢@Richard。对不起另一个新手问题 - 看起来我会将我的代码封装在Runnable中,并创建许多线程并发送不同的“表”对象。我的问题是,我是否应该为每个线程或同一个线程创建一个新的Cluster/Mutator/Keyspace对象? – 2013-04-04 15:28:55