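My use case is:

  1. Read data from an AWS Kinesis data stream and filter/map it with the Flink DataStream API
  2. Group and aggregate the data using the StreamTableEnvironment
  3. Write the result to MySQL through the JDBC connector, using the SQL Table environment

I am able to write my data stream results into a MySQL table, but because the job is streaming, every update is appended as a new row, whereas I want the existing row to be overwritten.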

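    // Kinesis consumer settings: AWS region and initial stream position
    Properties consumerConfig = new Properties();
    consumerConfig.put(AWSConfigConstants.AWS_REGION, "eu-central-1");
    consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");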


    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5000);
    EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);

    // Parse Message
    DataStream<Event> events = env.addSource(
            new FlinkKinesisConsumer<>(
                    Config.INPUT_STREAM,
                    new KinesisEventDeserializationSchema(),
                    consumerConfig
            )
    )
            .uid("kinesisEventSource");
      ....    
      ....
      ....

    SingleOutputStreamOperator<ArticleView> filteredDetailsViewEvents = articleViews
            .filter(new FilterFunction<ArticleView>() {
                @Override
                public boolean filter(ArticleView event) throws Exception {
                    return StringUtils.isNotBlank(event.getArticleNumber());
                }
            })
            .uid("filteredDetailsViewFilter");
    
   
    Table t = tEnv.fromDataStream(filteredDetailsViewEvents);

    tEnv.executeSql("CREATE TABLE eventsSlider1 (\n" +
            "  articleNumber STRING,\n" +
            "  mandant STRING,\n" +
            "  category STRING,\n" +
            "  cnt BIGINT NOT NULL,\n" +
            "  CONSTRAINT pk_event PRIMARY KEY (articleNumber,mandant,category) NOT ENFORCED\n" +
            ") WITH (\n" +
            "   'connector.type' = 'jdbc',\n" +
            "   'connector.url' = 'jdbc:mysql://localhost:3306/events',\n" +
            "   'connector.table' = 'categorySliderItems',\n" +
            "   'connector.username' = 'root',\n" +
            "   'connector.password' = '123456'\n" +
            ")");

    tEnv.executeSql("INSERT INTO eventsSlider1 (SELECT articleNumber,mandant,category,cnt " +
            "FROM (" +
            " SELECT articleNumber,mandant,category,count(articleNumber) as cnt," +
            " ROW_NUMBER() OVER (PARTITION BY mandant,category ORDER BY count(articleNumber) DESC) as row_num" +
            " FROM " + t + " group by articleNumber,category, mandant)" +
            " WHERE row_num <= 3)");

1 Answer

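The problem was that I had not set the correct primary key on the table. The primary key is the only thing Flink can use to recognize an upsert and decide whether to perform an update or an insert.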
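
For illustration, here is a minimal sketch of how the two table definitions might line up, assuming the MySQL table `categorySliderItems` is created with a composite primary key on the same columns that the Flink DDL declares as its key. The class name `UpsertSinkDdlSketch`, the `VARCHAR(64)` column types, and the MySQL schema as a whole are assumptions made for this example and do not come from the original job. The sketch also swaps in the newer JDBC connector option keys (`'connector' = 'jdbc'`, `'url'`, `'table-name'`) in place of the `connector.*` keys used in the question. With that connector the sink runs in upsert mode when a primary key is defined in the DDL, and on MySQL the upsert is issued as `INSERT ... ON DUPLICATE KEY UPDATE`, which can only overwrite a row if the target table has a matching primary or unique key.

```java
public class UpsertSinkDdlSketch {

    // MySQL side (assumed schema, not from the original post): the composite
    // primary key is what allows INSERT ... ON DUPLICATE KEY UPDATE to
    // overwrite an existing row instead of adding a new one.
    public static String mysqlDdl() {
        return String.join("\n",
                "CREATE TABLE categorySliderItems (",
                "  articleNumber VARCHAR(64) NOT NULL,",
                "  mandant       VARCHAR(64) NOT NULL,",
                "  category      VARCHAR(64) NOT NULL,",
                "  cnt           BIGINT      NOT NULL,",
                "  PRIMARY KEY (articleNumber, mandant, category)",
                ")");
    }

    // Flink side: the same key columns declared as PRIMARY KEY ... NOT ENFORCED.
    // Uses the newer JDBC connector option keys (an assumption; the question
    // uses the legacy 'connector.*' keys).
    public static String flinkSinkDdl() {
        return String.join("\n",
                "CREATE TABLE eventsSlider1 (",
                "  articleNumber STRING NOT NULL,",
                "  mandant STRING NOT NULL,",
                "  category STRING NOT NULL,",
                "  cnt BIGINT NOT NULL,",
                "  PRIMARY KEY (articleNumber, mandant, category) NOT ENFORCED",
                ") WITH (",
                "  'connector' = 'jdbc',",
                "  'url' = 'jdbc:mysql://localhost:3306/events',",
                "  'table-name' = 'categorySliderItems',",
                "  'username' = 'root',",
                "  'password' = '123456'",
                ")");
    }

    public static void main(String[] args) {
        // Print both statements; in the job from the question the Flink DDL
        // would be passed to tEnv.executeSql(flinkSinkDdl()).
        System.out.println(mysqlDdl());
        System.out.println();
        System.out.println(flinkSinkDdl());
    }
}
```

The MySQL statement would be run once against the `events` database before starting the job. The point of the sketch is only that the key columns agree on both sides; the column types should be adapted to the real data.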

Answered 2021-11-08T15:39:10.377