0

我正在尝试在 Kura 上运行 Kafka 生产者(producer)。在 Kura 中使用 Kafka 0.8.2 的 jar 文件运行同一个程序时,它可以成功运行。但是当我换成最新版本(2.8.2)的 Kafka 并重新尝试时,它报出了这个错误:“Error occured while initializing Kafka. Error: java.lang.NullPointerException”。我是在 Eclipse 中使用 Java 8 运行的。Kafka 对 Java 版本有依赖要求吗?

/**
 * Publishes a simulated tank-telemetry JSON document to a Kafka topic on a
 * fixed timer.
 *
 * <p>Usage: call {@link #config(String, String)} once. It creates the
 * producer and starts a {@link Timer} that publishes one message every five
 * seconds (first message after two seconds).
 *
 * <p>The simulated readings are plain fields that are read and updated from
 * the timer thread; nothing in this class synchronizes access to them.
 */
public class Kafka
{
    /** Producer created by {@link #config(String, String)}; {@code null} until that succeeds. */
    KafkaProducer<String, String> producer;

    /** Settings handed to the producer; the destination topic is kept under "kafka.topic.name". */
    Properties p = new Properties();

    /** Simulated readings. {@code nvolume} has no initializer and therefore starts at 0. */
    int gvolume = 100000, inventorytracker = 0, nvolume,
        temperature = 0, api = 0, level = 0, capacityfilled = 0, apisg = 0,
        gvolumeusable = 0, nvolumeusable = 0;

    /**
     * Creates the producer and starts the periodic publisher.
     *
     * <p>Failures are reported on standard output (message plus stack trace)
     * and are not rethrown, so callers see no exception from this method.
     *
     * @param bootstrap value for the producer's "bootstrap.servers" setting
     * @param topic     topic every message is sent to
     */
    public void config(String bootstrap, String topic)
    {
        // Properties.setProperty rejects null values with a bare
        // NullPointerException; report which precondition failed instead.
        if (bootstrap == null || topic == null)
        {
            System.out.println("Error occured while initializing Kafka. Error: "
                    + "bootstrap and topic must not be null (bootstrap=" + bootstrap
                    + ", topic=" + topic + ")");
            return;
        }

        try
        {
            p = new Properties();
            p.setProperty("bootstrap.servers", bootstrap);
            p.setProperty("kafka.topic.name", topic);
            // "transactional.id" is deliberately NOT set: a transactional
            // producer requires initTransactions()/beginTransaction()/
            // commitTransaction() around its sends, and this class only ever
            // performs plain sends.
            producer = new KafkaProducer<String, String>(p, new StringSerializer(), new StringSerializer());
            SendData(bootstrap, topic);
        }
        catch (Exception e)
        {
            System.out.println("Error occured while initializing Kafka. Error: " + e);
            // The one-line summary hides where the failure happened; keep the trace.
            e.printStackTrace(System.out);
        }
    }

    /**
     * Sends one message to the configured topic.
     *
     * <p>Errors, including asynchronous delivery failures reported by the
     * producer, are printed to standard output and not rethrown.
     *
     * @param message record value to send
     */
    public void publish(String message)
    {
        System.out.println("Sending message : " + message);

        if (producer == null)
        {
            // config() was never called or did not succeed.
            System.out.println("Error: Kafka producer is not initialized; call config() first");
            return;
        }

        try
        {
            ProducerRecord<String, String> record =
                    new ProducerRecord<String, String>(p.getProperty("kafka.topic.name"), message);
            // send() is asynchronous; the callback surfaces failures that
            // would otherwise go unnoticed.
            producer.send(record, (metadata, exception) -> {
                if (exception != null)
                {
                    System.out.println("Error: " + exception);
                }
            });
            System.out.println(message + record);
        }
        catch (Exception e)
        {
            System.out.println("Error: " + e);
        }
    }

    /**
     * Starts a timer that publishes the current simulated readings and then
     * advances them, every 5000 ms after an initial 2000 ms delay.
     *
     * @param bootstrap not used by this method
     * @param topic     not used by this method; the topic is read from {@link #p}
     */
    public void SendData(String bootstrap, String topic)
    {
        Timer timer = new Timer();
        TimerTask task = new TimerTask()
        {
            @Override
            public void run()
            {
                publish(buildMessage());
                advanceReadings();
            }
        };
        timer.schedule(task, 2000, 5000);
    }

    /** Formats the current readings, plus the fixed tank attributes, as a JSON object. */
    private String buildMessage()
    {
        return "{\"height\":56, \"diameter\":115, \"setpoint\":5.9, \"usablelow\":5.9,"
                + " \"product\":\"JET-A\",\"tanknominalcapacity\":4200000, \"year\":2003,"
                + " \"status\":\"Online\", \"gvolume\":" + gvolume
                + ", \"nvolume\":" + nvolume
                + ", \"level\":" + level
                + ", \"temperature\":" + temperature
                + ", \"capacityfilled\":" + capacityfilled
                + ", \"inventorytracker\":" + inventorytracker
                + ", \"apisg\":" + apisg
                + ", \"gvolumeusable\":" + gvolumeusable
                + ", \"nvolumeusable\":" + nvolumeusable + "}";
    }

    /** Steps every simulated reading forward and wraps those that reached their ceiling. */
    private void advanceReadings()
    {
        level++;
        temperature = temperature + 2;
        apisg = apisg + 2;
        gvolume = 100000 + gvolume;
        nvolume = 100000 + nvolume;
        // The usable volumes are derived before the wrap-around checks below.
        gvolumeusable = gvolume - 100000;
        nvolumeusable = nvolume - 100000;
        inventorytracker = 100000 + inventorytracker;
        capacityfilled = 5 + capacityfilled;

        if (capacityfilled >= 100) { capacityfilled = 5; }
        if (gvolume >= 10000000) { gvolume = 100000; }
        if (inventorytracker >= 100000000) { inventorytracker = 100000; }
        if (level >= 28) { level = 0; }
        if (apisg >= 28) { apisg = 0; }
        if (nvolume >= 10000000) { nvolume = 100000; }
        if (temperature >= 180) { temperature = 42; }
    }
}
4

0 回答 0