
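I am trying to create a Kafka source that uses a Camel integration as its sink.

When I send a message to the topic demo-topic, the log shows the source trying to convert the message to a CloudEvent, but handling the message fails.

I followed the Knative example, and it works; the logs there also show that the message has to be converted to a CloudEvent.

Does anyone know what causes the processing error when a Camel integration is used as the sink?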

Kafka source:

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source
  namespace: demo-cluster
spec:
  consumerGroup: demo-group
  bootstrapServers:
    - kafka-cluster-kafka-bootstrap.demo-cluster.svc:9092
    - kafka-cluster-kafka-bootstrap.demo-cluster.svc:9093
  topics:
    - demo-topic
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: camel-api

Logs:

{"level":"info","ts":"2021-04-07T14:53:42.435Z","caller":"consumer/consumer_handler.go:67","msg":"Starting partition consumer, topic: demo-topic, partition: 0, initialOffset: -1"}
{"level":"debug","ts":"2021-04-07T14:54:32.236Z","caller":"consumer/consumer_handler.go:75","msg":"Message claimed","topic":"demo-topic","value":"SGVsbG8gV29ybGQh"}
{"level":"debug","ts":"2021-04-07T14:54:32.236Z","caller":"adapter/message.go:57","msg":"Message is not a CloudEvent -> We need to translate it to a valid CloudEvent"}
{"level":"debug","ts":"2021-04-07T14:54:51.667Z","caller":"adapter/adapter.go:149","msg":"Unexpected status code{status code 11 404  <nil>}"}
{"level":"info","ts":"2021-04-07T14:54:51.667Z","caller":"consumer/consumer_handler.go:83","msg":"Failure while handling a message","topic":"demo-topic","partition":0,"offset":29,"error":"404 Not Found"}
{"level":"error","ts":"2021-04-07T14:54:51.667Z","caller":"adapter/adapter.go:111","msg":"An error has occurred while consuming messages occurred: ","error":"404 Not Found","stacktrace":"knative.dev/eventing-kafka/pkg/source/adapter.(*Adapter).start.func2\n\t/opt/app-root/src/go/src/knative.dev/eventing/pkg/source/adapter/adapter.go:111"}
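To check the sink independently of the Kafka source, the request can be reproduced by hand. The following is only a minimal sketch, assuming the source sends a binary-mode CloudEvent over HTTP POST to the sink's address. The four `ce-*` headers are the required CloudEvents attributes; the values used for `ce-id`, `ce-type` and `ce-source`, the `text/plain` content type, and the helper names `build_cloudevent_request` and `post_to_sink` are illustrative assumptions, not taken from the adapter's code.

```python
import base64
import urllib.error
import urllib.request


def build_cloudevent_request(value_b64, topic, partition, offset,
                             namespace="demo-cluster", source_name="kafka-source"):
    """Build the headers and body of a binary-mode CloudEvent HTTP POST.

    The attribute values are assumptions made for illustration; only the
    header names follow the CloudEvents HTTP binding.
    """
    # The "value" field in the debug log is the base64-encoded Kafka payload.
    body = base64.b64decode(value_b64)
    headers = {
        "ce-specversion": "1.0",
        "ce-id": f"partition:{partition}/offset:{offset}",   # assumed format
        "ce-type": "dev.knative.kafka.event",                # assumed type
        "ce-source": (f"/apis/v1/namespaces/{namespace}"
                      f"/kafkasources/{source_name}#{topic}"),  # assumed format
        "content-type": "text/plain",                        # assumed
    }
    return headers, body


def post_to_sink(sink_url, headers, body):
    """POST the event to the sink and return the HTTP status code."""
    req = urllib.request.Request(sink_url, data=body, headers=headers, method="POST")
    try:
        with urllib.request.urlopen(req) as resp:
            return resp.status
    except urllib.error.HTTPError as err:
        # urlopen raises on 4xx/5xx; return the code so a 404 is visible.
        return err.code


if __name__ == "__main__":
    headers, body = build_cloudevent_request("SGVsbG8gV29ybGQh", "demo-topic", 0, 29)
    print(headers)
    print(body)
```

Calling `post_to_sink` with the address of the camel-api service from inside the cluster would show whether the 404 comes from the service itself, for example because nothing in the integration is listening on the path being posted to.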

OpenShift cluster architecture
