1

我有一些 Spark 经验,但刚开始使用 Cassandra。我正在尝试进行非常简单的读取并获得非常糟糕的性能-无法说出原因。这是我正在使用的代码:

sc.cassandraTable("nt_live_october","nt")
  .where("group_id='254358'")
  .where("epoch >=1443916800 and epoch<=1444348800")
  .first

在此处输入图像描述

所有 3 个参数都是表中键的一部分:

PRIMARY KEY ( group_id, epoch, group_name, auto_generated_uuid_field) ) WITH CLUSTERING ORDER BY (epoch ASC, group_name ASC, auto_generated_uuid_field ASC)

我从驱动程序看到的输出是这样的:

15/10/07 15:05:02 信息 CassandraConnector:连接到 Cassandra 集群:shakassandra 15/10/07 15:07:02错误会话:在 attila./198.xxx:9042 com.datastax.driver 创建池时出错。 core.ConnectionException: [attila./198.xxx:9042] 传输初始化期间出现意外错误(com.datastax.driver.core.OperationTimedOutException: [attila /198.xxx:9042] 操作超时)

15/10/07 15:07:02 信息 SparkContext:开始工作:参加 CassandraRDD.scala:121

2007 年 15 月 10 日 15:07:03 信息 BlockManagerInfo:在 osd09的内存中添加了 broadcast_5_piece0 :39903(大小:4.8 KB,免费:265.4 MB)

15/10/07 15:08:23 INFO TaskSetManager:在osd09 (1/1)上的 80153 毫秒内完成阶段 6.0 (TID 8) 中的任务 0.0

15/10/07 15:08:23 INFO TaskSetManager:在 osd09 (1/1) 上的 80153 毫秒内完成阶段 6.0 (TID 8) 中的任务 0.0

15/10/07 15:08:23 INFO DAGScheduler:ResultStage 6(取于 CassandraRDD.scala:121)在 80.958 秒内完成 15/10/07 15:08:23 INFO TaskSchedulerImpl:删除 TaskSet 6.0,其任务已全部完成, 从池中

2007 年 15 月 10 日 15:08:23 信息 DAGScheduler:作业 5 完成:在 CassandraRDD.scala:121 上进行,耗时81.043413

我希望这个查询非常快,但它需要一分钟多的时间。有几件事突然出现在我身上

  1. 获取会话错误几乎需要两分钟——我将 3 个节点的 IP 传递给 Spark Cassandra 连接器——有没有办法告诉它更快地跳过失败的连接?
  2. 任务被发送到不是 Cassandra 节点的 Spark 工作人员——这对我来说似乎很奇怪——有没有办法获取有关调度程序为什么选择将任务发送到远程节点的信息?
  3. 即使任务被发送到远程节点,该工作人员的输入大小(最大值)显示为 334.0 B / 1,但执行程序时间为 1.3 分钟(见图)。这似乎真的很慢——我希望时间花在反序列化上,而不是计算上...... 在此处输入图像描述

任何关于如何调试这个,在哪里寻找潜在问题的提示都非常感谢。使用带有连接器 1.4.0-M3 的 Spark 1.4.1,cassandra ReleaseVersion:2.1.9,可调整连接器参数的所有默认值

4

1 回答 1

1

我认为问题在于分区之间的数据分布。您的表有一个集群(分区)键 - groupId,epoch 只是一个集群列。数据仅按 groupId 分布在集群节点上,因此集群上的一个节点上有一个 groupId='254358' 的巨大分区。当您运行查询时,Cassandra 使用 groupId='254358' 达到非常快的分区,然后过滤所有行以查找纪元在 1443916800 和 1444348800 之间的记录。如果有很多行,查询将非常慢。实际上这个查询不是分布式的,它总是在一个节点上运行。

更好的做法是提取日期甚至小时并将其添加为分区键,在您的情况下类似于

PRIMARY KEY ((group_id, date), epoch, group_name, auto_generated_uuid_field) 
WITH CLUSTERING ORDER BY (epoch ASC, group_name ASC, auto_generated_uuid_field ASC)

要验证我的假设,您可以在 cqlsh 中运行当前查询并打开跟踪阅读此处的操作方法。所以这个问题与 Spark 无关。

关于错误和获取它的时间,一切都很好,因为您在超时发生后收到错误。

我还记得 spark-cassandra-connector 的建议,将 Spark 从节点连接到 Cassandra 节点,以通过分区键来分配查询。

于 2015-10-21T16:58:13.927 回答