0

我正在尝试通过最小化它执行的步骤数来优化Zipkin Dependencies Spark 作业以在更少的阶段运行。reduceByKey数据从下表中读取:

CREATE TABLE IF NOT EXISTS zipkin.traces (
    trace_id  bigint,
    ts        timestamp,
    span_name text,
    span      blob,
    PRIMARY KEY (trace_id, ts, span_name)
)

在那里,单个分区trace_id包含完整的跟踪,并且包含从几行到几百行不等的任何地方。但是,整个分区由 Spark 作业转换为非常简单RDD[((String, String), Long)]的,将条目数量从数十亿减少到数百。

不幸的是,当前代码是通过独立读取所有行来完成的

sc.cassandraTable(keyspace, "traces")

并使用两个reduceByKey步骤来提出RDD[((String, String), Long)]. 如果有一种方法可以一次性读取整个分区,在一个 Spark 工作进程中,并在内存中进行处理,这将是一个巨大的速度提升,无需存储/流式传输来自当前的大量数据集第一阶段。

- 编辑 -

澄清一下,该作业必须从表中读取所有数据,数十亿个分区。

4

1 回答 1

1

将所有分区数据保留在同一个 spark worker 上而不进行 shuffle 的关键是使用spanByKey

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md#grouping-rows-by-partition-key

CREATE TABLE events (year int, month int, ts timestamp, data varchar, PRIMARY KEY (year,month,ts));

sc.cassandraTable("test", "events")
  .spanBy(row => (row.getInt("year"), row.getInt("month")))

sc.cassandraTable("test", "events")
  .keyBy(row => (row.getInt("year"), row.getInt("month")))
  .spanByKey

如果没有洗牌,那么所有修改都将在原地完成并作为迭代器一起流水线化。

请务必注意以下警告:

注意:这仅适用于顺序排序的数据。因为数据在 Cassandra 中是按集群键排序的,所以所有可行的 span 都必须遵循自然的集群键顺序。

于 2016-03-31T00:38:54.440 回答