api.xml:

<?xml version="1.0" encoding="UTF-8"?>
<api context="/publishweatherdata" name="WeatherDataPublishAPI" xmlns="http://ws.apache.org/ns/synapse">
    <resource methods="POST">
        <inSequence>
            <property expression="json-eval($.test)" name="valueSchema" scope="default" type="STRING"/>
            <property expression="json-eval($.value)" name="value" scope="default" type="STRING"/>
            <property expression="json-eval($.key)" name="key" scope="default" type="STRING"/>
            <property expression="json-eval($.topic)" name="topic" scope="default" type="STRING"/>
            <kafkaTransport.init>
                <bootstrapServers>localhost:9092</bootstrapServers>
                <keySerializerClass>io.confluent.kafka.serializers.KafkaAvroSerializer</keySerializerClass>
                <valueSerializerClass>io.confluent.kafka.serializers.KafkaAvroSerializer</valueSerializerClass>
                <schemaRegistryUrl>http://localhost:8081</schemaRegistryUrl>
                <acks>all</acks>
                <requestTimeout>10000</requestTimeout>
                <timeout>8000</timeout>
                <metadataFetchTimeout>5000</metadataFetchTimeout>
                <maxPoolSize>50</maxPoolSize>
            </kafkaTransport.init>
            <kafkaTransport.publishMessages>
                <topic>{$ctx:topic}</topic>
            </kafkaTransport.publishMessages>
            <payloadFactory media-type="json">
                <format>
                    {"topic":"$1", "partition":"$2", "offset":"$3", "schema":"$4"}
                </format>
                <args>
                    <arg evaluator="xml" expression="$ctx:topic"/>
                    <arg evaluator="xml" expression="$ctx:partition"/>
                    <arg evaluator="xml" expression="$ctx:offset"/>
                    <arg evaluator="xml" expression="$ctx:valueSchema"/>
                </args>
            </payloadFactory>
            <property name="messageType" scope="axis2" type="STRING" value="application/json"/>
            <respond/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </resource>
</api>

If I use org.apache.kafka.common.serialization.StringSerializer, it works fine.

If I use io.confluent.kafka.serializers.KafkaAvroSerializer, I get:

[2022-02-22 12:01:37,547] INFO {KafkaConnectionPool} - The connection pool is not full. Proceeding to add a new connection

I also noticed that the line <schemaRegistryUrl>http://localhost:8081</schemaRegistryUrl> disappears.

I am using WSO2 Integration Studio 8.0.2.

I also added the following JARs to the IntegrationStudio/runtime/microesb/lib directory:

kafka-schema-registry-client-5.3.0.jar 
metrics-core-2.2.0.jar    
scala-library-2.12.3.jar
zookeeper-3.4.10.jar
zkclient-0.10.jar    
common-config-5.4.0.jar 
zookeeper-3.4.10.jar
kafka_2.12-1.0.0.jar
avro-1.8.1.jar
common-utils-5.4.0.jar
kafka-avro-serializer-5.3.0.jar
kafka-clients-1.0.0.jar

Why doesn't it work with Avro?
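
One detail worth noting in the configuration above: the key, value, and valueSchema properties are read from the request body, but only topic is passed to kafkaTransport.publishMessages. Below is a minimal sketch of how those properties might be passed along. It assumes the installed Kafka connector version accepts key, value, and valueSchema parameters on publishMessages; this should be checked against the connector documentation for that version, and it is not confirmed to resolve the behaviour described.

```xml
<!-- Sketch only: the key, value, and valueSchema parameters are assumed to be
     supported by the installed Kafka connector version. -->
<kafkaTransport.publishMessages>
    <topic>{$ctx:topic}</topic>
    <!-- Properties set earlier in the inSequence via json-eval -->
    <key>{$ctx:key}</key>
    <value>{$ctx:value}</value>
    <!-- Avro schema for the value, as extracted into the valueSchema property -->
    <valueSchema>{$ctx:valueSchema}</valueSchema>
</kafkaTransport.publishMessages>
```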
