
I am trying to set up a Spark + Kafka + Cassandra integration. I can connect to Cassandra, but when I try to create a StreamingContext object with

val ssc = new StreamingContext(sparkConf, Seconds(60))

I can import the class and write the code above without problems, but when I build and run it I hit the following error:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/SparkConf
at KafkaSparkCassandra$.main(KafkaSparkCassandra.scala:38)
at KafkaSparkCassandra.main(KafkaSparkCassandra.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.SparkConf
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 7 more

I don't understand why the StreamingContext object cannot be created at runtime.

Please help; I'm new to Scala and the whole lambda-architecture stack.
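As background on what this error means: a `NoClassDefFoundError`/`ClassNotFoundException` at runtime for a class that compiled fine usually indicates the jar is missing from the *runtime* classpath (for example, because a dependency is scoped as `"provided"`, or two conflicting versions were resolved away). A small plain-Scala probe, needing no Spark at all, illustrates the distinction between compile-time and runtime availability:

```scala
// Check whether a class can be loaded on the current runtime classpath.
// Compiling against a jar and having it present at run time are separate
// concerns; "provided"-scoped sbt dependencies are a common way they diverge.
def onClasspath(className: String): Boolean =
  try {
    Class.forName(className)
    true
  } catch {
    case _: ClassNotFoundException => false
  }

// java.lang.String is always loadable; org.apache.spark.SparkConf is
// loadable only when spark-core is actually on the runtime classpath.
println(onClasspath("java.lang.String"))
println(onClasspath("org.apache.spark.SparkConf"))
```

If the second probe prints `false` when the application is launched the same way the failing run was, the problem is the launch classpath, not the code.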

Here is the configuration in build.sbt:

libraryDependencies ++= Seq(
  "org.apache.spark"   % "spark-core_2.10"                % "1.4.1",
  "com.datastax.spark" % "spark-cassandra-connector_2.10" % "1.4.0",
  "org.apache.spark"   % "spark-sql_2.10"                 % "1.4.1",
  "mysql"              % "mysql-connector-java"           % "5.1.12")

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0" % "provided"


libraryDependencies += ("org.apache.spark" %% "spark-streaming-kafka" % "1.6.0").exclude("org.spark-project.spark", "unused")


/*
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"  % "provided"
*/


javaOptions ++= Seq("-Xmx5G", "-XX:MaxPermSize=5G", "-XX:+CMSClassUnloadingEnabled")

Below is the log. At the moment I am not able to print the word counts or store them in the Cassandra DB.

log4j:WARN No appenders could be found for logger (com.datastax.driver.core.SystemProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/11/15 18:54:52 INFO SparkContext: Running Spark version 1.6.0
16/11/15 18:54:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/11/15 18:54:52 INFO SecurityManager: Changing view acls to: romit.srivastava
16/11/15 18:54:52 INFO SecurityManager: Changing modify acls to: romit.srivastava
16/11/15 18:54:52 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(romit.srivastava); users with modify permissions: Set(romit.srivastava)
16/11/15 18:54:53 INFO Utils: Successfully started service 'sparkDriver' on port 53789.
16/11/15 18:54:53 INFO Slf4jLogger: Slf4jLogger started
16/11/15 18:54:53 INFO Remoting: Starting remoting
16/11/15 18:54:53 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.56.1:53802]
16/11/15 18:54:53 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 53802.
16/11/15 18:54:54 INFO SparkEnv: Registering MapOutputTracker
16/11/15 18:54:54 INFO SparkEnv: Registering BlockManagerMaster
16/11/15 18:54:54 INFO DiskBlockManager: Created local directory at C:\Users\romit.srivastava\AppData\Local\Temp\blockmgr-c60aeba8-a317-4066-99ce-71ec3595bdf3
16/11/15 18:54:54 INFO MemoryStore: MemoryStore started with capacity 2.4 GB
16/11/15 18:54:54 INFO SparkEnv: Registering OutputCommitCoordinator
16/11/15 18:54:54 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
16/11/15 18:54:54 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
16/11/15 18:54:54 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
16/11/15 18:54:54 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
16/11/15 18:54:54 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.
16/11/15 18:54:54 INFO Utils: Successfully started service 'SparkUI' on port 4045.
16/11/15 18:54:54 INFO SparkUI: Started SparkUI at http://192.168.56.1:4045
16/11/15 18:54:54 INFO Executor: Starting executor ID driver on host localhost
16/11/15 18:54:54 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 53814.
16/11/15 18:54:54 INFO NettyBlockTransferService: Server created on 53814
16/11/15 18:54:54 INFO BlockManagerMaster: Trying to register BlockManager
16/11/15 18:54:54 INFO BlockManagerMasterEndpoint: Registering block manager localhost:53814 with 2.4 GB RAM, BlockManagerId(driver, localhost, 53814)
16/11/15 18:54:54 INFO BlockManagerMaster: Registered BlockManager
16/11/15 18:54:55 INFO VerifiableProperties: Verifying properties
16/11/15 18:54:55 INFO VerifiableProperties: Property group.id is overridden to 
16/11/15 18:54:55 INFO VerifiableProperties: Property zookeeper.connect is overridden to 
16/11/15 18:54:58 INFO Cluster: New Cassandra host /136.243.174.23:9042 added
16/11/15 18:54:58 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster
16/11/15 18:55:00 INFO ForEachDStream: metadataCleanupDelay = -1
16/11/15 18:55:00 INFO MappedDStream: metadataCleanupDelay = -1
16/11/15 18:55:00 INFO ShuffledDStream: metadataCleanupDelay = -1
16/11/15 18:55:00 INFO MappedDStream: metadataCleanupDelay = -1
16/11/15 18:55:00 INFO FilteredDStream: metadataCleanupDelay = -1
16/11/15 18:55:00 INFO FlatMappedDStream: metadataCleanupDelay = -1
16/11/15 18:55:00 INFO MappedDStream: metadataCleanupDelay = -1
16/11/15 18:55:00 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1
16/11/15 18:55:00 INFO DirectKafkaInputDStream: Slide time = 60000 ms
16/11/15 18:55:00 INFO DirectKafkaInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/11/15 18:55:00 INFO DirectKafkaInputDStream: Checkpoint interval = null
16/11/15 18:55:00 INFO DirectKafkaInputDStream: Remember duration = 60000 ms
16/11/15 18:55:00 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka.DirectKafkaInputDStream@2d1a0e90
16/11/15 18:55:00 INFO MappedDStream: Slide time = 60000 ms
16/11/15 18:55:00 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/11/15 18:55:00 INFO MappedDStream: Checkpoint interval = null
16/11/15 18:55:00 INFO MappedDStream: Remember duration = 60000 ms
16/11/15 18:55:00 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@678a042d
16/11/15 18:55:00 INFO FlatMappedDStream: Slide time = 60000 ms
16/11/15 18:55:00 INFO FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/11/15 18:55:00 INFO FlatMappedDStream: Checkpoint interval = null
16/11/15 18:55:00 INFO FlatMappedDStream: Remember duration = 60000 ms
16/11/15 18:55:00 INFO FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@7d8e7cf5
16/11/15 18:55:00 INFO FilteredDStream: Slide time = 60000 ms
16/11/15 18:55:00 INFO FilteredDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/11/15 18:55:00 INFO FilteredDStream: Checkpoint interval = null
16/11/15 18:55:00 INFO FilteredDStream: Remember duration = 60000 ms
16/11/15 18:55:00 INFO FilteredDStream: Initialized and validated org.apache.spark.streaming.dstream.FilteredDStream@183e79df
16/11/15 18:55:00 INFO MappedDStream: Slide time = 60000 ms
16/11/15 18:55:00 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/11/15 18:55:00 INFO MappedDStream: Checkpoint interval = null
16/11/15 18:55:00 INFO MappedDStream: Remember duration = 60000 ms
16/11/15 18:55:00 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@652d8ac6
16/11/15 18:55:00 INFO ShuffledDStream: Slide time = 60000 ms
16/11/15 18:55:00 INFO ShuffledDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/11/15 18:55:00 INFO ShuffledDStream: Checkpoint interval = null
16/11/15 18:55:00 INFO ShuffledDStream: Remember duration = 60000 ms
16/11/15 18:55:00 INFO ShuffledDStream: Initialized and validated org.apache.spark.streaming.dstream.ShuffledDStream@52b15122
16/11/15 18:55:00 INFO MappedDStream: Slide time = 60000 ms
16/11/15 18:55:00 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/11/15 18:55:00 INFO MappedDStream: Checkpoint interval = null
16/11/15 18:55:00 INFO MappedDStream: Remember duration = 60000 ms
16/11/15 18:55:00 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@5c56f655
16/11/15 18:55:00 INFO ForEachDStream: Slide time = 60000 ms
16/11/15 18:55:00 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/11/15 18:55:00 INFO ForEachDStream: Checkpoint interval = null
16/11/15 18:55:00 INFO ForEachDStream: Remember duration = 60000 ms
16/11/15 18:55:00 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@37cd8c81
16/11/15 18:55:00 INFO ForEachDStream: metadataCleanupDelay = -1
16/11/15 18:55:00 INFO MappedDStream: metadataCleanupDelay = -1
16/11/15 18:55:00 INFO ShuffledDStream: metadataCleanupDelay = -1
16/11/15 18:55:00 INFO MappedDStream: metadataCleanupDelay = -1
16/11/15 18:55:00 INFO FilteredDStream: metadataCleanupDelay = -1
16/11/15 18:55:00 INFO FlatMappedDStream: metadataCleanupDelay = -1
16/11/15 18:55:00 INFO MappedDStream: metadataCleanupDelay = -1
16/11/15 18:55:00 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1
16/11/15 18:55:00 INFO DirectKafkaInputDStream: Slide time = 60000 ms
16/11/15 18:55:00 INFO DirectKafkaInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/11/15 18:55:00 INFO DirectKafkaInputDStream: Checkpoint interval = null
16/11/15 18:55:00 INFO DirectKafkaInputDStream: Remember duration = 60000 ms
16/11/15 18:55:00 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka.DirectKafkaInputDStream@2d1a0e90
16/11/15 18:55:00 INFO MappedDStream: Slide time = 60000 ms
16/11/15 18:55:00 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/11/15 18:55:00 INFO MappedDStream: Checkpoint interval = null
16/11/15 18:55:00 INFO MappedDStream: Remember duration = 60000 ms
16/11/15 18:55:00 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@678a042d
16/11/15 18:55:00 INFO FlatMappedDStream: Slide time = 60000 ms
16/11/15 18:55:00 INFO FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/11/15 18:55:00 INFO FlatMappedDStream: Checkpoint interval = null
16/11/15 18:55:00 INFO FlatMappedDStream: Remember duration = 60000 ms
16/11/15 18:55:00 INFO FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@7d8e7cf5
16/11/15 18:55:00 INFO FilteredDStream: Slide time = 60000 ms
16/11/15 18:55:00 INFO FilteredDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/11/15 18:55:00 INFO FilteredDStream: Checkpoint interval = null
16/11/15 18:55:00 INFO FilteredDStream: Remember duration = 60000 ms
16/11/15 18:55:00 INFO FilteredDStream: Initialized and validated org.apache.spark.streaming.dstream.FilteredDStream@183e79df
16/11/15 18:55:00 INFO MappedDStream: Slide time = 60000 ms
16/11/15 18:55:00 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/11/15 18:55:00 INFO MappedDStream: Checkpoint interval = null
16/11/15 18:55:00 INFO MappedDStream: Remember duration = 60000 ms
16/11/15 18:55:00 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@652d8ac6
16/11/15 18:55:00 INFO ShuffledDStream: Slide time = 60000 ms
16/11/15 18:55:00 INFO ShuffledDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/11/15 18:55:00 INFO ShuffledDStream: Checkpoint interval = null
16/11/15 18:55:00 INFO ShuffledDStream: Remember duration = 60000 ms
16/11/15 18:55:00 INFO ShuffledDStream: Initialized and validated org.apache.spark.streaming.dstream.ShuffledDStream@52b15122
16/11/15 18:55:00 INFO MappedDStream: Slide time = 60000 ms
16/11/15 18:55:00 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/11/15 18:55:00 INFO MappedDStream: Checkpoint interval = null
16/11/15 18:55:00 INFO MappedDStream: Remember duration = 60000 ms
16/11/15 18:55:00 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@5c56f655
16/11/15 18:55:00 INFO ForEachDStream: Slide time = 60000 ms
16/11/15 18:55:00 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/11/15 18:55:00 INFO ForEachDStream: Checkpoint interval = null
16/11/15 18:55:00 INFO ForEachDStream: Remember duration = 60000 ms
16/11/15 18:55:00 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@3e3f4b04
16/11/15 18:55:00 INFO RecurringTimer: Started timer for JobGenerator at time 1479216360000
16/11/15 18:55:00 INFO JobGenerator: Started JobGenerator at 1479216360000 ms
16/11/15 18:55:00 INFO JobScheduler: Started JobScheduler
16/11/15 18:55:00 INFO StreamingContext: StreamingContext started
16/11/15 18:55:00 INFO CassandraConnector: Disconnected from Cassandra cluster: Test Cluster
16/11/15 18:55:30 INFO JobGenerator: Stopping JobGenerator immediately
16/11/15 18:55:30 INFO RecurringTimer: Stopped timer for JobGenerator after time -1
16/11/15 18:55:30 INFO JobGenerator: Stopped JobGenerator
16/11/15 18:55:30 INFO JobScheduler: Stopped JobScheduler
16/11/15 18:55:30 INFO StreamingContext: StreamingContext stopped successfully
16/11/15 18:55:30 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4045
16/11/15 18:55:30 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/11/15 18:55:30 INFO MemoryStore: MemoryStore cleared
16/11/15 18:55:30 INFO BlockManager: BlockManager stopped
16/11/15 18:55:30 INFO BlockManagerMaster: BlockManagerMaster stopped
16/11/15 18:55:30 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/11/15 18:55:30 INFO SparkContext: Successfully stopped SparkContext
16/11/15 18:55:30 WARN StreamingContext: StreamingContext has already been stopped
16/11/15 18:55:30 INFO SparkContext: SparkContext already stopped.
16/11/15 18:55:30 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/11/15 18:55:30 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/11/15 18:55:30 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.

1 Answer


This was mainly caused by mismatched versions of the Spark modules. After aligning the versions, I was able to run the code.
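Concretely, the log above shows the runtime actually running Spark 1.6.0 while build.sbt mixes 1.4.x and 1.6.0 artifacts, with spark-streaming additionally scoped as `"provided"`. A sketch of a build.sbt with every Spark module pinned to a single version (the exact versions here are illustrative; check the spark-cassandra-connector compatibility table for the connector release matching your Spark version):

```scala
// build.sbt -- keep every Spark artifact on the same version and the same
// Scala binary version, and avoid "provided" scope when running from the IDE.
val sparkVersion = "1.6.0"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10"      % sparkVersion,
  "org.apache.spark" % "spark-sql_2.10"       % sparkVersion,
  "org.apache.spark" % "spark-streaming_2.10" % sparkVersion,
  ("org.apache.spark" % "spark-streaming-kafka_2.10" % sparkVersion)
    .exclude("org.spark-project.spark", "unused"),
  // Connector 1.6.x targets Spark 1.6 -- verify against the published
  // compatibility table before pinning.
  "com.datastax.spark" % "spark-cassandra-connector_2.10" % "1.6.0",
  "mysql" % "mysql-connector-java" % "5.1.12")
```

With `"provided"` removed, sbt (and IntelliJ's runner) puts spark-core on the runtime classpath, which is what the `NoClassDefFoundError: org/apache/spark/SparkConf` was complaining about; keep `"provided"` only for jars that `spark-submit` supplies on a real cluster.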

I was also able to run the word count and save the results to Cassandra.
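The word-count transformation itself is independent of Spark; sketched here on a plain Scala collection, with `groupBy` standing in for `reduceByKey` (on a DStream the pipeline has the same flatMap → map → reduceByKey shape):

```scala
// Classic word count on an in-memory collection. On a Spark DStream the
// equivalent pipeline would be flatMap -> map -> reduceByKey(_ + _).
def wordCount(lines: Seq[String]): Map[String, Int] =
  lines
    .flatMap(_.split("\\s+"))   // split each line into words
    .filter(_.nonEmpty)         // drop empty tokens
    .map(w => (w, 1))           // pair each word with a count of 1
    .groupBy(_._1)              // group pairs by word (reduceByKey analogue)
    .map { case (w, pairs) => (w, pairs.map(_._2).sum) }

println(wordCount(Seq("spark kafka", "spark cassandra")))
```

The hypothetical `wordCount` helper is only meant to show the shape of the computation; in the streaming job each RDD in the DStream would go through the same steps before being written to Cassandra.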

Answered on 2018-05-17T10:48:12.567