我想做的是监听套接字数据并将其转换为我的 UI 可以订阅此事件并在 UI 上进行更改的可观察字符串
到目前为止,我创建了一个类 SocketConnection 维护在匕首连接发生正确并接收数据并能够正确处理接口,但想与 rxkotlin 一起应用。
使用 Socket.io、kotlin
套接字连接类
class SocketConnection : SocketStreamListener {
private var socket: Socket? = null
var responseSocket :ResponseHandler?= null
companion object {
var instance = SocketConnection()
}
override fun createSocket(socketQuery: SocketQuery): Socket? {
try {
val okHttpClient = UnsafeOkHttpClient.getUnsafeOkHttpClient()
IO.setDefaultOkHttpWebSocketFactory(okHttpClient)
IO.setDefaultOkHttpCallFactory(okHttpClient)
val opts = IO.Options()
opts.reconnection = false
opts.callFactory = okHttpClient
opts.webSocketFactory = okHttpClient
opts.query = "userID=" + socketQuery.userID + "&token=" + socketQuery.token
socket = IO.socket(CommonContents.BASE_API_LAYER, opts)
L.d("Socket object created")
} catch (e: URISyntaxException) {
L.e("Error creating socket", e)
}
return socket
}
override fun createSocketListener(socket: Socket) {
L.d("inside the socket Listner")
socket.connect()?.on(Socket.EVENT_CONNECT, {
L.d("connected")
listenSocketEvents()
//socketDataListener()
createMessageListener()
})?.on(Socket.EVENT_DISCONNECT,
{
L.d("disconnected")
return@on
})
}
/**
* function used to listen a socket chanel data
*/
private fun listenSocketEvents() {
/* socket?.on("1502", { args ->
// This Will Work
L.d("Socket market depth event successfully")
val socketData = args[0] as String
L.d(socketData)
// instance.data = Observable.just(socketData)
//data!!.doOnNext({ socketData })
*//*
data = args[0] as String
for (i in 0 until arr.size) {
arr[i].socketStreamingData(data)
}*//*
})*/
}
// This Will Not Work
fun socketDataListener(): Observable<String>{
return Observable.create({
subscibe ->
// L.d("Socket market depth event successfully")
socket?.on("1502", { args ->
L.d("Socket market depth event successfully")
val socketData = args[0] as String
subscibe.onNext(socketData)
})
})
}
}
存储库
fun getSocketData(): Observable<String> {
// L.e("" + SocketConnection.instance.socketDataListener())
return SocketConnection.instance.createMessageListener()
}
视图模型
fun getSocketData(): Observable<String>{
return groupRepository.getSocketData()
}
OnFragment (UI)
private fun getSocketUpdate(){
subscribe(watchlistViewModel.getSocketData()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
L.d("SocketData : " + it.count())
}, {
L.e("Error")
}))
}
在这个 UI 中使用一次性订阅方法进入基类。
请让我提前知道我做错了什么thanx