2016-12-07 50 views
0

爬虫有一个urlQueue来记录要抓取的url,一个模拟异步url fetcher。
我试图用rx-java风格编写它。 起初,我尝试Flowable.generate这样如何重写以下rx-java爬虫

Flowable.generate((Consumer<Emitter<Integer>>) e -> { 
     final Integer poll = demo.urlQueue.poll(); 
     if (poll != null) { 
      e.onNext(poll); 
     } else if (runningCount.get() == 0) { 
      e.onComplete(); 
     } 
    }).flatMap(i -> { 
     runningCount.incrementAndGet(); 
     return demo.urlFetcher.asyncFetchUrl(i); 
    }, 10) 
      .doOnNext(page -> demo.onSuccess(page)) 
      .subscribe(page -> runningCount.decrementAndGet()); 

,但它不会工作,因为在开始时,有可能只有一个urlQueue种子,所以产生被称为10次,但只有一个即onNext被发射。只有当它完成时,才会调用下一个请求(1) - >生成。
尽管在代码中,我们指定flatMap maxConcurrency为10,它会逐一抓取。

之后,我修改下面的代码,它可以像预期的那样工作。
但是在代码中,我应该关心当前有多少任务正在运行,然后计算应该从队列中提取多少个任务,我认为rx-java应该完成这项工作。

我不确定代码是否可以用更简单的方法重写。

public class CrawlerDemo { 
    private static Logger logger = LoggerFactory.getLogger(CrawlerDemo.class); 

    // it can be redis queue or other queue 
    private BlockingQueue<Integer> urlQueue = new LinkedBlockingQueue<>(); 

    private static AtomicInteger runningCount = new AtomicInteger(0); 

    private static final int MAX_CONCURRENCY = 5; 

    private UrlFetcher urlFetcher = new UrlFetcher(); 

    private void addSeed(int i) { 
     urlQueue.offer(i); 
    } 

    private void onSuccess(Page page) { 
     page.links.forEach(i -> { 
      logger.info("offer more url " + i); 
      urlQueue.offer(i); 
     }); 
    } 

    private void start(BehaviorProcessor processor) { 
     final Integer poll = urlQueue.poll(); 
     if (poll != null) { 
      processor.onNext(poll); 

     } else { 
      processor.onComplete(); 
     } 
    } 

    private int dispatchMoreLink(BehaviorProcessor processor) { 

     int links = 0; 
     while (runningCount.get() <= MAX_CONCURRENCY) { 
      final Integer poll = urlQueue.poll(); 
      if (poll != null) { 
       processor.onNext(poll); 

       links++; 
      } else { 
       if (runningCount.get() == 0) { 
        processor.onComplete(); 
       } 
       break; 
      } 
     } 

     return links; 
    } 

    private Flowable<Page> asyncFetchUrl(int i) { 
     return urlFetcher.asyncFetchUrl(i); 
    } 


    public static void main(String[] args) throws InterruptedException { 
     CrawlerDemo demo = new CrawlerDemo(); 
     demo.addSeed(1); 

     BehaviorProcessor<Integer> processor = BehaviorProcessor.create(); 

     processor 
       .flatMap(i -> { 
        runningCount.incrementAndGet(); 
        return demo.asyncFetchUrl(i) 
          .doFinally(() -> runningCount.decrementAndGet()) 
          .doFinally(() -> demo.dispatchMoreLink(processor)); 
       }, MAX_CONCURRENCY) 
       .doOnNext(page -> demo.onSuccess(page)) 
       .subscribe(); 

     demo.start(processor); 


    } 


} 

class Page { 
    public List<Integer> links = new ArrayList<>(); 
} 

class UrlFetcher { 
    static Logger logger = LoggerFactory.getLogger(UrlFetcher.class); 


    final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); 

    public Flowable<Page> asyncFetchUrl(Integer url) { 

     logger.info("start async get " + url); 
     return Flowable.defer(() -> emitter -> 
       scheduledExecutorService.schedule(() -> { 

        Page page = new Page(); 
        // the website urls no more than 1000 
        if (url < 1000) { 
         page.links = IntStream.range(1, 5).boxed().map(j -> 10 * url + j).collect(Collectors.toList()); 
        } 

        logger.info("finish async get " + url); 
        emitter.onNext(page); 
        emitter.onComplete(); 
       }, 5, TimeUnit.SECONDS));         // cost 5 seconds to access url 
    } 
} 

回答

0

您试图对RxJava使用常规(非Rx)代码,但没有得到您想要的结果。

要做的第一件事就是到urlQueue.poll()转换成Flowable<Integer>

Flowable.generate((Consumer<Emitter<Integer>>) e -> { 
    final Integer take = demo.urlQueue.take(); // Note 1 
    e.onNext(take); // Note 2 
}) 
    .observeOn(Schedulers.io(), 1) // Note 3 
    .flatMap(i -> demo.urlFetcher.asyncFetchUrl(i), 10) 
    .subscribe(page -> demo.onSuccess(page)); 
  1. 阅读的反应方式的队列意味着阻塞等待。尝试poll()队列会增加一层RxJava允许您跳过的复杂性。
  2. 将收到的值传递给任何订户。如果您需要指示完成,则需要添加外部布尔值,或使用带内指示符(例如负整数)。
  3. observeOn()运营商将订阅发电机。价值1只会导致一个订阅,因为没有多个订阅点。

其余代码与您拥有的相似。您产生的问题是因为flatMap(...,10)操作会向发生器订阅10次,这不是您想要的。你想限制同时抓取的次数。添加runningCount是一种混乱,可以防止早期退出发生器,但它不能取代urlQueue上用于发送数据结束的正确方法。