我对 Flink 完全陌生。可能会重复这个问题,但只找到一个链接,这对我来说是无法理解的。
https://stackoverflow.com/a/44294980/6904987
我以 Key Value 格式将数据存储在 Redis 中,例如 Key 是 UserId,UserInfo 是 value。写在下面的代码。
class RedisExampleMapper extends RedisMapper[(String, String)] {
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME")
}
override def getKeyFromData(data: (String, String)): String = data._1
override def getValueFromData(data: (String, String)): String = data._2
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val conf = new FlinkJedisPoolConfig.Builder().setHost("IP").build()
val streamSink = env.readTextFile("/path/useInformation.txt").map(x => {
val userInformation = x.split(",")
val UserId = userInformation(0)
val UserInfo = userInformation(1)
(UserId , UserInfo)
})
val redisSink = new RedisSink[(String, String)](conf, new RedisExampleMapper)
streamSink.addSink(redisSink)
样本数据:
12“用户信息12”
13“用户信息13”
14“用户信息14”
15“用户信息15”
我想使用基于 key 的 Flink 从 redis 获取数据。示例 14 应返回“UserInfo14”。输出应该打印在 Flink 日志文件或终端中,无论它是什么。
提前致谢。