问题标签 [flink-sql]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
177 浏览

scala - 在 flink 表中创建复合类型

我正在尝试在 Flink 中编写一个用户定义的标量函数,它接受多个表达式(任意数量的表达式)并将其组合成一个表达式。

来自 Spark 世界,我可以通过使用structwhich 返回一个Row类型并将其传递给 a来实现这一点udf,例如

我无法在 Flink 中找到等价物。我也想看看我是否可以写一个ScalarFunction接受任意数量的表达式,但找不到任何例子。

任何人都可以帮助指导我采用上述两种方法中的任何一种吗?谢谢!

请注意,我不能将其设为数组,因为每个表达式都可以是不同的类型(实际上,相同的值类型,但可以是数组或标量)。

0 投票
0 回答
77 浏览

cassandra - 在 flink 操作中终止对数据库的请求

我正在尝试与 Flink 和 Cassandra 合作。两者都是大规模并行环境,但我很难让它们一起工作。

现在我需要通过不同的令牌范围从 Cassandra 进行并行读取操作,并有可能在读取 N 个对象后终止查询

批处理模式更适合我,但 DataStreams 也是可能的。我尝试了 LongCounter(见下文),但它不会像我预期的那样工作。我没能和他们一起得到全球总和。仅本地值。

异步模式不是必需的,因为此操作 CassandraRequester 是在并行上下文中执行的,并行化约为 64 或 128。

这是我的尝试

在这种情况下是否可以终止查询?

0 投票
1 回答
634 浏览

apache-flink - 在 Yarn Cluser 中为 Flink Job 的 Flink Checkpoint 设置路径动态

我正在使用 Yarn 来运行 Flink 作业。对于每个 Flink 作业,我都在创建一个检查点。

我提交了一个在我的 Yarn 集群中运行的 Flink 作业。我有一个轮询作业,它检查 Yarn 上的作业是否失败并重新启动它。当再次提交作业时,Yarn 会为此 Flink 作业创建一个新的 application_id。如何配置重新提交的 Flink 作业以使用重新启动的 Flink 作业的检查点。

我已经 state.savepoints.dir = hdfs://localhost:9000/checkpoint/在 flink-conf.yaml 中设置了 conf`

在创建 Flink 作业时, streamExecutionEnvironment.setStateBackend(new FsStateBackend("hdfs://localhost:9000/checkpoint/uuid-job-1"));

