2016-11-11 45 views
3

我正在阅读来自Twitter Streaming API的推文。连接到API后,我得到一个生成器。如何在某个特定时间退出发电机?

我正在循环播放收到的每条推文,但是我想在18PM退出迭代器。收到每条推文后,我会检查它是否晚于指定的时间戳并停止。

问题是我没有经常收到推文。所以,我可以在17:50收到一个,下一个在19PM。那时候我会发现时间已过,我需要停下来。

有没有办法在18PM时强制停止?

这里是我的代码的高级视图:

def getStream(tweet_iter): 
    for tweet in tweet_iter: 
     #do stuff 
     if time_has_passed(): 
      return 

tweet_iter = ConnectAndGetStream() 
getStream(tweet_iter) 
+2

注意:遵循PEP 8(getStream应该是get_stream,正式推荐)是个好主意。 – EOL

+0

为什么你的脚本不能在六点时停止运行? – jonrsharpe

+2

我猜从tweet生成器中获得yieldvalue的时间是动态的,所以您必须在某种超时时间内包装下一个() - 调用以腾出空间来检查它是什么时间。请参阅http://stackoverflow.com/questions/492519/timeout-on-a-function-call – Moberg

回答

1

你的问题可以通过分割设计的功能被分解为两个独立的过程:

  1. Twitter的过程中充当封装到Twitter API和
  2. 一个监视器进程,当到达退出时间时能够终止twitter进程。

下面的一段代码原型以上使用Python的多处理模块所描述的功能:

import multiprocessing as mp 
import time 

EXIT_TIME = '12:21' #'18:00' 

def twitter(): 

    while True: 
     print 'Twittttttttttt.....' 
     time.sleep(5) 

def get_time(): 

    return time.ctime().split()[3][:5] 

if __name__ == '__main__': 

    # Execute the function as a process 
    p = mp.Process(target=twitter, args=()) 
    p.start() 

    # Monitoring the process p 
    while True: 
     print 'Checking the hour...' 
     if get_time() == EXIT_TIME: 
      p.terminate() 
      print 'Current time:', time.ctime() 
      print 'twitter process has benn terminated...' 
      break 
     time.sleep(5) 

当然,你可以使用p.join(TIMEOUT),而不是使用以所呈现的,而真正的循环我的示例如here所示。

+0

谢谢。你的原型似乎工作完美,但我面临一些问题。我将一些参数传递给'twitter'函数。其中,我传递一个记录器对象,我得到这个错误消息“TypeError:不能pickle thread.lock对象”。你知道这件事吗? – Stergios

+0

请查看以下帖子:http://stackoverflow.com/a/7865512/2194843 包含您遇到的错误类型的解决方法。 – funk

1

这里是与线程和调度器蟒一个例子:

import threading 
import time 
import os 
import schedule 

def theKillingJob(): 
    print("Kenny and Cartman die!") 
    os._exit(1) 

schedule.every().day.at("18:00").do(theKillingJob,'It is 18:00') 

def getStream(tweet_iter): 
    for tweet in tweet_iter: 
     #do stuff 

def kenny(): 
    while True: 
     print("Kenny alive..") 
     schedule.run_pending() 
     time.sleep(1) 

def cartman(): 
    while True: 
     print("Cartman alive..") 

     tweet_iter = ConnectAndGetStream() 
     getStream(tweet_iter) 

     # You can change whenever you want to check for tweets by changing sleep time here 
     time.sleep(1) 

if __name__ == '__main__': 
    daemon_kenny = threading.Thread(name='kenny', target=kenny) 
    daemon_cartman = threading.Thread(name='cartman', target=cartman) 
    daemon_kenny.setDaemon(True) 
    daemon_cartman.setDaemon(True) 

    daemon_kenny.start() 
    daemon_cartman.start() 
    daemon_kenny.join() 
    daemon_cartman.join() 
1

创建用于生产一个单独的线程,并使用Queue进行通信。我还必须使用threading.Event来阻止制片人。

import itertools, queue, threading, time 

END_TIME = time.time() + 5 # run for ~5 seconds 

def time_left(): 
    return END_TIME - time.time() 

def ConnectAndGetStream():    # stub for the real thing 
    for i in itertools.count(): 
     time.sleep(1) 
     yield "tweet {}".format(i) 

def producer(tweets_queue, the_end): # producer 
    it = ConnectAndGetStream() 
    while not the_end.is_set(): 
     tweets_queue.put(next(it)) 

def getStream(tweets_queue, the_end): # consumer 
    try: 
     while True: 
      tweet = tweets_queue.get(timeout=time_left()) 
      print('Got', tweet) 
    except queue.Empty: 
     print('THE END') 
     the_end.set() 

tweets_queue = queue.Queue() # you might wanna use the maxsize parameter 
the_end = threading.Event() 
producer_thread = threading.Thread(target=producer, 
            args=(tweets_queue, the_end)) 
producer_thread.start() 
getStream(tweets_queue, the_end) 
producer_thread.join() 
相关问题