2

我们以集群方式运行 hazelcast 2.5,用它在多个应用程序之间复制对象。目前有 1 个生产者(writer)和 3 个消费者。writer 的更新频率最高可达每分钟 10000 次,消费者每秒读取一次,在集群 map 中查找更新。writer 是一个原生 java 应用程序,运行在装有 java 7 的 Debian 8 服务器上。hazelcast 服务器与生产者位于同一台机器上,但作为独立的应用程序运行。所有消费者都是运行在 Debian 8 机器上、使用 java 7 的 tomcat 7 应用程序。我们面临的问题是:平均约有 4% 的集群数据在到达消费者端时,在共享 map 中出现乱码。生产者的代码如下:

        // Working variables, reassigned on every iteration of the drain loop.
        Long imei;
    Integer clase;
    HazelCastobjGpsData datoGps;
    HazelCastobjTelemetria datoTelemetria;

    // Drain the local staging map: every pending key is moved into the
    // matching cluster map and then announced in the "updated keys" map
    // (instQueue) that the consumers poll.
    Set<HazelCastpkQueue> keys = HazelCast.tmpQueue.keySet();
    for (Iterator<HazelCastpkQueue> i = keys.iterator(); i.hasNext();) {
        HazelCastpkQueue current = i.next();
        clase = current.getClase();
        imei = current.getImei();

        // Set only after lock() returns, so the finally blocks below never
        // call unlock() on a lock this thread failed to acquire. Without it,
        // a failing lock() was followed by a second failure in finally that
        // could escape the loop and hide the original error.
        boolean locked = false;

        // NOTE(review): clase is a boxed Integer; if CLASE_GPS is also
        // declared as Integer this is a reference comparison - confirm it
        // is a primitive int.
        if (clase == CLASE_GPS) // GPS
        {
            try {
                HazelCast.clienteHC.getLock(imei).lock();
                locked = true;

                datoGps = (HazelCastobjGpsData) HazelCast.tmpQueue
                        .remove(current);
                // Publish the value first, then the notification key, so a
                // consumer that sees the key can already read the value.
                HazelCast.instMapGPS.put(current.getImei(), datoGps);
                HazelCast.instQueue.put(new HazelCastpkQueue(imei,
                        CLASE_GPS), CLASE_GPS);
            } catch (Throwable t) {
                System.out.println("Error HazelCast Gps Update: " + imei
                        + " :" + t.getMessage());
                if (!HazelCast.clienteHC.isActive()) {
                    System.out.println("Hazelcast no activo");
                }
            } finally {
                if (locked) {
                    HazelCast.clienteHC.getLock(imei).unlock();
                }
            }
        } else // Telemetry
        {
            try {
                HazelCast.clienteHC.getLock(imei).lock();
                locked = true;

                datoTelemetria = (HazelCastobjTelemetria) HazelCast.tmpQueue
                        .remove(current);
                HazelCast.instMapTelemetria.put(new HazelCastpkQueue(imei,
                        clase), datoTelemetria);
                HazelCast.instQueue.put(new HazelCastpkQueue(imei, clase),
                        clase);
            } catch (Throwable t) {
                System.out.println("Error HazelCast Telemetria Update: "
                        + imei + " :" + t.getMessage());
                if (!HazelCast.clienteHC.isActive()) {
                    System.out.println("Hazelcast no activo");
                }
            } finally {
                if (locked) {
                    HazelCast.clienteHC.getLock(imei).unlock();
                }
            }
        }
    }

其中 HazelCastobjGpsData datoGps; 是消费者要取回的对象,HazelCast.instMapGPS.put(current.getImei(), datoGps); 写入的就是出现问题的 map,它以 imei 作为键。

服务器的代码如下:

/**
 * Stand-alone launcher that starts a Hazelcast instance in this JVM and
 * obtains the distributed structures named "gps", "telemetria",
 * "colagpstele" and "cola_panico_tis".
 *
 * <p>The static fields only keep references to those structures; nothing in
 * this class reads them afterwards.
 */
public class HazelServer {

