0

我想创建一个使用用户名和密码进行订阅的 MQTT 流。我正在使用 apache-bahir 创建 mqtt 流。当我使用 MQTTUtils.createStream() 方法时,它只接受 ssc、brokerurl、topic 和 StorageLevel 作为参数。当我提供用户名和密码时,它显示“无法解析符号 createStream()。

val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)

https://bahir.apache.org/docs/spark/2.3.0/spark-streaming-mqtt/

这个页面是我为实现而参考的。

val sc = new SparkContext()
val ssc = new StreamingContext(sc,Seconds(10))
val stream = MQTTUtils.createStream(ssc,"broker.mqttdashboard.com","tag_topic",StorageLevel.DISK_ONLY,"clientid","username","password")

错误消息是无法解析符号 MQTTUtils.createStream()

4

2 回答 2

0

试试这个:

 val spark  = SparkSession
      .builder()
      .appName("app")
      .config("spark.master", "local[*]")
      .getOrCreate()

val lines = spark.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("username", username)
  .option("password", password)
  .option("topic", topic)
  .option("brokerUrl",brokerUrl )
  .load()

你拥有Github中的所有信息

于 2019-11-19T15:32:49.507 回答
0

使用 JavaStreamingContext 我们可以解决这个问题

val jssc = new JavaStreamingContext(sc,Seconds(2))
val clientId = MqttClient.generateClientId()
val stream =  MQTTUtils.createPairedByteArrayStream(jssc,brokerUrl,topic,clientId,username,password,true)
stream.print()
jssc.start()
jssc.awaitTermination()
于 2019-11-20T09:26:12.427 回答