问题标签 [spark-streaming]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
973 浏览

scala - Twitter Spark 流过滤:任务不可序列化异常

我正在尝试构建一个 Spark 应用程序,它将根据我拥有的单词列表过滤 Twitter 流媒体提要。我的列表中有大约 8000 个单词(Twitter 过滤 API 最多支持 400 个单词)。我想将每条传入的推文标记为单词,然后检查这个单词是否存在于我的列表中。如果推文中的任何单词在列表中,那么我应该打印推文,否则拒绝它。

我编写了以下代码来实现这一点(我在spark-shell上一次复制粘贴以下代码几行,这是测试/运行我的代码的正确方法吗?):

但是在运行这个我得到以下异常:

但是当我只是做一个简单的映射时,
val statuses = tweets.dstream.map(status => status.getText)
它工作得很好。

有人可以帮我解决我做错了什么吗?

0 投票
2 回答
4933 浏览

scala - Spark rdd 写入 Hbase

我可以使用以下代码读取来自 Kafka 的消息:

但是,我正在尝试从 Kafka 读取每条消息并将其放入 HBase。这是我写入 HBase 的代码,但没有成功。

0 投票
2 回答
25659 浏览

java - 无法解析主 URL:'spark:http://localhost:18080'

当我尝试运行我的代码时,它会抛出这个Exception

这是我的代码:

知道如何解决这个问题吗?

0 投票
1 回答
720 浏览

scala - 检测火花流中丢失的连接

我目前正在使用 apache spark 流。我想知道如何检测与外部数据源的连接是否丢失,以便我们可以停止流式传输并重新连接到数据源。

提前感谢您的帮助

0 投票
1 回答
991 浏览

java - 不同元素的火花流和批处理模式之间的代码重用

我是 Spark 新手,我想使用 spark 流和 spark 批处理实现 lambda 架构。

在网上看了一下,找到了以下文章:

http://blog.cloudera.com/blog/2014/08/building-lambda-architecture-with-spark-streaming/

这对我的一些分析来说很好,但我认为这个解决方案在必须找到不同元素的情况下是不可行的。

如果要在 JavaRDD 上查找不同的元素,可以使用 distinct 方法。DStreams 是一组 RDD,所以如果你申请

在 Dstream 上的方法,您将在流的每个 rdd 上执行 distinct,因此您将在每个 RDD 中找到不同的元素,而不是在整个 DStream 上。

可能这样写有点混乱,所以让我用一个例子来澄清一下:

我有以下元素:

在批处理应用程序中:

子 RDD 将包含:

如果我理解正确,这应该是流的行为:

假设我们有一个 1s 的批处理时间和一个 2s 的窗口:

第一个RDD:

第二个RDD:

最终会得到 2 个 Rdds:首先:

第二:

这是对 RDD 的独特尊重,而不是对 DStream 的尊重。

我对 Streaming 部分的解决方案如下:

这样的结果是:

作为批处理模式。但是,此解决方案将需要维护开销,并且存在因重复代码库而导致错误的风险。

有没有更好的方法来尽可能多地重用批处理模式的代码来达到相同的结果?

提前致谢。

0 投票
2 回答
3190 浏览

java - 如何只编译 Spark Core 和 Spark Streaming(以便我可以获得 Streaming 的单元测试实用程序)?

我目前正在开发一个 Spark Streaming 应用程序并尝试编写我的第一个单元测试。我在这个应用程序中使用了 Java,我还需要使用 Java(和 JUnit)来编写单元测试。

我找不到任何专注于 Spark Streaming 单元测试的文档,我只能找到 Spark Streaming 源代码中基于 Java 的单元测试:

https://github.com/apache/spark/blob/branch-1.1/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java

这取决于一个 Scala 文件:

https://github.com/apache/spark/blob/branch-1.1/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala

反过来,这取决于 Scala 测试文件

https://github.com/apache/spark/tree/branch-1.1/streaming/src/test/scala/org/apache/spark/streaming

所以我认为我可以获取 Spark 源代码,切换到 branch-1.1 分支,然后只编译“核心”和“流”模块,希望最终得到流测试实用程序的编译类(或 jar 文件),所以我可以将它们导入基于 Java 的 Spark Streaming 应用程序中。

