0

我对 Flink 完全陌生。可能会重复这个问题,但只找到一个链接,这对我来说是无法理解的。

https://stackoverflow.com/a/44294980/6904987

我以 Key Value 格式将数据存储在 Redis 中,例如 Key 是 UserId,UserInfo 是 value。写在下面的代码。

class RedisExampleMapper extends RedisMapper[(String, String)] {
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME")
  }

  override def getKeyFromData(data: (String, String)): String = data._1

  override def getValueFromData(data: (String, String)): String = data._2
}


val env = StreamExecutionEnvironment.getExecutionEnvironment
    val conf = new FlinkJedisPoolConfig.Builder().setHost("IP").build()
    val streamSink = env.readTextFile("/path/useInformation.txt").map(x => {
          val userInformation = x.split(",")
          val UserId = userInformation(0)
          val UserInfo  = userInformation(1)
          (UserId , UserInfo)
        })
val redisSink = new RedisSink[(String, String)](conf, new RedisExampleMapper)
streamSink.addSink(redisSink)

样本数据:

12“用户信息12”

13“用户信息13”

14“用户信息14”

15“用户信息15”

我想使用基于 key 的 Flink 从 redis 获取数据。示例 14 应返回“UserInfo14”。输出应该打印在 Flink 日志文件或终端中,无论它是什么。

提前致谢。

4

2 回答 2

0

扩展https://stackoverflow.com/a/44294980/6904987中的答案。

用 . 添加源env.addSource(new RedisSource(data structure name))

您必须自己实现连接到 Redis 数据库的 RedisSource,从 Redis 数据结构中读取记录。

实施取决于。您可以通过轮询从 Redis 消费,也可以订阅 Redis,只要您从 Redis 获取事件,就从源发出事件。

您可以在此处查看一般 SourceFunction 示例和文档:https ://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/streaming/api/functions/源/SourceFunction.html

于 2018-04-13T12:22:10.320 回答
0

如果要查询 Redis 进行键值搜索,可以在转换中使用 Redis 客户端。例如,如果您使用 Java 和 Flink,可以使用 Jedis 查询 Redis。

于 2018-04-16T07:10:05.230 回答