2

我得到了一个可以完全抓取的数据流。数据全部放入 Kafka,然后发送到 Cassandra。现在卡夫卡消费者非常慢,比生产者慢得多。我希望它们完全一样。我该怎么做才能达到这个结果或我的代码有什么问题?

这是我在 python 中的 Kafka 消费者代码:

import logging
from cassandra.cluster import Cluster
from kafka.consumer.kafka import KafkaConsumer
from kafka.consumer.multiprocess import MultiProcessConsumer
from kafka.client import KafkaClient
from kafka.producer.simple import SimpleProducer
import json
from datetime import datetime, timedelta  
from cassandra import ConsistencyLevel
from dateutil.parser import parse
logging.basicConfig(filename='consumer.log', format='[%(asctime)-15s] %(name)s %(levelname)s %(message)s', level=logging.DEBUG)
class Whitelist(logging.Filter):
    def __init__(self, *whitelist):
        self.whitelist = [logging.Filter(name) for name in whitelist]
    def filter(self, record):
        return any(f.filter(record) for f in self.whitelist)
for handler in logging.root.handlers:
    handler.addFilter(Whitelist('consumer'))
log = logging.getLogger('consumer')
try:
    cluster = Cluster(['localhost']); session = cluster.connect(keyspace)
    kafka = KafkaClient('localhost')
    consumer = MultiProcessConsumer(kafka, b'default',kafkatopic,num_procs=16, max_buffer_size=None)
    article_lookup_stmt = session.prepare("SELECT * FROM articles WHERE id in ?")
    article_lookup_stmt.consistency_level = ConsistencyLevel.QUORUM
    article_insert_stmt = session.prepare("INSERT INTO articles(id, thumbnail, title, url, created_at, scheduled_for, source, category, channel,genre) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
    article_by_created_at_insert_stmt = session.prepare("INSERT INTO article_by_created_at(source, created_at, article) VALUES (?, ?, ?)")
    article_by_url_insert_stmt = session.prepare("INSERT INTO article_by_url(url, article) VALUES (?, ?)")
    schedules_insert_stmt = session.prepare("INSERT INTO schedules(source,type,scheduled_for,id) VALUES (?,?,?,?)")
    axes_insert_stmt = session.prepare("INSERT INTO axes(article,at,comments,likes,reads,shares) VALUES (?, ?, ?, ?, ?, ?)")
    while True:
        messages = consumer.get_messages(count=16)
        if len(messages) == 0:
            print 'IDLE'
            continue
        for message in messages:
            try:
                response = json.loads(message.value)
                data = json.loads(response['body'])
                print response['body']
                articles = data['articles']
                idlist = [r['id'] for r in articles]
                if len(idlist)>0:
                    article_rows = session.execute(article_lookup_stmt,[idlist])
                    rows = [r.id for r in article_rows]
                    for article in articles:
                        try:
                            if not article['id'] in rows:
                                article['created_at'] = parse(article['created_at'])
                                scheduled_for=(article['created_at'] + timedelta(minutes=60)).replace(second=0, microsecond=0)
                                session.execute(article_insert_stmt, (article['id'], article['thumbnail'], article['title'], article['url'], article['created_at'], scheduled_for, article['source'], article['category'], article['channel'],article['genre']))
                                session.execute(article_by_created_at_insert_stmt, (article['source'], article['created_at'], article['id']))
                                session.execute(article_by_url_insert_stmt, (article['url'], article['id']))
                                session.execute(schedules_insert_stmt,(article['source'],'article',scheduled_for,article['id']))
                                log.debug('%s %s' % (article['id'],article['created_at']))
                            session.execute(axes_insert_stmt,(article['id'],datetime.utcnow(),article['axes']['comments'],article['axes']['likes'],0,article['axes']['shares']))
                        except Exception as e:
                            print 'error==============:',e
                            continue
            except Exception as e:
                print 'error is:',e
                log.exception(e.message)
except Exception as e:
    log.exception(e.message)

编辑:

我还添加了我的个人资料结果,代码行似乎很慢

    article_rows = session.execute(article_lookup_stmt,[idlist])

Sun Feb 14 16:01:01 2016    consumer.out

         395793 function calls (394232 primitive calls) in 23.074 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      141   10.695    0.076   10.695    0.076 {select.select}
     7564   10.144    0.001   10.144    0.001 {method 'acquire' of 'thread.lock' objects}
        1    0.542    0.542   23.097   23.097 consumer.py:5(<module>)
     1510    0.281    0.000    0.281    0.000 {method 'recv' of '_socket.socket' objects}
       38    0.195    0.005    0.195    0.005 /usr/local/lib/python2.7/json/decoder.py:371(raw_decode)
       13    0.078    0.006    0.078    0.006 {time.sleep}
     2423    0.073    0.000    0.137    0.000 /usr/local/lib/python2.7/logging/__init__.py:242(__init__)
    22112    0.063    0.000    0.095    0.000 /usr/local/lib/python2.7/site-packages/kafka/util.py:73(relative_unpack)
        3    0.052    0.017    0.162    0.054 /usr/local/lib/python2.7/site-packages/kafka/protocol.py:386(decode_metadata_response)
2006/2005    0.047    0.000    0.055    0.000 /usr/local/lib/python2.7/site-packages/cassandra/policies.py:350(make_query_plan)
     1270    0.032    0.000    0.034    0.000 /usr/local/lib/python2.7/threading.py:259(__init__)
        3    0.024    0.008    0.226    0.075 /usr/local/lib/python2.7/site-packages/kafka/client.py:456(load_metadata_for_topics)
       33    0.024    0.001    0.031    0.001 /usr/local/lib/python2.7/collections.py:288(namedtuple)
    15374    0.024    0.000    0.024    0.000 {built-in method new of type object at 0x788ee0}
      141    0.023    0.000   11.394    0.081 /usr/local/lib/python2.7/site-packages/kafka/client.py:153(_send_broker_aware_request)
      288    0.020    0.000    0.522    0.002 /usr/local/lib/python2.7/site-packages/kafka/conn.py:84(_read_bytes)
     2423    0.018    0.000    0.029    0.000 /usr/local/lib/python2.7/logging/__init__.py:1216(findCaller)
      115    0.018    0.000   11.372    0.099 /usr/local/lib/python2.7/site-packages/kafka/consumer/kafka.py:303(fetch_messages)
     2423    0.018    0.000    0.059    0.000 /usr/local/lib/python2.7/logging/__init__.py:1303(callHandlers)
    24548    0.017    0.000    0.017    0.000 {_struct.unpack}
44228/43959    0.016    0.000    0.016    0.000 {len}

感谢你的回复。

4

1 回答 1

2

您可以尝试在不保存到 C* 的情况下运行消费者,这样您就可以观察它有多大的不同。
如果事实证明保存到 C* 是一个瓶颈(我假设是这样),那么您可能有一个线程池(大于 16 个线程您的消费者产生),其唯一职责是写入 C*。

这样,您将卸载代码的慢速部分,这将只在消费者代码中留下琐碎的部分。
你可以使用一个from multiprocessing import Pool.
更多在这里

于 2016-02-14T16:48:28.787 回答