问题标签 [spark-structured-streaming]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
426 浏览

apache-spark - Spark 结构化流批处理

我在 Spark 的结构化编程中运行批处理。下面的代码段会抛出错误,提示“kafka 不是有效的 Spark SQL 数据源;”。我使用的版本是--> spark-sql-kafka-0-10_2.10。感谢您的帮助。谢谢。

0 投票
1 回答
1803 浏览

java - 获取异常“java.lang.NoSuchFieldError:METASTORE_CLIENT_SOCKET_LIFETIME”或“必填字段'client_protocol'未设置!”

我正在使用 Spark 2.1 的结构化流(内部使用 Hive 1.2.1 jar)。我正在尝试为 Hive 开发 ForEachWriter,以通过 JDBC 连接器将流数据写入 Hive。

在我的集群上,我有 Hive 1.1.0。

如果我使用版本 1.2.1 的 Hive jdbc jar,那么由于 Hive 的客户端和服务器版本不匹配,我会遇到异常:

如果我使用 1.1.0 版的 Hive jdbc jar,那么我会在 Spark 中遇到如下异常:

HiveContex它在初始化t in时到来SparkSession

0 投票
2 回答
3811 浏览

scala - Spark Structured Streaming - 处理每一行

我在 Spark 2.1.1 中使用结构化流。我需要将一些业务逻辑应用于传入消息(来自 Kafka 源)。

本质上,我需要获取消息,获取一些键值,在 HBase 中查找它们并在数据集上执行一些更多的业务逻辑。最终结果是需要写入另一个 Kafka 队列的字符串消息。

但是,由于传入消息的抽象是一个数据帧(无界表 - 结构化流),我必须遍历在触发期间收到的数据集mapPartitions(由于 HBase 客户端不可序列化而导致的分区)。

在我的过程中,我需要遍历每一行以执行相同的业务流程。

  1. 有没有更好的方法可以帮助我避免dataFrame.mapPartitions打电话?我觉得它的顺序和迭代!
  2. 结构化流基本上迫使我从我的业务流程中生成一个输出数据帧,而没有任何开始。我可以使用哪些其他设计模式来实现我的最终目标?

你会推荐一种替代方法吗?

0 投票
2 回答
4015 浏览

scala - 无法使用 Spark 结构化流在 Parquet 文件中写入数据

我有一个 Spark 结构化流:

我想使用 DataStreamWriter 将数据写入 FileSystem,

但是在文件夹中创建了零个文件data。只有_spark_metadata正在被创建。

但是,我可以在控制台上看到数据format是什么时候console

我无法理解其背后的原因。

火花 - 2.1.0

0 投票
1 回答
463 浏览

scala - 结构化流式传输 - 使用每条消息

当每条消息通过结构化流管道传输时,处理每条消息的“推荐”方式是什么(我在 spark 2.1.1 上,源是 Kafka 0.10.2.1)?

到目前为止,我正在查看dataframe.mapPartitions(因为我需要连接到 HBase,其客户端连接类不可序列化,因此mapPartitions)。

想法?

0 投票
1 回答
6349 浏览

scala - 结构化流 - Foreach Sink

我基本上是从 Kafka 源读取信息,并将每条消息转储到我的foreach处理器(感谢 Jacek 的简单示例页面)。

如果这确实有效,我将在process此处的方法中实际执行一些业务逻辑,但是,这不起作用。我相信它println不起作用,因为它在执行程序上运行,并且无法将这些日志返回给驱动程序。但是,这个insert into临时表至少应该可以工作,并向我展示消息实际上已被消费并处理到接收器。

我在这里想念什么?

真的在寻找第二双眼睛来检查我的努力:

0 投票
0 回答
323 浏览

scala - 为什么我的结构化流在前几条消息后没有继续

对于我的 spark 2.1.1 和 Kafka 0.10.2.1 结构化流示例,我能够通过接收foreach器工作。我的流源配置为每 10 秒推送 2 条消息。

我看到前几条消息通过接收foreach器(打开 - 过程 - 关闭)构造很好。但是,在第一次推送之后,该进程不再从队列中读取?

有任何想法吗 ?

编写器实现:

在我的业务逻辑处理过程中,我需要将数据从 HBase 转换为数据帧以进行进一步处理。我这样做的尝试失败了。另一位绅士提到它不是“允许的”,因为那部分是在执行程序上运行的。

对此有何建议?

0 投票
1 回答
1676 浏览

apache-spark - Spark Streaming IllegalStateException:此消费者已关闭

所以使用: - Spark Structured Streaming (2.1.0) - Kafka 0.10.2.0 - Scala 2.11

我正在使用 Kafka 的默认 API,所以基本上:

设置选项(通过 SSL)和一切。然后我明显地应用了一些动作等并启动流等(它运行正常)。但是有时它会引发异常:

任何提示为什么这会失败?

0 投票
1 回答
5112 浏览

apache-spark - Spark on YARN + Secured hbase

我正在向连接到安全 hbase 集群的 YARN(在 spark 2.1.1 + kafka 0.10.2.1 上)提交作业。当我在“本地”模式下运行时(spark.master=local[*]),这项工作表现得很好。

但是,一旦我将 master 作为 YARN(并将部署模式作为客户端)提交作业,我就会看到以下错误消息 -

我正在关注 hortonworks 建议,以向纱线集群提供有关 HBase 和 keytab 等的信息。关注这篇 kb 文章 - https://community.hortonworks.com/content/supportkb/48988/how-to-run-spark-job-to -interact-with-secured-hbas.html

任何指针可能会发生什么?

登录HBase的机制:

另外,我尝试了另一种登录机制,如:

请建议。

0 投票
1 回答
1843 浏览

java - 如何转换数据集要写入 Kafka 的 JSON 消息的 DataSet?

我使用 Spark 2.1.1。

我有以下DataSet<Row>ds1;

ds1.isStreamingtrue

我正在尝试生成DataSet<String>ds2。换句话说,当我写信给卡夫卡水槽时,我想写这样的东西

我已经尝试过这样的事情,df2.toJSON().writeStream().foreach(new KafkaSink()).start()但是它给出了以下错误

to_jsonjson_tuple但是我不确定如何在这里利用它们?


我尝试了以下使用json_tuple()功能

我收到以下错误:

无法解析“ result”给定的输入列:[名称、比率、计数];;