1

我有一个DStream[String, Int带有成对字数的 ],例如("hello" -> 10). 我想用步长索引将这些计数写入 cassandra。该索引被初始化为var step = 1并随着每个微批处理的处理而递增。

cassandra 表创建为:

CREATE TABLE wordcounts (
    step int,
    word text,
    count int,
primary key (step, word)
);

尝试将流写入表时...

stream.saveToCassandra("keyspace", "wordcounts", SomeColumns("word", "count"))

......我明白了java.lang.IllegalArgumentException: Some primary key columns are missing in RDD or have not been selected: step

如何将step索引添加到流中以便将三列一起写入?

我正在使用 spark 2.0.0、scala 2.11.8、cassandra 3.4.0 和 spark-cassandra-connector 2.0.0-M3。

4

3 回答 3

1

如前所述,虽然 Cassandra 表需要某种形式的东西(Int, String, Int),但 wordCount DStream 是 type DStream[(String, Int)],所以要调用saveToCassandra(...)to 工作,我们需要一个DStreamtype DStream[(Int, String, Int)]

这个问题的棘手部分是如何将本地计数器(根据定义仅在驱动程序中已知)提升到 DStream 的级别。

为此,我们需要做两件事:将计数器“提升”到分布式级别(在 Spark 中,我们的意思是“RDD”或“DataFrame”)并将该值与现有DStream数据连接起来。

与经典的流式字数统计示例不同:

// Split each line into words
val words = lines.flatMap(_.split(" "))

// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

我们添加一个本地变量来保存微批次的计数:

@transient var batchCount = 0

它被声明为瞬态的,因此当我们声明使用它的转换时,Spark 不会尝试关闭它的值。

现在棘手的一点是:在 DStream 的上下文中transform,我们从那个单一的variable 中创建一个 RDD,并使用笛卡尔积将它与 DStream 的底层 RDD 连接起来:

val batchWordCounts = wordCounts.transform{ rdd => 
  batchCount = batchCount + 1

  val localCount = sparkContext.parallelize(Seq(batchCount))
  rdd.cartesian(localCount).map{case ((word, count), batch) => (batch, word, count)}
}

(请注意,一个简单的map函数是行不通的,因为只有variable 的初始值会被捕获和序列化。因此,在查看 DStream 数据时,看起来计数器从未增加过。

最后,现在数据的形状正确,将其保存到 Cassandra:

batchWordCounts.saveToCassandra("keyspace", "wordcounts")
于 2016-11-03T09:30:27.863 回答
0

updateStateByKey函数由 spark 提供用于全局状态处理。对于这种情况,它可能类似于以下内容

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount: Int = runningCount.getOrElse(0) + 1
    Some(newCount)
}
val step = stream.updateStateByKey(updateFunction _)

stream.join(step).map{case (key,(count, step)) => (step,key,count)})
   .saveToCassandra("keyspace", "wordcounts")
于 2016-11-16T15:52:42.793 回答
-1

由于您尝试将 RDD 保存到现有的 Cassandra 表中,因此您需要在 RDD 中包含所有主键列值。

您可以做的是,您可以使用以下方法将 RDD 保存到新表中。

saveAsCassandraTable or saveAsCassandraTableEx

有关更多信息,请查看

于 2016-11-03T04:17:10.537 回答