问题标签 [flink-sql]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
redis - 如何高效地将数据从 flink 管道写入 redis
我正在 Apache flink sql api 中构建管道。管道执行简单的投影查询。但是,我需要在查询之前和查询之后编写一次元组(确切地说是每个元组中的一些元素)。事实证明,我用来写入 redis 的代码会严重降低性能。即 flink 以非常小的数据速率产生背压。我的代码有什么问题以及如何改进。请有任何建议。
当我停止写redis之前和之后的表现非常好。这是我的管道代码:
附加部分:我尝试了第二个实现,使用 Jedis 批量编写 toredis 的过程功能。但是我收到以下错误。org.apache.flink.runtime.client.JobExecutionException: redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketException: 套接字未连接。我试图使批处理消息的数量更小,但一段时间后我仍然收到错误。
下面是流程函数的实现:
/** * 使用进程函数写入redis */
apache-flink - 用于状态检查点的 Flink sql
当我使用 flink sql api 处理数据时。
重启app,sum
结果没有保存在checkpoint中,还是从1开始。
登录为:
1 2 3 ... ... 100
重新启动应用程序它仍然启动:1 2 3 ...
我认为总和值将存储在 checkpint 文件中,并且重新启动应用程序可以从 checkpint 读取最后一个结果,例如:
101 102 103 ... 120
apache-kafka - Flink 的嵌套输出
我正在使用 Flink SQL 处理 Kafka 流,其中每条消息都从 Kafka 中提取,使用 flink sql 处理并推回 kafka。我想要一个嵌套输出,其中输入是平坦的,输出是嵌套的。例如说我的输入是
并希望输出为
我尝试在这里搜索和一些类似的链接,但找不到。是否可以使用 Apache Flink SQL API 来做到这一点?如有必要,可以使用用户定义的函数,但希望避免这样做。
java - 为什么 Flink SQL 对所有表都使用 100 行的基数估计?
我不确定为什么在这个例子中没有正确评估逻辑计划。
我更深入地查看了 Flink 基本代码,并检查了当 calcite 评估/估计对象中查询的行数时。出于某种原因,对于任何表源,它总是返回100。
实际上在 Flink 中,在创建程序计划的过程中,对于每一个转换后的规则,它都被TableEnvironment .runVolcanoPlanner 称为VolcanoPlanner类 。计划者尝试通过调用RelMetadataQuery .getRowCount来优化和计算一些估计
我通过创建一个失败的测试来重现该错误,该测试应该将 0 断言为关系表“S”的行数,但它始终返回 100。
为什么会这样?有人对这个问题有答案吗?
apache-flink - 将 ROW() 用于嵌套数据结构
我已经成功地使用来自 flink-json 工件的 JsonRowSerializationSchema 来TableSink<Row>
使用 ROW 从 SQL 创建和输出 json。它非常适合发出平面数据:
现在,我正在尝试一个嵌套模式,它以一种奇怪的方式分解:
这是一个解析问题,但我对它为什么会发生感到困惑。col1 和 'ttt' 是 String 类型的表达式,应该是可替换的;但不知何故,解析器受到以下行的干扰,正如堆栈跟踪所说:
我错过了一些关于语法的东西吗?解析器试图做什么?我应该以另一种方式使用 ROW() 吗?
这是一个错误吗?
apache-flink - Flink 如何使用从 Avro 输入数据推断出的模式创建表
我在 Flink 数据集中加载了一个 Avro 文件:
以下是打印 DS 的结果:
现在我想从 DS Dataset 创建一个与 Avro 文件模式完全相同的表,我的意思是列应该是 N_NATIONKEY、N_NAME、N_REGIONKEY 和 N_COMMENT。
我知道使用这条线:
我可以创建一个表并设置列,但我希望从数据中自动推断列。可能吗?另外,我试过
但它使用架构创建了一个表:
apache-flink - 基于 Date 类型的字段过滤 Flink Table
我创建了一个表,其中有一个 Date 类型的字段,即f_date。我想要的查询的一部分基于f_date字段过滤表行。所以我做了以下事情:
和
然后我在所有两种情况下都得到了以下相同的错误:
我什至删除了单个 quatation 并尝试了:
我得到了错误:
最后我创建了一个 Date 对象并尝试:
我得到了错误:
在上述情况下,Flink 将日期识别为 String 或 Integer。我怎样才能以正确的格式给出日期!?
apache-flink - 使用 Flink Sql 选择 Top N Per Group
我正在使用 Flink SQL 来处理批处理案例。如何使用 FLINK SQL 获得每组的前 n 条记录?
java - 将 TemporalTableFunction 注册为函数时出现编译器错误
我正在关注 Flink 的Defining Temporal Table Function示例,编译器拒绝采用该代码:
我的编译器告诉我“TableEnvironment 类型中的方法 registerFunction(String, ScalarFunction) 不适用于参数 (String, TemporalTableFunction)”
我已经在这里和这里搜索了源代码,确实没有 registerFunction 具有 TemporalTableFunction 签名。只有 ScalarFunction 类型。我对它为什么会编译感到困惑;但他们确实对此进行了测试。
我只是无法导航 TemporalJoinITCase 从何处获取其 registerFunction 。
带有 scala 2.11 的 Flink 1.7.1,以及以下工件:flink-core、flink-java、flink-clients_、flink-streaming-java_、flink-table_、flink-streaming-scala_、flink-json、flink-runtime-web_ .
我不太清楚 Scala 是如何工作的。会不会是我从另一个神器中遗漏的一些特征?
最良好的问候!