问题标签 [spark-streaming]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
5157 浏览

apache-spark - 如何让 Spark Streaming 写入其输出以便 Impala 可以读取它?

我对 Spark Streaming API 有以下问题。我目前正在通过 Flume 将输入数据流式传输到 Spark Streaming,我计划使用它对数据进行一些预处理。然后,我想将数据保存到 Hadoop 的文件系统并使用 Impala 进行查询。但是,Spark 将数据文件写入单独的目录,并为每个 RDD 生成一个新目录。

这是一个问题,因为首先,Impala 中的外部表无法检测子目录,而只能检测它们指向的目录内的文件,除非分区。其次,Spark 添加新目录的速度如此之快,以至于在 Impala 中为每个生成的目录定期创建一个新分区对性能非常不利。另一方面,如果我选择增加 Spark 中写入的滚动间隔,以降低生成目录的频率,则会增加延迟,直到 Impala 可以读取传入的数据。这是不可接受的,因为我的系统必须支持实时应用程序。在 Hive 中,我可以使用以下设置配置外部表以检测子目录而无需分区:

但据我了解,Impala 没有这样的功能。

我目前正在使用以下代码从 Flume 读取数据并将其写入 HDFS:

这里,变量 path 决定了目录的前缀,文本文件(part-0000 等)被添加到该目录,目录名的其余部分是 Spark 生成的时间戳。我可以将代码更改为以下内容:

在这种情况下,文件将被添加到由路径确定的同一目录中,但由于它们始终命名为 part-00000、part-00001、part-00002 等,因此将覆盖先前生成的文件。在查看 Spark 的源代码时,我注意到文件的名称是由 SparkHadoopWriter 的 open() 方法中的一行确定的:

在我看来,没有办法通过 Spark API 操作 splitID。总而言之,我的问题如下:

  • 有什么方法可以让 Impala 中的外部表检测子目录?
  • 如果没有,是否有任何方法可以让 Spark 将其输出文件写入单个目录或以 Impala 立即读取的形式?
  • 如果没有,Spark 是否有任何类型的更新来解决这个问题,或者我应该只分支我自己的 Spark 版本,我可以用它来决定它自己编写的文件的名称?
0 投票
1 回答
3075 浏览

apache-spark - 用于 RDD 分析处理的具有大量流和模型的 Spark Streaming

我们正在使用 Spark Streaming 创建一个实时流处理系统,该系统使用大量(数百万)分析模型应用于许多不同类型的传入度量数据流(超过 100000)中的 RDD。此流是原始流或转换流。每个RDD都要经过一个分析模型进行处理。由于我们不知道哪个 Spark 集群节点将处理来自不同流的哪些特定 RDD,因此我们需要使所有这些模型在每个 Spark 计算节点上都可用。这将在每个 spark 节点上产生巨大的开销。我们正在考虑使用内存数据网格在 Spark 计算节点上提供这些模型。这是正确的方法吗?

或者

我们是否应该避免一起使用 Spark 流,而只使用 Redis(带有 pub/sub)之类的内存数据网格来解决这个问题。在这种情况下,我们会将数据流式传输到包含特定模型的特定 Redis 节点。当然,我们将不得不做所有的分箱/窗口等。

请建议。

0 投票
1 回答
2882 浏览

java - Spark Streaming 历史状态

我正在构建用于检测欺诈 ATM 卡交易的实时处理。为了有效地检测欺诈,逻辑需要通过卡的最后交易日期,每天(或过去 24 小时)的交易金额总和

用例之一是,如果在本国境外的卡交易超过该国家最后一次交易的 30 天,则发送警报作为可能的欺诈

因此试图将 Spark 流视为一种解决方案。为了实现这一点(可能我缺少关于函数式编程的想法),下面是我的伪代码

我在这里面临两个问题

1)如何进一步使用最后一个交易日期与同一张卡进行比较
2)如何保存数据,即使重新启动驱动程序,s2 的旧值也会恢复 3)updateStateByKey可以用来维持历史状态?

我想我缺少火花流/函数式编程的关键点,即如何实现这种逻辑。

0 投票
3 回答
1777 浏览

apache-spark - Spark Streaming 未将任务分配到集群上的节点

