问题标签 [debezium]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
113 浏览

debezium - 如何使用 java 捕获 Embedded-debezium 数据库连接失败?

在我基于嵌入式 debezium 的 CDC 应用程序中,我想捕获数据库连接失败以自动重试连接。

有没有办法在我的程序中捕获数据库连接失败?

0 投票
0 回答
476 浏览

apache-kafka-connect - Kafka 将 MySQL 连接到 Redshift - 日期导致错误

尝试将 RDS MySQL 表流式传输到 Redshift 时出现此错误:转换数据时出错,参数类型无效

问题字段是DATETIMEMySQL 和timestamp without time zoneRedshift 中的 a (同样适用于timestamp with time zone)。注意:在我填充日期字段之前,管道工作正常。

我们使用 Debezium 作为 Kafka Connect 源,用于将数据从 RDS 获取到 Kafka。以及用于接收器的带有 Redshift JDBC 驱动程序的 JDBC 接收器连接器。

另外...如果我将 Redshift 字段设为 avarchar或 a ,我就能让数据流动起来bigint。当我这样做时,我看到数据以毫秒为单位的 unix 纪元整数出现。但我们真的很想要一个时间戳!

上下文中的错误消息:

谢谢,

汤姆

0 投票
1 回答
1092 浏览

sql-server - Kafka:从启用更改跟踪的 SQL 服务器读取

我一直在尝试将数据从 SQL 服务器(启用更改跟踪)加载到 Kafka,以便它可以被一个或多个系统(报告、其他数据库等)使用

我已经设法为 sql server (confluentinc/kafka-connect-cdc-mssql:1.0.0-preview) 配置了 Kafka 连接插件,并且我还设法在 kafka 机器上启动了它。

我一直在寻找有助于回答以下问题的文档(找不到任何文档)

  1. 我如何将 kafka 主题与此连接相关联?
  2. 根据我发现的信息(在 debezium 论坛上),将为每个单独的表创建一个主题->它与 kafka sql server 连接器的工作方式相同吗?
  3. 我已经在分布式模式下配置了连接,我们在多台服务器上运行 kafka,我们需要在每台服务器上运行连接吗?
  4. 有没有人将 Debezium 与 sql server 更改跟踪和 kafka 一起使用?Debezium 的网站描述了“alpha 阶段”中的连接,我想知道是否有任何活跃用户。

PS:我也对将实时数据从 sql server 加载到 Kafka 的其他选项持开放态度(带有时间戳/数字字段的 jdbc 连接是我的备份选项。备份选项,因为我的源数据库中有一些表不包含此类字段[没有也不能使用数字/时间戳字段跟踪更改])。

0 投票
0 回答
187 浏览

apache-spark - 如何使用 Pyspark 处理来自 Kafka 的 Debezium 事件?

我遵循此说明https://debezium.io/docs/tutorial-for-0-2/。我的 CDC 针对 mysql 事件(创建、更新、删除)运行良好。

我尝试从 pyspark 使用 python 获取这个 kafka 事件,我的代码仍然无法获取该事件。

代码下方:

从这段代码中,我得到了以下错误:

