我整天都在苦苦挣扎,没有找到解决办法。
我正在尝试使用 spark-cassandra 连接器通过 Spark Streaming 应用程序连接远程 Cassandra 节点,并且该应用程序存在异常。任何帮助将非常感激。
2015-02-17 19:13:58 DEBUG Connection:114 - Connection[/<MY_PUBLIC_IP>:9042-2, inFlight=0, closed=false] Transport initialized and ready
2015-02-17 19:13:58 DEBUG ControlConnection:492 - [Control connection] Refreshing node list and token map
2015-02-17 19:13:59 DEBUG ControlConnection:262 - [Control connection] Refreshing schema
2015-02-17 19:14:00 DEBUG ControlConnection:492 - [Control connection] Refreshing node list and token map
2015-02-17 19:14:00 DEBUG ControlConnection:172 - [Control connection] Successfully connected to /<MY_PUBLIC_IP>:9042
2015-02-17 19:14:00 INFO Cluster:1267 - New Cassandra host /<MY_PUBLIC_IP>:9042 added
2015-02-17 19:14:00 INFO CassandraConnector:51 - Connected to Cassandra cluster: Test Cluster
2015-02-17 19:14:00 INFO LocalNodeFirstLoadBalancingPolicy:59 - Adding host <MY_PUBLIC_IP> (datacenter1)
2015-02-17 19:14:01 DEBUG Connection:114 - Connection[/<MY_PUBLIC_IP>:9042-3, inFlight=0, closed=false] Transport initialized and ready
2015-02-17 19:14:01 DEBUG Session:304 - Added connection pool for /<MY_PUBLIC_IP>:9042
2015-02-17 19:14:01 INFO LocalNodeFirstLoadBalancingPolicy:59 - Adding host <MY_PUBLIC_IP> (datacenter1)
2015-02-17 19:14:01 DEBUG Schema:55 - Retrieving database schema from cluster Test Cluster...
2015-02-17 19:14:01 DEBUG Schema:55 - 1 keyspaces fetched from cluster Test Cluster: {vehicles}
2015-02-17 19:14:02 DEBUG CassandraConnector:55 - Attempting to open thrift connection to Cassandra at <MY_PUBLIC_IP>:9160
2015-02-17 19:14:02 DEBUG Connection:428 - Connection[/<MY_PUBLIC_IP>:9042-3, inFlight=0, closed=true] closing connection
2015-02-17 19:14:02 DEBUG Cluster:1340 - Shutting down
2015-02-17 19:14:02 DEBUG Connection:428 - Connection[/<MY_PUBLIC_IP>:9042-2, inFlight=0, closed=true] closing connection
2015-02-17 19:14:02 INFO CassandraConnector:51 - Disconnected from Cassandra cluster: Test Cluster
2015-02-17 19:14:03 DEBUG CassandraConnector:55 - Attempting to open thrift connection to Cassandra at <AWS_LOCAL_IP>:9160
2015-02-17 19:14:10 DEBUG HeartbeatReceiver:50 - [actor] received message Heartbeat(localhost,[Lscala.Tuple2;@77008370,BlockManagerId(<driver>, Alon-PC, 62343, 0)) from Actor[akka://sparkDriver/temp/$b]
2015-02-17 19:14:10 DEBUG BlockManagerMasterActor:50 - [actor] received message BlockManagerHeartbeat(BlockManagerId(<driver>, Alon-PC, 62343, 0)) from Actor[akka://sparkDriver/temp/$c]
2015-02-17 19:14:10 DEBUG BlockManagerMasterActor:56 - [actor] handled message (0.491517 ms) BlockManagerHeartbeat(BlockManagerId(<driver>, Alon-PC, 62343, 0)) from Actor[akka://sparkDriver/temp/$c]
2015-02-17 19:14:10 DEBUG HeartbeatReceiver:56 - [actor] handled message (69.725123 ms) Heartbeat(localhost,[Lscala.Tuple2;@77008370,BlockManagerId(<driver>, Alon-PC, 62343, 0)) from Actor[akka://sparkDriver/temp/$b]
2015-02-17 19:14:20 DEBUG HeartbeatReceiver:50 - [actor] received message Heartbeat(localhost,[Lscala.Tuple2;@70a7cd6e,BlockManagerId(<driver>, Alon-PC, 62343, 0)) from Actor[akka://sparkDriver/temp/$d]
2015-02-17 19:14:20 DEBUG BlockManagerMasterActor:50 - [actor] received message BlockManagerHeartbeat(BlockManagerId(<driver>, Alon-PC, 62343, 0)) from Actor[akka://sparkDriver/temp/$e]
2015-02-17 19:14:20 DEBUG BlockManagerMasterActor:56 - [actor] handled message (0.348586 ms) BlockManagerHeartbeat(BlockManagerId(<driver>, Alon-PC, 62343, 0)) from Actor[akka://sparkDriver/temp/$e]
2015-02-17 19:14:20 DEBUG HeartbeatReceiver:56 - [actor] handled message (2.020429 ms) Heartbeat(localhost,[Lscala.Tuple2;@70a7cd6e,BlockManagerId(<driver>, Alon-PC, 62343, 0)) from Actor[akka://sparkDriver/temp/$d]
2015-02-17 19:14:24 ERROR ServerSideTokenRangeSplitter:88 - Failure while fetching splits from Cassandra
java.io.IOException: Failed to open thrift connection to Cassandra at <AWS_LOCAL_IP>:9160
at com.datastax.spark.connector.cql.CassandraConnector.createThriftClient(CassandraConnector.scala:132)
at com.datastax.spark.connector.cql.CassandraConnector.withCassandraClientDo(CassandraConnector.scala:141)
at com.datastax.spark.connector.rdd.partitioner.ServerSideTokenRangeSplitter.com$datastax$spark$connector$rdd$partitioner$ServerSideTokenRangeSplitter$$fetchSplits(ServerSideTokenRangeSplitter.scala:33)
at com.datastax.spark.connector.rdd.partitioner.ServerSideTokenRangeSplitter$$anonfun$1$$anonfun$apply$2.apply(ServerSideTokenRangeSplitter.scala:45)
at com.datastax.spark.connector.rdd.partitioner.ServerSideTokenRangeSplitter$$anonfun$1$$anonfun$apply$2.apply(ServerSideTokenRangeSplitter.scala:45)
at scala.util.Try$.apply(Try.scala:161)
at com.datastax.spark.connector.rdd.partitioner.ServerSideTokenRangeSplitter$$anonfun$1.apply(ServerSideTokenRangeSplitter.scala:45)
at com.datastax.spark.connector.rdd.partitioner.ServerSideTokenRangeSplitter$$anonfun$1.apply(ServerSideTokenRangeSplitter.scala:44)
at scala.collection.immutable.Stream.map(Stream.scala:376)
at com.datastax.spark.connector.rdd.partitioner.ServerSideTokenRangeSplitter.split(ServerSideTokenRangeSplitter.scala:44)
at com.datastax.spark.connector.rdd.partitioner.CassandraRDDPartitioner$$anonfun$com$datastax$spark$connector$rdd$partitioner$CassandraRDDPartitioner$$splitsOf$1.apply(CassandraRDDPartitioner.scala:77)
at com.datastax.spark.connector.rdd.partitioner.CassandraRDDPartitioner$$anonfun$com$datastax$spark$connector$rdd$partitioner$CassandraRDDPartitioner$$splitsOf$1.apply(CassandraRDDPartitioner.scala:76)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.flatmap2combiner(ParArray.scala:418)
at scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1075)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.ParIterableLike$FlatMap.tryLeaf(ParIterableLike.scala:1071)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)
at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)
at scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:444)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:514)
at scala.collection.parallel.ForkJoinTasks$class.executeAndWaitResult(Tasks.scala:492)
at scala.collection.parallel.ForkJoinTaskSupport.executeAndWaitResult(TaskSupport.scala:64)
at scala.collection.parallel.ParIterableLike$ResultMapping.leaf(ParIterableLike.scala:961)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.ParIterableLike$ResultMapping.tryLeaf(ParIterableLike.scala:956)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection timed out: connect
at org.apache.thrift.transport.TSocket.open(TSocket.java:185)
at org.apache.thrift.transport.TFramedTransport.open(TFramedTransport.java:81)
at org.apache.cassandra.thrift.TFramedTransportFactory.openTransport(TFramedTransportFactory.java:41)
at com.datastax.spark.connector.cql.DefaultConnectionFactory$.createThriftClient(CassandraConnectionFactory.scala:47)
at com.datastax.spark.connector.cql.CassandraConnector.createThriftClient(CassandraConnector.scala:127)
... 41 more
Caused by: java.net.ConnectException: Connection timed out: connect
at java.net.DualStackPlainSocketImpl.connect0(Native Method)
at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:79)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
at org.apache.thrift.transport.TSocket.open(TSocket.java:180)
... 45 more
Exception in thread "main" java.io.IOException: Failed to fetch splits of TokenRange(0,0,Set(CassandraNode(/<AWS_LOCAL_IP>,/<MY_PUBLIC_IP>)),None) from all endpoints: CassandraNode(/<AWS_LOCAL_IP>,/<MY_PUBLIC_IP>)
at com.datastax.spark.connector.rdd.partitioner.ServerSideTokenRangeSplitter$$anonfun$split$2.apply(ServerSideTokenRangeSplitter.scala:55)
at com.datastax.spark.connector.rdd.partitioner.ServerSideTokenRangeSplitter$$anonfun$split$2.apply(ServerSideTokenRangeSplitter.scala:49)
at scala.Option.getOrElse(Option.scala:120)
at com.datastax.spark.connector.rdd.partitioner.ServerSideTokenRangeSplitter.split(ServerSideTokenRangeSplitter.scala:49)
at com.datastax.spark.connector.rdd.partitioner.CassandraRDDPartitioner$$anonfun$com$datastax$spark$connector$rdd$partitioner$CassandraRDDPartitioner$$splitsOf$1.apply(CassandraRDDPartitioner.scala:77)
at com.datastax.spark.connector.rdd.partitioner.CassandraRDDPartitioner$$anonfun$com$datastax$spark$connector$rdd$partitioner$CassandraRDDPartitioner$$splitsOf$1.apply(CassandraRDDPartitioner.scala:76)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.flatmap2combiner(ParArray.scala:418)
at scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1075)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.ParIterableLike$FlatMap.tryLeaf(ParIterableLike.scala:1071)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)
at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)
at scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:444)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:514)
at scala.collection.parallel.ForkJoinTasks$class.executeAndWaitResult(Tasks.scala:492)
at scala.collection.parallel.ForkJoinTaskSupport.executeAndWaitResult(TaskSupport.scala:64)
at scala.collection.parallel.ParIterableLike$ResultMapping.leaf(ParIterableLike.scala:961)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.ParIterableLike$ResultMapping.tryLeaf(ParIterableLike.scala:956)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2015-02-17 19:14:24 DEBUG DiskBlockManager:63 - Shutdown hook called
一开始它看起来很好(连接成功,获取密钥空间......)但是,当它尝试打开节俭连接时,它失败,断开连接并关闭。
我已经打开了端口 9160、9042 和 7000。
在 cassandra.yaml 我设置
listen_address: <AWS_LOCAL_IP>
broadcast_address: <MY_PUBLIC_IP>
我错过了什么?