2

下面的代码从套接字读取,但我没有看到任何输入进入工作。虽然我已经nc -l 1111运行并转储数据,但不确定为什么我的 Spark 作业无法从10.176.110.112:1111.

Dataset<Row> d = sparkSession.readStream().format("socket") 
                                    .option("host", "10.176.110.112")
                                    .option("port", 1111).load();
4

1 回答 1

2

下面的代码从套接字读取,但我没有看到任何输入进入工作。

好吧,老实说,您不会从任何地方阅读任何内容。您只描述了启动流式传输管道时要执行的操作。

由于您使用结构化流从套接字读取数据集,因此您应该使用start运算符来触发数据获取(并且仅在定义接收器之后)。

start(): StreamingQuery开始执行流式查询,随着新数据的到来,它将不断地输出结果到给定的路径。返回的 StreamingQuery 对象可用于与流交互。

start您定义流式传输数据的位置之前。它可以是 Kafka、文件、自定义流式接收器(可能使用foreach运算符)或控制台。

console在以下示例中使用接收器(又名格式)。我还使用 Scala 并将其重写为 Java 作为您的家庭练习。

d.writeStream.  // <-- this is the most important part
  trigger(Trigger.ProcessingTime("10 seconds")).
  format("console").
  option("truncate", false).
  start         // <-- and this
于 2017-07-01T16:24:10.497 回答