问题标签 [spark-redis]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - Pyspark / Redis - 随时间推移的数据聚合
我正在使用KafkaUtils Pyspark 实现 从Kafka流式传输大量数据(200k + 事件/批次 3 秒)。
我收到实时数据:
- 一个
sessionID
- 一个
ip
- 一个
state
我现在使用基本的Spark / Redis实现正在做的事情如下:
火花工作:
- 通过以下方式汇总数据
sessionID
:rdd_combined = rdd.map(map_data).combineByKey(lambda x: frozenset([x]), lambda s, v: s | frozenset([v]), lambda s1, s2: s1 | s2)
- 创建一个
set
不同的state
(可能是 1、2、3...) - 保留
ip
信息,然后将其转换为lon
/lat
。 - 检查是否
sessionID
在 Redis 中,如果是则更新它,否则将其写入 Redis。
然后我只在 Python 中为 Redis运行一个小脚本,检查 state 中是否有 1:
- 如果是,则该事件将在频道中发布(例如
channel_1
)并从 Redis 中删除。 - 如果不是,我们检查/更新时间戳。如果
NOW() - timestamp > 10 min
数据已发布,channel_2
否则我们什么也不做。
问题 :
我一直想知道用Spark计算大部分工作的最佳实现是什么。
- 使用
window
+ 聚合或reduceByKeyAndWindow
:我担心的是,如果我使用 10 分钟的窗口并每 3 秒对几乎相同的数据进行一次计算,则效率不是很高。 - using
updateStateByKey
看起来很有趣,但数据永远不会被删除,这可能会成为问题。另外我怎么能检查我们已经过了 10 分钟?
关于这个实现或其他可能的想法?
scala - Spark/Scala 并行写入 redis
是否可以从 spark 并行写入 Redis?
(或:如何从 spark 中快速编写数万个键/列表)
目前,我正在按顺序向 Redis 写信,并且需要永远。我需要写大约 90000 个列表(长度为 2-2000)。速度极其重要。目前,它大约需要 1 小时。Redis 的传统基准声称每秒有数千次 Redis 写入,但在我的管道中,我离这个还很远。
任何帮助表示赞赏。
apache-spark - 如何将redis变成火花数据集或数据框?
我正在尝试使用 redis 作为 spark sql 的来源,但遇到了如何转换 rdd 的问题。以下是我的代码:
我得到以下异常:
我不知道下一步该怎么做,请帮忙!
redis - Storm + Redis 或 Storm Trident 或 Spark Streaming
我将构建流处理系统。使用 Kafka 进行消息传输。流处理可以通过风暴三叉戟、风暴或火花流来完成。但找不到最佳答案。
如果一开始的条件,流处理将很简单。字段可以在不同的元组内,这就是我需要存储前一个字段的原因。现在我通过使用storm + redis(不是三叉戟)实现了这个场景。在 bolt 内部,从 redis 中获取所有字段,然后通过 if 条件进行测量。如果字段不为空,则使用,如果为空,则继续工作。如果这是正确的,我对建筑有不好的感觉。
我应该使用 trident api 还是 spark 流式传输?
我不需要“恰好一次”的交货保证。只需要每个数据源的状态。此状态将按字段计算。
谢谢您的回复。
scala - Kafka Spark Stream 直接保存到 Redis
我正在使用 Scala 获取 kafkaStream 并希望将此数据直接插入到 Redis。这样做的最佳最佳策略是什么?
早些时候我试图使用https://github.com/debasishg/scala-redis但这不适用于 Spark,所以我必须收集 RDD,然后将记录保存到 Redis 中,这在我的项目中产生了很多开销。所以寻找一些解决方案,我可以直接将这串消息推送到 Redis 中,并且还想维护 ZScore。
谢谢,
scala - scala 2.11 的 spark-redis 连接器
我正在寻找一种解决方案,我可以使用 zscore 和附加模式将我的 kafka 流式 RDD 保存到 Redis。Do we have any Connector to do this - RedisLabs 尝试了 spark redis 连接器,但它仅与 Scala 2.10 兼容。Anchormen 的另一个 github https://github.com/Anchormen/spark-redis-connector但它再次缺乏文档,并且 jar 在 maven 上不可用。
所以,请有任何建议。谢谢,
scala - Spark Streaming Redis Read Time Out with Scala
当我从 redis 读取表格时,得到以下错误。
下面的代码通常运行良好。
通常它的工作行数少于 200 万行。但是,如果我正在阅读大表,则会出现此错误。
18/10/11 17:08:25 错误执行程序:阶段 3.0 (TID 338) 中任务 37.0 中的异常 redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: 在 redis.clients.util 读取超时。 RedisInputStream.ensureFill(RedisInputStream.java:202) 在 redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:40)
val redis = spark.sparkContext.fromRedisKeyPattern(tableName,100).getHash().toDS()
我还更改了 redis 上的一些设置,但我认为这与此无关。你知道我该如何解决这个问题吗?
java - 如何在 Spark Streaming 中使用 redis
我正在构建一个应用程序,它从 redis 中的列表中读取 json 元素并使用 spark 流式传输它们。这是我写的:
如何使用 jssc 对象访问 redis。提前致谢。
apache-spark - Spark-redis:数据帧写入时间太慢
我是 Apache Spark/Redis 用户,最近我为一个项目尝试了spark-redis。该程序正在生成大约 300 万行的 PySpark 数据帧,我正在使用命令将其写入 Redis 数据库
正如GitHub 项目数据框页面所建议的那样。
但是,对于相同的 Spark 集群配置(相同数量的 EC2 实例和实例类型),我的写入时间不一致。有时它发生得很快,有时又太慢。有什么方法可以加快这个过程并获得一致的写入时间?我想知道当里面已经有很多键时它是否会慢慢发生,但这对于哈希表来说应该不是问题,不是吗?
scala - 如何使用 spark-redis 在 spark 中读取 redis 映射
我在 Redis 中有一个普通的 scala 映射(键和值)。现在我想在我的一个 spark-streaming 程序中读取该映射并将其用作广播变量,以便我的奴隶可以使用该映射来解析键映射。我正在使用 spark-redis 2.3.1 库,但现在确定如何阅读。
在 redis 表“员工”中映射 -
这就是我试图在火花中阅读的方式(不确定这是否正确-请纠正我)-
上面的代码没有显示任何内容,我得到空输出。