问题标签 [confluent-schema-registry]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
docker - Kafka - 从命令行生成时出错(字符('<'(代码 60)):预期有效值)
我在笔记本电脑上旋转了 Docker 中的 Kafka(使用 docker-compose)。
之后,创建了新的 kafka 主题:
(尚未在模式注册表中创建模式)。
现在尝试生成(基于此示例 - 第 3 步 - https://docs.confluent.io/4.0.0/quickstart.html):
输入值:
错误:
如何解决这个问题?
可能是因为 Kafka 集群使用 SSL 但错误是虚假的吗?谢谢。
ssl - 带有 SSL 的 Kafka - 写入主题 - 授权错误
我正在尝试从命令行生成启用 SSL 的本地 Kafka 集群上的主题。
刚刚创建的主题是:
生产命令是:
打字:
错误:
如何添加对该主题的访问权限?
ACL 的正确命令是什么(我在本地机器上运行它)。
apache-kafka - 无法将 Kafka MQTT 源连接器链接到 InfluxDB 接收器连接器
我们正在尝试将 MQTT 源连接器链接到 InfluxDB 接收器连接器。现在前者工作正常,但后者给出以下例外:
org.apache.kafka.connect.errors.ConnectException:由于不可恢复的异常而退出 WorkerSinkTask。在 org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:484) 在 org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265) 在 org.apache.kafka.connect .runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182) 在 org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150) 在 org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask .java:146) 在 org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190) 在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 在 java.util。 concurrent.FutureTask.run(FutureTask.java:266) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.
这是 InfluxDB 配置文件
这是消息结构:
{ "timestamp": 1524572345184, "partition": 0, "key": { "topic": "machine/sensor/mytopic/test", "id": "1" }, "offset": 0, "topic" :“simMetrics”,“值”:{“指标”:{“蜂鸣器”:0,“LED”:0,“水”:假,“蜂鸣器时间戳”:1524571762798,“温度时间戳”:1524571762816,“水时间戳”:1524571762835 ,“风扇”:0,“光”:500,“温度”:27.371554588194957,“资产名称”:“SIMopcua”,“fan_timestamp”:1524571762791,“光时间戳”:1524571762808,“led_timestamp”:1524571762827 }}}
MQTT 源连接器配置:
更新
我们发现问题出在温度值格式上。由于我们没有配置字段的类型,InfluxDB 将温度值理解为双精度值。所有具有小数点分隔符的值都被正确保存,当 Kafka 发送值时出现问题,没有小数部分,省略了小数点分隔符。我们如何解决这个问题?
PS:实际的解决方法是为所有输入温度添加 0.00000001。
apache-kafka - 架构注册表:部分共享/授权系统
我们需要与另一家公司共享部分模式注册表,并且不希望他们看到所有模式。他们也需要为他们做同样的事情。
有什么方法可以让我们每个人只共享部分模式注册表?
apache-kafka - 使用 kafka s3 sink connect 时,我可以使用模式注册表来获取模式吗?
我有一个 kafka 主题,其值为 avro 格式,其中架构存储在架构注册表中。
现在我想设置一个 S3 Sink,如下:https ://docs.confluent.io/current/connect/connect-storage-cloud/kafka-connect-s3/docs/s3_connector.html#basic-example
在网页中,他们使用
当我尝试重新加载生成的 .avro 数据时,我发现架构有点不同。例如,嵌套的枚举类型变成了字符串。我只能恢复 aGenericRecord
而不是 a SpecificRecord
。
有没有办法指定一个模式生成器,它从模式注册表中检索模式?
apache-kafka - 使用远程 Kafka schema_registry 时写入失败
我的代码实现了通过 Schema_Registry 服务器将 Avro 数据写入 Kafka 代理。本地测试很好(I setup a local Broker and Schema_Registry server)
。但是当我更改配置文件并使用远程 kafka 和 schema_registry 服务器时,由于 TimeoutException,写入失败。(but when I logged in the remote server and used Avro producer/consumer command line, it works good too. )
我是否需要在 Schema_Registry 端或客户端更改任何配置?
我的 schema_registry 服务器属性文件如下:
我的生产者代码如下:
python - 我们是否需要手动缓存模式注册表?
我们目前正在使用 Protocol Buffers 作为 kafak 消息的序列化机制。我们将搬到 Avro。我们使用 Schema Registry 测试了 Avro Confluent 消费者,根据这些测试,与 protobuff 消费者相比,Avro 消费者有点慢。
我的问题是我们是否需要手动兑现模式或 Python AvroConsumer 自己处理现金?我正在使用 confluent_kafka AvroConsumer。
java - 如何从kafka avro记录生成pojo?
我有一个 User 类,我将它序列化为 avro,(使用 Confluent avro 序列化程序和模式注册表)并将其发布到 Kafka 主题。我让消费者将数据打印到控制台,它工作正常。我现在尝试的是从这些数据中创建原始对象。例如,我将“用户”对象作为 avro 发布到 Kafka 主题。我正在尝试在使用后重新创建该用户对象(而不是控制台输出)。这可能吗?
下面是我的代码
用户类
消费者代码
谢谢