问题标签 [spark-structured-streaming]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
383 浏览

scala - 为什么从 Kafka 读取流失败并显示“无法找到存储在数据集中的类型的编码器”?

我正在尝试将 Spark Structured Streaming 与 Kafka 一起使用。

我从 Spark 文档中得到了我的代码,我得到了这个构建错误:

找不到存储在数据集中的类型的编码器。通过导入 spark.implicits 支持原始类型(Int、String 等)和产品类型(案例类)。未来版本中将添加对序列化其他类型的支持。.as[(字符串,字符串)]

我在其他 SO 帖子上读到这是由于缺少import spark.implicits._. 但这对我没有任何改变。

更新

0 投票
1 回答
3584 浏览

python - 使用 python 构建 Spark 结构化流

我正在尝试使用 Kafka 和 Python 构建结构化流。要求:我需要在 Spark 中处理来自 Kafka(JSON 格式)的流数据(执行转换),然后将其存储在数据库中。

我有 JSON 格式的数据,例如, {"a": 120.56, "b": 143.6865998138807, "name": "niks", "time": "2012-12-01 00:00:09"}

我打算spark.readStream用于阅读 Kafka 之类的内容,

我参考了这个链接以供参考,但没有得到如何解析 JSON 数据。我试过这个,

但看起来它不起作用。

任何使用 Python 处理 Spark 结构化流的人都可以指导我继续使用示例示例或链接吗?

使用,

程序运行,但我在控制台上得到值,

我在这里错过了什么吗。

0 投票
1 回答
10513 浏览

scala - 如何使用结构化流从 Kafka 读取 JSON 格式的记录?

我正在尝试使用结构化流方法,使用基于 DataFrame/Dataset API 的 Spark-Streaming 从 Kafka 加载数据流。

我用:

  • 火花 2.10
  • 卡夫卡 0.10
  • 火花-sql-kafka-0-10

Spark Kafka DataSource 定义了底层架构:

我的数据采用 json 格式,它们存储在value列中。我正在寻找一种方法如何从 value 列中提取底层架构并将接收到的数据帧更新为存储在value中的列?我尝试了下面的方法,但它不起作用:

在这里我得到了异常org.apache.spark.sql.AnalysisException: Can't extract value from value#337;,因为在创建流时,里面的值是未知的......

你有什么建议吗?

0 投票
3 回答
5463 浏览

java-8 - 如何在 Spark 结构化流中连接两个流?- Java 8

我知道,尚不支持两个流之间的连接。但是有办法做到吗?还是我需要等待 Spark 2.2.0?谢谢

0 投票
1 回答
367 浏览

apache-spark - pyspark 中的结构化流

我正在尝试将数据从另一台服务器流式传输到 HBase,并能够在 Python 中定义不同的列族。我在 Spark 文档中环顾四周,只看到:

我怎样才能有相同的实现直接写入 HBase 并能够将数据映射到不同的列族?

0 投票
2 回答
639 浏览

scala - 如何使用结构化流将镶木地板文件从 HDFS 复制到 MS SQL Server?

我正在尝试使用 Spark Streaming 将 HDFS 中的镶木地板文件复制到 MS Sql Server。我正在为 MS SQL Server 使用 JDBC 驱动程序。我的代码是:

我收到错误:

如果有人以前在此工作过,请提供修复。

0 投票
3 回答
2705 浏览

apache-spark - 结构化流式 Kafka 源偏移存储

我正在使用 Kafka (Integration guide)的结构化流源,如上所述,它不会提交任何偏移量。

我的目标之一是监控它(检查它是否落后等)。即使它没有提交偏移量,它也会通过不时查询 kafka 并检查下一个要处理的偏移量来处理它们。根据文档,偏移量被写入 HDFS,因此在发生故障时可以恢复,但问题是:

它们存储在哪里?如果不提交偏移量,是否有任何方法可以监控火花流(结构化)的 kafka 消耗(从程序外部;所以 kafka cli 或类似的,每条记录附带的偏移量不适合用例) ?

干杯

0 投票
2 回答
2930 浏览

apache-spark - 使用 Spark 结构化流处理后删除文件

在 Spark Structures Streaming 中使用文件源,并希望在处理完文件后删除它们。

我正在阅读一个充满 JSON 文件( , 等)的目录1.json2.json然后将它们写为 Parquet 文件。我想在成功处理每个文件后删除它。

0 投票
2 回答
2742 浏览

apache-spark - Spark Structured Streaming 中的 KafkaSource 的“偏移量已从 X 更改为 0”错误是什么?

我在带有检查点的 Spark Structured Streaming 应用程序中使用 KafkaSource 收到错误“偏移量已从 X 更改为 0,某些数据可能已丢失”,但它似乎实际上并没有引起任何问题。我试图弄清楚错误的实际含义。

我的设置如下。

  • 我在 docker 容器中运行了 Kafka(0.10.1.0),并在 /tmp/kafka-logs 上安装了一个命名卷,以便在重新启动之间保留日志。

  • 我在另一个 docker 容器中有一个 Spark Structured Streaming (2.1.1) 应用程序。流使用来自 Kafka 的数据。他们还在重新安装在命名卷中的位置使用检查点,以确保元数据在重新启动之间保持不变。

  • 我使用实现ForeachWriter接口的自定义接收器,这意味着我必须实现自己的已处理版本日志,以便在一切重新启动时,我可以告诉 Spark Streaming 不要重新处理已处理的内容。

所有这一切都运作良好,数据从 Kafka 正确使用,我的自定义接收器正确处理它。

现在如果我杀死Spark Streaming应用程序,让Kafka中的数据堆积然后重新启动Spark Streaming,它会抛出以下错误,表明Kafka中的某些数据不再可用

但是在抛出错误之后,我看到我的流正常启动。Spark Streaming 正确地将堆积在 Kafka 中的数据推送到我的自定义接收器,并具有预期的版本。然后我的接收器继续并正确处理新数据。

因此,该错误表明某些数据在 Kafka 中不再可用,但仍设法被 Spark Streaming 正确使用。

如果我重新启动 Spark Streaming 应用程序,即使没有数据被推送到 Kafka,我也会再次收到相同的错误。如果我开始将新数据推送到 Kafka,系统将继续正确处理它。

有人知道这里会发生什么吗?我是否错误地解释了错误?

0 投票
1 回答
892 浏览

apache-spark - 在 Spark 结构化流中获取窗口的所有行

我有一个用例,我们需要在窗口内的数据中查找模式。我们正在试验结构化流。我们有一个连续的事件流,并且正在寻找诸如事件 A(设备断开连接)在 10 秒内跟随事件 B(设备重新连接)的模式。或事件 A(断开连接)在 10 秒内没有跟随事件 B(重新连接)。

我正在考虑使用窗口函数将数据集分组为 10 秒的窗口桶,并在每次更新窗口值时检查模式。看起来窗口函数真的被用作结构化流中的 groupBy ,这迫使我使用聚合函数来获取列值的高级聚合。

我想知道在结构化流中使用窗口函数时是否有一种方法可以遍历列的所有值。