我想用流口水做一些警报。下面是规则。但是当我触发 kieSession 时。流口水抛出异常。谁能告诉我为什么?
规则:
import java.util.Map /n
declare Map
@role(event)
end
rule "sliding window time map"
when
$m:Map($b:this[\b\],$b>10) from entry-point \MyEntryPoint\
accumulate(Map(this['a']>0 && c >8) over window:time( 1m ) from entry-point \MyEntryPoint\;$count:count();$count>1 )
then
System.out.println("alarm!!!!");
end
例外:
线程“Thread-0”java.lang.ClassCastException 中的异常:org.drools.core.common.DefaultFactHandle 无法在 org.drools.core.reteoo.WindowNode.assertObject(WindowNode .java:168) 在 org.drools.core.reteoo.CompositeObjectSinkAdapter.propagateAssertObject(CompositeObjectSinkAdapter.java:387) 的 org.drools.core.reteoo.CompositeObjectSinkAdapter.doPropagateAssertObject(CompositeObjectSinkAdapter.java:502)。 reteoo.ObjectTypeNode.assertObject(ObjectTypeNode.java:288) at org.drools.core.reteoo.EntryPointNode.assertObject(EntryPointNode.java:251) at org.drools.core.common.NamedEntryPoint.insert(NamedEntryPoint.java:367)在 org.drools.core.common 的 org.drools.core.common.NamedEntryPoint.insert(NamedEntryPoint.java:286)。NamedEntryPoint.insert(NamedEntryPoint.java:139) at monitor.SlidingWindowMapWithEntryPoint$1.run(SlidingWindowMapWithEntryPoint.java:104) at java.lang.Thread.run(Thread.java:662)
我的代码:
SlidingWindowMapWithEntryPoint.java
package monitor;
import java.util.Map;
import org.drools.core.impl.InternalKnowledgeBase;
import org.kie.api.KieBaseConfiguration;
import org.kie.api.KieServices;
import org.kie.api.conf.EventProcessingOption;
import org.kie.api.io.ResourceType;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.rule.EntryPoint;
import org.kie.internal.KnowledgeBaseFactory;
import org.kie.internal.builder.KnowledgeBuilder;
import org.kie.internal.builder.KnowledgeBuilderFactory;
import org.kie.internal.io.ResourceFactory;
import org.zeromq.ZMQ;
import com.alibaba.fastjson.JSON;
public class SlidingWindowMapWithEntryPoint {
private static final String str =
"import java.util.Map\n " +
"declare Map \n"+
" @role(event)\n"+
"end\n"+
"rule \"sliding window time map\" \n" +
"when \n" +
" $m:Map($b:this[\"b\"],$b>10) from entry-point \"MyEntryPoint\"\n" +
" accumulate(Map(this['a']>0 && c >8) over window:time( 1m ) from entry-point \"MyEntryPoint\";$count:count();$count>1 )\n" +
"then \n" +
" System.out.println(\"alarm!!!!\"); \n" +
"end \n";
public static void main(String[] args) {
KieBaseConfiguration config = KieServices.Factory.get().newKieBaseConfiguration();
config.setOption( EventProcessingOption.STREAM );
KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
kbuilder.add( ResourceFactory.newByteArrayResource(str.getBytes()),
ResourceType.DRL );
System.out.println(kbuilder.getErrors());
InternalKnowledgeBase kbase = (InternalKnowledgeBase) KnowledgeBaseFactory.newKnowledgeBase(config);
kbase.addKnowledgePackages( kbuilder.getKnowledgePackages() );
//kiesession
final KieSession ksession = kbase.newKieSession();
new Thread(new Runnable() {
public void run() {
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket pull = context.socket(ZMQ.PULL);
pull.connect("tcp://*:5557");
EntryPoint entryPoint = ksession.getEntryPoint("MyEntryPoint");
while (true) {
String ss= pull.recvStr();
if(ss!=null){
Map<String, Object> record=JSON.parseObject(ss, Map.class );
entryPoint.insert(record);
// ksession.insert(record);
}else{
System.out.println(new String("---sssssssssssss---"));
}
}
}
}).start();
//fire!
ksession.fireUntilHalt();
}
}
推送.java
包监视器;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.zeromq.ZMQ;
import com.alibaba.fastjson.JSON;
public class Push {
//create an event
public static void main(String args[]) throws InterruptedException {
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket push = context.socket(ZMQ.PUSH);
push.bind("tcp://*:5557");
Map<String, Object> mapa = new HashMap<String, Object>();
for (;;) {
Random r=new Random();
int a=r.nextInt(20);
int b=r.nextInt(20);
int c=r.nextInt(20);
mapa.put("a", Integer.valueOf(a));
mapa.put("b", Integer.valueOf(b));
mapa.put("c", Integer.valueOf(c));
boolean res=push.send(JSON.toJSONString(mapa));
System.out.println(res);
Thread.sleep(1000);
}
// push.close();
// context.term();
}
}