2018-11-14 16:22:39 错误执行程序:91 - org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala: 99) 在 org.apache.spark.internal.Logging$class.log(Logging.scala:46) 在 org.apache.spark 的 org.apache.spark.streaming.kafka.KafkaReceiver.initializeLogIfNecessary(KafkaInputDStream.scala:68) .streaming.kafka.KafkaReceiver.log(KafkaInputDStream.scala:68) at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54) at org.apache.spark.streaming.kafka.KafkaReceiver.logInfo (KafkaInputDStream.scala:68) 在 org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:90) 在 org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149) 在org.apache。spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:600) at org.apache.spark。 streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:590) at org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2185) at org.apache.spark.SparkContext$ $anonfun$34.apply(SparkContext.scala:2185) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109)在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker。在 java.lang.Thread.run(Thread.java:748) 2018-11-14 16:22:39 运行(ThreadPoolExecutor.java:624) WARN TaskSetManager:66 - 在阶段 0.0 中丢失任务 0.0(TID 0,本地主机,执行器驱动程序):在 org.apache.spark.streaming.kafka.KafkaReceiver.initializeLogIfNecessary(KafkaInputDStream.scala:68)的 org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)的 java.lang.AbstractMethodError ) 在 org.apache.spark.internal.Logging$class.log(Logging.scala:46) 在 org.apache.spark.streaming.kafka.KafkaReceiver.log(KafkaInputDStream.scala:68) 在 org.apache.spark。 internal.Logging$class.logInfo(Logging.scala:54) at org.apache.spark.streaming.kafka.KafkaReceiver.logInfo(KafkaInputDStream.scala:68) at org.apache.spark.streaming.kafka.KafkaReceiver.onStart( KafkaInputDStream.scala:90) 在 org.apache.spark。streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149) at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$ $anonfun$9.apply(ReceiverTracker.scala:600) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:590) at org.apache.spark.SparkContext$$anonfun $34.apply(SparkContext.scala:2185) at org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2185) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)在 org.apache.spark.scheduler.Task.run(Task.scala:109) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) 在 java.util.concurrent。ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 在 java.lang.Thread.run(Thread.java:748)

2018-11-14 16:22:39 错误 TaskSetManager:70 - 阶段 0.0 中的任务 0 失败 1 次;中止作业 2018-11-14 16:22:39 INFO TaskSchedulerImpl:54 - 从池中删除 TaskSet 0.0,其任务已全部完成 2018-11-14 16:22:39 INFO TaskSchedulerImpl:54 - 取消阶段 0 2018-11 -14 16:22:39 INFO DAGScheduler:54 - ResultStage 0(从 NativeMethodAccessorImpl.java:0 开始)在 0.438 秒内失败,原因是作业因阶段失败而中止:阶段 0.0 中的任务 0 失败 1 次,最近一次失败:丢失阶段 0.0 中的任务 0.0(TID 0,本地主机,执行程序驱动程序):org.apache.spark.internal.Logging$class.initializeLogIfNecessary 的 java.lang.AbstractMethodError (Logging.scala:99) at org.apache.spark.streaming。 kafka.KafkaReceiver.initializeLogIfNecessary(KafkaInputDStream.scala:68) 在 org.apache.spark.internal.Logging$class。

任何建议如何获取流数据?此致。

0 投票
1 回答
329 浏览

postgresql - 有没有办法在 Debezium Postgres 连接器中启用支持以捕获复合类型列?

我正在使用 Debezium Postgres 连接器,我发现该连接器没有捕获复合类型的列或复合类型的数组。启用此功能是否涉及某些配置,或者不支持此功能?我没有在 debezium 消息的模式部分中看到复合类型的定义,也没有在有效负载中看到复合类型本身。谢谢。

0 投票
3 回答
6574 浏览

mysql - 使用表白名单选项更新 Debezium MySQL 连接器

我正在使用 Debezium (0.7.5) MySQL 连接器,如果我想使用选项更新此配置,我试图了解什么是最佳方法table.whitelist

假设我创建了一个连接器,如下所示:

一段时间后(2 周),我需要在myDb.table3这个选项中添加一个新表()table.whitelist(这个表是旧表,它是在连接器之前创建的)

我尝试的是:

  • 暂停连接器。
  • 删除了历史主题(也许这是问题所在?)。
  • 通过 API 更新配置端点更新配置。
  • 恢复连接器。

通过 API 更新命令:

但它没有用,也许这根本不是最好的方法。在其他连接器中,我没有使用 option table.whitelist,所以当我需要收听新表时,我没有这个问题。

我的最后一个选择是删除此连接器并使用此新配置创建另一个连接器,同时监听新表(myDb.table3)。myDb.table3问题是,如果我想要来自我必须使用快照创建的初始数据,initial但我不想从其他表的快照中生成所有消息myDb.table1,myDb.table2

