2

我是 Pulsar 的新手,我只是在探索一个新项目的功能。我正在尝试一个非常基本的示例来从基于模式的生产者发送数据。提供一些背景知识,我的想法是将数据从 apache-pulsar 发送到 Clickhouse 数据库。我已经完成了接收器连接器的设置,并使用以下命令进行了验证

bin/pulsar-admin sinks status --tenant public --namespace default --name jdbc-clickhouse-sink

bin/pulsar-admin sinks list --tenant public --namespace default 输出:[“jdbc-clickhouse-sink”]

所以我在 Clickhouse DB 中创建了一个表。我希望将数据发送到应保存在数据库中的主题。在这样做时,我想保持模式一致,所以我想设置一个模式。下面的示例代码

import pulsar
from pulsar.schema import *

class Example(Record):
    a = Integer()
    b = Integer()
    c = Integer()


client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer(
                    topic='my-topic',
                    schema=AvroSchema(Example) )

producer.send(Example(  a=444 , b=62, c=999 ))

当我运行上面的代码时,我收到以下错误

---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<ipython-input-114-3b0aa7d0415f> in <module>
      9 
     10 client = pulsar.Client('pulsar://localhost:6650' class="ansi-blue-fg">)
---> 11 producer = client.create_producer(
     12                     topic='my-topic',
     13                     schema=AvroSchema(Example) )

~/opt/anaconda3/lib/python3.8/site-packages/pulsar/__init__.py in 
create_producer(self, topic, producer_name, schema, initial_sequence_id, 
send_timeout_millis, compression_type, max_pending_messages, 
max_pending_messages_across_partitions, block_if_queue_full, batching_enabled, 
batching_max_messages, batching_max_allowed_size_in_bytes, 
batching_max_publish_delay_ms, message_routing_mode, properties, batching_type)
    560 
    561         p = Producer()
--> 562         p._producer = self._client.create_producer(topic, conf)
    563         p._schema = schema
    564         return p

Exception: Pulsar error: IncompatibleSchema

有人可以帮助我在这里缺少什么

4

1 回答 1

0

确保你已经安装了带有 avro 的 Pulsar Python 客户端

pip3 安装 fastavro pip3 安装 pytz pip3 安装 pulsar-client[avro]

在这里查看我的 python 示例与模式 https://github.com/tspannhw/FLiP-Pi-Weather/blob/main/weather.py

查看我的示例 https://github.com/tspannhw/FLiP-Stream2Clickhouse

检查您的架构 bin/pulsar-admin 架构获取持久性://public/default/my-topic

Python 文档 https://pulsar.apache.org/api/python/ https://pulsar.apache.org/api/python/schema/schema.m.html#pulsar.schema.schema.AvroSchema

Pulsar 每个客户端可用的功能 https://docs.google.com/spreadsheets/d/1YHYTkIXR8-Ql103u-IMI18TXLlGStK8uJjDsOOA0T20/edit

您可能需要从实际的 Avro Schema 文件生成类,这通常在 Java 中完成。

看这个例子:

https://github.com/ta1meng/pulsar-python-avro-schema-examples

如果您不需要 Avro,则 JsonSchema 不需要此额外步骤

于 2022-02-23T14:12:51.187 回答