
I am trying to read data from Kafka and store it into a Cassandra table through a Spark RDD.

I get the following error when compiling the code:

/root/cassandra-count/src/main/scala/KafkaSparkCassandra.scala:69: value split is not a member of (String, String)
[error]     val lines = messages.flatMap(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
[error]                                               ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

The code is below. When I run it interactively by hand in spark-shell it works fine, but when I compile it and run it with spark-submit I get the error.

// Create direct kafka stream with brokers and topics
val topicsSet = Set[String] (kafka_topic)
val kafkaParams = Map[String, String]("metadata.broker.list" -> kafka_broker)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topicsSet)

// Create the processing logic
// Get the lines, split
val lines = messages.map(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
lines.saveToCassandra("stream_poc", "US_city", SomeColumns("city_name", "jan_temp", "lat", "long")) 

2 Answers


All messages in Kafka are keyed. In this case messages, the raw Kafka stream, is a stream of (key, value) tuples.

As the compile error points out, a tuple has no split method.

What you want to do here is:

messages.map { case (key, value) => value.split(',') } ...
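
For reference, a minimal sketch of how that fix slots into the rest of the pipeline from the question. The keyspace, table, and column names are copied from the question; calling saveToCassandra on a DStream assumes the DataStax spark-cassandra-connector with import com.datastax.spark.connector.streaming._ in scope:

// Destructure each (key, value) tuple and work on the value only
val lines = messages
  .map { case (_, value) => value.split(',') }
  .map(s => (s(0), s(1).toDouble, s(2).toDouble, s(3).toDouble))

lines.saveToCassandra("stream_poc", "US_city",
  SomeColumns("city_name", "jan_temp", "lat", "long"))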
Answered 2017-06-13T10:03:17.103

KafkaUtils.createDirectStream returns a tuple of key and value (because messages in Kafka are optionally keyed). In your case it is of type (String, String). If you want to split the value, you have to extract it first:

val lines = 
  messages
   .map(line => line._2.split(','))
   .map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))

Or, using partial function syntax:

val lines = 
  messages
   .map { case (_, value) => value.split(',') }
   .map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))  
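
To see the fix in context, here is a minimal, self-contained sketch of the whole streaming job. It assumes Spark 1.x with the spark-streaming-kafka 0.8 integration (hence StringDecoder) and the DataStax spark-cassandra-connector; the broker address, topic name, batch interval, and Cassandra host are placeholder assumptions, while the keyspace, table, and column names are taken from the question:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.streaming._   // enables saveToCassandra on DStreams

object KafkaSparkCassandra {
  def main(args: Array[String]): Unit = {
    val kafka_broker = "localhost:9092"   // placeholder broker list
    val kafka_topic  = "us_city_temps"    // placeholder topic name

    val conf = new SparkConf()
      .setAppName("KafkaSparkCassandra")
      .set("spark.cassandra.connection.host", "127.0.0.1")  // placeholder Cassandra host
    val ssc = new StreamingContext(conf, Seconds(5))

    // Direct Kafka stream of (key, value) tuples
    val topicsSet   = Set[String](kafka_topic)
    val kafkaParams = Map[String, String]("metadata.broker.list" -> kafka_broker)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Split only the value of each tuple, then convert the fields
    val lines = messages
      .map { case (_, value) => value.split(',') }
      .map(s => (s(0), s(1).toDouble, s(2).toDouble, s(3).toDouble))

    lines.saveToCassandra("stream_poc", "US_city",
      SomeColumns("city_name", "jan_temp", "lat", "long"))

    ssc.start()
    ssc.awaitTermination()
  }
}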
Answered 2017-06-13T10:02:49.700