0

I'm trying to wrap the Confluent kafka proxy api in one class that will handle producing and consuming.
Following this link: https://docs.confluent.io/platform/current/kafka-rest/api.html I tried to implement it as follows:

    def send(self, topic, data):
        try:
            r = requests.post(self._url('/topics/' + topic), json=data, headers=headers_v2)
            if not r.ok:
                raise Exception("Error: ", r.reason)
        except Exception as e:
            print(" ")
            print('Event streams send request failed')
            print(Exception, e)
            print(" ")
            return e

    

but I ended up working with 2 versions of the api (v2/v3) cause I didn't find some api's in one implementation and vise versa...
For example I didn't find how to create topic in v2, so I implemented it with v3.

My issue now is with the send method, I'm getting Internal server error and I can't find why!
Maybe because the create topic was done with v3 and I'm trying to produce messages with v2.

4

1 回答 1

0

I changed the data payload for the send to look like:
data = {"records": [{"value": data}]} and send passed,

poll passed when using:
r = requests.get(self._url('/consumers/' + self.consumer_group + '/instances/' + self.consumer + '/records'), headers={'Accept': 'application/vnd.kafka.json.v2+json'})

于 2021-05-25T11:58:03.917 回答