Pushing Zeek logs to a Kafka (3.1.0) topic works as expected. I then tried to write them from Kafka to Cassandra (4.0.1) using the DataStax Apache Kafka® Connector (kafka-connect-cassandra-sink-1.4.0), but I get a strange mapping error (see below).

My connect-standalone.properties:

bootstrap.servers=localhost:9092
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/local/kafka/plugins/kafka-connect-cassandra-sink-1.4.0.jar

My Cassandra connector properties, cassandra-sink.properties:

name=cassandra-sink
connector.class=com.datastax.oss.kafka.sink.CassandraSinkConnector
tasks.max=1
topics=dns, http
#transforming the zeek-field-names so no error occurs
"transforms": "RenameField",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceFiel>
"transforms.RenameField.renames": "id.orig_h:id_orig_h,id.orig_p:id_orig_p,id.resp_h:id_resp_h,id.resp_p:id_resp_p,AA:aa,Z:z,RA:ra,TC:tc,TS:ts,RD:rd"
#Mapping
topic.dns.zeek.dns.mapping=  ts=value,uid=key,id_orig_h=value,id_orig_p=value,id_resp_h=value,id_resp_p=value,proto=value,trans_id=value,rtt=value,query=value,qclass=value,qclass_name=value,qtype=value,qtype_name=value,rcode=value,rcode_name=value,aa=value,tc=value,rd=value,ra=value,z=value,answers=value,rejected=value
topic.http.zeek.http.mapping= ts=value,uid=key,id_orig_h=value,id_orig_p=value,id_resp_h=value,id_resp_p=value,trans_depth=value,method=value,host=value,uri=value,version=value,user_agent=value,request_body_len=value,response_body_len=value,status_code=value,status_msg=value,tags=value,resp_fuids=value
topic.dns.zeek.dns.ttlTimeUnit=SECONDS
topic.http.zeek.http.ttlTimeUnit=SECONDS
topic.dns.zeek.dns.timestampTimeUnit=MICROSECONDS
topic.http.zeek.http.timestampTimeUnit=MICROSECONDS

The Cassandra database:

cqlsh> DESCRIBE KEYSPACE zeek

CREATE KEYSPACE zeek WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}  AND durable_writes = true;

CREATE TABLE zeek.dns (
    uid text PRIMARY KEY,
    aa text,
    answers text,
    id_orig_h text,
    id_orig_p double,
    id_resp_h text,
    id_resp_p double,
    proto text,
    qclass int,
    qclass_name text,
    qtype int,
    qtype_name text,
    query text,
    ra text,
    rcode double,
    rcode_name text,
    rd text,
    rejected text,
    rtt int,
    tc text,
    trans_id double,
    ts text,
    ttls text,
    z double
) WITH additional_write_policy = '99p'
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND cdc = false
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND default_time_to_live = 0
    AND extensions = {}
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair = 'BLOCKING'
    AND speculative_retry = '99p';

CREATE TABLE zeek.http (
    uid text PRIMARY KEY,
    host text,
    id_orig_h text,
    id_orig_p double,
    id_resp_h text,
    id_resp_p double,
    method text,
    orig_fuids text,
    request_body_len int,
    resp_fuids text,
    resp_mime_types text,
    response_body_len int,
    status_code int,
    status_msg text,
    tags text,
    trans_depth int,
    ts text,
    uri text,
    user_agent text,
    version text
) WITH additional_write_policy = '99p'
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND cdc = false
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND default_time_to_live = 0
    AND extensions = {}
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair = 'BLOCKING'
    AND speculative_retry = '99p';

A sample of the Zeek log I am trying to write to Cassandra looks like this:

$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
> --from-beginning --property print.key=true --max-messages 1 \
> --topic dns
null    {"dns": {"ts":1644391084.805351,"uid":"CUXnim2Q50AeUZoZXc","id.orig_h":"192.168.2.35","id.orig_p":55882,"id.resp_h":"192.168.2.1","id.resp_p":53,"proto":"udp","trans_id":8173,"rtt":0.01738905906677246,"query":"36.247.213.34.in-addr.arpa","qclass":1,"qclass_name":"C_INTERNET","qtype":12,"qtype_name":"PTR","rcode":0,"rcode_name":"NOERROR","AA":false,"TC":false,"RD":true,"RA":true,"Z":0,"answers":["ec2-34-213-247-36.us-west-2.compute.amazonaws.com"],"TTLs":[300.0],"rejected":false}}
Processed a total of 1 messages
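
To make the shape of that message easier to see, here is a minimal Python sketch that parses a shortened copy of the sample above and lists its fields. The variable names are only for illustration, and the value is trimmed to a few fields from the real message. It shows that the record key is null and that every Zeek field, including uid, sits one level down under a single top-level "dns" field.

```python
import json

# The console consumer printed "null" for the key of the sample message.
raw_key = None

# Shortened copy of the sample value shown above (only a few fields kept).
raw_value = (
    '{"dns": {"ts":1644391084.805351,"uid":"CUXnim2Q50AeUZoZXc",'
    '"id.orig_h":"192.168.2.35","id.orig_p":55882,"AA":false}}'
)

value = json.loads(raw_value)

# Fields at the top level of the record value.
top_level_fields = sorted(value.keys())

# Fields nested one level down, under "dns".
nested_fields = sorted(value["dns"].keys())

print("key:", raw_key)
print("top-level fields:", top_level_fields)
print("fields under 'dns':", nested_fields)
```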

With Zeek, Zookeeper, Kafka and Cassandra started, I start Kafka Connect:

bin/connect-standalone.sh config/connect-standalone.properties config/cassandra-sink.properties

This runs without stopping, but it emits countless warnings like this:

[2022-02-09 13:43:50,615] WARN [cassandra-sink|task-0] Error decoding/mapping Kafka record SinkRecord{kafkaOffset=301, timestampType=CreateTime} ConnectRecord{topic='dns', kafkaPartition=0, key=null, keySchema=Schema{STRING}, value={dns={AA=false, qclass_name=C_INTERNET, id.orig_p=22793, qtype_name=AAAA, qtype=28, rejected=false, id.resp_p=53, query=connectivity-check.ubuntu.com.sphairon.box, trans_id=17766, rcode=3, rcode_name=NXDOMAIN, TC=false, RA=false, uid=Ckb97wAbf0MDgGVW7, RD=true, proto=udp, id.orig_h=192.168.2.35, Z=0, qclass=1, ts=1.644393059147673E9, id.resp_h=192.168.2.1}}, valueSchema=null, timestamp=1644393059667, headers=ConnectHeaders(headers=)}: Primary key column uid cannot be mapped to null. Check that your mapping setting matches your dataset contents. (com.datastax.oss.kafka.sink.CassandraSinkTask:305)

I don't know why it says Primary key column uid cannot be mapped to null. I tried changing the mapping to ts=dns.value, uid=dns.value, etc., but that did not help.
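
For reference, the sketch below is a rough Python model of how a mapping expression could be looked up against the record from the warning. The resolve function is hypothetical and is not the connector's code; it assumes an expression starts with key or value and is optionally followed by a dotted field path. Whether the connector itself accepts a nested path such as value.dns.uid is an assumption I have not verified. Under those assumptions, uid=key yields null because the record key is null, and a lookup of uid directly under the value also yields null because uid is nested under "dns".

```python
def resolve(expr, key, value):
    """Hypothetical lookup of a mapping expression such as 'key' or 'value.dns.uid'.

    This only models the idea of a mapping; it is not the connector's logic.
    """
    source, _, path = expr.partition(".")
    if source == "key":
        current = key
    elif source == "value":
        current = value
    else:
        # Anything that does not start with key or value is treated as unresolved here.
        return None
    if not path:
        return current
    for part in path.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


# Key and (shortened) value taken from the warning above.
record_key = None
record_value = {"dns": {"uid": "Ckb97wAbf0MDgGVW7", "ts": 1.644393059147673e9}}

for expr in ("key", "value.uid", "dns.value", "value.dns.uid"):
    print(expr, "->", resolve(expr, record_key, record_value))
```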
