我的项目打算使用 Flink 进行数据处理。我们有一个 Kafka 主题,希望用 Flink 对该主题中的数据进行聚合。以下是我的示例代码:
// Reads JSON records from the Kafka topic "data-stream", exposes them as a table,
// groups them by refId and writes the grouped result to a CSV sink.

// Streaming environment; records are timestamped on ingestion.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

// Properties handed to the Kafka consumer behind the table source.
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", kafkaServer);
kafkaProps.put("zookeeper.connect", "localhost:2181");
kafkaProps.put("topic", "data-stream");

StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(env);

KafkaTableSource source = Kafka011JsonTableSource.builder()
    // set Kafka topic
    .forTopic("data-stream")
    // set Kafka consumer properties
    .withKafkaProperties(kafkaProps)
    // set Table schema; the schema builder must be built before it is passed on
    // (the original closed withSchema(...) too early and left a stray ')')
    .withSchema(TableSchema.builder()
        .field("id", Types.STRING())
        .field("refId", Types.STRING())
        .field("externalId", Types.STRING())
        .build())
    .build();

// CSV sink writing to the path "file" with '|' as the field delimiter.
TableSink csvSink = new CsvTableSink("file", "|");
String[] fieldNames = {"refId"};
TypeInformation[] fieldTypes = {Types.STRING()};
tableEnvironment.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);
tableEnvironment.registerTableSource("data-stream", source);

Table events = tableEnvironment.scan("data-stream");

// groupBy/select return a new Table and leave `events` untouched, so the result
// has to be captured; writing `events` would emit the raw, un-aggregated rows.
Table groupedByRefId = events.groupBy("refId")
    .select("refId");

