1

我在 Flink 中有这个应用程序,它使用 Table API 从源打印数据。Flink 的官方文档说 Table API 在其核心使用 Calcite 来翻译和优化查询计划。他们没有很深入地描述它,所以我去了源代码并尝试从那里复制一些代码。但是,据我所知,他们也使用方解石规则。

如果我想实施自己的规则怎么办?可能吗?例如,如何在 Calcite 中实现一个简单的规则来更改过滤器的参数?

这是我的代码

public class HelloWorldCalcitePlanTableAPI {
    private static final Logger logger = LoggerFactory.getLogger(HelloWorldCalcitePlanTableAPI.class);
    private static final String TICKETS_STATION_01_PLATFORM_01 = "TicketsStation01Plat01";

    public static void main(String[] args) throws Exception {
        new HelloWorldCalcitePlanTableAPI("127.0.0.1", "127.0.0.1");
    }

    public HelloWorldCalcitePlanTableAPI(String ipAddressSource01, String ipAddressSink) throws Exception {
        // Start streaming from fake data source sensors
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, tableConfig);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Calcite configuration file to change the query execution plan
        // CalciteConfig cc = tableEnv.getConfig().getCalciteConfig();
        CalciteConfig cc = new CalciteConfigBuilder()
                .addNormRuleSet(RuleSets.ofList(MyFilterReduceExpressionRule.FILTER_INSTANCE))
                .replaceDecoRuleSet(RuleSets.ofList(MyDataStreamRule.INSTANCE))
                .build();
        tableEnv.getConfig().setCalciteConfig(cc);

        // obtain query configuration from TableEnvironment
        StreamQueryConfig qConfig = tableEnv.queryConfig();
        qConfig.withIdleStateRetentionTime(Time.minutes(30), Time.hours(2));

        // Register Data Source Stream tables in the table environment
        tableEnv.registerTableSource(TICKETS_STATION_01_PLATFORM_01,
                new MqttSensorTableSource(ipAddressSource01, TOPIC_STATION_01_PLAT_01_TICKETS));
        Table result = tableEnv.scan(TICKETS_STATION_01_PLATFORM_01)
                .filter(VALUE + " >= 50 && " + VALUE + " <= 100 && " + VALUE + " >= 50")
                ;
        tableEnv.toAppendStream(result, Row.class).print();

        System.out.println("Execution plan ........................ ");
        System.out.println(env.getExecutionPlan());
        System.out.println("Plan explaination ........................ ");
        System.out.println(tableEnv.explain(result));
        System.out.println("........................ ");
        System.out.println("NormRuleSet: " + cc.getNormRuleSet().isDefined());
        System.out.println("LogicalOptRuleSet: " + cc.getLogicalOptRuleSet().isDefined());
        System.out.println("PhysicalOptRuleSet: " + cc.getPhysicalOptRuleSet().isDefined());
        System.out.println("DecoRuleSet: " + cc.getDecoRuleSet().isDefined());
        // @formatter:on

        env.execute("HelloWorldCalcitePlanTableAPI");
    }
}

public class MyDataStreamRule extends RelOptRule {
    public static final MyDataStreamRule INSTANCE = new MyDataStreamRule(operand(DataStreamRel.class, none()), "MyDataStreamRule");

    public MyDataStreamRule(RelOptRuleOperand operand, String description) {
        super(operand, "MyDataStreamRule:" + description);
    }

    public MyDataStreamRule(RelBuilderFactory relBuilderFactory) {
        super(operand(DataStreamRel.class, any()), relBuilderFactory, null);
    }

    public void onMatch(RelOptRuleCall call) {
        DataStreamRel dataStreamRel = (DataStreamRel) call.rel(0);
        System.out.println("======================= MyDataStreamRule.onMatch ====================");
    }
}
public class MyFilterReduceExpressionRule extends RelOptRule {

    public static final MyFilterReduceExpressionRule FILTER_INSTANCE = new MyFilterReduceExpressionRule(
            operand(LogicalFilter.class, none()), "MyFilterReduceExpressionRule");

    public MyFilterReduceExpressionRule(RelOptRuleOperand operand, String description) {
        super(operand, "MyFilterReduceExpressionRule:" + description);
    }

    public MyFilterReduceExpressionRule(RelBuilderFactory relBuilderFactory) {
        super(operand(LogicalFilter.class, any()), relBuilderFactory, null);
    }

    public MyFilterReduceExpressionRule(RelOptRuleOperand operand) {
        super(operand);
    }

    @Override
    public void onMatch(RelOptRuleCall arg0) {
        System.out.println("======================= MyFilterReduceExpressionRule.onMatch ====================");
    }
}
4

0 回答 0