0

我的项目打算使用 Flink 进行数据处理。我们有一个 Kafka 主题,希望用 Flink 对该主题中的数据进行聚合。以下是我的示例代码:

    // Sample job: read JSON records from the Kafka topic "data-stream" as a table,
    // group them by refId and write the outcome to a '|'-delimited CSV file.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

    // Kafka consumer settings handed to the table source below.
    Properties kafkaProps = new Properties();
    kafkaProps.put("bootstrap.servers", kafkaServer);
    // NOTE(review): presumably only required by older ZooKeeper-based consumers; confirm
    // whether the 0.11 connector needs it.
    kafkaProps.put("zookeeper.connect", "localhost:2181");
    // NOTE(review): the topic is also passed via forTopic(...) below, so this entry looks
    // redundant; confirm before removing.
    kafkaProps.put("topic", "data-stream");

    StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(env);
    KafkaTableSource source = Kafka011JsonTableSource.builder()
            // set Kafka topic
            .forTopic("data-stream")
            // set Kafka consumer properties
            .withKafkaProperties(kafkaProps)
            // set Table schema
            // (the schema builder's build() call now sits inside withSchema(...); the
            // original had one closing parenthesis in the wrong place and did not parse)
            .withSchema(TableSchema.builder()
                    .field("id", Types.STRING())
                    .field("refId", Types.STRING())
                    .field("externalId", Types.STRING()).build()).build();

    TableSink csvSink = new CsvTableSink("file", "|");
    String[] fieldNames = {"refId"};
    TypeInformation[] fieldTypes = {Types.STRING()};

    tableEnvironment.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);
    tableEnvironment.registerTableSource("data-stream", source);

    Table events = tableEnvironment.scan("data-stream");
    // NOTE(review): the result of groupBy(...).select(...) is not assigned to anything, so
    // the writeToSink call below still operates on the ungrouped `events` table. Routing
    // the grouped table to the sink is a behaviour change: confirm first that the CSV sink
    // accepts the result of a non-windowed group-by.
    events.groupBy("refId")
            .select("refId");
    // The sink object is passed directly; the "CsvSinkTable" registration above is not
    // referenced here. Per the stack trace, the reported exception is raised from a
    // writeToSink call while the plan is being optimized.
    events.writeToSink(csvSink);

由于 group by 的分组条件是动态的,我打算使用 Flink 的 Table API。运行上面的代码时,我得到以下错误。

