
Can someone help me figure out how to correctly join two streams where both the key and the value are a GenericRecord? As you can see, I first create two topics that use Avro schemas for keys and values. I then join the two streams and, inside the ValueJoiner, build a new GenericRecord (a so-called projection record, with a projection schema) that is written to the output topic. I get the exception shown after the code snippet:

@Test
public void joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord() throws Exception {
    String methodName = new Object() {
    }.getClass().getEnclosingMethod().getName();

    long timestamp = new Date().getTime();

    String firstTopic = String.format("%1$s_1_%2$s", methodName, timestamp);
    String secondTopic = String.format("%1$s_2_%2$s", methodName, timestamp);

    String outputTopic = String.format("%1$s_output_%2$s", methodName, timestamp);

    String firstStorage = String.format("%1$s_store_1_%2$s", methodName, timestamp);
    String secondStorage = String.format("%1$s_store_2_%2$s", methodName, timestamp);

    String appIdConfig = String.format("%1$s_app_id_%2$s", methodName, timestamp);
    String groupIdConfig = String.format("%1$s_group_id_%2$s", methodName, timestamp);

    String schemaIdNamespace = String.format("%1$s_id_ns_%2$s", methodName, timestamp);
    String schemaNameNamespace = String.format("%1$s_name_ns_%2$s", methodName, timestamp);
    String schemaScopeNamespace = String.format("%1$s_scope_ns_%2$s", methodName, timestamp);
    String schemaProjectionNamespace = String.format("%1$s_proj_ns_%2$s", methodName, timestamp);

    String schemaIdRecord = String.format("%1$s_id_rec_%2$s", methodName, timestamp);
    String schemaNameRecord = String.format("%1$s_name_rec_%2$s", methodName, timestamp);
    String schemaScopeRecord = String.format("%1$s_scope_rec_%2$s", methodName, timestamp);
    String schemaProjectionRecord = String.format("%1$s_proj_rec_%2$s", methodName, timestamp);

    try {
        Integer partitions = 1;
        Integer replication = 1;
        Properties topicConfig = new Properties();

        RestUtils.createTopic(firstTopic, partitions, replication, topicConfig);
        RestUtils.createTopic(secondTopic, partitions, replication, topicConfig);
        RestUtils.createTopic(outputTopic, partitions, replication, topicConfig);

        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appIdConfig);
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_CONNECT_CONFIG);
        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams/"); //TestUtils.tempDirectory().getAbsolutePath());
        streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL_CONFIG);

        Serializer kafkaAvroSerializer = new KafkaAvroSerializer();
        kafkaAvroSerializer.configure(streamsConfiguration, false);

        Deserializer kafkaAvroDeserializer = new KafkaAvroDeserializer();
        kafkaAvroDeserializer.configure(streamsConfiguration, false);

        Serde<GenericRecord> avroSerde = Serdes.serdeFrom(kafkaAvroSerializer, kafkaAvroDeserializer);

        //-----

        Schema idSchema = SchemaBuilder.record(schemaIdRecord).namespace(schemaIdNamespace).fields()
                .name("Id").type().nullable().intType().noDefault()
                .endRecord();

        Schema nameSchema = SchemaBuilder.record(schemaNameRecord).namespace(schemaNameNamespace).fields()
                .name("Id").type().nullable().intType().noDefault()
                .name("Name").type().nullable().stringType().noDefault()
                .endRecord();

        Schema scopeSchema = SchemaBuilder.record(schemaScopeRecord).namespace(schemaScopeNamespace).fields()
                .name("Scope").type().nullable().stringType().noDefault()
                .endRecord();

        Schema projectionSchema = SchemaBuilder.record(schemaProjectionRecord).namespace(schemaProjectionNamespace).fields()
                .name("Id").type().nullable().intType().noDefault()
                .name("Name").type().nullable().stringType().noDefault()
                .name("Scope").type().nullable().stringType().noDefault()
                .endRecord();

        GenericRecord idRecord1 = new GenericData.Record(idSchema);
        idRecord1.put("Id", 1);
        GenericRecord idRecord2 = new GenericData.Record(idSchema);
        idRecord2.put("Id", 2);
        GenericRecord idRecord3 = new GenericData.Record(idSchema);
        idRecord3.put("Id", 3);
        GenericRecord idRecord4 = new GenericData.Record(idSchema);
        idRecord4.put("Id", 4);

        GenericRecord nameRecord1 = new GenericData.Record(nameSchema);
        nameRecord1.put("Id", 1);
        nameRecord1.put("Name", "Bruce Eckel");
        GenericRecord nameRecord2 = new GenericData.Record(nameSchema);
        nameRecord2.put("Id", 2);
        nameRecord2.put("Name", "Robert Lafore");
        GenericRecord nameRecord3 = new GenericData.Record(nameSchema);
        nameRecord3.put("Id", 3);
        nameRecord3.put("Name", "Andrew Tanenbaum");
        GenericRecord nameRecord4 = new GenericData.Record(nameSchema);
        nameRecord4.put("Id", 4);
        nameRecord4.put("Name", "Programming in Scala");

        GenericRecord scopeRecord1 = new GenericData.Record(scopeSchema);
        scopeRecord1.put("Scope", "Modern Operating System");
        GenericRecord scopeRecord2 = new GenericData.Record(scopeSchema);
        scopeRecord2.put("Scope", "Thinking in Java");
        GenericRecord scopeRecord3 = new GenericData.Record(scopeSchema);
        scopeRecord3.put("Scope", "Computer Architecture");
        GenericRecord scopeRecord4 = new GenericData.Record(scopeSchema);
        scopeRecord4.put("Scope", "Programming in Scala");

        List<KeyValue<GenericRecord, GenericRecord>> list1 = Arrays.asList(
                new KeyValue<>(idRecord1, nameRecord1),
                new KeyValue<>(idRecord2, nameRecord2),
                new KeyValue<>(idRecord3, nameRecord3)
        );

        List<KeyValue<GenericRecord, GenericRecord>> list2 = Arrays.asList(
                new KeyValue<>(idRecord3, scopeRecord1),
                new KeyValue<>(idRecord1, scopeRecord2),
                new KeyValue<>(idRecord3, scopeRecord3),
                new KeyValue<>(idRecord4, scopeRecord4)
        );

        GenericRecord projectionRecord1 = new GenericData.Record(projectionSchema);
        projectionRecord1.put("Id", nameRecord1.get("Id"));
        projectionRecord1.put("Name", nameRecord1.get("Name"));
        projectionRecord1.put("Scope", scopeRecord1.get("Scope"));

        GenericRecord projectionRecord2 = new GenericData.Record(projectionSchema);
        projectionRecord2.put("Id", nameRecord2.get("Id"));
        projectionRecord2.put("Name", nameRecord2.get("Name"));
        projectionRecord2.put("Scope", scopeRecord2.get("Scope"));

        GenericRecord projectionRecord3 = new GenericData.Record(projectionSchema);
        projectionRecord3.put("Id", nameRecord3.get("Id"));
        projectionRecord3.put("Name", nameRecord3.get("Name"));
        projectionRecord3.put("Scope", scopeRecord3.get("Scope"));

        List<KeyValue<GenericRecord, GenericRecord>> expectedResults = Arrays.asList(
                new KeyValue<>(idRecord3, projectionRecord3),
                new KeyValue<>(idRecord1, projectionRecord1),
                new KeyValue<>(idRecord3, projectionRecord3)
        );

        //-----

        KStreamBuilder builder = new KStreamBuilder();

        KStream<GenericRecord, GenericRecord> firstStream = builder.stream(avroSerde, avroSerde, firstTopic);

        KStream<GenericRecord, GenericRecord> secondStream = builder.stream(avroSerde, avroSerde, secondTopic);

        KStream<GenericRecord, GenericRecord> outputStream = firstStream.join(secondStream,
                new ValueJoiner<GenericRecord, GenericRecord, GenericRecord>() {
                    @Override
                    public GenericRecord apply(GenericRecord l, GenericRecord r) {
                        GenericRecord projectionRecord = new GenericData.Record(projectionSchema);
                        projectionRecord.put("Id", l.get("Id"));
                        projectionRecord.put("Name", l.get("Name"));
                        projectionRecord.put("Scope", r.get("Scope"));
                        return projectionRecord;
                    }
                }, JoinWindows.of(TimeUnit.SECONDS.toMillis(5)), avroSerde, avroSerde, avroSerde);

        outputStream.to(avroSerde, avroSerde, outputTopic);

        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);

        streams.start();

        Properties cfg1 = new Properties();
        cfg1.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        cfg1.put(ProducerConfig.ACKS_CONFIG, "all");
        cfg1.put(ProducerConfig.RETRIES_CONFIG, 0);
        cfg1.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        cfg1.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        cfg1.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL_CONFIG);
        IntegrationTestUtils.produceKeyValuesSynchronously(firstTopic, list1, cfg1);

        Properties cfg2 = new Properties();
        cfg2.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        cfg2.put(ProducerConfig.ACKS_CONFIG, "all");
        cfg2.put(ProducerConfig.RETRIES_CONFIG, 0);
        cfg2.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        cfg2.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        cfg2.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL_CONFIG);
        IntegrationTestUtils.produceKeyValuesSynchronously(secondTopic, list2, cfg2);

        Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        consumerConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL_CONFIG);

        List<KeyValue<GenericRecord, GenericRecord>> actualResults = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedResults.size());

        streams.close();

        //-----

        assertThat(actualResults).containsExactlyElementsOf(expectedResults);

        //-----
    } finally {
        RestUtils.deleteTopics(firstTopic, secondTopic, outputTopic);
    }
}

Stack trace:

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_1_1490264134172, partition=0, offset=0
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_name_rec_1490264134172","namespace":"joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_name_ns_1490264134172","fields":[{"name":"Id","type":["int","null"]},{"name":"Name","type":["string","null"]}]}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleAvroSchemaException: Schema being registered is incompatible with an earlier schema
io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleAvroSchemaException: Schema being registered is incompatible with an earlier schema
    at io.confluent.kafka.schemaregistry.rest.exceptions.Errors.incompatibleSchemaException(Errors.java:63)
    at io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource.register(SubjectVersionsResource.java:166)
    at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory$1.invoke(ResourceMethodInvocationHandlerFactory.java:81)
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:144)
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:161)
    at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:143)
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:99)
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:389)
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:347)
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:102)
    at org.glassfish.jersey.server.ServerRuntime$2.run(ServerRuntime.java:308)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:271)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:267)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:315)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:297)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:267)
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:317)
    at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:291)
    at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:1140)
    at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:403)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:386)
    at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:548)
    at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:489)
    at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:426)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1652)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:585)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:110)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
    at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:159)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
    at org.eclipse.jetty.server.Server.handle(Server.java:499)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:257)
    at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
    at java.lang.Thread.run(Thread.java:745)
Caused by: io.confluent.kafka.schemaregistry.exceptions.IncompatibleSchemaException: New schema is incompatible with an earlier schema.
    at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.register(KafkaSchemaRegistry.java:369)
    at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.registerOrForward(KafkaSchemaRegistry.java:391)
    at io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource.register(SubjectVersionsResource.java:154)
    ... 44 more
; error code: 409
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:170)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:187)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:238)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:230)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:225)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:59)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:91)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54)
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:176)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:101)
    at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:65)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)

3 Answers

Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_name_rec_1490264134172","namespace":"joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_name_ns_1490264134172","fields":[{"name":"Id","type":["int","null"]},{"name":"Name","type":["string","null"]}]}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleAvroSchemaException: Schema being registered is incompatible with an earlier schema
io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleAvroSchemaException: Schema being registered is incompatible with an earlier schema

The key lines are:

  • "Error registering Avro schema"
  • "Schema being registered is incompatible with an earlier schema"

The schema that failed to register is:

Error registering Avro schema:{  
   "type":"record",
   "name":"joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_name_rec_1490264134172",
   "namespace":"joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_name_ns_1490264134172",
   "fields":[  
      {  
         "name":"Id",
         "type":[  
            "int",
            "null"
         ]
      },
      {  
         "name":"Name",
         "type":[  
            "string",
            "null"
         ]
      }
   ]
}

Perhaps you already ran your code before, and during those earlier runs output messages with a different Avro schema were written to the output topic. What I can imagine happened here:

  1. the schema of your new messages is incompatible with the earlier schema, and
  2. the schema registry you have configured for the output topic (I assume it is the Confluent Schema Registry) rejects the incompatible schema.

A quick fix you can try, since you seem to be just experimenting with this, is to disable compatibility checking for the output topic in the schema registry.
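
For reference, here is a minimal sketch of how you could verify the diagnosis and then turn the check off, using the same Confluent client library your test already pulls in. The registry URL, topic name, and cache size below are placeholders, and the default "<topic>-key" / "<topic>-value" subject naming is assumed:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import org.apache.avro.Schema;

public class RelaxOutputTopicCompatibility {
    public static void main(String[] args) throws Exception {
        String registryUrl = "http://localhost:8081"; // placeholder: your SCHEMA_REGISTRY_URL_CONFIG
        String outputTopic = "my_output_topic";       // placeholder: your output topic

        CachedSchemaRegistryClient client =
                new CachedSchemaRegistryClient(registryUrl, 100); // 100 = local schema cache size

        // Optional sanity check: ask the registry whether the new value schema
        // would be accepted under the subject's current compatibility rules.
        Schema newValueSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"projection\",\"fields\":["
                        + "{\"name\":\"Id\",\"type\":[\"int\",\"null\"]},"
                        + "{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},"
                        + "{\"name\":\"Scope\",\"type\":[\"string\",\"null\"]}]}");
        boolean compatible = client.testCompatibility(outputTopic + "-value", newValueSchema);
        System.out.println("registry would accept the schema: " + compatible);

        // The quick fix: disable compatibility checking for this topic's subjects only.
        client.updateCompatibility(outputTopic + "-key", "NONE");
        client.updateCompatibility(outputTopic + "-value", "NONE");
    }
}

The equivalent REST call is a PUT to /config/<subject> on the registry with the body {"compatibility": "NONE"}.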

answered 2017-03-24T08:05:31.573

From the documentation: "To completely delete any data produced during this test and start with a clean state next time, you can run confluent destroy."

answered 2017-08-28T09:31:53.453

Sometimes schema incompatibility is a real headache.

Here is what I do to resolve this kind of problem:

Delete the topic:

kafka-topics.bat --zookeeper 1.2.3.4:2181 --delete --topic your-topic-name

In the Schema Registry UI, go to the CONFIG tab and set the compatibility level to NONE.

After that you can update the schema without trouble.

Once the schema has been updated and new messages have been published to the topic successfully, you can restore the old compatibility level (see the sketch below).

Note: do not change the global compatibility level.
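
A minimal sketch of that flip-and-restore cycle against the registry's REST API (the registry URL and subject name are placeholders; this assumes the default "<topic>-value" subject naming, and that BACKWARD, the registry default, was your previous level):

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class SubjectCompatibility {

    // PUT /config/<subject> sets the compatibility level for one subject only,
    // leaving the global default untouched.
    static void setCompatibility(String registryUrl, String subject, String level) throws Exception {
        URL url = new URL(registryUrl + "/config/" + subject);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("PUT");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/vnd.schemaregistry.v1+json");
        byte[] body = ("{\"compatibility\": \"" + level + "\"}").getBytes(StandardCharsets.UTF_8);
        try (OutputStream os = conn.getOutputStream()) {
            os.write(body);
        }
        if (conn.getResponseCode() != 200) {
            throw new IllegalStateException("Unexpected response: " + conn.getResponseCode());
        }
        conn.disconnect();
    }

    public static void main(String[] args) throws Exception {
        String registry = "http://localhost:8081";  // placeholder
        String subject = "your-topic-name-value";   // placeholder

        setCompatibility(registry, subject, "NONE");     // relax the check, update the schema
        // ... publish the new messages here ...
        setCompatibility(registry, subject, "BACKWARD"); // restore your previous level
    }
}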

answered 2017-10-04T11:00:00.367