我希望能够使用 SparkR SparkDataFrame 上的 Java 方法将数据写入 Cassandra。
sparklyr
例如,使用扩展,我可以做这样的事情:
sparklyr::invoke(sparklyr::spark_dataframe(spark_tbl), "write") %>>%
sparklyr::invoke("format", "org.apache.spark.sql.cassandra") %>>%
sparklyr::invoke("option", "keyspace", keyspace) %>>%
sparklyr::invoke("option", "table", table) %>>%
sparklyr::invoke("mode", "append") %>%
sparklyr::invoke("save")
它可以实现每秒约 20k 行的写入速度。
然而,对于我的用例,我希望能够使用SparkR::spark.lapply
这样我可以在本地收集我的 Cassandra 表的子集,在它们上运行脚本并将数据写回。我尝试使用的每种方法sparklyr
都以单线程结束,因此根本没有使用 spark。
使用SparkR
,我可以使用以下方式编写数据:
SparkR::saveDF(SparkR::as.DataFrame(dt_local), "",
source = "org.apache.spark.sql.cassandra",
table = table,
keyspace = keyspace,
mode = "append")
但是在这种情况下,写入速度接近每秒 2k 行。我想我可以SparkR::sparkR.callJMethod
用来调用与案例相同的链sparklyr
来实现更高的写入速度,但是我首先需要序列化具有我无法做到SparkDataFrame
的句柄的那些。jobj
这可能吗?
如果可能的话,我也愿意接受任何其他实现这一目标的方法。我已经调查过尝试在两者之间移动sparkR
,sparklyr
但后端似乎太不同了(据我所知)。从这里我也相信,到目前为止还没有类似lapply
的东西sparklyr
。
谢谢你的帮助