4

我是火花和卡桑德拉的新手。我正在尝试使用 spark-cassandra 连接器插入 cassandra 表,如下所示:

import java.util.UUID

import org.apache.spark.{SparkContext, SparkConf}
import org.joda.time.DateTime
import com.datastax.spark.connector._

case class TestEntity(id:UUID, category:String, name:String,value:Double, createDate:DateTime, tag:Long)

object SparkConnectorContext {
  val conf = new SparkConf(true).setMaster("local")
    .set("spark.cassandra.connection.host", "192.168.xxx.xxx")
  val sc = new SparkContext(conf)
}
object TestRepo {
  def insertList(list: List[TestEntity]) = {
    SparkConnectorContext.sc.parallelize(list).saveToCassandra("testKeySpace", "testColumnFamily")
  }
}
object TestApp extends App {
  val start = System.currentTimeMillis()
  TestRepo.insertList(Utility.generateRandomData())
  val end = System.currentTimeMillis()
  val timeDiff = end-start
  println("Difference (in millis)= "+timeDiff)
}

当我使用上述方法(包含 100 个实体的列表)插入时,它需要300-1100 milliseconds. 我尝试使用幻像库插入相同的数据。它只占用不到20-40 milliseconds.

谁能告诉我为什么火花连接器要花这么多时间来插入?我在我的代码中做错了什么还是不建议使用spark-cassandra 连接器进行插入操作?

4

2 回答 2

5

看起来您正在计时中包含并行化操作。此外,由于您的 spark worker 运行在与 Cassandra 不同的机器上,因此 saveToCassandra 操作将是网络写入。

尝试配置您的系统以在 Cassandra 节点上运行 spark 工作程序。然后在单独的步骤中创建一个 RDD,并在其上调用 count() 之类的操作以将数据加载到内存中。此外,您可能希望对 RDD 进行持久化()或缓存()以确保它保留在内存中以进行测试。

然后只为那个缓存的 RDD 的 saveToCassandra 计时。

您可能还想查看 Cassandra 连接器提供的 repartitionByCassandraReplica 方法。这将根据写入需要去哪个 Cassandra 节点对 RDD 中的数据进行分区。通过这种方式,您可以利用数据局部性并经常避免通过网络进行写入和洗牌。

于 2015-08-11T13:15:16.703 回答
2

您的“基准”存在一些严重问题:

  1. 您的数据集是如此之小,以至于您主要只测量作业设置时间。在单个节点上保存 100 个实体应该是毫秒级,而不是秒级。还保存 100 个实体使 JVM 没有机会将您运行的代码编译为优化的机器代码。
  2. 您在测量中包含了火花上下文初始化。JVM 会延迟加载类,所以在开始测量之后才真正调用 spark 初始化的代码。这是一个非常昂贵的元素,通常每个整个 spark 应用程序只执行一次,甚至每个作业都不执行。
  3. 每次启动时您只执行一次测量。这意味着您甚至错误地测量了 spark ctx 设置和作业设置时间,因为 JVM 必须第一次加载所有类,而 Hotspot 可能没有机会启动。

总而言之,您很可能主要测量类加载时间,这取决于加载的类的大小和数量。Spark 是一个相当大的加载过程,几百毫秒一点也不奇怪。

要正确测量插入性能:

  • 使用更大的数据集
  • 从测量中排除一次性设置
  • 多次运行共享相同的火花上下文并丢弃一些初始的,直到达到稳定状态的性能。

顺便说一句,如果您启用调试日志记录级别,则连接器会在执行程序日志中记录每个分区的插入时间。

于 2015-08-12T18:24:53.277 回答