问题标签 [debezium]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
mongodb - MongoDB 强制更新
我正在开发一个使用Debezium将更改从 MongoDB 流式传输到 Kafka 集群的应用程序。tl;dr - debezium 跟踪 mongo oplog 以更改文档并将整个 doc 存储在 kafka 上作为 json。
我们有一堆旧的、陈旧的生产数据,我们仍然希望按原样保留在 Kafka 上。为了让 Debezium 提取文档,它们首先必须出现在 OpLog 中。本质上,我们需要touch
在我们的生产 mongo replset 中的每个文档不更改文档上的任何数据。我最初的想法是用当前内容更新一个字段:
但是,mongo 将其视为 noop:
WriteResult({ "nMatched" : 959, "nUpserted" : 0, "nModified" : 0 })
我的问题是:是否可以$set
在 MongoDB 中强制进行更新,nModified
以使上述输出中的值959
不实际修改任何字段?
注意:我们使用 WiredTiger 作为后端,因此我们touch
无法使用 mongo 命令。
谢谢您的帮助!
mysql - 使用kafka连接配置debezium mysql连接器失败
我正在使用 kafka 2.0 和 kafka connect 在分布式模式下运行并尝试配置 debezium mysql 连接器但出现错误
这是电话:
卡夫卡连接日志:
INFO [Worker clientId=connect-1, groupId=connect-cluster] 成功加入第 7 代组(org.apache.kafka.clients.consum er.internals.AbstractCoordinator:409)[2018-09-12 07:38: 17,370] INFO 加入组并获得分配:Assignment{error =0, leader='connect-1-63af40e1-da48-4055-99f8-8799582ba5aa', leaderUrl=' http://l apach e.kafka.connect.runtime.distributed.DistributedHerder:852) Sep 12, 2018 7:38:17 AM org.glassfish.jersey.internal.Errors logErrors 警告:已检测到以下警告:警告:(子) org.apache.kafka.connect.runtime.rest.resources.Connect orsResource 中的资源方法 createConnector 包含空路径注解。警告:org.apache.kafka.connect.run time.rest.resources.ConnectorsResource 中的(子)资源方法 listConnectors 包含空路径注解。警告:org.apache.kafka.conne ct.runtime.rest.resources.ConnectorPluginsResource 中的(子)资源方法 listConnectorPlugins 包含空路径注释。警告:org.apache.kafka.connect 中的(子)资源方法 serverInfo。
[2018-09-12 07:38:18,213] 信息开始 oejsServletContextHandler@21d1b321{/, null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744) [2018-09-12 07:38:18,223 ] INFO Started http_8083@6ec65b5e{HTTP/1.1}{0.0.0.0:8083 } (org.eclipse.jetty.server.ServerConnector:266) [2018-09-12 07:38:18,223] INFO Started @15360ms (org. eclipse.jetty.server.Server:379)[2018-09-12 07:38:18,223]信息广告URI:http://localhost:8083/ (org.apach e.kafka.connect.runtime.rest.RestServer:248) [2018-09-12 07:38:18,223] INFO REST 服务器在http://127.0.0.1监听:8083/ , 广告网址http://localhost:8083/(org.apache.kafka.connect.runtime.rest.Re stServer:207) [2018-09-12 07:38:18,223] 信息 Kafka Connect 已启动 (org.apache.kafka.connect.r untime.Connect:55) [2018-09-12 07:43:16,612] 信息集群 ID:8oMHsNLASgubK4CBagB-dw (org.apache.ka fka.clients.Metadata:265) [2018-09-12 07:43:17,052] 信息集群 ID:8oMHsNLASgubK4CBagB -dw (org.apache.ka fka.clients.Metadata:265) [2018-09-12 07:43:17,289] 信息集群 ID:8oMHsNLASgubK4CBagB-dw (org.apache.ka fka.clients.Metadata:265)
postgresql - postgres-decoderbufs-master -- centos7
我正在尝试通过 postgres 9.6 的 debezium protobuf 进行 CDC,并且按照说明,我已经从 git 下载了源代码并尝试使其出现一些问题,并尝试在 satckoverflow 和其他站点上找到解决方案,然后对于 ubuntu os,许多解决方案是可用,但对于 centosh 7 未找到解决方案
当我触发 make 命令时出现以下错误
如果有任何帮助,它将对我的项目非常有用。
postgresql - 汇合控制中心不拦截流
我将 CCC 与 Kafka 流一起使用,该流由 Debezium 的 Postgres 连接器填充。
我正在使用以下内容docker-compose.yml
:
我已将 Postgres 连接器映射到 Kafka Connect(通过volumes
Compose),并且在创建新的源连接器时可以在 CCC 中看到它。
创建源连接器时,我可以看到指示此连接器的主题已创建的日志消息。我也在 CCC 的 Connect 区看到了这个话题。我还可以看到 Connect 能够通过此连接器对 Postgres 进行身份验证。
当我对我在连接器中指定的表进行更改时,我看到 Kafka(我有 3 个集群)正在确定谁将存储此消息。这意味着,Postgres tx 日志创建了一条相应主题的消息以响应我的更改,因此 DB、Connector 和 Kafka 工作正常。
但是,无论我做什么,我都无法让这个事件显示在Data Streams
or 中(编辑:这现在有效。数据流仍然没有)。System Health
(既不是> Topics
nor> Brokers
区域)
我不知道出了什么问题。我得到的唯一指示是最初的消息说
仔细检查是否已为任何从集群 controlcenter.cluster 生产或消费的客户端正确配置了监控拦截器
我的印象是,这基本上意味着我的控制中心容器配置了*_INTERCEPTOR_CLASSES
我在上面粘贴的 . 我点击了此消息中的链接,该链接将您带到他们的文档站点,该站点说要检查提供 kafka 数据的 Web 服务的响应。正如他们的文档所暗示的那样,我得到了 just 的响应{}
,表明 Kafka 说它没有数据。但它确实如此。
是不是想说我还需要以某种方式将这些拦截器配置到连接器中?我不知道为任何消费者/生产者监视拦截器意味着什么——我还没有任何原始的 Java 消费者/生产者……现在只有源连接器。
如果重要,我的连接器配置如下(通过 CCC UI 创建):
启动所有服务时,我在相应的日志中看到以下我怀疑可能感兴趣的内容(以下没有特定顺序):
任何帮助表示赞赏。谢谢!
jdbc - Kafka Connect 接收到不在公共模式中的 Redshift 表
我无法使 Kafka Connect 接收器为不在公共模式中的表工作。
我正在使用 Kafka Connect 通过使用 JdbcSinkConnector 的接收器操作将记录发送到 Redshift 数据库。
我已经在 Redshift 中创建了我的目标表,但它不在public
架构中。( my_schema.test_table
. 注意:auto.create
&auto.evolve
在连接器配置中是关闭的)
当我尝试在连接器配置中指定表的位置时,就像这样......
"table.name.format": "my_schema.test_table",
...接收器连接器的任务在尝试自行运行时遇到此错误:
“表 my_schema.test_table 丢失并且自动创建被禁用”
从
Caused by: org.apache.kafka.connect.errors.ConnectException: Table my_schema.test_table is missing and auto-creation is disabled
at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:86)
at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:63)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:78)
...
我尝试了以下格式来提供表名:
my_schema.test_table
dev.my_schema.test_table
test_table
<-- 在这种情况下,我通过了阻止其他人的存在检查,但每次 Kafka Connect 尝试写入一行时都会遇到此错误:- “org.apache.kafka.connect.errors.RetriableException:java.sql.SQLException:java.sql.SQLException:亚马逊无效操作:关系“test_table”不存在;”
- 可能是因为
test_table
不在公共模式中。: (
看起来代码正在尝试正确解析这个表名,但不幸的是它没有记录它的结果。
这是我的连接字符串:"connection.url": "jdbc:redshift://xxx.xxx.xxx.xxx:5439/dev"
我一直在尝试currentSchema=my_schema
在连接字符串中指定......既适用于redshift
jdbc 驱动程序,也适用于postgresql
. 没运气。
我正在使用 Kafka Connect 1.1.0 版
红移 JDBC JAR:RedshiftJDBC42-1.2.16.1027.jar
我可以通过将表放入public
架构中并指定没有架构的表名来获得数据流动:"table.name.format": "test_table"
。不幸的是,这不是我们需要数据的地方。
非常感谢任何帮助。
debezium - 如何在 debezium 中将 offset.commit.policy 设置为 AlwaysCommitOffsetPolicy?
我创建了一个 Debezium Embedded 引擎来捕获 MySQL 更改数据。我想尽快提交补偿。在代码中,创建的配置包括以下内容。
.with("offset.commit.policy",OffsetCommitPolicy.AlwaysCommitOffsetPolicy.class.getName())
运行此返回,java.lang.NoSuchMethodException: io.debezium.embedded.spi.OffsetCommitPolicy$AlwaysCommitOffsetPolicy.<init>(io.debezium.config.Configuration)
但是,当我使用 启动嵌入式引擎时,
.with("offset.commit.policy",OffsetCommitPolicy.PeriodicCommitOffsetPolicy.class.getName())
嵌入式引擎工作正常。
请注意,类OffsetCommitPolicy.PeriodicCommitOffsetPolicy
构造函数包含 config 参数,而OffsetCommitPolicy.AlwaysCommitOffsetPolicy
没有。
public PeriodicCommitOffsetPolicy(Configuration config) {
...
}
如何让 debezium 嵌入式引擎使用它AlwaysCommitOffsetPolicy
?
avro - 无法从 debezium-postgres 的 kafka-stream 读取 kafka-stream 数据
我使用以下命令启动了 kafka 连接器:
connect-avro-standalone.properties 中的序列化道具是:
我创建了一个 java 后端,它监听这个 kafka 流主题,并且能够通过每次添加/更新/删除从 postgres 获取数据。
但是数据以某种未知的编码格式出现,这就是我无法正确读取数据的原因。
这是相关的代码片段:
我对需要更改的地方和内容感到困惑;在 avro 连接器道具或 java 端代码中
database - 使用 binlog_format="row" 在只读副本上启用 BinLogs
我最近通过启用自动备份在我的只读副本上启用了二进制日志(如此处所述)。但是,默认 binlog_format 设置为 MIXED。
由于这种不一致,Debezium 连接器失败,因为它找到了 MIXED 格式的初始二进制日志。有没有办法从一开始就启用 ROW 格式的二进制日志?
添加错误日志:
oracle - 如何在 Java 应用程序中嵌入 debezium 以捕获 Oracle 更改数据?
我在我的 java 应用程序中使用 debezium 来捕获来自 Oracle 12c 的更改。可以在 localhost:1521 上访问 oracle 数据库。下面是各自的java代码。
请注意,Oracle 数据库的配置如下所示。此外,从Oracle Instant Client获得的 ojdbc8.jar 和 xstreams.jar已导入到 java 项目中。
执行上述代码时,即使 Oracle 数据库关闭,它也会产生以下输出。也不会捕获更改。
我在这里做错了什么?
mysql - Kafka乱码——中文单词在debezium中变成乱码
我的 debezium 代码很乱:
mysql数据库中有中文的话,我用debezium把数据发给kafka。消费消息时发现中文单词乱码,请问如何解决?有什么我可以使用的配置吗?
当我使用flume和kafka producer生成中文单词时,效果很好
部分配置:
mysql 字符集:utf8 mysql 配置图片
版本:debezium v0.7.5、kafka v1.1.1
添加:
当我用控制台测试它时,./kafka-console-consumer.sh --zookeeper 192.168.0.100:2181 --topic mysqlfullfillment.test.doulist
我得到了凌乱的代码
在我的 spark 代码中,我得到了相同的混乱代码: