2016-02-05 56 views
2

我使用Datastax驱动程序从Cassandra获取大量行,并且需要尽快处理它们。使用具有Java 8并行数据流的Datastax Cassandra ResultSet - 快速地

我已经调查使用List::parallelStream().forEach()这似乎很大,首先因为ResultSet行为很像0​​,但遗憾的是我不能直接在ResultSet使用parallelStream()。为了得到这个工作,我首先必须使用ResultSet::all()这真的很慢 - 我假设它遍历每个元素。

ResultSet rs = this.getResultSet(); // Takes <1 second 

// Convert the ResultSet to a list so as I can use parallelStream(). 
List<Row> rsList = rs.all(); // Takes 21 seconds 

rsList.parallelStream().forEach(this::processRow); // Takes 3 seconds 

有没有更快的方法可以处理结果集的每一行?

+1

出于好奇,多少时间RLIST的顺序处理走?看看你的时间数字,你的瓶颈不是行的处理,而是他们的检索。所以平行化行处理会给你一个相当小的改进。对我来说,你似乎应该考虑优化你的查询/架构/集群设置/网络。 – Ralf

+0

我来自一个PHP背景,所以我习惯于调用execute()后包含行的结果集。这不是这种情况吗? –

+1

独立于您使用问题的驱动程序是相同的:您通过驱动程序向C *提交查询; C *处理查询并计算结果集的行; C *将结果行发送给驱动程序(在@doanduyhai指出的页面中)。只要驱动程序使第一行可用,客户端就可以开始处理行('one()'返回第一行)。因此,您可以开始并行处理行,以便发送更多行,从而有效地并行处理流程。但总体而言,您的速度永远不会比C *向您发送最后一行结果所花费的时间更快。 – Ralf

回答

2

为了得到这个工作,我首先要使用的ResultSet ::所有()这确实是慢

ResultSet.all()将使用服务器端分页所有行。您可以使用statement.setFetchSize()

控制页面大小有没有更快的方法可以处理结果集的每一行?

这取决于您的查询,它是什么?如果你正在做一个完整的分区扫描,只有几台机器在做这项工作,但是如果你从多个分区获取数据,你可以尝试用多个查询并行化它们,每个分区一个。

2

你可以试试这个:

ResultSet rs = this.getResultSet(); // Takes <1 second 

StreamSupport.stream(
    Spliterators.spliteratorUnknownSize(
       rs.iterator(), Spliterator.ORDERED), false) 
     .parallel().forEach(this::processRow); 

省略调用rs.all()

希望,如果ResultSet允许立即开始迭代,你将能够更早并行处理。

更新

后检查ResultSet源本是我看到:

方法all()创建一个新的ArrayList和填充它,这需要21秒你的情况

List<Row> result = new ArrayList<Row>(rows.size()); 
for (Row row : this) 
    result.add(row); 

方法next()在迭代器中实现轮询队列而不是

public Row next() { 
    return Row.fromData(metadata, rows.poll()); 
} 

这意味着数据处理在开始处理第一行之前不需要等待21秒。

+0

实施你的第一个建议仍然需要20秒钟来处理行。看起来好像在后端,线程无法同时从ResultSet中读取。 –

+0

我明白了。很抱歉听到这个消息。这不是处理中的并行性问题。我相信它的工作正常。只是从数据库中获取行太慢了。根据Amdahl的法则https://en.wikipedia.org/wiki/Amdahl%27s_law,当链式计算的并行部分与连续计算相比较小时,整体加速很小。 – nolexa

0

与作者描述的结果几乎相同。 我的解决方案是将FetchSize设置为更高的价值。因为我读取默认值是5000.并且获取所有并迭代它为我花了〜25秒。使用.setFetchSize(50000)迭代需要0.8秒。我甚至不相信它。用简单的foreach循环遍历

我的代码:

String sql = "...." 
prepearedSql = session.prepare(sql); 
Statement statement = prepearedSql.bind().setValues(...).setFetchSize(50000); 
ResultSet result = session.execute(statement); 
for (Row row : result) 
    {...