0

我正在使用 Flink SQL 处理 Kafka 流,其中每条消息都从 Kafka 中提取,使用 flink sql 处理并推回 kafka。我想要一个嵌套输出,其中输入是平坦的,输出是嵌套的。例如说我的输入是

{'StudentName':'ABC','StudentAge':33}

并希望输出为

{'Student':{'Name':'ABC','Age':33}}

我尝试在这里搜索和一些类似的链接,但找不到。是否可以使用 Apache Flink SQL API 来做到这一点?如有必要,可以使用用户定义的函数,但希望避免这样做。

4

2 回答 2

0

你可以尝试这样的事情:

SELECT 
  MAP ['Student', MAP ['Name', StudentName, 'Age', StudentAge]] 
FROM 
  students;

我在这里找到了 MAP 函数,但我不得不在SQL 客户端中进行实验以找出语法。

于 2019-01-04T16:05:57.883 回答
0

我可以通过从 Flink UDF 返回一个地图来实现同样的目的。UDF 中的 eval() 函数将返回一个 Map,而 FlinkSQL 查询将使用 student 作为别名调用 UDF:

UDF 应如下所示

    public class getStudent extends ScalarFunction {
        public Map<String, String> eval(String name, Integer age) {
            Map<String, String> student = new HashMap<>();
            student.put("Name", name);
            student.put("Age", age.toString());
            return student;
        }
    }

FlinkSQL 查询保持如下:

    Select getStudent(StudentName, StudentAge) as `Student` from MyKafkaTopic

尝试从 FlinkSQL 中获取 List 时,也可以对 List 执行相同的操作

于 2019-08-19T05:36:44.207 回答