问题标签 [spark-structured-streaming]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
75 浏览

apache-spark - 从 Hbase 读取 + 转换为 DF + 运行 SQL

编辑 我的用例是一个 Spark 流应用程序(spark 2.1.1 + Kafka 0.10.2.1),其中我从 Kafka 读取,并且每个消息/触发器都需要从 HBase 中提取数据。发布拉取,我需要对数据运行一些 SQL 语句(从 HBase 收到)

自然,我打算将处理(从 HBase 和 SQL 执行读取)推送到工作节点以实现并行性。

到目前为止,我尝试将数据从 HBase 转换为数据框(以便我可以启动 SQK 语句)都失败了。另一位绅士提到它不是“允许的”,因为那部分是在执行程序上运行的。但是,这是我有意识地选择在工作节点上运行这些部分。

这是健全的想法吗?如果不是,为什么不呢?

对此有何建议?还是整体思路?

0 投票
1 回答
574 浏览

apache-spark - 如何分别处理 Kafka 分区并与 Spark 执行器并行处理?

我使用 Spark 2.1.1。

我使用结构化流从 2 个 Kafka 分区读取消息。我将我的应用程序提交到 Spark Standalone 集群,其中包含一个工作人员和 2 个执行程序(每个 2 个核心)。

我想要这样的功能,来自每个 Kafka 分区的消息应该由每个单独的执行程序独立处理。但是现在发生的事情是,执行程序分别读取和映射分区数据,但是映射后形成的无界表是常用的并且具有来自两个分区的数据。

当我在表上运行结构化查询时,查询必须处理来自两个分区的数据(更多数据量)。

Kafka分区在哪里Product_id

有没有办法从执行器映射到的 Kafka 分区并行但单独地对数据运行相同的结构化查询?

0 投票
2 回答
2311 浏览

apache-spark - 写入完成后如何处理 HDFS 目录中的新文件?

在我的场景中,我将 CSV 文件连续上传到 HDFS。

上传新文件后,我想用 Spark SQL 处理新文件(例如,计算文件中字段的最大值,将文件转换为parquet)。即我在每个输入文件和转换/处理的输出文件之间有一个一对一的映射。

我正在评估 Spark Streaming 以监听 HDFS 目录,然后使用 Spark 处理“流文件”。

但是,为了处理整个文件,我需要知道“文件流”何时完成。我想将转换应用于整个文件,以保留文件之间的端到端一对一映射。

如何转换整个文件而不是它的微批次?

据我所知,Spark Streaming 只能将转换应用于批处理(DStreams映射到RDDs),而不是一次应用于整个文件(当其有限流完成时)。

那是对的吗?如果是这样,我应该为我的场景考虑什么替代方案?

0 投票
2 回答
1136 浏览

python - Spark结构化流-python-org.apache.kafka.common.TopicPartition;类对反序列化无效

我正在尝试执行以下火花流示例代码。 https://github.com/apache/spark/blob/master/examples/src/main/python/sql/streaming/structured_kafka_wordcount.py

我正在使用 Spark 版本 2.0.2 的 AWS EMR 集群上运行它。下面的依赖项被添加到 spark submit 中。

  • spark-sql-kafka-0-10_2.11-2.0.2.jar
  • spark-streaming-kafka-0-8-assembly_2.11-2.0.2.jar
  • kafka-clients-0.10.2.0.jar

以下是错误日志:

有没有人遇到过类似的问题并有解决方案?

当我删除spark-streaming-kafka-0-8-assembly_2.11-2.0.2.jar时,出现以下错误

0 投票
2 回答
2828 浏览

scala - 附加模式下水印聚合查询的空输出

我使用 Spark 2.2.0-rc1。

我有一个 Kafka ,我正在查询一个正在运行的带有topic水印的聚合,带有1 minute水印,输出模式。consoleappend

我在 Kafka 中推送以下数据topic

我得到以下输出:

这是预期的行为吗?

0 投票
2 回答
3221 浏览

scala - 发送 Row.empty 时在 Spark 结构化流中获取 ArrayIndexOutOfBounds 异常

我是一个名为 Kafka 的主题test,我在其中发送字符串消息。之后,我通过 Spark Structured Streaming 根据某些条件过滤这些消息。像这样:

但是,一旦我发送不符合条件的消息,即{"message":"he"},它就会给我以下错误:

我不明白为什么我在ArrayIndexOutOfBounds这里遇到异常。如果数据为空,那么我应该得到一个 empty DataFrame/Dataset,而不是异常。

这是预期的行为吗?

0 投票
1 回答
1140 浏览

apache-spark - 使用 selectExpr 时从 spark 2.1.1 中的 kafka 读取异常

我正在运行 spark 提供的默认示例来计算来自 Kafka 流的单词。

这是我正在运行的代码:

在我的 pom.xml 文件中,我添加了以下依赖项:

我使用以下命令提交代码以触发:

运行时出现以下异常:

例外:

请帮助我解决此异常。谢谢!

0 投票
1 回答
395 浏览

apache-spark - Spark 2.1.1:如何将变量绑定到结构化流查询

我想使用变量来选择两个变量值范围之间的条目。我的 SQL 查询是String sql = "Select count(*) FROM Records WHERE event_time <= UPPERTIME('1') AND event_time >= LOWERTIME('1')";. 在此查询中UPPERTIME('1'),并且LOWERTIME('1')是 UDF,并且定义是

传递给 UDF 的参数是虚拟的,我实际上正在返回全局变量“upperTime 和 lowerTime”。

当我运行上述查询时,它会显示表中所有条目的计数,但根据条件,它应该显示对应于给定范围内的条目数的计数。出了什么问题?

0 投票
0 回答
1015 浏览

apache-spark - 结构化流应用程序崩溃

所以,我有一个结构化的流应用程序,它以 5 分钟的触发间隔从 S3 读取 3 个 csv 文件。对数据帧执行基本分组,然后使用 foreach 输出将记录异步写入外部数据存储。大约 4-5 个间隔后,火花作业崩溃,这是我在日志中发现的:有没有人有解决方案?如果没有,您调试它的第一步是什么?

0 投票
1 回答
81 浏览

apache-spark - 如何将可以在运行时定义的规则应用于流数据集?

不确定标题是否适合我想要实现的目标,所以请耐心等待。

我将从定义我的用例开始:

许多(比如数百万)物联网设备正在向我的 Spark 流发送数据。这些设备每 10 秒发送一次当前温度水平。所有这些物联网设备的所有者都可以定义预设规则,例如:如果温度 > 50 则执行某项操作。

我试图弄清楚如果在某个时间段内超过 50 个标准,我是否可以输出这些设备中有多少符合此条件。问题是规则是实时定义的,应该实时应用于 Spark 作业。

我该怎么做。Spark 是适合这项工作的工具吗?

非常感谢