2

首先我在python和kafka中做婴儿步骤,所以假设我有一个listA = [item1,item2,item3]并且listA的每个项目都是producer一个主题。现在我想要的是动态地将项目添加/删除到 listA 并立即成为生产者,而且每个项目都应该在它自己的线程上运行,因为它们应该是独立的。

所以基本上我正在尝试扩展应用程序。

到目前为止,我尝试对每个生产者项目进行硬编码并在其自己的终端中运行它

每一个项目

from pykafka import KafkaClient
import json
from datetime import datetime
import uuid
import time


input_file = open('./data/item1.json')
json_array = json.load(input_file)
coordinates = json_array['features'][0]['geometry']['coordinates']

# Generate uuid


def generate_uuid():
    return uuid.uuid4()


# Kafaka producer
client = KafkaClient(hosts="localhost:9092")

topic = client.topics['test_kafka2']
producer = topic.get_sync_producer()


# Generate all coordinates
def generate_coordinates(coordinates):
    # new_coordinates = []
    i = 0
    while i < len(coordinates):
        data = {}
        data['class'] = 201
        data['key'] = str(data['class']) + '_' + str(generate_uuid())
        data['time_stamp'] = str(datetime.utcnow())
        data['longitude'] = coordinates[i][0]
        data['latitude'] = coordinates[i][1]
        message = json.dumps(data)
        producer.produce(message.encode('ascii'))
        time.sleep(1)

        # If item reaches last coordinaates
        if i == len(coordinates)-1:
            coordinates = coordinates[::-1]
            i = 0
        else:
            i += 1
    # return new_coordinates


generate_coordinates(coordinates)
4

0 回答 0