1

我正在使用 Apache Pulsar v2.4.2。并参考了 pulsar-io-jdbc 并创建了打包 nar 的自定义接收器连接器。

并运行以下命令上传架构并创建连接器

1.上传 AVRO Schema

bin/pulsar-admin schemas upload myjdbc --filename ./connectors/pulsar-io-jdbc-schema.conf

这是 avro 架构:

{
  "type": "AVRO",
  "schema": "{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"]},{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}",
  "properties": {}
}

2.创建连接器

bin/pulsar-admin sinks localrun --archive ./connectors/pulsar-io-jdbc-2.4.2.nar --inputs myjdbc --name myjdbc --sink-config-file ./connectors/pulsar-io-jdbc.yaml

一旦我从 producer.SendAsync(message) 向主题“myjdbc”发送消息。消息在哪里

for (int i = 0; i < count; i++)
{
   JObject obj = new JObject();
   obj.Add("id", i);
   obj.Add("name", "testing topic" + i);
   var str = obj.ToString(Formatting.None);

   var messageId = await producer.SendAsync(Encoding.UTF8.GetBytes(str));
                Console.WriteLine($"MessageId is: '{messageId}'");
}

Pulsar localrun 连接器出现以下异常:

19:48:37.437 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ClientCnx - [id: 0x557fb218, L:/127.0.0.1:39488 - R:localhost/127.0.0.1:6650] Connected through proxy to target broker at 192.168.1.97:6650
19:48:37.445 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [myjdbc][public/default/myjdbc] Subscribing to topic on cnx [id: 0x557fb218, L:/127.0.0.1:39488 - R:localhost/127.0.0.1:6650]
19:48:37.503 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [myjdbc][public/default/myjdbc] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
19:48:37.557 [pulsar-client-io-1-1] INFO  com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized
19:48:37.715 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [myjdbc] [public/default/myjdbc] Closed consumer
19:48:37.942 [main] INFO  org.apache.pulsar.functions.LocalRunner - RuntimeSpawner quit because of
java.lang.NullPointerException: null
        at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:877) ~[com.google.guava-guava-25.1-jre.jar:?]
        at com.google.common.cache.LocalCache.get(LocalCache.java:3950) ~[com.google.guava-guava-25.1-jre.jar:?]
        at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3973) ~[com.google.guava-guava-25.1-jre.jar:?]
        at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4957) ~[com.google.guava-guava-25.1-jre.jar:?]
        at org.apache.pulsar.client.impl.schema.StructSchema.decode(StructSchema.java:94) ~[org.apache.pulsar-pulsar-client-original-2.4.2.jar:2.4.2]
        at org.apache.pulsar.client.impl.schema.AutoConsumeSchema.decode(AutoConsumeSchema.java:72) ~[org.apache.pulsar-pulsar-client-original-2.4.2.jar:2.4.2]
        at org.apache.pulsar.client.impl.schema.AutoConsumeSchema.decode(AutoConsumeSchema.java:36) ~[org.apache.pulsar-pulsar-client-original-2.4.2.jar:2.4.2]
        at org.apache.pulsar.client.api.Schema.decode(Schema.java:97) ~[org.apache.pulsar-pulsar-client-api-2.4.2.jar:2.4.2]
        at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:268) ~[org.apache.pulsar-pulsar-client-original-2.4.2.jar:2.4.2]
        at org.apache.pulsar.functions.source.PulsarRecord.getValue(PulsarRecord.java:74) ~[org.apache.pulsar-pulsar-functions-instance-2.4.2.jar:2.4.2]
        at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:473) ~[org.apache.pulsar-pulsar-functions-instance-2.4.2.jar:2.4.2]
        at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:246) ~[org.apache.pulsar-pulsar-functions-instance-2.4.2.jar:2.4.2]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_232]
19:49:03.105 [function-timer-thread-5-1] ERROR org.apache.pulsar.functions.runtime.RuntimeSpawner - public/default/myjdbc-java.lang.NullPointerException Function Container is dead with exception.. restarting 

而且也不会触发 write() 方法。

4

0 回答 0