嵌套异常是 java.lang.RuntimeException: Error compiling:

  private final java.util.List relClasses;
  public final org.apache.calcite.rel.metadata.RelMdExplainVisibility provider0;
  public GeneratedMetadataHandler_ExplainVisibility(java.util.List relClasses,
      org.apache.calcite.rel.metadata.RelMdExplainVisibility provider0) {
    this.relClasses = relClasses;
    this.provider0 = provider0;
  }
  public org.apache.calcite.rel.metadata.MetadataDef getDef() {
    return org.apache.calcite.rel.metadata.BuiltInMetadata$ExplainVisibility.DEF;
  }
  public java.lang.Boolean isVisibleInExplain(
      org.apache.calcite.rel.RelNode r,
      org.apache.calcite.rel.metadata.RelMetadataQuery mq,
      org.apache.calcite.sql.SqlExplainLevel a0) {
    final java.util.List key = org.apache.calcite.runtime.FlatLists.of(org.apache.calcite.rel.metadata.BuiltInMetadata$ExplainVisibility.DEF, r, org.apache.calcite.rel.metadata.NullSentinel.mask(a0));
    final Object v = mq.map.get(key);
    if (v != null) {
      if (v == org.apache.calcite.rel.metadata.NullSentinel.ACTIVE) {
    throw org.apache.calcite.rel.metadata.CyclicMetadataException.INSTANCE;
      }
      if (v == org.apache.calcite.rel.metadata.NullSentinel.INSTANCE) {
    return null;
      }
      return (java.lang.Boolean) v;
    }
    mq.map.put(key,org.apache.calcite.rel.metadata.NullSentinel.ACTIVE);
    try {
      final java.lang.Boolean x = isVisibleInExplain_(r, mq, a0);
      mq.map.put(key, org.apache.calcite.rel.metadata.NullSentinel.mask(x));
      return x;
    } catch (java.lang.Exception e) {
      mq.map.remove(key);
      throw e;
    }
  }

  private java.lang.Boolean isVisibleInExplain_(
      org.apache.calcite.rel.RelNode r,
      org.apache.calcite.rel.metadata.RelMetadataQuery mq,
      org.apache.calcite.sql.SqlExplainLevel a0) {
    switch (relClasses.indexOf(r.getClass())) {
    default:
      return provider0.isVisibleInExplain((org.apache.calcite.rel.RelNode) r, mq, a0);
    case 3:
      return isVisibleInExplain(((org.apache.calcite.plan.hep.HepRelVertex) r).getCurrentRel(), mq, a0);
    case -1:
      throw new org.apache.calcite.rel.metadata.JaninoRelMetadataProvider$NoHandler(r.getClass());
    }
  }

    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:138)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:423)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1696)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:583)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:502)
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:312)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:228)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:310)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:756)
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:868)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549)
    at org.springframework.web.context.ContextLoader.configureAndRefreshWebApplicationContext(ContextLoader.java:409)
    at org.springframework.web.context.ContextLoader.initWebApplicationContext(ContextLoader.java:291)
    at org.springframework.web.context.ContextLoaderListener.contextInitialized(ContextLoaderListener.java:103)
    at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:4643)
    at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5105)
    at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:183)
    at org.apache.catalina.core.ContainerBase.addChildInternal(ContainerBase.java:740)
    at org.apache.catalina.core.ContainerBase.addChild(ContainerBase.java:716)
    at org.apache.catalina.core.StandardHost.addChild(StandardHost.java:703)
    at org.apache.catalina.startup.HostConfig.deployWAR(HostConfig.java:976)
    at org.apache.catalina.startup.HostConfig$DeployWar.run(HostConfig.java:1847)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.tomcat.util.threads.InlineExecutorService.execute(InlineExecutorService.java:75)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
    at org.apache.catalina.startup.HostConfig.deployWARs(HostConfig.java:761)
    at org.apache.catalina.startup.HostConfig.deployApps(HostConfig.java:431)
    at org.apache.catalina.startup.HostConfig.check(HostConfig.java:1619)
    at org.apache.catalina.startup.HostConfig.lifecycleEvent(HostConfig.java:304)
    at org.apache.catalina.util.LifecycleBase.fireLifecycleEvent(LifecycleBase.java:123)
    at org.apache.catalina.core.ContainerBase.backgroundProcess(ContainerBase.java:1172)
    at org.apache.catalina.core.ContainerBase$ContainerBackgroundProcessor.processChildren(ContainerBase.java:1394)
    at org.apache.catalina.core.ContainerBase$ContainerBackgroundProcessor.processChildren(ContainerBase.java:1398)
    at org.apache.catalina.core.ContainerBase$ContainerBackgroundProcessor.run(ContainerBase.java:1366)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error compiling:
  private final java.util.List relClasses;
  public final org.apache.calcite.rel.metadata.RelMdExplainVisibility provider0;
  public GeneratedMetadataHandler_ExplainVisibility(java.util.List relClasses,
      org.apache.calcite.rel.metadata.RelMdExplainVisibility provider0) {
    this.relClasses = relClasses;
    this.provider0 = provider0;
  }
  public org.apache.calcite.rel.metadata.MetadataDef getDef() {
    return org.apache.calcite.rel.metadata.BuiltInMetadata$ExplainVisibility.DEF;
  }
  public java.lang.Boolean isVisibleInExplain(
      org.apache.calcite.rel.RelNode r,
      org.apache.calcite.rel.metadata.RelMetadataQuery mq,
      org.apache.calcite.sql.SqlExplainLevel a0) {
    final java.util.List key = org.apache.calcite.runtime.FlatLists.of(org.apache.calcite.rel.metadata.BuiltInMetadata$ExplainVisibility.DEF, r, org.apache.calcite.rel.metadata.NullSentinel.mask(a0));
    final Object v = mq.map.get(key);
    if (v != null) {
      if (v == org.apache.calcite.rel.metadata.NullSentinel.ACTIVE) {
    throw org.apache.calcite.rel.metadata.CyclicMetadataException.INSTANCE;
      }
      if (v == org.apache.calcite.rel.metadata.NullSentinel.INSTANCE) {
    return null;
      }
      return (java.lang.Boolean) v;
    }
    mq.map.put(key,org.apache.calcite.rel.metadata.NullSentinel.ACTIVE);
    try {
      final java.lang.Boolean x = isVisibleInExplain_(r, mq, a0);
      mq.map.put(key, org.apache.calcite.rel.metadata.NullSentinel.mask(x));
      return x;
    } catch (java.lang.Exception e) {
      mq.map.remove(key);
      throw e;
    }
  }

  private java.lang.Boolean isVisibleInExplain_(
      org.apache.calcite.rel.RelNode r,
      org.apache.calcite.rel.metadata.RelMetadataQuery mq,
      org.apache.calcite.sql.SqlExplainLevel a0) {
    switch (relClasses.indexOf(r.getClass())) {
    default:
      return provider0.isVisibleInExplain((org.apache.calcite.rel.RelNode) r, mq, a0);
    case 3:
      return isVisibleInExplain(((org.apache.calcite.plan.hep.HepRelVertex) r).getCurrentRel(), mq, a0);
    case -1:
      throw new org.apache.calcite.rel.metadata.JaninoRelMetadataProvider$NoHandler(r.getClass());
    }
  }

    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.load3(JaninoRelMetadataProvider.java:376)
    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.access$000(JaninoRelMetadataProvider.java:94)
    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider$1.load(JaninoRelMetadataProvider.java:113)
    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider$1.load(JaninoRelMetadataProvider.java:110)
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3542)
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2323)
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2286)
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2201)
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3953)
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3957)
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4875)
    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.create(JaninoRelMetadataProvider.java:464)
    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.revise(JaninoRelMetadataProvider.java:477)
    at org.apache.calcite.rel.metadata.RelMetadataQuery.revise(RelMetadataQuery.java:203)
    at org.apache.calcite.rel.metadata.RelMetadataQuery.isVisibleInExplain(RelMetadataQuery.java:846)
    at org.apache.calcite.rel.externalize.RelWriterImpl.explain_(RelWriterImpl.java:65)
    at org.apache.calcite.rel.externalize.RelWriterImpl.done(RelWriterImpl.java:156)
    at org.apache.calcite.plan.volcano.RelSubset.explain(RelSubset.java:174)
    at org.apache.calcite.plan.RelOptUtil.toString(RelOptUtil.java:1987)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.setRoot(VolcanoPlanner.java:313)
    at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:367)
    at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:270)
    at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:674)
    at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:251)
    at org.apache.flink.table.api.Table.writeToSink(table.scala:862)
    at org.apache.flink.table.api.Table.writeToSink(table.scala:830)
    at sample.FlinkService.runFlinck(FlinkService.java:147)
    at sample.run(FlinkService.java:702)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:369)
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:312)
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:135)
    ... 36 common frames omitted
