I tried running the following simple commands in Apache Zeppelin.

%flink

var rabbit = env.fromElements(
"ARTHUR:  What, behind the rabbit?",
"TIM:  It is the rabbit!", 
"ARTHUR:  You silly sod!  You got us all worked up!",
"TIM:  Well, that's no ordinary rabbit.  That's the most foul, cruel, and bad-tempered rodent you ever set eyes on.",
"ROBIN:  You tit!  I soiled my armor I was so scared!", 
"TIM:  Look, that rabbit's got a vicious streak a mile wide, it's a killer!")

var counts = rabbit.flatMap { _.toLowerCase.split("\\W+")}.map{ (_,1)}.groupBy(0).sum(1) 

counts.print()

I am trying to print the results in the notebook. Unfortunately, I only get the following output.

rabbit: org.apache.flink.api.scala.DataSet[String] = org.apache.flink.api.scala.DataSet@37fdb65c
counts: org.apache.flink.api.scala.AggregateDataSet[(String, Int)] = org.apache.flink.api.scala.AggregateDataSet@1efc7158
res103: org.apache.flink.api.java.operators.DataSink[(String, Int)] = DataSink '<unnamed>' (Print to System.out)

How can I get the contents of counts to show up in the Zeppelin notebook?

2 Answers

The way to print the result of such a computation in Zeppelin is:

%flink
counts.collect().foreach(println(_))

//or one might prefer
//counts.collect foreach println 

Output:

(a,3)
(all,1)
(and,1)
(armor,1)
...
answered 2015-09-07T03:33:06.830

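The behavior you observed comes from the interplay between Apache Zeppelin and Apache Flink. Zeppelin captures everything written to standard output through Scala's Console. Flink, however, prints its output to System.out, and that is exactly what happens when you call counts.print(). bzz's solution works because println writes through Console.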
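To make the distinction concrete, here is a minimal sketch in plain Scala with no Flink or Zeppelin involved. It assumes the capture behaves like `Console.withOut`, which may not be exactly how Zeppelin implements it. The names `ConsoleVsSystemOut` and `captureConsole` are hypothetical and used only for illustration.

```scala
import java.io.{ByteArrayOutputStream, PrintStream}

object ConsoleVsSystemOut {
  // Hypothetical helper: runs `body` while Console's output is redirected
  // into an in-memory buffer, then returns whatever was captured.
  // Assumption: this stands in for how a notebook might capture output.
  def captureConsole(body: => Unit): String = {
    val buffer = new ByteArrayOutputStream()
    val stream = new PrintStream(buffer, true, "UTF-8")
    Console.withOut(stream) { body }
    stream.flush()
    buffer.toString("UTF-8")
  }

  def main(args: Array[String]): Unit = {
    // Predef.println goes through Console, so the redirect sees it.
    val viaConsole = captureConsole { println("via println") }

    // System.out.println bypasses Console, so the redirect sees nothing
    // and the text goes to the process's real standard output instead.
    val viaSystemOut = captureConsole { System.out.println("via System.out") }

    println("captured from println: " + viaConsole.trim)
    println("captured from System.out is empty: " + viaSystemOut.isEmpty)
  }
}
```

Anything written directly to System.out never reaches the redirected Console stream, which matches the symptom in the question: the notebook shows only the REPL's result lines and none of the printed tuples.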

I have opened a JIRA issue [1] and a pull request [2] to correct this behavior, so that counts.print() will work as well.

answered 2015-09-07T13:31:23.473