2016-07-28 75 views
0

我知道终止通知是通过元数据网址提供,我也可以为了确定做同样的事情到EC2现货实例终止与Python 2.7版

if requests.get("http://169.254.169.254/latest/meta-data/spot/termination-time").status_code == 200 

如果该通知已经公布。我在我的现货实例中运行Python服务是:

  1. 遍历长轮询SQS队列
  2. 如果得到消息,它将暂停投票和作品的有效载荷。
  3. 处理有效负载可能需要5-50分钟。
  4. 处理有效负载将涉及产生最多50个线程的线程池以处理将文件并行上传到S3,这是处理有效负载所花费的大部分时间。
  5. 最后,从队列中删除消息,漂洗,重复。

这项工作是幂等的,所以如果相同的有效负载运行多次,我就不用处理时间/成本,但不会对应用程序工作流程产生负面影响。

我正在寻找一种优雅的方式,现在也在后台每隔5秒轮询终止通知。一旦出现终止通知,我想立即将消息释放回SQS队列,以便另一个实例尽快提取它。

作为奖励,我想关闭工作,关闭线程池并让服务进入停滞状态。如果我终止服务,supervisord将简单地重新启动它。

更大的奖金!有没有可用的Python模块简化了这一点,只是工作?

回答

0

我编写了这段代码来演示如何使用一个线程来轮询Spot实例终止。它首先启动一个轮询线程,它将负责检查http端点。

然后我们创建假工人池(模仿实际工作)并开始运行池。最终,轮询线程将在(执行后约10秒执行)中踢出并杀死整个事情。

为了防止脚本在Supervisor重新启动后继续工作,我们只需在__main__的开头进行检查,如果终止通知在那里,我们会休眠2.5分钟,这比通知之前持续的时间要长该实例已关闭。

#!/usr/bin/env python 
import threading 
import Queue 
import random 
import time 
import sys 
import os 

class Instance_Termination_Poll(threading.Thread): 
    """ 
    Sleep for 5 seconds and eventually pretend that we then recieve the 
    termination event 

    if requests.get("http://169.254.169.254/latest/meta-data/spot/termination-time").status_code == 200 
    """ 

    def run(self): 
     print("Polling for termination") 
     while True: 
      for i in range(30): 
       time.sleep(5) 
       if i==2: 
        print("Recieve Termination Poll!") 
        print("Pretend we returned the message to the queue.") 
        print("Now Kill the entire program.") 
        os._exit(1) 
      print("Well now, this is embarassing!") 

class ThreadPool: 
    """ 
    Pool of threads consuming tasks from a queue 
    """ 

    def __init__(self, num_threads): 
     self.num_threads = num_threads 
     self.errors = Queue.Queue() 
     self.tasks = Queue.Queue(self.num_threads) 
     for _ in range(num_threads): 
      Worker(self.tasks, self.errors) 

    def add_task(self, func, *args, **kargs): 
     """ 
     Add a task to the queue 
     """ 
     self.tasks.put((func, args, kargs)) 

    def wait_completion(self): 
     """ 
     Wait for completion of all the tasks in the queue 
     """ 
     try: 
      while True: 
       if self.tasks.empty() == False: 
        time.sleep(10) 
       else: 
        break 
     except KeyboardInterrupt: 
      print "Ctrl-c received! Kill it all with Prejudice..." 
      os._exit(1) 

     self.tasks.join() 

class Worker(threading.Thread): 
    """ 
    Thread executing tasks from a given tasks queue 
    """ 

    def __init__(self, tasks, error_queue): 
     threading.Thread.__init__(self) 
     self.tasks = tasks 
     self.daemon = True 
     self.errors = error_queue 
     self.start() 

    def run(self): 
     while True: 
      func, args, kargs = self.tasks.get() 
      try: 
       func(*args, **kargs) 
      except Exception, e: 
       print("Exception " + str(e)) 
       error = {'exception': e} 
       self.errors.put(error) 

      self.tasks.task_done() 

def do_work(n): 
    """ 
    Sleeps a random ammount of time, then creates a little CPU usage to 
    mimic some work taking place. 
    """ 
    for z in range(100): 
     time.sleep(random.randint(3,10)) 
     print "Thread ID: {} working.".format(threading.current_thread()) 
     for x in range(30000): 
      x*n 
     print "Thread ID: {} done, sleeping.".format(threading.current_thread()) 

if __name__ == '__main__': 
    num_threads = 30 

    # Start up the termination polling thread 
    term_poll = Instance_Termination_Poll() 
    term_poll.start() 

    # Create our threadpool 
    pool = ThreadPool(num_threads) 
    for y in range(num_threads*2): 
     pool.add_task(do_work, n=y) 

    # Wait for the threadpool to complete 
    pool.wait_completion()