首先说明，我刚开始学习 Python 和 Kafka。假设我有一个列表 listA = [item1, item2, item3]，listA 中的每个项目都是某个主题（topic）的生产者（producer）。
现在我想做的是：动态地向 listA 添加或删除项目，新添加的项目要立即开始作为生产者运行；而且每个项目都应该在各自的线程中运行，因为它们彼此独立。
所以本质上，我是想让这个应用程序能够横向扩展（scale）。
到目前为止，我的做法是为每个生产者项目单独硬编码一份脚本，并在各自的终端中分别运行。
每个项目的脚本如下：
from pykafka import KafkaClient
import json
from datetime import datetime
import uuid
import time
# Load this producer's route: the coordinate list of the first feature in
# ./data/item1.json. The key layout (features -> geometry -> coordinates) looks
# like GeoJSON — presumably a FeatureCollection; confirm against the data files.
# The `with` block closes the file as soon as it has been parsed (the original
# left it open for the life of the process); JSON text is UTF-8.
with open('./data/item1.json', encoding='utf-8') as input_file:
    json_array = json.load(input_file)
coordinates = json_array['features'][0]['geometry']['coordinates']
def generate_uuid():
    """Return a brand-new random UUID (a ``uuid.UUID``, version 4)."""
    new_id = uuid.uuid4()
    return new_id
# Kafka producer shared by this script: a pykafka client for the broker at
# localhost:9092, a handle on the 'test_kafka2' topic, and a synchronous producer
# for that topic. These statements run at module top level, i.e. at import time.
# NOTE(review): presumably get_sync_producer() makes each produce() call block
# until the broker acknowledges it — confirm in the pykafka docs. Whether this one
# producer can be shared safely across threads is also unverified; giving each
# thread its own producer sidesteps the question.
client = KafkaClient(hosts="localhost:9092")
topic = client.topics['test_kafka2']
producer = topic.get_sync_producer()
def generate_coordinates(coordinates, kafka_producer=None, delay=1.0,
                         max_messages=None, stop_event=None):
    """Publish the points of ``coordinates`` to Kafka, walking the route back and forth.

    Each point becomes one JSON message with the fields ``class`` (always 201),
    ``key`` (``"201_<uuid4>"``), ``time_stamp`` (current UTC time as a naive
    datetime string), ``longitude`` (``point[0]``) and ``latitude``
    (``point[1]``). The message is ASCII-encoded and handed to ``produce()``.
    After the last point the walk turns around toward the first point, then
    forward again, and so on. Each endpoint is sent once per turnaround, not
    twice in a row.

    Args:
        coordinates: sequence of points indexable as ``point[0]`` / ``point[1]``.
            An empty sequence sends nothing and returns immediately.
        kafka_producer: object with a ``produce(bytes)`` method. Defaults to the
            module-level ``producer``. Passing a dedicated producer lets each
            route run independently, e.g. in its own thread.
        delay: seconds to wait between messages (1 second by default, as before).
        max_messages: stop after sending this many messages. ``None`` (the
            default) keeps going forever, as before.
        stop_event: optional ``threading.Event``-like object (``is_set()`` /
            ``wait(timeout)``). The loop exits once it is set, and the wait
            between messages wakes early when it is, so a controller thread can
            remove a route promptly.
    """
    from datetime import timezone

    last = len(coordinates) - 1
    if last < 0:
        return
    # Only fall back to the module-level producer when none was supplied.
    sink = kafka_producer if kafka_producer is not None else producer

    index = 0
    step = 1
    sent = 0
    while max_messages is None or sent < max_messages:
        if stop_event is not None and stop_event.is_set():
            return
        point = coordinates[index]
        data = {}
        data['class'] = 201
        data['key'] = str(data['class']) + '_' + str(uuid.uuid4())
        # datetime.utcnow() is deprecated since Python 3.12; this produces the
        # same naive-UTC value and therefore the same string format.
        data['time_stamp'] = str(datetime.now(timezone.utc).replace(tzinfo=None))
        data['longitude'] = point[0]
        data['latitude'] = point[1]
        message = json.dumps(data)  # ensure_ascii default => safe to encode as ASCII
        sink.produce(message.encode('ascii'))
        sent += 1
        if max_messages is not None and sent >= max_messages:
            return  # no point waiting after the final message
        if stop_event is not None:
            stop_event.wait(delay)
        else:
            time.sleep(delay)
        # Bounce off either end of the route. The original reversed the list and
        # reset to 0, which re-sent the endpoint twice in a row.
        if last > 0:
            if not 0 <= index + step <= last:
                step = -step
            index += step
# Start the endless producer loop only when this file is run as a script, so the
# module can be imported (e.g. by a launcher that runs one route per thread)
# without blocking forever on this line.
if __name__ == "__main__":
    generate_coordinates(coordinates)