问题标签 [debezium]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
3 回答
6458 浏览

python - 无法使用 Confluent Elasticsearch 接收器连接器将 Kafka 主题数据转换为结构化 JSON

我正在使用 Kafka 构建数据管道。数据流程如下:在mongodb中捕获数据变化,并发送到elasticsearch。

在此处输入图像描述

MongoDB

  • 3.6版
  • 分片集群

卡夫卡

  • Confuent 平台 4.1.0
  • mongoDB 源连接器:debezium 0.7.5
  • elasticserach 水槽连接器

弹性搜索

  • 版本 6.1.0

由于我仍在测试,因此与 Kafka 相关的系统正在单个服务器上运行。

  • 启动 ZooKeeper

    /li>
  • 启动引导服务器

    /li>
  • 启动注册表模式

    /li>
  • 启动 mongodb 源连接器

    /li>
  • 启动 elasticsearch sink 连接器

    /li>

上述系统一切正常。Kafka 连接器捕获数据更改 (CDC) 并通过接收器连接器成功将其发送到 elasticsearch。问题是我无法将字符串类型消息数据转换为结构化数据类型。例如,让我们在对 mongodb 进行一些更改后使用主题数据。

然后,我得到以下结果。

然后,如果我去弹性搜索,我会得到以下结果。

我想要实现的目标如下


我一直在尝试并仍在考虑的一些选项如下。

  • 日志存储

    • 案例1:不知道如何解析这些字符(/u0002,/u0001)

      • logstash.conf

        /li>
      • 结果

        /li>
    • 案例2

      • logstash.conf

        /li>
      • 测试.avsc

        /li>
      • 结果

        /li>
  • 蟒蛇客户端

    • 在一些数据操作之后使用主题并产生不同的主题名称,以便弹性搜索接收器连接器可以只使用来自 python 操作主题的格式良好的消息
    • kafka库:无法解码消息

      /li>
    • confluent_kafka与 python 3 不兼容


知道如何在 elasticsearch 中对数据进行 jsonify 处理吗?以下是我搜索的来源。

提前致谢。


一些尝试

1) 我已按如下方式更改了我的 connect-mongo-source.properties 文件以测试转换。

以下是我得到的错误日志。我还不习惯 Kafka,更重要的是 debezium 平台,我无法调试这个错误。

2)这一次,我改变了elasticsearch.properties,并没有改变connect-mongo-source.properties。

我得到了以下错误。

3) 更改 test.avsc 并运行 logstash。我没有收到任何错误消息,但结果不是我所期望的originsalary,name字段都是空的,即使它们被赋予了非空值。我什至能够通过控制台消费者正确读取数据。

0 投票
1 回答
2924 浏览

mysql - 在意外删除 AWS RDS 二进制日志后恢复 Debezium MySQL 连接器

当 Debezium 作为 kafka connect 中的源运行时,如果在该目标 MySQL 数据库(Amazon RDS 实例)上一段时间没有发生更新,那么一段时间后我会出现以下错误。

当我去数据库并检查 MySQL 中的二进制日志时

问题:

  1. 为什么Debezium闲置?为什么它在 002640 文件之后没有从 MySQL 读取文件?
    这未被任何服务使用。因此,在 Debezium 能够读取之前,不可能有太多写入发生的情况。
  2. 没有任何活动发生时,为什么Amazon MySQL RDS会删除 binlog 文件?
    这是一个测试数据库,只有我在其中插入记录。所以这里没有发生外部应用程序活动。
  3. 有没有办法恢复Debezium连接器并从 MySQL 当前可用的时间日志开始处理记录?(如果我对丢失的未读记录没意见)。
    我尝试重新启动作业,删除和添加连接器,但我总是以同样的错误告终。
    恢复活动的唯一解决方案
    • 删除 Kafka Connect 的offet 主题。
    • 再次删除并添加 debezium 连接器。
      我想要一种不同的方法,因为在生产中我们将拥有大量使用相同偏移主题的连接器。所以删除是不可能的。
0 投票
2 回答
1177 浏览

apache-kafka - 如何在 Kafka 中删除连接器插件

这是我的连接器插件列表:

如何删除以下连接器?

0 投票
1 回答
7733 浏览

jboss - 复制槽已经存在

每当我重新启动 debezium kafka-connect 容器或部署另一个实例时,我都会收到以下错误:

我正在使用这个图像:https ://github.com/debezium/docker-images/tree/master/connect/0.8

并有这样的配置:

我注意到这两个配置选项(slot.name 和 slot.drop_on_stop),但我不清楚是否/如何更改它们:

http://debezium.io/docs/connectors/postgresql/#connector-properties