    /** Map "gps": keyed by {@code Long}, holding GPS data objects. */
    @SuppressWarnings("unused")
    private static ConcurrentMap<Long, HazelCastobjGpsData> instMapGPS;

    /** Map "telemetria": telemetry objects keyed by {@code HazelCastpkQueue}. */
    @SuppressWarnings("unused")
    private static ConcurrentMap<HazelCastpkQueue, HazelCastobjTelemetria> instMapTelemetria;

    /** Map "colagpstele": {@code Integer} values keyed by {@code HazelCastpkQueue}. */
    @SuppressWarnings("unused")
    private static ConcurrentMap<HazelCastpkQueue, Integer> instQueue;

    /** Queue "cola_panico_tis". */
    @SuppressWarnings("unused")
    private static BlockingQueue<HazelCastpkTisPanicoQueue> tisPanicQueue;

    /**
     * Entry point: prints a banner, starts the instance and prints "OK".
     *
     * <p>"OK" is printed even when {@link #init()} failed, because that
     * method reports errors on standard output instead of rethrowing.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(final String[] args) {
        System.out.println("HazelCast Server init 03/03/2015");
        init();
        System.out.println("OK");
    }

    /**
     * Creates the Hazelcast instance (passing {@code null} as configuration)
     * and looks up the three maps and the queue. Any failure is reported on
     * standard output with its message only.
     */
    private static void init() {
        try {
            final HazelcastInstance member = Hazelcast.newHazelcastInstance(null);

            instMapGPS = member.getMap("gps");
            instMapTelemetria = member.getMap("telemetria");
            instQueue = member.getMap("colagpstele");
            tisPanicQueue = member.getQueue("cola_panico_tis");
        } catch (Throwable failure) {
            System.out.println("Error al tratar de obtener mapas: " + failure.getMessage());
        }
    }

}

消费者代码如下

/**
 * Polls the "updated keys" map once and hands every pending entry to
 * {@code Realtime}.
 *
 * <p>For each key in {@code GetHazelCast.hcQueue}: a GPS key has its value
 * read from {@code hcGPS} (by IMEI) and passed to {@code Realtime.newGPS};
 * any other key has its value read from {@code hcTelemetria} and passed to
 * {@code Realtime.newTelemetria}. The key is then removed from
 * {@code hcQueue}. When at least one GPS entry was handled, three metrics
 * are sent: the number of {@code newGPS} calls that returned {@code true},
 * the number of GPS entries, and the ratio of the two as a percentage.
 *
 * <p>If Hazelcast is not active, a message is printed and a reconnect is
 * requested instead. Any {@code Exception} is printed and swallowed.
 *
 * <p>NOTE(review): the value is read before the key is removed, so an update
 * published between those two steps appears to lose its notification -
 * confirm whether that window matters.
 */
