问题标签 [dstream]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
100 浏览

apache-spark - 独立集群中的 Spark Streaming 多次接收相同的 Kafka 消息

当我在本地使用 Spark 流应用程序时,每条记录只需要一次,但是,当我将它部署在独立集群上时,它会从 Kafka 读取两次相同的消息。另外,我已经仔细检查过这不是与 kafka 生产者有关的问题。

这就是我创建流的方式:

这是 kafkaParams 配置:

集群有 2 个工作人员,每个工作人员有一个执行程序,看起来每个工作人员都接收相同的消息。任何人都可以帮助我,好吗?

编辑

例如,当我从 kafka 发送一个点时。从这段代码:

我得到"Taken 2 points". 如果我打印它们,它们是平等的。难道我做错了什么?

我在用着

  • “org.apache.spark”%%“spark-streaming-kafka-0-10”%“2.2.0”
  • 火花2.2.0
  • kafka_2.11-0.11.0.1
0 投票
1 回答
64 浏览

scala - Scala:通过 DStream 拆分来自 kafka 的数据

我以以下形式从kafka接收数据

我想访问电子邮件 ID 和名字,并希望将其与来自 cassandra 的数据以以下形式进行比较:

0 投票
1 回答
126 浏览

apache-spark - 在使用带有 kafka 的火花流时,无法迭代从将 Dstream 转换为 List 中检索到的键列表

下面是使用 kafka 进行火花流式传输的代码。在这里,我试图将批处理的密钥作为 Dstream 获取,然后将其转换为 LIST。为了对其进行迭代并将与每个键有关的数据放在以该键命名的 hdfs 文件夹中。

关键基本上是 - Schema.Table_name

提取密钥,但它是 DStream[String] 类型

将其转换为列表并更新 var final_list_of_keys

现在尝试遍历列表。

但我收到错误 - 不支持在启动上下文后添加新的输入、转换和输出操作

当我尝试将 for 循环保留在 keys.foreachRdd 之外的列表中时,列表不会更新并保持为空。

有人可以建议我如何实际重做此代码以将键放在列表中,然后检查它们以将数据放入正确的目录中。

根据我的研究,我看到了帖子-

类似的帖子,但无法从中收集任何解决方案

另外,当我使用地图时,在 foreachRdd 中过滤,然后在其中过滤另一个 foreachRdd 可能会导致问题。参考帖子 -使用类似代码的帖子

0 投票
1 回答
148 浏览

dictionary - 在 PySpark 中使用 Map 解析和分配列名

这是我想要做的。

输入数据如下所示(制表符分隔):

数据通过 Kafka 传入,并使用以下代码进行解析。

解析后的日志应采用以下格式:

在我的代码中,“lines”和“parsed_log”的代码行并没有完成这项工作。你能告诉我怎么做吗?

0 投票
1 回答
296 浏览

scala - Scala 到 Pyspark

我正在尝试在 Dstream 和静态 RDD 之间执行连接。

PySpark

我收到此异常:“您似乎正在尝试广播 RDD 或从“

斯卡拉

但是,这里有人有同样的要求:How to join a DStream with a non-stream file?

这就是解决方案:

Pyspark 中的等价物是什么?

0 投票
1 回答
647 浏览

apache-spark - spark streaming DStream map vs foreachRDD,转换效率更高

只是为了变换,map和foreachRDD可以达到同样的目的,但是哪个效率更高呢?为什么?

例如,对于 DStream[Int]:

我知道 foreachRDD 将直接对 RDD 进行操作,但先映射接缝以将 DStream 转换为 RDD(不确定),因此 foreachRDD 接缝比映射更有效。然而,map 是一个转换操作,而 foreachRDD 是一个输出操作。因此,在进行转换时,map 应该比 foreachRDD 更有效。有谁知道哪一个是对的,为什么?感谢您的回复。

再添加一个对比:

哪个对转型更有效?

0 投票
1 回答
61 浏览

apache-spark - Spark 上的 Kafka 仅读取实时摄取

星火版本 = 2.3.0

卡夫卡版本 = 1.0.0

正在使用的代码片段:

当 Kafka 流实时运行时,我看到 spark 拉数据,但是如果我在 Spark 前一个小时启动 Kafka,它不会提取一小时前的数据。

这是预期的还是有办法在配置中进行设置?

代码运行使用:

0 投票
2 回答
543 浏览

apache-spark - 将 Dstream 中的分区数增加到大于直接方法中的 Kafka 分区数

根据 Direct 方法,它们是 32 个 Kafka 分区和 32 个消费者。但是 32 个消费者的数据处理速度比 Kafka 速率(1.5 倍)慢,这会导致 Kafka 中的数据积压。

我想增加每个消费者收到的 Dstream 的分区数。

我希望解决方案是增加消费者的分区,而不是增加 Kafka 的分区。

0 投票
1 回答
763 浏览

python-3.x - 调用我的函数时,pyspark updateStateByKey 失败

我只是试图运行有状态流的示例代码,但它失败并出现错误。无法理解为什么会发生。

Cloudera vm 5.13.3 上的 Spark 2.3 和 3.6 python

运行选项:

我的代码是:

流正在工作并正在侦听套接字,但是当我尝试在终端应用程序中输入行时失败

一个错误:

可能根本原因在于我的函数updatetotalcount,当我评论转换updateStateByKey(updatetotalcount)时,它会在输出中打印出结果:

请建议,为什么我会收到此错误?

0 投票
0 回答
268 浏览

apache-spark - 集群中作业运行时的火花流错误(纱线资源管理器)

我面临以下错误:
我编写了一个基于 Spark 流 ( Dstream ) 的应用程序来提取来自 PubSub 的消息。不幸的是,我在执行这项工作期间遇到了错误。实际上,我使用的是由 4 个节点组成的集群来执行 spark 作业。

作业运行 10 分钟后没有任何特定错误,我永久收到以下错误:

错误 org.apache.spark.streaming.CheckpointWriter:
无法将检查点任务提交给线程池执行程序 java.util.concurrent.RejectedExecutionException: 任务 org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@68395dc9
从 java.util.concurrent被拒绝.ThreadPoolExecutor@1a1acc25
[正在运行,池大小 = 1,活动线程 = 1,排队任务 = 1000,已完成任务 = 412]