1

我想用流口水做一些警报。下面是规则。但是当我触发 kieSession 时。流口水抛出异常。谁能告诉我为什么?

规则:

import   java.util.Map /n
declare Map 
  @role(event)
end

rule "sliding window time map" 

when
    $m:Map($b:this[\b\],$b>10) from entry-point \MyEntryPoint\
    accumulate(Map(this['a']>0  && c >8) over window:time( 1m ) from entry-point \MyEntryPoint\;$count:count();$count>1 )  
then
    System.out.println("alarm!!!!");
end 

例外:

线程“Thread-0”java.lang.ClassCastException 中的异常:org.drools.core.common.DefaultFactHandle 无法在 org.drools.core.reteoo.WindowNode.assertObject(WindowNode .java:168) 在 org.drools.core.reteoo.CompositeObjectSinkAdapter.propagateAssertObject(CompositeObjectSinkAdapter.java:387) 的 org.drools.core.reteoo.CompositeObjectSinkAdapter.doPropagateAssertObject(CompositeObjectSinkAdapter.java:502)。 reteoo.ObjectTypeNode.assertObject(ObjectTypeNode.java:288) at org.drools.core.reteoo.EntryPointNode.assertObject(EntryPointNode.java:251) at org.drools.core.common.NamedEntryPoint.insert(NamedEntryPoint.java:367)在 org.drools.core.common 的 org.drools.core.common.NamedEntryPoint.insert(NamedEntryPoint.java:286)。NamedEntryPoint.insert(NamedEntryPoint.java:139) at monitor.SlidingWindowMapWithEntryPoint$1.run(SlidingWindowMapWithEntryPoint.java:104) at java.lang.Thread.run(Thread.java:662)

我的代码:

SlidingWindowMapWithEntryPoint.java

package monitor;
import java.util.Map;
import org.drools.core.impl.InternalKnowledgeBase;
import org.kie.api.KieBaseConfiguration;
import org.kie.api.KieServices;
import org.kie.api.conf.EventProcessingOption;
import org.kie.api.io.ResourceType;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.rule.EntryPoint;
import org.kie.internal.KnowledgeBaseFactory;
import org.kie.internal.builder.KnowledgeBuilder;
import org.kie.internal.builder.KnowledgeBuilderFactory;
import org.kie.internal.io.ResourceFactory;
import org.zeromq.ZMQ;
import com.alibaba.fastjson.JSON;

public class SlidingWindowMapWithEntryPoint {

    private static final String str =
            "import java.util.Map\n " +
            "declare Map \n"+
            "  @role(event)\n"+
            "end\n"+
            "rule \"sliding window time map\" \n"                              +
            "when \n"                                                         +
             "   $m:Map($b:this[\"b\"],$b>10) from entry-point \"MyEntryPoint\"\n"  + 
              "   accumulate(Map(this['a']>0  && c >8) over window:time( 1m ) from entry-point \"MyEntryPoint\";$count:count();$count>1 )\n"  +
            "then \n"                                                         +
            "    System.out.println(\"alarm!!!!\");  \n"                  +
            "end \n";

      public static void main(String[] args) {

          KieBaseConfiguration config = KieServices.Factory.get().newKieBaseConfiguration();
          config.setOption( EventProcessingOption.STREAM );
          KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
          kbuilder.add( ResourceFactory.newByteArrayResource(str.getBytes()),
                        ResourceType.DRL );
          System.out.println(kbuilder.getErrors());
          InternalKnowledgeBase kbase = (InternalKnowledgeBase) KnowledgeBaseFactory.newKnowledgeBase(config);
          kbase.addKnowledgePackages( kbuilder.getKnowledgePackages() );
         //kiesession
          final KieSession ksession = kbase.newKieSession();
          new Thread(new Runnable() {
            public void run() {
                ZMQ.Context context = ZMQ.context(1);  
                ZMQ.Socket pull = context.socket(ZMQ.PULL);  
                pull.connect("tcp://*:5557");
                EntryPoint entryPoint = ksession.getEntryPoint("MyEntryPoint");
                while (true) {
                    String ss= pull.recvStr();
                    if(ss!=null){
                        Map<String, Object> record=JSON.parseObject(ss, Map.class );
                        entryPoint.insert(record);
//                      ksession.insert(record);
                    }else{
                        System.out.println(new String("---sssssssssssss---"));
                    }
                } 
            }
          }).start();
        //fire!
          ksession.fireUntilHalt();

      }
}

推送.java

包监视器;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.zeromq.ZMQ;

import com.alibaba.fastjson.JSON;

public class Push {
    //create an event
    public static void main(String args[]) throws InterruptedException {  

        ZMQ.Context context = ZMQ.context(1);  
        ZMQ.Socket push  = context.socket(ZMQ.PUSH); 
        push.bind("tcp://*:5557");  

        Map<String, Object> mapa = new HashMap<String, Object>();
        for (;;) {  
            Random r=new Random();
            int a=r.nextInt(20);
            int b=r.nextInt(20);
            int c=r.nextInt(20);
            mapa.put("a", Integer.valueOf(a));
            mapa.put("b", Integer.valueOf(b));
            mapa.put("c", Integer.valueOf(c));
            boolean res=push.send(JSON.toJSONString(mapa));
            System.out.println(res);
            Thread.sleep(1000);
        }  
//        push.close();  
//        context.term();  
    }  
}
4

0 回答 0