0

我正在尝试收听来自聚会 open_events 流 API 的源源不断的流。出于某种原因,我在(似乎是)一段随机时间后得到了 socketTimeoutException 。我的代码适用于 meetups rsvps 流 API。我正在使用 okio.BufferedSource,如下所示。我得到了“while(!source.exhausted())”的异常,但是当写“while(true)”时,却在下面的行中给出了异常。据我所知,rsvps 和 open_events 的区别在于 rsvps 使用长轮询并返回一个数组,而 open_events 使用分块传输编码来维持持久连接。问题是我不能将 BufferedSource 用于分块响应,然后我应该使用什么来代替?

    public static Observable<String> events(BufferedSource source) {
    return Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            try {

                while (!source.exhausted()) {
                    subscriber.onNext(source.readUtf8Line());
                }
            } catch (IOException e) {
                e.printStackTrace();
                subscriber.onError(e);
            }
            subscriber.onCompleted();
        }
    });

完整的堆栈跟踪:

java.net.SocketTimeoutException: timeout
at okio.Okio$3.newTimeoutException(Okio.java:207)
at okio.AsyncTimeout.exit(AsyncTimeout.java:261)
at okio.AsyncTimeout$2.read(AsyncTimeout.java:215)
at okio.RealBufferedSource.request(RealBufferedSource.java:71)
at okio.RealBufferedSource.require(RealBufferedSource.java:64)
at okio.RealBufferedSource.readHexadecimalUnsignedLong(RealBufferedSource.java:270)
at okhttp3.internal.http1.Http1Codec$ChunkedSource.readChunkSize(Http1Codec.java:444)
at okhttp3.internal.http1.Http1Codec$ChunkedSource.read(Http1Codec.java:425)
at okio.RealBufferedSource.read(RealBufferedSource.java:50)
at okio.ForwardingSource.read(ForwardingSource.java:35)
at  retrofit2.OkHttpCall$ExceptionCatchingRequestBody$1.read(OkHttpCall.java:285)
at okio.RealBufferedSource.exhausted(RealBufferedSource.java:60)
at com.example.meetup.MeetupListener$1.call(MeetupListener.java:123)
at com.example.meetup.MeetupListener$1.call(MeetupListener.java:118)
at rx.Observable.unsafeSubscribe(Observable.java:8666)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:250)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147)
at rx.internal.operators.OperatorMap$MapSubscriber.onNext(OperatorMap.java:74)
at rx.internal.operators.OperatorSubscribeOn$1$1.onNext(OperatorSubscribeOn.java:53)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:505)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:463)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:246)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147)
at rx.internal.operators.OperatorMap$MapSubscriber.onNext(OperatorMap.java:74)
at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:146)
at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:125)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:8666)
at rx.internal.operators.OperatorSubscribeOn$1.call(OperatorSubscribeOn.java:94)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at okio.Okio$2.read(Okio.java:139)
at okio.AsyncTimeout$2.read(AsyncTimeout.java:211)
... 37 more
Exception in thread "RxNewThreadScheduler-1" java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:60)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: rx.exceptions.OnErrorNotImplementedException: timeout
at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386)
at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383)
at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44)
at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:157)
at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:120)
at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:268)
at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:812)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:573)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:562)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onError(OperatorMerge.java:846)
at com.example.meetup.MeetupListener$1.call(MeetupListener.java:128)
at com.example.meetup.MeetupListener$1.call(MeetupListener.java:118)
at rx.Observable.unsafeSubscribe(Observable.java:8666)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:250)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147)
at rx.internal.operators.OperatorMap$MapSubscriber.onNext(OperatorMap.java:74)
at rx.internal.operators.OperatorSubscribeOn$1$1.onNext(OperatorSubscribeOn.java:53)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:505)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:463)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:246)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147)
at rx.internal.operators.OperatorMap$MapSubscriber.onNext(OperatorMap.java:74)
at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:146)
at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:125)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:8666)
at rx.internal.operators.OperatorSubscribeOn$1.call(OperatorSubscribeOn.java:94)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
... 7 more
Caused by: java.net.SocketTimeoutException: timeout
at okio.Okio$3.newTimeoutException(Okio.java:207)
at okio.AsyncTimeout.exit(AsyncTimeout.java:261)
at okio.AsyncTimeout$2.read(AsyncTimeout.java:215)
at okio.RealBufferedSource.request(RealBufferedSource.java:71)
at okio.RealBufferedSource.require(RealBufferedSource.java:64)
at okio.RealBufferedSource.readHexadecimalUnsignedLong(RealBufferedSource.java:270)
at okhttp3.internal.http1.Http1Codec$ChunkedSource.readChunkSize(Http1Codec.java:444)
at okhttp3.internal.http1.Http1Codec$ChunkedSource.read(Http1Codec.java:425)
at okio.RealBufferedSource.read(RealBufferedSource.java:50)
at okio.ForwardingSource.read(ForwardingSource.java:35)
at retrofit2.OkHttpCall$ExceptionCatchingRequestBody$1.read(OkHttpCall.java:285)
at okio.RealBufferedSource.exhausted(RealBufferedSource.java:60)
at com.example.meetup.MeetupListener$1.call(MeetupListener.java:123)
... 27 more
Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at okio.Okio$2.read(Okio.java:139)
at okio.AsyncTimeout$2.read(AsyncTimeout.java:211)
... 37 more
4

1 回答 1

0

这可能不相关,但在错误堆栈的更下方提到了Add onError handling. 也许您需要将此方法添加到您的实现中?

于 2017-04-05T10:15:17.590 回答