2

我在使用ReactFXTableView的反应式绑定时遇到了问题。然而,驱动绑定源自RxJava,在下面的代码中,您将找到一个将其转换为.setCellValueFactoryEventStream ObservableEventStream

但是,TableView除了列的初始绑定值之外,从不显示任何内容。当我将 a 添加System.out.printlnsetCellValueFactory()正文时,我发现它setCellValueFactory()被无限循环调用,并且发出的值从未进入绑定。

我真的对此感到困惑。如何停止这种行为并让 Observable 成功地将单个值发送到 EventStream 和 Binding?

这是我的SSCCE。

public class ReactiveTableViewTest extends Application {

    @Override
    public void start(Stage stage) throws Exception {

        Group root = new Group();
        Scene scene = new Scene(root);

        root.getChildren().add(new ReactiveTable(buildSampleData()));

        stage.setScene(scene);
        stage.show();
    }

    private ObservableList<ReactivePoint> buildSampleData() { 
        ObservableList<ReactivePoint> points = FXCollections.observableArrayList();
        points.add(new ReactivePoint(Observable.just(1), Observable.just(2)));
        return points;
    }

    private static final class ReactivePoint {
        private final Observable<Integer> x;
        private final Observable<Integer> y;

        ReactivePoint(Observable<Integer> x, Observable<Integer> y) { 
            this.x = x;
            this.y = y;
        }
        public Observable<Integer> getX() {
            return x;
        }
        public Observable<Integer> getY() { 
            return y;
        }
    }

    private static final class ReactiveTable extends TableView<ReactivePoint> {

        @SuppressWarnings("unchecked")
        private ReactiveTable(ObservableList<ReactivePoint> reactivePoints) { 
            this.setItems(reactivePoints);

            TableColumn<ReactivePoint,Number> xCol = new TableColumn<>("X");
            xCol.setCellValueFactory(cb -> {
                System.out.println("Calling cell value factory for x col");
                return toReactFX(cb.getValue().getX().map(x -> (Number) x)).toBinding(-1); //causes infinite call loop
                //return new SimpleObjectProperty<Number>(1); //works fine
            });

            TableColumn<ReactivePoint,Number> yCol = new TableColumn<>("Y");
            yCol.setCellValueFactory(cb -> {
                System.out.println("Calling cell value factory for y col");
                return toReactFX(cb.getValue().getY().map(y -> (Number) y)).toBinding(-1); //causes infinite call loop
                //return new SimpleObjectProperty<Number>(1); //works fine
            });

            this.getColumns().addAll(xCol, yCol);
        }
    }
    private static <T> EventStream<T> toReactFX(Observable<T> obs) {
        EventSource<T> es = new EventSource<>();
        obs.subscribe(foo -> Platform.runLater(() -> es.push(foo)), e -> e.printStackTrace());
        return es;
    }
    public static void main(String[] args) { 
        launch(args);
    }
}

更新

我想我在下面提出的解决方案中发现了一个问题。如果Observable在平台线程之外的其他线程上发出任何值,则不会向属性填充任何值。

我试图通过在将线程调用rxToProperty放在平台线程之前检查线程调用是否是平台线程来解决这个问题,但这不起作用并再次导致无限循环。我不知道 Property 的线程安全性是否使事情脱轨。

但是我怎样才能在多个线程上发出一个 Observable 来安全地填充一个Property?这是我更新的 SSCCE 显示此行为。“X”列永远不会填充,因为它是多线程的,但“Y”列会填充,因为它保留在平台线程上。

public class ReactiveTableViewTest extends Application {

    @Override
    public void start(Stage stage) throws Exception {

        Group root = new Group();
        Scene scene = new Scene(root);

        root.getChildren().add(new ReactiveTable(buildSampleData()));

        stage.setScene(scene);
        stage.show();
    }

    private ObservableList<ReactivePoint> buildSampleData() { 
        ObservableList<ReactivePoint> points = FXCollections.observableArrayList();
        points.add(new ReactivePoint(
                Observable.just(1, 5, 6, 8,2,3,5,2).observeOn(Schedulers.computation()), 
                Observable.just(2,6,8,2,14)
                )
            );
        return points;
    }

    private static final class ReactivePoint {
        private final Observable<Integer> x;
        private final Observable<Integer> y;

        ReactivePoint(Observable<Integer> x, Observable<Integer> y) { 
            this.x = x;
            this.y = y;
        }
        public Observable<Integer> getX() {
            return x;
        }
        public Observable<Integer> getY() { 
            return y;
        }
    }

    private static final class ReactiveTable extends TableView<ReactivePoint> {