当我进行此设置时,检查点保存在 conf 文件 ( hdfs://localhost:9000/checkpoint/) 中指定的路径中,而不是我在创建 Flink 作业时设置的路径中。

任何帮助将不胜感激。谢谢!

0 投票
1 回答
272 浏览

apache-flink - 在 Flink 中加入两个流并管理状态

我在 flink 中有两个 nifi 流源,我需要对这两个源执行连接。哪个是更好的解决方案?是加入DataStreams提供的api还是table api( https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/streaming.html#streaming-concepts) .?

另外如何维护流和连接的状态?flink 是否提供了任何构建解决方案。

谢谢,里尔万

0 投票
1 回答
109 浏览

hadoop-yarn - 在 Yarn 集群上提交 Flink 作业以从上一个保存点恢复其状态

我在 Yarn 集群中运行这些 Flink 作业。当 flink 作业失败或我重新启动它时,我希望 Flink 作业使用在它重新启动之前创建的保存点。如何使用此保存点重新启动 Flink 作业。我正在使用 Flink 1.4.2。

0 投票
1 回答
108 浏览

apache-flink - Flink SQL 是否支持并行运行投影

我正在运行一个带有多个预测的 SQL,每个都非常耗时,例如:

UDF1 和 UDF2 可能是耗时的函数,但是 Flink SQL 看起来像顺序运行 UDF1 和 UDF2,我的问题是 UDF1 和 UDF2 是否可以并行运行以减少延迟?

0 投票
1 回答
1724 浏览

apache-flink - How to join three or more datastreams /tables on a given key and a common window by datastrem API or Flink Table API/SQL?

I want to Join three or more data streams or tables on a given key and a common window. however I don't know how to correctly write the code. The official document https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/ give the example below, however it just join two data streams , so how to join three or more data streams on a given key and a common window?

I tried to figure out that I join the two data streams firstly with common window, and use the result data stream to join third data stream with common window? However it seems the semantic of event time for these three data streams would be changed when we set the TimeCharacteristic to event time.

==================

The same question for FlinK Table API and SQL,how to join three or more tables on a given key and a common window? The official document https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sql.html just give the example below for single table.

I tried to write the SQL like below to join three tables on a given key and a common window , however I don't think it is right.

So what's the correct way to join three or more datastreams /tables on a given key and a common window by datastrem API or Flink Table API/SQL ?

update at 6/16/2018 to make the question more clearly.

For the Flink SQL, what I needed , just like the Pseudocode below, is the join three tables with a common TumblingEventTimeWindow, that is to say the alternative version for DataStream API, however expressed by Flink SQL,also meaning join all events from three tables, which happened in the same TumblingEventTimeWindow.

It seems that join feature also mentioned in the following Flink design document: "Event-time tumbling-windowed Stream-Stream joins: Joins tuples of two streams that are in the same tumbling event-time window", I have no idea if the Flink SQL have implemented this type of Flink SQL join feature.

https://docs.google.com/document/d/1TLayJNOTBle_-m1rQfgA6Ouj1oYsfqRjPcp1h2TVqdI/edit#

0 投票
1 回答
145 浏览

apache-flink - Flink 1.5 中的 Batch Table API 问题 - 抱怨需要 Streaming API

我正在尝试使用 Flink 1.5.0 创建一个面向批处理的 Flink 作业,并希望使用 Table 和 SQL API 来处理数据。我的问题是尝试创建 BatchTableEnviroment 我收到编译错误

BatchJob.java:[46,73] 无法访问 org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

造成于

据我所知,我不依赖流媒体环境。我的代码如下所示。

我的 pom 依赖项如下

请有人可以帮助我了解 Streaming API 的依赖关系是什么以及为什么我需要它来进行批处理作业?非常感谢您的帮助。奥利弗

0 投票
1 回答
1025 浏览

apache-flink - Apache Flink:如何使用 Table API 对每 n 行进行分组?

最近我正在尝试使用 Apache Flink 进行快速批处理。我有一个带有 column:value 和不相关索引列的表

基本上我想计算每 5 行值的平均值和范围。然后我将根据我刚刚计算的平均值计算平均值和标准差。所以我想最好的方法是使用Tumblewindow.

看起来像这样

但我不知道该写什么.on()。我试过"proctime"了,但它说没有这样的输入。我只希望它按从源读取的顺序分组。但它必须是时间属性,所以我不能使用"f2"- 索引列也作为排序。

我是否必须添加时间戳才能执行此操作?批处理中是否有必要,它会减慢计算速度吗?解决这个问题的最佳方法是什么?

更新: 我尝试在表格 API 中使用滑动窗口,但它得到了异常。

例外是:

线程“主”java.lang.UnsupportedOperationException 中的异常:当前不支持在事件时间计算滑动组窗口。

在 org.apache.flink.table.plan.nodes.dataset.DataSetWindowAggregate.createEventTimeSlidingWindowDataSet(DataSetWindowAggregate.scala:456)

在 org.apache.flink.table.plan.nodes.dataset.DataSetWindowAggregate.translateToPlan(DataSetWindowAggregate.scala:139)

...

这是否意味着 Table API 不支持滑动窗口?如果我没记错的话,DataSet API 中没有窗口函数。那么如何在批处理中计算移动范围?

0 投票
1 回答
768 浏览

sql - 通过两列JOIN单表flink tableapi

我有一个包含数据的表,我需要通过两个字段进行连接。

我写了一个请求,但它不起作用

代码是

我得到以下异常