3

我一直在努力sparklyr将大型 cassandra 表带入 spark,用 R 注册这些表并对dplyr它们进行操作。

我已经使用如下代码成功导入了 cassandra 表:

# import cassandra table into spark

cass_df <- sparklyr:::spark_data_read_generic(
  sc, "org.apache.spark.sql.cassandra", "format", 
  list(keyspace = "cass_keyspace", table = "cass_table")
  ) %>% 
  invoke("load")


# register table in R

cass_tbl <- sparklyr:::spark_partition_register_df(
         sc, cass_df, name = "cass_table", repartition = 0, memory = TRUE)
       )

其中一些 cassandra 表非常大(> 85 亿行)并且需要一段时间来导入/注册,有些会导致内存溢出,即使有 6 个节点运行总共 60 个内核和 192gb RAM。但是,我通常只需要每个 cassandra 数据库中的一些列。

我的问题是:

  1. 是否可以在导入/注册时过滤 cassandra 数据库,使其仅导入某些列,或者在主键上对其进行过滤(即通过传递SQL/CQL类型查询,例如SELECT name FROM cass_table WHERE id = 5)?
  2. 在上面的代码中,这样的查询会放在哪里,语法采用什么形式?

我尝试在选项列表中添加这样的查询作为附加选项,即:

list(. . . , select = "id")

以及在之前将其作为单独的管道调用%>% invoke("load"),即:

invoke("option", "select", "id") %>%

# OR

invoke("option", "query", s"select id from cass_table") %>%

但这些都行不通。有什么建议么?

4

1 回答 1

4

您可以跳过急切的缓存并选择感兴趣的列:

session <- spark_session(sc)

# Some columns to select
cols <- list("x", "y", "z")

cass_df <- session %>% 
  invoke("read") %>% 
  invoke("format", "org.apache.spark.sql.cassandra") %>% 
  invoke("options", as.environment(list(keyspace="test"))) %>% 
  invoke("load") %>% 
  # We use select(col: String, cols* String) so the first column
  # has to be used separately. If you want only one column the third argument
  # has to be an empty list 
  invoke("select", cols[[1]], cols[2:length(cols)]) %>%
  # Standard lazy cache if you need one
  invoke("cache")

如果您使用的谓词可以显着减少获取的数据集pushdown选项的数量"true"(默认)并在缓存filter 之前使用。

如果要传递更复杂的查询,请注册临时视图和sql方法:

session %>%
  invoke("read") %>% 
  ...
  invoke("load") %>% 
  invoke("createOrReplaceTempView", "some_name")

cass_df <- session %>% 
  invoke("sql", "SELECT id FROM some_name WHERE foo = 'bar'") %>%
  invoke("cache")
于 2017-03-03T14:07:13.100 回答