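I'm looking for an example of reading server-sent events with the plain JDK 11+ HTTP client, with no extra dependencies. I also can't find anything about SSE in the documentation.

Any hints?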

2 Answers

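Edit 1: Information on the incoming data format is available here.

Edit 2: Updated the code example to handle the data: part of the protocol. There are also event:, id: and retry: parts (see the link above), but I don't plan to add handling for those.

I couldn't find an official BodySubscriber that does SSE, but writing one isn't hard. Here is a rough impl (note the TODOs, though):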
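
Edit 2 leaves event:, id: and retry: unhandled. For anyone who needs them, here is a minimal sketch of parsing one message block into its fields. It follows the field rules of the event stream format as I understand them: the value is the text after the first colon with one optional leading space stripped, lines starting with a colon are comments, and multiple data: lines are joined with a newline. SseMessage and parse are hypothetical names made up for this example; they are not part of the JDK or of the subscriber in this answer.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical holder for the fields of one SSE message block (illustration only). */
final class SseMessage {
    final String event;     // "message" when no event: line is present
    final String id;        // null when no id: line is present
    final String data;      // all data: values joined with "\n"
    final Long retryMillis; // null when absent or not a plain digit string

    private SseMessage(String event, String id, String data, Long retryMillis) {
        this.event = event;
        this.id = id;
        this.data = data;
        this.retryMillis = retryMillis;
    }

    /** Parses one block, i.e. the text between two blank lines of the stream. */
    static SseMessage parse(String block) {
        String event = "message";
        String id = null;
        Long retry = null;
        List<String> dataLines = new ArrayList<>();

        for (String line : block.split("\n")) {
            if (line.isEmpty() || line.startsWith(":")) {
                continue; // blank line or comment line
            }
            int colon = line.indexOf(':');
            String field = colon < 0 ? line : line.substring(0, colon);
            String value = colon < 0 ? "" : line.substring(colon + 1);
            if (value.startsWith(" ")) {
                value = value.substring(1); // strip exactly one leading space
            }
            switch (field) {
                case "data":
                    dataLines.add(value);
                    break;
                case "event":
                    event = value;
                    break;
                case "id":
                    id = value;
                    break;
                case "retry":
                    // Only accept ASCII digits; 18 digits always fits in a long
                    if (value.matches("[0-9]{1,18}")) {
                        retry = Long.parseLong(value);
                    }
                    break;
                default:
                    break; // unknown fields are ignored
            }
        }
        return new SseMessage(event, id, String.join("\n", dataLines), retry);
    }
}
```

A block such as "event: update\nid: 7\ndata: hello" would come back with event "update", id "7" and data "hello". This sketch only splits on "\n", so a server that sends "\r\n" line endings would need extra handling.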

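import static java.nio.charset.StandardCharsets.UTF_8;

import java.net.http.HttpResponse.BodySubscriber;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow.Subscription;
import java.util.function.Consumer;
import java.util.regex.Pattern;

public class SseSubscriber implements BodySubscriber<Void>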
{
    protected static final Pattern dataLinePattern = Pattern.compile( "^data: ?(.*)$" );

    protected static String extractMessageData( String[] messageLines )
    {
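        // Multiple data: lines in one message are joined with "\n"
        var s = new StringBuilder( );
        var first = true;
        for ( var line : messageLines )
        {
            var m = dataLinePattern.matcher( line );
            if ( m.matches( ) )
            {
                if ( !first )
                {
                    s.append( '\n' );
                }
                s.append( m.group( 1 ) );
                first = false;
            }
        }
        return s.toString( );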
    }

    protected final Consumer<? super String> messageDataConsumer;
    protected final CompletableFuture<Void> future;
    protected volatile Subscription subscription;
    protected volatile String deferredText;

    public SseSubscriber( Consumer<? super String> messageDataConsumer )
    {
        this.messageDataConsumer = messageDataConsumer;
        this.future = new CompletableFuture<>( );
        this.subscription = null;
        this.deferredText = null;
    }

    @Override
    public void onSubscribe( Subscription subscription )
    {
        this.subscription = subscription;
        try
        {
            this.deferredText = "";
            this.subscription.request( 1 );
        }
        catch ( Exception e )
        {
            this.future.completeExceptionally( e );
            this.subscription.cancel( );
        }
    }

    @Override
    public void onNext( List<ByteBuffer> buffers )
    {
        try
        {
            // Volatile read
            var deferredText = this.deferredText;

            for ( var buffer : buffers )
            {
                // TODO: A multi-byte UTF-8 sequence can be split across buffers, and this
                // per-buffer decode would corrupt it; a stateful CharsetDecoder avoids that
                var s = deferredText + UTF_8.decode( buffer );

                // -1 means don't discard trailing empty tokens ... so the final token will
                // be whatever is left after the last \n\n (possibly the empty string, but
                // not necessarily), which is the part we need to defer until the next loop
                // iteration
                var tokens = s.split( "\n\n", -1 );

                // Final token gets deferred, not processed here
                for ( var i = 0; i < tokens.length - 1; i++ )
                {
                    var message = tokens[ i ];
                    var lines = message.split( "\n" );
                    var data = extractMessageData( lines );
                    this.messageDataConsumer.accept( data );
                    // TODO: Handle lines that start with "event:", "id:", "retry:"
                }

                // Defer the final token
                deferredText = tokens[ tokens.length - 1 ];
            }

            // Volatile write
            this.deferredText = deferredText;

            this.subscription.request( 1 );
        }
        catch ( Exception e )
        {
            this.future.completeExceptionally( e );
            this.subscription.cancel( );
        }
    }

    @Override
    public void onError( Throwable e )
    {
        this.future.completeExceptionally( e );
    }

    @Override
    public void onComplete( )
    {
        try
        {
            this.future.complete( null );
        }
        catch ( Exception e )
        {
            this.future.completeExceptionally( e );
        }
    }

    @Override
    public CompletionStage<Void> getBody( )
    {
        return this.future;
    }
}
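
On the multi-byte TODO in onNext: body buffers are arbitrary byte chunks, so a UTF-8 sequence can end up split across two of them. One way to deal with that, sketched here under the assumption that the stream is always UTF-8, is to keep a CharsetDecoder around and carry undecoded trailing bytes over to the next call. StreamingUtf8Decoder is a hypothetical helper name, not something from the JDK.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: decodes UTF-8 chunk by chunk, tolerating split sequences. */
final class StreamingUtf8Decoder {
    private final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
    // Bytes of an incomplete sequence left over from the previous chunk
    private ByteBuffer leftover = ByteBuffer.allocate(0);

    /** Decodes as much as possible; incomplete trailing bytes wait for the next chunk. */
    String decode(ByteBuffer chunk) {
        // Put the leftover bytes in front of the new chunk
        ByteBuffer in = ByteBuffer.allocate(leftover.remaining() + chunk.remaining());
        in.put(leftover).put(chunk).flip();

        // UTF-8 never yields more chars than input bytes, so this cannot overflow
        CharBuffer out = CharBuffer.allocate(in.remaining());

        // endOfInput = false: an incomplete trailing sequence is not an error,
        // the decoder simply stops in front of it
        CoderResult result = decoder.decode(in, out, false);
        if (result.isError()) {
            throw new IllegalArgumentException("Malformed UTF-8 input: " + result);
        }

        // Keep whatever the decoder did not consume
        leftover = ByteBuffer.allocate(in.remaining());
        leftover.put(in).flip();

        out.flip();
        return out.toString();
    }
}
```

In the subscriber, one instance per response could replace the UTF_8.decode( buffer ) call, so that deferredText only ever receives complete characters. Unlike UTF_8.decode, this sketch throws on malformed input instead of substituting a replacement character; whether that is the right choice depends on how much you trust the server.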

Then to use it:

var req = HttpRequest.newBuilder( )
                     .GET( )
                     .uri( URI.create( "http://service/path/to/events" ) )
                     .setHeader( "Accept", "text/event-stream" )
                     .build( );

this.client.sendAsync( req, respInfo ->
{
    if ( respInfo.statusCode( ) == 200 )
    {
        return new SseSubscriber( messageData ->
        {
            // TODO: Handle messageData
        } );
    }
    else
    {
        throw new RuntimeException( "Request failed" );
    }
} );
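
If a custom BodySubscriber is more than you need, a blocking variant built on the JDK's own BodyHandlers.ofLines( ) may be enough. The following is only a sketch: SseLines, dispatch and readEvents are hypothetical names, only data: lines are handled, and the call blocks the calling thread for as long as the server keeps the stream open.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.function.Consumer;
import java.util.stream.Stream;

/** Hypothetical helper: reads SSE data via the line-based body handler. */
final class SseLines {

    /** Groups data: lines into messages; a blank line ends the current message. */
    static void dispatch(Stream<String> lines, Consumer<? super String> onData) {
        StringBuilder data = new StringBuilder();
        boolean[] hasData = { false };
        lines.forEachOrdered(line -> {
            if (line.isEmpty()) {
                // End of message: deliver it only if it carried any data: line
                if (hasData[0]) {
                    onData.accept(data.toString());
                }
                data.setLength(0);
                hasData[0] = false;
            } else if (line.startsWith("data:")) {
                String value = line.substring(5);
                if (value.startsWith(" ")) {
                    value = value.substring(1);
                }
                if (hasData[0]) {
                    data.append('\n');
                }
                data.append(value);
                hasData[0] = true;
            }
            // event:, id:, retry: and ":" comment lines are ignored in this sketch
        });
    }

    /** Blocks until the server closes the stream or an I/O error occurs. */
    static void readEvents(HttpClient client, URI uri, Consumer<? super String> onData)
            throws IOException, InterruptedException {
        HttpRequest req = HttpRequest.newBuilder(uri)
                .header("Accept", "text/event-stream")
                .GET()
                .build();
        HttpResponse<Stream<String>> resp =
                client.send(req, HttpResponse.BodyHandlers.ofLines());
        // Closing the stream releases the underlying connection
        try (Stream<String> lines = resp.body()) {
            if (resp.statusCode() != 200) {
                throw new IOException("Request failed: " + resp.statusCode());
            }
            dispatch(lines, onData);
        }
    }
}
```

Calling SseLines.readEvents( HttpClient.newHttpClient( ), URI.create( "http://service/path/to/events" ), System.out::println ) from a dedicated thread would print each message's data as it arrives. The subscriber above is still the better fit when you want everything to stay asynchronous.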

Answered 2020-09-09T16:50:39.810

Here is an SSE (Server-Sent Events) client implementation based on Java 11:
SSE Client

It offers a very simple way to handle SSE messages.

Example usage:

EventHandler eventHandler = eventText -> { process(eventText); };
SSEClient sseClient = SSEClient.builder().url(url).eventHandler(eventHandler)
    .build();
sseClient.start();

Note: I am the author of this SSE client.

Answered 2021-02-09T17:28:58.410