问题标签 [spark-streaming-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
462 浏览

pyspark - net.jpounz.lz4 使用火花流从 kafka 读取时出现异常

我使用 python 使用 spark 2.4.0。并从 kafka_2.11-2.0.0 (binary not source)读取数据。我正在使用 spark-submit --jars sspark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar script.py 错误报告中出现错误消息,如果有人可以提供帮助,谢谢:)

jar 文件的 pom.xml : spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar :

0 投票
0 回答
760 浏览

python - 错误实用程序:python 的线程标准输出编写器中未捕获的异常

我使用 python 使用 spark 2.4.0。并从 kafka_2.11-2.0.0 读取数据(二进制不是源)。我正在使用 spark-submit --jars sspark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar script.py 错误报告中出现错误消息,如果有人可以提供帮助,谢谢:)

jar 文件的 pom.xml : spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar :

spark-kafka WorldCount 脚本在 spark streaming 的例子中提出

0 投票
1 回答
1243 浏览

apache-spark - 如何优化 Spark 结构化流应用程序中的执行程序实例数量?

运行

YARN集群模式

应用

  • Spark 结构化流
  • 从 Kafka 主题中读取数据

关于 Kafka 话题

  • 1 个具有 4 个分区的主题 - 现在。(可以更改分区数)
  • 每 1 秒在主题中添加最多 2000 条记录。

我发现Kafka主题分区的数量与火花执行器的数量(1:1)相匹配。
所以,就我而言,到目前为止我所知道的,4 个 spark executor 是我认为的解决方案。
但我担心数据吞吐量——能保证 2000 rec/sec 吗?

是否有关于在火花结构化流中设置正确配置的任何指导或建议?
特别是spark.executor.coresspark.executor.instances或者关于执行者的东西。

0 投票
0 回答
408 浏览

apache-spark - 如何构建一个数据框,它是两个 kafka 流与“键”列的连接,其余列是最新值

我的 Spark 2.4.x (pyspark) 应用需要:

  1. 输入是两个 Kafka 主题,输出是一个 Kafka 主题
  2. 一个“流表”,其中
    • 有一个逻辑键和
    • 剩余的列应该是来自任一流的最新值。
  3. 亚秒级延迟。测试表明,watermarks在不使用时这是可以实现的。

这似乎是一个基本的事情,但它并不完全适合我。


例子:

注意:在下面的示例中,T1、T2 和 T2 时间点可能相隔秒/分钟/小时。

T1)在时间T1

KafkaPriceTopic获得 1 个消息负载(我们称之为P1):
{ "SecurityCol":"Sec1", "PriceSeqNoCol":"1", "PriceCol": "101.5"}

带有有效负载的KafkaVolumeTopic 1 消息(我们称之为V1):
{ "SecurityCol":"Sec1", "VolumeSeqNoCol":"1", "VolumeCol": "50"}

我想要一个看起来像这样 的结果:DataFrame

T2) KafkaPriceTopic 1 消息(P2):
{ "SecurityCol":"Sec1", "PriceSeqNoCol":"2", "PriceCol": "101.6"}

结果DataFrame

注意P1不再相关

T3) KafkaVolumeTopic 1 消息V2
{ "SecurityCol":"Sec1", "VolumeSeqNoCol":"2", "VolumeCol": "60"}

结果DataFrame

注意P1V1不再相关


什么有效

  1. get_json_object从有效负载(现在)中 提取 json ,join这两个主题的流。
  2. 然而。这将产生(w/o watermark)a ,它包含Sec1DataFrame收到的所有价格和交易量,而不仅仅是最新的。
  3. 所以这后面是一个groupBy(...).agg(last(...),...). 但是我坚持只获得最新值的一行。

问题

然而,最后的agg&last()并没有始终如一地做到这一点。

  1. 当 KafkaVolumeTopic 收到一条新消息时,结果可能会与来自 KafkaPriceTopic 的旧消息连接。
  2. 进一步orderBy的 /sort 不能在没有聚合的流上使用。

限制

  1. 我不能在那groupBy之前join因为那需要withWatermark,而且我认为我的应用程序不能使用watermark。理由:
    • 该应用程序应该能够在一天中的任何时间加入给定 SecurityCol 的两个主题。
      • 如果 PriceTopic 在上午 9 点收到消息,而 VolumeTopic 在上午 10 点收到消息
      • 我希望两人能加入并出席
    • 水印限制何时以append模式发出数据。所以不能在这里使用水印,因为时间范围是一整天。

有任何想法吗?

0 投票
2 回答
519 浏览

spark-structured-streaming - 如何使用 spark 结构化流从 kafka 连续流式传输数据?

我正在尝试将我的 DStream api 迁移到结构化流式传输,并在如何等待或无法将微批处理与结构化流式传输相关联时犹豫不决。

在下面的代码中,我正在创建直接流并永远等待,以便我可以无限期地使用 kafka 消息。

