0

我有 Java TCP 客户端套接字读取 InputStream 并通过 RxJava PublishSubject 将数据包分发到应用程序的各个部分。这行得通。

有时我也会写信给 OutputStream。命令被转换为单个数据包(byte[])并推送到流中。为此,我使用

public void writeToSocket(byte[] packet) {
    Completable.fromAction(() -> {
         outputStream.write(packet);
         outputStream.flush();
    }).subscribeOn(Schedulers.io()).subscribe(); 
}

现在我想执行

    outputStream.write(packet);
    outputStream.flush();

以符合以下条件的方式

  1. 虽然源数据包是同时从多个位置(使用不同的命令)创建的,但对每个数据包执行上述操作,延迟为 50 毫秒。理想情况下,将数据包排队并延迟执行。
Example:
Place1: createCommand1(), 
Place2: createCommand1(), createCommand4()
Place3: createCommand1(), createCommand2(), .... createCommand10()

有没有办法使用 RxJava 来实现这一点。提前致谢!

4

1 回答 1

1

您可以使用序列化PublishSubject来收集字节,然后用于concatMapCompletable执行写入,然后有延迟:

var subject = PublishSubject.<byte[]>create().toSerialized();

subject
  .concatMapCompletable(bytes -> 
       Completable.fromAction(() -> {
           outputStream.write(packet);
           outputStream.flush();
       })
       .subscribeOn(Schedulers.io())
       .andThen(Completable.timer(50, TimeUnit.MILLISECONDS))
   )
   .subscribe();

或者,如果您不介意始终将单个线程专用于发射,则可以只执行 write 和 sleep in doOnNext

var subject = PublishSubject.<byte[]>create().toSerialized();

subject
  .observeOn(Schedulers.io())
  .doOnNext(packet -> {
     outputStream.write(packet);
     outputStream.flush();
     Thread.sleep(50);
  })
  .subscribe();
于 2021-04-26T07:02:43.633 回答