问题标签 [spark-structured-streaming]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-spark - 结构化流式作业不使用所有工作人员
我有一个 Spark 2.0.2 结构化流作业连接到 Apache Kafka 数据流作为源。这项工作从 Kafka 中获取 Twitter 数据 (JSON),并使用 CoreNLP 对数据进行注释,例如情绪、词性标记等。它与local[*]
master 配合得很好。但是,当我设置一个独立的 Spark 集群时,只有一个工作人员习惯于处理数据。我有两个具有相同能力的工人。
提交我缺少的工作时是否需要设置一些东西。我试过--num-executors
在我的spark-submit
命令中设置,但我没有运气。
提前感谢您指向正确方向的指针。
apache-spark - Multiple aggregations in Spark Structured Streaming
I would like to do multiple aggregations in Spark Structured Streaming.
Something like this:
- Read a stream of input files (from a folder)
- Perform aggregation 1 (with some transformations)
- Perform aggregation 2 (and more transformations)
When I run this in Structured Streaming, it gives me an error "Multiple streaming aggregations are not supported with streaming DataFrames/Datasets".
Is there a way to do such multiple aggregations in Structured Streaming?
apache-spark - 如何处理 Spark Structured Streaming 中已删除(或更新)的行?
如果我想count
知道有多少人在积极工作"Coca-Cola"
,我会使用以下查询:
这在批处理模式下工作正常。
但是,假设该company
领域person
随着时间的推移而发生变化,或者假设人们完全被移除Dataset
,我如何才能使用结构化流式处理,所以count
仍然正确?
AFAIK 结构化流式处理假设数据源是仅附加的:这是否意味着我需要将删除和更新作为单独的数据源进行跟踪,并自己合并它们?
apache-spark - Spark Structured Streaming 中未从 S3 获取新数据
我正在尝试从 Spark Structured Streaming 中的 S3 存储桶读取数据。下面的代码用于获取现有数据。但是,当新数据添加到存储桶时,Spark 不会选择这个。
我怎样才能使这项工作来获取新数据?或者,这是一个还不支持的功能吗?
apache-spark - Spark 结构化流
如何通过单个作业在 Kafka 流上运行多个流式 SQL 查询。结构化流媒体是一种可靠的前进方式。例如,我在单个作业中对流运行 10 个查询。假设我只想运行 9 个查询,有没有办法动态更改每次运行时从商店运行的查询。我希望在每次执行流式查询(即连续查询)时从存储中动态选择查询。
scala - 来自kafka的Spark结构化蒸汽-从检查点恢复后再次处理最后一条消息
我正在使用 Spark 2.0.2 的全新(并标记为“alpha”)结构化流来读取来自 kafka 主题的消息并从中更新几个 cassandra 表:
我还在 sparkSession 上配置了一个检查点位置(“spark.sql.streaming.checkpointLocation”)。这使我可以在流媒体应用程序恢复时立即接收到达的消息。
但是,自从配置了这个检查点位置后,我注意到在恢复时它也始终如一地处理前一批的最后一条消息,即使它已经被正确处理而没有失败。
知道我在这里做错了什么吗?这似乎是一个非常常见的用例。
更多信息:
请参阅此处的相关日志(主题 5876 是上一批成功处理的最后一个主题):
此外,当我终止流时,我确保它被优雅地停止以避免数据丢失:
scala - 为什么 Spark 应用程序失败并显示“ClassNotFoundException:无法找到数据源:kafka”作为带有 sbt 程序集的 uber-jar?
我正在尝试运行像StructuredKafkaWordCount这样的示例。我从Spark Structured Streaming Programming guide开始。
我的代码是
我添加了以下 sbt 文件:
构建.sbt:
我还添加了 project/assembly.sbt
这将创建一个带有非罐子的 Uberprovided
罐子。
我提交以下行:
但我得到这个运行时错误:
有没有办法知道找不到哪个类,以便我可以在 maven.org 存储库中搜索该类。
lookupDataSource
源代码似乎位于https://github.com/apache/spark/blob/83a6ace0d1be44f70e768348ae6688798c84343e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource的第543 行。 scala,但我找不到与 Kafka 数据源的直接链接......
完整的源代码在这里:https ://github.com/boontadata/boontadata-streams/tree/ad0d0134ddb7664d359c8dca40f1d16ddd94053f
apache-spark - TypeError:'Builder' 对象不可调用 Spark 结构化流
在运行 python spark 结构化流
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html的编程指南[链接]中给出的示例
我得到以下错误:
TypeError:'Builder'对象不可调用
错误 :
apache-spark - 使用套接字的 Spark 结构化流,设置 SCHEMA,在控制台中显示 DATAFRAME
如何DataFrame
在 PySpark 中为流式传输设置模式。
例如我需要一个像这样的表:
如何将标头/模式设置为 ['Name','lastName','PhoneNumber'] 及其数据类型。
另外,是否可以连续显示此表,或者说DataFrame
. 当我尝试它时,我得到了错误
"pyspark.sql.utils.AnalysisException: '当流式 DataFrames/Datasets 上没有流式聚合时,不支持完整输出模式;;\nProject"
apache-spark - 为什么 Spark DataFrames 不更改其架构以及如何处理?
我正在使用 Spark 2.1 的结构化流从内容是二进制 avro 编码的 Kafka 主题中读取。
因此,设置后DataFrame
:
如果我打印这个DataFrame
( messages.printSchema()
) 的模式,我会得到以下信息:
这个问题应该与 avro-decoding 的问题是正交的,但是让我们假设我想以某种方式将value
消息中的内容转换DataFrame
为Dataset[BusinessObject]
, 通过 function Array[Byte] => BusinessObject
。例如完整性,函数可能只是(使用avro4s):
当然,正如 miguno 在这个相关问题中所说,我不能只用 a 应用转换DataFrame.map()
,因为我需要为这样的 a 提供一个隐式编码器BusinessObject
。
这可以定义为:
现在,执行地图:
但是,如果我查询新架构,我会得到以下信息:
而且我认为这没有任何意义,因为数据集应该使用BusinessObject
案例类的 Product 属性并获得正确的值。
我.schema(StructType)
在阅读器中看到了一些关于 Spark SQL 使用的示例,但我不能这样做,不仅仅是因为我使用的是readStream
,而是因为我实际上必须先转换列才能在这些字段中操作。
我希望告诉 Spark SQL 引擎,transformedMessages
Dataset 模式是StructField
带有案例类字段的。