
I am trying to run CEP on Flink, reading test data from a local path. At first I set the file size to about 1 GB and it ran fine, but when I increased it to 10 GB, the problem below appeared.

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:716)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:662)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:662)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Failure happened in filter function.
at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:292)
at org.apache.flink.cep.nfa.NFA.process(NFA.java:136)
at org.apache.flink.cep.operator.AbstractCEPPatternOperator.processEvent(AbstractCEPPatternOperator.java:93)
at org.apache.flink.cep.operator.AbstractCEPPatternOperator.processElement(AbstractCEPPatternOperator.java:69)
at org.apache.flink.cep.operator.KeyedCEPPatternOperator.processElement(KeyedCEPPatternOperator.java:147)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:168)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Could not find previous shared buffer entry with key: State(send, Normal, [
StateTransition(TAKE, done, with filter),
StateTransition(IGNORE, send),
]), value: cep.customer.Event_d@bd2b81a4 and timestamp: 1461851418716. This can indicate that the element belonging to the previous relation has been already pruned, even though you expect it to be still there.
at org.apache.flink.cep.nfa.SharedBuffer.put(SharedBuffer.java:104)
at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:269)
... 9 more

Here is my code. The pattern uses within(milliseconds(1000)), so maybe the earlier event is being pruned from the shared buffer before the later one arrives, as the message suggests? Thanks for any help.

public class ReadFromFile {


public static void main(String[] args) throws Exception {


    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().disableSysoutLogging();
    env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
    env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
    env.setParallelism(6);

    File dir = new File(System.getProperty("user.dir") + "/cep");
    // e.g. /data/tools/devlop/idea/flink-test/cep

    System.out.println(dir.getPath());
    if (!dir.exists()) {
        dir.mkdir();
    }

    // read data from the local file; each line looks like:
    // "2016-04-20T00:04:35.155Z","10.170.236.226","<p2-sidekiq> 2016-04-20T00:04:31.415Z 4982 TID-oxvsomclk AlterationWorker JID-34683abcb587e008153ce458 INFO: start"
    final DataStream<String> messageStream = env.readTextFile("file://" + dir.getPath());
    // keep only lines that carry a JID and an INFO message
    DataStream<String> da = messageStream.filter(new FilterFunction<String>() {
        @Override
        public boolean filter(String value) throws Exception {
            return value != null && value.contains(" JID-") && value.contains("INFO: ");
        }
    });

    // split each line into (prefix, jid, message, "") and silently drop lines that do not parse;
    // a flatMap is used so that nothing is emitted for unparseable lines (a null record would fail)
    DataStream<Tuple4<String, String, String, String>> t3 = da.flatMap(
            new FlatMapFunction<String, Tuple4<String, String, String, String>>() {
        @Override
        public void flatMap(String value, Collector<Tuple4<String, String, String, String>> out) throws Exception {
            String[] info = value.split("INFO: ");
            if (info.length == 2) {
                String[] jid = info[0].split(" JID-");
                if (jid.length == 2) {
                    // jid[0] = everything before " JID-", jid[1] = the JID token, info[1] = the message
                    out.collect(new Tuple4<String, String, String, String>(jid[0], jid[1].trim(), info[1], ""));
                }
            }
        }
    });


    // turn each tuple into an Event_d and key the stream by name
    DataStream<Event_d> input = t3.map(new MapFunction<Tuple4<String, String, String, String>, Event_d>() {
        @Override
        public Event_d map(Tuple4<String, String, String, String> value) throws Exception {
            return new Event_d(value.f0, value.f1, value.f2, value.f3);
        }
    }).keyBy(new KeySelector<Event_d, String>() {
        @Override
        public String getKey(Event_d value) throws Exception {
            return value.getName();
        }
    });

    // the pattern: start -> send (SendThirdPartWorker) -> done, all within 1 second
    Pattern<Event_d, ?> pattern = Pattern.<Event_d>begin("start").where(
            new FilterFunction<Event_d>() {
                @Override
                public boolean filter(Event_d value) throws Exception {
                    return value.getPrice().contains("start"); //&& MD5Util.MD5(value.getMd5())==;
                }
            }).next("send").where(new FilterFunction<Event_d>() {
                @Override
                public boolean filter(Event_d value) throws Exception {
                    return value.getPrice().contains("SendThirdPartWorker"); //&& jidMap.get(value.getName())==value.getName();
                }
            }).followedBy("done").where(new FilterFunction<Event_d>() {
                @Override
                public boolean filter(Event_d value) throws Exception {
                    return value.getPrice().contains("done"); //&& a;
                }
            }).within(milliseconds(1000));
    final long mi1 = new Date().getTime();


    DataStream<String> result = CEP.pattern(input, pattern).select(
            new PatternSelectFunction<Event_d, String>() {
                @Override
                public String select(Map<String, Event_d> pattern) {
                    StringBuilder builder = new StringBuilder();
                    builder.append(dataComt(new Date().getTime(), mi1)).append(" ")
                            .append(pattern.get("start").getName())
                            .append(" -- ").append(pattern.get("send").getPrice());
                            //.append("--").append(pattern.get("done").getPrice());
                    return builder.toString();
                }
            });

    result.writeAsText(dir + "/result", FileSystem.WriteMode.OVERWRITE);

    env.execute("Read CEP events from local file");
}

// elapsed seconds since the job started, formatted as "read Ns"
public static String dataComt(long current, long last) {
    long c = (current - last) / 1000;
    return "\"read " + c + "s \"";
}
}
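
Event_d is just a plain POJO carrying the four Tuple4 fields; only getName() and getPrice() are used by the pattern above. A minimal sketch of what it looks like (the field names are illustrative, matching the constructor call new Event_d(f0, f1, f2, f3)):

// Minimal sketch of the Event_d POJO; the field names are illustrative.
public class Event_d {
    private String name;  // tuple field f0, used as the key in keyBy
    private String jid;   // tuple field f1, the JID token
    private String price; // tuple field f2, the text after "INFO: "
    private String md5;   // tuple field f3, empty for now

    public Event_d() {
        // no-arg constructor so Flink can treat this as a POJO
    }

    public Event_d(String name, String jid, String price, String md5) {
        this.name = name;
        this.jid = jid;
        this.price = price;
        this.md5 = md5;
    }

    public String getName() { return name; }
    public String getPrice() { return price; }
    public String getMd5() { return md5; }
}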