爬虫有一个urlQueue来记录要抓取的url,一个模拟异步url fetcher。
我试图用rx-java风格编写它。 起初,我尝试Flowable.generate这样如何重写以下rx-java爬虫
Flowable.generate((Consumer<Emitter<Integer>>) e -> {
final Integer poll = demo.urlQueue.poll();
if (poll != null) {
e.onNext(poll);
} else if (runningCount.get() == 0) {
e.onComplete();
}
}).flatMap(i -> {
runningCount.incrementAndGet();
return demo.urlFetcher.asyncFetchUrl(i);
}, 10)
.doOnNext(page -> demo.onSuccess(page))
.subscribe(page -> runningCount.decrementAndGet());
,但它不会工作,因为在开始时,有可能只有一个urlQueue种子,所以产生被称为10次,但只有一个即onNext被发射。只有当它完成时,才会调用下一个请求(1) - >生成。
尽管在代码中,我们指定flatMap maxConcurrency为10,它会逐一抓取。
之后,我修改下面的代码,它可以像预期的那样工作。
但是在代码中,我应该关心当前有多少任务正在运行,然后计算应该从队列中提取多少个任务,我认为rx-java应该完成这项工作。
我不确定代码是否可以用更简单的方法重写。
public class CrawlerDemo {
private static Logger logger = LoggerFactory.getLogger(CrawlerDemo.class);
// it can be redis queue or other queue
private BlockingQueue<Integer> urlQueue = new LinkedBlockingQueue<>();
private static AtomicInteger runningCount = new AtomicInteger(0);
private static final int MAX_CONCURRENCY = 5;
private UrlFetcher urlFetcher = new UrlFetcher();
private void addSeed(int i) {
urlQueue.offer(i);
}
private void onSuccess(Page page) {
page.links.forEach(i -> {
logger.info("offer more url " + i);
urlQueue.offer(i);
});
}
private void start(BehaviorProcessor processor) {
final Integer poll = urlQueue.poll();
if (poll != null) {
processor.onNext(poll);
} else {
processor.onComplete();
}
}
private int dispatchMoreLink(BehaviorProcessor processor) {
int links = 0;
while (runningCount.get() <= MAX_CONCURRENCY) {
final Integer poll = urlQueue.poll();
if (poll != null) {
processor.onNext(poll);
links++;
} else {
if (runningCount.get() == 0) {
processor.onComplete();
}
break;
}
}
return links;
}
private Flowable<Page> asyncFetchUrl(int i) {
return urlFetcher.asyncFetchUrl(i);
}
public static void main(String[] args) throws InterruptedException {
CrawlerDemo demo = new CrawlerDemo();
demo.addSeed(1);
BehaviorProcessor<Integer> processor = BehaviorProcessor.create();
processor
.flatMap(i -> {
runningCount.incrementAndGet();
return demo.asyncFetchUrl(i)
.doFinally(() -> runningCount.decrementAndGet())
.doFinally(() -> demo.dispatchMoreLink(processor));
}, MAX_CONCURRENCY)
.doOnNext(page -> demo.onSuccess(page))
.subscribe();
demo.start(processor);
}
}
class Page {
public List<Integer> links = new ArrayList<>();
}
class UrlFetcher {
static Logger logger = LoggerFactory.getLogger(UrlFetcher.class);
final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
public Flowable<Page> asyncFetchUrl(Integer url) {
logger.info("start async get " + url);
return Flowable.defer(() -> emitter ->
scheduledExecutorService.schedule(() -> {
Page page = new Page();
// the website urls no more than 1000
if (url < 1000) {
page.links = IntStream.range(1, 5).boxed().map(j -> 10 * url + j).collect(Collectors.toList());
}
logger.info("finish async get " + url);
emitter.onNext(page);
emitter.onComplete();
}, 5, TimeUnit.SECONDS)); // cost 5 seconds to access url
}
}