I'm looking to create a LocationHandler class that exposes an Observable<Location> to which I can push new Location values, so that subscribers receive the most recently added value followed by any subsequent ones.
I've written the class below. It works, but I'm not sure it's the correct approach: I had to add a callback, and that smells bad to me.
Thanks for any help.
public class LocationHandler {
    private MessageHandler<Location> onNewItem;
    private Observable<Location> locationObservable;

    public LocationHandler(LocationInitializationBuilder locationInitBuilder) {
        locationObservable = getHookedObservable()
                .mergeWith(locationInitBuilder.build())
                .replay(1).autoConnect();
    }

    private Observable<Location> getHookedObservable() {
        return Observable.create(new ObservableOnSubscribe<Location>() {
            @Override
            public void subscribe(ObservableEmitter<Location> e) throws Exception {
                onNewItem = location -> e.onNext(location);
            }
        });
    }

    public Observable<Location> getLocation() {
        return locationObservable;
    }

    public void setLocation(Location address) { // <---------- add new values
        if (onNewItem != null) {
            onNewItem.handleMessage(address);
        } else {
            throw new IllegalStateException("Cannot add an item to a never subscribed stream");
        }
    }
}
Following @Blackbelt's advice, I've modified it to use a ReplaySubject:
public class LocationHandler {
    // createWithSize(1) keeps only the latest value; create(1) would merely set
    // an initial capacity hint and buffer every value ever pushed.
    private ReplaySubject<Location> inputStream = ReplaySubject.createWithSize(1);
    private Observable<Location> locationObservable;

    public LocationHandler(LocationInitializationBuilder locationInitBuilder) {
        locationObservable = locationInitBuilder.build()
                .mergeWith(inputStream)
                .replay(1).autoConnect();
    }

    public Observable<Location> getLocation() {
        return locationObservable;
    }

    public void setLocation(Location address) {
        inputStream.onNext(address);
    }
}
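To pin down the behaviour I expect from getLocation() and setLocation(...), here is a rough plain-JDK sketch of the semantics, with no RxJava involved. LatestValueRelay and its methods are hypothetical names I made up for illustration, not RxJava types, and the sketch ignores errors, completion and unsubscription. It is only meant to show "replay the last value, then deliver everything after it", not to replace the Rx version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in (not an RxJava type): caches the latest value and
// replays it to each new subscriber before delivering later values.
class LatestValueRelay<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private T latest;          // last value pushed, if any
    private boolean hasValue;  // false until the first value has been pushed

    // Registers a subscriber; it immediately receives the cached value, if one exists.
    public synchronized void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        if (hasValue) {
            subscriber.accept(latest);
        }
    }

    // Pushes a new value: it becomes the cached value and is sent to all current subscribers.
    public synchronized void setValue(T value) {
        latest = value;
        hasValue = true;
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(value);
        }
    }
}
```

With this, a subscriber that joins after setValue("A") and setValue("B") first receives "B" and then whatever is pushed afterwards, while a subscriber that joined before any value was pushed receives "A", "B" and everything after. That is the contract I would like the Rx-based LocationHandler to honour.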