问题标签 [debezium]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - 无法使用 Confluent Elasticsearch 接收器连接器将 Kafka 主题数据转换为结构化 JSON
我正在使用 Kafka 构建数据管道。数据流程如下:在mongodb中捕获数据变化,并发送到elasticsearch。
MongoDB
- 3.6版
- 分片集群
卡夫卡
- Confuent 平台 4.1.0
- mongoDB 源连接器:debezium 0.7.5
- elasticserach 水槽连接器
弹性搜索
- 版本 6.1.0
由于我仍在测试,因此与 Kafka 相关的系统正在单个服务器上运行。
启动 ZooKeeper
/li>启动引导服务器
/li>启动注册表模式
/li>启动 mongodb 源连接器
/li>启动 elasticsearch sink 连接器
/li>
上述系统一切正常。Kafka 连接器捕获数据更改 (CDC) 并通过接收器连接器成功将其发送到 elasticsearch。问题是我无法将字符串类型消息数据转换为结构化数据类型。例如,让我们在对 mongodb 进行一些更改后使用主题数据。
然后,我得到以下结果。
然后,如果我去弹性搜索,我会得到以下结果。
我想要实现的目标如下
我一直在尝试并仍在考虑的一些选项如下。
日志存储
案例1:不知道如何解析这些字符(/u0002,/u0001)
logstash.conf
/li>结果
/li>
案例2
logstash.conf
/li>测试.avsc
/li>结果
/li>
蟒蛇客户端
- 在一些数据操作之后使用主题并产生不同的主题名称,以便弹性搜索接收器连接器可以只使用来自 python 操作主题的格式良好的消息
/li>kafka
库:无法解码消息confluent_kafka
与 python 3 不兼容
知道如何在 elasticsearch 中对数据进行 jsonify 处理吗?以下是我搜索的来源。
提前致谢。
一些尝试
1) 我已按如下方式更改了我的 connect-mongo-source.properties 文件以测试转换。
以下是我得到的错误日志。我还不习惯 Kafka,更重要的是 debezium 平台,我无法调试这个错误。
2)这一次,我改变了elasticsearch.properties,并没有改变connect-mongo-source.properties。
我得到了以下错误。
3) 更改 test.avsc 并运行 logstash。我没有收到任何错误消息,但结果不是我所期望的origin
,salary
,name
字段都是空的,即使它们被赋予了非空值。我什至能够通过控制台消费者正确读取数据。
mysql - 在意外删除 AWS RDS 二进制日志后恢复 Debezium MySQL 连接器
当 Debezium 作为 kafka connect 中的源运行时,如果在该目标 MySQL 数据库(Amazon RDS 实例)上一段时间没有发生更新,那么一段时间后我会出现以下错误。
当我去数据库并检查 MySQL 中的二进制日志时
问题:
- 为什么Debezium闲置?为什么它在 002640 文件之后没有从 MySQL 读取文件?
这未被任何服务使用。因此,在 Debezium 能够读取之前,不可能有太多写入发生的情况。 - 没有任何活动发生时,为什么Amazon MySQL RDS会删除 binlog 文件?
这是一个测试数据库,只有我在其中插入记录。所以这里没有发生外部应用程序活动。 - 有没有办法恢复Debezium连接器并从 MySQL 当前可用的时间日志开始处理记录?(如果我对丢失的未读记录没意见)。
我尝试重新启动作业,删除和添加连接器,但我总是以同样的错误告终。
恢复活动的唯一解决方案- 删除 Kafka Connect 的offet 主题。
- 再次删除并添加 debezium 连接器。
我想要一种不同的方法,因为在生产中我们将拥有大量使用相同偏移主题的连接器。所以删除是不可能的。
apache-kafka - 如何在 Kafka 中删除连接器插件
这是我的连接器插件列表:
如何删除以下连接器?
jboss - 复制槽已经存在
每当我重新启动 debezium kafka-connect 容器或部署另一个实例时,我都会收到以下错误:
我正在使用这个图像:https ://github.com/debezium/docker-images/tree/master/connect/0.8
并有这样的配置:
我注意到这两个配置选项(slot.name 和 slot.drop_on_stop),但我不清楚是否/如何更改它们:
http://debezium.io/docs/connectors/postgresql/#connector-properties
connect - Debezium连接器插入kafka密钥作为avro?
我正在寻找一个 debezium mysql 连接器,以将 CDC 记录流式传输到 kafka,其中键为字符串(不是 avro 键),值作为 avro 记录。默认情况下,它将密钥作为 avro 记录。有什么建议么 ?
postgresql - Kafka Connect Debezium postgres
我正在尝试使用 Debezium 将 Amazon RDS 中托管的 Postgres SQL 数据库与 Kafka 主题连接起来。
我正在关注以下教程:
http://debezium.io/docs/tutorial/
我的 kafka 和 kafka 连接服务启动良好,并且 kafka 连接服务还在 /usr/share/java 目录中获取了我的 debezium postgres 连接器 jar。
但是,在尝试使用以下 curl 命令通过 kafka 连接 API 附加 postgres 配置 json 时:
我最终收到以下错误:
有人可以就异常以及如何解决它提出建议吗?
是否有我可能在这里丢失的属性/配置?
apache-kafka-connect - Debezium MySQL (MariaDB) 连接器:如何从以前的 bin-log 文件位置恢复
我正在使用Debezium-connector-mysql-0.7.5-plugin为 CDC 连接 MariaDB v10.0.32。CDC 记录保存在 HDFS 中以供进一步处理。一切都很完美,直到出现以下情况:
- 停止连接
- 完全停止卡夫卡
- 停止 MariaDB 服务器
从CDC Records可以看出,最新处理的bin log坐标如下:
由于上述行为,我面临以下后果:
MariaDB 在重启期间轮换它的 bin 日志,当前状态如下
+------------------+----------+--------------+------------------+ | File | Position | Binlog_Do_DB | Binlog_Ignore_DB | +------------------+----------+--------------+------------------+ | mysql-bin.000009 | 326 | | | +------------------+----------+--------------+------------------+
- 我之前注册的 Debezium 连接器在新启动的 Kafka 中不再可用
在这种情况下,如果我从头开始,连接器将通过以下操作获取初始快照:
- 放置锁然后从所有表中读取数据
- 完成后,它开始读取最新的 bin 日志文件,即 mysql-bin.000009
考虑到我的情况,有没有办法指示 Debezium 从 mysql-bin.000008 - 位置 2155 恢复其操作并绕过初始快照。
在此先感谢您的帮助。
mysql - 将 docker 连接到 MySQL 数据库
我是 debezium 的新手,正在尝试将我的 kafka 连接器连接到本地 WAMP 服务器上已有的 MySQL 数据库。
我按照教程文档启动了zookeeper和kafka,然后启动了kafka connector。在我的邮递员中,我将以下 JSON 发送到我的 kafka 连接器,但我一直收到错误的请求响应。
启动 Zookepper
启动卡夫卡
启动 Kafka 连接器
使用我的邮递员,我发送了以下内容
我的坏反应
google-cloud-platform - Cloud SQL PostgreSQL 是否有可用的逻辑解码插件?
我面临以下情况:
我必须将我的 Cloud SQL PostgreSQL 实例逻辑复制到外部 PostgreSQL 数据库,反之亦然。更具体地说,CloudSQL 和外部实例将包含一些必须在每个数据库之间复制的主从表。但是,GCP 目前不支持 PostgreSQL 逻辑和外部复制。
由于不支持,我搜索了一个替代方案,我注意到 Debezium + Kafka 可以与 CloudSQL MySQL 一起使用来捕获数据库更改并进行逻辑复制。我在 Debezium google 组(下面的链接)上问了同样的问题 https://groups.google.com/forum/#!topic/debezium/yS61un46x8k
他们已经回答我:
“Debezium 需要在源 Postgres 中安装特定的逻辑解码插件(ProtoBufs 或 wal2json),我不确定您的云 SQL 提供商是否具有这种灵活性(但例如在 Amazon RDS, wal2json 默认安装)。”
如果这些插件之一可用或可以安装在 Cloud SQL PostgreSQL 中,有人可以回答我吗?如果没有,在平台不支持的情况下,是否有任何替代方法用于逻辑复制?提前致谢。
transactions - kafka-connect 支持事务吗?
我需要编写一个客户端,它将事件插入到 kafka 中。我需要在单个事务中将多个事件插入到多个主题中。我想知道是否可以通过 kafka-connector 或者我应该在 java 中为相同的创建一个新的生产者这将开始并提交事务。