2

我想将发布消息的 curl 命令转换为 Kafka Rest Proxy。卷曲命令是

curl -u username:password --request POST --url https://kafka-rest-proxy-*****.com/topics/test-topic --header 'accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json' --header 'content-type: application/vnd.kafka.avro.v2+json' --data '{"value_schema_id": 5, "records": [{"value": {"event_envelope":{"data":{"test":"using curl","testEventId":23}}}}]}'

我想通过 Spring 中的 Restemplate 发送此请求。如何将 curl 命令转换为 restemplate。我已经写了一些代码,但我遇到了问题。

@SpringBootApplication
    public class DemoApplication implements CommandLineRunner{

   String kafkarwsrproxyURL = String.format("%s/topics/%s", "https://kafka-rest-proxy-**********", 
   "test-topic");
   String schemaurl = String.format("%s/subjects/%s/versions/latest", "https://schema-registry- 
   *********", "test-topic");

   @Autowired
   private RestTemplate restTemplate;

   public static void main(String[] args) {
   SpringApplication.run(DemoApplication.class, args);
   }

  @Override
  public void run(String... args) throws Exception {

 ObjectMapper obj = new ObjectMapper();
 JSONObject event = new JSONObject();
 JSONObject record = new JSONObject();
 JSONObject eventenvolpe = new JSONObject();
 JSONObject jsondata  = new JSONObject();
 JSONArray jsonarray =new JSONArray();
 JSONObject recordvalue =new JSONObject();
  // connecting to schema registary and getting back schema   
  HttpHeaders headers = new HttpHeaders();
  headers.setContentType(MediaType.valueOf("application/vnd.schemaregistry.v1+json"));
  headers.setBasicAuth("username", "password");
  HttpEntity<String> SchemaEntity = new HttpEntity<String>("parameters", headers);
 
     ResponseEntity<String> result = restTemplate.exchange(schemaurl, HttpMethod.GET, SchemaEntity, 
        String.class);
     if(result.getStatusCodeValue()==200) {
         
         JsonNode rootNode = obj.readTree(result.getBody());
         JsonNode schema_id = rootNode.path("id");//fetchinf schema id form schema
         
         event = new JSONObject();
         record = new JSONObject();
         eventenvolpe = new JSONObject();
         jsondata  = new JSONObject();
         jsondata.put("data", obj.writeValueAsString(new Data("test1",1)));
         eventenvolpe.put("event_envelope", jsondata);
         recordvalue.put("value", eventenvolpe);
         jsonarray.put(recordvalue);
         event.put("value_schema_id", schema_id);
         event.put("records", recordvalue);//setting up event object to send to kafka
         System.out.println(event);

向卡夫卡发送消息

    HttpHeaders messageheaders = new HttpHeaders();
    messageheaders.setContentType(MediaType.valueOf("application/vnd.kafka.avro.v2+json"));
    messageheaders.setBasicAuth("username", "password");
    HttpEntity<JSONObject> message = new HttpEntity<JSONObject>(event,messageheaders );
     
    ResponseEntity<String> result1 = restTemplate.exchange(kafkarwsrproxyURL, HttpMethod.POST, 
    message, String.class);
    
    if(result1.getStatusCodeValue()==200) {
        System.out.println("Message is pushed to Kafka");
    } }}}

我收到的错误消息

'at com.example.demo.DemoApplication.main(DemoApplication.java:44) 
 [classes/:na]
 Caused by: org.springframework.web.client.RestClientException: No 
  HttpMessageConverter for 
  org.json.JSONObject and content type "application/vnd.kafka.avro.v2+json"
  at org.springframework.web.client.RestTemplate$HttpEntityRequestCallback.doWithRequest(RestTemplate.java:961) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE] at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:737) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE] at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:674) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE] at org.springframework.web.client.RestTemplate.exchange(RestTemplate.java:583) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE] at com.example.demo.DemoApplication.run(DemoApplication.java:91) [classes/:na] at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:795) [spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE] ... 5 common frames omitted
4

1 回答 1

0

JacksonObjectMapper无法识别JSONObject,因此您需要将其转换为String

HttpEntity<String> message = new HttpEntity<>(event.toString(), messageheaders);

ResponseEntity<String> result1 = restTemplate.exchange(kafkarwsrproxyURL, HttpMethod.POST,
            message, String.class);
于 2020-07-01T22:22:02.883 回答