api.xml:
<?xml version="1.0" encoding="UTF-8"?>
<api context="/publishweatherdata" name="WeatherDataPublishAPI" xmlns="http://ws.apache.org/ns/synapse">
<resource methods="POST">
<inSequence>
<property expression="json-eval($.test)" name="valueSchema" scope="default" type="STRING"/>
<property expression="json-eval($.value)" name="value" scope="default" type="STRING"/>
<property expression="json-eval($.key)" name="key" scope="default" type="STRING"/>
<property expression="json-eval($.topic)" name="topic" scope="default" type="STRING"/>
<kafkaTransport.init>
<bootstrapServers>localhost:9092</bootstrapServers>
<keySerializerClass>io.confluent.kafka.serializers.KafkaAvroSerializer</keySerializerClass>
<valueSerializerClass>io.confluent.kafka.serializers.KafkaAvroSerializer</valueSerializerClass>
<schemaRegistryUrl>http://localhost:8081</schemaRegistryUrl>
<acks>all</acks>
<requestTimeout>10000</requestTimeout>
<timeout>8000</timeout>
<metadataFetchTimeout>5000</metadataFetchTimeout>
<maxPoolSize>50</maxPoolSize>
</kafkaTransport.init>
<kafkaTransport.publishMessages>
<topic>{$ctx:topic}</topic>
</kafkaTransport.publishMessages>
<payloadFactory media-type="json">
<format>
{"topic":"$1", "partition":"$2", "offset":"$3", "schema":"$4"}
</format>
<args>
<arg evaluator="xml" expression="$ctx:topic"/>
<arg evaluator="xml" expression="$ctx:partition"/>
<arg evaluator="xml" expression="$ctx:offset"/>
<arg evaluator="xml" expression="$ctx:valueSchema"/>
</args>
</payloadFactory>
<property name="messageType" scope="axis2" type="STRING" value="application/json"/>
<respond/>
</inSequence>
<outSequence/>
<faultSequence/>
</resource>
</api>
如果我使用org.apache.kafka.common.serialization.StringSerializer
工作正常
如果我使用io.confluent.kafka.serializers.KafkaAvroSerializer
我有:
[2022-02-22 12:01:37,547] 信息 {KafkaConnectionPool} - 连接池未满。继续添加新连接
还注意到这条线<schemaRegistryUrl>http://localhost:8081</schemaRegistryUrl>
消失了。
使用 WSO2 集成工作室 8.0.2
还添加到IntegrationStudio/runtime/microesb/lib
目录:
kafka-schema-registry-client-5.3.0.jar
metrics-core-2.2.0.jar
scala-library-2.12.3.
jarzookeeper-3.4.10.jar
zkclient-0.10.jar
common-config-5.4.0.jar
zookeeper-3.4.10.jar
kafka_2.12-1.0.0.jar
avro-1.8.1.jar
common-utils-5.4.0.jar
kafka-avro-serializer-5.3.0.jar
kafka-clients-1.0.0.jar
为什么它不适用于 AVRO?