
I'm trying to connect to IBM Message Hub from Apache Spark 2.2.1 Structured Streaming.

The connection code is fairly basic:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("StreamingRetailTransactions").getOrCreate()

import spark.implicits._

val df = spark.readStream.
                format("kafka").
                option("kafka.bootstrap.servers", "kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka04-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka02-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093").
                option("subscribe", "transactions_load").
                option("security.protocol", "SASL_SSL").
                option("sasl.mechanism", "PLAIN").
                option("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"*****\" password=\"*****\";").
                option("ssl.protocol", "TLSv1.2").
                option("ssl.enabled.protocols", "TLSv1.2").
                option("ssl.endpoint.identification.algorithm", "HTTPS").
                option("auto.offset.reset","earliest").
                option("group.id", System.currentTimeMillis).
                load()

val query = df.writeStream.format("console").start()

I'm launching the spark-shell with:

~/spark-2.2.1-bin-hadoop2.7$ ./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0

However, I'm getting a disconnected error:

scala> 18/01/09 08:34:17 WARN NetworkClient: Bootstrap broker kafka02-prod02.messagehub.services.eu-gb.bluemix.net:9093 disconnected
18/01/09 08:34:17 WARN NetworkClient: Bootstrap broker kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093 disconnected
18/01/09 08:34:17 WARN NetworkClient: Bootstrap broker kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093 disconnected
<<repeats forever>>

I've increased the debug output with sc.setLogLevel("DEBUG") and I get:

<<log output omitted for brevity>>
18/01/09 08:38:28 DEBUG SessionState: SessionState user: null
18/01/09 08:38:28 DEBUG SessionState: HDFS root scratch dir: /tmp/hive with schema null, permission: rwx-wx-wx
18/01/09 08:38:28 INFO SessionState: Created local directory: /tmp/2b4557ce-dd17-46d1-9ab0-f9a36fd750f9_resources
18/01/09 08:38:28 INFO SessionState: Created HDFS directory: /tmp/hive/snowch/2b4557ce-dd17-46d1-9ab0-f9a36fd750f9
18/01/09 08:38:28 INFO SessionState: Created local directory: /tmp/snowch/2b4557ce-dd17-46d1-9ab0-f9a36fd750f9
18/01/09 08:38:28 INFO SessionState: Created HDFS directory: /tmp/hive/snowch/2b4557ce-dd17-46d1-9ab0-f9a36fd750f9/_tmp_space.db
18/01/09 08:38:28 INFO HiveClientImpl: Warehouse location for Hive client (version 1.2.1) is file:/home/snowch/spark-2.2.1-bin-hadoop2.7/spark-warehouse
18/01/09 08:38:28 DEBUG SessionState: Session is using authorization class class org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider
18/01/09 08:38:28 DEBUG StateStoreCoordinatorRef: Retrieved existing StateStoreCoordinator endpoint
18/01/09 08:38:28 DEBUG StreamExecution: Starting Trigger Calculation
18/01/09 08:38:28 INFO StreamExecution: Starting new streaming query.
18/01/09 08:38:28 DEBUG UserGroupInformation: PrivilegedAction as:snowch (auth:SIMPLE) from:org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:331)
18/01/09 08:38:28 DEBUG KafkaSource$$anon$1: Unable to find batch /tmp/temporary-fb3e6e1a-fbbe-4098-991c-0b29f63ecade/sources/0/0
18/01/09 08:38:28 DEBUG AbstractCoordinator: Sending coordinator request for group spark-kafka-source-9a14bb54-8f1b-47db-8497-19c083128496--998588290-driver-0 to broker kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093 (id: -5 rack: null)
18/01/09 08:38:28 DEBUG NetworkClient: Initiating connection to node -5 at kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093.
18/01/09 08:38:28 DEBUG NetworkClient: Initialize connection to node -3 for sending metadata request
18/01/09 08:38:28 DEBUG NetworkClient: Initiating connection to node -3 at kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093.
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--5.bytes-sent
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--5.bytes-received
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--5.latency
18/01/09 08:38:28 DEBUG NetworkClient: Completed connection to node -5
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--3.bytes-sent
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--3.bytes-received
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--3.latency
18/01/09 08:38:28 DEBUG NetworkClient: Completed connection to node -3
18/01/09 08:38:28 DEBUG NetworkClient: Sending metadata request {topics=[transactions_load]} to node -5
18/01/09 08:38:28 DEBUG Selector: Connection with kafka05-prod02.messagehub.services.eu-gb.bluemix.net/159.8.179.153 disconnected
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:179)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:974)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:174)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:172)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply$mcV$sp(KafkaOffsetReader.scala:263)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:262)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:262)
    at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt(KafkaOffsetReader.scala:261)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:172)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:172)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:230)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchLatestOffsets(KafkaOffsetReader.scala:171)
    at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$initialPartitionOffsets$1.apply(KafkaSource.scala:132)
    at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$initialPartitionOffsets$1.apply(KafkaSource.scala:129)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.kafka010.KafkaSource.initialPartitionOffsets$lzycompute(KafkaSource.scala:129)
    at org.apache.spark.sql.kafka010.KafkaSource.initialPartitionOffsets(KafkaSource.scala:97)
    at org.apache.spark.sql.kafka010.KafkaSource.getOffset(KafkaSource.scala:163)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$10$$anonfun$apply$6.apply(StreamExecution.scala:521)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$10$$anonfun$apply$6.apply(StreamExecution.scala:521)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$10.apply(StreamExecution.scala:520)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$10.apply(StreamExecution.scala:518)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch(StreamExecution.scala:518)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets(StreamExecution.scala:492)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:297)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:294)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:290)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
18/01/09 08:38:28 DEBUG NetworkClient: Node -5 disconnected.
18/01/09 08:38:28 WARN NetworkClient: Bootstrap broker kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093 disconnected
18/01/09 08:38:28 DEBUG ConsumerNetworkClient: Cancelled GROUP_COORDINATOR request ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@4eacac4e, request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1}, body={group_id=spark-kafka-source-9a14bb54-8f1b-47db-8497-19c083128496--998588290-driver-0}), createdTimeMs=1515487108479, sendTimeMs=1515487108598) with correlation id 0 due to node -5 being disconnected
18/01/09 08:38:28 DEBUG NetworkClient: Sending metadata request {topics=[transactions_load]} to node -3
18/01/09 08:38:28 DEBUG Selector: Connection with kafka01-prod02.messagehub.services.eu-gb.bluemix.net/159.8.179.149 disconnected
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    <<repeated>>

I have seen the following similar questions:

I understand that some of this output is just "noise"; however, my Spark streaming application does not appear to be receiving any data. I have connected with a console client and am able to see the data there.


Update 1 - I've tried configuring JAAS programmatically, but I'm still getting the same error. The problem may be that the JAAS code needs to run on every worker node, but isn't being run there.

sc.setLogLevel("DEBUG")

def jaasClientConfig(username: String, password: String): Unit = {

    import javax.security.auth.login.AppConfigurationEntry
    import javax.security.auth.login.Configuration

    // provides the implicit Scala Map -> java.util.Map conversion needed below
    import scala.collection.JavaConversions._

    // clear any file-based JAAS setting so the programmatic configuration wins
    System.setProperty("java.security.auth.login.config", "")

    Configuration.setConfiguration(new Configuration() {
        override def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = {
            val idMap = Map(
                "serviceName" -> "kafka",
                "username" -> username,
                "password" -> password
                )
            val ace = new AppConfigurationEntry(
               "org.apache.kafka.common.security.plain.PlainLoginModule",
               AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
               idMap
            )
            Array(ace)
        }
    })
}

def startSparkStreaming(): Unit = {

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("StreamingRetailTransactions").getOrCreate()

    import spark.implicits._

    val df = spark.readStream.
                    format("kafka").
                    option("kafka.bootstrap.servers", "kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka04-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka02-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093").
                    option("subscribe", "transactions_load").
                    option("security.protocol", "SASL_SSL").
                    option("sasl.mechanism", "PLAIN").
                    option("ssl.protocol", "TLSv1.2").
                    option("ssl.enabled.protocols", "TLSv1.2").
                    option("ssl.endpoint.identification.algorithm", "HTTPS").
                    option("auto.offset.reset","earliest").
                    option("group.id", System.currentTimeMillis).
                    load()

    val query = df.writeStream.format("console").start()
}

jaasClientConfig("****","****")

startSparkStreaming()

Update 2

I've also tried using a jaas.conf:

~/spark-2.2.1-bin-hadoop2.7$ ./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.conf=jaas.conf" --files "jaas.conf"

和 ...

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="*****"
    password="*****";
};

Still the same problem ...


3 Answers


Firstly, I needed to run the spark-shell with --conf options that point both the executor and the driver at my jaas.conf:

   ./bin/spark-shell --master local[1] \
        --jars external/kafka-0-10-sql/target/spark-sql-kafka-0-10_2.11-2.2.2-SNAPSHOT.jar,external/kafka-0-10-assembly/target/spark-streaming-kafka-0-10-assembly_2.11-2.2.2-SNAPSHOT.jar \
       --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/path/to/jaas.conf" \
       --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/path/to/jaas.conf" \
       --num-executors 1  --executor-cores 1 

Next, I had to add some Kafka options:

val df = spark.readStream.
                format("kafka").
                option("kafka.bootstrap.servers", "kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka04-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka02-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093").
                option("subscribe", "transactions_load").
                option("kafka.security.protocol", "SASL_SSL").
                option("kafka.sasl.mechanism", "PLAIN").
                option("kafka.ssl.protocol", "TLSv1.2").
                option("kafka.ssl.enabled.protocols", "TLSv1.2").
                load()

Note that the Kafka options need to be prefixed with kafka., for example:

  • security.protocol => kafka.security.protocol

These changes solved the connection issue for me.
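
Once it connects, a quick sanity check is to decode the Kafka records before writing them to the console (a sketch; casting the binary key/value columns to STRING assumes the payload is text):

// key and value arrive as binary; cast them to strings for the console sink
val decoded = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
val query = decoded.writeStream.format("console").start()
query.awaitTermination() // blocks the shell while batches print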

Answered 2018-01-18T12:51:29.680

This looks like an authentication failure. With the version of Kafka that Message Hub currently runs, the server simply closes the connection when authentication fails.

Answered 2018-01-09T09:32:23.937

The "sasl.jaas.config" setting is only used by Kafka clients from version 0.10.2 onwards.

Spark 2.2.1 uses a 0.10.0 Kafka client, hence the authentication failure.
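
If in doubt, you can check which kafka-clients jar actually ended up on the shell's classpath (a standard JVM trick, nothing Spark-specific; the jar file name includes the client version):

// prints the location of the jar that provides KafkaConsumer
println(classOf[org.apache.kafka.clients.consumer.KafkaConsumer[_, _]]
  .getProtectionDomain.getCodeSource.getLocation)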

You can use the java.security.auth.login.config system property to specify a jaas file.
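
For example (a sketch; the path is a placeholder, and the property has to be set before the first Kafka client is created):

System.setProperty("java.security.auth.login.config", "/path/to/jaas.conf")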

Alternatively, you can set the credentials for the client programmatically with a snippet like this:

import java.util.HashMap;

import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

public static void jaasClientConfig(final String username, final String password) throws Exception {
    // clear any file-based JAAS setting so the programmatic configuration wins
    System.setProperty("java.security.auth.login.config", "");

    Configuration.setConfiguration(new Configuration() {
        public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
            HashMap<String, String> idMap = new HashMap<>();
            idMap.put("serviceName", "kafka"); // Seems to be optional
            idMap.put("username", username);
            idMap.put("password", password);

            AppConfigurationEntry ace = new AppConfigurationEntry("org.apache.kafka.common.security.plain.PlainLoginModule",
                    AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, idMap);
            return new AppConfigurationEntry[] { ace };
        }
    });
}
Answered 2018-01-09T10:26:16.777