我在 Citrix Xenserver 服务器上有一个场景,在同一网络 (10.0.1.0/24) 上有 3 个虚拟机 (VM) Centos7。每个 VM 负责使用 Scala 上的 Apache Spark(逻辑回归)进行预测。我在 Docker 上使用 Orion Context broker (CB) 来创建将被触发以询问预测的订阅。CB 它仅位于 10.0.1.4 VM 中,我使一些端口可用于从其他机器访问。
我的 docker-compose.yml:
mongo:
image: mongo:4.4
command: --nojournal
orion:
image: fiware/orion
links:
- mongo
ports:
- "1026:1026"
- "1027:1027"
- "1028:1028"
- "9090:9090"
command: -dbhost mongo -corsOrigin __ALL
例如,要从 10.0.1.2 VM 访问 CB,我使用 10.0.1.4:1028/..... 等等。
这是我面临问题的订阅(也许与 10.0.1.3 VM 相关的其他订阅也必须有同样的问题)
curl -v localhost:1026/v2/subscriptions -s -S -H 'Content-Type: application/json' -d @- <<EOF
{
"description": "Suscripcion de anemia para monitorear al Paciente",
"subject": {
"entities": [
{
"id": "Paciente1",
"type": "Paciente"
}
],
"condition": {
"attrs": ["calculateAnaemia"]
},
"expression":{
"q":"calculateAnaemia:1"
}
},
"notification": {
"http": {
"url": "http://10.0.1.2:9002/notify"
},
"attrs": ["gender","age","hemoglobin","mch","mchc","mcv"]
},
"expires": "2040-01-01T14:00:00.00Z",
"throttling": 10
}
EOF
我在 10.0.1.2 VM 上有一个代码,它正在使用 Fiware Cosmos 在端口 9002 上侦听与此订阅相关的更改:
对于10.0.1.4 VM 上的 eventStream变量,端口为 9004,对于 10.0.1.3 VM,端口为 9003
对于conf变量,我在具有 10.0.1.4 IP 的 10.0.1.4 VM 上设置“spark.driver.host”,并将 10.0.1.3 VM 设置为 10.0.1.3 IP
import esqMensajeria.ActorSysMensajeria.ActoresEsquema
import org.apache.spark._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.fiware.cosmos.orion.spark.connector._
object Main extends App{
val conf = new SparkConf().setMaster("local[*]").setAppName("AnaemiaPrediction").set("spark.driver.host", "10.0.1.2")
val ssc = new StreamingContext(conf, Seconds(10))
// Create Orion Source. Receive notifications on port 9002
val eventStream = ssc.receiverStream(new OrionReceiver(9002))
// Esquema de mensajeria
val actorSysMsj = new ActoresEsquema ()
println("Esperando cambios para obtener información...")
// Process event stream
val processedDataStream = eventStream
.flatMap(event => event.entities)
.map(entity => {
val gender: Int = entity.attrs("gender").value.asInstanceOf[Number].intValue()
val age: Int = entity.attrs("age").value.asInstanceOf[Number].intValue()
val hemoglobin: Double = entity.attrs("hemoglobin").value.asInstanceOf[Double].doubleValue()
val mch: Double = entity.attrs("mch").value.asInstanceOf[Double].doubleValue()
val mchc: Double = entity.attrs("mchc").value.asInstanceOf[Double].doubleValue()
val mcv: Double = entity.attrs("mcv").value.asInstanceOf[Double].doubleValue()
actorSysMsj.start((entity.id, gender,age,hemoglobin,mch,mchc,mcv),conf)
(entity.id, gender,age,hemoglobin,mch,mchc,mcv)
})
processedDataStream.print
ssc.start()
ssc.awaitTermination()
}
但是当我触发时,订阅失败显示下一个(不仅在 10.0.1.2 上,也在 10.0.1.3 VM 上):
{"id":"61cb8569a1e87a254e16066d",
"description":"Suscripcion de anemia para monitorear al Paciente",
"expires":"2040-01-01T14:00:00.000Z",
"status":"failed",
"subject":{"entities":[{"id":"Paciente1","type":"Paciente"}],
"condition":{"attrs":["calculateAnaemia"]}},
"notification":
{"timesSent":3,
"lastNotification":"2021-12-29T00:03:49.000Z",
"attrs":"gender","age","hemoglobin","mch","mchc","mcv"],"
onlyChangedAttrs":false,
"attrsFormat":"normalized",
http":{"url":"http://10.0.1.2:9002/notify"},
"lastFailure":"2021-12-29T00:03:54.000Z",
"lastFailureReason":"Timeout was reached"},
"throttling":10}]
奇怪的是,当我使用与具有 CB 容器的 10.0.1.4 VM 相关的订阅时,订阅仍然处于活动状态,我得到了预期的结果。
这是订阅:
curl -v localhost:1026/v2/subscriptions -s -S -H 'Content-Type: application/json' -d @- <<EOF
{
"description": "Suscripcion de deceso para monitorear al Paciente",
"subject": {
"entities": [
{
"id": "Paciente1",
"type": "Paciente"
}
],
"condition": {
"attrs": [
"calculateDeceased"
]
},
"expression":{
"q":"calculateDeceased:1"
}
},
"notification": {
"http": {
"url": "http://10.0.1.4:9004/notify"
},
"attrs": [
"gender","age","hasAnaemia","creatinePP","hasDiabetes","ejecFrac","highBloodP","platelets","serumCreatinine","serumSodium","smoking","time"
]
},
"expires": "2040-01-01T14:00:00.00Z"
}
EOF
这是触发并完美处理时的答案:
{"id":"61caab07a1e87a254e160665",
"description":"Suscripcion de deceso para monitorear al Paciente",
"expires":"2040-01-01T14:00:00.000Z",
"status":"active",
"subject":{"entities":[{"id":"Paciente1","type":"Paciente"}],
"condition":{"attrs":["calculateDeceased"]}},
"notification":{"timesSent":1,"lastNotification":"2021-12-28T06:15:41.000Z",
"attrs":["gender","age","hasAnaemia","creatinePP","hasDiabetes","ejecFrac","highBloodP","platelets","serumCreatinine","serumSodium","smoking","time"],
"onlyChangedAttrs":false,
"attrsFormat":"normalized",
"http":{"url":"http://10.0.1.4:9004/notify"},
"lastSuccess":"2021-12-28T06:15:43.000Z",
"lastSuccessCode":200}}
我不得不说我是 Spark、Scala 甚至 Fiware 的新手。但是项目就是项目,也许我错过了我在设置这个项目时阅读的所有内容中没有看到的东西。此外,我停止了所有防火墙 (firewalld),因为我在与 10.0.1.2 和 10.0.1.3 虚拟机相关的订阅上遇到“无法连接到服务器”错误。我也做了一个 sudo yum 更新,我在彼此之间 ping 了所有的虚拟机,我得到了很好的响应。我不知道它是否重要的一件事:我的所有虚拟机上都有互联网,但我无法 ping 例如... www.google.com或 8.8.8.8。所以,欢迎提出任何建议!我为我的英语道歉。先谢谢了~