问题标签 [spark-structured-streaming]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - Spark Structured Streaming - writeStream 每小时拆分数据
目前我们在生产环境中运行 Spark 流。我正在将代码转换为使用结构化流。我能够成功地从 Kinesis 读取数据并将(接收器)写入 S3 中的 Parquet 文件。
我们的业务逻辑要求,我们将流数据写入每小时文件夹中。来自 kinesis 的传入数据没有任何日期时间字段。所以不能按日期时间分区。我们定义了一个函数[getSubfolderNameFromDate()]
,它可以获取当前小时+日期(1822062017 - 一天中的第 18 小时,2017 年 6 月 22 日),因此我们可以在每小时文件夹中写入数据。
使用 Spark 流式传输,上下文会重新初始化并自动在下一小时文件夹中写入数据,但我无法通过结构化流式传输实现相同的目标。
例如,200 万条记录在一天中的第 4 个小时进行了流式传输,它应该被写入"S3_location/0422062017.parquet/"
,在接下来的一个小时内流式传输的数据应该在"S3_location/0522062017.parquet/"
等等。
使用结构化流,它可以全天连续写入同一个文件夹,我理解这是因为它只评估一次文件夹名称并连续将数据附加到它。但是我想将新数据附加到每小时文件夹中,有没有办法做到这一点?
我目前正在使用以下查询:
apache-spark - 使用结构化 Spark 流解码 Java 枚举/自定义非案例类
我正在尝试使用 Spark 2.1.1 中的结构化流来读取 Kafka 并解码 Avro 编码的消息。我有根据这个问题定义的 UDF 。
但是我得到以下失败。
它在这条线上失败
另一种方法是为我拥有的自定义类定义编码器
但是在注册 messageEncoder 时失败并出现以下错误。
当我尝试使用 a map
after执行此操作时load()
,出现以下编译错误。
这是否意味着我不能对 Java 枚举使用结构化流?它只能与原语或案例类一起使用吗?
我阅读了一些相关的问题1 , 2 , 3,似乎有可能为类指定自定义编码器,即 UDT 在 2.1 中被删除,并且没有添加新功能。
任何帮助将不胜感激。
apache-spark - 为什么 spark-submit 失败并显示“AnalysisException:kafka 不是有效的 Spark SQL 数据源”?
我使用 Spark 2.1.0 和 Kafka 0.10.2.1。
我编写了一个从 Kafka 主题读取数据集的 Spark 应用程序。
代码如下:
我的部署脚本如下:
但是我得到了错误:
我尝试将同一应用程序与非 Jafka 数据源一起使用,并且数据框已正确创建。我也尝试在客户端模式下使用纱线,我得到了同样的错误。
apache-spark - 如何从套接字读取流数据集?
下面的代码从套接字读取,但我没有看到任何输入进入工作。虽然我已经nc -l 1111
运行并转储数据,但不确定为什么我的 Spark 作业无法从10.176.110.112:1111
.
apache-spark - Spark Structured Streaming 如何处理背压?
我正在分析 Spark Structured Streaming 的背压特性。有谁知道细节?是否可以通过代码调整进程传入记录?谢谢
apache-spark - 为什么流数据集的 foreachPartition 会出错?
我正在从 Spark Streaming 迁移到 Structured Streaming,并且遇到以下代码的问题:
它出现以下错误AnalysisException
:
引起:org.apache.spark.sql.AnalysisException:带有流源的查询必须用writeStream.start();;
流式查询不foreachPartition
支持?在这种情况下writeStream.foreach
实施的唯一方法是什么?foreachPartition
我想避免发送每个事件,而是累积所有行,形成一个巨大的 POST 请求正文并将其发送到 HTTP 端点。因此,如果一个批次中有 1000 个事件和 5 个分区,则并行生成 5 个请求,每个请求正文中有 200 个事件。
apache-spark - Pyspark Structured Streaming Kafka 配置错误
我之前已经成功地将 pyspark 用于 Spark Streaming (Spark 2.0.2) 和 Kafka (0.10.1.0),但我的目的更适合结构化流。我尝试使用在线示例:https ://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html
使用以下类似代码:
但是,我总是以以下错误告终:
在创建 ds1 时,我还尝试将其添加到我的选项集中:
但即使明确地为它分配一个值也不能阻止错误,我可以在网上或 Kafka 文档中找到的任何其他值(如“roundrobin”)也没有。
我还尝试使用“assign”选项并获得相同的错误(我们的 Kafka 主机设置为分配 - 每个消费者只分配一个分区,并且我们没有任何重新平衡)。
知道这里发生了什么吗?该文档没有帮助(可能是因为它仍处于实验阶段)。另外,是否有使用 KafkaUtils 进行结构化流式处理的方法?或者这是唯一的网关?
java - 带有结构化流的 S3 检查点
我已经尝试过Apache Spark (Structured Streaming) 中给出的建议:S3 Checkpoint support
我仍然面临这个问题。以下是我得到的错误
我有这样的东西作为我的代码的一部分
然后像这样使用检查点目录:
任何帮助表示赞赏。提前致谢!
apache-spark - Spark结构化流连接池
假设我定义了一些流,例如:
以及一些处理逻辑,例如:
正如我上面提到的,CustomForEachWriter
负责消息处理和存储到 RDBMS 中。在最简单的方法中,我可以在CustomForEachWriter
.
但我并不是很喜欢这种方法,所以我想知道如何引入连接池。通常是 Connection 并且DataSource
不实现Serializable
接口,所以我不能简单地传递从驱动程序上的连接池中检索到的 Connection(TaskNotSerialized
将引发异常)。它还看起来当新数据到达时CustomEachWriter
创建了新对象。
您能否向我解释在这种情况下结构化流媒体的正确方法是什么?
谢谢
亲切的问候
apache-spark - 火花结构化流无法从检查点位置开始
我正在使用结构化流功能和 Kafka 做一个简单的 Spark 程序。由于 Kafka 是源,因此有 2 个接收器:
- 水槽 1- 控制台水槽——在所有情况下都能正常工作
- 水槽 2 和 3 -H2 和 Ignite Foreach 水槽
第一次运行代码运行良好,但是当我使用检查点位置杀死并重新启动程序时,出现以下错误
我检查了源代码,我希望KafkaSource
parseFunction 方法可用,jar( ) 在类路径中可用。有关信息,我正在使用 Kafka 0.10.2.1 maven 依赖项。org.apache.spark.sql.execution.streaming.HDFSMetadataLog
spark-sql_2.11-2.1.1.jar