问题标签 [spark-redis]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
829 浏览

python - Pyspark / Redis - 随时间推移的数据聚合

我正在使用KafkaUtils Pyspark 实现 从Kafka流式传输大量数据(200k + 事件/批次 3 秒)。

我收到实时数据:

  • 一个sessionID
  • 一个ip
  • 一个state

我现在使用基本的Spark / Redis实现正在做的事情如下:

火花工作:

  • 通过以下方式汇总数据sessionIDrdd_combined = rdd.map(map_data).combineByKey(lambda x: frozenset([x]), lambda s, v: s | frozenset([v]), lambda s1, s2: s1 | s2)
  • 创建一个set不同的state(可能是 1、2、3...)
  • 保留ip信息,然后将其转换为lon/ lat
  • 检查是否sessionID在 Redis 中,如果则更新它,否则将其写入 Redis。

然后我只在 Python 中为 Redis运行一个小脚本,检查 state 中是否有 1:

  • 如果,则该事件将在频道中发布(例如channel_1)并从 Redis 中删除。
  • 如果不是,我们检查/更新时间戳。如果NOW() - timestamp > 10 min数据已发布,channel_2否则我们什么也不做。

问题 :

我一直想知道用Spark计算大部分工作的最佳实现是什么。

  • 使用window+ 聚合或reduceByKeyAndWindow:我担心的是,如果我使用 10 分钟的窗口并每 3 秒对几乎相同的数据进行一次计算,则效率不是很高。
  • usingupdateStateByKey看起来很有趣,但数据永远不会被删除,这可能会成为问题。另外我怎么能检查我们已经过了 10 分钟?

关于这个实现或其他可能的想法?

0 投票
1 回答
2026 浏览

scala - Spark/Scala 并行写入 redis

是否可以从 spark 并行写入 Redis?

(或:如何从 spark 中快速编写数万个键/列表)

目前,我正在按顺序向 Redis 写信,并且需要永远。我需要写大约 90000 个列表(长度为 2-2000)。速度极其重要。目前,它大约需要 1 小时。Redis 的传统基准声称每秒有数千次 Redis 写入,但在我的管道中,我离这个还很远。

任何帮助表示赞赏。

0 投票
1 回答
951 浏览

apache-spark - 如何将redis变成火花数据集或数据框?

我正在尝试使用 redis 作为 spark sql 的来源,但遇到了如何转换 rdd 的问题。以下是我的代码:

我得到以下异常:

我不知道下一步该怎么做,请帮忙!

0 投票
0 回答
223 浏览

redis - Storm + Redis 或 Storm Trident 或 Spark Streaming

我将构建流处理系统。使用 Kafka 进行消息传输。流处理可以通过风暴三叉戟、风暴或火花流来完成。但找不到最佳答案。

如果一开始的条件,流处理将很简单。字段可以在不同的元组内,这就是我需要存储前一个字段的原因。现在我通过使用storm + redis(不是三叉戟)实现了这个场景。在 bolt 内部,从 redis 中获取所有字段,然后通过 if 条件进行测量。如果字段不为空,则使用,如果为空,则继续工作。如果这是正确的,我对建筑有不好的感觉。

我应该使用 trident api 还是 spark 流式传输?

我不需要“恰好一次”的交货保证。只需要每个数据源的状态。此状态将按字段计算。

谢谢您的回复。

0 投票
0 回答
812 浏览

scala - Kafka Spark Stream 直接保存到 Redis

我正在使用 Scala 获取 kafkaStream 并希望将此数据直接插入到 Redis。这样做的最佳最佳策略是什么?

早些时候我试图使用https://github.com/debasishg/scala-redis但这不适用于 Spark,所以我必须收集 RDD,然后将记录保存到 Redis 中,这在我的项目中产生了很多开销。所以寻找一些解决方案,我可以直接将这串消息推送到 Redis 中,并且还想维护 ZScore。

谢谢,

0 投票
0 回答
460 浏览

scala - scala 2.11 的 spark-redis 连接器

我正在寻找一种解决方案,我可以使用 zscore 和附加模式将我的 kafka 流式 RDD 保存到 Redis。Do we have any Connector to do this - RedisLabs 尝试了 spark redis 连接器,但它仅与 Scala 2.10 兼容。Anchormen 的另一个 github https://github.com/Anchormen/spark-redis-connector但它再次缺乏文档,并且 jar 在 maven 上不可用。

所以,请有任何建议。谢谢,

0 投票
0 回答
443 浏览

scala - Spark Streaming Redis Read Time Out with Scala

当我从 redis 读取表格时,得到以下错误。

下面的代码通常运行良好。

通常它的工作行数少于 200 万行。但是,如果我正在阅读大表,则会出现此错误。

18/10/11 17:08:25 错误执行程序:阶段 3.0 (TID 338) 中任务 37.0 中的异常 redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: 在 redis.clients.util 读取超时。 RedisInputStream.ensureFill(RedisInputStream.java:202) 在 redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:40)

val redis = spark.sparkContext.fromRedisKeyPattern(tableName,100).getHash().toDS()

我还更改了 redis 上的一些设置,但我认为这与此无关。你知道我该如何解决这个问题吗?

0 投票
1 回答
769 浏览

java - 如何在 Spark Streaming 中使用 redis

我正在构建一个应用程序,它从 redis 中的列表中读取 json 元素并使用 spark 流式传输它们。这是我写的:

如何使用 jssc 对象访问 redis。提前致谢。

0 投票
1 回答
898 浏览

apache-spark - Spark-redis:数据帧写入时间太慢

我是 Apache Spark/Redis 用户,最近我为一个项目尝试了spark-redis。该程序正在生成大约 300 万行的 PySpark 数据帧,我正在使用命令将其写入 Redis 数据库

正如GitHub 项目数据框页面所建议的那样。

但是,对于相同的 Spark 集群配置(相同数量的 EC2 实例和实例类型),我的写入时间不一致。有时它发生得很快,有时又太慢。有什么方法可以加快这个过程并获得一致的写入时间?我想知道当里面已经有很多键时它是否会慢慢发生,但这对于哈希表来说应该不是问题,不是吗?

0 投票
1 回答
1320 浏览

scala - 如何使用 spark-redis 在 spark 中读取 redis 映射

我在 Redis 中有一个普通的 scala 映射(键和值)。现在我想在我的一个 spark-streaming 程序中读取该映射并将其用作广播变量,以便我的奴隶可以使用该映射来解析键映射。我正在使用 spark-redis 2.3.1 库,但现在确定如何阅读。

在 redis 表“员工”中映射 -

这就是我试图在火花中阅读的方式(不确定这是否正确-请纠正我)-

上面的代码没有显示任何内容,我得到空输出。