问题标签 [spark-cassandra-connector]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-spark - 当数据很大时,如何将数据从 cassandra 缓存到 spark?
我从cassandra取一些数据到spark,当数据足够大并且无法一次缓存在内存中时,我必须使用spark.cassandra.input.split.size_in_mb来设置机器一次可以获得多大的数据。但是我还想使用 缓存数据更多时间,代码如下:
这是正确的吗?如果正确,它是如何工作的?当它出错时,正确的方法是什么?
cassandra - cassandra - 查找谁做了什么的日志
我是 cassandra 的新手,主要来自 Devops 方面,只是安装和配置 cassandr 并提供它以供使用。
在某些情况下,表格被截断,我不确定是谁在做,有没有办法检查谁做了截断或对 cassandra 的更改?
apache-spark - Spark RDD 映射 1 到多个
我是新来的火花,我有一个问题。我正在处理使用 textFile() 生成的 RDD,它是一个 csv 文件。对于每一行,我想将多行返回到一个新的 RDD(一个而不是多个)。这是我的代码:
我在这里所做的是过滤初始 csv 以仅获取 LinearAccelerationEvent,然后我想将这些对象映射到 LinearAccelerationEvent 类并生成 LinearAccelerationEvent 对象的新 RDD。对于初始 csv 文件的每一行,我必须生成多个 LinearAccelerometerEvent 对象,但我不知道该怎么做。我之所以要这样做是因为稍后这个RDD会像这样被推送到cassandra:
所以理想的解决方案是这样的:
我可以使用该foreachPartition()
函数并将 for 循环的每个事件推送到 Cassandra,但我发现这种方法要慢得多。是否可以不让用户 foreach 做我想做的事?谢谢
scala - 无法从 Spark 连接到 Cassandra(接触点包含多个数据中心)
我正在尝试运行我的第一个 spark 作业(访问 Cassandra 的 Scala 作业),该作业失败并显示以下错误:
我们在这里做错了什么?
我在用 :
- 火花 1.5.2
- 阿帕奇卡桑德拉 2.1.10
- spark-cassandra 连接器 1.3.1 /1.5.0-M2(尝试了两个连接器)
- 斯卡拉版本 2.10.4
apache-spark - 使用 Spark Streaming 对 Cassandra 进行原子写入
我对 Cassandra (2.1.11) 和 Spark (1.4.1) 都很陌生,并且有兴趣知道是否有人已经看到/开发了使用 Spark Streaming 对两个不同 Cassandra 表进行原子写入的解决方案。
我目前有两个表,它们保存相同的数据集,但具有不同的分区键。为简单起见,我将使用熟悉的 User 表示例进行说明:
该email_address
列将具有高基数(实际上它将在user_id
值数量的 50% 到 100% 之间)。高基数使二级索引表现不佳,因此需要第二张表。
我正在使用 Spark Streaming 处理num
列中的更改并更新这两个表。据我了解,该saveToCassandra()
方法在 UNLOGGED BATCH 中为 RDD 中的每个项目执行写入,从而执行原子写入(如此处的“保存对象集合”部分所述)。但是,saveToCassandra()
只能用于保存到单个表。为了使 theschema1.user_by_user_id
和schema1.user_by_email_address
表保持同步,我必须发出两个单独的saveToCassandra()
调用:
每次调用中发生的写入都以原子方式完成,但两个调用一起不是原子的。第二次调用中的一些错误将使两个表不同步。
显然我的数据集和实际表结构比这更复杂,但我试图以尽可能简单的方式传达我的问题的要点。虽然我的问题是针对能够保存到两个表的,但我欢迎任何有关数据模型更改的替代建议,这将完全消除这种需求。
apache-spark - 如何为 Scala 2.11 组装 Cassandra 连接器?
有人可以告诉我如何构建 Spark-Cassandra 连接器组件吗?我已经尝试按照 Github 页面https://github.com/datastax/spark-cassandra-connector上的说明进行操作,但我只收到数百个“重复数据删除”错误。
我将 Scala 2.11.7 与 Spark 1.5.1(我为 Scala 2.11 构建)和 SBT 13.8 一起使用。
我做了以下事情:
构建过程运行了一段时间,但随后开始吐出数百个“重复数据删除”错误并失败。我不知道从哪里开始解决这个问题,但据我所知,这个项目的程序集构建过程现在不起作用。
关于如何解决这个问题的任何提示?
java - Spark java.lang.NoClassDefFoundError 中的 spark-cassandra-connector 出错:com/datastax/driver/core/ProtocolOptions$Compression
当我尝试使用 spark-cassandra-connector 连接到 cassandra 时出现此错误:
线程“main”中的异常 java.lang.NoClassDefFoundError: com/datastax/driver/core/ProtocolOptions$Compression at com.datastax.spark.connector.cql.CassandraConnectorConf$.(CassandraConnectorConf.scala:112) at com.datastax.spark .connector.cql.CassandraConnectorConf$.(CassandraConnectorConf.scala) at com.datastax.spark.connector.cql.CassandraConnector$.apply(CassandraConnector.scala:192) at com.datastax.spark.connector.SparkContextFunctions.cassandraTable$default$3 (SparkContextFunctions.scala:48) 在 main.scala.TestSpark$.main(TestSpark.scala:19) 在 main.scala.TestSpark.main(TestSpark.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun .reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 在 sun.reflect.DelegatingMethodAccessorImpl。在 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain( SparkSubmit.scala:672) 在 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) 在 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 在 org. apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 原因:java.lang.ClassNotFoundException: com.datastax.driver.core .ProtocolOptions$Compression at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java .net.URLClassLoader。findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader .java:358) ... 还有 15 个我在 spark 类路径 spark-cassandra-connector_2.11-1.5.0-M2.jar 中添加了 jar
我在 sbt 文件中添加了依赖项:
这是我尝试执行的 scala 程序:
这就是我运行它的方式:
你能帮我理解我做错了什么吗?
谢谢!
编辑:
我尝试在依赖项列表和 spark 类路径中添加 Datastax 驱动程序:
最后一个错误不再出现,但现在我有另一个错误:
线程“主”java.lang.NoSuchMethodError 中的异常:scala.runtime.ObjectRef.zero()Lscala/runtime/ObjectRef; 在 com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala) 在 com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2。在 com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150) 在 com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache. scala:31) 在 com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56) 在 com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81) 在 com.datastax .spark.connector.cql.CassandraConnector。
编辑2:在编译时制作scala 2.10.6(与spark的scala版本相同)以前的错误不再出现,但我有这个新错误:
com.datastax.spark.connector.cql.DefaultConnectionFactory$.clusterBuilder(CassandraConnectionFactory.scala:36) 处的 com.datastax.spark.connector.cql.DefaultConnectionFactory$.clusterBuilder(CassandraConnectionFactory.scala:36) 处的线程“main”中的异常 java.lang.NoClassDefFoundError: com/google/common/util/concurrent/AsyncFunction。 spark.connector.cql.DefaultConnectionFactory$.createCluster(CassandraConnectionFactory.scala:85) at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala: 155) 在 com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150) 在 com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala: 150) 在 com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31) 在 com.datastax。spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56) 在 com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81) 在 com.datastax.spark.connector.cql.CassandraConnector。 withSessionDo(CassandraConnector.scala:109) at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:120) at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:241 ) 在 com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef$lzycompute(CassandraTableScanRDD.scala:59) 在 com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.tableDef(CassandraTableRowReaderProvider.scala:51)。 datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef(CassandraTableScanRDD.scala:59) 在 com.datastax.spark.connector.rdd。CassandraTableRowReaderProvider$class.verify(CassandraTableRowReaderProvider.scala:150) at com.datastax.spark.connector.rdd.CassandraTableScanRDD.verify(CassandraTableScanRDD.scala:59) at com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD. scala:143) 在 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 在 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD. scala:237) 在 org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala) 的 scala.Option.getOrElse(Option.scala:120) :1919) 在 org.apache.spark.rdd.RDD.count(RDD.scala:1121) 在 main.scala.TestSpark$.main(TestSpark.scala:20) 在 main.scala.TestSpark.main(TestSpark.scala ) 在 sun.reflect.NativeMethodAccessorImpl。在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:43) 的 invoke0(Native Method) 606) 在 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672) 在 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit. scala:180) 在 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 在 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) 在 org.apache.spark .deploy.SparkSubmit.main(SparkSubmit.scala) 引起:java.lang.ClassNotFoundException:com.google.common.util.concurrent.AsyncFunction at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java。网。URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java :425) 在 sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) 在 java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 34 更多
apache-spark - 与 CQL 相比,Spark 有哪些额外的好处?
我们正在为 cassandra 探索 SPARK,以克服 CQL 的限制。
我们最初仅限于 CQL,但在 RDBMS 上几乎没有遇到障碍/障碍。仅举几例如下
- 为了在列上比较>(大于)和<(小于),我们被限制为具有 Clustering 键中的列。即使我在集群中有一个列,我仍然应该提供分区键来对集群键执行 < 或 >。
- 无法检查任何列值的 NULL
- 为了查询其他分区键的任何列,我们必须在该列上创建索引
- ORDER BY 不是 CLUSTERING KEY 的列
- 分组限制
- 连接表
我是 cassandra 的新手,由于限制,我经常重新访问我的模式。
因此类似于 HDFS 的 HIVE/PIG,Spark 比 CQL 有什么额外的好处?
scala - 如何使用 Spark DataFrames 查询 JSON 数据列?
我有一个 Cassandra 表,为了简单起见,它看起来像:
我可以为此使用 spark 和 spark-cassandra-connector 创建一个基本数据框:
我正在努力将 JSON 数据扩展到其底层结构。我最终希望能够根据 json 字符串中的属性进行过滤并返回 blob 数据。类似 jsonData.foo = "bar" 并返回 blobData。这目前可能吗?
apache-spark - 启动spark任务时一个节点cassanda过载
我有 20 个节点集群,带有 cassandra 2.1.11、独立 spark 1.5 和 cassandra-connector 2.10:1.5。当我运行 spark 任务时,它们成功完成,但在启动任务的最初时刻,其中一个节点非常过载(操作系统负载约为 90%,cassandra 堆使用率为 100%),否则另一个节点保持 10% 的 cpu 使用率和 20% 的堆使用率。几秒钟后,堆正常出现(3-4Gib),但在某些情况下,我有 OutOfMemory(按堆)或/和 GC 长时间暂停。在所有测试中,单个重载节点是相同的。
所有节点都具有相同的 cassandra 参数和 xmx=19Gib。
如何解决单个 cassandra 节点过载?