        @SuppressWarnings("unchecked")
        private ReactiveTable(ObservableList<ReactivePoint> reactivePoints) { 
            this.setItems(reactivePoints);


            System.out.println("Constructor is happening on FX THREAD: " + Platform.isFxApplicationThread());

            TableColumn<ReactivePoint,Number> xCol = new TableColumn<>("X");
            xCol.setCellValueFactory(cb -> { 
                System.out.println("CellValueFactory for X called on FX THREAD: " + Platform.isFxApplicationThread());
                return rxToProperty(cb.getValue().getX().map(x -> (Number) x));
                }
            );

            TableColumn<ReactivePoint,Number> yCol = new TableColumn<>("Y");
            yCol.setCellValueFactory(cb -> {
                System.out.println("CellValueFactory for Y called on FX THREAD: " + Platform.isFxApplicationThread());
                return rxToProperty(cb.getValue().getY().map(y -> (Number) y));
            }
        );

            this.getColumns().addAll(xCol, yCol);
        }
    }
    private static <T> ObjectProperty<T> rxToProperty(Observable<T> obs) { 
        ObjectProperty<T> property = new SimpleObjectProperty<>();

        obs.subscribe(v -> { 
            if (Platform.isFxApplicationThread()) {
                System.out.println("Emitting " + v + " on FX Thread");
                property.set(v);
            }
            else { 
                System.out.println("Emitting " + v + " on Non-FX Thread");
                Platform.runLater(() -> property.set(v));
            }
        });

        return property;
    }
    public static void main(String[] args) { 
        launch(args);
    }
}
4

1 回答 1

2

我找到了一个解决方案,尽管我还没有完全弄清楚 OP 代码中问题的确切根本原因(如果有人能说明原因,我会将其标记为答案)。我最初认为这Platform.runLater()可能会导致最初设置的无限循环(如下所示)。

private static <T> EventStream<T> toReactFX(Observable<T> obs) {
    EventSource<T> es = new EventSource<>();
    obs.subscribe(foo -> Platform.runLater(() -> es.push(foo)), e -> e.printStackTrace());
    return es;
}

事实证明,这个理论是正确的。删除Platform.runLater()导致无限循环消失。也许发出的值一直把自己扔到 GUI 线程的后面,因此永远不会进入绑定?但是仍然没有发出任何内容,并且表值保持在 -1,即初始 Binding 值。

private static <T> EventStream<T> toReactFX(Observable<T> obs) {
    EventSource<T> es = new EventSource<>();
    obs.subscribe(foo -> es.push(foo), e -> e.printStackTrace());
    return es;
}

我确实找到了一些有用的东西。我创建了一个名为 的新转换方法rxToProperty(),并用它替换了对 的调用rxToReactFX()。在那之后,一切似乎都很好。

private static <T> ObjectProperty<T> rxToProperty(Observable<T> obs) { 
    ObjectProperty<T> property = new SimpleObjectProperty<>();
    obs.subscribe(v -> property.set(v));
    return property;
}

这是新的TableColumn设置。

TableColumn<ReactivePoint,Number> xCol = new TableColumn<>("X");
xCol.setCellValueFactory(cb -> rxToProperty(cb.getValue().getX().map(x -> (Number) x)));

TableColumn<ReactivePoint,Number> yCol = new TableColumn<>("Y");
yCol.setCellValueFactory(cb -> rxToProperty(cb.getValue().getY().map(y -> (Number) y)));

如果有人有更好的解决方案,或者可以解释为什么Platform.runLater()andEventStream不起作用,我会将其标记为已接受的答案。

更新

在非 FX 线程上发出的 Observable 存在一些问题,并且值永远不会填充到Property. 我发现这可以通过使用cache()来保存最后一个值来解决,并且它将在订阅的调用线程上重新发送,这将是 FX 线程。我还做了一些同步并只读了返回的Property.

private static <T> ReadOnlyObjectProperty<T> rxToProperty(Observable<T> obs) { 
        ReadOnlyObjectWrapper<T> property = new ReadOnlyObjectWrapper<>();

        obs.cache(1).subscribe(v -> { 
            synchronized(property) { 
                if (Platform.isFxApplicationThread()) {
                    System.out.println("Emitting val " + v + " on FX Thread");
                }
                else { 
                    System.out.println("Emitting val " + v + " on Non-FX Thread");
                }
                property.set(v);
            }
        });

        return property.getReadOnlyProperty();
    }

最终更新 Tomas Mikula 在 ReactFX GitHub 项目上对这种行为以及解决方案提供了一些非常有用的见解。

https://github.com/TomasMikula/ReactFX/issues/22

于 2015-06-12T18:45:38.933 回答