
When executing multiple selects inside mapPartitions, I issue 2 prepared requests per row.

Looking for advice; the code looks like this:

 source.mapPartitions { partition =>
   lazy val prepared: PreparedStatement = ...
   cc.withSessionDo { session =>
       partition.map{ row =>
          session.execute(prepared.bind(row.get("id")))
       }
   }
 }
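One variant I have been considering (a sketch only, assuming the problem is that `partition.map` is lazy, so the iterator is consumed by Spark after `withSessionDo` has already released the session):

```scala
source.mapPartitions { partition =>
  lazy val prepared: PreparedStatement = ...
  cc.withSessionDo { session =>
    // partition.map is lazy: forcing it with .toList runs every query
    // while the session is still open, before withSessionDo releases it
    partition.map { row =>
      session.execute(prepared.bind(row.get("id")))
    }.toList.iterator
  }
}
```

This trades memory (the whole partition's results are materialized at once) for making sure no query runs against a closing pool.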

When the batch reaches ~400 rows, it throws:

Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /localhost:9042 (com.datastax.driver.core.ConnectionException: [/localhost:9042] Pool is CLOSING))
at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:216)
at com.datastax.driver.core.RequestHandler.access$900(RequestHandler.java:45)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.sendRequest(RequestHandler.java:276)
at com.datastax.driver.core.RequestHandler.startNewExecution(RequestHandler.java:118)
at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:94)
at com.datastax.driver.core.SessionManager.execute(SessionManager.java:552)
at com.datastax.driver.core.SessionManager.executeQuery(SessionManager.java:589)
at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:97)
... 25 more

I have tried changing the configuration to see whether it helps, but the error keeps popping up:

  .set("spark.cassandra.output.batch.size.rows", "auto")
  .set("spark.cassandra.output.concurrent.writes", "500")
  .set("spark.cassandra.output.batch.size.bytes", "100000")
  .set("spark.cassandra.read.timeout_ms", "120000")
  .set("spark.cassandra.connection.timeout_ms" , "120000")

This kind of code works in the spark cassandra connector, but maybe there is something I haven't seen.

After the exception is thrown, the next batch of the stream connects to Cassandra without any problem.

Am I timing out my Cassandra cluster by making too many concurrent requests?
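If too many simultaneous tasks are the issue, one thing I could try (a hedged sketch; the partition count is illustrative, not tuned) is reducing the number of partitions, and therefore the number of tasks hitting Cassandra at once:

```scala
// Fewer partitions means fewer tasks issuing queries concurrently;
// 4 is just an example value to test the hypothesis
source
  .coalesce(4)
  .mapPartitions { partition =>
    ...
  }
```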

I am using Cassandra 2.1.3 with Spark connector 1.4.0-M3 and driver 2.1.7.1.
