1

我在 Eclipse 中运行 flink,Maven 已经获取了必要的 jar。我的机器有一个具有八个内核的处理器,我必须编写的流应用程序从它的输入中读取行并计算一些统计数据。

当我在我的机器上运行程序时,我希望 flink 使用 CPU 的所有内核作为线程良好的代码。但是,当我观察核心时,我发现只使用了一个核心。我尝试了很多东西,最后一次尝试留下了以下代码,即设置环境的并行度。我还尝试单独为流设置它等等。

public class SemSeMi {


    public static void main(String[] args) throws Exception {
        System.out.println("Starting Main!");

        System.out.println(org.apache.flink.core.fs.local.LocalFileSystem
                .getLocalFileSystem().getWorkingDirectory());

        StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();

        env.setParallelism(8);

        env.socketTextStream("localhost", 9999).flatMap(new SplitterX());

        env.execute("Something");       
    }

    public static class SplitterX implements
            FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence,
                Collector<Tuple2<String, Integer>> out) throws Exception {
            // Do Nothing!

        }
    }
} 

我使用 netcat 为程序提供了数据:

 nc -lk 9999 < fileName

问题是如何使程序在本地扩展并使用所有可用的内核?

4

1 回答 1

2

您不必明确指定并行度。使用默认设置运行的作业将自动将并行度设置为可用内核的数量。

在您的情况下,源代码将以并行方式运行,1因为无法分发从套接字读取。但是,对于该flatMap操作,系统将实例化 8 个实例。如果您打开日志记录,那么您也会看到它。flatMap现在输入数据以循环方式分配给任务。每个flatMap任务都由一个单独的线程执行。

我怀疑您仅在单个核心上看到负载的原因是因为SplitterX它不做任何工作。尝试以下代码计算每个字符的数量,String然后将结果打印到控制台:

public static void main(String[] args) throws Exception {
    System.out.println("Starting Main!");

    System.out.println(org.apache.flink.core.fs.local.LocalFileSystem
        .getLocalFileSystem().getWorkingDirectory());

    StreamExecutionEnvironment env = StreamExecutionEnvironment
        .getExecutionEnvironment();

    env.socketTextStream("localhost", 9999).flatMap(new SplitterX()).print();

    env.execute("Something");
}

public static class SplitterX implements
    FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String sentence,
                        Collector<Tuple2<String, Integer>> out) throws Exception {
        out.collect(Tuple2.of(sentence, sentence.length()));

    }
}

每行开头的数字告诉您哪个任务打印了结果。

于 2016-01-15T16:30:10.327 回答