0

我在 Redis 中有一个普通的 scala 映射(键和值)。现在我想在我的一个 spark-streaming 程序中读取该映射并将其用作广播变量,以便我的奴隶可以使用该映射来解析键映射。我正在使用 spark-redis 2.3.1 库,但现在确定如何阅读。

在 redis 表“员工”中映射 -

name   |    value
------------------
123         David
124         John
125         Alex

这就是我试图在火花中阅读的方式(不确定这是否正确-请纠正我)-

 val loadedDf = spark.read
  .format("org.apache.spark.sql.redis")
  .schema(
    StructType(Array(
      StructField("name", IntegerType),
      StructField("value", StringType)
    )
  ))
  .option("table", "employee")
  .option("key.column", "name")
  .load()
loadedDf.show() 

上面的代码没有显示任何内容,我得到空输出。

4

1 回答 1

2

您可以将以下代码用于您的任务,但您需要使用 Spark 数据集(案例数据框到案例类)来完成此任务。下面是一个在 Redis 中读写的完整示例。

object DataFrameExample {

  case class employee(name: String, value: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
          .builder()
          .appName("redis-df")
          .master("local[*]")
          .config("spark.redis.host", "localhost")
          .config("spark.redis.port", "6379")
          .getOrCreate()

    val personSeq = Seq(employee("John", 30), employee("Peter", 45)
    val df = spark.createDataFrame(personSeq)

    df.write
      .format("org.apache.spark.sql.redis")
      .option("table", "person")
      .mode(SaveMode.Overwrite)
      .save()

    val loadedDf = spark.read
                        .format("org.apache.spark.sql.redis")
                        .option("table", "person")
                        .load()
    loadedDf.printSchema()
    loadedDf.show()
  }
}

输出如下

root
 |-- name: string (nullable = true)
 |-- value: integer (nullable = false)

+-----+-----+
| name|value|
+-----+-----+
| John| 30  |
|Peter| 45  |
+-----+-----+

您还可以在Redis文档中查看更多详细信息

于 2019-05-11T13:32:33.377 回答