2015-10-19 39 views
1

我正在写一个小程序,将推特从Twitter公共流中放入HBase数据库。该程序使用两个线程,一个收集推文,一个处理它们。 第一个线程使用twitter4j StatusListener获取推文并将它们放入一个容量为100的ArrayBlockingQueue中。 第二个线程从队列中获取状态,过滤所需数据并将其移至数据库。 处理比收集状态需要更多的时间。全部队列中的java blockingqueue消费块

制片人看起来是这样的:

public void onStatus(Status status) { 
    try { 
     this.queue.put(status); 
    } catch(Exception ex) { 
     ex.printStackTrace(); 
    } 
} 

消费者使用需要,并调用一个函数来处理新的状态:

public void run() { 
    try { 
     while(true) { 
      // Get new status to process 
      this.status = this.queue.take(); 
      this.analyse(); 
     } 
    } catch(Exception ex) { 
     ex.printStackTrace(); 
    } 
} 

在两个线程创建并启动了主要功能:

ArrayBlockingQueue<Status> queue_public = new ArrayBlockingQueue<Status>(100); 

Thread ta_public = new Thread(new TweetAnalyser(cl.getOptionValue("config"), queue_public)); 
Thread st_public = new Thread(new RunPublicStream(cl.getOptionValue("config"), queue_public)); 

ta_public.start(); 
st_public.start(); 

该程序运行一段时间没有任何问题,但然后停止s uddenly。此时队列已满,消费者似乎无法从中获得新的状态。我尝试了几种生产者/消费者模式的变化,但没有成功。没有例外被抛出。

我不知道是为了寻找失败。我希望有人能给我一个提示或解决方案。

+0

有一段时间?当队列填满或发生一段时间后,故障是否立即发生? 'analyse'中是否有任何'System.exit'类型的调用(或从那里调用的方法)? –

+0

不,它只是过滤推文的hashtags,用户名和文本,并将它们放入数据库中。 –

+0

如果你不叫'analyse',它仍然失败吗? “停止”你说的是什么意思 - JVM退出,它只是挂起,还是其他的东西? –

回答

0

如果使用阻塞队列,如果使用多个列表,请仔细检查代码和拼写错误中的阻塞命令(放置和取出ArrayBlockingQueue)。