
I'm using Java NIO's WatchService, but I'm finding it very unreliable for the following use cases:

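  1. When a very large file (> 500 MB) is written into the watched directory and I rely on the ENTRY_CREATE event, the file is often not yet ready to be read: it is still being written by another thread. So I usually resort to listening for ENTRY_MODIFY.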

  2. However, when thousands of small (~2 KB) files are copied into the watched directory, neither ENTRY_CREATE nor ENTRY_MODIFY fires for 80% of them!
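For the first use case, a common workaround (not something this post uses) is to wait until the file stops changing before reading it. The sketch below is only a heuristic under stated assumptions: `FileReadiness`, `awaitStable` and the polling parameters are hypothetical names, and a writer that pauses for longer than roughly `pollMillis * stableChecks` would still be reported as ready.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;

// Hypothetical heuristic, not from the original post: a file counts as "ready"
// once its size and modification time stay unchanged for stableChecks consecutive polls.
class FileReadiness {
    static boolean awaitStable(Path file, long pollMillis, int stableChecks, long timeoutMillis)
            throws IOException, InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        long lastSize = -1;
        FileTime lastModified = null;
        int unchanged = 0;
        while (System.currentTimeMillis() < deadline) {
            long size = Files.size(file);
            FileTime modified = Files.getLastModifiedTime(file);
            if (size == lastSize && modified.equals(lastModified)) {
                if (++unchanged >= stableChecks) return true; // quiet for long enough
            } else {
                unchanged = 0; // still changing: restart the count
                lastSize = size;
                lastModified = modified;
            }
            Thread.sleep(pollMillis);
        }
        return false; // still changing when the timeout expired
    }

    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("large", ".bin");
        Files.write(file, new byte[1024 * 1024]);
        System.out.println(awaitStable(file, 50, 3, 5_000)); // prints true
    }
}
```

When the writer can be changed, writing under a temporary name and then renaming the file into the watched directory (for example with `Files.move(..., StandardCopyOption.ATOMIC_MOVE)`, with both paths on the same file system) avoids the guesswork entirely.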

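Has anyone else run into this? Is there a better, more reliable library, or should I simply switch to a blocking-queue implementation in which the file copier adds file names to a queue and a consumer thread handles the file processing?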
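The blocking-queue alternative mentioned above could look roughly like this, assuming the copier is under your control. `StagingQueue`, `copyAndAnnounce`, `consume` and the `POISON_PILL` sentinel are hypothetical names. The copier enqueues a name only after `Files.copy` has returned, so the consumer never sees a half-written file and no file-system notification is involved at all.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Consumer;

// Hypothetical sketch: the copier announces each file only once it is fully written.
class StagingQueue {
    static final String POISON_PILL = "\u0000EOF"; // sentinel: no more files will arrive

    // Copies every source file into stagingDir, then enqueues its name.
    static void copyAndAnnounce(List<Path> sources, Path stagingDir, BlockingQueue<String> queue)
            throws IOException, InterruptedException {
        for (Path src : sources) {
            Path target = stagingDir.resolve(src.getFileName());
            Files.copy(src, target, StandardCopyOption.REPLACE_EXISTING);
            queue.put(target.getFileName().toString()); // blocks while the queue is full
        }
        queue.put(POISON_PILL);
    }

    // Consumer loop: takes names until the sentinel arrives and hands each one to processor.
    static List<String> consume(BlockingQueue<String> queue, Consumer<String> processor)
            throws InterruptedException {
        List<String> processed = new ArrayList<>();
        while (true) {
            String name = queue.take();
            if (POISON_PILL.equals(name)) break;
            processor.accept(name);
            processed.add(name);
        }
        return processed;
    }

    // Demo: copies fileCount 2 KB files (temporary files are not cleaned up here).
    static List<String> run(int fileCount) throws Exception {
        Path srcDir = Files.createTempDirectory("src");
        Path stagingDir = Files.createTempDirectory("staging");
        List<Path> sources = new ArrayList<>();
        for (int i = 0; i < fileCount; i++) {
            sources.add(Files.write(srcDir.resolve("file-" + i + ".txt"), new byte[2048]));
        }
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1024);
        ExecutorService consumer = Executors.newSingleThreadExecutor();
        Future<List<String>> result = consumer.submit(() -> consume(queue, name -> { /* process file */ }));
        copyAndAnnounce(sources, stagingDir, queue);
        List<String> processed = result.get();
        consumer.shutdown();
        return processed;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(6416).size()); // prints 6416
    }
}
```

The bounded `LinkedBlockingQueue` makes the copier wait instead of exhausting memory when the consumer falls behind. The trade-off is that this only works when whoever writes the files can be changed to announce them.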

Code around the WatchService implementation:


            WatchService watchService = FileSystems.getDefault().newWatchService();
            Path path = Paths.get( coreProperties.getStagingLocation() );
            path.register( watchService,
                    new WatchEvent.Kind[] { ENTRY_MODIFY, ENTRY_CREATE },
                    SensitivityWatchEventModifier.MEDIUM
            );

            WatchKey key;
            while ( ( key = watchService.take() ) != null ) {
                for ( WatchEvent<?> event : key.pollEvents() ) {
                    log.info( "Event kind: {} . File affected: {}.", event.kind(), event.context() );
                    // Processing the file..
                }
                key.reset();
            }
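Independently of how slow the handler is, the JDK signals lost events explicitly: a watch key can deliver an `OVERFLOW` event (it is never registered for, but can always arrive) meaning that events may have been lost or discarded, and `WatchKey.reset()` returns `false` once the key is no longer valid. The loop above does not act on either. A minimal sketch of a loop that checks both, assuming a directory and a writer callback supplied by the caller (`WatchLoopSketch`, `watch` and `createFiles` are hypothetical names):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.*;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static java.nio.file.StandardWatchEventKinds.*;

// Hypothetical sketch: the loop only records names (cheap) and reacts to
// OVERFLOW and to an invalid key instead of silently ignoring them.
class WatchLoopSketch {
    static final class Result {
        final Set<String> seen = new HashSet<>();
        boolean overflowed;
    }

    // Registers dir, runs writer, then collects created/modified names until `expected`
    // names are seen, OVERFLOW arrives, the key becomes invalid, or timeoutMillis passes.
    static Result watch(Path dir, Runnable writer, int expected, long timeoutMillis)
            throws IOException, InterruptedException {
        Result result = new Result();
        try (WatchService ws = dir.getFileSystem().newWatchService()) {
            dir.register(ws, ENTRY_CREATE, ENTRY_MODIFY);
            writer.run(); // write files only after registration
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (result.seen.size() < expected && !result.overflowed) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) break;
                WatchKey key = ws.poll(remaining, TimeUnit.MILLISECONDS);
                if (key == null) break; // timed out waiting for events
                for (WatchEvent<?> event : key.pollEvents()) {
                    if (event.kind() == OVERFLOW) {
                        result.overflowed = true; // events were lost: a real handler would rescan dir
                    } else {
                        result.seen.add(event.context().toString());
                    }
                }
                if (!key.reset()) break; // key invalid, e.g. the directory was deleted
            }
        }
        return result;
    }

    // Writes n small (2 KB) files into dir.
    static void createFiles(Path dir, int n) {
        try {
            for (int i = 0; i < n; i++) {
                Files.write(dir.resolve("file-" + i + ".txt"), new byte[2048]);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("watched");
        Result r = watch(dir, () -> createFiles(dir, 200), 200, 30_000);
        System.out.println("seen=" + r.seen.size() + ", overflow=" + r.overflowed);
    }
}
```

Either every file name shows up or an `OVERFLOW` tells you that a rescan of the directory is needed; events should not vanish silently.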

 

1 Answer


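Thanks to helpful comments from DuncG and Jim Garrison, I realized that the Watch Service is sensitive to how long each notification takes to process. I'm copying 6,416 files into the folder it watches, and if I do anything more than log the ENTRY_XX events, it misses many of the updates.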

This is what worked for me:

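  1. When handling an ENTRY_XX event, I write it to an LMAX Disruptor whose ring buffer is larger than the maximum number of files expected in a batch (I set it to 2^19, i.e. 524,288 slots, which is enough to absorb 50k or more file updates without blocking, assuming each file produces up to 10 watch service notifications).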

[PS: Writing to a simple ExecutorService queue didn't help because of the thread-synchronization latency. Out of 6,416 file names, I only got 1,273!]

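  2. I also had to warm up the ring buffer; otherwise I still missed some Watch Service updates. I fill all of its slots with test messages and then send an asynchronous event to the function that copies the 6,416 files into the watched folder.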
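The sizing in step 1 is simple arithmetic: 50,000 files × 10 notifications = 500,000 events, and the next power of two is 2^19 = 524,288 (the Disruptor only accepts power-of-two ring sizes). A small helper to reproduce it, where `RingSizing` and `requiredSlots` are hypothetical names and the 10-notifications-per-file figure is the answer's own assumption:

```java
// Hypothetical helper for the sizing arithmetic: round the expected
// number of events up to the next power of two.
class RingSizing {
    static int requiredSlots(int files, int notificationsPerFile) {
        long needed = (long) files * notificationsPerFile; // assumed worst case queued at once
        if (needed <= 1) return 1;
        long size = Long.highestOneBit(needed - 1) << 1; // smallest power of two >= needed
        if (size > (1 << 30)) {
            throw new IllegalArgumentException("ring buffer would be too large: " + needed + " events");
        }
        return (int) size;
    }

    public static void main(String[] args) {
        System.out.println(requiredSlots(50_000, 10)); // prints 524288 (2^19)
        System.out.println(requiredSlots(6_416, 10));  // prints 65536 (2^16)
    }
}
```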

Sample code:

    // Publishing file names from watch service events to the Disruptor ring buffer

    private void watchStagingFolder() {
        try {
            WatchService watchService = FileSystems.getDefault().newWatchService();
            Path path = Paths.get( coreProperties.getStagingLocation() );
            path.register( watchService,
                    new WatchEvent.Kind[] { ENTRY_MODIFY, ENTRY_CREATE },
                    SensitivityWatchEventModifier.HIGH
            );

            WatchKey key;
            while ( ( key = watchService.take() ) != null ) {
                log.info( "key found: {}", key );
                for ( WatchEvent<?> event : key.pollEvents() ) {
                    String eventKindStr = event.kind().name();
                    log.info( "Event kind: {} . File affected: {}", eventKindStr, event.context() );
                    if ( event.kind().equals( ENTRY_CREATE ) || event.kind().equals( ENTRY_MODIFY ) ) {
                        String fileName = event.context().toString();
                        log.info( "File to be processed: {}", fileName );
                        fileProcessorDisruptorEventProducer.send( fileName );
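                    } else {
                        // Only OVERFLOW can reach here: the watch service dropped events, so the folder should be rescanned
                        log.warn( "Watch service reported {}: some file events were lost.", event.kind() );
                    }
                }
                if ( !key.reset() ) {
                    // The key is no longer valid (e.g. the directory was deleted), so stop watching
                    log.warn( "Watch key is no longer valid; stopping watch on the staging directory." );
                    break;
                }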
            }
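        } catch ( InterruptedException e ) {
            Thread.currentThread().interrupt(); // restore the interrupt flag so callers can see it
            log.warn( "Interrupted while watching the staging directory." );
        } catch ( Exception e ) {
            log.error( "Found error while watching the staging directory.", e );
        }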
    }

    // Ensuring the Disruptor ring buffer is warmed up
    @Component
    @RequiredArgsConstructor
    @Slf4j
    public class DisruptorWarmer {
        public static final String TEST_FILE_NAME = "TEST_FILE_NAME";
        private final CoreProperties coreProperties;
        private final FileProcessorDisruptorEventProducer fileProcessorDisruptorEventProducer;

        @PostConstruct
        public void init() {
            // Fill every slot of the ring buffer once with a test message
            int bufferSize = coreProperties.getDisruptor().getBufferSize();
            for ( int i = 0; i < bufferSize; i++ ) {
                fileProcessorDisruptorEventProducer.send( TEST_FILE_NAME );
            }
            log.info( "Warmed up disruptor with {} test messages.", bufferSize );
        }
    }

    // Processing files in the Disruptor consumer/handler
    @Override
    public void onEvent( Msg msg, long sequence, boolean endOfBatch ) {
        try {
            if ( count < bufferSize ) {
                // Warm-up phase: the first bufferSize messages are the test messages sent by DisruptorWarmer
                count++;
                log.debug( "Disruptor warming up. Count: {}. Ignoring msg: {}", count, msg.getPayload() );
                if ( count == bufferSize ) {
                    // Last test message consumed: signal (asynchronously) that the files can now be copied
                    log.info( "Disruptor warmed up now with {} test messages.", count );
                    newSingleThreadExecutor.submit( () ->
                            applicationEventPublisher.publishEvent( new FileProcessorDisruptorReadyEvent( this, "Disruptor warmed up." ) )
                    );
                }
            } else {
                log.debug( "File: {}", msg.getPayload() );
                // No longer worried about slow processing impacting the watch service
                processFile( ( String ) msg.getPayload() );
            }
        } catch ( RuntimeException rte ) {
            log.error( "Found error while processing msg: [{}]. Skipping to next message.", msg, rte );
        }
    }
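The warm-up handshake can be tried out without Spring or the Disruptor. In this plain-Java sketch, `WarmupGate` is a hypothetical name, and a `LinkedBlockingQueue` with a single consumer thread stands in for the ring buffer and its handler. The consumer swallows the first `warmupCount` messages and releases a `CountDownLatch` (playing the role of `FileProcessorDisruptorReadyEvent`) when it consumes the last of them; only then are real file names sent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical plain-Java stand-in for the warm-up logic: no Disruptor, no Spring.
class WarmupGate {
    private static final String STOP = "\u0000STOP"; // sentinel that ends the consumer loop
    private final int warmupCount;
    private final CountDownLatch ready = new CountDownLatch(1);
    private final List<String> processed = new ArrayList<>();
    private int count = 0; // touched only by the single consumer thread

    WarmupGate(int warmupCount) {
        this.warmupCount = warmupCount;
        if (warmupCount == 0) ready.countDown(); // nothing to warm up
    }

    // Mirrors onEvent: ignore test messages, signal on the last one, then process real names.
    void onEvent(String payload) {
        if (count < warmupCount) {
            count++;
            if (count == warmupCount) ready.countDown(); // last test message consumed
        } else {
            processed.add(payload); // stands in for processFile(payload)
        }
    }

    // Sends warmupCount test messages, waits for readiness, then sends the real file names.
    static List<String> run(int warmupCount, List<String> files) throws InterruptedException {
        WarmupGate gate = new WarmupGate(warmupCount);
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                String msg;
                while (!(msg = queue.take()).equals(STOP)) gate.onEvent(msg);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true); // do not keep the JVM alive if warm-up fails
        consumer.start();
        for (int i = 0; i < warmupCount; i++) queue.put("TEST_FILE_NAME");
        if (!gate.ready.await(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("warm-up did not complete");
        }
        for (String f : files) queue.put(f);
        queue.put(STOP);
        consumer.join(); // join() makes the consumer's writes to processed visible here
        return gate.processed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(1 << 10, List.of("file-1.csv", "file-2.csv")));
        // prints [file-1.csv, file-2.csv]
    }
}
```

Signalling on the last test message rather than on the next message matters: if readiness were only detected when message number `warmupCount + 1` arrived, the handshake would wait for a message that is only sent after the handshake completes, and that first real message would be swallowed.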
answered 2022-02-06T10:04:22.533