问题标签 [debezium]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
3662 浏览

apache-kafka - Apache kafka 生产集群设置问题

我们一直在尝试在 AWS Linux 机器上建立一个生产级别的 Kafka 集群,直到现在我们都没有成功。

卡夫卡版本:2.1.0

机器:

默认代理配置:

我们的问题主要与海量数据有关。我们正在尝试使用 debezium 将我们现有的表转移到 kafka 主题中。其中许多表非常庞大,超过 50000000 行。

到目前为止,我们已经尝试了很多事情,但我们的集群每次都会失败,原因有一个或多个。

错误计划任务“isr-expiration”(kafka.utils.KafkaScheduler)org.apache.zookeeper.KeeperException$SessionExpiredException 中未捕获的异常:KeeperErrorCode = org.apache 上 /brokers/topics/__consumer_offsets/partitions/0/state 的会话已过期。 zookeeper.KeeperException.create(KeeperException.java:130) 在 org.apache.zookeeper.KeeperException.create(KeeperException.java:54)..

错误2:

] INFO [Partition xxx.public.driver_operation-14 broker=3] 缓存的 zkVersion [21] 不等于 zookeeper 中的,跳过更新 ISR (kafka.cluster.Partition) [2018-12-12 14:07:26,551] INFO [分区 xxx.public.hub-14 broker=3] 将 ISR 从 1,3 缩小到 3 (kafka.cluster.Partition) [2018-12-12 14:07:26,556] INFO [分区 xxx.public.hub-14 broker=3]缓存的zkVersion [3]不等于zookeeper中的,跳过更新ISR(kafka.cluster.Partition)[2018-12-12 14:07:26,556] INFO [Partition xxx.public.field_data_12_2018-7 broker= 3] 将 ISR 从 1,3 缩小到 3 (kafka.cluster.Partition)

错误 3:

isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=888665879, epoch=INITIAL)) (kafka.server.ReplicaFetcherThread) java.io.IOException: 在 org.apache.kafka.clients 读取响应之前,与 3 的连接已断开.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)

还有一些错误:

  1. 代理之间经常断开连接,这可能是 ISR 不断收缩和扩展而没有自动恢复的原因。
  2. 架构注册表超时。我不知道模式注册表是如何受到影响的。我没有看到该服务器上的负载过多。我错过了什么吗?我应该为架构注册表的多个实例使用负载均衡器作为故障转移吗?. __schemas 主题中只有 28 条消息。确切的错误消息是 RestClientException: Register operation timed out。错误代码:50002
  3. 有时消息传输率超过 100000 条消息/秒,有时它下降到 2000 条消息/秒?消息大小可能会导致这种情况?

    为了解决上面的一些问题,我们增加了brokers的数量并增加了zookeeper.session.timeout.ms=30000,但我不确定它是否真的解决了我们的问题,如果解决了,如何解决?

我有几个问题:

  1. 我们的集群是否足以处理这么多数据。
  2. 有什么明显的我们遗漏的吗?
  3. 如何在进入生产级别之前对我的设置进行负载测试?
  4. 什么可能导致代理和模式注册表之间的会话超时。
  5. 处理模式注册表问题的最佳方法。

我们的一位经纪人的网络负载。

网络字节在我们的一位经纪人中

随时询问更多信息。

0 投票
1 回答
519 浏览

mongodb - 如何配置在更新事件上发送的 debezium 字段(mongo 连接器)

我想使用 debezium mongo 连接器来:

-> 从 mongo 获取事件

-> 把它们放进我的卡夫卡

-> 从卡夫卡读取

我的问题是,当 debezium 从 mongo 获取更新事件时,它只发送更新的字段:

此集合上的更新更改事件的值实际上将具有完全相同的架构,其有效负载的结构将相同但将包含不同的值。具体来说,更新事件不会有 after 值,而是会有一个包含幂等更新操作的 JSON 表示的补丁字符串。

我想知道我是否可以以某种方式配置它,因为我想通过更新事件获得一些字段。

0 投票
1 回答
692 浏览

apache-kafka - MongoDb Debezium - “连接器配置不包含连接器类型”

我正在尝试为 Kafka 和 debezium 做 POC。

我已经启动了 kafka 和 zookeeper 并且他们正在工作......现在当我尝试加载 kafka-connect 时(我对此有点陌生......)我收到这个错误,我无法理解我做错了什么.

注意:我已经使用 Debezium 教程 docker images 测试了所有这些,但我想从远程服务器连接,我认为在没有 docker 的情况下安装所有东西会更容易使用配置

使用以下命令开始连接

连接-standalone.properties

连接独立worker.properties

debezium-connector.properties

运行连接时我得到以下信息:

0 投票
0 回答
2618 浏览

mysql - 创建kafka连接器时,使用localhost:8083,总是报错:curl(7) connection denied

我正在尝试使用 Debezium 将 MySql 连接到 Kafka。这是 REST API,我正在尝试执行。

0 投票
0 回答
202 浏览

java - MongoDb+Debezium+Kafka:ClassCastException,ObjectId 无法转换为 java.lang.String

