问题标签 [spark-structured-streaming]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-spark - Spark 结构化流批处理
我在 Spark 的结构化编程中运行批处理。下面的代码段会抛出错误,提示“kafka 不是有效的 Spark SQL 数据源;”。我使用的版本是--> spark-sql-kafka-0-10_2.10。感谢您的帮助。谢谢。
java - 获取异常“java.lang.NoSuchFieldError:METASTORE_CLIENT_SOCKET_LIFETIME”或“必填字段'client_protocol'未设置!”
我正在使用 Spark 2.1 的结构化流(内部使用 Hive 1.2.1 jar)。我正在尝试为 Hive 开发 ForEachWriter,以通过 JDBC 连接器将流数据写入 Hive。
在我的集群上,我有 Hive 1.1.0。
如果我使用版本 1.2.1 的 Hive jdbc jar,那么由于 Hive 的客户端和服务器版本不匹配,我会遇到异常:
如果我使用 1.1.0 版的 Hive jdbc jar,那么我会在 Spark 中遇到如下异常:
HiveContex
它在初始化t in时到来SparkSession
。
scala - Spark Structured Streaming - 处理每一行
我在 Spark 2.1.1 中使用结构化流。我需要将一些业务逻辑应用于传入消息(来自 Kafka 源)。
本质上,我需要获取消息,获取一些键值,在 HBase 中查找它们并在数据集上执行一些更多的业务逻辑。最终结果是需要写入另一个 Kafka 队列的字符串消息。
但是,由于传入消息的抽象是一个数据帧(无界表 - 结构化流),我必须遍历在触发期间收到的数据集mapPartitions
(由于 HBase 客户端不可序列化而导致的分区)。
在我的过程中,我需要遍历每一行以执行相同的业务流程。
- 有没有更好的方法可以帮助我避免
dataFrame.mapPartitions
打电话?我觉得它的顺序和迭代! - 结构化流基本上迫使我从我的业务流程中生成一个输出数据帧,而没有任何开始。我可以使用哪些其他设计模式来实现我的最终目标?
你会推荐一种替代方法吗?
scala - 无法使用 Spark 结构化流在 Parquet 文件中写入数据
我有一个 Spark 结构化流:
我想使用 DataStreamWriter 将数据写入 FileSystem,
但是在文件夹中创建了零个文件data
。只有_spark_metadata
正在被创建。
但是,我可以在控制台上看到数据format
是什么时候console
:
我无法理解其背后的原因。
火花 - 2.1.0
scala - 结构化流式传输 - 使用每条消息
当每条消息通过结构化流管道传输时,处理每条消息的“推荐”方式是什么(我在 spark 2.1.1 上,源是 Kafka 0.10.2.1)?
到目前为止,我正在查看dataframe.mapPartitions
(因为我需要连接到 HBase,其客户端连接类不可序列化,因此mapPartitions
)。
想法?
scala - 结构化流 - Foreach Sink
我基本上是从 Kafka 源读取信息,并将每条消息转储到我的foreach
处理器(感谢 Jacek 的简单示例页面)。
如果这确实有效,我将在process
此处的方法中实际执行一些业务逻辑,但是,这不起作用。我相信它println
不起作用,因为它在执行程序上运行,并且无法将这些日志返回给驱动程序。但是,这个insert into
临时表至少应该可以工作,并向我展示消息实际上已被消费并处理到接收器。
我在这里想念什么?
真的在寻找第二双眼睛来检查我的努力:
scala - 为什么我的结构化流在前几条消息后没有继续
对于我的 spark 2.1.1 和 Kafka 0.10.2.1 结构化流示例,我能够通过接收foreach
器工作。我的流源配置为每 10 秒推送 2 条消息。
我看到前几条消息通过接收foreach
器(打开 - 过程 - 关闭)构造很好。但是,在第一次推送之后,该进程不再从队列中读取?
有任何想法吗 ?
编写器实现:
在我的业务逻辑处理过程中,我需要将数据从 HBase 转换为数据帧以进行进一步处理。我这样做的尝试失败了。另一位绅士提到它不是“允许的”,因为那部分是在执行程序上运行的。
对此有何建议?
apache-spark - Spark Streaming IllegalStateException:此消费者已关闭
所以使用: - Spark Structured Streaming (2.1.0) - Kafka 0.10.2.0 - Scala 2.11
我正在使用 Kafka 的默认 API,所以基本上:
设置选项(通过 SSL)和一切。然后我明显地应用了一些动作等并启动流等(它运行正常)。但是有时它会引发异常:
任何提示为什么这会失败?
apache-spark - Spark on YARN + Secured hbase
我正在向连接到安全 hbase 集群的 YARN(在 spark 2.1.1 + kafka 0.10.2.1 上)提交作业。当我在“本地”模式下运行时(spark.master=local[*]),这项工作表现得很好。
但是,一旦我将 master 作为 YARN(并将部署模式作为客户端)提交作业,我就会看到以下错误消息 -
我正在关注 hortonworks 建议,以向纱线集群提供有关 HBase 和 keytab 等的信息。关注这篇 kb 文章 - https://community.hortonworks.com/content/supportkb/48988/how-to-run-spark-job-to -interact-with-secured-hbas.html
任何指针可能会发生什么?
登录HBase的机制:
另外,我尝试了另一种登录机制,如:
请建议。
java - 如何转换数据集要写入 Kafka 的 JSON 消息的 DataSet?
我使用 Spark 2.1.1。
我有以下DataSet<Row>
ds1;
(ds1.isStreaming
给true
)
我正在尝试生成DataSet<String>
ds2。换句话说,当我写信给卡夫卡水槽时,我想写这样的东西
我已经尝试过这样的事情,df2.toJSON().writeStream().foreach(new KafkaSink()).start()
但是它给出了以下错误
有to_json
,json_tuple
但是我不确定如何在这里利用它们?
我尝试了以下使用json_tuple()
功能
我收到以下错误:
无法解析“
result
”给定的输入列:[名称、比率、计数];;