1

我有一个用例,我们正在流式传输事件,并且对于每个事件我都必须进行一些查找。查找在 Redis 中,我想知道创建连接的最佳方法是什么。Spark Streaming 将运行 40 个执行程序,我有 5 个这样的 Streaming 作业都连接到同一个 Redis 集群。所以我很困惑我应该采取什么方法来创建 Redis 连接

  1. 在驱动程序上创建一个连接对象并将其广播给执行程序(不确定它是否真的有效,因为我必须使该对象可序列化)。我可以用广播变量做到这一点吗?

  2. 为每个分区创建一个 Redis 连接,但是我的代码是这样编写的

    val update = xyz.transform(rdd => { // on driver if (xyz.isNewDay) { ..... } rdd }) update.foreachRDD(rdd => { rdd.foreachPartition(partition => { partition.foreach(Key_trans => { // perform some lookups logic here } } })

所以现在如果我在每个分区内创建一个连接,这意味着对于每个 RDD 和该 RDD 中的每个分区,我都将创建一个新连接。

有没有一种方法可以为每个分区维护一个连接并缓存该对象,这样我就不必一次又一次地创建连接?

如果需要,我可以添加更多上下文/信息。

4

1 回答 1

2

1. 在驱动程序上创建一个连接对象并将其广播给执行程序(不确定它是否真的有效,因为我必须使这个对象可序列化)。我可以用广播变量做到这一点吗?

回答 - 否。由于与连接关联的机器相关数据,大多数连接对象不可序列化。

2. 有没有一种方法可以为每个分区维护一个连接并缓存该对象,这样我就不必一次又一次地创建连接?

Ans- 是的,创建一个连接池并在分区中使用它。这里是风格。您可以像这样创建一个连接池https://github.com/RedisLabs/spark-redis/blob/master/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala

然后使用它

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

请检查: 使用 foreachRDD 的设计模式

于 2019-03-15T22:09:16.340 回答