
I created the following script to download images from an API endpoint, and it works as intended. The problem is that it is rather slow, because all the requests have to wait on each other. What is the correct way to keep the steps sequential for each item I want to fetch, but run the items in parallel? The data comes from an online service called servicem8. What I hope to achieve:

  • Fetch all possible job IDs => keep the name and other information
  • Fetch the customer
  • Fetch the name of every attachment of a job

These three steps should be done for each job. So I could run the jobs in parallel, because they don't have to wait on each other.

Update

What I don't get is how you can make sure that the calls for one item stay bundled together, since it is only per item that I can run things in parallel, e.g. the three calls per item. For example, when I want to:

  • Fetch item (fetch name => fetch description => fetch ID)

So it is the fetching of the items that I want to make parallel?
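
One pattern that fits this (just a sketch, not from the original post) is a thread pool: a worker function runs the whole chain of calls for a single job sequentially, and the pool runs many jobs at once. Applied here to the customer lookup from the script below, with items standing in for the already-filtered list of job dicts:

from concurrent.futures import ThreadPoolExecutor
import requests

user, passw = "", ""  # same credentials as in the script below

def process_job(item):
    # All steps for one job stay sequential inside this function.
    url_customer = "https://api.servicem8.com/api_1.0/Company/{}.json".format(
        item['company_uuid'])
    c = requests.get(url_customer, auth=(user, passw))
    return [item['uuid'], item['generated_job_id'], c.json()['name']]

# The jobs themselves run in parallel; `items` is assumed to be the
# filtered list of job dicts from the job.json response.
with ThreadPoolExecutor(max_workers=10) as pool:
    scheduled_jobs = list(pool.map(process_job, items))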

The current code I have works, but is rather slow:

import requests
import dateutil.parser
import shutil
import os

user = "[email protected]"
passw = "test"

print("Read json")
url = "https://api.servicem8.com/api_1.0/job.json"
r = requests.get(url, auth=(user, passw))

print("finished reading jobs.json file")
scheduled_jobs = []
if r.status_code == 200:
    # keep only the jobs scheduled on 2016-10-10
    for item in r.json():
        scheduled_date = item['job_is_scheduled_until_stamp']
        try:
            parsed_date = dateutil.parser.parse(scheduled_date)
            if parsed_date.year == 2016 and parsed_date.month == 10 and parsed_date.day == 10:
                url_customer = "https://api.servicem8.com/api_1.0/Company/{}.json".format(
                    item['company_uuid'])
                c = requests.get(url_customer, auth=(user, passw))
                cus_name = c.json()['name']
                scheduled_jobs.append(
                    [item['uuid'], item['generated_job_id'], cus_name])
        except ValueError:
            pass

    for job in scheduled_jobs:
        print("fetch for job {}".format(job))
        url = "https://api.servicem8.com/api_1.0/Attachment.json?%24filter=related_object_uuid%20eq%20{}".format(job[0])
        r = requests.get(url, auth=(user, passw))
        for attachment in r.json():
            if attachment['active'] == 1 and attachment['file_type'] != '.pdf':
                print("fetch for attachment {}".format(attachment))
                url_staff = "https://api.servicem8.com/api_1.0/Staff.json?%24filter=uuid%20eq%20{}".format(
                    attachment['created_by_staff_uuid'])
                s = requests.get(url_staff, auth=(user, passw))
                for staff in s.json():
                    tech = "{}_{}".format(staff['first'], staff['last'])

                url = "https://api.servicem8.com/api_1.0/Attachment/{}.file".format(attachment['uuid'])
                r = requests.get(url, auth=(user, passw), stream=True)
                if r.status_code == 200:
                    creation_date = dateutil.parser.parse(
                        attachment['timestamp']).strftime("%d.%m.%y")
                    if not os.path.exists(os.getcwd() + "/{}/{}".format(job[2], job[1])):
                        os.makedirs(os.getcwd() + "/{}/{}".format(job[2], job[1]))
                    path = os.getcwd() + "/{}/{}/SC -O {} {}{}".format(
                        job[2], job[1], creation_date, tech.upper(), attachment['file_type'])
                    print("writing file to path {}".format(path))
                    with open(path, 'wb') as f:
                        r.raw.decode_content = True
                        shutil.copyfileobj(r.raw, f)
else:
    print(r.text)

Update [14/10]: I updated the code to the approach below, following some of the hints given. Many thanks for those. The only thing left to optimize, I guess, is the attachment download, but it works fine now. A fun thing I learned is that you cannot create a folder named CON on a Windows machine :-) Didn't know that.

I also use pandas to try to avoid some of the loops over my lists, but I'm not sure it is as performant as it could be yet. The longest part is actually reading in the full json files. I read them completely because I couldn't find a way to tell the API to only return, say, my jobs from September 2016. The API query function seems to work with eq/lt/gt.
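
For example, if the timestamp field accepts gt the same way uuid accepts eq in the $filter queries of the first script, a server-side filter along these lines might avoid reading the full file. This is an untested guess at the syntax, not a confirmed API feature:

import requests

user, passw = "", ""
# Hypothetical: ask the API for jobs scheduled after 1 September 2016
# instead of downloading the complete job list and filtering locally.
url = ("https://api.servicem8.com/api_1.0/job.json"
       "?%24filter=job_is_scheduled_until_stamp%20gt%20'2016-09-01'")
jobs = requests.get(url, auth=(user, passw))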

import grequests  # imported first so gevent's monkey-patching is applied before requests loads
import requests
import dateutil.parser
import shutil
import os
import pandas as pd

user = ""
passw = ""

FOLDER = os.getcwd()
headers = {"Accept-Encoding": "gzip, deflate"}

urls = [
    'https://api.servicem8.com/api_1.0/job.json',
    'https://api.servicem8.com/api_1.0/Attachment.json',
    'https://api.servicem8.com/api_1.0/Staff.json',
    'https://api.servicem8.com/api_1.0/Company.json'
]

# Create a set of unsent requests:
print("Read json files")
rs = (grequests.get(u, auth=(user, passw), headers=headers) for u in urls)
# Send them all at the same time:
jobs, attachments, staffs, companies = grequests.map(rs)

# Create dataframes (only df_companies is used below)
df_jobs = pd.DataFrame(jobs.json())
df_attachments = pd.DataFrame(attachments.json())
df_staffs = pd.DataFrame(staffs.json())
df_companies = pd.DataFrame(companies.json())

scheduled_jobs = []

if jobs.status_code == 200:
    print("finished reading json file")
    for job in jobs.json():
        scheduled_date = job['job_is_scheduled_until_stamp']
        try:
            parsed_date = dateutil.parser.parse(scheduled_date)
            if parsed_date.year == 2016 and parsed_date.month == 9:
                # look the customer up in the prefetched Company dataframe
                cus_name = df_companies[df_companies.uuid == job['company_uuid']].iloc[0]['name'].upper()
                cus_name = cus_name.replace('/', '')  # strip characters that break paths
                scheduled_jobs.append([job['uuid'], job['generated_job_id'], cus_name])
        except ValueError:
            pass
    print("{} jobs to fetch".format(len(scheduled_jobs)))

    for job in scheduled_jobs:
        print("fetch for job attachments {}".format(job))
        for attachment in attachments.json():
            if attachment['related_object_uuid'] != job[0]:
                continue
            if attachment['active'] == 1 and attachment['file_type'] != '.pdf' and attachment['attachment_source'] != 'INVOICE_SIGNOFF':
                for staff in staffs.json():
                    if staff['uuid'] == attachment['created_by_staff_uuid']:
                        tech = "{}_{}".format(
                            staff['first'].split()[-1].strip(), staff['last'])

                creation_timestamp = dateutil.parser.parse(attachment['timestamp'])
                creation_date = creation_timestamp.strftime("%d.%m.%y")
                creation_time = creation_timestamp.strftime("%H_%M_%S")

                path = FOLDER + "/{}/{}/SC_-O_D{}_T{}_{}{}".format(
                    job[2], job[1], creation_date, creation_time, tech.upper(), attachment['file_type'])

                # fetch the attachment only if it was not downloaded before
                if not os.path.isfile(path):
                    url = "https://api.servicem8.com/api_1.0/Attachment/{}.file".format(attachment['uuid'])
                    r = requests.get(url, auth=(user, passw), stream=True)
                    if r.status_code == 200:
                        if not os.path.exists(FOLDER + "/{}/{}".format(job[2], job[1])):
                            os.makedirs(FOLDER + "/{}/{}".format(job[2], job[1]))
                        print("writing file to path {}".format(path))
                        with open(path, 'wb') as f:
                            r.raw.decode_content = True
                            shutil.copyfileobj(r.raw, f)
                else:
                    print("file already exists")
else:
    print(jobs.text)
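
As for the attachment download that remains the bottleneck: one option (a sketch in the same spirit as the grequests calls above) is to collect (url, path) pairs in the loop instead of calling requests.get inline, then fetch them with a small thread pool afterwards. The pool is kept small because the API is rate limited (see the comments below):

from concurrent.futures import ThreadPoolExecutor
import shutil
import requests

def download(task):
    url, path = task
    r = requests.get(url, auth=(user, passw), stream=True)
    if r.status_code == 200:
        with open(path, 'wb') as f:
            r.raw.decode_content = True
            shutil.copyfileobj(r.raw, f)

# `downloads` is assumed to be the list of (url, path) pairs
# gathered by the loop above instead of downloading inline.
with ThreadPoolExecutor(max_workers=5) as pool:
    pool.map(download, downloads)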

Be careful which approach you choose, because the ServiceM8 API is rate limited, and too many simultaneous requests result in "HTTP/1.1 429 Too Many Requests". What you can do, however, is progressively parse out the attachment links instead of downloading them as you go, and build a file of URLs from them. You can then use any number of methods to download them concurrently from that file. If you have this line: 'r = requests.get(url, auth=(user, passw), stream=True)', the 'r.url' of the response will contain the direct "https://data-cdn.servicem8.com/...." link, which is not rate limited. – hmedia1


Two other simple steps that would make this much more efficient: **1.** Don't call the Attachment API for every job uuid; just fetch the whole attachment file in one request and filter the related_object_uuid against the job uuids you grabbed in one hit. **2.** Once an attachment has been downloaded successfully, store the attachment uuid in a file or database somewhere, and skip any iteration whose uuid has already been processed - i.e. every time you run the attachment downloader, it picks up the new attachments quickly. – hmedia1


....continued... The method you are using currently runs an API request for every file before testing whether that file already exists. – hmedia1
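
A minimal sketch of the bookkeeping hmedia1 describes, using a plain text file of processed attachment uuids (the file name and format here are arbitrary choices, not part of the API):

import os

SEEN_FILE = "downloaded_uuids.txt"  # arbitrary local bookkeeping file

# Load the uuids of attachments handled on previous runs.
seen = set()
if os.path.isfile(SEEN_FILE):
    with open(SEEN_FILE) as f:
        seen = set(line.strip() for line in f)

for attachment in attachments.json():
    if attachment['uuid'] in seen:
        continue  # already downloaded on an earlier run
    # ... download the attachment as before, then record its uuid:
    with open(SEEN_FILE, 'a') as f:
        f.write(attachment['uuid'] + "\n")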

Answer


The general idea is to use asynchronous URL requests, and there is a Python module for that called grequests - https://github.com/kennethreitz/grequests

From the documentation:

import grequests 
urls = [ 
    'http://www.heroku.com', 
    'http://python-tablib.org', 
    'http://httpbin.org', 
    'http://python-requests.org', 
    'http://fakedomain/', 
    'http://kennethreitz.com' 
] 
#Create a set of unsent Requests: 
rs = (grequests.get(u) for u in urls) 
#Send them all at the same time: 
grequests.map(rs) 

And the response:

[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, None, <Response [200]>]


Could you give some example of how to go from my version to the grequests version? Because I need the information from the other http requests to create the filename for the file I want to save. – Koen
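
One way to do that (a sketch, relying on grequests.map returning responses in the same order as the requests passed in) is to build the request list and a matching metadata list side by side, then zip them back together:

import grequests

user, passw = "", ""
reqs, meta = [], []
for job in scheduled_jobs:
    url = ("https://api.servicem8.com/api_1.0/Attachment.json"
           "?%24filter=related_object_uuid%20eq%20{}".format(job[0]))
    reqs.append(grequests.get(url, auth=(user, passw)))
    meta.append(job)  # keep uuid / job id / customer name alongside

for job, resp in zip(meta, grequests.map(reqs)):
    if resp is not None and resp.status_code == 200:
        # `job` carries the info needed to build the filename,
        # exactly as in the sequential version.
        attachments = resp.json()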