我想用 Debezium 获取有关 MongoDB 更改的数据,并将数据写入 Kafka。但是当我使用此命令时
bin/connect-standalone.sh config/connect-standalone.properties mongo.properties
出现 ClassCastException。</p>

我的 mongo.properties:

这是我第一次搭建。请帮我。非常感谢。

0 投票
1 回答
630 浏览

elasticsearch - 使用 Kafka Connect Elasticsearch 连接器的消息顺序

我们在执行使用 Kafka Connect Elasticsearch 连接器将来自 Kafka 主题的消息发送到 Elasticsearch 的顺序时遇到问题。在主题中,消息的顺序正确且偏移量正确,但如果连续快速创建两条具有相同 ID 的消息,它们会以错误的顺序间歇性地发送到 Elasticsearch。这会导致 Elasticsearch 获得来自倒数第二条消息的数据,而不是来自最后一条消息的数据。如果我们在主题中的两条消息之间添加一两秒的人为延迟,问题就会消失。

这里的文档指出:

通过使用分区级 Kafka 偏移量作为文档版本,并使用version_mode=external.

但是,我在任何地方都找不到有关此version_mode设置的任何文档,以及我们是否需要将其设置在某个地方。

在来自 Kafka Connect 系统的日志文件中,我们可以看到两条消息(对于相同的 ID)以错误的顺序被处理,相隔几毫秒。看起来这些是在不同的线程中处理的,这可能很重要。另请注意,该主题只有一个分区,因此所有消息都在同一个分区中。

下面是日志片段,为清楚起见稍作编辑。Kafka 主题中的消息由 Debezium 填充,我认为这与问题无关,但恰好包含时间戳值。这表明消息的​​处理顺序错误(尽管它们在 Kafka 主题中的顺序正确,由 Debezium 填充):

有谁知道在将消息发送到 Elasticsearch 时如何强制此连接器维护给定文档 ID 的消息顺序?

0 投票
3 回答
5206 浏览

apache-kafka - MySQL 的 Debezium 连接器。缺少数据库历史主题

我正在使用 Debezium 连接器 0.8 版从 MySQL 数据库中捕获更改并将其移动到 Kafka。我将 Docker 与 MySQL 容器一起使用,另一个用于连接器,另一个用于 Kafka。

当我停止 Docker ( docker-compose down) 并再次启动 Docker 时,通常会收到以下错误:

我已在此处的官方页面上阅读了此问题的解决方案:

https://debezium.io/blog/2018/03/16/note-on-database-history-topic-configuration/

但我遵循了这些步骤,我认为我的配置还可以:

请注意,如果我设置log.retention.ms为 -1 则log.retention.minutes不会log.retention.hours像官方文档解释的那样使用,然后我已经解决了保留大小和保留时间问题。

那么,有人知道我为什么会收到此错误吗?

这是大学工作的一部分。docker-compose我想在我的大学发布之前我无法分享完整的文件,但我可以向您展示与此问题相关的重要内容。我不认为这是一个配置问题,因为我的docker-compose.

而其他部分只是网络或不相关的东西。

0 投票
1 回答
3731 浏览

apache-kafka - 卡夫卡连接 | 由于操作冲突,无法完成请求

1)我们有 3 个节点 kafka 和 kafka 连接集群

2)我们仅在分布式模式下在 kafka 节点上运行 kafka-connect

3)当我尝试使用以下配置创建连接器时:

在创建连接器的请求中,它在 3 个节点中的 2 个节点上给了我以下错误:

在其中一个节点上:我能够创建一个连接器,但任务没有开始,我可以在日志中看到以下错误:

我无法弄清楚是什么导致了这个问题。

0 投票
1 回答
315 浏览

postgresql - 是否可以将 Debezium 连接到 9.6 以上版本的 Postgres 从站?

我在 Debezium 文档中发现了这个限制:

PostgreSQL 9.6 仅支持主服务器上的逻辑复制槽。这意味着不能为 PostgreSQL 集群中的副本配置逻辑复制,因此 Debezium PostgreSQL 连接器只能与主服务器连接和通信。

此限制是否也适用于以上版本的 PostgreSQL 9.6
可以在以下版本中将 Debezium 与 PostgreSQL 从站一起使用:10.x11.x

0 投票
3 回答
3014 浏览

microservices - 使用数据库 CDC 的事件溯源是否被认为是好的架构?

当我们谈论采购事件时,我们有一个简单的双写架构,我们可以在其中写入数据库,然后将事件写入像 Kafka 这样的队列。其他下游系统可以读取这些事件并相应地处理/使用它们。

但是当尝试使数据库和事件同步时会出现问题,因为需要这些事件的顺序才能理解它。

为了解决这个问题,人们鼓励使用数据库提交日志作为事件源,并且围绕它构建了一些工具,如 Airbnb 的 Spinal Tap、Redhat 的 Debezium、Oracle 的 Golden Gate 等……它解决了一致性、排序保证和所有这些。

但是使用数据库提交日志作为事件源的问题是我们与数据库模式紧密耦合。微服务的数据库架构被暴露,数据库架构中的任何重大更改(如数据类型更改或列名更改)实际上都会破坏下游系统。

那么使用 DB CDC 作为事件源是个好主意吗?

讨论这个问题并使用 Debezium 进行事件溯源