我有一个从 Kafka 主题读取数据的 Spring Boot 代码。当通过 Kafka Producer Console 向主题提供数据时,代码按预期工作。当我尝试通过金门将数据推送到 kafka 主题时,代码不会从主题中读取数据,尽管我可以看到金门能够将数据写入 kafka 主题。谁能提出为什么这种行为变化?
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.bson.Document;
import org.json.JSONArray;
import org.json.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
public class VideoConsumer implements Runnable {
private ObjectMapper objectMapper;
private KafkaStream<byte[], byte[]> kafkaStream;
private int threadNumber;
public VideoConsumer(KafkaStream<byte[], byte[]> kafkaStream, int threadNumber) {
this.threadNumber = threadNumber;
this.kafkaStream = kafkaStream;
this.objectMapper = new ObjectMapper();
}
@Override
public void run() {
ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
while (it.hasNext()) {
byte[] messageData = it.next().message();
try {
//String videoFromMessage = objectMapper.readValue(messageData, String.class);
//byte[] videoFromMessage = it.next().message();
//System.out.print("got message");
String streamData = new String(messageData);
System.out.print("Thread:" + threadNumber + ".Consuming video: " + streamData + "\n");
String changed=streamData.toString();
int pos=changed.lastIndexOf("}}");
String change=changed.substring(0,pos );
change=change.replace("}}", "}},");
String res=change.concat("}}");
String result="[" +res+ "]";
System.out.println(result);
JSONArray json;
json = new JSONArray(result);
Map<String, List<JSONObject>> orderMongo = new HashMap<>();
Map<String, List<JSONObject>> orderItemMongo = new HashMap<>();
MongoClient mongoClient = new MongoClient( "localhost" , 27017 );
MongoDatabase db = mongoClient.getDatabase("Mongotest");
MongoCollection<Document> table = db.getCollection("test1");
Document doc1=new Document();
//Gson gson=new Gson();
BasicDBObject document = new BasicDBObject();
for (int i = 0; i < json.length(); i++) {
JSONObject obj = json.getJSONObject(i);
if(obj.getString("table").equals("TEST.S_ORDER_MONGO1")){
List<JSONObject> list = orderMongo.getOrDefault(obj.getString("table").equals("TEST.S_ORDER_MONGO1"),new ArrayList<>());
list.add(obj);
orderMongo.put(obj.getJSONObject("after").getString("ROW_ID"),list);
}
else if(obj.getString("table").equals("TEST.S_ORDER_ITEM_MONGO1")){
List<JSONObject> nextlist = orderItemMongo.getOrDefault(obj.getString("table").equals("TEST.S_ORDER_ITEM_MONGO1"),new ArrayList<>());
nextlist.add(obj);
orderItemMongo.put(obj.getJSONObject("after").getString("ORDER_ID"),nextlist);
}
}
System.out.println(orderMongo);
System.out.println(orderItemMongo);
// System.out.println(orderItemMongo);
for (Entry<String, List<JSONObject>> entry : orderMongo.entrySet()) {
for(Entry<String, List<JSONObject>> entry1 : orderItemMongo.entrySet()){
if(entry.getKey().equals(entry1.getKey())){
//String gsonString=gson.toJson(entry.getValue());
//System.out.println(gsonString);
List<JSONObject> listnext = entry.getValue();
List <JSONObject> orderlineList=entry1.getValue();
for(JSONObject obj:listnext){
Document doc = new Document("STATUS_CD", obj.getJSONObject("after").getString("STATUS_CD"));
if(obj.getJSONObject("after").isNull("INTEGRATION_ID")==true){
doc.append("INTEGRATION_ID", null);}
doc.append("X_CUST_REF", obj.getJSONObject("after").getString("X_CUST_REF"));
doc.append("REQ_SHIP_DT",obj.getJSONObject("after").getString("REQ_SHIP_DT"));
if(obj.getJSONObject("after").isNull("QUOTE_ID")==true){
doc.append("QUOTE_ID",null);}
doc.append("ACCNT_ID",obj.getJSONObject("after").getString("ACCNT_ID"));
doc.append("ACTIVE_FLG",obj.getJSONObject("after").getString("ACTIVE_FLG"));
doc.append("PROCESS_TIMESTAMP",obj.getJSONObject("after").getString("PROCESS_TIMESTAMP"));
doc.append("CONTACT_ID",obj.getJSONObject("after").getString("CONTACT_ID"));
doc.append("BU_ID", obj.getJSONObject("after").getString("BU_ID"));
doc.append("SHIP_CON_ID",obj.getJSONObject("after").getString("SHIP_CON_ID"));
doc.append("LAST_UPD", obj.getJSONObject("after").getString("LAST_UPD"));
if(obj.getJSONObject("after").isNull("X_CLOSE_DT")==true){
doc.append("X_CLOSE_DT", null);}
doc.append("X_SUB_STAT", obj.getJSONObject("after").getString("X_SUB_STAT"));
doc.append("ORDER_NUM", obj.getJSONObject("after").getString("ORDER_NUM"));
doc.append("SOFT_DELETE", obj.getJSONObject("after").getString("SOFT_DELETE"));
doc.append("ROW_ID", obj.getJSONObject("after").getString("ROW_ID"));
doc.append("LAST_UPD_BY",obj.getJSONObject("after").getString("LAST_UPD_BY"));
doc.append("REV_NUM",obj.getJSONObject("after").getString("REV_NUM"));
doc.append("ORDER_DT", obj.getJSONObject("after").getString("ORDER_DT"));
for(JSONObject object:orderlineList){
if(object.getJSONObject("after").isNull("ASSET_ID")==true){
doc1.append("ASSET_ID", null);}
if(object.getJSONObject("after").isNull("SERV_ACCNT_ID")==true){
doc1.append("SERV_ACCNT_ID", null);}
doc1.append("REQ_SHIP_DT",object.getJSONObject("after").getString("REQ_SHIP_DT"));
if(object.getJSONObject("after").isNull("X_PROD_DESC")==true){
doc1.append("X_PROD_DESC",null);}
if(object.getJSONObject("after").isNull("SHIP_CON_ID")==true){
doc1.append("SHIP_CON_ID",null);}
doc1.append("X_BES_STATUS",object.getJSONObject("after").getString("X_BES_STATUS"));
doc1.append("ROW_ID",object.getJSONObject("after").getString("ROW_ID"));
doc1.append("STATUS_CD",object.getJSONObject("after").getString("STATUS_CD"));
doc1.append("ORDER_ID",object.getJSONObject("after").getString("ORDER_ID"));
if(object.getJSONObject("after").isNull("COMPLETED_DT")==true){
doc1.append("COMPLETED_DT",null);}
doc1.append("LAST_UPD",object.getJSONObject("after").getString("LAST_UPD"));
doc1.append("SOFT_DELETE",object.getJSONObject("after").getString("SOFT_DELETE"));
doc1.append("INTEGRATION_ID",object.getJSONObject("after").getString("INTEGRATION_ID"));
doc1.append("X_CDD",object.getJSONObject("after").getString("X_CDD"));
doc1.append("ACTION_CD",object.getJSONObject("after").getString("ACTION_CD"));
doc1.append("X_ORDER_ITEM_SUBSTATUS",object.getJSONObject("after").getString("X_ORDER_ITEM_SUBSTATUS"));
if(object.getJSONObject("after").isNull("X_APPT_REF")==true){
doc1.append("X_APPT_REF",null);}
if(object.getJSONObject("after").isNull("X_CANCELLED_DT")==true){
doc1.append("X_CANCELLED_DT",null);}
doc1.append("PROD_ID",object.getJSONObject("after").getString("PROD_ID"));
if(object.getJSONObject("after").isNull("SERVICE_NUM")==true){
doc1.append("SERVICE_NUM",null);}
if(object.getJSONObject("after").isNull("MUST_DLVR_BY_DT")==true){
doc1.append("MUST_DLVR_BY_DT",null);}
doc1.append("ROLLUP_FLG",object.getJSONObject("after").getString("ROLLUP_FLG"));
doc1.append("ROOT_ORDER_ITEM_ID",object.getJSONObject("after").getString("ROOT_ORDER_ITEM_ID"));
doc1.append("BILL_ACCNT_ID",object.getJSONObject("after").getString("BILL_ACCNT_ID"));
doc1.append("PROCESS_TIMESTAMP",object.getJSONObject("after").getString("PROCESS_TIMESTAMP"));
doc1.append("QTY_REQ",object.getJSONObject("after").getString("QTY_REQ"));
}
doc.append("ORDERLINE", doc1);
table.insertOne(doc);
}
}
}
}
}
catch (Exception e) {
e.printStackTrace();
}
System.out.println("Shutting down Thread: " + kafkaStream);
}
}
}