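I'm looking for an example of reading server-sent events with the plain JDK 11+ HTTP client, with no extra dependencies. I couldn't find anything about SSE in the documentation either.
Any hints?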
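Edit 2: Updated the code sample to handle the data: part of the protocol. There are also event:, id:, and retry: parts (see the link above), but I don't plan to add handling for those.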
I couldn't find an official BodySubscriber for SSE, but writing one isn't hard. Here's a rough impl (but note the TODOs):
import static java.nio.charset.StandardCharsets.UTF_8;

import java.net.http.HttpResponse.BodySubscriber;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow.Subscription;
import java.util.function.Consumer;
import java.util.regex.Pattern;

public class SseSubscriber implements BodySubscriber<Void>
{
    protected static final Pattern dataLinePattern = Pattern.compile( "^data: ?(.*)$" );

    protected static String extractMessageData( String[] messageLines )
    {
        // The SSE format joins multiple data: lines of one message with newlines
        var s = new StringJoiner( "\n" );
        for ( var line : messageLines )
        {
            var m = dataLinePattern.matcher( line );
            if ( m.matches( ) )
            {
                s.add( m.group( 1 ) );
            }
        }
        return s.toString( );
    }

    protected final Consumer<? super String> messageDataConsumer;
    protected final CompletableFuture<Void> future;
    protected volatile Subscription subscription;
    protected volatile String deferredText;

    public SseSubscriber( Consumer<? super String> messageDataConsumer )
    {
        this.messageDataConsumer = messageDataConsumer;
        this.future = new CompletableFuture<>( );
        this.subscription = null;
        this.deferredText = null;
    }

    @Override
    public void onSubscribe( Subscription subscription )
    {
        this.subscription = subscription;
        try
        {
            this.deferredText = "";
            this.subscription.request( 1 );
        }
        catch ( Exception e )
        {
            this.future.completeExceptionally( e );
            this.subscription.cancel( );
        }
    }

    @Override
    public void onNext( List<ByteBuffer> buffers )
    {
        try
        {
            // Volatile read
            var deferredText = this.deferredText;

            for ( var buffer : buffers )
            {
                // TODO: Not safe in general: a multi-byte char can be split across
                // buffers, so a stateful CharsetDecoder would be needed to be robust
                var s = deferredText + UTF_8.decode( buffer );

                // -1 means don't discard trailing empty tokens ... so the final token will
                // be whatever is left after the last \n\n (possibly the empty string, but
                // not necessarily), which is the part we need to defer until the next loop
                // iteration
                var tokens = s.split( "\n\n", -1 );

                // Final token gets deferred, not processed here
                for ( var i = 0; i < tokens.length - 1; i++ )
                {
                    var message = tokens[ i ];
                    var lines = message.split( "\n" );
                    var data = extractMessageData( lines );
                    this.messageDataConsumer.accept( data );
                    // TODO: Handle lines that start with "event:", "id:", "retry:"
                }

                // Defer the final token
                deferredText = tokens[ tokens.length - 1 ];
            }

            // Volatile write
            this.deferredText = deferredText;

            this.subscription.request( 1 );
        }
        catch ( Exception e )
        {
            this.future.completeExceptionally( e );
            this.subscription.cancel( );
        }
    }

    @Override
    public void onError( Throwable e )
    {
        this.future.completeExceptionally( e );
    }

    @Override
    public void onComplete( )
    {
        try
        {
            this.future.complete( null );
        }
        catch ( Exception e )
        {
            this.future.completeExceptionally( e );
        }
    }

    @Override
    public CompletionStage<Void> getBody( )
    {
        return this.future;
    }
}
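The TODO about "event:", "id:", and "retry:" lines could be handled along these lines. This is only a minimal sketch, not part of the original answer: the SseMessage class and its parse method are hypothetical names, and the field rules (a leading colon marks a comment, one optional space after the colon is dropped, retry must be plain digits, the event type defaults to "message") follow my reading of the SSE format.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical value class, not part of the original answer
final class SseMessage
{
    final String event;      // "message" when there is no (non-empty) event: line
    final String id;         // null when there is no id: line
    final Long retryMillis;  // null when there is no valid retry: line
    final String data;       // data: lines joined with "\n"

    private SseMessage( String event, String id, Long retryMillis, String data )
    {
        this.event = event;
        this.id = id;
        this.retryMillis = retryMillis;
        this.data = data;
    }

    // Parses the lines of one message (the text between two blank lines)
    static SseMessage parse( String[] lines )
    {
        String event = "message";
        String id = null;
        Long retryMillis = null;
        List<String> data = new ArrayList<>( );
        for ( String line : lines )
        {
            // Skip blank lines and comments (lines starting with a colon)
            if ( line.isEmpty( ) || line.startsWith( ":" ) )
            {
                continue;
            }
            // "field: value", "field:value", or a bare "field" with an empty value
            int colon = line.indexOf( ':' );
            String field = ( colon < 0 ? line : line.substring( 0, colon ) );
            String value = ( colon < 0 ? "" : line.substring( colon + 1 ) );
            if ( value.startsWith( " " ) )
            {
                value = value.substring( 1 );
            }
            switch ( field )
            {
                case "data":
                    data.add( value );
                    break;
                case "event":
                    event = ( value.isEmpty( ) ? "message" : value );
                    break;
                case "id":
                    id = value;
                    break;
                case "retry":
                    // Only plain digit strings count; the length cap avoids overflow
                    if ( value.matches( "[0-9]{1,18}" ) )
                    {
                        retryMillis = Long.parseLong( value );
                    }
                    break;
                default:
                    // Unknown fields are ignored
                    break;
            }
        }
        return new SseMessage( event, id, retryMillis, String.join( "\n", data ) );
    }
}
```

In the subscriber, the call to extractMessageData( lines ) could then be swapped for SseMessage.parse( lines ), with the consumer type changed to match.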
Then use it:
var req = HttpRequest.newBuilder( )
                     .GET( )
                     .uri( new URI( "http://service/path/to/events" ) )
                     .setHeader( "Accept", "text/event-stream" )
                     .build( );

this.client.sendAsync( req, respInfo ->
{
    if ( respInfo.statusCode( ) == 200 )
    {
        return new SseSubscriber( messageData ->
        {
            // TODO: Handle messageData
        } );
    }
    else
    {
        throw new RuntimeException( "Request failed" );
    }
} );
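If a blocking read is acceptable, a simpler variant is to let the JDK split the body into lines with HttpResponse.BodyHandlers.ofLines( ), which also leaves charset decoding to the client rather than decoding each buffer by hand. The sketch below is not from the original answer; SseLines and forEachMessageData are hypothetical names, and only data: lines are handled, as in the subscriber above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

// Hypothetical helper, not part of the original answer: groups a stream of
// text lines into SSE messages and passes each message's data to a consumer
final class SseLines
{
    static void forEachMessageData( Stream<String> lines, Consumer<? super String> consumer )
    {
        List<String> data = new ArrayList<>( );
        lines.forEachOrdered( line ->
        {
            if ( line.isEmpty( ) )
            {
                // A blank line ends the current message
                if ( !data.isEmpty( ) )
                {
                    consumer.accept( String.join( "\n", data ) );
                    data.clear( );
                }
            }
            else if ( line.startsWith( "data:" ) )
            {
                // Drop the field name and one optional leading space
                String value = line.substring( 5 );
                data.add( value.startsWith( " " ) ? value.substring( 1 ) : value );
            }
            // Comments and other fields (event:, id:, retry:) are ignored here
        } );
        // A trailing message without a closing blank line is dropped
    }
}
```

With the request from above, the wiring would be roughly client.send( req, HttpResponse.BodyHandlers.ofLines( ) ).body( ), passing the resulting Stream<String> to forEachMessageData. Note that this blocks the calling thread until the server closes the stream, so it probably belongs on a dedicated thread.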
Here is an SSE (Server-Sent Events) client implementation based on Java 11:
SSE Client
It offers a very simple way to handle SSE messages.
Example usage:
EventHandler eventHandler = eventText -> { process(eventText); };
SSEClient sseClient = SSEClient.builder().url(url).eventHandler(eventHandler)
    .build();
sseClient.start();
Note: I am the author of this SSE client.