
I need to supply a certificate for client authentication to a Kafka consumer, but it always fails with the following exception (Failed to load SSL keystore):

    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = /usr/lib/jvm/java-8-openjdk-amd64/jre/lib/security/cacerts
    ssl.keystore.password = [hidden]
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = /etc/logstash/truststore.jks
    ssl.truststore.password = [hidden]
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

[2020-10-13T10:20:40,578][ERROR][logstash.inputs.kafka    ][module-arcsight][47e090c366f9d0ce03be089496421cdb989d3de7cc9fe63aa9bf4f6109a239b2] Unable to create Kafka consumer from given configuration {:kafka_error_message=>org.apache.kafka.common.KafkaException: Failed to construct kafka consumer, :cause=>org.apache.kafka.common.KafkaException: 

org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /usr/lib/jvm/java-8-openjdk-amd64/jre/lib/security/cacerts of type JKS}

[2020-10-13T10:20:40,596][ERROR][logstash.javapipeline    ][module-arcsight][47e090c366f9d0ce03be089496421cdb989d3de7cc9fe63aa9bf4f6109a239b2] A plugin had an unrecoverable error. Will restart this plugin.
  Pipeline_id:module-arcsight
  Plugin: <LogStash::Inputs::Kafka ssl_keystore_location=>"/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/security/cacerts", topics=>["th-cef"], ssl_truststore_location=>"/etc/logstash/truststore.jks", ssl_truststore_password=><password>, ssl_truststore_type=>"JKS", type=>"_doc", bootstrap_servers=>"server:9093", codec=><LogStash::Codecs::CEF id=>"cef_51af8920-59eb-4309-9e7f-4ebb1f774df6", enable_metric=>true, vendor=>"Elasticsearch", product=>"Logstash", version=>"1.0", signature=>"Logstash", name=>"Logstash", severity=>"6", reverse_mapping=>false>, ssl_keystore_password=><password>, security_protocol=>"SSL", id=>"47e090c366f9d0ce03be089496421cdb989d3de7cc9fe63aa9bf4f6109a239b2", ssl_keystore_type=>"JKS", enable_metric=>true, auto_commit_interval_ms=>5000, check_crcs=>true, client_dns_lookup=>"default", client_id=>"logstash", connections_max_idle_ms=>540000, consumer_threads=>1, enable_auto_commit=>true, fetch_max_bytes=>52428800, fetch_max_wait_ms=>500, group_id=>"logstash", heartbeat_interval_ms=>3000, isolation_level=>"read_uncommitted", key_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", max_poll_interval_ms=>300000, max_partition_fetch_bytes=>1048576, max_poll_records=>500, metadata_max_age_ms=>300000, receive_buffer_bytes=>32768, reconnect_backoff_ms=>50, request_timeout_ms=>40000, retry_backoff_ms=>100, send_buffer_bytes=>131072, session_timeout_ms=>10000, value_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", poll_timeout_ms=>100, ssl_endpoint_identification_algorithm=>"https", decorate_events=>false>
  Error: Failed to construct kafka consumer
  Exception: Java::OrgApacheKafkaCommon::KafkaException
  Stack: org.apache.kafka.clients.consumer.KafkaConsumer.<init>(org/apache/kafka/clients/consumer/KafkaConsumer.java:820)
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(org/apache/kafka/clients/consumer/KafkaConsumer.java:666)
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(org/apache/kafka/clients/consumer/KafkaConsumer.java:646)
java.lang.reflect.Constructor.newInstance(java/lang/reflect/Constructor.java:423)
org.jruby.javasupport.JavaConstructor.newInstanceDirect(org/jruby/javasupport/JavaConstructor.java:253)
org.jruby.RubyClass.newInstance(org/jruby/RubyClass.java:939)
org.jruby.RubyClass$INVOKER$i$newInstance.call(org/jruby/RubyClass$INVOKER$i$newInstance.gen)
usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_4_dot_0_minus_java.lib.logstash.inputs.kafka.create_consumer(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.4.0-java/lib/logstash/inputs/kafka.rb:346)
usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_4_dot_0_minus_java.lib.logstash.inputs.kafka.RUBY$method$create_consumer$0$__VARARGS__(usr/share/logstash/vendor/bundle/jruby/$2_dot_5_dot_0/gems/logstash_minus_integration_minus_kafka_minus_10_dot_4_dot_0_minus_java/lib/logstash/inputs//usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.4.0-java/lib/logstash/inputs/kafka.rb)
usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_4_dot_0_minus_java.lib.logstash.inputs.kafka.run(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.4.0-java/lib/logstash/inputs/kafka.rb:243)
org.jruby.RubyEnumerable$22.call(org/jruby/RubyEnumerable.java:902)
org.jruby.RubyEnumerator$2.call(org/jruby/RubyEnumerator.java:404)
org.jruby.RubyFixnum.times(org/jruby/RubyFixnum.java:291)
org.jruby.RubyInteger$INVOKER$i$0$0$times.call(org/jruby/RubyInteger$INVOKER$i$0$0$times.gen)
org.jruby.RubyClass.finvokeWithRefinements(org/jruby/RubyClass.java:514)
org.jruby.RubyClass.finvoke(org/jruby/RubyClass.java:502)
org.jruby.RubyClass.finvoke(org/jruby/RubyClass.java:502)
org.jruby.RubyBasicObject.callMethod(org/jruby/RubyBasicObject.java:393)
org.jruby.RubyEnumerator.__each__(org/jruby/RubyEnumerator.java:400)
org.jruby.RubyEnumerator.each(org/jruby/RubyEnumerator.java:396)
org.jruby.RubyEnumerator$INVOKER$i$each.call(org/jruby/RubyEnumerator$INVOKER$i$each.gen)
org.jruby.RubyClass.finvokeWithRefinements(org/jruby/RubyClass.java:497)
org.jruby.RubyClass.finvoke(org/jruby/RubyClass.java:487)
org.jruby.RubyEnumerable.callEach19(org/jruby/RubyEnumerable.java:119)
org.jruby.RubyEnumerable.collectCommon(org/jruby/RubyEnumerable.java:894)
org.jruby.RubyEnumerable.map(org/jruby/RubyEnumerable.java:886)
org.jruby.RubyEnumerable$INVOKER$s$0$0$map.call(org/jruby/RubyEnumerable$INVOKER$s$0$0$map.gen)
usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_4_dot_0_minus_java.lib.logstash.inputs.kafka.run(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.4.0-java/lib/logstash/inputs/kafka.rb:243)
usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_4_dot_0_minus_java.lib.logstash.inputs.kafka.RUBY$method$run$0$__VARARGS__(usr/share/logstash/vendor/bundle/jruby/$2_dot_5_dot_0/gems/logstash_minus_integration_minus_kafka_minus_10_dot_4_dot_0_minus_java/lib/logstash/inputs//usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.4.0-java/lib/logstash/inputs/kafka.rb)
usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.inputworker(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:378)
usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.RUBY$method$inputworker$0$__VARARGS__(usr/share/logstash/logstash_minus_core/lib/logstash//usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb)
usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.start_input(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:369)
org.jruby.RubyProc.call(org/jruby/RubyProc.java:318)
java.lang.Thread.run(java/lang/Thread.java:748)

I have already tried every option I could think of:

  • Both JKS and PKCS12 keystores
  • The Java default keystore (/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/security/cacerts) and my own keystore (for example, in /etc/logstash/keystore.jks)
  • Checking all permissions as the logstash user, and so on.
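
To separate a Kafka or Logstash problem from a problem with the keystore file itself, the checks above could be repeated outside Logstash with the plain JDK `KeyStore` API. The following is only a minimal sketch: `KeystoreCheck` and `countKeyEntries` are hypothetical names, not part of Kafka or Logstash, and it assumes it is run as the logstash user with the same location, type, and password as in the configuration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.util.Collections;

// Hypothetical diagnostic helper; not part of Kafka or Logstash.
public class KeystoreCheck {

    // Loads the store with the JDK KeyStore API and counts its private-key entries.
    static int countKeyEntries(Path location, String type, char[] password)
            throws IOException, GeneralSecurityException {
        KeyStore store = KeyStore.getInstance(type); // e.g. "JKS" or "PKCS12"
        try (InputStream in = Files.newInputStream(location)) {
            // Throws IOException if the file cannot be read, is not in this
            // format, or fails the password/integrity check.
            store.load(in, password);
        }
        int keyEntries = 0;
        for (String alias : Collections.list(store.aliases())) {
            if (store.isKeyEntry(alias)) {
                keyEntries++;
            }
        }
        return keyEntries;
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println("usage: KeystoreCheck <location> <type> <password>");
            return;
        }
        int n = countKeyEntries(Paths.get(args[0]), args[1], args[2].toCharArray());
        System.out.println(args[0] + ": loaded, private-key entries = " + n);
    }
}
```

If this also throws, the file, type, or password is the likely culprit rather than the consumer. If it loads but reports zero private-key entries, the store holds only trusted certificates and has no client key to present. Passing the password on the command line is acceptable only for a quick local test.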

In the Logstash ArcSight module, the keystore settings are specified as follows (in logstash.yml):

modules:
  - name: arcsight
    var.input.eventbroker.bootstrap_servers: "server:9093"
    var.input.eventbroker.security_protocol: "SSL"
    var.input.eventbroker.topics: "th-cef"
    var.input.eventbroker.ssl_keystore_location: "/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/security/cacerts"
    var.input.eventbroker.ssl_keystore_password: "changeit"
    var.input.eventbroker.ssl_keystore_type: "JKS"
    var.input.eventbroker.ssl_truststore_location: "/etc/logstash/truststore.jks"
    var.input.eventbroker.ssl_truststore_password: "changeit"
    var.input.eventbroker.ssl_truststore_type: "JKS"
    var.elasticsearch.hosts: "https://host1:9200, https://host2:9200"
    var.kibana.host: "kibana:5601"
    var.elasticsearch.username: "user"
    var.elasticsearch.password: "pw"
    var.kibana.username: "k"
    var.kibana.password: "p"
    var.elasticsearch.ssl.enabled: "true"
    var.cacert: "/etc/logstash/CA.crt"
    var.kibana.ssl.enabled: "true"
    var.manage_template: "false"

Does anyone know why the KafkaConsumer refuses to load any keystore?
