-1

我想在 Flink中的“ Select .. From .. GROUP BY .. ”查询中保留每个键的所有原始行。我定义了一个名为 RowToJsonAgg的AggregateFunction ,它将行聚合成一个 Json 字符串。

class RowToJsonAgg extends AggregateFunction[String, ListBuffer[String]]{
  def accumulate(accumulator: ListBuffer[String], row: Any*): Unit = {
   ....

// 假设行看起来像“$field1_name, $field1_value, $field2_name, $field2_value, ...” // 尝试从行生成 json。然而,当我运行查询时,Flink 似乎找不到这个函数}

  def merge(accumulator: ListBuffer[String], its: java.lang.Iterable[ListBuffer[String]]): Unit = {
    accumulator.append(
      WrapAsScala.iterableAsScalaIterable(its).flatten.toList:_*
    )
  }

  def resetAccumulator(accumulator: ListBuffer[String]): Unit = {
    accumulator.clear()
  }

  override def getValue(accumulator: ListBuffer[String]): String = {
    accumulator.mkString("{", ",", "}")
  }

  override def createAccumulator(): ListBuffer[String] = ListBuffer.empty

  override def getAccumulatorType(): TypeInformation[ListBuffer[String]] = {
    TypeInformation.of(classOf[ListBuffer[String]])
  }

  override def getResultType: TypeInformation[String] = TypeInformation.of(classOf[String])
}

数据类和查询如下所示:

case class Stock(id:Int, price: Int, volumn: Int, ts: Long)

val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()

val bbTableEnv = TableEnvironment.create(bbSettings)

bbTableEnv.createTemporarySystemFunction("row_to_json_agg", classOf[RowToJsonAgg])

val table = bbTableEnv.fromValues(...)

bbTableEnv.createTemporaryView("Stock", table)

bbTableEnv.executeSql(
    "select price, row_to_json_agg('volumn', volumn, 'ts', ts) as details from Stock group by price"
)

当我运行应用程序时,我得到了 SQL 验证异常,详细消息是“未找到函数签名 row_to_json_agg(CHARACTER, NUMERIC, CHARACTER, NUMERIC) 的匹配项

似乎 Flink 找不到正确的累积函数来调用。

如果我将累积函数声明如下

def accumulate(accumulator: ListBuffer[String], volumn: Integer, ts: Long)

并改变了查询

"select price, row_to_json_agg(volumn, ts) from Stock group by price" 

我遇到了同样的异常,消息是“找不到函数签名 row_to_json_agg(NUMERIC,NUMERIC) 的匹配项

任何想法如何使聚合函数工作?

4

1 回答 1

0

我自己想通了。

  1. 通过运行 SQL 注册 UDF,如下所示:

    bbTableEnv.executeSQL(String.format("创建临时函数 $udf_name as '%s'", "$full_class_name_of_your_udf") )

代替

bbTableEnv.createTemporarySystemFunction("row_to_json_agg", classOf[RowToJsonAgg])
  1. 更喜欢使用 Java 来实现 UDF 而不是 Scala
于 2021-01-20T00:30:37.670 回答