I created a Kafka standalone .properties file to set up the connection. The file is located at home/kafka/config/connect-standalone.properties and looks like this:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/home/kafka-connect-cassandra-sink-1.4.0/kafka-connect-cassandra-sink-1.4.0.jar
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
#listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://localhost:9092
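The file above contains Kafka Connect worker settings (converters, offset.storage.file.filename, plugin.path) alongside entries that normally live in the broker's server.properties (zookeeper.connect, zookeeper.connection.timeout.ms, advertised.listeners). The following is a minimal Python sketch for listing such entries in a properties file. The helper names and the hand-picked set of "broker-only" keys are assumptions made for illustration and are not exhaustive.

```python
# Hypothetical helper: list keys in a .properties text that are broker
# settings rather than Kafka Connect worker settings.
# The key set below is a hand-picked assumption, not a complete list.
BROKER_ONLY_PREFIXES = ("zookeeper.",)
BROKER_ONLY_KEYS = {"advertised.listeners"}


def load_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blanks and '#' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def broker_only_keys(props: dict) -> list:
    """Return the sorted keys that match the assumed broker-only set."""
    return sorted(
        key
        for key in props
        if key in BROKER_ONLY_KEYS or key.startswith(BROKER_ONLY_PREFIXES)
    )
```

Calling broker_only_keys(load_properties(open(path).read())) on the file above would list the entries to double-check against the file's intended use.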
As a second step, I added the kafka-connect-cassandra-sink-1.4.0 configuration file. It is located under home/kafka-connect-cassandra-sink-1.4.0 and looks like this:
name=users-sink
connector.class=com.datastax.oss.kafka.sink.CassandraSinkConnector
tasks.max=10
loadBalancing.localDc=datacenter1
contactPoints=localhost
port=9042
username=...
password=...
topics=demo
topic.demo.demo.users.mapping=lastname=value.lastname, firstname=value.firstname, email=value.email
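For reference, the mapping key follows the pattern topic.<topic>.<keyspace>.<table>.mapping, so the last line above targets table users in keyspace demo for topic demo. A minimal Python sketch of that breakdown follows. The function name is hypothetical, and the sketch assumes that none of the topic, keyspace, or table names contain a dot.

```python
def parse_mapping_setting(key: str, value: str) -> dict:
    """Split a 'topic.<topic>.<keyspace>.<table>.mapping' entry into parts.

    Assumption: topic, keyspace and table names contain no '.' characters.
    """
    parts = key.split(".")
    if len(parts) != 5 or parts[0] != "topic" or parts[4] != "mapping":
        raise ValueError(f"unexpected mapping key: {key!r}")
    _, topic, keyspace, table, _ = parts

    # Each comma-separated pair is '<column>=<record field>'.
    columns = {}
    for pair in value.split(","):
        column, sep, source = pair.strip().partition("=")
        if not sep:
            raise ValueError(f"malformed mapping pair: {pair!r}")
        columns[column.strip()] = source.strip()

    return {"topic": topic, "keyspace": keyspace, "table": table, "columns": columns}
```

Applied to the line above, this gives columns lastname, firstname, and email, each read from the matching field of the record value. The table demo.users therefore needs to exist with those columns before the connector can write to it.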
Zookeeper is already running on my PC, and I also started Kafka with the following command:
bin/kafka-server-start.sh config/connect-standalone.properties
Next, to connect Kafka to the sink connector, I ran:
sudo kafka/config/connect-standalone.properties kafka-connect-cassandra-sink-1.4.0/conf/cassandra-sink-standalone.properties &> standalone-mode.log &
My standalone-mode.log file is empty. I assume this means there are no errors, because in earlier attempts the errors I had to fix showed up in this log file.
Then, to load a text file into Cassandra through the Kafka sink connector, I ran:
cat Desktop/users.txt | kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo --property "parse.key=true" --property "key.separator=:"; sleep 10;
Again, no errors appeared in the terminal. The problem is that when I query the Cassandra database to look at the data, the table is empty.
The users.txt file looks like this:
Pruitt:{"lastname":"Pruitt", "firstname":"Allie", "email":"allie@example.com"}
Krause:{"lastname":"Krause", "firstname":"Duncan", "email":"duncan@example.com"}
Chase:{"lastname":"Chase", "firstname":"Juana", "email":"juana@example.com"}
Estrada:{"lastname":"Estrada", "firstname":"Edward", "email":"edward@example.com"}
Singleton:{"lastname":"Singleton", "firstname":"Marie", "email":"Marie@example.com"}
Poole:{"lastname":"Poole", "firstname":"Olivia", "email":"olivia@example.com"}
Marks:{"lastname":"Marks", "firstname":"Timothy", "email":"timothy@example.com"}
Suarez:{"lastname":"Suarez", "firstname":"Claud", "email":"claud@example.com"}
Sloan:{"lastname":"Sloan", "firstname":"Eloy", "email":"eloy@example.com"}
Rodriguez:{"lastname":"Rodriguez", "firstname":"Gale", "email":"gale@example.com"}
Bautista:{"lastname":"Bautista", "firstname":"Constance", "email":"Constance@example.com"}
Mcintyre:{"lastname":"Mcintyre", "firstname":"Donte", "email":"donte@example.com"}
Lang:{"lastname":"Lang", "firstname":"Willa", "email":"willa@example.com"}
Richmond:{"lastname":"Richmond", "firstname":"Dionne", "email":"dionne@example.com"}
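To rule out the input file as the cause, each line can be checked offline. The Python sketch below assumes the console producer splits a line at the first occurrence of key.separator (":" here) and that the value must be a JSON object carrying the three mapped fields. The function names are hypothetical.

```python
import json

# Fields referenced by the connector mapping (lastname, firstname, email).
REQUIRED_FIELDS = ("lastname", "firstname", "email")


def split_record(line: str, separator: str = ":"):
    """Split a line into (key, value) at the first separator occurrence."""
    key, sep, value = line.rstrip("\n").partition(separator)
    if not sep:
        raise ValueError(f"no key separator in line: {line!r}")
    return key, value


def check_record(line: str):
    """Return (key, payload) if the value is JSON with all required fields."""
    key, value = split_record(line)
    payload = json.loads(value)  # raises ValueError on malformed JSON
    missing = [field for field in REQUIRED_FIELDS if field not in payload]
    if missing:
        raise ValueError(f"record {key!r} is missing fields: {missing}")
    return key, payload
```

Running check_record over every line of users.txt raises on the first malformed record. If every line passes, the input format is unlikely to be the reason the table stays empty.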