问题标签 [dstream]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
494 浏览

python - SparkStreaming 应用程序太慢

在开发 SparkStreaming 应用程序 (python) 时,我不完全确定我是否理解它的工作原理。我只需要读取一个 json 文件流(弹出一个目录)并对每个 json 对象和一个引用执行连接操作,然后将其写回文本文件。这是我的代码:

这是我的火花配置:

好吧,它正在工作,但我有一个问题:流程很慢,流程延迟正在增加。我在local[*]工作,怕是没有并行性……在监控UI中,一次只能看到一个executor和一个job。有没有更简单的方法来做到这一点?就像DStream 上的转换功能一样?是否有我缺少的配置变量?

0 投票
1 回答
826 浏览

json - Kafka 主题到 Spark Streaming DStream,如何获取 Json

我正在尝试使用 Spark Streaming 从 Kafka 主题中获取信息,然后解析我在该主题中获得的 json。为了在 DStream 中获取主题,我使用 stringReader,然后使用 foreach 从 DStream 中获取每个 RDD:

为了将 myRDD 转换为 json (当我打印 myRDD 时,json 的格式是正确的)并提取我需要的两个字段,我尝试使用 json4s 并尝试了这个:

我也尝试过以这种方式使用 json4s:

但它也不能正常工作。

这是输出:

这些是我正在使用的版本:

我认为问题在于我不明白如何将 RDD 中的每条记录转换为 json 对象来解析它。有人可以更深入地向我解释一下,以便我了解它是如何工作的吗?我的代码正确吗?

谢谢你。

0 投票
1 回答
317 浏览

scala - Scala Spark:尝试在使用重载时避免类型擦除

我对 Scala/Spark 比较陌生

我正在尝试根据类类型将一个函数重载到 DStream

我收到编译错误:

persist(_root_.org.apache.spark.streaming.dstream.DStream) is already defined in the scope

似乎是由于类型擦除。如何让编译器识别出DStream[Service1]DStream[Service2]

谢谢

0 投票
1 回答
412 浏览

apache-kafka - 无法保留 DStream 以供下一批使用

然后进行一些转换并创建类型为 twp DStream Data_1 和 Data_2

并按如下方式进行连接,然后过滤掉那些没有连接键的记录并将它们保存在历史记录中,以便通过与 Data_1 进行联合来在下一批中使用它

我在上一步之后获得了历史记录(通过将其保存到 hdfs 进行检查),但是在进行联合时,这个历史记录仍然是批量空的。

0 投票
2 回答
1150 浏览

java - 对 JavaDStream 进行排序 - Spark Streaming

我有一个适用于JavaDStreams对象的应用程序。这是一段代码,我在其中计算单词出现的频率。

现在,如果我希望打印按整数值排序的前 N ​​个频繁元素,如果没有类似sortByKey的方法(用于 JavaPairRDD),我该怎么做?

0 投票
1 回答
995 浏览

apache-spark - Apache Spark 流式传输 - 超时长时间运行的批处理

我正在设置一个 Apache Spark 长时间运行的流式传输作业,以使用 InputDStream 执行(非并行)流式传输。

我想要实现的是,当队列中的批处理花费太长时间(基于用户定义的超时)时,我希望能够跳过该批处理并完全放弃它 - 并继续执行其余部分。

我无法在 spark API 或在线找到解决此问题的方法——我研究了使用 StreamingContext awaitTerminationOrTimeout,但这会在超时时杀死整个 StreamingContext,而我要做的只是跳过/杀死当前批次。

我也考虑过使用 mapWithState,但这似乎不适用于这个用例。最后,我正在考虑设置一个 StreamingListener 并在批处理开始时启动一个计时器,然后在达到某个超时阈值时让批处理停止/跳过/杀死,但似乎仍然没有办法杀死批处理。

谢谢!

0 投票
0 回答
324 浏览

scala - Spark Streaming - 如何从 foreachRDD 函数中获取结果?

我正在尝试使用 Spark Streaming 读取 Kafka 消息,进行一些计算并将结果发送到另一个进程。

我需要为我的要求积累 JSONObject(全局变量)。put操作引发 NotSerializable 异常。

java.io.NotSerializableException:org.apache.spark.streaming.kafka.DirectKafkaInputDStream$MappedDStream 的对象可能作为 RDD 操作闭包的一部分被序列化。这是因为 DStream 对象是从闭包中引用的。请重写此 DStream 中的 RDD 操作以避免这种情况。这是为了避免 Spark 任务因不必要的对象而膨胀。

是否可以将此 jsonArray 发送出此 forahRDD 块?我不想写入文件或数据库。

0 投票
1 回答
351 浏览

python - pyspark:使用从 kafka 检索到的数据训练 kmeans 流式传输

我想用来自 kafka 主题的数据训练一个流式 kmeans 模型。

我的问题是如何呈现 kmeans streamig 模型的数据

此输出(这些是我的功能,用“|”分隔):

1.0|2.0|0.0|21.0|2.0

1.0|2.0|0.0|21.0|2.0

然后我想这样做

如果我结合两段代码,我会得到错误:

0 投票
1 回答
641 浏览

elasticsearch - pyspark - 将 dstream 写入 elasticsearch 时出错

我在将数据从 spark 流 (pyspark) 索引到 elasticserach 时遇到问题。数据是类型dstream。下面是它的外观

这是我正在使用的弹性索引:index=clus 和 type=data

这是我的代码:

这是错误:

17/07/25 15:31:31 错误执行程序:阶段 11.0(TID 23)中任务 2.0 中的异常 org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest:发现不可恢复的错误 [127.0.0.1:9200] 返回错误请求(400) - 解析失败;在 org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:203) 在 org.elasticsearch.hadoop.rest 的 org.elasticsearch.hadoop.rest.RestClient.processBulkResponse(RestClient.java:251) .RestRepository.tryFlush(RestRepository.java:220) 在 org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:242) 在 org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:267) 在org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.doClose(EsOutputFormat.java:214) 在 org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.close(EsOutputFormat.java: ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) 17/07/25 15:31:31 错误执行器:阶段 11.0 中的任务 0.0 异常(TID 21 ) org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest:发现不可恢复的错误 [127.0.0.1:9200] 返回错误请求(400) - 解析失败;在 org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:203) 在 org.elasticsearch.hadoop.rest 的 org.elasticsearch.hadoop.rest.RestClient.processBulkResponse(RestClient.java:251) .RestRepository.tryFlush(RestRepository.java:220) 在 org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:242) 在 org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:267) 在org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.doClose(EsOutputFormat. ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 在 java.lang.Thread.run(Thread.java:748) 17/07/25 15 :31:31 错误执行程序:阶段 11.0(TID 22)中任务 1.0 中的异常 org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest:发现不可恢复的错误 [127.0.0.1:9200] 返回错误请求(400) - 无法解析;在 org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:203) 在 org.elasticsearch.hadoop.rest 的 org.elasticsearch.hadoop.rest.RestClient.processBulkResponse(RestClient.java:251) .RestRepository.tryFlush(RestRepository.java:220) 在 org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:242) 在 org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:

0 投票
1 回答
673 浏览

scala - Spark 中的 Dstream 映射和 Dstream 变换映射是否相同?

下面两个是一样的吗?