我有两个节点独立集群用于火花流处理。下面是我的示例代码,它演示了我正在执行的过程。

我的问题是 spark 没有将此状态 RDD 分配给多个节点或没有将任务分配给其他节点并导致响应的高延迟,我的输入负载约为每秒 100,000 个元组。

我已经尝试过以下事情,但没有任何效果

1)spark.locality.wait到 1 秒

2)减少分配给执行程序进程的内存以检查天气火花分发RDD或任务,但即使它超出了驱动器也在运行的第一个节点(m1)的内存限制。

3) 将 spark.streaming.concurrentJobs 从 1(默认)增加到 3

4) 我检查了流 ui 存储,状态 dstream RDD 大约有 20 个分区,都位于本地节点 m1 上。

如果我运行 SparkPi 100000,那么 spark 能够在几秒钟(30-40)后利用另一个节点,所以我确信我的集群配置很好。

编辑

我注意到的一件事是,即使对于我的 RDD,如果我设置存储级别 MEMORY_AND_DISK_SER_2 然后也在应用程序 ui 存储中显示Memory Serialized 1x Replicated

0 投票
3 回答
22937 浏览

java - 为什么启动 StreamingContext 失败并显示“IllegalArgumentException:需求失败:未注册输出操作,因此没有可执行的操作”?

我正在尝试使用 Twitter 作为源执行 Spark Streaming 示例,如下所示:

但我收到以下异常

任何建议如何解决这个问题?

0 投票
1 回答
1239 浏览

hadoop - How to maintain real-time when using Spark's stateful operation updateStateByKey

First the imaginary use case. Let's say I have a stream of tuples (user_id, time_stamp, login_ip). I want to maintain the last login IP of each user at 5 seconds granularity.

Using Spark streaming, I can use the updateStateByKey method to update this map. The problem is, as the stream of data keeps coming, the RDD of each time interval is becoming larger and larger because more user_ids are seen. After sometime, the map will become so large that maintaining it takes longer time, thus the real-time delivery of the result can not be achieved.

Note that this is just a simple example that I come up with to show the problem. Real problems could be more complicated and really need real-time delivery.

Any idea (In Spark as well as other solutions will all be good) on how to solve this problem?

0 投票
1 回答
1124 浏览

scala - 在 SPARK 中实现接收器

我一直在尝试为 SPARK 0.9 实现接收器。我已经使用 Jnetpcap 库捕获了数据包,需要将其传递给 Scala 中的 spark。在“def receive()”方法中写入数据包的捕获部分是否足够?

编辑:这是来自此链接的代码,它使用 Jnetpcap 库捕获数据包:

如何为使用此代码捕获的数据包实现火花接收器?

0 投票
2 回答
6633 浏览

scala - Spark Streaming 累计字数

这是一个用 scala 编写的 spark 流程序。它每 1 秒计算来自套接字的单词数。结果将是字数,例如,从时间 0 到 1 的字数,然后从时间 1 到 2 的字数。但我想知道是否有某种方法可以改变这个程序,以便我们可以累积字数?即从时间 0 到现在的字数。

0 投票
2 回答
20192 浏览

scala - 对于 DStream 中的每个 RDD,如何将其转换为数组或其他一些典型的 Java 数据类型?

我想将 DStream 转换为数组、列表等,然后我可以将其转换为 json 并在端点上提供它。我正在使用 apache spark,注入 twitter 数据。如何在 Dstream 上执行此操作statuses?除了 之外,我似乎什么也做不了print()

0 投票
1 回答
2862 浏览

scala - Spark Streaming 状态网络字数

这是 Spark 附带的示例代码。我在这里复制了代码,这是它的链接:https ://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala . 但是,当我尝试使用命令“bin/run-example org.apache.spark.examples.streaming.StatefulNetworkWordCount localhost 9999”运行程序时,出现以下错误:

****************代码********************

我想知道是不是因为它试图通过执行命令“ssc.checkpoint(".")”在我的本地文件系统上设置检查点,而该文件不是与 hadoop 兼容的文件?(该文件必须与 hadoop 兼容才能设置检查点)如果是,我该如何解决?谢谢!