假设我有一个 aobservable,它发出 2D 点,表示某些路线的转折点
public static double[][] points = new double[][]{
{0, 0},
{0, 10},
{1, 10},
{1, 0},
{0, 0}
};
public static Observable<double[]> routePoints() {
return Observable
.from(points);
}
这意味着机器人从原点开始(0,0)
,然后到矩形的左下角,然后是右下角,然后返回,最后回到原点。
假设现在我想要一个 observable,它提供的点就像机器人以恒定的速度移动一样,比如每秒 1 个单位。
因此,第一个操作员按原样接收(0,0)
并重新传输它。
然后它接收(0, 10)
并看到,到该点的距离是单位,到达该点10
需要几秒钟。10
所以,新的 observable 应该发出
(0, 1)
(0, 2)
(0, 3)
(0, 4)
(0, 5)
(0, 6)
(0, 7)
(0, 8)
(0, 9)
(0, 10)
每秒一对,直到最后一个接收点到达并重新传输。然后操作员应该采取下一点并对其进行同样的操作。
如何使用 ReactiveX 实现这一点?
我怀疑我们应该在这里实施“bakpressure”来减慢可观察到的源,直到结果及时重新发射所有东西?
更新
我写了下面的代码,它可以工作。它使用两个flatMap
s。一个是提供额外的点,并且每个点都带有时间戳(以秒为单位),另一个flatMap
是引入适合时间戳的延迟。
该代码有效,但看起来很复杂。问题仍然存在:ReactiveX 库是否包含用于这些事情的现成工具:
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
public class EmitByTime {
public static double speed = 1;
public static double[][] points = new double[][]{
{0, 0},
{0, 10},
{1, 10},
{1, 0},
{0, 0}
};
public static Observable<double[]> routePoints() {
return Observable
.from(points);
}
public static Observable<double[]> routeFeeder() {
return routePoints()
.flatMap(new Func1<double[], Observable<double[]>>() {
double[] previous = null;
@Override
public Observable<double[]> call(double[] doubles) {
if( previous == null) {
previous = new double[] {doubles[0], doubles[1], 0};
return Observable.just(previous);
}
else {
double dx = doubles[0] - previous[0];
double dy = doubles[1] - previous[1];
double dist = Math.sqrt( dx*dx + dy*dy );
dx /= dist;
dy /= dist;
double vx = dx * speed;
double vy = dy * speed;
double period = dist / speed;
double end = previous[2] + period;
double start = Math.ceil(previous[2] + 1);
ArrayList<double[]> sequence = new ArrayList<>();
for(double t = start; t < end; t+=1) {
double tt = (t - previous[2]);
double x = previous[0] + tt * vx;
double y = previous[1] + tt * vy;
sequence.add(new double[] {x, y, t});
}
previous = new double[] {doubles[0], doubles[1], end};
sequence.add(previous);
return Observable.from(sequence);
}
}
})
.flatMap(new Func1<double[], Observable<double[]>>() {
long origin = System.currentTimeMillis();
@Override
public Observable<double[]> call(double[] doubles) {
long now = System.currentTimeMillis();
long due = Math.round(doubles[2]*1000) + origin;
if( due <= now ) {
return Observable.just(doubles);
}
else {
return Observable.just(doubles).delay(due-now, TimeUnit.MILLISECONDS);
}
}
})
;
}
public static void main(String[] args) throws InterruptedException {
routeFeeder().subscribe(new Action1<double[]>() {
@Override
public void call(double[] doubles) {
System.out.println(Arrays.toString(doubles));
}
});
Thread.sleep(20000);
}
}