我正在尝试创建一个简单的 Resttemplate,它将从 Schema Registary 获取模式,如果响应为 200,它将获取模式 ID 并将其与 Avro 格式的消息一起发送到 kafka。这是我的代码:。
@SpringBootApplication
public class DemoApplication implements CommandLineRunner{
String kafkarwsrproxyURL = String.format("%s/topics/%s", "https://kafka-rest-proxy-**********", "test-topic");
String schemaurl = String.format("%s/subjects/%s/versions/latest", "https://schema-registry-*********", "test-topic");
@Autowired
private RestTemplate restTemplate;
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
ObjectMapper obj = new ObjectMapper();
JSONObject event = new JSONObject();
JSONObject record = new JSONObject();
JSONObject eventenvolpe = new JSONObject();
JSONObject jsondata = new JSONObject();
JSONArray jsonarray =new JSONArray();
JSONObject recordvalue =new JSONObject();
// connecting to schema registary and getting back schema
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.valueOf("application/vnd.schemaregistry.v1+json"));
headers.setBasicAuth("username", "password");
HttpEntity<String> SchemaEntity = new HttpEntity<String>("parameters", headers);
ResponseEntity<String> result = restTemplate.exchange(schemaurl, HttpMethod.GET, SchemaEntity, String.class);
if(result.getStatusCodeValue()==200) {
JsonNode rootNode = obj.readTree(result.getBody());
JsonNode schema_id = rootNode.path("id");//fetchinf schema id form schema
event = new JSONObject();
record = new JSONObject();
eventenvolpe = new JSONObject();
jsondata = new JSONObject();
jsondata.put("data", obj.writeValueAsString(new Data("test1",1)));
eventenvolpe.put("event_envelope", jsondata);
recordvalue.put("value", eventenvolpe);
jsonarray.put(recordvalue);
event.put("value_schema_id", schema_id);
event.put("records", recordvalue);//setting up event object to send to kafka
System.out.println(event);
//Sending message to kafka
HttpHeaders messageheaders = new HttpHeaders();
messageheaders.setContentType(MediaType.valueOf("application/vnd.kafka.avro.v2+json"));
messageheaders.setBasicAuth("username", "password");
HttpEntity<JSONObject> message = new HttpEntity<JSONObject>(event,messageheaders );
ResponseEntity<String> result1 = restTemplate.exchange(kafkarwsrproxyURL, HttpMethod.POST, message, String.class);
if(result1.getStatusCodeValue()==200) {
System.out.println("Message is pushed to Kafka");
}
}
}
}
我能够成功地获取模式表单注册表,但是在发送回 kafka 时出现错误。
2020-06-30 20:02:56.078 INFO 7972 --- [ main] com.example.demo.DemoApplication :
No active profile set, falling back to default profiles: default
2020-06-30 20:02:59.235 INFO 7972 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer :
Tomcat initialized with port(s): 8089 (http)
2020-06-30 20:02:59.263 INFO 7972 --- [ main] o.apache.catalina.core.StandardService :
Starting service [Tomcat]
2020-06-30 20:02:59.264 INFO 7972 --- [ main] org.apache.catalina.core.StandardEngine :
Starting Servlet engine: [Apache Tomcat/9.0.36]
2020-06-30 20:02:59.434 INFO 7972 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] :
Initializing Spring embedded WebApplicationContext
2020-06-30 20:02:59.434 INFO 7972 --- [ main] w.s.c.ServletWebServerApplicationContext :
Root WebApplicationContext: initialization completed in 3269 ms
2020-06-30 20:02:59.847 INFO 7972 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor :
Initializing ExecutorService 'applicationTaskExecutor'
2020-06-30 20:03:00.238 INFO 7972 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer :
Tomcat started on port(s): 8089 (http) with context path ''
2020-06-30 20:03:00.251 INFO 7972 --- [ main] com.example.demo.DemoApplication :
Started DemoApplication in 4.945 seconds (JVM running for 5.758)
{"records":{"value":{"event_envelope":{"data":"
{\"test\":\"test1\",\"testEventId\":1}"}}},"value_schema_id":"5"}
2020-06-30 20:03:02.276 INFO 7972 --- [ main] ConditionEvaluationReportLoggingListener :
Error starting ApplicationContext. To display the conditions report re-run your application with
'debug' enabled.
2020-06-30 20:03:02.282 ERROR 7972 --- [ main] o.s.boot.SpringApplication :
Application run failed
java.lang.IllegalStateException: Failed to execute CommandLineRunner
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:798) [spring-boot-
2.3.1.RELEASE.jar:2.3.1.RELEASE]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:779) [spring-boot-
2.3.1.RELEASE.jar:2.3.1.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:322) [spring-boot-
2.3.1.RELEASE.jar:2.3.1.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) [spring-boot-
2.3.1.RELEASE.jar:2.3.1.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot-
2.3.1.RELEASE.jar:2.3.1.RELEASE]
at com.example.demo.DemoApplication.main(DemoApplication.java:44) [classes/:na]
Caused by: org.springframework.web.client.RestClientException: No HttpMessageConverter for
org.json.JSONObject and content type "application/vnd.kafka.avro.v2+json"
at `org.springframework.web.client.RestTemplate$HttpEntityRequestCallback.doWithRequest(RestTemplate.java:961) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE] at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:737) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE] at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:674) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE] at org.springframework.web.client.RestTemplate.exchange(RestTemplate.java:583) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE] at com.example.demo.DemoApplication.run(DemoApplication.java:91) [classes/:na] at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:795) [spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE] ... 5 common frames omitted
2020-06-30 20:03:02.878 INFO 7972 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down
ExecutorService 'applicationTaskExecutor'`