我将数据存储到一个 kafka 主题中,我想使用CassandraSinkConnector
from Landoop 移动到 cassandra。
我在尝试启动连接器时遇到此错误:
Caused by: java.lang.IllegalArgumentException: A KCQL error occurred.FIELD_ID is not a valid field name
at com.datamountaineer.streamreactor.connect.converters.Transform$.raiseException$1(Transform.scala:40)
at com.datamountaineer.streamreactor.connect.converters.Transform$.apply(Transform.scala:83)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter$$anonfun$com$datamountaineer$streamreactor$connect$cassandra$sink$CassandraJsonWriter$$insert$1.apply(CassandraJsonWriter.scala:182)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter$$anonfun$com$datamountaineer$streamreactor$connect$cassandra$sink$CassandraJsonWriter$$insert$1.apply(CassandraJsonWriter.scala:181)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
似乎架构有架构注册表验证不喜欢的东西,但我不明白问题出在哪里。这是我的架构(这是从 Attunity Replicate 自动生成的):
{
"type": "record",
"name": "DataRecord",
"namespace": "com.attunity.queue.msg.test 1 dlx express.DCSDBA.PURGE_SETUP",
"fields": [
{
"name": "data",
"type": {
"type": "record",
"name": "Data",
"fields": [
{
"name": "FIELD_ID",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "SERVER_INSTANCE",
"type": [
"null",
"int"
],
"default": null
}
]
}
},
{
"name": "beforeData",
"type": [
"null",
"Data"
],
"default": null
},
{
"name": "headers",
"type": {
"type": "record",
"name": "Headers",
"namespace": "com.attunity.queue.msg",
"fields": [
{
"name": "operation",
"type": {
"type": "enum",
"name": "operation",
"symbols": [
"INSERT",
"UPDATE",
"DELETE",
"REFRESH"
]
}
},
{
"name": "changeSequence",
"type": "string"
},
{
"name": "timestamp",
"type": "string"
},
{
"name": "streamPosition",
"type": "string"
},
{
"name": "transactionId",
"type": "string"
},
{
"name": "changeMask",
"type": [
"null",
"bytes"
],
"default": null
},
{
"name": "columnMask",
"type": [
"null",
"bytes"
],
"default": null
},
{
"name": "transactionEventCounter",
"type": [
"null",
"long"
],
"default": null
},
{
"name": "transactionLastEvent",
"type": [
"null",
"boolean"
],
"default": null
}
]
}
}
]
}
这是我的接收器配置:
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
connect.cassandra.key.space=eucsarch
topics=DCSDBA.PURGE_SETUP1
tasks.max=1
connect.cassandra.mapping.collection.to.json=false
connect.cassandra.kcql=INSERT INTO purge_setup SELECT data.* FROM DCSDBA.PURGE_SETUP1
connect.cassandra.password=pass
connect.cassandra.username=user
value.converter.schema.registry.url=http://schema-registry:8081
connect.cassandra.contact.points=cassandra.local
connect.cassandra.port=9042
value.converter=io.confluent.connect.avro.AvroConverter
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081