我有一个用例,我们正在流式传输事件,并且对于每个事件我都必须进行一些查找。查找在 Redis 中,我想知道创建连接的最佳方法是什么。Spark Streaming 将运行 40 个执行程序,我有 5 个这样的 Streaming 作业都连接到同一个 Redis 集群。所以我很困惑我应该采取什么方法来创建 Redis 连接
在驱动程序上创建一个连接对象并将其广播给执行程序(不确定它是否真的有效,因为我必须使该对象可序列化)。我可以用广播变量做到这一点吗?
为每个分区创建一个 Redis 连接,但是我的代码是这样编写的
val update = xyz.transform(rdd => { // on driver if (xyz.isNewDay) { ..... } rdd }) update.foreachRDD(rdd => { rdd.foreachPartition(partition => { partition.foreach(Key_trans => { // perform some lookups logic here } } })
所以现在如果我在每个分区内创建一个连接,这意味着对于每个 RDD 和该 RDD 中的每个分区,我都将创建一个新连接。
有没有一种方法可以为每个分区维护一个连接并缓存该对象,这样我就不必一次又一次地创建连接?
如果需要,我可以添加更多上下文/信息。