0

生产者代码如下

static PltPage pltPage;
/**
 * Producer entry point: reads the PLT text export, collects the "GR" rows into
 * parallel lists, builds a page from them via {@code ExportUtilities.generatepltpage},
 * wraps it in a {@link PltResultPage} and publishes it to the Kafka topic "test".
 *
 * @param args unused
 * @throws IOException if the input file cannot be opened or read
 */
public static void main(String[] args) throws IOException {
    Short itemtype = 1;

    Properties props = new Properties();
    props.put("metadata.broker.list", "localhost:9092");
    props.put("partitioner.class", "com.rms.com.SimplePartitioner");
    props.put("serializer.class", "com.rms.com.CustomSerializer");
    props.put("request.required.acks", "1");

    ProducerConfig config = new ProducerConfig(props);
    Producer<String, PltResultPage> producer = new Producer<String, PltResultPage>(config);
    try {
        // The input file is located relative to the grandparent of the working directory.
        String folder = new File(".").getAbsoluteFile().getPath();
        String parent = new File(folder).getParentFile().getParent();
        String path = parent + "/KafkaProducerSparkConsumer/src/resources/PortfolioPLT.txt";

        List<Integer> periodIds = new ArrayList<Integer>();
        List<Integer> sampleIds = new ArrayList<Integer>();
        List<Integer> eventIds = new ArrayList<Integer>();
        List<Integer> dates = new ArrayList<Integer>();
        List<Double> losses = new ArrayList<Double>();

        BufferedReader br = new BufferedReader(new FileReader(path));
        try {
            String sCurrentLine;
            while ((sCurrentLine = br.readLine()) != null) {
                String[] entries = sCurrentLine.split("~");
                // Skip blank/short lines instead of failing with ArrayIndexOutOfBoundsException;
                // indices 1..6 are read below, so at least 7 fields are required.
                if (entries.length < 7 || !entries[1].equalsIgnoreCase("GR")) {
                    continue;
                }
                periodIds.add(Integer.parseInt(entries[2]));
                sampleIds.add(Integer.parseInt(entries[3]));
                eventIds.add(Integer.parseInt(entries[4]));
                // The date is a fixed value here; it is not parsed from the line.
                dates.add(2040);
                losses.add(Double.parseDouble(entries[6]));
            }
        } finally {
            // Release the file handle even when a line fails to parse.
            br.close();
        }

        pltPage = ExportUtilities.generatepltpage(periodIds, sampleIds, eventIds, dates, losses,
                periodIds.size(), 1000000000002L, Short.parseShort("1"));

        PltResultPage resultPage = new PltResultPage();
        resultPage.setAnalysisId(1);
        resultPage.setExternalID("1");
        resultPage.setItemId(1L);
        resultPage.setItemType(itemtype);
        resultPage.setOutputProperty(itemtype);
        resultPage.setResultType(itemtype);
        resultPage.setResultPage(pltPage);
        resultPage.setJobId(1L);

        KeyedMessage<String, PltResultPage> message =
                new KeyedMessage<String, PltResultPage>("test", resultPage);
        producer.send(message);
    } finally {
        // Always close the producer, including when reading, parsing or sending throws.
        producer.close();
    }
}

消费者代码如下

 // Consumer configuration; a_zookeeper and a_groupId are supplied by the enclosing scope.
 Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "20000");
        props.put("zookeeper.sync.time.ms", "3000");
        props.put("auto.commit.interval.ms", "2000");
        props.put("auto.offset.reset", "smallest");
        // NOTE(review): "serializer.class" is the key the producer uses to select its encoder;
        // on this side the decoders are passed explicitly to createMessageStreams, so this
        // entry is presumably unused by the consumer - confirm before relying on it.
        props.put("serializer.class", "com.rms.com.CustomSerializer");

// Request a_numThreads streams for the topic and hand each stream to its own worker task.
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, Integer.valueOf(a_numThreads));
        // Keys are decoded as strings, values with the custom PltResultPage decoder.
        final StringDecoder decoder =
                new StringDecoder(new VerifiableProperties(returnProperties(zooKeeper,groupId)));
        final CustomSerializer decoder2 = new CustomSerializer(new VerifiableProperties(returnProperties(zooKeeper,groupId)));
        final Map<String, List<KafkaStream<String, PltResultPage>>> consumerMap = this.consumer.createMessageStreams(topicCountMap, decoder, decoder2);
        final List<KafkaStream<String, PltResultPage>> streams = consumerMap.get(topic);
        executor = Executors.newFixedThreadPool(a_numThreads);
        // NOTE(review): numbering starts at a_numThreads rather than 0 - confirm this is intended.
        int threadNumber = a_numThreads;
        // Typed loop variable: the raw KafkaStream used previously discarded the key/value types.
        for (KafkaStream<String, PltResultPage> stream : streams) {
            executor.submit(new ExecuteConsumerClient(stream, threadNumber));
            threadNumber++;
        }

// Drain the stream: each message is decoded to a PltResultPage and passed to the exporter.
System.out.println("calling ExecuteConsumerClient.run()");
        ConsumerIterator<String,PltResultPage> it = m_stream.iterator();

        while (it.hasNext())
        {
            try {
                // NOTE(review): invoked once per message - confirm CreateJavaSparkContext() is
                // safe to call repeatedly.
                CreateJavaSparkContext();
                System.out.println("Converting to ResultPage");
                // The iterator is already typed, so no cast is needed on the payload.
                PltResultPage pltResultPage = it.next().message();
                System.out.println("Before Impl Accept");
                sparkExportPLTToFile.accept(pltResultPage.getJobId(), pltResultPage.getItemId(), pltResultPage.getItemType(), pltResultPage.getOutputProperty(), pltResultPage.getResultType(), pltResultPage.getResultPage(), pltResultPage.getAnalysisId(), pltResultPage.getExternalID());
            }
            catch (Exception e)
            {
                // getStackTrace().toString() only printed the array's identity string;
                // print the exception and its real stack trace so failures can be diagnosed.
                System.out.println("Exception in it.Run " + e);
                e.printStackTrace(System.out);
            }
            System.out.println("Executed impl for thread " + m_threadNumber);
        }
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }

