我有一项艰巨的任务是从 Cassandra 表中读取数百万行。实际上这个表包含大约 40~50 百万行。数据实际上是我们系统的内部 URL,我们需要触发所有这些 URL。为了启动它,我们使用了 Akka Streams,它运行得非常好,根据需要做一些背压。但是我们仍然没有找到一种有效阅读所有内容的方法。
到目前为止,我们已经尝试过:
使用 Akka Stream 将数据作为 Stream 读取。我们正在使用为特定表提供发布者的 phantom-dsl。但它并没有阅读所有内容,只阅读了一小部分。实际上它在第一个 100 万之后停止阅读。
在特定日期使用 Spark 阅读。我们的表被建模为一个时间序列表,有年、月、日、分钟……列。现在我们是按天选择的,所以 Spark 不会获取很多要处理的东西,但是这些天选择是一件很痛苦的事情。
代码如下:
val cassandraRdd =
sc
.cassandraTable("keyspace", "my_table")
.select("id", "url")
.where("year = ? and month = ? and day = ?", date.getYear, date.getMonthOfYear, date.getDayOfMonth)
不幸的是,我无法遍历分区以获取更少的数据,我必须使用收集,因为它抱怨演员不可序列化。
val httpPool: Flow[(HttpRequest, String), (Try[HttpResponse], String), HostConnectionPool] = Http().cachedHostConnectionPool[String](host, port).async
val source =
Source
.actorRef[CassandraRow](10000000, OverflowStrategy.fail)
.map(row => makeUrl(row.getString("id"), row.getString("url")))
.map(url => HttpRequest(uri = url) -> url)
val ref = Flow[(HttpRequest, String)]
.via(httpPool.withAttributes(ActorAttributes.supervisionStrategy(decider)))
.to(Sink.actorRef(httpHandlerActor, IsDone))
.runWith(source)
cassandraRdd.collect().foreach { row =>
ref ! row
}
我想知道你们中是否有人有这样的经验来阅读数百万行来做与聚合等不同的事情。
我还考虑阅读所有内容并发送到 Kafka 主题,在那里我将使用 Streaming(spark 或 Akka)接收,但问题是一样的,如何有效地加载所有这些数据?
编辑
目前,我正在一个具有合理内存量 100GB 的集群上运行,并对其进行收集和迭代。
此外,这与使用 spark 获取大数据并使用 reduceByKey、aggregateByKey 等进行分析大不相同。
我需要通过 HTTP 获取和发送所有内容 =/
到目前为止,它的工作方式与我一样,但我担心这些数据会变得越来越大,以至于将所有内容都提取到内存中毫无意义。
流式传输这些数据将是最好的解决方案,分块获取,但我还没有找到一个好的方法。
最后,我正在考虑使用 Spark 来获取所有这些数据,生成一个 CSV 文件并使用 Akka Stream IO 来处理,这样我会驱逐很多东西,因为它需要几个小时来处理每个百万。