2

I have a stream that scrapes data continuously. All of the data goes into Kafka and is then written to Cassandra. The Kafka consumer is now very slow, much slower than the producer, and I want them to run at the same speed. What can I do to achieve that, or what is wrong with my code? Why is my Kafka consumer so much slower than my Kafka producer?

Here is my Kafka consumer code, in Python:

import logging
import json
from datetime import datetime, timedelta

from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster
from dateutil.parser import parse
from kafka.client import KafkaClient
from kafka.consumer.multiprocess import MultiProcessConsumer

logging.basicConfig(filename='consumer.log', format='[%(asctime)-15s] %(name)s %(levelname)s %(message)s', level=logging.DEBUG)

class Whitelist(logging.Filter):
    def __init__(self, *whitelist):
        self.whitelist = [logging.Filter(name) for name in whitelist]

    def filter(self, record):
        return any(f.filter(record) for f in self.whitelist)

for handler in logging.root.handlers:
    handler.addFilter(Whitelist('consumer'))
log = logging.getLogger('consumer')

try:
    # keyspace and kafkatopic are defined elsewhere (not shown here)
    cluster = Cluster(['localhost'])
    session = cluster.connect(keyspace)
    kafka = KafkaClient('localhost')
    consumer = MultiProcessConsumer(kafka, b'default', kafkatopic, num_procs=16, max_buffer_size=None)

    article_lookup_stmt = session.prepare("SELECT * FROM articles WHERE id IN ?")
    article_lookup_stmt.consistency_level = ConsistencyLevel.QUORUM
    article_insert_stmt = session.prepare("INSERT INTO articles(id, thumbnail, title, url, created_at, scheduled_for, source, category, channel, genre) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
    article_by_created_at_insert_stmt = session.prepare("INSERT INTO article_by_created_at(source, created_at, article) VALUES (?, ?, ?)")
    article_by_url_insert_stmt = session.prepare("INSERT INTO article_by_url(url, article) VALUES (?, ?)")
    schedules_insert_stmt = session.prepare("INSERT INTO schedules(source, type, scheduled_for, id) VALUES (?, ?, ?, ?)")
    axes_insert_stmt = session.prepare("INSERT INTO axes(article, at, comments, likes, reads, shares) VALUES (?, ?, ?, ?, ?, ?)")

    while True:
        # pull up to 16 messages at a time from the multiprocess consumer
        messages = consumer.get_messages(count=16)
        if len(messages) == 0:
            print 'IDLE'
            continue
        for message in messages:
            try:
                response = json.loads(message.value)
                data = json.loads(response['body'])
                print response['body']
                articles = data['articles']
                idlist = [r['id'] for r in articles]
                if len(idlist) > 0:
                    # look up which of the incoming articles already exist
                    article_rows = session.execute(article_lookup_stmt, [idlist])
                    rows = [r.id for r in article_rows]
                    for article in articles:
                        try:
                            if article['id'] not in rows:
                                article['created_at'] = parse(article['created_at'])
                                scheduled_for = (article['created_at'] + timedelta(minutes=60)).replace(second=0, microsecond=0)
                                session.execute(article_insert_stmt, (article['id'], article['thumbnail'], article['title'], article['url'], article['created_at'], scheduled_for, article['source'], article['category'], article['channel'], article['genre']))
                                session.execute(article_by_created_at_insert_stmt, (article['source'], article['created_at'], article['id']))
                                session.execute(article_by_url_insert_stmt, (article['url'], article['id']))
                                session.execute(schedules_insert_stmt, (article['source'], 'article', scheduled_for, article['id']))
                                log.debug('%s %s' % (article['id'], article['created_at']))
                            session.execute(axes_insert_stmt, (article['id'], datetime.utcnow(), article['axes']['comments'], article['axes']['likes'], 0, article['axes']['shares']))
                        except Exception as e:
                            print 'error==============:', e
                            continue
            except Exception as e:
                print 'error is:', e
                log.exception(e.message)
except Exception as e:
    log.exception(e.message)

Edit:

I have also added my profiling results for the code above. The slow line seems to be:

article_rows = session.execute(article_lookup_stmt,[idlist]) 

Sun Feb 14 16:01:01 2016 consumer.out 

     395793 function calls (394232 primitive calls) in 23.074 seconds 

    Ordered by: internal time 

       ncalls  tottime  percall  cumtime  percall filename:lineno(function)
          141   10.695    0.076   10.695    0.076 {select.select}
         7564   10.144    0.001   10.144    0.001 {method 'acquire' of 'thread.lock' objects}
            1    0.542    0.542   23.097   23.097 consumer.py:5(<module>)
         1510    0.281    0.000    0.281    0.000 {method 'recv' of '_socket.socket' objects}
           38    0.195    0.005    0.195    0.005 /usr/local/lib/python2.7/json/decoder.py:371(raw_decode)
           13    0.078    0.006    0.078    0.006 {time.sleep}
         2423    0.073    0.000    0.137    0.000 /usr/local/lib/python2.7/logging/__init__.py:242(__init__)
        22112    0.063    0.000    0.095    0.000 /usr/local/lib/python2.7/site-packages/kafka/util.py:73(relative_unpack)
            3    0.052    0.017    0.162    0.054 /usr/local/lib/python2.7/site-packages/kafka/protocol.py:386(decode_metadata_response)
    2006/2005    0.047    0.000    0.055    0.000 /usr/local/lib/python2.7/site-packages/cassandra/policies.py:350(make_query_plan)
         1270    0.032    0.000    0.034    0.000 /usr/local/lib/python2.7/threading.py:259(__init__)
            3    0.024    0.008    0.226    0.075 /usr/local/lib/python2.7/site-packages/kafka/client.py:456(load_metadata_for_topics)
           33    0.024    0.001    0.031    0.001 /usr/local/lib/python2.7/collections.py:288(namedtuple)
        15374    0.024    0.000    0.024    0.000 {built-in method new of type object at 0x788ee0}
          141    0.023    0.000   11.394    0.081 /usr/local/lib/python2.7/site-packages/kafka/client.py:153(_send_broker_aware_request)
          288    0.020    0.000    0.522    0.002 /usr/local/lib/python2.7/site-packages/kafka/conn.py:84(_read_bytes)
         2423    0.018    0.000    0.029    0.000 /usr/local/lib/python2.7/logging/__init__.py:1216(findCaller)
          115    0.018    0.000   11.372    0.099 /usr/local/lib/python2.7/site-packages/kafka/consumer/kafka.py:303(fetch_messages)
         2423    0.018    0.000    0.059    0.000 /usr/local/lib/python2.7/logging/__init__.py:1303(callHandlers)
        24548    0.017    0.000    0.017    0.000 {_struct.unpack}
  44228/43959    0.016    0.000    0.016    0.000 {len}

Thanks, and I look forward to your replies.

+1

As currently written, your question lacks the detail needed for a proper answer. Use a profiler to pinpoint which parts of your script are slow, then try rewriting those parts to make them faster. See https://docs.python.org/2/library/profile.html for more details. – liori
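
(For reference, a minimal sketch of how a profile like the one in the edit above can be captured and read with cProfile/pstats, assuming the script and stats file are named consumer.py and consumer.out as in the output shown:)

# Capture: run the consumer under cProfile, writing stats to a file:
#   python -m cProfile -o consumer.out consumer.py
# Inspect: load the stats and sort them the same way as the output above.
import pstats

stats = pstats.Stats('consumer.out')
stats.sort_stats('time')  # 'time' is the pstats key for "internal time"
stats.print_stats(20)     # show the 20 most expensive entries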

+0

The slow part of my script is processing message after message. – peter

+1

Your consumer issues 5 Cassandra queries - there is no indication of what your producer does, but it seems plausible that 5 synchronous CQL queries take longer than a trivial producer. –
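
(For illustration, a minimal sketch of one way to avoid paying for five synchronous round trips per article, using the driver's execute_async(); write_article_async is a hypothetical helper name, while session and the prepared statements are the ones from the question:)

# Sketch: issue the per-article writes concurrently instead of sequentially,
# so each article costs roughly one Cassandra round trip instead of five.
def write_article_async(session, stmts_and_params):
    # stmts_and_params: list of (prepared_statement, bind_values) tuples
    futures = [session.execute_async(stmt, params)
               for stmt, params in stmts_and_params]
    for future in futures:
        future.result()  # blocks once, after all requests are already in flight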

Answer

2

You can try running the consumer without saving to C*, so you can observe the difference that makes.
If it turns out that saving to C* is the bottleneck (I assume it is), you can set up a pool of workers (bigger than your 16 consumer processes) whose sole responsibility is writing to C*.

That way you offload the slow part of the code, leaving only the trivial parts in the consumer code.
You can use from multiprocessing import Pool.
More here
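
(A minimal sketch of that suggestion, for illustration only: init_worker, save_articles, and the 'mykeyspace' keyspace name are hypothetical, and each worker process opens its own Cassandra session, since driver sessions should not be shared across processes:)

from multiprocessing import Pool
from cassandra.cluster import Cluster

session = None  # one Cassandra session per worker process

def init_worker():
    # runs once in each pool worker; opens that worker's own session
    global session
    session = Cluster(['localhost']).connect('mykeyspace')  # hypothetical keyspace

def save_articles(articles):
    # hypothetical worker task: run the INSERT statements from the question
    for article in articles:
        pass  # session.execute(...) for each prepared insert goes here

if __name__ == '__main__':
    pool = Pool(processes=32, initializer=init_worker)  # more workers than consumers
    # inside the consumer loop, replace the direct session.execute() calls with:
    #   pool.apply_async(save_articles, (articles,))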

+0

Thanks! What if I had already done that, but it was still slow? You are right that writing to Cassandra is the bottleneck. But I don't understand why it gets so slow as the data grows. – peter

+1

If you are using the code you posted, then you haven't done that yet :) But if you do it, and you notice that no matter how many threads are in your pool you still get roughly the same throughput, then you will have to tune C*. Since we know Cassandra's most notable feature is being able to keep accepting an unbounded amount of data (as horz. scale increases), this should be doable; you should post that question under the C* tag. –

+0

Thanks a lot. Sorry for all the follow-up questions; I have just one more :) If I am already using a multiprocess consumer, why do I still need multithreading? Thanks – peter