我正在将来自供应商的消息作为数据流处理,并希望将 msgSeqNum 本地存储在本地文件中。原因:
他们发送 msgSeqNum 来唯一标识每条消息。它们提供“同步和流”功能,以便在从给定序列号重新连接时流式传输消息。假设如果 msgSeqNum 从 1 开始并且我的连接在 msgSeqNum 50 时断开并错过了接下来的 100 条消息(供应商服务器的当前 msgSeqNum 现在是 150),那么当我重新连接到供应商时,我需要调用“同步和流”使用 msgSeqNum=50 获取错过的 100 条消息。
所以我想了解如何在本地保存 msgSeqNum 以便快速访问。我假设
1)由于读/写频繁发生,即在处理每条消息时(读取忽略重复,在处理完消息后写入更新msgSeqNum),我认为最好使用Java NIO的'MappedByteBuffer'?
2)有人可以确认下面的代码是否最适合我公开映射的字节缓冲区对象以用于读取和写入,并在进程的生命周期内保持 FileChannel 处于打开状态?下面的示例 Junit 代码:
我知道这可以通过一般的 Java 文件操作来读取和写入文件来实现,但我需要一些快速的东西,这相当于非 IO,因为我使用的是单个写入器模式,并且希望快速处理这些消息。 -阻塞方式。
private FileChannel fileChannel = null;
private MappedByteBuffer mappedByteBuffer = null;
private Charset utf8Charset = null;
private CharBuffer charBuffer = null;
@Before
public void setup() {
try {
charBuffer = CharBuffer.allocate( 24 ); // Long max/min are till 20 bytes anyway
System.out.println( "charBuffer length: " + charBuffer.length() );
Path pathToWrite = getFileURIFromResources();
FileChannel fileChannel = (FileChannel) Files
.newByteChannel( pathToWrite, EnumSet.of(
StandardOpenOption.READ,
StandardOpenOption.WRITE,
StandardOpenOption.TRUNCATE_EXISTING ));
mappedByteBuffer = fileChannel
.map( FileChannel.MapMode.READ_WRITE, 0, charBuffer.length() );
utf8Charset = Charset.forName( "utf-8" );
//charBuffer = CharBuffer.allocate( 8 );
} catch ( Exception e ) {
// handle it
}
}
@After
public void destroy() {
try {
fileChannel.close();
} catch ( IOException e ) {
// handle it
}
}
@Test
public void testWriteAndReadUsingSharedMappedByteBuffer() {
if ( mappedByteBuffer != null ) {
mappedByteBuffer.put( utf8Charset.encode( charBuffer.wrap( "101" ) )); // TODO improve this and try reusing the same buffer instead of creating a new one
} else {
System.out.println( "mappedByteBuffer null" );
fail();
}
mappedByteBuffer.flip();
assertEquals( "101", utf8Charset.decode(mappedByteBuffer).toString() );
}