I have a Cassandra cluster with two nodes, and I have set up a Spark job to query this cluster, which holds 3,651,568 keys.
```scala
import com.datastax.spark.connector.rdd.ReadConf
import org.apache.spark.sql.SparkSession
// The wildcard import brings in the implicit that adds cassandraFormat(...) to DataFrameReader
import org.apache.spark.sql.cassandra._

// One SparkSession is enough; it creates (or reuses) the SparkContext internally
val spark = SparkSession.builder()
  .master("local")
  .appName("Spark_Cassandra")
  .config("spark.cassandra.connection.host", "hostname")
  .getOrCreate()

// cassandraFormat takes the table name first, then the keyspace
val studentsDF = spark.read
  .cassandraFormat("tablename", "keyspacename")
  .options(ReadConf.SplitSizeInMBParam.option(32)) // 32 MB input splits
  .load()

studentsDF.show(1000)
```
I can query the first 1000 rows, but I cannot find a way to read rows 1001 through 2000, so that my Spark job can read the data from the Cassandra table in batches.
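CQL has no `OFFSET`, and a Spark DataFrame has no built-in notion of "rows 1001 to 2000", so batch reading is usually done by streaming the rows through an iterator and cutting that stream into fixed-size chunks. Both tools in this question can produce a `java.util.Iterator`: Spark's `Dataset.toLocalIterator()`, and the 3.x Java driver's `ResultSet` (which is `Iterable<Row>` and fetches further pages on demand). The stdlib-only sketch below assumes rows arrive that way; `RowBatches` and `batches` are hypothetical names, and the integers stand in for real rows.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical helper: splits any row iterator into consecutive fixed-size batches. */
public class RowBatches {

    /**
     * Wraps source so that each next() returns up to batchSize consecutive elements.
     * Only the current batch is held in memory.
     */
    public static <T> Iterator<List<T>> batches(Iterator<T> source, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        return new Iterator<List<T>>() {
            @Override
            public boolean hasNext() {
                return source.hasNext();
            }

            @Override
            public List<T> next() {
                if (!source.hasNext()) {
                    throw new NoSuchElementException();
                }
                List<T> batch = new ArrayList<>(batchSize);
                // Pull elements until the batch is full or the source is exhausted
                while (batch.size() < batchSize && source.hasNext()) {
                    batch.add(source.next());
                }
                return batch;
            }
        };
    }

    public static void main(String[] args) {
        // Stand-in for real rows: the integers 1..2500
        List<Integer> rows = new ArrayList<>();
        for (int i = 1; i <= 2500; i++) {
            rows.add(i);
        }
        Iterator<List<Integer>> it = batches(rows.iterator(), 1000);
        int batchNo = 0;
        while (it.hasNext()) {
            List<Integer> batch = it.next();
            batchNo++;
            System.out.println("batch " + batchNo + ": rows " + batch.get(0)
                    + " to " + batch.get(batch.size() - 1));
        }
    }
}
```

Running `main` prints three batches: rows 1 to 1000, 1001 to 2000, and 2001 to 2500. Note that a full Cassandra scan returns rows in token order, so "row 1001" only means the 1001st row that particular scan produced. With the Java driver, `setFetchSize` on the statement controls how many rows each server round trip fetches; the iterator hides the page boundaries.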
Following a suggestion, I switched to the Java driver. Here is the full explanation:
I have to query the Cassandra database with the DataStax Java driver. I am using DataStax Java driver version cassandra-java-driver-3.5.1 and Apache Cassandra version apache-cassandra-3.0.9. I tried to resolve the dependencies by installing the JARs. I also checked the yaml file: `seeds`, `listen_address`, and `rpc_address` all point to my host, and `start_native_transport` is set to `true`. Here is my Java code for connecting to the Cassandra database:

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SocketOptions;

public class Started {

    public void connect() {
        Cluster cluster = null;
        try {
            // Contact point: address of the Cassandra node (native transport port 9042 by default)
            cluster = Cluster.builder()
                    .addContactPoints("***.***.*.*")
                    .withSocketOptions(new SocketOptions().setReadTimeoutMillis(2000))
                    .build();
            // Printed before any connection attempt; the driver only connects in cluster.connect(...)
            System.out.println("Connected to cluster:");
            Session session = cluster.connect("demo");
            Row row = session.execute("SELECT ename FROM demo.emp").one();
            if (row != null) { // one() returns null when the query returns no rows
                System.out.println(row.getString("ename"));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release driver threads and connections even when connecting fails
            if (cluster != null) {
                cluster.close();
            }
        }
    }

    public static void main(String[] args) {
        Started st = new Started();
        st.connect();
    }
}
```
I have only one node in the Cassandra cluster, and it is up and running. I can also connect to it with cqlsh on port 9042. So far so good, but when I run my Java program I get this error:

```
Connected to cluster:
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /***.***.*.*:9042 (com.datastax.driver.core.exceptions.TransportException: [/***.***.*.*:9042] Cannot connect))
	at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:232)
	at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:79)
	at com.datastax.driver.core.Cluster$Manager.negotiateProtocolVersionAndConnect(Cluster.java:1631)
	at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1549)
	at com.datastax.driver.core.Cluster.init(Cluster.java:160)
	at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:342)
	at com.datastax.driver.core.Cluster.connect(Cluster.java:292)
	at Started.connect(Started.java:22)
	at Started.main(Started.java:34)
```
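A `NoHostAvailableException` wrapping `TransportException ... Cannot connect` typically means the driver could not open a connection to that address and port at all, before any CQL was sent. One way to separate a network or address problem from a driver or classpath problem is to attempt a plain TCP connection to the exact address passed to `addContactPoints`, from the same machine that runs the Java program. This is a stdlib-only sketch; `PortCheck` and `isReachable` are hypothetical names, and 9042 is assumed to be the native transport port (the default).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical diagnostic: checks whether a TCP connection to host:port can be opened. */
public class PortCheck {

    /** Returns true if a plain TCP connect to host:port succeeds within timeoutMillis. */
    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable host names
            System.err.println("Cannot connect to " + host + ":" + port + " -> " + e);
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the same address used in addContactPoints(...); port defaults to 9042 (assumed)
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9042;
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 2000));
    }
}
```

If this reports `false`, the contact point address, `rpc_address`, or a firewall is the more likely culprit; if it reports `true` while the driver still fails, the problem more likely lies on the driver side, for example in the manually installed dependency JARs.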
Can anyone help?