0

如何将 Spark Streaming 配置为从 Java 中的 Flume 接收输入数据?(我被困在代码中)这是我的代码:

public static void main(String[] args)
{
Duration batchInterval = new Duration(2000);
System.out.println("-Starting Spark Context"); 
System.out.println("-Spark_home:" + System.getenv("SPARK_HOME")); 
JavaStreamingContext sc = new JavaStreamingContext(master, 
"FlumeEventCount", batchInterval, 
System.getenv("SPARK_HOME"), "/home/cloudera/SparkOnALog.jar"); 
System.out.println("-Setting up Flume Stream: " + host + " " + port); 
JavaDStream<SparkFlumeEvent> flumeStream  


=FlumeUtils.createStream(sc,host, port); 
 flumeStream.count().print(); 
 flumeStream.count().map(new Function<Long, String>() 
 { 
 public String call(Long in) { 
return "????????????? Received " + in + " flume events."; 
} 
}).print(); 
System.out.println("-Starting Spark Context"); 
sc.start(); 
System.out.   println("-Finished"); 
} 
}
4

0 回答 0