这是我的 Flink CEP MATCH_RECOGNIZE sql。
SELECT E.*
FROM MyEvents
MATCH_RECOGNIZE (
ORDER BY procTime
MEASURES
A.id as id,
A.name as name
AFTER MATCH SKIP TO NEXT ROW
PATTERN (A)
DEFINE
A AS source='XYZ' and name IN ('EVENT_SRC1', 'EVENT_SRC2')
AND EXISTS (select eap.* from MyEvents AS eap where eap.name IN ('EVENT_SRC1'))
AND EXISTS (select eam.* from MyEvents AS eam where eam.name IN ('EVENT_SRC2'))
) AS E;
给出错误为
Caused by: java.lang.NullPointerException: null
at java.util.ArrayDeque.addFirst(ArrayDeque.java:228)
at java.util.ArrayDeque.push(ArrayDeque.java:503)
at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:49)
at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:33)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:134)
at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:101)
at org.apache.calcite.sql.SqlOperator.acceptCall(SqlOperator.java:884)
at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visitScoped(SqlValidatorImpl.java:6009)
at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:50)
at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:33)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:134)
at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:101)
at org.apache.calcite.sql.SqlOperator.acceptCall(SqlOperator.java:884)
at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visitScoped(SqlValidatorImpl.java:6009)
at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:50)
at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:33)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:134)
at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:101)
at org.apache.calcite.sql.SqlOperator.acceptCall(SqlOperator.java:884)
at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visitScoped(SqlValidatorImpl.java:6009)
at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:50)
at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:33)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:134)
at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:101)
at org.apache.calcite.sql.SqlAsOperator.acceptCall(SqlAsOperator.java:121)
at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visitScoped(SqlValidatorImpl.java:6009)
at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:50)
at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:33)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5583)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateDefinitions(SqlValidatorImpl.java:5408)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateMatchRecognize(SqlValidatorImpl.java:5287)
at org.apache.calcite.sql.validate.MatchRecognizeNamespace.validateImpl(MatchRecognizeNamespace.java:38)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
... 20 common frames omitted
然而,如果我删除 2 个 EXISTS 条件,那么它就可以工作。另外,我尝试为我的数据流创建另一个表并在内部查询中使用它,但仍然出现相同的错误?Flink CEP SQL 不支持我做错了什么或这样的语法?
注意-我使用的是 1.11 版本。我已经在普通的 FLINK SQL 中尝试过 EXISTS 子句(即选择 ....from....where..)并且它有效。但不适用于 Flink CEP SQL。谢谢。