
I am using a DataLoader to run inference on data coming from Kafka, but it doesn't work.

Here is my code:

import json

from kafka import KafkaConsumer
from torch.utils.data import Dataset, DataLoader


class kfkdataset(Dataset):
    def __init__(self, consumer, image_size):
        super().__init__()
        self.image_size = image_size
        self.consumer = consumer

    def __getitem__(self, index):
        # The index is ignored; every call just reads the next Kafka message.
        info = json.loads(next(self.consumer).value)
        image_osspath = info['path']
        image = prep_image_batch(image_osspath, self.image_size)
        return image, image_osspath

    def __len__(self):
        # Stand-in for the total number of messages to process.
        return 9000000


consumer = KafkaConsumer('my-topic', bootstrap_servers=[])

prodataset = kfkdataset(consumer, image_size=608)
k = DataLoader(prodataset,
               batch_size=batch_size,
               num_workers=16)

for inputimage, osspath in k:
    inputimage = inputimage.to(device)
    detections, _ = model(inputimage)
    detections = non_max_suppression(detections, 0.98, 0.4)

It works when num_workers is 1.

When num_workers > 1, I get this error:

File "batch_upload.py", line 80
    for inputimage, osspath in k:
File "/usr/local/lib/python3.6/dist-packages/torch/utils/data/dataloader.py", line 801, in __next__
    return self._process_data(data)
File "/usr/local/lib/python3.6/dist-packages/torch/utils/data/dataloader.py", line 846, in _process_data
    data.reraise()
File "/usr/local/lib/python3.6/dist-packages/torch/_utils.py", line 369, in reraise
    raise self.exc_type(msg)
FileExistsError: Caught FileExistsError in DataLoader worker process 1.
Original Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/torch/utils/data/_utils/worker.py", line 178, in _worker_loop
    data = fetcher.fetch(index)
  File "/usr/local/lib/python3.6/dist-packages/torch/utils/data/_utils/fetch.py", line 44, in fetch
    data = [self.dataset[idx] for idx in possibly_batched_index]
  File "/usr/local/lib/python3.6/dist-packages/torch/utils/data/_utils/fetch.py", line 44, in <listcomp>
    data = [self.dataset[idx] for idx in possibly_batched_index]
  File "/appbatch/utils/utils.py", line 49, in __getitem__
    info = json.loads(next(self.consumer).value)
  File "/usr/local/lib/python3.6/dist-packages/kafka/consumer/group.py", line 1192, in __next__
    return self.next_v2()
  File "/usr/local/lib/python3.6/dist-packages/kafka/consumer/group.py", line 1200, in next_v2
    return next(self._iterator)
  File "/usr/local/lib/python3.6/dist-packages/kafka/consumer/group.py", line 1115, in _message_generator_v2
    record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
  File "/usr/local/lib/python3.6/dist-packages/kafka/consumer/group.py", line 654, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
  File "/usr/local/lib/python3.6/dist-packages/kafka/consumer/group.py", line 701, in _poll_once
    self._client.poll(timeout_ms=timeout_ms)
  File "/usr/local/lib/python3.6/dist-packages/kafka/client_async.py", line 600, in poll
    self._poll(timeout / 1000)
  File "/usr/local/lib/python3.6/dist-packages/kafka/client_async.py", line 629, in _poll
    self._register_send_sockets()
  File "/usr/local/lib/python3.6/dist-packages/kafka/client_async.py", line 619, in _register_send_sockets
    self._selector.modify(key.fileobj, events, key.data)
  File "/usr/lib/python3.6/selectors.py", line 261, in modify
    key = self.register(fileobj, events, data)
  File "/usr/lib/python3.6/selectors.py", line 412, in register
    self._epoll.register(key.fd, epoll_events)
FileExistsError: [Errno 17] File exists

I'd like to know how to make this work.


1 Answer


Basically, setting num_workers > 1 in PyTorch's DataLoader spawns several worker processes, and because there is only one consumer, those workers all end up trying to register the same socket.

One way to parallelize and improve importing the data from Kafka is to create multiple consumers for the topic within the same consumer group.
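A minimal sketch of that idea, assuming kafka-python and PyTorch's IterableDataset; the class name KafkaIterableDataset and the group id 'my-group' are illustrative choices, not from the original post, and prep_image_batch / batch_size are taken from the original script:

import json

from kafka import KafkaConsumer
from torch.utils.data import DataLoader, IterableDataset


class KafkaIterableDataset(IterableDataset):
    def __init__(self, topic, bootstrap_servers, image_size):
        super().__init__()
        self.topic = topic
        self.bootstrap_servers = bootstrap_servers
        self.image_size = image_size

    def __iter__(self):
        # The consumer is built lazily inside __iter__, so each DataLoader
        # worker process creates its own KafkaConsumer (and its own sockets)
        # instead of sharing a single one across processes.
        consumer = KafkaConsumer(
            self.topic,
            bootstrap_servers=self.bootstrap_servers,
            group_id='my-group',  # hypothetical group id; same group => partitions are split
        )
        for msg in consumer:
            info = json.loads(msg.value)
            image_osspath = info['path']
            # prep_image_batch comes from the original script
            image = prep_image_batch(image_osspath, self.image_size)
            yield image, image_osspath


prodataset = KafkaIterableDataset('my-topic', bootstrap_servers=[], image_size=608)
k = DataLoader(prodataset, batch_size=batch_size, num_workers=16)

Each worker then holds its own consumer, so no epoll registration is shared between processes, and because the consumers share a group_id, Kafka assigns each of them a subset of the topic's partitions. Note that consumption only actually parallelizes if the topic has at least as many partitions as workers.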

Answered on 2020-05-20T09:01:09.793