Caused by: org.codehaus.commons.compiler.CompileException: Line 1, Column 0: cannot find symbol
  symbol:   class BuiltInMetadata$ExplainVisibility$Handler
  location: package org.apache.calcite.rel.metadata (compiler.err.cant.resolve.location)
    at org.codehaus.commons.compiler.jdk.SimpleCompiler$3.report(SimpleCompiler.java:322)
    at com.sun.tools.javac.api.ClientCodeWrapper$WrappedDiagnosticListener.report(ClientCodeWrapper.java:593)
    at com.sun.tools.javac.util.Log.writeDiagnostic(Log.java:616)
    at com.sun.tools.javac.util.Log$DefaultDiagnosticHandler.report(Log.java:600)
    at com.sun.tools.javac.util.Log.report(Log.java:562)
    at com.sun.tools.javac.comp.Resolve.logResolveError(Resolve.java:3514)
    at com.sun.tools.javac.comp.Resolve.accessInternal(Resolve.java:2219)
    at com.sun.tools.javac.comp.Resolve.accessBase(Resolve.java:2262)
    at com.sun.tools.javac.comp.Attr.selectSym(Attr.java:3390)
    at com.sun.tools.javac.comp.Attr.visitSelect(Attr.java:3278)
    at com.sun.tools.javac.tree.JCTree$JCFieldAccess.accept(JCTree.java:1897)
    at com.sun.tools.javac.comp.Attr.attribTree(Attr.java:576)
    at com.sun.tools.javac.comp.Attr.attribType(Attr.java:638)
    at com.sun.tools.javac.comp.Attr.attribType(Attr.java:631)
    at com.sun.tools.javac.comp.Attr.attribBase(Attr.java:786)
    at com.sun.tools.javac.comp.MemberEnter.complete(MemberEnter.java:1072)
    at com.sun.tools.javac.code.Symbol.complete(Symbol.java:574)
    at com.sun.tools.javac.code.Symbol$ClassSymbol.complete(Symbol.java:1037)
    at com.sun.tools.javac.comp.Enter.complete(Enter.java:493)
    at com.sun.tools.javac.comp.Enter.main(Enter.java:471)
    at com.sun.tools.javac.main.JavaCompiler.enterTrees(JavaCompiler.java:982)
    at com.sun.tools.javac.main.JavaCompiler.compile(JavaCompiler.java:857)
    at com.sun.tools.javac.main.Main.compile(Main.java:523)
    at com.sun.tools.javac.api.JavacTaskImpl.doCall(JavacTaskImpl.java:129)
    at com.sun.tools.javac.api.JavacTaskImpl.call(JavacTaskImpl.java:138)
    at org.codehaus.commons.compiler.jdk.SimpleCompiler.cook(SimpleCompiler.java:353)
    at org.codehaus.commons.compiler.jdk.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:197)
    at org.codehaus.commons.compiler.jdk.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:101)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:50)
    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:445)
    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.load3(JaninoRelMetadataProvider.java:374)
    ... 70 common frames omitted

我不知道这里出了什么问题。如能提供任何帮助,不胜感激。

这是我的依赖项:

      <!-- Maven dependencies of the project that produced the stack trace above. -->
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.4.0</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>0.11.0.2</version>
        </dependency>
        <!-- NOTE(review): this is the only Flink artifact declared with scope "provided",
             while the stack trace shows the job starting inside Tomcat. Confirm that
             flink-table is really supplied at runtime from the intended place. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_2.11</artifactId>
            <version>1.4.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>commons-compiler</artifactId>
            <version>3.0.7</version>
        </dependency>
        <!-- NOTE(review): the failing compilation in the stack trace runs through
             org.codehaus.commons.compiler.jdk.SimpleCompiler and com.sun.tools.javac,
             which presumably come from this artifact. Try removing it so that a different
             compiler implementation is picked up (presumably Janino's own), and confirm
             whether the "cannot find symbol: class BuiltInMetadata$ExplainVisibility$Handler"
             error disappears. -->
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>commons-compiler-jdk</artifactId>
            <version>3.0.7</version>
        </dependency>
4

0 回答 0