I'm running multiple selects inside mapPartitions, executing 2 prepared statements per row.
I'm looking for advice. The code looks like this:
source.mapPartitions { partition =>
  lazy val prepared: PreparedStatement = ...
  cc.withSessionDo { session =>
    partition.map { row =>
      session.execute(prepared.bind(row.get("id")))
    }
  }
}
When the batch reaches ~400 rows, it throws:
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /localhost:9042 (com.datastax.driver.core.ConnectionException: [/localhost:9042] Pool is CLOSING))
at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:216)
at com.datastax.driver.core.RequestHandler.access$900(RequestHandler.java:45)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.sendRequest(RequestHandler.java:276)
at com.datastax.driver.core.RequestHandler.startNewExecution(RequestHandler.java:118)
at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:94)
at com.datastax.driver.core.SessionManager.execute(SessionManager.java:552)
at com.datastax.driver.core.SessionManager.executeQuery(SessionManager.java:589)
at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:97)
... 25 more
I've tried changing the configuration to see if it makes a difference, but the error still occurs:
.set("spark.cassandra.output.batch.size.rows", "auto")
.set("spark.cassandra.output.concurrent.writes", "500")
.set("spark.cassandra.output.batch.size.bytes", "100000")
.set("spark.cassandra.read.timeout_ms", "120000")
.set("spark.cassandra.connection.timeout_ms" , "120000")
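This kind of code works inside the Spark Cassandra connector, but there may be something I've missed.
After the exception is thrown, the next streaming batch connects to Cassandra without any problem.
Am I timing out Cassandra by sending too many requests at the same time?
I'm using Cassandra 2.1.3 with Spark connector 1.4.0-M3 and driver 2.1.7.1.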
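
One thing I have not ruled out is iterator laziness. Scala's Iterator.map does not run its function until the iterator is consumed, so the session.execute calls may actually happen after the withSessionDo block has returned and the session has been released. If the connector only closes the session a short while after the block returns, the first rows could succeed and later ones fail, which would look like the error above. Below is a minimal, self-contained sketch of that effect. FakeSession and this withSessionDo are hypothetical stand-ins written for illustration, not the connector's real classes, and I'm not certain this is what happens in my job.

```scala
object LazyIteratorSketch {

  // Hypothetical stand-in for a pooled session (not the real driver class).
  final class FakeSession {
    @volatile var closed = false

    def execute(id: Int): String = {
      if (closed) throw new IllegalStateException("Pool is CLOSING")
      s"row-$id"
    }
  }

  // Hypothetical loan pattern: the session is only valid while `body` runs.
  def withSessionDo[T](body: FakeSession => T): T = {
    val session = new FakeSession
    try body(session)
    finally session.closed = true
  }

  // Lazy: map only wraps the iterator, so execute() runs after the
  // session has already been marked closed.
  def lazyLookup(ids: Iterator[Int]): Iterator[String] =
    withSessionDo { session =>
      ids.map(id => session.execute(id))
    }

  // Eager: force every query inside the block, then return an iterator
  // over the already-materialized results.
  def eagerLookup(ids: Iterator[Int]): Iterator[String] =
    withSessionDo { session =>
      ids.map(id => session.execute(id)).toList.iterator
    }
}
```

In this sketch, consuming the result of lazyLookup throws, while eagerLookup returns its rows normally. The trade-off of the eager version is that the whole partition's results are held in memory at once.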