2011-11-23 99 views
3

我有一个控制器,我试图从远程源获取XML文件。一次多个Nokogiri请求

喜欢的东西:

@artist = Nokogiri.XML(open(url).read) 

不过,我想立刻获得不同的数据来执行这些倍数。我能以某种方式使用线程吗?

单独执行需要400ms。所以当它们连续执行三次时,响应高达大约1s +。

+0

你可能想看看在百头巨怪和水润,如果你担心并行加载网址。他们是经过充分测试的工具,而不是自己写。 –

回答

5

是的,你可以使用线程:

named_urls = { 
    artist: 'http://foo.com/bar', 
    song: 'http://foo.com/jim', 
    # etc. 
} 
@named_xmls = {} 
one_at_a_time = Mutex.new 
named_urls.map do |name,url| 
    Thread.new do 
    doc = Nokogiri.XML(open(url).read) 
    one_at_a_time.synchronize{ @named_xmls[name] = doc } 
    end 
end.each(&:join) 

# At this point @named_xmls will be populated will all Nokogiri documents 

我不能肯定,如果在一个共享的哈希写入不同的密钥,需要一个互斥或没有,但它不会伤害是安全的。

+0

太棒了。这工作很好!非常感谢。 – Jonovono

4

对于大量的网址,您无法打开大量的线程,因为您将饱和连接带宽,并且您将开始发生连接错误。对于我的特定电缆调制解调器和特定的服务器,我发现16个线程是一个很好的价值。

我使用了一个Mathematica来控制和改变我的ruby web scrapping程序的线程数并监视它的性能以获得不同数量的线程。这是结果:

performance vs threads plot

,而不是直接使用Thread.new,我写道,打开一个新的线程,只有当线程总数少于你的配置最大的包装功能:

def maybe_new_thread 
    File.open('max_threads.cfg', 'r') { |file| @MAX_THREADS = file.gets.to_i } 
    if Thread.list.size < @MAX_THREADS 
    Thread.new { yield } 
    else 
    yield 
    end 
end 

请注意,所需线程的最大数量仅为存储在名为max_threads.cfg的文件中的数字,并且每次调用函数时都会读取此文件。这允许您在程序运行时更改此变量的值。

程序的一般结构是这样的:

named_urls = [ 'http://foo.com/bar', (... hundreds of urls ...),'http://foo.com/jim'] 
named_urls.each do |url| 
    maybe_new_thread do 
    doc = Nokogiri.HTML(open(url)) 
    process_and_insert_in_database(doc) 
    end 
end 

注意,每个线程存储其结果在数据库中,所以我并不需要使用Mutex类来协调线程之间的任何东西。

当我插入到数据库中时,我将包含每个结果插入时的精确时间的列。这是至关重要的,以便您可以计算您获得的表现。确保你用毫秒支持来定义这个列(我使用MariaDB 5.3)。

这是我在用数学控制线程的最大数量,并实时绘制图中的代码:当它运行

named_urls = { 
    'http://foo.com/bar', (... hundreds of urls ...),'http://foo.com/jim', 
} 
named_urls.each do |url| 
    maybe_new_thread do 
    doc = Nokogiri.HTML(open(url)) 
    process_and_insert_in_database(doc) 
    end 
end 

setNumberOfThreads[n_] := Module[{}, 
    Put[n, "max_threads.cfg"]; 
    SQLExecute[conn,"DELETE FROM results"]] 

operationsPerSecond := SQLExecute[conn, 
    "SELECT 
    (SELECT COUNT(*) FROM results)/ 
    (SELECT TIME_TO_SEC(TIMEDIFF((SELECT fin FROM results ORDER BY finishTime DESC LIMIT 1), 
            (SELECT fin FROM results ORDER BY finishTime  LIMIT 1))))"][[1, 1]]; 
cops = {}; 
RunScheduledTask[AppendTo[cops, operationsPerSecond], 2]; 
Dynamic[ListLinePlot[cops]] 

,一旦你看到的是,性能稳定,您可以使用setNumberOfThreads[]更改线程数,并查看性能的影响。

最后一个评论。而不是直接使用开放式的URI的开法,我用这个包装,所以这是错误的情况下,它会自动重试:

def reliable_open(uri) 
    max_retry = 10 
    try_counter = 1 
    while try_counter < max_retry 
    begin 
     result = open(uri) 
     return result 
    rescue 
     puts "Error when trying to open #{uri}" 
     try_counter += 1 
     sleep try_counter * 10 
    end 
    end 
    raise "Imposible to open after #{max_retry} retries" 
end