I want to convert a curl command that publishes a message to Kafka REST Proxy. The curl command is:
curl -u username:password --request POST --url https://kafka-rest-proxy-*****.com/topics/test-topic --header 'accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json' --header 'content-type: application/vnd.kafka.avro.v2+json' --data '{"value_schema_id": 5, "records": [{"value": {"event_envelope":{"data":{"test":"using curl","testEventId":23}}}}]}'
I want to send this request through Spring's RestTemplate. How do I convert the curl command to RestTemplate? I have written some code, but I am running into a problem.
import org.json.JSONArray;
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

@SpringBootApplication
public class DemoApplication implements CommandLineRunner {
    String kafkarwsrproxyURL = String.format("%s/topics/%s", "https://kafka-rest-proxy-**********",
            "test-topic");
    String schemaurl = String.format("%s/subjects/%s/versions/latest", "https://schema-registry-*********",
            "test-topic");

    @Autowired
    private RestTemplate restTemplate;

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        ObjectMapper obj = new ObjectMapper();
        JSONObject event = new JSONObject();
        JSONObject record = new JSONObject();
        JSONObject eventenvolpe = new JSONObject();
        JSONObject jsondata = new JSONObject();
        JSONArray jsonarray = new JSONArray();
        JSONObject recordvalue = new JSONObject();

        // connect to the Schema Registry and fetch the latest schema
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.valueOf("application/vnd.schemaregistry.v1+json"));
        headers.setBasicAuth("username", "password");
        HttpEntity<String> SchemaEntity = new HttpEntity<String>("parameters", headers);
        ResponseEntity<String> result = restTemplate.exchange(schemaurl, HttpMethod.GET, SchemaEntity,
                String.class);
        if (result.getStatusCodeValue() == 200) {
            JsonNode rootNode = obj.readTree(result.getBody());
            JsonNode schema_id = rootNode.path("id"); // fetch the schema id from the response
            event = new JSONObject();
            record = new JSONObject();
            eventenvolpe = new JSONObject();
            jsondata = new JSONObject();
            // Data is my own POJO (not shown); parse its JSON so "data" is an object, not a quoted string
            jsondata.put("data", new JSONObject(obj.writeValueAsString(new Data("test1", 1))));
            eventenvolpe.put("event_envelope", jsondata);
            recordvalue.put("value", eventenvolpe);
            jsonarray.put(recordvalue);
            event.put("value_schema_id", schema_id.asInt());
            event.put("records", jsonarray); // "records" is an array, as in the curl command
            System.out.println(event);

            // send the message to Kafka
            HttpHeaders messageheaders = new HttpHeaders();
            messageheaders.setContentType(MediaType.valueOf("application/vnd.kafka.avro.v2+json"));
            messageheaders.setBasicAuth("username", "password");
            HttpEntity<JSONObject> message = new HttpEntity<JSONObject>(event, messageheaders);
            ResponseEntity<String> result1 = restTemplate.exchange(kafkarwsrproxyURL, HttpMethod.POST,
                    message, String.class);
            if (result1.getStatusCodeValue() == 200) {
                System.out.println("Message is pushed to Kafka");
            }
        }
    }
}
The error message I get:
at com.example.demo.DemoApplication.main(DemoApplication.java:44) [classes/:na]
Caused by: org.springframework.web.client.RestClientException: No HttpMessageConverter for org.json.JSONObject and content type "application/vnd.kafka.avro.v2+json"
    at org.springframework.web.client.RestTemplate$HttpEntityRequestCallback.doWithRequest(RestTemplate.java:961) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:737) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:674) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.web.client.RestTemplate.exchange(RestTemplate.java:583) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at com.example.demo.DemoApplication.run(DemoApplication.java:91) [classes/:na]
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:795) [spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
    ... 5 common frames omitted
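The exception suggests that RestTemplate has no message converter able to write an org.json.JSONObject with that content type. One possible way around it is to serialize the request body to a plain String before handing it to RestTemplate. The following is only a minimal sketch of that serialization step, using nothing but the Java standard library so it can be run on its own. The class name KafkaPayloadSketch and its methods build and escape are hypothetical helpers and not part of the code above.

```java
// Hypothetical helper (not in the original code): builds the REST Proxy
// request body from the curl command as a plain JSON string.
final class KafkaPayloadSketch {

    // Escapes only backslashes and double quotes; control characters are
    // not handled, so this is not a general-purpose JSON escaper.
    static String escape(String text) {
        return text.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Mirrors the structure of the --data argument in the curl command:
    // a schema id plus a "records" array holding one value object.
    static String build(int schemaId, String test, int testEventId) {
        String data = "{\"test\":\"" + escape(test) + "\",\"testEventId\":" + testEventId + "}";
        String value = "{\"event_envelope\":{\"data\":" + data + "}}";
        return "{\"value_schema_id\":" + schemaId + ",\"records\":[{\"value\":" + value + "}]}";
    }

    public static void main(String[] args) {
        // Prints a body equivalent to the one sent by the curl command.
        System.out.println(build(5, "using curl", 23));
    }
}
```

With RestTemplate, a string like this (or simply event.toString() from the existing JSONObject) could be wrapped as new HttpEntity<String>(body, messageheaders) and passed to exchange. Spring's StringHttpMessageConverter writes String bodies for any content type, so the application/vnd.kafka.avro.v2+json header should no longer trigger the missing-converter error. I have not verified this against a real REST Proxy.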