2

我正在尝试不断消耗来自 kafka 的事件。同一个应用程序也使用这些消耗的数据,以 n 秒的间隔(假设 n = 60 秒)执行一些分析和更新数据库。

在同一个应用程序中,如果process1 = Kafka Consumer , process2= Data Analysis and database update logic.

process1 is to be run continuously
process2 is to be executed once every n=60 seconds 

process2与计算和数据库更新有关,因此需要 5-10 秒来执行。我不想在执行process1期间停顿process2。因此,我正在使用multiprocessing moduleprocess1,process2如果thread1,thread2我在 python 中使用该Threading模块,但由于我已经阅读了有关 GIL 的内容并且该Threading模块无法利用多核架构,我决定使用该multiprocessing模块。)来实现在这种情况下并发。(如果我对上面提到的模块限制的理解GILThreading正确,我很抱歉,请随时纠正我)。

我拥有的应用程序在两个进程之间进行了相当简单的交互,其中process1仅在 60 秒内用它收到的所有消息填充队列,并在 60 秒结束时将所有消息传输到process2.

我在使用此传输逻辑时遇到问题。如何在 60 秒结束时将队列的内容从转移process1process2(我猜这将是主进程或另一个进程?这是我的另一个问题,除了主进程之外,我是否应该实例化 2 个进程?)随后清除队列内容,以便在另一个迭代中重新开始。

到目前为止,我有以下内容:

import sys
from kafka.client import KafkaClient
from kafka import SimpleConsumer
import time
from multiprocessing import Process,Queue

def kafka_init():
    client=KafkaClient('kafka1.wpit.nile.works')
    consumer=SimpleConsumer(client, "druidkafkaconsumer", "personalization.targeting.clickstream.prod")
    return consumer

def consumeMessages(q):
    print "thread started"
    while not q.empty():
        try:
            print q.get(True,1)
        Queue.Empty:
            break
    print "thread ended"
if __name__=="__main__":
    starttime=time.time()
    timeout=starttime+ 10 #timeout of read in seconds
    consumer=kafka_init()
    q=Queue()
    p=Process(target=consumeMessages,args=q)
    while(True):
        q.put(consumer.get_message())
        if time.time()>timeout:
            #transfer logic from process1 to main process here.
            print "Start time",starttime
            print "End time",time.time()
            p.start()
            p.join()
            break

任何帮助将非常感激。

4

1 回答 1

3

您正在处理的问题不是特定于 kafka 的,所以我将使用简单的整数的通用“消息”。

在我看来,主要问题是一方面您希望在消息生成后立即处理它们,另一方面只想每 60 秒更新一次数据库。

如果使用q.get(),默认情况下此方法调用将阻塞,直到队列中有可用消息。这可能需要超过 60 秒,这会使数据库更新延迟太久。所以我们不能使用阻塞q.get。我们需要使用q.get超时,以便调用是非阻塞的:

import time
import multiprocessing as mp
import random
import Queue

def process_messages(q):
    messages = []
    start = time.time()
    while True:
        try:
            message = q.get(timeout=1)
        except Queue.Empty:
            pass
        else:
            messages.append(message)
            print('Doing data analysis on {}'.format(message))
        end = time.time()
        if end-start > 60:
            print('Updating database: {}'.format(messages))
            start = end
            messages = []

def get_messages(q):
    while True:
        time.sleep(random.uniform(0,5))
        message = random.randrange(100)
        q.put(message)

if __name__ == "__main__":
    q = mp.Queue()

    proc1 = mp.Process(target=get_messages, args=[q])
    proc1.start()

    proc2 = mp.Process(target=process_messages, args=[q])
    proc2.start()

    proc1.join()
    proc2.join()

产生输出,例如:

Doing data analysis on 38
Doing data analysis on 8
Doing data analysis on 8
Doing data analysis on 66
Doing data analysis on 37
Updating database: [38, 8, 8, 66, 37]
Doing data analysis on 27
Doing data analysis on 47
Doing data analysis on 57
Updating database: [27, 47, 57]
Doing data analysis on 85
Doing data analysis on 90
Doing data analysis on 86
Doing data analysis on 22
Updating database: [85, 90, 86, 22]
Doing data analysis on 8
Doing data analysis on 92
Doing data analysis on 59
Doing data analysis on 40
Updating database: [8, 92, 59, 40]
于 2015-08-30T21:18:47.780 回答