1

我有一段从 Kafka 主题读取数据的 Spring Boot 代码。通过 Kafka 控制台生产者(Kafka Console Producer)向主题写入数据时,代码按预期工作。但是,当我通过 Oracle GoldenGate 将数据推送到同一个 Kafka 主题时,代码却读不到任何数据,尽管我可以确认 GoldenGate 已经成功把数据写入了该主题。谁能解释一下为什么会出现这种行为差异?

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.bson.Document;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.json.JSONTokener;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

/**
 * Worker that drains one Kafka stream and writes joined order documents to MongoDB.
 *
 * <p>Each Kafka message is expected to contain one or more concatenated JSON change
 * records of the form {@code {"table": ..., "after": {...}}}. Records for
 * {@code TEST.S_ORDER_MONGO1} are grouped by {@code after.ROW_ID}, records for
 * {@code TEST.S_ORDER_ITEM_MONGO1} are grouped by {@code after.ORDER_ID}, and for every
 * order that has at least one matching item a document is inserted into the
 * {@code Mongotest.test1} collection with the items embedded under {@code ORDERLINE}.
 *
 * <p>Orders without matching items in the same message are not inserted.
 */
public class VideoConsumer implements Runnable {
    private static final String ORDER_TABLE = "TEST.S_ORDER_MONGO1";
    private static final String ORDER_ITEM_TABLE = "TEST.S_ORDER_ITEM_MONGO1";

    // Currently not used by the processing code; kept so the class layout stays unchanged.
    private final ObjectMapper objectMapper;
    private final KafkaStream<byte[], byte[]> kafkaStream;
    private final int threadNumber;

    /**
     * @param kafkaStream  stream this worker consumes until its iterator is exhausted
     * @param threadNumber identifier used only to tag console output
     */
    public VideoConsumer(KafkaStream<byte[], byte[]> kafkaStream, int threadNumber) {
        this.threadNumber = threadNumber;
        this.kafkaStream = kafkaStream;
        this.objectMapper = new ObjectMapper();
    }

    /**
     * Consumes messages until the stream iterator reports no more elements.
     *
     * <p>A failure while handling one message is reported and the loop moves on to the
     * next message. The MongoDB client is created once for the whole run and closed when
     * the loop ends, instead of being created (and leaked) for every message.
     */
    @Override
    public void run() {
        MongoClient mongoClient = new MongoClient("localhost", 27017);
        try {
            MongoDatabase db = mongoClient.getDatabase("Mongotest");
            MongoCollection<Document> table = db.getCollection("test1");

            ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
            while (it.hasNext()) {
                byte[] messageData = it.next().message();
                try {
                    handleMessage(messageData, table);
                } catch (Exception e) {
                    // Boundary catch: one bad message must not stop the consumer thread.
                    System.err.println("Thread:" + threadNumber + ".Failed to process message");
                    e.printStackTrace();
                }
            }
            // Printed once, after the stream is drained (previously printed per message).
            System.out.println("Shutting down Thread: " + kafkaStream);
        } finally {
            mongoClient.close();
        }
    }

    /**
     * Parses one Kafka message, groups its records and inserts the joined documents.
     *
     * @param messageData raw message payload; {@code null} payloads are ignored
     * @param table       target collection
     * @throws JSONException if the payload is not valid JSON or a required column is missing
     */
    private void handleMessage(byte[] messageData, MongoCollection<Document> table)
            throws JSONException {
        if (messageData == null) {
            return;
        }
        // Decode explicitly as UTF-8 rather than with the platform default charset.
        String streamData = new String(messageData, StandardCharsets.UTF_8);
        System.out.print("Thread:" + threadNumber + ".Consuming video: " + streamData + "\n");

        Map<String, List<JSONObject>> orderMongo = new HashMap<>();
        Map<String, List<JSONObject>> orderItemMongo = new HashMap<>();
        for (JSONObject record : parseRecords(streamData)) {
            JSONObject after = record.optJSONObject("after");
            if (after == null) {
                // No after-image to map; skip this record instead of failing the whole message.
                continue;
            }
            String tableName = record.optString("table");
            // Group under the real join key. The previous lookup used a Boolean as the key,
            // never matched, and so later records overwrote earlier ones with the same id.
            if (ORDER_TABLE.equals(tableName)) {
                orderMongo.computeIfAbsent(after.getString("ROW_ID"), k -> new ArrayList<>())
                        .add(record);
            } else if (ORDER_ITEM_TABLE.equals(tableName)) {
                orderItemMongo.computeIfAbsent(after.getString("ORDER_ID"), k -> new ArrayList<>())
                        .add(record);
            }
        }
        System.out.println(orderMongo);
        System.out.println(orderItemMongo);

        for (Entry<String, List<JSONObject>> entry : orderMongo.entrySet()) {
            // Direct lookup replaces the nested scan over every item key.
            List<JSONObject> itemRecords = orderItemMongo.get(entry.getKey());
            if (itemRecords == null) {
                continue;
            }
            // One sub-document per item. A single shared Document used to be overwritten by
            // each item, so only the last order line was ever stored.
            List<Document> orderLines = new ArrayList<>(itemRecords.size());
            for (JSONObject itemRecord : itemRecords) {
                orderLines.add(toOrderLineDocument(itemRecord.getJSONObject("after")));
            }
            for (JSONObject orderRecord : entry.getValue()) {
                Document doc = toOrderDocument(orderRecord.getJSONObject("after"));
                doc.append("ORDERLINE", orderLines);
                table.insertOne(doc);
            }
        }
    }

