0

我正在尝试创建一个简单的 Resttemplate,它将从 Schema Registary 获取模式,如果响应为 200,它将获取模式 ID 并将其与 Avro 格式的消息一起发送到 kafka。这是我的代码:。

 @SpringBootApplication
        public class DemoApplication implements CommandLineRunner{

  String kafkarwsrproxyURL = String.format("%s/topics/%s", "https://kafka-rest-proxy-**********", "test-topic");
    String schemaurl = String.format("%s/subjects/%s/versions/latest", "https://schema-registry-*********", "test-topic");
  @Autowired
  private RestTemplate restTemplate;

@Bean
public RestTemplate restTemplate() {
    return new RestTemplate();
}

public static void main(String[] args) {
    SpringApplication.run(DemoApplication.class, args);
}

@Override
public void run(String... args) throws Exception {
    
    ObjectMapper obj = new ObjectMapper();
     JSONObject event = new JSONObject();
     JSONObject record = new JSONObject();
     JSONObject eventenvolpe = new JSONObject();
     JSONObject jsondata  = new JSONObject();
     JSONArray jsonarray =new JSONArray();
     JSONObject recordvalue =new JSONObject();
  // connecting to schema registary and getting back schema   
    HttpHeaders headers = new HttpHeaders();
    headers.setContentType(MediaType.valueOf("application/vnd.schemaregistry.v1+json"));
    headers.setBasicAuth("username", "password");
    HttpEntity<String> SchemaEntity = new HttpEntity<String>("parameters", headers);
     
    ResponseEntity<String> result = restTemplate.exchange(schemaurl, HttpMethod.GET, SchemaEntity, String.class);
         if(result.getStatusCodeValue()==200) {
             
             JsonNode rootNode = obj.readTree(result.getBody());
             JsonNode schema_id = rootNode.path("id");//fetchinf schema id form schema
             
             event = new JSONObject();
             record = new JSONObject();
             eventenvolpe = new JSONObject();
             jsondata  = new JSONObject();
            
            
            
            jsondata.put("data", obj.writeValueAsString(new Data("test1",1)));
            eventenvolpe.put("event_envelope", jsondata);
            recordvalue.put("value", eventenvolpe);
            jsonarray.put(recordvalue);
            event.put("value_schema_id", schema_id);
            event.put("records", recordvalue);//setting up event object to send to kafka
        
            System.out.println(event);
        
        //Sending message to kafka
        HttpHeaders messageheaders = new HttpHeaders();
        messageheaders.setContentType(MediaType.valueOf("application/vnd.kafka.avro.v2+json"));
        messageheaders.setBasicAuth("username", "password");
        HttpEntity<JSONObject> message = new HttpEntity<JSONObject>(event,messageheaders );
         
        ResponseEntity<String> result1 = restTemplate.exchange(kafkarwsrproxyURL, HttpMethod.POST, message, String.class);
        
        if(result1.getStatusCodeValue()==200) {
            System.out.println("Message is pushed to Kafka");
        }
        
             
        
             
         }
         
    
}

 }

我能够成功地获取模式表单注册表,但是在发送回 kafka 时出现错误。

2020-06-30 20:02:56.078  INFO 7972 --- [           main] com.example.demo.DemoApplication         : 
No active profile set, falling back to default profiles: default
2020-06-30 20:02:59.235  INFO 7972 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : 
Tomcat initialized with port(s): 8089 (http)
2020-06-30 20:02:59.263  INFO 7972 --- [           main] o.apache.catalina.core.StandardService   : 
Starting service [Tomcat]
  2020-06-30 20:02:59.264  INFO 7972 --- [           main] org.apache.catalina.core.StandardEngine  : 
 Starting Servlet engine: [Apache Tomcat/9.0.36]
 2020-06-30 20:02:59.434  INFO 7972 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : 
 Initializing Spring embedded WebApplicationContext
 2020-06-30 20:02:59.434  INFO 7972 --- [           main] w.s.c.ServletWebServerApplicationContext : 
  Root WebApplicationContext: initialization completed in 3269 ms
2020-06-30 20:02:59.847  INFO 7972 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : 
Initializing ExecutorService 'applicationTaskExecutor'
2020-06-30 20:03:00.238  INFO 7972 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : 
Tomcat started on port(s): 8089 (http) with context path ''
2020-06-30 20:03:00.251  INFO 7972 --- [           main] com.example.demo.DemoApplication         : 
 Started DemoApplication in 4.945 seconds (JVM running for 5.758)
 {"records":{"value":{"event_envelope":{"data":" 
 {\"test\":\"test1\",\"testEventId\":1}"}}},"value_schema_id":"5"}
 2020-06-30 20:03:02.276  INFO 7972 --- [           main] ConditionEvaluationReportLoggingListener : 

 Error starting ApplicationContext. To display the conditions report re-run your application with 
 'debug' enabled.
 2020-06-30 20:03:02.282 ERROR 7972 --- [           main] o.s.boot.SpringApplication               : 
 Application run failed

 java.lang.IllegalStateException: Failed to execute CommandLineRunner
 at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:798) [spring-boot- 
 2.3.1.RELEASE.jar:2.3.1.RELEASE]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:779) [spring-boot- 
2.3.1.RELEASE.jar:2.3.1.RELEASE]


at org.springframework.boot.SpringApplication.run(SpringApplication.java:322) [spring-boot- 
 2.3.1.RELEASE.jar:2.3.1.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) [spring-boot- 
2.3.1.RELEASE.jar:2.3.1.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot- 
2.3.1.RELEASE.jar:2.3.1.RELEASE]
at com.example.demo.DemoApplication.main(DemoApplication.java:44) [classes/:na]
Caused by: org.springframework.web.client.RestClientException: No HttpMessageConverter for 
org.json.JSONObject and content type "application/vnd.kafka.avro.v2+json"
at `org.springframework.web.client.RestTemplate$HttpEntityRequestCallback.doWithRequest(RestTemplate.java:961) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE] at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:737) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE] at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:674) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE] at org.springframework.web.client.RestTemplate.exchange(RestTemplate.java:583) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE] at com.example.demo.DemoApplication.run(DemoApplication.java:91) [classes/:na] at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:795) [spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE] ... 5 common frames omitted

2020-06-30 20:03:02.878 INFO 7972 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down 
ExecutorService 'applicationTaskExecutor'`
4

1 回答 1

0

问题似乎是由于您的客户端不知道如何读取这种“异国情调”数据类型:“application/vnd.kafka.avro.v2+json”

这是这篇文章中可能的解决方案: 没有找到适合响应类型的 HttpMessageConverter

总而言之 - 您要么必须为 MIME 类型“application/vnd.kafka.avro.v2+json”注册相应的 HttpMessageConverter,要么更改(如果您有权访问)从服务器返回的 MIME 类型。您还可以“破解”一个快速代理脚本,以便它进入服务器并获取数据,然后将其发送给您更改 MIME 类型的客户端。

依赖项:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

和代码片段:

@Bean
public RestTemplate restTemplate() {
   final RestTemplate restTemplate = new RestTemplate();

   List<HttpMessageConverter<?>> messageConverters = new ArrayList<>();
   MappingJackson2HttpMessageConverter converter = new MappingJackson2HttpMessageConverter();
   List<MediaType> types = Arrays.asList(
         new MediaType("application", "*+json", DEFAULT_CHARSET)
    );
   
   converter.setSupportedMediaTypes(types);
   messageConverters.add(converter);
   restTemplate.setMessageConverters(messageConverters);

   return restTemplate;
}
于 2020-07-01T07:47:33.490 回答