I'm using a DataLoader to run inference on data read from Kafka, but it doesn't work.
Here is my code:
    import json

    from kafka import KafkaConsumer
    from torch.utils.data import DataLoader, Dataset

    # prep_image_batch, non_max_suppression, model, device and batch_size
    # are not shown here.

    class kfkdataset(Dataset):
        def __init__(self, consumer, image_size):
            super().__init__()
            self.image_size = image_size
            self.consumer = consumer

        def __getitem__(self, index):
            # index is ignored; every call pulls the next message from Kafka
            info = json.loads(next(self.consumer).value)
            image_osspath = info['path']
            image = prep_image_batch(image_osspath, self.image_size)
            return image, image_osspath

        def __len__(self):
            # Arbitrary large placeholder; the stream has no fixed size
            return 9000000

    consumer = KafkaConsumer('my-topic', bootstrap_servers=[])
    prodataset = kfkdataset(consumer, image_size=608)
    k = DataLoader(prodataset,
                   batch_size=batch_size,
                   num_workers=16)
    for inputimage, osspath in k:
        inputimage = inputimage.to(device)
        detections, _ = model(inputimage)
        detections = non_max_suppression(detections, 0.98, 0.4)
It works when num_workers is 1.
When num_workers > 1, I get this error:
    Traceback (most recent call last):
      File "batch_upload.py", line 80, in <module>
        for inputimage,osspath in k:
      File "/usr/local/lib/python3.6/dist-packages/torch/utils/data/dataloader.py", line 801, in __next__
        return self._process_data(data)
      File "/usr/local/lib/python3.6/dist-packages/torch/utils/data/dataloader.py", line 846, in _process_data
        data.reraise()
      File "/usr/local/lib/python3.6/dist-packages/torch/_utils.py", line 369, in reraise
        raise self.exc_type(msg)
    FileExistsError: Caught FileExistsError in DataLoader worker process 1.
    Original Traceback (most recent call last):
      File "/usr/local/lib/python3.6/dist-packages/torch/utils/data/_utils/worker.py", line 178, in _worker_loop
        data = fetcher.fetch(index)
      File "/usr/local/lib/python3.6/dist-packages/torch/utils/data/_utils/fetch.py", line 44, in fetch
        data = [self.dataset[idx] for idx in possibly_batched_index]
      File "/usr/local/lib/python3.6/dist-packages/torch/utils/data/_utils/fetch.py", line 44, in <listcomp>
        data = [self.dataset[idx] for idx in possibly_batched_index]
      File "/appbatch/utils/utils.py", line 49, in __getitem__
        info = json.loads(next(self.consumer).value)
      File "/usr/local/lib/python3.6/dist-packages/kafka/consumer/group.py", line 1192, in __next__
        return self.next_v2()
      File "/usr/local/lib/python3.6/dist-packages/kafka/consumer/group.py", line 1200, in next_v2
        return next(self._iterator)
      File "/usr/local/lib/python3.6/dist-packages/kafka/consumer/group.py", line 1115, in _message_generator_v2
        record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
      File "/usr/local/lib/python3.6/dist-packages/kafka/consumer/group.py", line 654, in poll
        records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
      File "/usr/local/lib/python3.6/dist-packages/kafka/consumer/group.py", line 701, in _poll_once
        self._client.poll(timeout_ms=timeout_ms)
      File "/usr/local/lib/python3.6/dist-packages/kafka/client_async.py", line 600, in poll
        self._poll(timeout / 1000)
      File "/usr/local/lib/python3.6/dist-packages/kafka/client_async.py", line 629, in _poll
        self._register_send_sockets()
      File "/usr/local/lib/python3.6/dist-packages/kafka/client_async.py", line 619, in _register_send_sockets
        self._selector.modify(key.fileobj, events, key.data)
      File "/usr/lib/python3.6/selectors.py", line 261, in modify
        key = self.register(fileobj, events, data)
      File "/usr/lib/python3.6/selectors.py", line 412, in register
        self._epoll.register(key.fd, epoll_events)
    FileExistsError: [Errno 17] File exists
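The last frame of the traceback is `self._epoll.register(key.fd, epoll_events)` failing with errno 17. As far as I can tell, that is what epoll reports when a file descriptor is added to an epoll object that already contains it. The small sketch below reproduces only that low-level error. It is Linux-only, uses nothing but the standard library, and the function name `register_twice` is made up for illustration. It does not involve Kafka or DataLoader, so whether the workers really hit this by sharing one epoll object is an assumption on my part.

```python
import select
import socket


def register_twice():
    """Add the same socket to one epoll object twice and return the error raised."""
    a, b = socket.socketpair()
    ep = select.epoll()  # Linux-only API
    try:
        ep.register(a.fileno(), select.EPOLLIN)
        try:
            # Second add of an fd that is already registered on this epoll object.
            ep.register(a.fileno(), select.EPOLLIN)
        except OSError as exc:
            return exc
        return None
    finally:
        ep.close()
        a.close()
        b.close()


err = register_twice()
print(type(err).__name__, err.errno)  # on Linux this prints: FileExistsError 17
```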
I'd like to know how to make it work.
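One direction that might be worth trying, assuming the failure comes from all worker processes inheriting the single `KafkaConsumer` created in the parent: build the consumer inside each worker instead. The sketch below does this with an iterable-style dataset whose `__iter__` calls a factory. `KafkaStreamDataset`, `consumer_factory`, `load_image`, `FakeMessage` and `fake_consumer_factory` are hypothetical names introduced for illustration, and the fake consumer only stands in for a real broker so the sketch can run on its own. I have not verified this against the setup above.

```python
import json

try:
    from torch.utils.data import IterableDataset
except ImportError:
    # Fallback so the sketch can be run without PyTorch installed.
    IterableDataset = object


class KafkaStreamDataset(IterableDataset):
    """Yields (image, path) pairs; each worker process builds its own consumer."""

    def __init__(self, consumer_factory, load_image):
        super().__init__()
        # Keep only callables here, so no socket is opened in the parent process.
        self.consumer_factory = consumer_factory
        self.load_image = load_image

    def __iter__(self):
        # With num_workers > 0 this runs inside the worker process, so the
        # consumer and its selector are created there and are not shared.
        consumer = self.consumer_factory()
        for message in consumer:
            info = json.loads(message.value)
            path = info['path']
            yield self.load_image(path), path


class FakeMessage:
    """Hypothetical stand-in for a Kafka record with a JSON-encoded value."""

    def __init__(self, path):
        self.value = json.dumps({'path': path}).encode()


def fake_consumer_factory():
    # Stand-in for a function that would return a real KafkaConsumer.
    return iter([FakeMessage('a.jpg'), FakeMessage('b.jpg')])


# len() is used as a dummy image loader purely for the demonstration.
dataset = KafkaStreamDataset(fake_consumer_factory, load_image=len)
items = list(dataset)
print(items)
```

With a real broker, `consumer_factory` would return a `KafkaConsumer` and `load_image` would wrap `prep_image_batch`, and the dataset would be passed to `DataLoader(dataset, batch_size=batch_size, num_workers=16)`. I suspect the consumers would also need a shared `group_id` so that the topic's partitions are divided among the workers rather than every worker reading every message, but that is an assumption as well.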