但是,尝试通过以下命令行构建它失败了:

您可以在此消息的末尾看到完整的输出。

任何想法如何进步?

构建的完整输出:

0 投票
1 回答
2261 浏览

cassandra - spark-streaming:如何将流数据输出到 cassandra

我正在使用 spark-streaming 阅读 kafka 流式消息。现在我想将 Cassandra 设置为我的输出。我在 cassandra "test_table" 中创建了一个表,其中包含 "key:text primary key" 和 "value:text" 列我已成功将数据映射到JavaDStream<Tuple2<String,String>> data如下所示:

然后我创建了一个列表:

其中 TestTable 是我的自定义类,具有与我的 Cassandra 表相同的结构,成员为“key”和“value”:

请建议一种如何将数据添加JavaDStream<Tuple2<String,String>> dataList<TestTable> list. 我这样做是为了以后可以使用

将 RDD 数据保存到 Cassandra。

我试过这样编码:

但似乎发生了一些类型不匹配。请帮忙。

0 投票
1 回答
3901 浏览

apache-spark - apache spark streaming - kafka - 阅读旧消息

我正在尝试使用火花流读取来自 Kafka 的旧消息。但是,我只能在实时发送消息时检索它们(即,如果我填充新消息,而我的 spark 程序正在运行 - 然后我会收到这些消息)。

我正在更改我的 groupID 和 consumerID 以确保 zookeeper 不只是不提供它知道我的程序以前见过的消息。

假设 spark 将 zookeeper 中的偏移量视为 -1,它不应该读取队列中的所有旧消息吗?我只是误解了 kafka 队列的使用方式吗?我对火花和卡夫卡很陌生,所以我不能排除我只是误解了一些东西。

运行此程序时,我将看到以下消息。所以我相信这不仅仅是因为设置了偏移量而没有看到消息。

2005 年 14 月 12 日 13:34:08 信息 ConsumerFetcherManager:[ConsumerFetcherManager-1417808045047] 为分区 ArrayBuffer([[testtopic,0],initOffset -1 到代理 id:1,主机:test-spark02.vpc,端口: 9092] , [[testtopic,1], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092] , [[testtopic,2], initOffset -1 to broker id:1,host: test-spark02.vpc,port:9092] , [[testtopic,3], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092] , [[testtopic,4], initOffset -1到代理 id:1,host:test-spark02.vpc,port:9092])

然后,如果我填充 1000 条新消息——我可以看到这 1000 条消息保存在我的临时目录中。但是我不知道如何阅读现有的消息,这些消息应该在(此时)数万。

0 投票
1 回答
2746 浏览

java - 如何让 Spark Streaming 在单元测试中计算文件中的单词?

我已经成功地在 Java 中构建了一个非常简单的 Spark Streaming 应用程序,它基于Scala 中的 HdfsCount 示例

当我将此应用程序提交到本地 Spark 时,它会等待将文件写入给定目录,当我创建该文件时,它会成功打印字数。我按 Ctrl+C 终止应用程序。

现在我尝试为这个功能创建一个非常基本的单元测试,但在测试中我无法打印相同的信息,即字数。

我错过了什么?

下面是单元测试文件,之后我还包含了显示 countWords 方法的代码片段:

StarterAppTest.java

该测试编译并开始运行,Spark Streaming 在控制台上打印了很多诊断消息,但调用wordCounts.print()不打印任何内容,而在 StarterApp.java 本身中,它们会打印。

我也尝试过添加ssc.awaitTermination();ssc.start()但在这方面没有任何改变。之后,我还尝试在此 Spark Streaming 应用程序正在检查的目录中手动创建一个新文件,但这次它给出了错误。

为了完整起见,下面是 wordCounts 方法:

0 投票
1 回答
1580 浏览

java - Spark Streaming saveastextfile 操作,part0000 文件是什么

当我运行 saveAsTextFiles("prefix","postfix"); 对于 JavaDStreams,我发现创建了多个部分文件。

我想了解每个零件文件的含义?

这些文件具有以下命名,/part0000、part0001、part0002

创建用于输出这些文件的基于行的 RDD 平面图的代码

我的假设是,这是为每个 RDD= 1 行创建一个 RDD 的 DStream,因此应该创建 1 个部分文件。