我在 Eclipse 中运行 flink,Maven 已经获取了必要的 jar。我的机器有一个具有八个内核的处理器,我必须编写的流应用程序从它的输入中读取行并计算一些统计数据。
当我在我的机器上运行程序时,我希望 flink 使用 CPU 的所有内核作为线程良好的代码。但是,当我观察核心时,我发现只使用了一个核心。我尝试了很多东西,最后一次尝试留下了以下代码,即设置环境的并行度。我还尝试单独为流设置它等等。
public class SemSeMi {
public static void main(String[] args) throws Exception {
System.out.println("Starting Main!");
System.out.println(org.apache.flink.core.fs.local.LocalFileSystem
.getLocalFileSystem().getWorkingDirectory());
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();
env.setParallelism(8);
env.socketTextStream("localhost", 9999).flatMap(new SplitterX());
env.execute("Something");
}
public static class SplitterX implements
FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence,
Collector<Tuple2<String, Integer>> out) throws Exception {
// Do Nothing!
}
}
}
我使用 netcat 为程序提供了数据:
nc -lk 9999 < fileName
问题是如何使程序在本地扩展并使用所有可用的内核?