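I want to write a custom function that is driven by a config: it should apply a filter, group by a list of column names, and map raw event names to derived metrics. I would like to write something like the function below. The metrics mapping can be one-to-one or one-to-many, and there may be 100 or more mappings.
Sample function: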
def buildApi(df: DataFrame, groupByColumns: List[String], filterCondition: String, metricsMapping: List[Map[String, List[String]]]): DataFrame = {
  // Pseudocode: the expression inside select(...) is the part I don't know how to build dynamically
  val dfDerived = df.filter(filterCondition).select(< CASE WHEN event_name IN (metricsMapping.actualEventName) THEN derivedEventName END AS event_name >).groupBy(groupByColumns)
  dfDerived
}
A sample config looks like this:
{
  "source": "eventDataframe",
  "filters": "is_active = 1 AND is_deprecated <> 1",
  "groupbyColumns": ["marketplace_id", "customer_id", "event_date"],
  "metricsMapping": [
    {
      "derivedEventName": "1st_stream",
      "actualEventName": ["1st_stream"]
    },
    {
      "derivedEventName": "total_event_signup",
      "actualEventName": ["1st_signup", "2nd_signup"]
    },
    {
      "derivedEventName": "total_categoty_signup",
      "actualEventName": ["1st_catg_signup", "2nd_catg_signup", .. "10th_catg_signup"]
    }
  ],
  "outputDataset": "dfDerived"
},
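To make the shape of the parsed config concrete, here is a rough sketch of how it might be held in Scala. The names MetricMapping, ApiConfig, ConfigModel and actualToDerived are hypothetical and only for illustration; the field names simply mirror the JSON keys, and the sample value contains only the first two mappings.

```scala
// Hypothetical model of one metricsMapping entry; field names mirror the JSON keys.
final case class MetricMapping(derivedEventName: String, actualEventName: List[String])

// Hypothetical model of one whole config entry.
final case class ApiConfig(
  source: String,
  filters: String,
  groupbyColumns: List[String],
  metricsMapping: List[MetricMapping],
  outputDataset: String
)

object ConfigModel {
  // Invert the mappings into actual -> derived, so each raw event name
  // resolves with a single lookup. If an actual name is listed under
  // several derived names, the later mapping wins.
  def actualToDerived(mappings: List[MetricMapping]): Map[String, String] =
    mappings.flatMap(m => m.actualEventName.map(_ -> m.derivedEventName)).toMap

  // Sample value built from the first two mappings of the config.
  val sample: ApiConfig = ApiConfig(
    source = "eventDataframe",
    filters = "is_active = 1 AND is_deprecated <> 1",
    groupbyColumns = List("marketplace_id", "customer_id", "event_date"),
    metricsMapping = List(
      MetricMapping("1st_stream", List("1st_stream")),
      MetricMapping("total_event_signup", List("1st_signup", "2nd_signup"))
    ),
    outputDataset = "dfDerived"
  )
}
```

With a one-to-many mapping, both "1st_signup" and "2nd_signup" would resolve to "total_event_signup" in the inverted map.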
I will parse this config and call the function as:
val dfDerived = buildApi(Config.source, Config.groupbyColumns, Config.filters, Config.metricsMapping)
How can I write a function that loops over the metrics mappings and applies the CASE clause dynamically? Any help would be appreciated.
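For what it's worth, the direction I have been considering is to fold over the mappings and chain one when(...) branch per mapping using Spark's Column API. The sketch below is only that, under several assumptions: MetricMapping and BuildApiSketch are hypothetical names, the mappings are passed in a typed form rather than as a list of maps, the aggregation is a plain row count named event_count (the config does not say which aggregation is wanted), and events that match no mapping end up with a null event_name.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, count, lit, when}

// Hypothetical typed form of one metricsMapping entry.
final case class MetricMapping(derivedEventName: String, actualEventName: List[String])

object BuildApiSketch {
  // One CASE branch: event_name IN (actual names) THEN derived name.
  private def matches(m: MetricMapping): Column =
    col("event_name").isin(m.actualEventName: _*)

  // Build a flat CASE WHEN ... WHEN ... END with one branch per mapping.
  // The first matching branch wins; with no ELSE, unmatched events become null.
  def derivedEventColumn(mappings: List[MetricMapping]): Column = mappings match {
    case Nil => lit(null).cast("string")
    case head :: tail =>
      tail.foldLeft(when(matches(head), lit(head.derivedEventName))) { (acc, m) =>
        acc.when(matches(m), lit(m.derivedEventName))
      }
  }

  def buildApi(
      df: DataFrame,
      groupByColumns: List[String],
      filterCondition: String,
      metricsMapping: List[MetricMapping]
  ): DataFrame =
    df.filter(filterCondition)
      // Replace the raw event name with its derived name.
      .withColumn("event_name", derivedEventColumn(metricsMapping))
      // Group by the configured columns plus the derived event name.
      .groupBy((groupByColumns :+ "event_name").map(col): _*)
      // Assumption: a row count per group; swap in whatever aggregation is needed.
      .agg(count(lit(1)).as("event_count"))
}
```

Since groupBy on its own returns a RelationalGroupedDataset rather than a DataFrame, some aggregation seems to be required before the result can be returned, which is why the sketch adds one. I am not sure whether a single CASE with 100+ branches is the right approach or whether joining against a small lookup DataFrame would be better.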