
I want to join three or more data streams or tables on a given key and a common window, but I don't know how to write the code correctly. The official documentation https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/ gives the example below, but it only joins two data streams. So how do I join three or more data streams on a given key and a common window?

dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new JoinFunction () {...});
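The following plain-Java model (no Flink dependency) makes the semantics of the two-stream join above concrete. Its class, field, and method names are hypothetical, not Flink API. It assumes non-negative epoch-millisecond timestamps and a zero window offset, in which case the tumbling window containing a timestamp starts at `ts - ts % size`. Two elements are joined only if they share a key and fall into the same 3-second window.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (no Flink dependency) of a keyed join over tumbling
// event-time windows. Names are hypothetical, not Flink API.
class TumblingWindowJoinModel {

    // A keyed event with an event-time timestamp in epoch milliseconds.
    static final class Event {
        final String key;
        final long ts;
        final String value;

        Event(String key, long ts, String value) {
            this.key = key;
            this.ts = ts;
            this.value = value;
        }
    }

    // Start of the tumbling window containing ts (assumes ts >= 0 and zero offset).
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Emits one "left,right" pair for every combination with equal key and equal window.
    static List<String> join(List<Event> left, List<Event> right, long size) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean sameWindow = windowStart(l.ts, size) == windowStart(r.ts, size);
                if (sameKey && sameWindow) {
                    out.add(l.value + "," + r.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long size = 3000L; // corresponds to Time.seconds(3)
        List<Event> a = new ArrayList<>();
        a.add(new Event("k", 1000L, "a1"));
        a.add(new Event("k", 2900L, "a2"));
        List<Event> b = new ArrayList<>();
        b.add(new Event("k", 500L, "b1"));
        b.add(new Event("k", 3100L, "b2")); // falls into the next window [3000, 6000)
        System.out.println(join(a, b, size)); // prints [a1,b1, a2,b1]
    }
}
```

Note that a2 and b2 are only 200 ms apart but are not joined, because they fall on opposite sides of the window boundary at 3000 ms.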

My idea was to first join two of the data streams with a common window, and then join the resulting data stream with the third data stream using the same window. However, it seems that the event-time semantics of these three data streams would change when the TimeCharacteristic is set to event time.

==================

The same question applies to the Flink Table API and SQL: how do I join three or more tables on a given key and a common window? The official documentation https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sql.html only gives the example below, which uses a single table.

Table result1 = tableEnv.sqlQuery(
"SELECT user, " +
"  TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart,  " +
"  SUM(amount) FROM Orders " +
"GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user");

I tried to write the SQL below to join three tables on a given key and a common window, but I don't think it is right.

String SQL = "SELECT" +
            " grades.user1, SUM(salaries.amount) FROM grades" +
            " INNER JOIN salaries ON grades.user1 = salaries.user1" +
            " INNER JOIN person ON grades.user1 = person.user1" +
            " GROUP BY grades.user1, TUMBLE(grades.proctime, INTERVAL '5' SECOND)";

So what is the correct way to join three or more data streams/tables on a given key and a common window with the DataStream API or the Flink Table API/SQL?

Update (6/16/2018) to make the question clearer.

For Flink SQL, what I need is something like the pseudocode below: a join of three tables over a common TumblingEventTimeWindow. In other words, the Flink SQL equivalent of the DataStream API version, which joins all events from the three tables that happened in the same TumblingEventTimeWindow.

SELECT A.a, B.b, C.c
FROM A, B, C
WHERE A.x = B.x AND A.x = C.x AND
window(TumblingEventTimeWindows.of(Time.seconds(3)))

This kind of join also seems to be mentioned in the following Flink design document: "Event-time tumbling-windowed Stream-Stream joins: Joins tuples of two streams that are in the same tumbling event-time window". I don't know whether Flink SQL has implemented this type of join.

https://docs.google.com/document/d/1TLayJNOTBle_-m1rQfgA6Ouj1oYsfqRjPcp1h2TVqdI/edit#


1 Answer


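It's hard to give a definitive answer to your question because it is not clear which join semantics you need. The windowed join of the DataStream API implements different semantics than the windowed join of the Table API/SQL.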

With the DataStream API, you can simply define another join, like this:

firstStream
  .join(secondStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...})
  .join(thirdStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...})
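On the event-time concern raised in the question: as far as I understand Flink's window operator, each record emitted by a window join is timestamped with the window's `maxTimestamp()` (window end minus 1 ms), which lies inside that same tumbling window. As long as the second join uses the same window size and offset, it should therefore see each intermediate result in the window its inputs came from. The plain-Java model below (hypothetical names, not Flink API) checks this for one example by comparing the cascade with a direct three-way join. It also assumes the first `JoinFunction` carries the join key forward so the second join can key on it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of (A join B) join C over identical tumbling event-time windows.
// Assumption: every record a window emits is timestamped with the window's
// last millisecond (windowStart + size - 1), mirroring Flink's maxTimestamp().
class CascadedWindowJoinModel {

    static final class Event {
        final String key;
        final long ts;
        final String value;

        Event(String key, long ts, String value) {
            this.key = key;
            this.ts = ts;
            this.value = value;
        }
    }

    static long windowStart(long ts, long size) {
        return ts - (ts % size); // assumes ts >= 0 and zero offset
    }

    // Windowed equi-join that re-timestamps each result to the window's max timestamp
    // and keeps the join key, so the result can be joined again.
    static List<Event> windowJoin(List<Event> left, List<Event> right, long size) {
        List<Event> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                long start = windowStart(l.ts, size);
                if (l.key.equals(r.key) && start == windowStart(r.ts, size)) {
                    out.add(new Event(l.key, start + size - 1, l.value + "," + r.value));
                }
            }
        }
        return out;
    }

    // Reference semantics: all three elements share the key and the window.
    static List<String> threeWayJoin(List<Event> a, List<Event> b, List<Event> c, long size) {
        List<String> out = new ArrayList<>();
        for (Event x : a) {
            for (Event y : b) {
                for (Event z : c) {
                    long w = windowStart(x.ts, size);
                    if (x.key.equals(y.key) && x.key.equals(z.key)
                            && w == windowStart(y.ts, size) && w == windowStart(z.ts, size)) {
                        out.add(x.value + "," + y.value + "," + z.value);
                    }
                }
            }
        }
        return out;
    }

    static List<String> values(List<Event> events) {
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            out.add(e.value);
        }
        return out;
    }

    public static void main(String[] args) {
        long size = 3000L;
        List<Event> a = Arrays.asList(new Event("k", 100L, "a1"), new Event("k", 4000L, "a2"));
        List<Event> b = Arrays.asList(new Event("k", 2500L, "b1"), new Event("k", 5900L, "b2"));
        List<Event> c = Arrays.asList(new Event("k", 2999L, "c1"), new Event("k", 3000L, "c2"));

        List<String> cascaded = values(windowJoin(windowJoin(a, b, size), c, size));
        List<String> direct = threeWayJoin(a, b, c, size);
        System.out.println(cascaded);                // prints [a1,b1,c1, a2,b2,c2]
        System.out.println(cascaded.equals(direct)); // prints true
    }
}
```

If the second join used a different window size or a non-zero offset, the intermediate timestamp could land in a different window than its inputs, and the cascade would no longer match the direct three-way join.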

Since Flink implements standard SQL, you can define a join of three tables as usual:

SELECT A.a, B.b, C.c
  FROM A, B, C
  WHERE A.x = B.x AND A.x = C.x AND
        A.ts BETWEEN B.ts - INTERVAL '10' MINUTE AND B.ts + INTERVAL '10' MINUTE AND
        A.ts BETWEEN C.ts - INTERVAL '10' MINUTE AND C.ts + INTERVAL '10' MINUTE

The window bounds (A.ts BETWEEN B.ts - X AND B.ts + Y) can be defined as needed.
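This interval predicate is not the same as membership in a common tumbling window. The small plain-Java model below compares the two time conditions only (hypothetical names; SQL `BETWEEN` is inclusive on both ends; the 3 s window and the ±1 s bound are arbitrary example values). It also shows that in the three-way query, B and C are each bounded relative to A rather than to each other, so they can be up to twice the bound apart.

```java
// Plain-Java model comparing the time condition of the SQL interval join above
// with membership in a common tumbling window. Names are hypothetical.
class IntervalVsTumblingModel {

    // a.ts BETWEEN b.ts - lowerMs AND b.ts + upperMs (SQL BETWEEN is inclusive).
    static boolean intervalMatch(long aTs, long bTs, long lowerMs, long upperMs) {
        return aTs >= bTs - lowerMs && aTs <= bTs + upperMs;
    }

    // Same tumbling window (assumes non-negative timestamps and zero offset).
    static boolean sameTumblingWindow(long aTs, long bTs, long sizeMs) {
        return aTs - (aTs % sizeMs) == bTs - (bTs % sizeMs);
    }

    // Time part of the three-way WHERE clause: B and C are each bounded relative to A.
    static boolean threeWayIntervalMatch(long aTs, long bTs, long cTs, long lowerMs, long upperMs) {
        return intervalMatch(aTs, bTs, lowerMs, upperMs) && intervalMatch(aTs, cTs, lowerMs, upperMs);
    }

    public static void main(String[] args) {
        long size = 3000L;  // 3-second tumbling window
        long bound = 1000L; // example +/- 1 second interval bound
        // 200 ms apart, but on opposite sides of the window boundary at 3000 ms
        System.out.println(intervalMatch(2900L, 3100L, bound, bound)); // prints true
        System.out.println(sameTumblingWindow(2900L, 3100L, size));    // prints false
        // 2.9 s apart, but inside the same window [0, 3000)
        System.out.println(intervalMatch(50L, 2950L, bound, bound));   // prints false
        System.out.println(sameTumblingWindow(50L, 2950L, size));      // prints true
        // B and C are 2 s apart, yet both lie within 1 s of A
        System.out.println(threeWayIntervalMatch(1000L, 0L, 2000L, bound, bound)); // prints true
    }
}
```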

answered 2018-06-15T15:53:14.313