我怎样才能在结构化流媒体中实现同样的效果?

sparkSession.streams.awaitAnyTermination 就足够了吗?

我在流式传输和结构化流式传输中都放了一个示例代码。任何指示都会很有帮助。谢谢

结构化流式等效

0 投票
1 回答
4224 浏览

pyspark - pyspark结构化流式批量写入镶木地板

我正在对 spark 结构化流数据帧进行一些转换。我将转换后的数据帧作为镶木地板文件存储在 hdfs 中。现在我希望对 hdfs 的写入应该分批进行,而不是先转换整个数据帧,然后再存储数据帧。

0 投票
0 回答
360 浏览

apache-spark - 火花工作卡住的原因可能是什么

我正在运行 Spark 2.3.1 独立集群。我的工作是每 2 分钟从 Kafka 小批量中消耗一次,并将聚合写入某个商店。作业如下所示:

这项工作还不错,运行了将近一周。但是后来,它堆积了 - 我看到批次正在运行,但在 Kafka 中开始出现滞后,并且偏移量也变得没有变化。

在日志中,有以下消息,我可以看到它们每 2 分钟重复一次(恕我直言,批处理间隔)。所以我认为这项工作是由于重试而挂起的。

重启解决了这个问题。
我的问题 - 任何想法为什么会发生这种情况,我可以做什么/配置以防止再次发生这种情况?

0 投票
1 回答
1407 浏览

pyspark - 在火花流/结构化流中从 Kafka 读取 avro 消息

我是第一次使用 pyspark。火花版本:2.3.0 卡夫卡版本:2.2.0

我有一个 kafka 生产者,它以 avro 格式发送嵌套数据,我正在尝试在 pyspark 中编写 spark-streaming/结构化流的代码,这会将来自 kafka 的 avro 反序列化为数据帧,并进行转换以 parquet 格式将其写入 s3。我能够在 spark/scala 中找到 avro 转换器,但尚未添加对 pyspark 的支持。如何在 pyspark 中进行相同的转换。谢谢。

0 投票
1 回答
165 浏览

apache-spark - Kafka - Spark Streaming 集成:DStreams 和任务重用

我试图了解 Spark Streaming(不是 Structured Streaming)的内部结构,特别是任务查看 DStream 的方式。我将在 scala 中查看 Spark 的源代码,这里。我了解调用堆栈:

我了解 DStream 确实是 RDD 的哈希图,但我试图了解任务查看 DStream 的方式。我知道 Kafka Spark 集成基本上有两种方法:

  • 基于使用高级 Kafka 消费者 API 的接收器

    在这里, Receiver任务在每个批处理间隔(例如 5 秒)创建一个新的(微)批处理,其中包含 5 个分区(=> 1 秒块间隔),并将下游交给常规任务。

    问题:考虑我们的示例,每个微批次每 5 秒创建一次;恰好有 5 个分区,并且所有微批次的所有这些分区都应该以完全相同的方式在下游进行 DAG 处理,是相同的常规任务一遍又一遍地重复用于每个微批次 (RDD) 的相同分区 id 作为长时间运行的任务?例如

    如果在时间T0将分区(P1,P2,P3,P4,P5)的ubatch1分配给任务 id (T1, T2, T3, T4, T5),则分区( P1',P2',P3',P4 ',P5')在时间T5也被分配给同一组任务(T1、T2、T3、T4、T5)或者是否会为ubatch2创建新任务(T6、T7、T8、T9、T10)

    如果是后者,那么当您已经知道有任务在做完全相同的事情并且可以在长时间运行时重复使用时,必须每 5 秒通过网络向执行程序发送一次新任务不是性能密集型的吗?任务?

  • 直接使用低级 Kafka 消费者 API

    这里一个 Kafka 分区映射到一个 Spark 分区,因此映射到一个任务。同样,考虑主题t的 5 个 Kafka 分区,我们得到 5 个 Spark 分区及其对应的任务。

    问题:假设T0的ubatch1有分区(P1,P2,P3,P4,P5)分配给任务(T1,T2,T3,T4,T5)。在时间T5的 ubatch2分区( P1',P2',P3',P4',P5')是否也被分配给同一组任务(T1, T2, T3, T4, T5)或新任务(T6, T7、T8、T9、T10)ubatch2创建?

0 投票
0 回答
366 浏览

apache-spark - 找不到源 KAFKA 的 Kafka Spark Streaming 归档

我正在尝试 Steam 制作人主题表单 Kafka。收到Kafka不是有效数据源的错误

我导入了所有必需的包,如 Kafka SQL 流等。

代码:

结果 :

线程“主”org.apache.spark.sql.AnalysisException 中的异常:找不到数据源:kafka。请按照《Structured Streaming + Kafka Integration Guide》部署部分部署应用;

结构化流媒体指南也使用了相同的格式。