我正在尝试使用自定义 java 类直接将消息从 Oracle 发布到 Kafka 代理,因为我在 oracle 中上传了所需的库并使用 oracle 函数/过程调用 java 函数。基本的 java 函数工作正常,但 Kafka 正在初始化并使用的函数抛出未捕获的异常。即使我将整个函数体放在 try-catch 块中,但仍然没有将我的自定义异常作为消息。
我有具有 JVM 版本 1.8 的 Oracle 12C。
此消息在 java 函数调用 (java.lang.NoClassDefFound 错误) 时在 oracle SQL 提示符上弹出。
我为我的自定义类创建了一个 jar 文件,并将其与受人尊敬的库一起上传到 oracle DB。我使用以下命令上传了库文件。
loadjava -u <user>/<Password>@DB -resolve <library>.jar
请告诉如何从 oracle 内部初始化 Kafka Producer 并开始发送消息
如何调试在 oracle DB 中加载的 java 函数。
我的自定义类如下
public class KafkaPublisher {
static ProducerRecord<String, String> PR = null;
static Producer<String, String> producer = null;
static Properties prop = new Properties();
public static void init() {
prop.put("bootstrap.servers", "111.11.11.11:9092");
prop.put("acks", "1");
prop.put("retries", "0");
prop.put("batch.size", "16384");
prop.put("linger.ms", "1");
prop.put("buffer.memory", "33554432");
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
}
public static String push(String data) {
String response = "No responce";
init();
try {
producer = new KafkaProducer<String, String>(prop);
PR = new ProducerRecord<String, String>("topic1", data);
Future<RecordMetadata> future = producer.send(PR);
RecordMetadata rMetaData = future.get();
response = "Current Offset: " + rMetaData.offset();
} catch (Exception e) {
response = "Message: " + e.getMessage() + " Cause: " + e.getCause();
}
return response;
}
public static String getProperties(String msg) {
return "My message: " + msg + prop.toString();
}
}
函数 getProperties(String msg) 工作正常,但函数 push(String data) 不工作。