
I want to create a LocationHandler class that exposes an Observable<Location> to which I can push new Location values, so that each subscriber receives the most recently added value plus any subsequent ones.

I've written the class below. It works, but I'm not sure it's the right approach: I had to add a callback, and that feels like a code smell.

Thanks for any help.

public class LocationHandler {
    private MessageHandler<Location> onNewItem;
    private Observable<Location> locationObservable;

    public LocationHandler(LocationInitializationBuilder locationInitBuilder) {
        locationObservable = getHookedObservable()
                .mergeWith(locationInitBuilder.build())
                .replay(1).autoConnect();
    }


    private Observable<Location> getHookedObservable() {
        return Observable.create(new ObservableOnSubscribe<Location>() {
            @Override
            public void subscribe(ObservableEmitter<Location> e) throws Exception {
                onNewItem = location -> e.onNext(location);
            }
        });
    }

    public Observable<Location> getLocation(){
        return locationObservable;
    }

    public void setLocation(Location address){ // <---------- add new values
        if (onNewItem != null){
            onNewItem.handleMessage(address);
        } else {
            throw new IllegalStateException("Cannot add an item to a never subscribed stream");
        }
    }
}

Following @Blackbelt's advice, I've rewritten it with a ReplaySubject.

public class LocationHandler {
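    // createWithSize(1) bounds the buffer to one item; create(1) would only set a capacity hint.
    private final ReplaySubject<Location> inputStream = ReplaySubject.createWithSize(1);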
    private Observable<Location> locationObservable;

    public LocationHandler(LocationInitializationBuilder locationInitBuilder) {
        locationObservable = locationInitBuilder.build()
                .mergeWith(inputStream)
                .replay(1).autoConnect();
    }

    public Observable<Location> getLocation(){
        return locationObservable;
    }

    public void setLocation(Location address){
        inputStream.onNext(address);
    }
}

2 Answers


You could use a Subject instead of the MessageHandler. A Subject acts as an Observable and as a subscriber at the same time. Give your LocationHandler a method that returns Subject#asObservable (Subject#hide in RxJava 2), and subscribe to that. Internally, setLocation invokes Subject#onNext with the new location. There are several types of Subject; please refer to the documentation to choose the one that best suits your needs. For example:

public class LocationHandler {
    private final BehaviorSubject<GeevLocation> mLocationSubject = BehaviorSubject.create();

    public Observable<GeevLocation> getLocation() {
        // RxJava 2: hide() exposes the subject as a plain Observable (asObservable() in RxJava 1).
        return mLocationSubject.hide();
    }

    public void setLocation(GeevLocation address) {
        mLocationSubject.onNext(address);
    }
}

From the outside, call getLocation() and subscribe to the returned Observable. Whenever setLocation() is called, subscribers receive the new value in onNext.
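To make the "observable and subscriber at the same time" idea concrete, here is a minimal plain-Java sketch of what a behavior-style subject does internally. It is not RxJava's implementation: MiniBehaviorSubject and its methods are hypothetical names, and the sketch ignores threading, errors, and unsubscription.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for a behavior-style subject (not RxJava code).
final class MiniBehaviorSubject<T> {
    private final List<Consumer<T>> observers = new ArrayList<>();
    private T latest;          // most recent value, if any
    private boolean hasValue;  // distinguishes "no value yet" from a null value

    // Observable side: register an observer and replay the latest value to it.
    void subscribe(Consumer<T> observer) {
        observers.add(observer);
        if (hasValue) {
            observer.accept(latest);
        }
    }

    // Observer side: cache the value and forward it to every current observer.
    void onNext(T value) {
        latest = value;
        hasValue = true;
        // Iterate over a copy so an observer may subscribe others while being notified.
        for (Consumer<T> observer : new ArrayList<>(observers)) {
            observer.accept(value);
        }
    }
}

final class MiniBehaviorSubjectDemo {
    public static void main(String[] args) {
        MiniBehaviorSubject<String> subject = new MiniBehaviorSubject<>();
        subject.onNext("Paris");            // nobody is listening yet

        List<String> seen = new ArrayList<>();
        subject.subscribe(seen::add);       // late subscriber gets "Paris" immediately
        subject.onNext("Berlin");           // and "Berlin" as it arrives

        System.out.println(seen);           // prints [Paris, Berlin]
    }
}
```

The late subscriber sees the cached value first and then every later one, which is the behaviour the question asks for.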

answered 2016-11-08T16:13:07.697

As Blackbelt already said, you should use a Subject; in particular, I would use a BehaviorSubject. Subjects are hot, but some of them replay earlier events to new subscribers. When you subscribe, a BehaviorSubject gives you the last emitted value (or the initial value if nothing has been emitted yet), and every subscriber then receives new values as they come in. The stream does not complete on its own; it ends only when onComplete or onError is called on the subject. Remember to handle errors: once the subject has terminated, a second onError is no longer delivered to subscribers.
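The point about errors can be illustrated with a small plain-Java model of a subject's terminal state. This is only a sketch under assumptions: TerminalStateSketch is a hypothetical class, not RxJava code, and it only records what would or would not reach subscribers. As far as I know, RxJava 2 hands such late errors to its global RxJavaPlugins.onError hook rather than to the subscribers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a subject's terminal state (not RxJava code).
final class TerminalStateSketch<T> {
    final List<T> delivered = new ArrayList<>();            // values that reached subscribers
    final List<Throwable> undelivered = new ArrayList<>();  // errors that arrived too late
    private Throwable terminalError;                         // first error, if any

    void onNext(T value) {
        if (terminalError != null) {
            return; // stream already terminated: the value is dropped
        }
        delivered.add(value);
    }

    void onError(Throwable error) {
        if (terminalError != null) {
            undelivered.add(error); // a second error never reaches subscribers
            return;
        }
        terminalError = error; // the first error terminates the stream
    }

    boolean isTerminated() {
        return terminalError != null;
    }
}
```

In practice this means one failure ends the location stream for good, so errors from the code that feeds setLocation are usually better handled before they reach the subject.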

Example code:

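import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.Subject;
import org.junit.Test;

class Location {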

}

class LocationInitializationBuilder {
    static Location build() {
        return new Location();
    }
}

class LocationHandler {
    private Subject<Location> locationObservable;

    public LocationHandler(LocationInitializationBuilder locationInitBuilder) {
        Location initialValue = LocationInitializationBuilder.build();

        locationObservable = BehaviorSubject.<Location>createDefault(initialValue).toSerialized();
    }

    public Observable<Location> getLocation() {
        return locationObservable.hide();
    }

    public void setLocation(Location address) { // <---------- add new values
        locationObservable.onNext(address);
    }
}

public class LocationTest {
    @Test
    public void name() throws Exception {
        LocationHandler locationHandler = new LocationHandler(new LocationInitializationBuilder());

        TestObserver<Location> test = locationHandler.getLocation().test();

        locationHandler.setLocation(new Location());

        // Two values: the default emitted on subscribe, plus the one pushed by setLocation().
        test.assertValueCount(2);
    }
}
answered 2016-11-08T17:28:33.817