我正在尝试不断消耗来自 kafka 的事件。同一个应用程序也使用这些消耗的数据,以 n 秒的间隔(假设 n = 60 秒)执行一些分析和更新数据库。
在同一个应用程序中,如果process1 = Kafka Consumer , process2= Data Analysis and database update logic.
process1 is to be run continuously
process2 is to be executed once every n=60 seconds
process2
与计算和数据库更新有关,因此需要 5-10 秒来执行。我不想在执行process1
期间停顿process2
。因此,我正在使用multiprocessing module
(process1,process2
如果thread1,thread2
我在 python 中使用该Threading
模块,但由于我已经阅读了有关 GIL 的内容并且该Threading
模块无法利用多核架构,我决定使用该multiprocessing
模块。)来实现在这种情况下并发。(如果我对上面提到的模块限制的理解GIL
不Threading
正确,我很抱歉,请随时纠正我)。
我拥有的应用程序在两个进程之间进行了相当简单的交互,其中process1
仅在 60 秒内用它收到的所有消息填充队列,并在 60 秒结束时将所有消息传输到process2
.
我在使用此传输逻辑时遇到问题。如何在 60 秒结束时将队列的内容从转移process1
到process2
(我猜这将是主进程或另一个进程?这是我的另一个问题,除了主进程之外,我是否应该实例化 2 个进程?)随后清除队列内容,以便在另一个迭代中重新开始。
到目前为止,我有以下内容:
import sys
from kafka.client import KafkaClient
from kafka import SimpleConsumer
import time
from multiprocessing import Process,Queue
def kafka_init():
client=KafkaClient('kafka1.wpit.nile.works')
consumer=SimpleConsumer(client, "druidkafkaconsumer", "personalization.targeting.clickstream.prod")
return consumer
def consumeMessages(q):
print "thread started"
while not q.empty():
try:
print q.get(True,1)
Queue.Empty:
break
print "thread ended"
if __name__=="__main__":
starttime=time.time()
timeout=starttime+ 10 #timeout of read in seconds
consumer=kafka_init()
q=Queue()
p=Process(target=consumeMessages,args=q)
while(True):
q.put(consumer.get_message())
if time.time()>timeout:
#transfer logic from process1 to main process here.
print "Start time",starttime
print "End time",time.time()
p.start()
p.join()
break
任何帮助将非常感激。