0 投票
1 回答
780 浏览

java - Debezium 没有为 mysql 提供嵌入式版本的 CDC

我正在使用以下依赖项,

下面是我的java方法,

我的 sql 配置,.

当我运行这段代码时,我得到了历史日志的偏移量,mysql.log 文件也得到了添加到它的偏移量。但是,当我对表执行任何更新语句时,它没有给我任何日志,即没有调用 handleEvent 方法。谁能告诉我代码或配置有什么问题?

下面是运行java代码后的日志,

log4j:WARN 找不到记录器 (org.apache.kafka.connect.json.JsonConverterConfig) 的附加程序。log4j:WARN 请正确初始化 log4j 系统。

log4j:WARN 见http://logging.apache.org/log4j/1.2/faq.html#noconfig了解更多信息。2018 年 11 月 28 日下午 1:29:47 com.debezium.gcp.SampleMysqlEmbededDebezium handleEvent INFO:得到记录:SourceRecord{sourcePartition={server=my-app-connector},sourceOffset={file=mysql-bin.000002,pos= 980, gtids=31b708c7-ee22-11e8-b8a3-080027fbf50e:1-17, 快照=true}} ConnectRecord{topic='my-app-connector', kafkaPartition=0, key=Struct{databaseName=}, value=Struct {source=Struct{version=0.8.3.Final,name=my-app-connector,server_id=0,ts_sec=0,file=mysql-bin.000002,pos=980,row=0,snapshot=true}, databaseName=,ddl=SET character_set_server=latin1, collat​​ion_server=latin1_swedish_ci;}, timestamp=null, headers=ConnectHeaders(headers=)} 2018 年 11 月 28 日下午 1:29:47 com.github.shyiko.mysql.binlog.BinaryLogClient 连接信息:在 31b708c7-ee22-11e8-b8a3-080027fbf50e:1-17 连接到 localhost:3306 (sid:6326, cid:21)

0 投票
2 回答
1162 浏览

apache-kafka - Debezium-不包含连接器类型

我正在尝试使用 Debezium 连接到本地机器上的 mysql 数据库。

尝试使用以下命令调用 kafka:sudo kafka/bin/connect-standalone.shsh kafka/config/connect-standalone.properties kafka/config/connector.properties

这是connector.properties中的配置:

运行上述命令时出现以下错误:

任何帮助将不胜感激。

0 投票
2 回答
336 浏览

kubernetes - 无法在 minikube 上部署 debezium

我是 kubernetes 的新手,我正在尝试将 kafka 与 debezium 和 mysql 集成。我在 minikube 上成功部署了 kafka 和 mysql,一旦我在 minikube 上部署了 debezium yml,它就被挂起并且根本没有响应,然后我重新启动 minikube,在运行所有 pod minikube 后再次被挂起。下面是我的代码:

动物园管理员服务

动物园管理员部署:

卡夫卡服务:

卡夫卡部署:

MySql-持久化卷:

mysql部署:

Debezium 部署:

当我部署 debezium 时,问题就开始了,minikube 响应就像

操作系统:Centos
minikube 版本:v0.30.0

0 投票
1 回答
120 浏览

debezium - CDC-Debezium 在链中捕获数据

CDC-Debezium 会在源系统(例如 postgres)中发生此类事件(例如插入、更新或删除)时捕获事件,并将数据流式传输并发送到目标系统(例如 NoSQL 或 Apache-Kafka)。我对这种配置和设置非常陌生。

我想知道是否有任何方法可以在任何事件触发时捕获表链,例如假设源系统中有表 A 父和 B 子。现在表 B 中发生了一些变化,它已被成功捕获并流向目标系统。现在我需要 CDC-Debezium 中的一些方法或配置,它们将能够捕获表 B 中所做的更改,并且该表依赖于表 A,尽管表 A 或反之亦然没有发生任何更改。

请让我知道这方面的想法。