我正在尝试创建一个使用骆驼集成作为接收器的 Kafka 源。
当我向主题发送消息时demo-topic
,日志正在尝试将消息转换为 CloudEvent 消息,但失败了。
我遵循了Knative 示例并且它有效。日志还显示该消息必须转换为 CloudEvent 消息。
有谁知道将 Camel 集成用作 Sink 时会导致处理错误的原因是什么?
卡夫卡来源:
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-source
namespace: demo-cluster
spec:
consumerGroup: demo-group
bootstrapServers:
- kafka-cluster-kafka-bootstrap.demo-cluster.svc:9092
- kafka-cluster-kafka-bootstrap.demo-cluster.svc:9093
topics:
- demo-topic
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: camel-api
日志:
{"level":"info","ts":"2021-04-07T14:53:42.435Z","caller":"consumer/consumer_handler.go:67","msg":"Starting partition consumer, topic: demo-topic, partition: 0, initialOffset: -1"}
{"level":"debug","ts":"2021-04-07T14:54:32.236Z","caller":"consumer/consumer_handler.go:75","msg":"Message claimed","topic":"demo-topic","value":"SGVsbG8gV29ybGQh"}
{"level":"debug","ts":"2021-04-07T14:54:32.236Z","caller":"adapter/message.go:57","msg":"Message is not a CloudEvent -> We need to translate it to a valid CloudEvent"}
{"level":"debug","ts":"2021-04-07T14:54:51.667Z","caller":"adapter/adapter.go:149","msg":"Unexpected status code{status code 11 404 <nil>}"}
{"level":"info","ts":"2021-04-07T14:54:51.667Z","caller":"consumer/consumer_handler.go:83","msg":"Failure while handling a message","topic":"demo-topic","partition":0,"offset":29,"error":"404 Not Found"}
{"level":"error","ts":"2021-04-07T14:54:51.667Z","caller":"adapter/adapter.go:111","msg":"An error has occurred while consuming messages occurred: ","error":"404 Not Found","stacktrace":"knative.dev/eventing-kafka/pkg/source/adapter.(*Adapter).start.func2\n\t/opt/app-root/src/go/src/knative.dev/eventing/pkg/source/adapter/adapter.go:111"}