根据 DuncG 和 Jim Garrison 的有益评论,我意识到 Watch Service 对每个通知的处理时间很敏感。我正在将 6,416 个文件复制到它正在监视的文件夹中,如果我做的不仅仅是记录 ENTRY_XX 事件,那么许多更新都缺少它。
这对我有用:
- 在处理 ENTRY_XX 事件时,我将它写入一个 LMAX Disruptor,它的环形缓冲区大小高于批处理中预期的最大文件(我将其设置为 2^19 即 524,288 个插槽,它足以处理 50k 或更多文件更新而无需阻塞,假设文件将有 10 个监视服务通知)。
[ PS:由于线程同步的延迟,写入一个简单的 ExecutorService 队列没有帮助。在 6,416 个文件名中,我只得到了 1273 个文件名!]。
- 我还必须预热环形缓冲区,否则我仍然会错过一些 Watch Service 更新。我正在用测试消息填充其所有插槽,然后将异步事件发送到将 6,416 个文件复制到监视文件夹的函数。
示例代码:
// publishing file names from watch service event to Disrupto ring buffer
private void watchStagingFolder() {
try {
WatchService watchService = FileSystems.getDefault().newWatchService();
Path path = Paths.get( coreProperties.getStagingLocation() );
path.register( watchService,
new WatchEvent.Kind[] { ENTRY_MODIFY, ENTRY_CREATE },
SensitivityWatchEventModifier.HIGH
);
WatchKey key;
while ( ( key = watchService.take() ) != null ) {
log.info( "key found: {}", key );
for ( WatchEvent<?> event : key.pollEvents() ) {
String eventKindStr = event.kind().name();
log.info( "Event kind: {} . File affected: {}", eventKindStr, event.context() );
if ( event.kind().equals( ENTRY_CREATE ) || event.kind().equals( ENTRY_MODIFY ) ) {
String fileName = event.context().toString();
log.info( "File to be processed: {}", fileName );
fileProcessorDisruptorEventProducer.send( fileName );
} else {
log.info( "Ignoring event kind {}", event.kind() );
}
}
key.reset();
}
} catch ( Exception e ) {
log.error( "Found error while watching the staging directory.", e );
}
}
// ensuring Disruptor ring buffer is warmed up
@Component
@RequiredArgsConstructor
@Slf4j
public class DisruptorWarmer {
public static final String TEST_FILE_NAME = "TEST_FILE_NAME";
private final CoreProperties coreProperties;
private final FileProcessorDisruptorEventProducer fileProcessorDisruptorEventProducer;
@PostConstruct
public void init() {
int bufferSize = coreProperties.getDisruptor().getBufferSize();
for ( int i = 0; i < bufferSize; i++ ) {
fileProcessorDisruptorEventProducer.send( TEST_FILE_NAME );
}
log.info( "Warmed up disruptor with {} test messages.", bufferSize );
}
}
// processing files in the Disruptor consumer/handler
@Override
public void onEvent( Msg msg, long l, boolean b ) {
try {
if ( count < bufferSize ) {
log.debug( "Disruptor warming up. Count: {}. Ignoring msg: {}", count, msg.getPayload() );
count++;
} else if ( count == bufferSize ) {
log.info( "Disruptor warmed up now with {} test messages.", count + 1 );
newSingleThreadExecutor.submit( () ->
applicationEventPublisher.publishEvent( new FileProcessorDisruptorReadyEvent( this, "Disruptor warmed up." ) )
);
count++;
} else {
log.debug( "File: {}", msg.getPayload() );
############
// no longer worried about slow processing impacting watch service
processFile( ( String ) msg.getPayload() );
############
}
} catch ( RuntimeException rte ) {
log.error( "Found error while processing msg: [{}]. Skipping to next message.", msg, rte );
}
}