3

我是 Esper 的新手,我想得到一些帮助。我已经设法将 Esper 与 CSV 文件一起使用,但现在我需要使用 Java 对象作为通过套接字发送的事件,而且我在 Internet 上找不到简单的示例来用作指南。

有没有人基于它们的一些简单的例子?

无论如何,我把我正在努力工作的代码放在这里。当我运行它时没有任何反应,似乎套接字连接不起作用。

服务器类(它还包含事件类)。应该发送事件:

import java.io.* ;
import java.net.* ;

class Server {

static final int PORT=5002;

    public Server( ) {
        try {
            ServerSocket skServer = new ServerSocket( PORT );
            System.out.println("Listening at " + PORT );
            Socket skClient = skServer.accept();        
            System.out.println("Serving to Esper");
            OutputStream aux = skClient.getOutputStream();
            ObjectOutputStream flux = new ObjectOutputStream( aux );
            int i = 0;
            while (i<10) {
                flux.writeObject( new MeteoEvent(i,"A") );
                i++;
                }
            flux.flush();
            skClient.close();
            System.out.println("End of transmission");
            } catch( Exception e ) {
            System.out.println( e.getMessage() );
        }
    }

    public static void main( String[] arg ) {
        new Server();
    }

    class MeteoEvent{

        private int sensorId;
        private String GeoArea;

        public MeteoEvent() {
        }

        public MeteoEvent(int sensorid, String geoarea) {
            this.sensorId = sensorid;
            this.GeoArea = geoarea;
        }

        public int getSensorId() {
            return sensorId;
        }

        public void setSensorId(int sensorId) {
            this.sensorId = sensorId;
        }

        public String getGeoArea() {
            return GeoArea;
        }

        public void setGeoArea(String geoArea) {
            GeoArea = geoArea;
        }   
    }
}

还有基于 Esper 的课程。

import java.io.File;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;


import com.espertech.esper.client.Configuration;
import com.espertech.esper.client.EPAdministrator;
import com.espertech.esper.client.EPRuntime;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;
import com.espertech.esper.event.map.MapEventBean;
import com.espertech.esperio.socket.EsperIOSocketAdapter;
import com.espertech.esperio.socket.config.ConfigurationSocketAdapter;
import com.espertech.esperio.socket.config.DataType;
import com.espertech.esperio.socket.config.SocketConfig;

public class Demo {

    public static class CEPListener implements UpdateListener {

        private String tag;
        public CEPListener (String tag)
        {
            this.tag = tag;
        }

public static void main(String[] args) throws IOException, InterruptedException {
        Configuration configuration = new Configuration();

        Map<String, Object> eventProperties = new HashMap<String, Object>();
        eventProperties.put("sensorId", int.class);
        eventProperties.put("GeoArea", String.class);
        configuration.addEventType("MeteoEvent", eventProperties);

        ConfigurationSocketAdapter socketAdapterConfig = new ConfigurationSocketAdapter();

        SocketConfig socketConfig = new SocketConfig();
        socketConfig.setDataType(DataType.OBJECT);
        socketConfig.setPort(5002);
        socketAdapterConfig.getSockets().put("MeteoSocket", socketConfig);

        EPServiceProvider cepService = EPServiceProviderManager.getProvider("MeteoSocket",configuration);

        EPRuntime cepServiceRT = cepService.getEPRuntime();

        EPAdministrator cepAdmin = cepService.getEPAdministrator();

        EsperIOSocketAdapter socketAdapter = new EsperIOSocketAdapter (socketAdapterConfig, "MeteoSocket");
        socketAdapter.start();

        EPStatement stmt = cepAdmin.createEPL("insert into JoinStream select * from MeteoEvent");

        EPStatement outputStatementX = cepAdmin.createEPL("select * from JoinStream");

        outputStatementX.addListener(new CEPListener("JS"));

        cepService.initialize();

        Object lock = new Object();
        synchronized (lock)
        {
                lock.wait();
         }
}

如果有人需要一些时间来帮助我,请提前非常感谢。

4

1 回答 1

4

问题解决了!Esper 开发人员列表非常有用。我通过位于此处的测试类学习了如何使用 Esper + 套接字

此致!

于 2012-10-03T08:26:38.237 回答