I followed the tutorial at http://www.confluent.io/blog/kafka-connect-cassandra-sink-the-perfect-match/ and was able to insert data from the avro console producer into Cassandra. Now I am trying to extend this to use Flume: I have set up Flume on my machine to pick up log files and push them to Kafka, and from there I am trying to insert the data into the Cassandra database. The text file contains the following data:
{"id":1,"created":"2016-05-06 13:53:00","product":"OP-DAX-P-20150201-95.7","price":94.2}
{"id":2,"created":"2016-05-06 13:54:00","product":"OP-DAX-C-20150201-100","price":99.5}
{"id":3,"created":"2016-05-06 13:55:00","product":"FU-DATAMOUNTAINEER-20150201-100","price":10000}
{"id":4,"created":"2016-05-06 13:56:00","product":"FU-KOSPI-C-20150201-100","price":150}
Flume picks up this data and pushes it to Kafka.
In the Cassandra sink I am hitting the following error:
run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2016-09-28 15:47:00,951] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
[2016-09-28 15:47:00,951] INFO Stopping Cassandra sink. (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask:79)
[2016-09-28 15:47:00,952] INFO Shutting down Cassandra driver session and cluster. (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter:165)
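From what I can tell, the "Unknown magic byte!" part means the Connect worker's Avro converter expects messages framed in the Confluent wire format (see the byte-layout sketch after the producer command below), while Flume is publishing the raw JSON text of each line. If that is the cause, one direction I am considering (an assumption on my part, not something I have verified with this connector) is switching the worker to the JSON converter in its worker properties file:

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

Is that the right approach here, or does the Cassandra sink require Avro-encoded input?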
The schema I am using:
./confluent/bin/kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --topic orders-topic \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}]}'
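For context, my understanding of why the console-producer path worked: kafka-avro-console-producer registers the schema with the Schema Registry and frames every message in the Confluent wire format, roughly:

byte 0      magic byte (always 0)
bytes 1-4   schema ID assigned by the Schema Registry (big-endian int)
bytes 5...  the Avro binary-encoded record

As far as I can tell, Flume's KafkaSink writes only the raw event body, so the first byte of each message is '{' rather than 0, which would match the error above.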
Flume configuration: flume-kafka.conf.properties
agent.sources = spoolDirSrc
agent.channels = memoryChannel
agent.sinks = kafkaSink
agent.sources.spoolDirSrc.type = spooldir
agent.sources.spoolDirSrc.spoolDir = eventlogs
agent.sources.spoolDirSrc.inputCharset = UTF-8
agent.sources.spoolDirSrc.deserializer.maxLineLength = 1048576
agent.sources.spoolDirSrc.channels = memoryChannel
agent.sinks.kafkaSink.channel = memoryChannel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 1000
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.topic = orders-topic
agent.sinks.kafkaSink.brokerList = localhost:9092
agent.sinks.kafkaSink.batchSize = 20
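To double-check what actually lands in the topic, I plan to dump the raw messages with the plain (non-Avro) console consumer; the path and flags below match my Confluent install, so they may need adjusting for other versions:

./confluent/bin/kafka-console-consumer --zookeeper localhost:2181 --topic orders-topic --from-beginning

If this prints the plain JSON lines shown above, then Flume is publishing raw text rather than Confluent-framed Avro.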
Can anyone help me figure out how to fix this error?