问题标签 [spark-structured-streaming]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
363 浏览

apache-spark - 结构化流式作业不使用所有工作人员

我有一个 Spark 2.0.2 结构化流作业连接到 Apache Kafka 数据流作为源。这项工作从 Kafka 中获取 Twitter 数据 (JSON),并使用 CoreNLP 对数据进行注释,例如情绪、词性标记等。它与local[*]master 配合得很好。但是,当我设置一个独立的 Spark 集群时,只有一个工作人员习惯于处理数据。我有两个具有相同能力的工人。

提交我缺少的工作时是否需要设置一些东西。我试过--num-executors在我的spark-submit命令中设置,但我没有运气。

提前感谢您指向正确方向的指针。

0 投票
8 回答
12372 浏览

apache-spark - Multiple aggregations in Spark Structured Streaming

I would like to do multiple aggregations in Spark Structured Streaming.

Something like this:

  • Read a stream of input files (from a folder)
  • Perform aggregation 1 (with some transformations)
  • Perform aggregation 2 (and more transformations)

When I run this in Structured Streaming, it gives me an error "Multiple streaming aggregations are not supported with streaming DataFrames/Datasets".

Is there a way to do such multiple aggregations in Structured Streaming?

0 投票
1 回答
523 浏览

apache-spark - 如何处理 Spark Structured Streaming 中已删除(或更新)的行?

如果我想count知道有多少人在积极工作"Coca-Cola",我会使用以下查询:

这在批处理模式下工作正常。

但是,假设该company领域person随着时间的推移而发生变化,或者假设人们完全被移除Dataset,我如何才能使用结构化流式处理,所以count仍然正确?

AFAIK 结构化流式处理假设数据源是仅附加的:这是否意味着我需要将删除和更新作为单独的数据源进行跟踪,并自己合并它们?

0 投票
1 回答
1027 浏览

apache-spark - Spark Structured Streaming 中未从 S3 获取新数据

我正在尝试从 Spark Structured Streaming 中的 S3 存储桶读取数据。下面的代码用于获取现有数据。但是,当新数据添加到存储桶时,Spark 不会选择这个。

我怎样才能使这项工作来获取新数据?或者,这是一个还不支持的功能吗?

0 投票
1 回答
788 浏览

apache-spark - Spark 结构化流

如何通过单个作业在 Kafka 流上运行多个流式 SQL 查询。结构化流媒体是一种可靠的前进方式。例如,我在单个作业中对流运行 10 个查询。假设我只想运行 9 个查询,有没有办法动态更改每次运行时从商店运行的查询。我希望在每次执行流式查询(即连续查询)时从存储中动态选择查询。

0 投票
1 回答
1219 浏览

scala - 来自kafka的Spark结构化蒸汽-从检查点恢复后再次处理最后一条消息

我正在使用 Spark 2.0.2 的全新(并标记为“alpha”)结构化流来读取来自 kafka 主题的消息并从中更新几个 cassandra 表:

我还在 sparkSession 上配置了一个检查点位置(“spark.sql.streaming.checkpointLocation”)。这使我可以在流媒体应用程序恢复时立即接收到达的消息。

但是,自从配置了这个检查点位置后,我注意到在恢复时它也始终如一地处理前一批的最后一条消息,即使它已经被正确处理而没有失败。

知道我在这里做错了什么吗?这似乎是一个非常常见的用例。

更多信息:

请参阅此处的相关日志(主题 5876 是上一批成功处理的最后一个主题):

此外,当我终止流时,我确保它被优雅地停止以避免数据丢失:

0 投票
8 回答
33465 浏览

scala - 为什么 Spark 应用程序失败并显示“ClassNotFoundException:无法找到数据源:kafka”作为带有 sbt 程序集的 uber-jar?

我正在尝试运行像StructuredKafkaWordCount这样的示例。我从Spark Structured Streaming Programming guide开始。

我的代码是

我添加了以下 sbt 文件:

构建.sbt:

我还添加了 project/assembly.sbt

这将创建一个带有非罐子的 Uberprovided罐子。

我提交以下行:

但我得到这个运行时错误:

有没有办法知道找不到哪个类,以便我可以在 maven.org 存储库中搜索该类。

lookupDataSource源代码似乎位于https://github.com/apache/spark/blob/83a6ace0d1be44f70e768348ae6688798c84343e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource的第543 行。 scala,但我找不到与 Kafka 数据源的直接链接......

完整的源代码在这里:https ://github.com/boontadata/boontadata-streams/tree/ad0d0134ddb7664d359c8dca40f1d16ddd94053f

0 投票
1 回答
6386 浏览

apache-spark - TypeError:'Builder' 对象不可调用 Spark 结构化流

在运行 python spark 结构化流
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html的编程指南[链接]中给出的示例

我得到以下错误:
TypeError:'Builder'对象不可调用

错误 :

0 投票
1 回答
1394 浏览

apache-spark - 使用套接字的 Spark 结构化流,设置 SCHEMA,在控制台中显示 DATAFRAME

如何DataFrame在 PySpark 中为流式传输设置模式。

例如我需要一个像这样的表:

如何将标头/模式设置为 ['Name','lastName','PhoneNumber'] 及其数据类型。

另外,是否可以连续显示此表,或者说DataFrame. 当我尝试它时,我得到了错误

"pyspark.sql.utils.AnalysisException: '当流式 DataFrames/Datasets 上没有流式聚合时,不支持完整输出模式;;\nProject"

0 投票
1 回答
640 浏览

apache-spark - 为什么 Spark DataFrames 不更改其架构以及如何处理?

我正在使用 Spark 2.1 的结构化流从内容是二进制 avro 编码的 Kafka 主题中读取。

因此,设置后DataFrame

如果我打印这个DataFrame( messages.printSchema()) 的模式,我会得到以下信息:

这个问题应该与 avro-decoding 的问题是正交的,但是让我们假设我想以某种方式将value消息中的内容转换DataFrameDataset[BusinessObject], 通过 function Array[Byte] => BusinessObject。例如完整性,函数可能只是(使用avro4s):

当然,正如 miguno 在这个相关问题中所说,我不能只用 a 应用转换DataFrame.map(),因为我需要为这样的 a 提供一个隐式编码器BusinessObject

这可以定义为:

现在,执行地图:

但是,如果我查询新架构,我会得到以下信息:

而且我认为这没有任何意义,因为数据集应该使用BusinessObject案例类的 Product 属性并获得正确的值。

.schema(StructType)在阅读器中看到了一些关于 Spark SQL 使用的示例,但我不能这样做,不仅仅是因为我使用的是readStream,而是因为我实际上必须先转换列才能在这些字段中操作。

我希望告诉 Spark SQL 引擎,transformedMessagesDataset 模式是StructField带有案例类字段的。