public static void fetchHazelCast() {
    try {
        // Guard clause: nothing to poll while disconnected.
        if (!GetHazelCast.isHazelcastActive()) {
            System.out.println("Hazelcast no activo...");
            GetHazelCast.contectarHZRealTime();
            return;
        }

        double flaggedGps = 0; // newGPS calls that returned true
        double handledGps = 0; // GPS entries seen in this pass

        final Set<HazelCastpkQueue> pendingKeys = GetHazelCast.hcQueue.keySet();
        for (final HazelCastpkQueue pending : pendingKeys) {
            if (pending.getClase() == CLASE_GPS) {
                // GPS entry
                final boolean flagged = Realtime.newGPS(
                        GetHazelCast.hcGPS.get(pending.getImei()));
                if (flagged) {
                    flaggedGps++;
                }
                handledGps++;
            } else {
                // Telemetry entry (not counted in the metrics)
                Realtime.newTelemetria(GetHazelCast.hcTelemetria.get(pending));
            }

            GetHazelCast.hcQueue.remove(pending);
        }

        if (handledGps > 0) {
            final double percentage = (flaggedGps * 100.0f) / handledGps;
            CloudWatchSubmit.sendMetric("metricasreq", "errores", flaggedGps);
            CloudWatchSubmit.sendMetric("metricasreq", "cantidad", handledGps);
            CloudWatchSubmit.sendMetric("metricasreq", "erroresporcentaje", percentage);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

请注意,消费者只取出最近更新过的数据:有一个专门记录已更新键的 map,数据一旦被取出,对应的键就会从该 map 中删除。

以下是对象的代码

/**
 * One GPS report for a device, in the form that is stored in the cluster map
 * and exchanged between producer and consumers through
 * {@link #writeData(DataOutput)} / {@link #readData(DataInput)}.
 *
 * <p>Not every field travels over the wire: {@code fromDB}, the
 * {@code anterior_*} fields, {@code delta_time} and {@code delta_dist} are
 * not written by {@code writeData}, so after deserialization they hold their
 * declared defaults.
 *
 * <p>NOTE(review): both timestamps are serialized as zone-less text using the
 * JVM default time zone on each side, and the {@code Calendar}'s own zone is
 * not transmitted. Writer and readers therefore have to run with the same
 * default time zone for the instants to match - confirm the deployment.
 */
public class HazelCastobjGpsData implements DataSerializable {
    private static final long serialVersionUID = 1L;

    /** Text layout of the two timestamps in the serialized form. */
    private static final String TIMESTAMP_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";
    /** Value written for {@code now} when it is null (same as the field default). */
    private static final String DEFAULT_NOW = "n/a";
    /** Value written for {@code ip} when it is null (same as the field default). */
    private static final String DEFAULT_IP = "N/A";

    private Calendar utc;
    private Calendar lastupdate;
    private String now = DEFAULT_NOW;
    private double latitud = 0.0, longitud = 0.0, altitud = 0.0,
            velocidad = 0.0, cog = 0.0, nsat = 4.0, hdop = 2.0, adc1 = -200,
            adc2 = -200, adc3 = -200, adc4 = -200, bateria = -1;
    private int ignicion, io1 = -1, io2 = -1, io3 = -1, io4 = -1, power = -1,
            antirobo = -1, panico = -1;
    private long imei = 0L, driverId = -1;
    private boolean valid = true;
    private long odometro = -1L;
    private long horometro = -1L;
    private String ip = DEFAULT_IP;

    private long ibutton2 = -1;
    private long ibutton3 = -1;
    private long ibutton4 = -1;
    private long odometroInterno = -1L;

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    // Values copied from the previous report by setAnterior(); not serialized.
    private double anterior_lat = 0.0;
    private double anterior_lon = 0.0;
    private double anterior_sog = 0;
    private double anterior_cog = 0;
    private double anterior_ignicion = 0;
    // NOTE(review): the next two fields are never assigned anywhere in this
    // class, so their getters always return 0 - confirm whether setAnterior()
    // was meant to fill them.
    private long anterior_odometro = 0;
    private long anterior_horometro = 0;
    private long delta_time = 0;
    private double delta_dist = 0;

    private long trailerId = -1L;
    private long tripTime = 0L;
    private long horometroInterno = 0L;

    public long getHorometroInterno() {
        return horometroInterno;
    }

    public void setHorometroInterno(long horometroInterno) {
        this.horometroInterno = horometroInterno;
    }

    /**
     * Records how this report relates to the previous one: distance (computed
     * with {@code Geo.distanceKM}, multiplied by 1000 and rounded), elapsed
     * time in whole seconds between the two {@code utc} values, and the
     * previous course, speed, position and ignition state.
     *
     * @param anterior the previous report; it and both {@code utc} values
     *                 must be non-null
     */
    public void setAnterior(HazelCastobjGpsData anterior) {
        this.setDelta_dist(Math.round((new Geo(this.latitud, this.longitud)
                .distanceKM(new Geo(anterior.getLatitud(), anterior
                        .getLongitud()))) * 1000));
        this.setDelta_time((this.getUTC().getTimeInMillis() - anterior.getUTC()
                .getTimeInMillis()) / 1000);
        this.setAnterior_cog(anterior.getCOG());
        this.setAnterior_sog(anterior.getVelocidad());
        this.setAnterior_lat(anterior.getLatitud());
        this.setAnterior_lon(anterior.getLongitud());
        this.setAnterior_ignicion(anterior.getIgnicion());
    }

    /** Set only through the database constructor / setter; not serialized. */
    private boolean fromDB = false;

    private int evento = 0; // By time, curve or other

    public Calendar getUTC() {
        return utc;
    }

    public String getNow() {
        return now;
    }

    public double getLatitud() {
        return latitud;
    }

    public double getLongitud() {
        return longitud;
    }

    public double getAltitud() {
        return altitud;
    }

    public double getVelocidad() {
        return velocidad;
    }

    public double getCOG() {
        return cog;
    }

    /**
     * Maps the course value to a Spanish compass label using fixed bands
     * over the range 0 to 360.
     *
     * @return the label, or an empty string when {@code cog} is outside
     *         0..360 (or NaN)
     */
    public String getCOGtoString() {
        String txtDireccion = "";
        if ((this.cog >= 0) && (this.cog <= 5)) {
            txtDireccion = "Norte";
        } else if ((this.cog > 5) && (this.cog <= 85)) {
            txtDireccion = "Noreste (NorOriente)";
        }

        else if ((this.cog > 85) && (this.cog <= 95)) {
            txtDireccion = "Este (Oriente)";
        } else if ((this.cog > 95) && (this.cog <= 175)) {
            txtDireccion = "Sureste (SurOriente)";
        }

        else if ((this.cog > 175) && (this.cog <= 185)) {
            txtDireccion = "Sur";
        }

        else if ((this.cog > 185) && (this.cog <= 265)) {
            txtDireccion = "Suroeste (SurPoniente)";
        } else if ((this.cog > 265) && (this.cog <= 275)) {
            txtDireccion = "Oeste (Poniente)";
        } else if ((this.cog > 275) && (this.cog <= 355)) {
            txtDireccion = "Noroeste (NorPoniente)";
        }

        else if ((this.cog > 355) && (this.cog <= 360)) {
            txtDireccion = "Norte";
        }

        return txtDireccion;
    }

    public double getNsat() {
        return nsat;
    }

    public double getHdop() {
        return hdop;
    }

    public double getAdc1() {
        return adc1;
    }

    public double getAdc2() {
        return adc2;
    }

    public double getBateria() {
        return bateria;
    }

    public int getIgnicion() {
        return ignicion;
    }

    /**
     * Indexed access to the digital inputs: 1..4 return {@code io1..io4},
     * 5 returns {@code panico} and 0 always returns 0.
     *
     * @param index slot number, 0 to 5
     * @return the value in that slot
     * @throws ArrayIndexOutOfBoundsException if {@code index} is outside 0..5
     */
    public int getIo(int index) {
        int io_array[] = { 0, this.io1, this.io2, this.io3, this.io4,this.panico };
        return io_array[index];
    }

    public int getIo1() {
        return io1;
    }

    public int getIo2() {
        return io2;
    }

    public int getIo3() {
        return io3;
    }

    public int getIo4() {
        return io4;
    }

    public int getPower() {
        return power;
    }

    public long getHorometro() {
        return horometro;
    }

    public long getOdometro() {
        return odometro;
    }

    public int getEvento() {
        return evento;
    }

    public int getAntirobo() {
        return antirobo;
    }

    public int getPanico() {
        return panico;
    }

    public long getImei() {
        return imei;
    }

    public long getDriverId() {
        return driverId;
    }

    public boolean isValid() {
        return valid;
    }

    public Calendar getLastupdate() {
        return lastupdate;
    }

    public void setLastupdate(Calendar lastupdate) {
        this.lastupdate = lastupdate;
    }

    /** No-argument constructor; every field keeps its declared default. */
    public HazelCastobjGpsData() {
    }

    /**
     * Builds the report from a {@code GpsData} object.
     *
     * <p>Identity and status fields are always copied. When the source has no
     * time, is not fully processed, or is status-only, the report is marked
     * invalid, {@code utc} is set to epoch 0 in UTC and only
     * {@code lastupdate} is copied. Otherwise the position and sensor fields
     * are copied; {@code lastupdate} is left unset in that case and
     * {@link #writeData(DataOutput)} then sends epoch 0 for it.
     *
     * @param original source record, must be non-null
     */
    public HazelCastobjGpsData(final GpsData original) {
        this.imei = original.getImei();
        this.driverId = original.getDriverId();
        this.panico = original.getPanico();
        this.antirobo = original.getAntirobo();
        this.ignicion = original.getIgnicion();
        this.now = original.getNow();
        this.ip = original.getIpaddress();

        // Entries 1..3 of a four-element driver id array become ibutton2..4.
        if (original.getDriverIdArray() != null
                && original.getDriverIdArray().length == 4) {
            this.ibutton2 = original.getDriverIdArray()[1];
            this.ibutton3 = original.getDriverIdArray()[2];
            this.ibutton4 = original.getDriverIdArray()[3];
        } else {
            this.ibutton2 = -1;
            this.ibutton3 = -1;
            this.ibutton4 = -1;
        }

        if (original.getTimeCalendar() == null || !original.isFullProcessed()
                || original.isOnlyStatus()) {
            this.valid = false;
            Calendar fecha_gps = Calendar.getInstance(TimeZone
                    .getTimeZone("UTC"));
            fecha_gps.setTimeInMillis(0);
            this.utc = fecha_gps;
            this.lastupdate = original.getLastUpdate();
        } else {
            this.utc = original.getTimeCalendar();
            this.latitud = original.getLatitud();
            this.longitud = original.getLongitud();
            this.altitud = original.getAltitud();
            this.velocidad = original.getVelocidad();
            this.cog = original.getCOG();
            this.nsat = original.getNSAT();
            this.io1 = original.getIo1();
            this.io2 = original.getIo2();
            this.io3 = original.getIo3();
            this.io4 = original.getIo4();
            this.hdop = original.getHdop();
            this.adc1 = original.getAdc();
            this.adc2 = original.getAdc2();
            this.power = original.getPower();
            this.bateria = original.getBateria();
            this.odometro = original.getOdometro();
            this.horometro = original.getHorometro();
            this.evento = original.getEvento();
            this.setTrailerId(original.getTrailerId());
            this.setTripTime(original.getTripTime());
            this.setHorometroInterno(original.getHorometroInterno());

        }
    }

    /**
     * Constructor used when initializing from the database: every argument is
     * stored in the field of the same name ({@code registro} goes to
     * {@code now}, {@code driverid} to {@code driverId}).
     */
    public HazelCastobjGpsData(final boolean fromDB, final long imei,
            final String registro, final Calendar utc,
            final Calendar lastupdate, final long driverid,
            final double latitud, final double longitud, final double altitud,
            final double velocidad, final double cog, final double nsat,
            final double hdop, final double adc1, final double adc2,
            final double adc3, final double adc4, final double bateria,
            final int ignicion, final int io1, final int io2, final int io3,
            final int io4, final int power, final int antirobo,
            final int panico, final long odometro, final long horometro,
            final int evento) {
        this.imei = imei;
        this.driverId = driverid;
        this.panico = panico;
        this.antirobo = antirobo;
        this.ignicion = ignicion;
        this.now = registro;
        this.utc = utc;
        this.lastupdate = lastupdate;
        this.latitud = latitud;
        this.longitud = longitud;
        this.altitud = altitud;
        this.velocidad = velocidad;
        this.cog = cog;
        this.nsat = nsat;
        this.io1 = io1;
        this.io2 = io2;
        this.io3 = io3;
        this.io4 = io4;
        this.hdop = hdop;
        this.adc1 = adc1;
        this.adc2 = adc2;
        this.adc3 = adc3;
        this.adc4 = adc4;
        this.power = power;
        this.bateria = bateria;
        this.horometro = horometro;
        this.odometro = odometro;
        // Previously the evento argument was accepted but silently dropped.
        this.evento = evento;
        this.setFromDB(fromDB);

    }

    /**
     * Restores the fields from the stream, in exactly the order
     * {@link #writeData(DataOutput)} wrote them.
     *
     * <p>A timestamp whose text cannot be parsed is logged and left at the
     * current time; reading continues with the next field.
     *
     * @param arg0 stream positioned at the start of a serialized object
     * @throws IOException if the stream cannot be read
     */
    @Override
    public void readData(final DataInput arg0) throws IOException {
        this.utc = readTimestamp(arg0);
        this.lastupdate = readTimestamp(arg0);

        this.now = arg0.readUTF();
        this.latitud = arg0.readDouble();
        this.longitud = arg0.readDouble();
        this.altitud = arg0.readDouble();
        this.velocidad = arg0.readDouble();
        this.cog = arg0.readDouble();
        this.nsat = arg0.readDouble();
        this.hdop = arg0.readDouble();
        this.adc1 = arg0.readDouble();
        this.adc2 = arg0.readDouble();
        this.adc3 = arg0.readDouble();
        this.adc4 = arg0.readDouble();
        this.bateria = arg0.readDouble();
        this.ignicion = arg0.readInt();
        this.io1 = arg0.readInt();
        this.io2 = arg0.readInt();
        this.io3 = arg0.readInt();
        this.io4 = arg0.readInt();
        this.power = arg0.readInt();
        this.antirobo = arg0.readInt();
        this.panico = arg0.readInt();
        this.imei = arg0.readLong();
        this.driverId = arg0.readLong();
        this.valid = arg0.readBoolean();
        this.odometro = arg0.readLong();
        this.horometro = arg0.readLong();
        this.evento = arg0.readInt();
        this.ip = arg0.readUTF();
        this.trailerId = arg0.readLong();
        this.tripTime = arg0.readLong();
        this.horometroInterno = arg0.readLong();
        this.ibutton2 = arg0.readLong();
        this.ibutton3 = arg0.readLong();
        this.ibutton4 = arg0.readLong();
        this.odometroInterno = arg0.readLong();
    }

    /**
     * Writes the serialized form: two timestamps as text, {@code now}, the
     * numeric fields, {@code ip} and the remaining counters.
     *
     * <p>Null references no longer abort serialization: a null {@code utc} or
     * {@code lastupdate} is sent as epoch 0, a null {@code now} as
     * {@code "n/a"} and a null {@code ip} as {@code "N/A"}. The field order
     * and types on the wire are unchanged.
     *
     * @param arg0 destination stream
     * @throws IOException if the stream cannot be written
     */
    @Override
    public void writeData(final DataOutput arg0) throws IOException {
        arg0.writeUTF(formatTimestamp(utc));
        arg0.writeUTF(formatTimestamp(lastupdate));
        arg0.writeUTF(now != null ? now : DEFAULT_NOW);
        arg0.writeDouble(latitud);
        arg0.writeDouble(longitud);
        arg0.writeDouble(altitud);
        arg0.writeDouble(velocidad);
        arg0.writeDouble(cog);
        arg0.writeDouble(nsat);
        arg0.writeDouble(hdop);
        arg0.writeDouble(adc1);
        arg0.writeDouble(adc2);
        arg0.writeDouble(adc3);
        arg0.writeDouble(adc4);
        arg0.writeDouble(bateria);
        arg0.writeInt(ignicion);
        arg0.writeInt(io1);
        arg0.writeInt(io2);
        arg0.writeInt(io3);
        arg0.writeInt(io4);
        arg0.writeInt(power);
        arg0.writeInt(antirobo);
        arg0.writeInt(panico);
        arg0.writeLong(imei);
        arg0.writeLong(driverId);
        arg0.writeBoolean(valid);
        arg0.writeLong(odometro);
        arg0.writeLong(horometro);
        arg0.writeInt(evento);
        arg0.writeUTF(ip != null ? ip : DEFAULT_IP);
        arg0.writeLong(trailerId);
        arg0.writeLong(tripTime);
        arg0.writeLong(horometroInterno);
        arg0.writeLong(ibutton2);
        arg0.writeLong(ibutton3);
        arg0.writeLong(ibutton4);
        arg0.writeLong(odometroInterno);
    }

    /**
     * Formats a timestamp for the wire; a null calendar is sent as epoch 0.
     * A new formatter is created per call because SimpleDateFormat is not
     * safe to share between threads.
     */
    private static String formatTimestamp(final Calendar value) {
        Calendar effective = value;
        if (effective == null) {
            effective = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
            effective.setTimeInMillis(0);
        }
        return new SimpleDateFormat(TIMESTAMP_PATTERN).format(effective
                .getTime());
    }

    /**
     * Reads one timestamp string and converts it to a calendar in the default
     * time zone; on a parse failure the error is logged and the calendar
     * keeps the current time.
     */
    private static Calendar readTimestamp(final DataInput in)
            throws IOException {
        final Calendar result = Calendar.getInstance();
        final String text = in.readUTF();
        try {
            result.setTime(new SimpleDateFormat(TIMESTAMP_PATTERN).parse(text));
        } catch (ParseException e) {
            ReqInit.logger.fatal(e.getMessage(), e);
        }
        return result;
    }

    public boolean isFromDB() {
        return fromDB;
    }

    public void setFromDB(boolean fromDB) {
        this.fromDB = fromDB;
    }

    public long getDeltaTime() {
        return delta_time;
    }

    private void setDelta_time(long delta_time) {
        this.delta_time = delta_time;
    }

    public double getAnteriorSog() {
        return anterior_sog;
    }

    private void setAnterior_sog(double anterior_sog) {
        this.anterior_sog = anterior_sog;
    }

    public double getAnterior_lon() {
        return anterior_lon;
    }

    private void setAnterior_lon(double anterior_lon) {
        this.anterior_lon = anterior_lon;
    }

    public double getAnterior_lat() {
        return anterior_lat;
    }

    private void setAnterior_lat(double anterior_lat) {
        this.anterior_lat = anterior_lat;
    }

    public double getAnteriorCog() {
        return anterior_cog;
    }

    private void setAnterior_cog(double anterior_cog) {
        this.anterior_cog = anterior_cog;
    }

    public double getAnteriorIgnicion() {
        return anterior_ignicion;
    }

    private void setAnterior_ignicion(double anterior_ignicion) {
        this.anterior_ignicion = anterior_ignicion;
    }

    public double getDeltaDist() {
        return delta_dist;
    }

    private void setDelta_dist(double delta_dist) {
        this.delta_dist = delta_dist;
    }

    public long getAnterior_odometro() {
        return anterior_odometro;
    }

    public long getAnterior_horometro() {
        return anterior_horometro;
    }

    public double getAdc3() {
        return adc3;
    }

    public void setAdc3(double adc3) {
        this.adc3 = adc3;
    }

    public double getAdc4() {
        return adc4;
    }

    public void setAdc4(double adc4) {
        this.adc4 = adc4;
    }

    public long getTrailerId() {
        return trailerId;
    }

    public void setTrailerId(long trailerId) {
        this.trailerId = trailerId;
    }

    public long getTripTime() {
        return tripTime;
    }

    public void setTripTime(long tripTime) {
        this.tripTime = tripTime;
    }

    public long getIbutton2() {
        return ibutton2;
    }

    public void setIbutton2(long ibutton2) {
        this.ibutton2 = ibutton2;
    }

    public long getIbutton3() {
        return ibutton3;
    }

    public void setIbutton3(long ibutton3) {
        this.ibutton3 = ibutton3;
    }

    public long getIbutton4() {
        return ibutton4;
    }

    public void setIbutton4(long ibutton4) {
        this.ibutton4 = ibutton4;
    }

    public long getOdometroInterno() {
        return odometroInterno;
    }

    public void setOdometroInterno(long odometroInterno) {
        this.odometroInterno = odometroInterno;
    }

}

我们知道我们正在使用旧版本的 hazelcast,但升级它还不是一个选项,我们想先知道是否存在代码问题。你能帮我找到问题吗?

4

0 回答 0