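I am using Apache Pulsar v2.4.2. Using pulsar-io-jdbc as a reference, I created a custom sink connector and packaged it as a NAR.
I then ran the following commands to upload the schema and create the connector.
1. Upload the AVRO schema.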
bin/pulsar-admin schemas upload myjdbc --filename ./connectors/pulsar-io-jdbc-schema.conf
Here is the Avro schema:
{
"type": "AVRO",
"schema": "{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"]},{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}",
"properties": {}
}
2. Create the connector.
bin/pulsar-admin sinks localrun --archive ./connectors/pulsar-io-jdbc-2.4.2.nar --inputs myjdbc --name myjdbc --sink-config-file ./connectors/pulsar-io-jdbc.yaml
I then send messages to the topic "myjdbc" with producer.SendAsync(message), where each message is built as follows:
for (int i = 0; i < count; i++)
{
    JObject obj = new JObject();
    obj.Add("id", i);
    obj.Add("name", "testing topic" + i);
    var str = obj.ToString(Formatting.None);
    var messageId = await producer.SendAsync(Encoding.UTF8.GetBytes(str));
    Console.WriteLine($"MessageId is: '{messageId}'");
}
The connector started with localrun then fails with the following exception:
19:48:37.437 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ClientCnx - [id: 0x557fb218, L:/127.0.0.1:39488 - R:localhost/127.0.0.1:6650] Connected through proxy to target broker at 192.168.1.97:6650
19:48:37.445 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [myjdbc][public/default/myjdbc] Subscribing to topic on cnx [id: 0x557fb218, L:/127.0.0.1:39488 - R:localhost/127.0.0.1:6650]
19:48:37.503 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [myjdbc][public/default/myjdbc] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
19:48:37.557 [pulsar-client-io-1-1] INFO com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized
19:48:37.715 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [myjdbc] [public/default/myjdbc] Closed consumer
19:48:37.942 [main] INFO org.apache.pulsar.functions.LocalRunner - RuntimeSpawner quit because of
java.lang.NullPointerException: null
at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:877) ~[com.google.guava-guava-25.1-jre.jar:?]
at com.google.common.cache.LocalCache.get(LocalCache.java:3950) ~[com.google.guava-guava-25.1-jre.jar:?]
at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3973) ~[com.google.guava-guava-25.1-jre.jar:?]
at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4957) ~[com.google.guava-guava-25.1-jre.jar:?]
at org.apache.pulsar.client.impl.schema.StructSchema.decode(StructSchema.java:94) ~[org.apache.pulsar-pulsar-client-original-2.4.2.jar:2.4.2]
at org.apache.pulsar.client.impl.schema.AutoConsumeSchema.decode(AutoConsumeSchema.java:72) ~[org.apache.pulsar-pulsar-client-original-2.4.2.jar:2.4.2]
at org.apache.pulsar.client.impl.schema.AutoConsumeSchema.decode(AutoConsumeSchema.java:36) ~[org.apache.pulsar-pulsar-client-original-2.4.2.jar:2.4.2]
at org.apache.pulsar.client.api.Schema.decode(Schema.java:97) ~[org.apache.pulsar-pulsar-client-api-2.4.2.jar:2.4.2]
at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:268) ~[org.apache.pulsar-pulsar-client-original-2.4.2.jar:2.4.2]
at org.apache.pulsar.functions.source.PulsarRecord.getValue(PulsarRecord.java:74) ~[org.apache.pulsar-pulsar-functions-instance-2.4.2.jar:2.4.2]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:473) ~[org.apache.pulsar-pulsar-functions-instance-2.4.2.jar:2.4.2]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:246) ~[org.apache.pulsar-pulsar-functions-instance-2.4.2.jar:2.4.2]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_232]
19:49:03.105 [function-timer-thread-5-1] ERROR org.apache.pulsar.functions.runtime.RuntimeSpawner - public/default/myjdbc-java.lang.NullPointerException Function Container is dead with exception.. restarting
The sink's write() method is never invoked either.
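
A possible lead, offered as an assumption rather than a confirmed diagnosis: the producer loop sends UTF-8 JSON text, while the topic carries an AVRO schema, and the stack trace shows the sink decoding through AutoConsumeSchema and StructSchema. The Python sketch below uses only the standard library and hand-encodes one record following the Avro binary encoding rules, simply to show that the two byte layouts differ. The helper names zigzag_varint and encode_test_record are hypothetical and belong to neither Pulsar nor any Avro library, and the sketch does not model any schema metadata that a schema-aware Pulsar producer may attach to a message.

```python
import json


def zigzag_varint(n: int) -> bytes:
    """Encode an integer as Avro encodes int/long: zigzag, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)  # zigzag mapping, valid for values in the 64-bit range
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_test_record(id_value: int, name: str) -> bytes:
    """Hand-encode the 'Test' record: each field is a ["null", T] union."""
    name_bytes = name.encode("utf-8")
    return (
        zigzag_varint(1) + zigzag_varint(id_value)          # branch 1 = "int", then the value
        + zigzag_varint(1) + zigzag_varint(len(name_bytes))  # branch 1 = "string", then length
        + name_bytes                                         # UTF-8 string bytes
    )


# What the C# loop sends for i = 1 (compact JSON, as with Formatting.None).
json_payload = json.dumps(
    {"id": 1, "name": "testing topic1"}, separators=(",", ":")
).encode("utf-8")

# What an Avro binary encoding of the same record looks like.
avro_payload = encode_test_record(1, "testing topic1")

print(json_payload)        # starts with b'{"id":1,...'
print(avro_payload.hex())  # starts with 0202021c
```

If this turns out to be relevant, the thing to try would be publishing through a producer created with a matching AVRO schema instead of raw JSON bytes, and then checking whether write() is reached.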