问题标签 [spark-structured-streaming]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
2264 浏览

scala - Spark Structured Streaming - writeStream 每小时拆分数据

目前我们在生产环境中运行 Spark 流。我正在将代码转换为使用结构化流。我能够成功地从 Kinesis 读取数据并将(接收器)写入 S3 中的 Parquet 文件。

我们的业务逻辑要求,我们将流数据写入每小时文件夹中。来自 kinesis 的传入数据没有任何日期时间字段。所以不能按日期时间分区。我们定义了一个函数[getSubfolderNameFromDate()],它可以获取当前小时+日期(1822062017 - 一天中的第 18 小时,2017 年 6 月 22 日),因此我们可以在每小时文件夹中写入数据。

使用 Spark 流式传输,上下文会重新初始化并自动在下一小时文件夹中写入数据,但我无法通过结构化流式传输实现相同的目标。

例如,200 万条记录在一天中的第 4 个小时进行了流式传输,它应该被写入"S3_location/0422062017.parquet/",在接下来的一个小时内流式传输的数据应该在"S3_location/0522062017.parquet/"等等。

使用结构化流,它可以全天连续写入同一个文件夹,我理解这是因为它只评估一次文件夹名称并连续将数据附加到它。但是我想将新数据附加到每小时文件夹中,有没有办法做到这一点?

我目前正在使用以下查询:

0 投票
1 回答
982 浏览

apache-spark - 使用结构化 Spark 流解码 Java 枚举/自定义非案例类

我正在尝试使用 Spark 2.1.1 中的结构化流来读取 Kafka 并解码 Avro 编码的消息。我有根据这个问题定义的 UDF 。

但是我得到以下失败。

它在这条线上失败

另一种方法是为我拥有的自定义类定义编码器

但是在注册 messageEncoder 时失败并出现以下错误。

当我尝试使用 a mapafter执行此操作时load(),出现以下编译错误。

这是否意味着我不能对 Java 枚举使用结构化流?它只能与原语或案例类一起使用吗?

我阅读了一些相关的问题1 , 2 , 3,似乎有可能为类指定自定义编码器,即 UDT 在 2.1 中被删除,并且没有添加新功能。

任何帮助将不胜感激。

0 投票
2 回答
1501 浏览

apache-spark - 为什么 spark-submit 失败并显示“AnalysisException:kafka 不是有效的 Spark SQL 数据源”?

我使用 Spark 2.1.0 和 Kafka 0.10.2.1。

我编写了一个从 Kafka 主题读取数据集的 Spark 应用程序。

代码如下:

我的部署脚本如下:

但是我得到了错误:

我尝试将同一应用程序与非 Jafka 数据源一起使用,并且数据框已正确创建。我也尝试在客户端模式下使用纱线,我得到了同样的错误。

0 投票
1 回答
727 浏览

apache-spark - 如何从套接字读取流数据集?

下面的代码从套接字读取,但我没有看到任何输入进入工作。虽然我已经nc -l 1111运行并转储数据,但不确定为什么我的 Spark 作业无法从10.176.110.112:1111.

0 投票
2 回答
4044 浏览

apache-spark - Spark Structured Streaming 如何处理背压?

我正在分析 Spark Structured Streaming 的背压特性。有谁知道细节?是否可以通过代码调整进程传入记录?谢谢

0 投票
1 回答
1708 浏览

apache-spark - 为什么流数据集的 foreachPartition 会出错?

我正在从 Spark Streaming 迁移到 Structured Streaming,并且遇到以下代码的问题:

它出现以下错误AnalysisException

引起:org.apache.spark.sql.AnalysisException:带有流源的查询必须用writeStream.start();;

流式查询不foreachPartition支持?在这种情况下writeStream.foreach实施的唯一方法是什么?foreachPartition

我想避免发送每个事件,而是累积所有行,形成一个巨大的 POST 请求正文并将其发送到 HTTP 端点。因此,如果一个批次中有 1000 个事件和 5 个分区,则并行生成 5 个请求,每个请求正文中有 200 个事件。

0 投票
3 回答
2798 浏览

apache-spark - Pyspark Structured Streaming Kafka 配置错误

我之前已经成功地将 pyspark 用于 Spark Streaming (Spark 2.0.2) 和 Kafka (0.10.1.0),但我的目的更适合结构化流。我尝试使用在线示例:https ://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html

使用以下类似代码:

但是,我总是以以下错误告终:

在创建 ds1 时,我还尝试将其添加到我的选项集中:

但即使明确地为它分配一个值也不能阻止错误,我可以在网上或 Kafka 文档中找到的任何其他值(如“roundrobin”)也没有。

我还尝试使用“assign”选项并获得相同的错误(我们的 Kafka 主机设置为分配 - 每个消费者只分配一个分区,并且我们没有任何重新平衡)。

知道这里发生了什么吗?该文档没有帮助(可能是因为它仍处于实验阶段)。另外,是否有使用 KafkaUtils 进行结构化流式处理的方法?或者这是唯一的网关?

0 投票
1 回答
1544 浏览

java - 带有结构化流的 S3 检查点

我已经尝试过Apache Spark (Structured Streaming) 中给出的建议:S3 Checkpoint support

我仍然面临这个问题。以下是我得到的错误

我有这样的东西作为我的代码的一部分

然后像这样使用检查点目录:

任何帮助表示赞赏。提前致谢!

0 投票
0 回答
517 浏览

apache-spark - Spark结构化流连接池

假设我定义了一些流,例如:

以及一些处理逻辑,例如:

正如我上面提到的,CustomForEachWriter负责消息处理和存储到 RDBMS 中。在最简单的方法中,我可以在CustomForEachWriter.

但我并不是很喜欢这种方法,所以我想知道如何引入连接池。通常是 Connection 并且DataSource不实现Serializable接口,所以我不能简单地传递从驱动程序上的连接池中检索到的 Connection(TaskNotSerialized将引发异常)。它还看起来当新数据到达时CustomEachWriter创建了新对象。

您能否向我解释在这种情况下结构化流媒体的正确方法是什么?

谢谢

亲切的问候

0 投票
1 回答
569 浏览

apache-spark - 火花结构化流无法从检查点位置开始

我正在使用结构化流功能和 Kafka 做一个简单的 Spark 程序。由于 Kafka 是源,因此有 2 个接收器:

  • 水槽 1- 控制台水槽——在所有情况下都能正常工作
  • 水槽 2 和 3 -H2 和 Ignite Foreach 水槽

第一次运行代码运行良好,但是当我使用检查点位置杀死并重新启动程序时,出现以下错误

我检查了源代码,我希望KafkaSourceparseFunction 方法可用,jar( ) 在类路径中可用。有关信息,我正在使用 Kafka 0.10.2.1 maven 依赖项。org.apache.spark.sql.execution.streaming.HDFSMetadataLogspark-sql_2.11-2.1.1.jar