// NOTE(review): a non-windowed groupBy on an unbounded stream presumably produces an
// updating result, which an append-only sink such as CsvTableSink may reject at
// runtime -- confirm, and switch to a windowed aggregation or a retract/upsert sink
// if that is the case.
groupedByRefId.writeToSink(csvSink);
由于 group by 的分组条件是动态的,我打算使用 Flink 的 Table API。运行上面的代码时,我得到以下错误。
nested exception is java.lang.RuntimeException: Error compiling:
private final java.util.List relClasses;
public final org.apache.calcite.rel.metadata.RelMdExplainVisibility provider0;
public GeneratedMetadataHandler_ExplainVisibility(java.util.List relClasses,
org.apache.calcite.rel.metadata.RelMdExplainVisibility provider0) {
this.relClasses = relClasses;
this.provider0 = provider0;
}
public org.apache.calcite.rel.metadata.MetadataDef getDef() {
return org.apache.calcite.rel.metadata.BuiltInMetadata$ExplainVisibility.DEF;
}
public java.lang.Boolean isVisibleInExplain(
org.apache.calcite.rel.RelNode r,
org.apache.calcite.rel.metadata.RelMetadataQuery mq,
org.apache.calcite.sql.SqlExplainLevel a0) {
final java.util.List key = org.apache.calcite.runtime.FlatLists.of(org.apache.calcite.rel.metadata.BuiltInMetadata$ExplainVisibility.DEF, r, org.apache.calcite.rel.metadata.NullSentinel.mask(a0));
final Object v = mq.map.get(key);
if (v != null) {
if (v == org.apache.calcite.rel.metadata.NullSentinel.ACTIVE) {
throw org.apache.calcite.rel.metadata.CyclicMetadataException.INSTANCE;
}
if (v == org.apache.calcite.rel.metadata.NullSentinel.INSTANCE) {
return null;
}
return (java.lang.Boolean) v;
}
mq.map.put(key,org.apache.calcite.rel.metadata.NullSentinel.ACTIVE);
try {
final java.lang.Boolean x = isVisibleInExplain_(r, mq, a0);
mq.map.put(key, org.apache.calcite.rel.metadata.NullSentinel.mask(x));
return x;
} catch (java.lang.Exception e) {
mq.map.remove(key);
throw e;
}
}
private java.lang.Boolean isVisibleInExplain_(
org.apache.calcite.rel.RelNode r,
org.apache.calcite.rel.metadata.RelMetadataQuery mq,
org.apache.calcite.sql.SqlExplainLevel a0) {
switch (relClasses.indexOf(r.getClass())) {
default:
return provider0.isVisibleInExplain((org.apache.calcite.rel.RelNode) r, mq, a0);
case 3:
return isVisibleInExplain(((org.apache.calcite.plan.hep.HepRelVertex) r).getCurrentRel(), mq, a0);
case -1:
throw new org.apache.calcite.rel.metadata.JaninoRelMetadataProvider$NoHandler(r.getClass());
}
}
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:138)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:423)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1696)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:583)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:502)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:312)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:228)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:310)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:756)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:868)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549)
at org.springframework.web.context.ContextLoader.configureAndRefreshWebApplicationContext(ContextLoader.java:409)
at org.springframework.web.context.ContextLoader.initWebApplicationContext(ContextLoader.java:291)
at org.springframework.web.context.ContextLoaderListener.contextInitialized(ContextLoaderListener.java:103)
at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:4643)
at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5105)
at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:183)
at org.apache.catalina.core.ContainerBase.addChildInternal(ContainerBase.java:740)
at org.apache.catalina.core.ContainerBase.addChild(ContainerBase.java:716)
at org.apache.catalina.core.StandardHost.addChild(StandardHost.java:703)
at org.apache.catalina.startup.HostConfig.deployWAR(HostConfig.java:976)
at org.apache.catalina.startup.HostConfig$DeployWar.run(HostConfig.java:1847)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.tomcat.util.threads.InlineExecutorService.execute(InlineExecutorService.java:75)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at org.apache.catalina.startup.HostConfig.deployWARs(HostConfig.java:761)
at org.apache.catalina.startup.HostConfig.deployApps(HostConfig.java:431)
at org.apache.catalina.startup.HostConfig.check(HostConfig.java:1619)
at org.apache.catalina.startup.HostConfig.lifecycleEvent(HostConfig.java:304)
at org.apache.catalina.util.LifecycleBase.fireLifecycleEvent(LifecycleBase.java:123)
at org.apache.catalina.core.ContainerBase.backgroundProcess(ContainerBase.java:1172)
at org.apache.catalina.core.ContainerBase$ContainerBackgroundProcessor.processChildren(ContainerBase.java:1394)
at org.apache.catalina.core.ContainerBase$ContainerBackgroundProcessor.processChildren(ContainerBase.java:1398)
at org.apache.catalina.core.ContainerBase$ContainerBackgroundProcessor.run(ContainerBase.java:1366)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error compiling:
private final java.util.List relClasses;
public final org.apache.calcite.rel.metadata.RelMdExplainVisibility provider0;
public GeneratedMetadataHandler_ExplainVisibility(java.util.List relClasses,
org.apache.calcite.rel.metadata.RelMdExplainVisibility provider0) {
this.relClasses = relClasses;
this.provider0 = provider0;
}
public org.apache.calcite.rel.metadata.MetadataDef getDef() {
return org.apache.calcite.rel.metadata.BuiltInMetadata$ExplainVisibility.DEF;
}
public java.lang.Boolean isVisibleInExplain(
org.apache.calcite.rel.RelNode r,
org.apache.calcite.rel.metadata.RelMetadataQuery mq,
org.apache.calcite.sql.SqlExplainLevel a0) {
final java.util.List key = org.apache.calcite.runtime.FlatLists.of(org.apache.calcite.rel.metadata.BuiltInMetadata$ExplainVisibility.DEF, r, org.apache.calcite.rel.metadata.NullSentinel.mask(a0));
final Object v = mq.map.get(key);
if (v != null) {
if (v == org.apache.calcite.rel.metadata.NullSentinel.ACTIVE) {
throw org.apache.calcite.rel.metadata.CyclicMetadataException.INSTANCE;
}
if (v == org.apache.calcite.rel.metadata.NullSentinel.INSTANCE) {
return null;
}
return (java.lang.Boolean) v;
}
mq.map.put(key,org.apache.calcite.rel.metadata.NullSentinel.ACTIVE);
try {
final java.lang.Boolean x = isVisibleInExplain_(r, mq, a0);
mq.map.put(key, org.apache.calcite.rel.metadata.NullSentinel.mask(x));
return x;
} catch (java.lang.Exception e) {
mq.map.remove(key);
throw e;
}
}
private java.lang.Boolean isVisibleInExplain_(
org.apache.calcite.rel.RelNode r,
org.apache.calcite.rel.metadata.RelMetadataQuery mq,
org.apache.calcite.sql.SqlExplainLevel a0) {
switch (relClasses.indexOf(r.getClass())) {
default:
return provider0.isVisibleInExplain((org.apache.calcite.rel.RelNode) r, mq, a0);
case 3:
return isVisibleInExplain(((org.apache.calcite.plan.hep.HepRelVertex) r).getCurrentRel(), mq, a0);
case -1:
throw new org.apache.calcite.rel.metadata.JaninoRelMetadataProvider$NoHandler(r.getClass());
}
}
at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.load3(JaninoRelMetadataProvider.java:376)
at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.access$000(JaninoRelMetadataProvider.java:94)
at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider$1.load(JaninoRelMetadataProvider.java:113)
at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider$1.load(JaninoRelMetadataProvider.java:110)
at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3542)
at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2323)
at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2286)
at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2201)
at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3953)
at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3957)
at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4875)
at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.create(JaninoRelMetadataProvider.java:464)
at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.revise(JaninoRelMetadataProvider.java:477)
at org.apache.calcite.rel.metadata.RelMetadataQuery.revise(RelMetadataQuery.java:203)
at org.apache.calcite.rel.metadata.RelMetadataQuery.isVisibleInExplain(RelMetadataQuery.java:846)
at org.apache.calcite.rel.externalize.RelWriterImpl.explain_(RelWriterImpl.java:65)
at org.apache.calcite.rel.externalize.RelWriterImpl.done(RelWriterImpl.java:156)
at org.apache.calcite.plan.volcano.RelSubset.explain(RelSubset.java:174)
at org.apache.calcite.plan.RelOptUtil.toString(RelOptUtil.java:1987)
at org.apache.calcite.plan.volcano.VolcanoPlanner.setRoot(VolcanoPlanner.java:313)
at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:367)
at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:270)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:674)
at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:251)
at org.apache.flink.table.api.Table.writeToSink(table.scala:862)
at org.apache.flink.table.api.Table.writeToSink(table.scala:830)
at sample.FlinkService.runFlinck(FlinkService.java:147)
at sample.run(FlinkService.java:702)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:369)
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:312)
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:135)
... 36 common frames omitted
Caused by: org.codehaus.commons.compiler.CompileException: Line 1, Column 0: cannot find symbol
symbol: class BuiltInMetadata$ExplainVisibility$Handler
location: package org.apache.calcite.rel.metadata (compiler.err.cant.resolve.location)
at org.codehaus.commons.compiler.jdk.SimpleCompiler$3.report(SimpleCompiler.java:322)
at com.sun.tools.javac.api.ClientCodeWrapper$WrappedDiagnosticListener.report(ClientCodeWrapper.java:593)
at com.sun.tools.javac.util.Log.writeDiagnostic(Log.java:616)
at com.sun.tools.javac.util.Log$DefaultDiagnosticHandler.report(Log.java:600)
at com.sun.tools.javac.util.Log.report(Log.java:562)
at com.sun.tools.javac.comp.Resolve.logResolveError(Resolve.java:3514)
at com.sun.tools.javac.comp.Resolve.accessInternal(Resolve.java:2219)
at com.sun.tools.javac.comp.Resolve.accessBase(Resolve.java:2262)
at com.sun.tools.javac.comp.Attr.selectSym(Attr.java:3390)
at com.sun.tools.javac.comp.Attr.visitSelect(Attr.java:3278)
at com.sun.tools.javac.tree.JCTree$JCFieldAccess.accept(JCTree.java:1897)
at com.sun.tools.javac.comp.Attr.attribTree(Attr.java:576)
at com.sun.tools.javac.comp.Attr.attribType(Attr.java:638)
at com.sun.tools.javac.comp.Attr.attribType(Attr.java:631)
at com.sun.tools.javac.comp.Attr.attribBase(Attr.java:786)
at com.sun.tools.javac.comp.MemberEnter.complete(MemberEnter.java:1072)
at com.sun.tools.javac.code.Symbol.complete(Symbol.java:574)
at com.sun.tools.javac.code.Symbol$ClassSymbol.complete(Symbol.java:1037)
at com.sun.tools.javac.comp.Enter.complete(Enter.java:493)
at com.sun.tools.javac.comp.Enter.main(Enter.java:471)
at com.sun.tools.javac.main.JavaCompiler.enterTrees(JavaCompiler.java:982)
at com.sun.tools.javac.main.JavaCompiler.compile(JavaCompiler.java:857)
at com.sun.tools.javac.main.Main.compile(Main.java:523)
at com.sun.tools.javac.api.JavacTaskImpl.doCall(JavacTaskImpl.java:129)
at com.sun.tools.javac.api.JavacTaskImpl.call(JavacTaskImpl.java:138)
at org.codehaus.commons.compiler.jdk.SimpleCompiler.cook(SimpleCompiler.java:353)
at org.codehaus.commons.compiler.jdk.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:197)
at org.codehaus.commons.compiler.jdk.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:101)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:50)
at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:445)
at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.load3(JaninoRelMetadataProvider.java:374)
... 70 common frames omitted
我不知道这里发生了什么,非常感谢任何帮助。
以下是我的依赖项:
<!-- Maven dependencies of the project: Flink 1.4.0 modules (java, streaming, clients,
     Kafka 0.11 connector, table, streaming-scala), kafka-streams 0.11.0.2 and the
     Janino commons-compiler artifacts at 3.0.7. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.4.0</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.11.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.4.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>commons-compiler</artifactId>
<version>3.0.7</version>
</dependency>
<!-- NOTE(review): the innermost "Caused by" in the reported trace runs through
     org.codehaus.commons.compiler.jdk.SimpleCompiler, i.e. the javac-backed compiler
     shipped in this artifact, and javac reports BuiltInMetadata$ExplainVisibility$Handler
     as an unresolved symbol. Presumably having this artifact on the classpath makes the
     code generator pick it instead of Janino; try removing it and confirm. -->
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>commons-compiler-jdk</artifactId>
<version>3.0.7</version>
</dependency>