
I am using Spark 2.1.0 and trying to connect to a Cassandra cluster, with the latest sparklyr. I have set the following defaults in my configuration:

  # local-only configuration
  sparklyr.cores.local: !expr parallel::detectCores()
  spark.sql.shuffle.partitions.local: !expr parallel::detectCores()

  # cassandra settings
  spark.cassandra.connection.host: <Cassandra IP>
  spark.cassandra.auth.username: <uid>
  spark.cassandra.auth.password: <pass>

  sparklyr.defaultPackages:
  - com.databricks:spark-csv_2.11:1.5.0
  - com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-RC1
  - com.datastax.cassandra:cassandra-driver-core:3.1.4
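
Whether these settings actually survive parsing is easy to check from R, since spark_config() reads config.yml from the working directory. A minimal diagnostic sketch (nothing here beyond the values already in the file above):

    library(sparklyr)

    # spark_config() parses config.yml from the working directory; if the
    # Cassandra entries or the package list print as NULL here, the YAML did
    # not parse as intended (e.g. a missing space after a colon)
    config <- spark_config()
    config[["sparklyr.defaultPackages"]]
    config[["spark.cassandra.connection.host"]]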

The jars are located in the root directory alongside the source files.

I ran the following. Everything went fine until I tried to call the read function, even though I had explicitly set the jar location.

    > library(sparklyr)
    > config <- spark_config()
    Warning message:
    In readLines(input, encoding = "UTF-8") :
      incomplete final line found on '/home/bsc/BSCAnalytics/config.yml'
    > config[["sparklyr.jars.default"]] <- c("/home/bsc/BSCAnalytics/cassandra-driver-core-3.1.4.jar")
    > 
    > sc <- spark_connect(master = "local", version = "2.1.0")
    Warning message:
    In readLines(input, encoding = "UTF-8") :
      incomplete final line found on '/home/bsc/BSCAnalytics/config.yml'
    > Spark.session <- sparklyr::invoke_static(sc, "org.apache.spark.sql.SparkSession", "builder") %>% sparklyr::invoke("config", "spark.cassandra.connection.host", "<Cassandra IP>") %>% sparklyr::invoke("getOrCreate")
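
Two things are worth flagging in this transcript: the modified config object is never handed to spark_connect(), whose config argument defaults to a fresh spark_config(), so the sparklyr.jars.default entry set above is silently dropped; and cassandra-driver-core alone does not provide the org.apache.spark.sql.cassandra data source, which ships in spark-cassandra-connector. A sketch of the corrected call, reusing the config object from the transcript:

    # pass the modified config explicitly so the jar/package settings
    # actually reach the JVM that spark_connect() launches
    sc <- spark_connect(master = "local", version = "2.1.0", config = config)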

When I tried to call the read function, the runtime could not find the jars. I saw the following error:

> event_df <- invoke(Spark.session, "read") %>% invoke("format", "org.apache.spark.sql.cassandra") %>% invoke("option", "keyspace", "kps") %>% invoke("option", "table", "tab_event") %>% invoke("load")
Error: java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.cassandra. Please find packages at http://spark.apache.org/third-party-projects.html
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:569)
    at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:86)
    at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:86)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:325)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at sparklyr.Invoke$.invoke(invoke.scala:94)
    at sparklyr.StreamHandler$.handleMethodCall(stream.scala:89)
    at sparklyr.StreamHandler$.read(stream.scala:55)
    at sparklyr.BackendHandler.channelRead0(handler.scala:49)
    at sparklyr.BackendHandler.channelRead0(handler.scala:14)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.cassandra.DefaultSource
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25$$anonfun$apply$13.apply(DataSource.scala:554)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25$$anonfun$apply$13.apply(DataSource.scala:554)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25.apply(DataSource.scala:554)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25.apply(DataSource.scala:554)
    at scala.util.Try.orElse(Try.scala:84)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:554)
    ... 39 more

1 Answer


You can use something like this:

library(sparklyr)

config <- spark_config()
# register the Cassandra connector as a default package so Spark resolves
# and loads it (and its transitive dependencies) at connection time
config[["sparklyr.defaultPackages"]] <- c(
   "datastax:spark-cassandra-connector:2.0.0-RC1-s_2.11")

# pass the config explicitly; otherwise spark_connect() starts from a fresh
# spark_config() and the package setting is lost
# (this example targets Spark 1.6.1; adjust version to match your cluster)
sc <- spark_connect(master = "local", version = "1.6.1", config = config)

# read the Cassandra table through sparklyr's internal generic reader
df <- sparklyr:::spark_data_read_generic(
  sc,
  "org.apache.spark.sql.cassandra",
  "format", list(
    keyspace = "dev",
    table = "emp"
  )) %>% invoke("load")

# register the resulting DataFrame as the temp table "emp"
cassandra_tbl <- sparklyr:::spark_partition_register_df(
  sc,
  df,
  name = "emp",
  repartition = 0,
  memory = FALSE)

cassandra_tbl
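
Once registered, the handle behaves like any other sparklyr table, and dplyr verbs run lazily against Spark. A short usage sketch, assuming dplyr is attached and the emp registration above succeeded:

library(dplyr)

# executed by Spark; only the result is collected into R
cassandra_tbl %>% summarise(n = n())

# the registered name can also be referenced directly
tbl(sc, "emp") %>% head(10)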

See also https://github.com/rstudio/sparklyr/issues/520
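
For what it's worth, later sparklyr releases (0.6 and newer) expose spark_read_source(), which wraps the same internal reader without reaching into ::: internals; a sketch under that version assumption:

# public-API equivalent of the spark_data_read_generic() call above
# (requires sparklyr >= 0.6)
df <- spark_read_source(
  sc,
  name = "emp",
  source = "org.apache.spark.sql.cassandra",
  options = list(keyspace = "dev", table = "emp"),
  memory = FALSE)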

answered 2017-02-28T23:53:07.660