当我尝试将消息转换回对象时,转换失败了。自定义序列化器的代码来自某篇文章,如下所示。有人能指出这个实现有什么问题吗?我也尝试过直接调用自定义序列化器中的 fromBytes,但没有帮助:序列化器返回的是 null 对象。

自定义序列化器

/**
 * Kafka encoder/decoder for {@link PltResultPage} based on Java native serialization.
 *
 * <p>Failures are reported on {@code System.err} instead of being swallowed silently.
 * The return values on failure are unchanged (an empty array from {@link #toBytes},
 * {@code null} from {@link #fromBytes}), so existing callers keep working but the
 * underlying cause (for example a {@code NotSerializableException} or an
 * {@code InvalidClassException}) is now visible.
 *
 * <p>NOTE(review): Java native deserialization must not be used on untrusted input;
 * consider a data-only format if the topic can be written by other parties.
 */
public class CustomSerializer implements Encoder<PltResultPage>, Decoder<PltResultPage> {

    /**
     * Constructor taking Kafka's properties wrapper; the properties are not used.
     *
     * @param verifiableProperties ignored
     */
    public CustomSerializer(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }

    /** No-argument constructor for direct instantiation. */
    public CustomSerializer() {
    }

    /**
     * Serializes the given page to a byte array.
     *
     * @param o the page to serialize
     * @return the serialized bytes, or an empty array if serialization failed
     */
    @Override
    public byte[] toBytes(PltResultPage o) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            try {
                oos.writeObject(o);
            } finally {
                oos.close();
            }
            return baos.toByteArray();
        } catch (IOException e) {
            // Surface the cause; an empty payload would otherwise fail later with no explanation.
            System.err.println("CustomSerializer.toBytes failed: " + e);
            e.printStackTrace();
            return new byte[0];
        }
    }

    /**
     * Deserializes a page from the given bytes.
     *
     * @param bytes the serialized form produced by {@link #toBytes}
     * @return the decoded page, or {@code null} if the bytes could not be decoded
     */
    @Override
    public PltResultPage fromBytes(byte[] bytes) {
        try {
            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
            try {
                return (PltResultPage) ois.readObject();
            } finally {
                ois.close();
            }
        } catch (Exception e) {
            // Kept broad so null/empty input and class mismatches still yield null,
            // but the reason is now reported instead of being discarded.
            System.err.println("CustomSerializer.fromBytes failed: " + e);
            e.printStackTrace();
            return null;
        }
    }
}

PltResultPage 如下。

/**
 * Serializable bean describing one result page together with its identifying metadata.
 * All properties are nullable wrapper types with plain getters and setters.
 */
public class PltResultPage implements Serializable {
    private static final long serialVersionUID = 0L;

    private Long jobId;
    private Long itemId;
    private Short itemType;
    private Short outputProperty;
    private Short resultType;
    private LossPage resultPage;
    private Integer analysisId;
    private String externalID;

    /** @return the job id */
    public Long getJobId() {
        return jobId;
    }

    /** @param jobid the job id to store */
    public void setJobId(Long jobid) {
        jobId = jobid;
    }

    /** @return the item id */
    public Long getItemId() {
        return itemId;
    }

    /** @param itemId the item id to store */
    public void setItemId(Long itemId) {
        this.itemId = itemId;
    }

    /** @return the item type */
    public Short getItemType() {
        return itemType;
    }

    /** @param type the item type to store */
    public void setItemType(Short type) {
        itemType = type;
    }

    /** @return the output property */
    public Short getOutputProperty() {
        return outputProperty;
    }

    /** @param output the output property to store */
    public void setOutputProperty(Short output) {
        outputProperty = output;
    }

    /** @return the result type */
    public Short getResultType() {
        return resultType;
    }

    /** @param resultType the result type to store */
    public void setResultType(Short resultType) {
        this.resultType = resultType;
    }

    /** @return the wrapped loss page */
    public LossPage getResultPage() {
        return resultPage;
    }

    /** @param page the loss page to store */
    public void setResultPage(LossPage page) {
        resultPage = page;
    }

    /** @return the analysis id */
    public Integer getAnalysisId() {
        return analysisId;
    }

    /** @param id the analysis id to store */
    public void setAnalysisId(Integer id) {
        analysisId = id;
    }

    /** @return the external id */
    public String getExternalID() {
        return externalID;
    }

    /** @param externalID the external id to store */
    public void setExternalID(String externalID) {
        this.externalID = externalID;
    }
}
4

1 回答 1

0

尝试添加

private static final long serialVersionUID = 0L;

到 PltResultPage。这个值虽然不显眼,但会与其他数据一起被序列化;反序列化时,它会与当前 JVM 中已加载的类的值进行比较。如果两个值不同,反序列化将失败(抛出 InvalidClassException),而上面的自定义序列化器会吞掉这个异常,于是您得到的是 null 结果。如果您没有为类指定 serialVersionUID,JVM 会根据类的结构细节自动计算一个值;这个计算对编译器实现非常敏感,因此即使消费者和生产者 JVM 中的 PltResultPage 使用完全相同的源代码,两边自动生成的 serialVersionUID 也可能不同。

简而言之,如果在自定义序列化器中使用默认的 Java 序列化/反序列化,则必须在被序列化对象的类中声明 serialVersionUID。

于 2015-12-15T15:16:53.467 回答