问题标签 [apache-bahir]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
94 浏览

scala - Apache Bahir,向 ActorReceiver 发送内容

我正在尝试使用 Apache Bahir 连接到 Akka,使用 Spark Streaming 设置一个简单的过程。我试图以他们为榜样,与这位较老的人一起学习。我有一个简单的转发演员

我创建了一个流

配置如下所示:

我的问题是:如何向 Forwarder 演员发送消息?也许我不明白在这种情况下如何使用 Akka Remote。当应用程序启动时,我看到一个日志

后来我看到

这似乎提醒了ScalaDoc中的描述:

总而言之,我不确定我应该如何向 Forwarder 演员发送消息。谢谢你的帮助!

0 投票
1 回答
269 浏览

apache-spark - ApacheSpark 流上的 ApacheBahir 结构化流连接器的架构问题

我正在尝试将 Apache Spark 结构化流连接到 MQTT 主题(在本例中为 IBM Bluemix 上的 IBM Watson IoT Platform)。

我正在创建结构化流,如下所示:

到目前为止一切顺利,在 REPL 中我得到了这个 df 对象,如下所示:

但是,如果我开始使用这一行从流中读取:

我收到以下错误:

我的直觉说架构有问题,所以我添加了一个:

但这无济于事,有什么想法吗?

0 投票
2 回答
174 浏览

scala - ApacheSpark 流上的 ApacheBahir 结构化流连接器丢失了架构

我正在尝试将 ApacheSpark 结构化流连接到 MQTT 主题(在本例中为 IBM Bluemix 上的 IBM Watson IoT Platform)。

我正在创建结构化流,如下所示:

到目前为止一切顺利,在 REPL 中我得到了这个 df 对象,如下所示:

df: org.apache.spark.sql.DataFrame = [值:字符串,时间戳:时间戳]

我从这个线程中了解到,每次连接时我都必须更改客户端 ID。所以这解决了,但如果我开始使用这一行从流中读取:

val 查询 = df.writeStream。输出模式(“追加”)。
格式(“控制台”).start()

然后生成的架构如下所示:

df: org.apache.spark.sql.DataFrame = [值:字符串,时间戳:时间戳]

数据如下:

在此处输入图像描述

这意味着我的 JSON 流被转换为包含 JSON 表示的字符串对象流。

这是 ApacheBahir 的限制吗?

提供模式也无济于事,因为以下代码类似于相同的结果:

0 投票
1 回答
884 浏览

maven - 在使用 mvn install 添加 jar 时:安装文件:在项目 Standalone-pom 上:工件信息不完整或无效

我想将 bahir jar 推送到我的本地 m2 存储库。我正在使用 maven-3.5.0 下载的 tar.gz 和 jdk8,两者都设置为环境变量并且工作正常。

我使用maven从git的可用下载中为spark构建了apache bahir,这是BUILD SUCCESS,它创建了一个目标文件夹,其中包含名为bahir-parent_2.11-2.3.0-SNAPSHOT-tests.jar的jar

现在当我执行时:

它应该将 jar 推送到我本地的 .m2/repository。但相反,它给了我以下错误:

但是当我使用“apt install maven”:maven-3.3.9 时,我得到了 BUILD SUCCESS 我无法弄清楚为什么在使用 maven-3.5.0 时它会给我错误。

0 投票
1 回答
285 浏览

python-2.7 - “java.lang.NoSuchMethodError: org.eclipse.paho.client.mqttv3.MqttConnectOptions.setAutomaticReconnect(Z)V”的原因

我正在尝试通过修改提供的示例字数示例来使用 Apache Bahir 运行 spark 结构化流 MQTT。

SPARK 版本:spark-2.2.0-bin-hadoop2.7。

我正在使用这个命令来运行程序:bin\spark-submit --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.2.0 mqtt.py

下面是我的代码:

但是在获取查询时出现以下错误:

谁能帮助我哪里出错了?

0 投票
1 回答
380 浏览

python - MQTTTextStreamSource 的 getBatch 返回的 DataFrame 没有 isStreaming=true

我尝试将 MQTT 与 PySpark Structured Streaming 一起使用。

错误信息:

我不明白我的代码有什么问题。此外,根据这篇文章,结构化流 2.1.0 实际上由 Bahir MQTT 支持。我也尝试了 Spark 2.2.1 并遇到了同样的问题。

这就是我运行代码的方式:

我该如何解决这个问题?

0 投票
1 回答
309 浏览

twitter - Apache Spark 2.3.1 - twitter 不是包 org.apache.spark.streaming 的成员

首先,我一直在寻找这个问题一段时间,我可以看到存在其他解决方案,但是对于 Apache Spark 版本 2.3.1 没有。

简而言之,我正在尝试创建一个使用 bahir 在 spark 中执行分析 twitter 消息的应用程序。

但是,我使用的是 Apache Spark 版本 2.3.1,所以我找到了2.3.0-SNAPSHOT

但是当我尝试使用bin/spark-shell --packages org.apache.bahir:spark-streaming-twitter_2.11:2.3.0-SNAPSHOT它时,从我本地的 spark-shell 中找不到它:

我可能是一个愚蠢的假设,但我认为 2.3.0 可能适用于 2.3.1。

我可以获取2.2.1版本并且 twitter4j 似乎可以工作,但是我仍然遇到实际streaming.twitter_使用 Spark 2.3.1 的问题。从我当地的火花壳:

很高兴知道是否有人知道它是否可用于 Apache Spark 2.3.1?

还是我只是被迫降级我的 Spark 版本以使其正常工作?

我在一个名为 Zeppelin 的笔记本中执行此操作,但我也尝试在 Zeppelin 之外执行此操作,因此似乎与我正在使用的笔记本没有任何关系。

感谢您的任何见解。

0 投票
1 回答
427 浏览

spark-structured-streaming - 为什么使用 MQTT 源查询会抛出 ClassCastException“SerializedOffset 无法转换为 org.apache.spark.sql.execution.streaming.LongOffset”?

Spark Structured Streaming 代码时出现以下异常

18/12/05 15:00:38 错误 StreamExecution:查询 [id = 48ec92a0-811a-4d57-a65d-c0b9c754e093,runId = 5e2adff4-855e-46c6-8592-05e3557544c6] 以错误 java.lang.ClassCastException: org. apache.spark.sql.execution.streaming.SerializedOffset 无法在 org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.getBatch(MQTTStreamSource.scala:152 ) 在 org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$7.apply(StreamExecution.scala:614 )

每次我启动查询时都会发生此异常。当我在删除检查点后启动它时它确实有效。

Spark 结构化流代码如下,基本上我只是从 MQTT 队列中读取数据并写入 ElasticSearch 索引。

以下是使用的依赖项。我使用 MapR 分发。

火花提交命令

对此的任何帮助将不胜感激。

0 投票
1 回答
182 浏览

activemq - 使用 AMQSource 的 Flink 作业不会生成输出

我使用了 Apache Bahir 的 AMQSource 连接器来监听 ActiveMQ,但是当我运行 Flink 作业来使用来自 ActiveMQ 的数据时,没有生成输出。

例如,连接器正在侦听包含 4 条消息的 ActiveMQ,但是当我运行 Flink 作业时,没有数据被消耗。

0 投票
1 回答
109 浏览

scala - 如何在 MQTTUtils 中使用 createpairedStream?

我无法使用

MQTTUtils.createPairedStream()

在斯卡拉?

如何将主题列表指定为参数?我尝试了所有方法,如字典、列表、元组,但没有奏效。然后我在python中尝试过,当时它显示了一个错误,比如

Java 网关进程在向驱动程序发送其端口号之前退出