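I am writing a custom neighborhood sampler with the DGL library. However, the sampler stops working when I set the "num_workers" argument to a nonzero value while constructing the NodeDataLoader. For example: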

import dgl

# G is an existing DGLGraph (its construction is not shown here)
my_sampler = dgl.dataloading.MultiLayerFullNeighborSampler(2)
my_dataloader = dgl.dataloading.NodeDataLoader(
                g=G,
                nids=[4],
                block_sampler=my_sampler,
                device='cpu',
                batch_size=1,
                shuffle=True,
                drop_last=False,
                num_workers=1
            )
for step, (input_nodes, seeds, blocks) in enumerate(my_dataloader):
    ...

The code above works fine. It uses MultiLayerFullNeighborSampler from "dgl.dataloading". Note that it works whether or not "num_workers" is set to a nonzero value.

class TestSampler(dgl.dataloading.MultiLayerNeighborSampler):
    def __init__(self, n_layers, return_eids=False):
        super().__init__([None] * n_layers, return_eids=return_eids)

my_sampler = TestSampler(2)
my_dataloader = dgl.dataloading.NodeDataLoader(
                g=G,
                nids=[4],
                block_sampler=my_sampler,
                device='cpu',
                batch_size=1,
                shuffle=True,
                drop_last=False,
                num_workers=0             # here
            )
for step, (input_nodes, seeds, blocks) in enumerate(my_dataloader):
    ...

The code above also works fine. The implementation of the TestSampler class is copied verbatim from that of MultiLayerFullNeighborSampler. However, when I set "num_workers" to a nonzero value:

my_sampler = TestSampler(2)
#my_sampler = dgl.dataloading.MultiLayerNeighborSampler([None] * 2)
#my_sampler = dgl.dataloading.MultiLayerFullNeighborSampler(2)
my_dataloader = dgl.dataloading.NodeDataLoader(
                g=G,
                nids=[4],
                block_sampler=my_sampler,
                device='cpu',
                batch_size=1,
                shuffle=True,
                drop_last=False,
                num_workers=1             # here
            )
for step, (input_nodes, seeds, blocks) in enumerate(my_dataloader):
    ...

The code above fails as soon as the dataloader is enumerated. The error message points at multiprocessing:

---------------------------------------------------------------------------
Empty                                     Traceback (most recent call last)
~\anaconda3\lib\site-packages\torch\utils\data\dataloader.py in _try_get_data(self, timeout)
    989         try:
--> 990             data = self._data_queue.get(timeout=timeout)
    991             return (True, data)

~\anaconda3\lib\multiprocessing\queues.py in get(self, block, timeout)
    107                     if not self._poll(timeout):
--> 108                         raise Empty
    109                 elif not self._poll():

Empty: 

The above exception was the direct cause of the following exception:

RuntimeError                              Traceback (most recent call last)
<ipython-input-15-6cf22217dc6e> in <module>
     13                 num_workers=1
     14             )
---> 15 for step, (input_nodes, seeds, blocks) in enumerate(my_dataloader):
     16     print('fuck')
     17     print(input_nodes)

~\anaconda3\lib\site-packages\dgl\dataloading\pytorch\dataloader.py in __next__(self)
    320     def __next__(self):
    321         # input_nodes, output_nodes, blocks
--> 322         result_ = next(self.iter_)
    323         _restore_blocks_storage(result_[-1], self.node_dataloader.collator.g)
    324 

~\anaconda3\lib\site-packages\torch\utils\data\dataloader.py in __next__(self)
    519             if self._sampler_iter is None:
    520                 self._reset()
--> 521             data = self._next_data()
    522             self._num_yielded += 1
    523             if self._dataset_kind == _DatasetKind.Iterable and \

~\anaconda3\lib\site-packages\torch\utils\data\dataloader.py in _next_data(self)
   1184 
   1185             assert not self._shutdown and self._tasks_outstanding > 0
-> 1186             idx, data = self._get_data()
   1187             self._tasks_outstanding -= 1
   1188             if self._dataset_kind == _DatasetKind.Iterable:

~\anaconda3\lib\site-packages\torch\utils\data\dataloader.py in _get_data(self)
   1150         else:
   1151             while True:
-> 1152                 success, data = self._try_get_data()
   1153                 if success:
   1154                     return data

~\anaconda3\lib\site-packages\torch\utils\data\dataloader.py in _try_get_data(self, timeout)
   1001             if len(failed_workers) > 0:
   1002                 pids_str = ', '.join(str(w.pid) for w in failed_workers)
-> 1003                 raise RuntimeError('DataLoader worker (pid(s) {}) exited unexpectedly'.format(pids_str)) from e
   1004             if isinstance(e, queue.Empty):
   1005                 return (False, None)

RuntimeError: DataLoader worker (pid(s) 3032) exited unexpectedly

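After some searching, I suspect this is related to how multiprocessing works on Windows. However, that does not explain why the MultiLayerFullNeighborSampler class works fine with the same settings. I would like to know how to fix this properly.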
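
A minimal sketch of one thing that may be relevant here, assuming the failure comes from how Windows starts worker processes. On Windows, multiprocessing uses the spawn start method, so objects handed to a worker are pickled, and pickle records a class by its module and name rather than by its code. A class defined in a notebook cell lives in "__main__", which a freshly spawned worker may not be able to look up, whereas MultiLayerFullNeighborSampler is importable from dgl. TestSamplerStandIn below is a hypothetical stand-in, not a DGL class, and whether this is the actual cause of the crash above is an assumption.

```python
import pickle


class TestSamplerStandIn:
    """Hypothetical stand-in for the custom sampler; not a DGL class."""

    def __init__(self, n_layers):
        # Mirrors the [None] * n_layers fanout list used by TestSampler.
        self.fanouts = [None] * n_layers


def pickled_class_reference(obj):
    """Return the (module, qualified name) pair pickle uses to find obj's class."""
    cls = type(obj)
    return cls.__module__, cls.__qualname__


sampler = TestSamplerStandIn(2)
payload = pickle.dumps(sampler)

# The payload holds a reference to the class by name, not the class body,
# so whoever unpickles it must be able to import that module and name.
module_name, class_name = pickled_class_reference(sampler)
name_in_payload = class_name.encode() in payload

# Unpickling in this same process works because the class is defined here.
restored = pickle.loads(payload)

print("class is referenced as:", module_name + "." + class_name)
print("class name stored in payload:", name_in_payload)
```

If this is indeed the cause, defining TestSampler in a separate .py file and importing it from there would be the thing to try; this is untested here.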

Software versions:

Python: Python 3.8.8 (default, Apr 13 2021, 15:08:03) [MSC v.1916 64 bit (AMD64)] :: Anaconda, Inc. on win32

PyTorch: 1.9.1 py3.8_cuda10.2_cudnn7_0

dgl-cuda10.2: 0.7.1