    /**
     * Splits a payload made of back-to-back JSON objects into individual objects.
     *
     * <p>Uses the JSON tokenizer rather than string surgery on {@code "}}"}, so nested
     * objects, whitespace between braces and {@code "}}"} inside string values are handled.
     */
    private static List<JSONObject> parseRecords(String payload) throws JSONException {
        List<JSONObject> records = new ArrayList<>();
        JSONTokener tokener = new JSONTokener(payload);
        // nextClean() skips whitespace and returns 0 at end of input.
        while (tokener.nextClean() != 0) {
            tokener.back();
            records.add(new JSONObject(tokener));
        }
        return records;
    }

    /** Maps the after-image of an order record to its MongoDB document (without ORDERLINE). */
    private static Document toOrderDocument(JSONObject after) throws JSONException {
        Document doc = new Document();
        putRequired(doc, after, "STATUS_CD");
        putNullable(doc, after, "INTEGRATION_ID");
        putRequired(doc, after, "X_CUST_REF");
        putRequired(doc, after, "REQ_SHIP_DT");
        putNullable(doc, after, "QUOTE_ID");
        putRequired(doc, after, "ACCNT_ID");
        putRequired(doc, after, "ACTIVE_FLG");
        putRequired(doc, after, "PROCESS_TIMESTAMP");
        putRequired(doc, after, "CONTACT_ID");
        putRequired(doc, after, "BU_ID");
        putRequired(doc, after, "SHIP_CON_ID");
        putRequired(doc, after, "LAST_UPD");
        putNullable(doc, after, "X_CLOSE_DT");
        putRequired(doc, after, "X_SUB_STAT");
        putRequired(doc, after, "ORDER_NUM");
        putRequired(doc, after, "SOFT_DELETE");
        putRequired(doc, after, "ROW_ID");
        putRequired(doc, after, "LAST_UPD_BY");
        putRequired(doc, after, "REV_NUM");
        putRequired(doc, after, "ORDER_DT");
        return doc;
    }

    /** Maps the after-image of an order-item record to one ORDERLINE sub-document. */
    private static Document toOrderLineDocument(JSONObject after) throws JSONException {
        Document line = new Document();
        putNullable(line, after, "ASSET_ID");
        putNullable(line, after, "SERV_ACCNT_ID");
        putRequired(line, after, "REQ_SHIP_DT");
        putNullable(line, after, "X_PROD_DESC");
        putNullable(line, after, "SHIP_CON_ID");
        putRequired(line, after, "X_BES_STATUS");
        putRequired(line, after, "ROW_ID");
        putRequired(line, after, "STATUS_CD");
        putRequired(line, after, "ORDER_ID");
        putNullable(line, after, "COMPLETED_DT");
        putRequired(line, after, "LAST_UPD");
        putRequired(line, after, "SOFT_DELETE");
        putRequired(line, after, "INTEGRATION_ID");
        putRequired(line, after, "X_CDD");
        putRequired(line, after, "ACTION_CD");
        putRequired(line, after, "X_ORDER_ITEM_SUBSTATUS");
        putNullable(line, after, "X_APPT_REF");
        putNullable(line, after, "X_CANCELLED_DT");
        putRequired(line, after, "PROD_ID");
        putNullable(line, after, "SERVICE_NUM");
        putNullable(line, after, "MUST_DLVR_BY_DT");
        putRequired(line, after, "ROLLUP_FLG");
        putRequired(line, after, "ROOT_ORDER_ITEM_ID");
        putRequired(line, after, "BILL_ACCNT_ID");
        putRequired(line, after, "PROCESS_TIMESTAMP");
        putRequired(line, after, "QTY_REQ");
        return line;
    }

    /** Copies a column that must be present; {@code getString} throws if it is not. */
    private static void putRequired(Document target, JSONObject after, String key)
            throws JSONException {
        target.append(key, after.getString(key));
    }

    /**
     * Copies a column that may be null or absent.
     *
     * <p>Stores {@code null} when the column is null/absent and its string form otherwise.
     * Previously a non-null value was dropped from the document altogether.
     */
    private static void putNullable(Document target, JSONObject after, String key) {
        target.append(key, after.isNull(key) ? null : after.optString(key));
    }
}    
4

0 回答 0