0

如 Flink 文档中所述,我可以通过使用打开本地套接字从文本服务器读取文本输入

amar@admin:~$ nc -l 12345

然后在 Flink 程序上使用

DataStream<String> text = env.socketTextStream("localhost", 12345);

text.print();

env.execute();

但是,由于我正在模拟一些场景,所以我想从 VM(然后是各种 VM)获取数据流并将其发送到在主机操作系统上运行的 CEP 程序。

所以,我已经安装了 VM,使用 Vagrant 和 SSH 到它使用vagrant ssh

  1. 来宾操作系统的主机名是精确 64

  2. 使用 ifconfig = 10.0.2.15 的 IP 地址

现在,我现在想做的是看看我是否可以从 VM 发送一些数据并在 Flink 程序中接收它,就像我在本地环境中所做的一样。

我通过使用在来宾操作系统上打开了 Netcat 套接字

vagrant@precise64:~$ nc -l 12345

我试图通过使用在主机程序上接收它,但出现错误

DataStream<String> text = env.socketTextStream("precise64", 12345);

text.print();

env.execute();

我也试过上面的precision64@10.0.2.15,但我认为我做错了。

任何想法,我应该如何将数据流从 VM 发送到主机 Flink 程序

欢迎提出建议,在此先感谢!

4

1 回答 1

1

你可以试试这个:

1.程序:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowWordCount {
  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)

    counts.print

    env.execute("Window Stream WordCount")
  }
}

2.运行上述程序后,就可以启动了。

nc -lk 9999

这将起作用。

于 2017-08-16T05:31:01.050 回答