1

我使用 cdh hbase-spark 从 mapwithstate 中的 hbase 读取数据,但我无法获得 hbasecontext,因为 hbasecontext 需要 sparkcontext 而我无法在 mapwith 状态下获得它。

主代码

val snapState = jsonO.map(x => (x.get("liveId").toString, x)).groupByKey().mapWithState(LiveState2.get.state).cache()

LiveState2 中的代码

class LiveState2 extends StateMg with Serializable {
  val stateLive = List("END", "LIVING", "PAUSE", "WAIT", "NOT_FOUND")
  logger.info(s"meetstatePath===$statePath")
  val state: StateSpec[String, scala.Iterable[JSONObject], LiveInfo, (String, List[(String, Int, Int, Int, Int, Long)], Array[(Long, String, String, String, String, String, String, String, String, String, String, Int)])] = ToHdfs.getInstance.isExist(statePath + "/_SUCCESS") match {
    case true =>
      logger.info("hdfs does have meetStat::::")
      val kk = sc.objectFile[(String, LiveInfo)](statePath)
      if (!Try(kk.partitions.isEmpty).isFailure) StateSpec.function(updateFun _).timeout(Seconds(con.get("spark.dsTimeOut").toLong)).initialState(kk) else StateSpec.function(updateFun _).timeout(Seconds(con.get("spark.dsTimeOut").toLong))
    case false =>
      logger.info("hdfs does not have meetStat::::")
      StateSpec.function(updateFun _).timeout(Seconds(con.get("spark.dsTimeOut").toLong))
  }



def updateFun(LiveId: String, one: Option[scala.Iterable[JSONObject]], state: State[LiveInfo]): (String, List[(String, Int, Int, Int, Int, Long)], Array[(Long, String, String, String, String, String, String, String, String, String, String, Int)]) = {
    //    val stateInit = LiveInfo(Array((0, 0, "END"), (0, 0, "LIVING"), (0, 0, "PAUSE"), (0, 0, "WAIT"), (0, 0, "NOT_FOUND")), Array(), BloomFilter.create[String](Funnels.stringFunnel(Charset.forName("utf-8")), 10000, 0.0001))
    val stateInit = LiveInfo(Array((0, 0, stateLive(0)), (0, 0, stateLive(1)), (0, 0, stateLive(2)), (0, 0, stateLive(3)), (0, 0, stateLive(4))), Array(), BloomFilter.create[String](Funnels.stringFunnel(Charset.forName("utf-8")), 100000, 0.0001))
    val uniqueStrMem = ArrayBuffer[String]()
    val liveState = if (!state.exists()) {
      val dbStatisState = new LiveStateReco().getState(LiveId)
      val dbbloomState = new LiveStateBloomfilter().getState(LiveId) //read hbase code

LiveStateBloomfilter 中的代码

class LiveStateBloomfilter extends LoggingTime with Serializable {

  def getState(liveId: String): Array[String] = {
    val hbaseCon = HbaseConfCDH.hbaseContest    //can not get hbasecontext for it need sparkcontext
    val tablename: String = "live_state"
    val scan = new Scan
    val filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(liveId + "*"))
    scan.setFilter(filter)
    scan.setCaching(100)
    val result = ArrayBuffer[String]()
    logger.warn(HbaseConfCDH.sc)
    val resultBroadCast = HbaseConfCDH.sc.broadcast(result)
    logger.warn("state recovery!!!")
    hbaseCon.hbaseRDD(TableName.valueOf(tablename), scan)
4

0 回答 0