0 投票
1 回答
610 浏览

connect - Debezium连接器插入kafka密钥作为avro?

我正在寻找一个 debezium mysql 连接器,以将 CDC 记录流式传输到 kafka,其中键为字符串(不是 avro 键),值作为 avro 记录。默认情况下,它将密钥作为 avro 记录。有什么建议么 ?

0 投票
1 回答
2816 浏览

postgresql - Kafka Connect Debezium postgres

我正在尝试使用 Debezium 将 Amazon RDS 中托管的 Postgres SQL 数据库与 Kafka 主题连接起来。

我正在关注以下教程:

http://debezium.io/docs/tutorial/

我的 kafka 和 kafka 连接服务启动良好,并且 kafka 连接服务还在 /usr/share/java 目录中获取了我的 debezium postgres 连接器 jar。

但是,在尝试使用以下 curl 命令通过 kafka 连接 API 附加 postgres 配置 json 时:

我最终收到以下错误:

有人可以就异常以及如何解决它提出建议吗?

是否有我可能在这里丢失的属性/配置?

0 投票
2 回答
1382 浏览

apache-kafka-connect - Debezium MySQL (MariaDB) 连接器:如何从以前的 bin-log 文件位置恢复

我正在使用Debezium-connector-mysql-0.7.5-plugin为 CDC 连接 MariaDB v10.0.32。CDC 记录保存在 HDFS 中以供进一步处理。一切都很完美,直到出现以下情况:

  1. 停止连接
  2. 完全停止卡夫卡
  3. 停止 MariaDB 服务器

从CDC Records可以看出,最新处理的bin log坐标如下:

由于上述行为,我面临以下后果:

  1. MariaDB 在重启期间轮换它的 bin 日志,当前状态如下

    +------------------+----------+--------------+------------------+ | File | Position | Binlog_Do_DB | Binlog_Ignore_DB | +------------------+----------+--------------+------------------+ | mysql-bin.000009 | 326 | | | +------------------+----------+--------------+------------------+

  2. 我之前注册的 Debezium 连接器在新启动的 Kafka 中不再可用

在这种情况下,如果我从头开始,连接器将通过以下操作获取初始快照:

  1. 放置锁然后从所有表中读取数据
  2. 完成后,它开始读取最新的 bin 日志文件,即 mysql-bin.000009

考虑到我的情况,有没有办法指示 Debezium 从 mysql-bin.000008 - 位置 2155 恢复其操作并绕过初始快照。

在此先感谢您的帮助。

0 投票
1 回答
1084 浏览

mysql - 将 docker 连接到 MySQL 数据库

我是 debezium 的新手,正在尝试将我的 kafka 连接器连接到本地 WAMP 服务器上已有的 MySQL 数据库。

我按照教程文档启动了zookeeper和kafka,然后启动了kafka connector。在我的邮递员中,我将以下 JSON 发送到我的 kafka 连接器,但我一直收到错误的请求响应。

启动 Zookepper

启动卡夫卡

启动 Kafka 连接器

使用我的邮递员,我发送了以下内容

我的坏反应

0 投票
2 回答
926 浏览

google-cloud-platform - Cloud SQL PostgreSQL 是否有可用的逻辑解码插件?

我面临以下情况:

我必须将我的 Cloud SQL PostgreSQL 实例逻辑复制到外部 PostgreSQL 数据库,反之亦然。更具体地说,CloudSQL 和外部实例将包含一些必须在每个数据库之间复制的主从表。但是,GCP 目前不支持 PostgreSQL 逻辑和外部复制。

由于不支持,我搜索了一个替代方案,我注意到 Debezium + Kafka 可以与 CloudSQL MySQL 一起使用来捕获数据库更改并进行逻辑复制。我在 Debezium google 组(下面的链接)上问了同样的问题 https://groups.google.com/forum/#!topic/debezium/yS61un46x8k

他们已经回答我:
“Debezium 需要在源 Postgres 中安装特定的逻辑解码插件(ProtoBufs 或 wal2json),我不确定您的云 SQL 提供商是否具有这种灵活性(但例如在 Amazon RDS, wal2json 默认安装)。”

如果这些插件之一可用或可以安装在 Cloud SQL PostgreSQL 中,有人可以回答我吗?如果没有,在平台不支持的情况下,是否有任何替代方法用于逻辑复制?提前致谢。

0 投票
1 回答
894 浏览

transactions - kafka-connect 支持事务吗?

我需要编写一个客户端,它将事件插入到 kafka 中。我需要在单个事务中将多个事件插入到多个主题中。我想知道是否可以通过 kafka-connector 或者我应该在 java 中为相同的创建一个新的生产者这将开始并提交事务。