0

我有以下成功部署的 EPL 模块:

module context;

import events.*;
import configDemo.*;
import annotations.*;
import main.*;
import subscribers.*;
import listeners.*;

@Name('schemaCreator')
create schema InitEvent(firstStock String, secondStock String, bias double);

@Name('createSchemaEvent')
create schema TickEvent as TickEvent; 

@Name('contextCreator')
create context TwoStocksContext
initiated by InitEvent as initEvent;


@Name('compareStocks') 
@Description('Compare the difference between two different stocks and make a decision')
@Subscriber('subscribers.MySubscriber')
context TwoStocksContext 
select * from TickEvent
match_recognize (
measures A.currentPrice as a_currentPrice, B.currentPrice as b_currentPrice,     
A.stockCode as a_stockCode, B.stockCode as b_stockCode
pattern (A C* B)
define
A as A.stockCode =  context.initEvent.firstStock,
B as A.currentPrice - B.currentPrice >=  context.initEvent.bias and         
B.stockCode =  context.initEvent.secondStock
);

我的听众/订阅者有问题。根据我的检查和调试,这些类没有任何问题,注释有效,它们在部署时附加到语句中,但它们都没有从事件中收到任何更新。

这是我的订阅者,我只想打印它已收到:

package subscribers;
import java.util.Map;

public class MySubscriber {

public void update(Map row) {
    System.out.println("got it");
    }
}

我以前有相同的模块,没有任何上下文分区,然后订阅者工作没有问题。添加上下文后,它停止了。

到目前为止,我已经尝试过:

  1. 检查语句是否附加了任何订阅者/侦听器(确实如此)
  2. 检查他们的名字
  3. 删除注释并在部署后在 Java 代码中手动设置它们(同样的事情 - 它们附加,我可以检索它们的名称但仍然没有收到更新)
  4. 调试订阅者类。程序要么根本不去那里停止在断点处,要么我得到一个错误(缺少行号属性错误 - (“不能在那里放置断点”我试图修复无济于事)

知道什么可能导致这种情况,或者将订阅者设置为具有上下文分区的语句的最佳方法是什么?

这是先前问题的延续,此处已解决 -创建 Esper 的 epl 实例

编辑:以我使用的格式和 EPL 在线工具格式发送事件:

我首先从用户那里得到要遵循的对:

    System.out.println("First stock:"); 
    String first = scanner.nextLine();
    System.out.println("Second stock:"); 
    String second = scanner.nextLine();
    System.out.println("Difference:"); 
    double diff= scanner.nextDouble();
    InitEvent init = new InitEvent(first, second, diff);

之后我有一个引擎线程不断发送事件,但在它开始之前 InitEvents 是这样发送的:

@Override
public void run() {

    runtime.sendEvent(initEvent);   

    while (contSimulation) {

        TickEvent tick1 = new TickEvent(Math.random() * 100, "YAH");
        runtime.sendEvent(tick1);

        TickEvent tick2 = new TickEvent(Math.random() * 100, "GOO");
        runtime.sendEvent(tick2);

        TickEvent tick3 = new TickEvent(Math.random() * 100, "IBM");
        runtime.sendEvent(tick3);

        TickEvent tick4 = new TickEvent(Math.random() * 100, "MIC");
        runtime.sendEvent(tick4);

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        latch.countDown();

    }

} 

我以前没有使用过在线工具,但我认为我可以使用它。这是模块文本:

module context; 

create schema InitEvent(firstStock String, secondStock String, bias double);
create schema TickEvent(currentPrice double, stockCode String);

create context TwoStocksContext
initiated by InitEvent as initEvent;

context TwoStocksContext 
select * from TickEvent
match_recognize (
measures A.currentPrice as a_currentPrice, B.currentPrice as b_currentPrice, 
A.stockCode as a_stockCode, B.stockCode as b_stockCode
pattern (A C* B)
define
A as A.stockCode =  context.initEvent.firstStock,
B as A.currentPrice - B.currentPrice >=  context.initEvent.bias and 
B.stockCode =  context.initEvent.secondStock
);

以及事件的顺序:

InitEvent={firstStock='YAH', secondStock = 'GOO', bias=5}
TickEvent={currentPrice=55.6, stockCode='YAH'}
TickEvent={currentPrice=50.4, stockCode='GOO'}
TickEvent={currentPrice=30.8, stockCode='MIC'}
TickEvent={currentPrice=24.9, stockCode='APP'}

TickEvent={currentPrice=51.6, stockCode='YAH'}
TickEvent={currentPrice=45.8, stockCode='GOO'}
TickEvent={currentPrice=32.8, stockCode='MIC'}
TickEvent={currentPrice=28.9, stockCode='APP'}

我使用它们的结果:

At: 2001-01-01 08:00:00.000
Statement: Stmt-4
Insert
Stmt-4-output={a_currentPrice=55.6, b_currentPrice=50.4, a_stockCode='YAH', 
b_stockCode='GOO'}
At: 2001-01-01 08:00:00.000
Statement: Stmt-4
Insert
Stmt-4-output={a_currentPrice=51.6, b_currentPrice=45.8, a_stockCode='YAH', 
b_stockCode='GOO'}

如果我让第二组事件在 YAH/GOO 之间的差异小于 5,我只能从第一对得到输出,这是有道理的。这是,我认为它应该做的。

如果需要,这两个方法读取并处理 EPL 模块的注释(我自己没有编写它们,它们取自可在此处找到的 coinTrader Context 类 - https://github.com/timolson/cointrader/ blob/master/src/main/java/org/cryptocoinpartners/module/Context.java):

private static Object getSubscriber(String className) throws Exception {

    Class<?> cl = Class.forName(className);
    return cl.newInstance();
}

private static void processAnnotations(EPStatement statement) throws Exception {

    Annotation[] annotations = statement.getAnnotations();
    for (Annotation annotation : annotations) {
        if (annotation instanceof Subscriber) {

            Subscriber subscriber = (Subscriber) annotation;
            Object obj = getSubscriber(subscriber.className());
            System.out.println(subscriber.className());
            statement.setSubscriber(obj);

        } else if (annotation instanceof Listeners) {

            Listeners listeners = (Listeners) annotation;
            for (String className : listeners.classNames()) {
                Class<?> cl = Class.forName(className);
                Object obj = cl.newInstance();
                if (obj instanceof StatementAwareUpdateListener) {
                    statement.addListener((StatementAwareUpdateListener) obj);
                } else {
                    statement.addListener((UpdateListener) obj);
                }
            }


        }
    }
}
4

1 回答 1

0

好吧,经过一个月的努力,我终于解决了。如果将来有人遇到类似的问题,这就是问题所在。epl 在在线工具中运行良好,但在我的代码中却不行。最终,我发现初始事件没有触发,因此没有创建上下文分区,因此订阅者和侦听器没有收到任何更新。我的错误是我触发了 POJO InitEvent,但是上下文使用的事件是通过创建模式在 EPL 模块中创建的。我不知道我在想什么,现在它没有工作是有道理的。因此,我在 Java 中触发的事件不是上下文使用的事件。我的解决方案只在 EPL 内。由于我不知道是否可以触发在模块中创建的 Java 事件,

@Name('schemaCreator')
create schema StartEvent(firstStock string, secondStock string, difference 
double);

@Name('insertInitEvent')
insert into StartEvent 
select * from InitEvent; 

其他一切都保持不变,Java 代码也是如此。

于 2018-03-13T06:54:38.577 回答