I am using CDH hbase-spark to read data from HBase inside mapWithState, but I cannot get an HBaseContext there, because constructing an HBaseContext requires a SparkContext, and the SparkContext is not available inside the mapWithState update function. (A possible workaround is sketched after the code below.)
Main code:
val snapState = jsonO.map(x => (x.get("liveId").toString, x)).groupByKey().mapWithState(LiveState2.get.state).cache()
Code in LiveState2:
class LiveState2 extends StateMg with Serializable {
  val stateLive = List("END", "LIVING", "PAUSE", "WAIT", "NOT_FOUND")
  logger.info(s"meetstatePath===$statePath")

  // Build the StateSpec, seeding it with the snapshot saved on HDFS when one exists
  val state: StateSpec[String, scala.Iterable[JSONObject], LiveInfo, (String, List[(String, Int, Int, Int, Int, Long)], Array[(Long, String, String, String, String, String, String, String, String, String, String, Int)])] =
    ToHdfs.getInstance.isExist(statePath + "/_SUCCESS") match {
      case true =>
        logger.info("hdfs does have meetStat::::")
        val kk = sc.objectFile[(String, LiveInfo)](statePath)
        if (Try(kk.partitions.isEmpty).isSuccess)
          StateSpec.function(updateFun _).timeout(Seconds(con.get("spark.dsTimeOut").toLong)).initialState(kk)
        else
          StateSpec.function(updateFun _).timeout(Seconds(con.get("spark.dsTimeOut").toLong))
      case false =>
        logger.info("hdfs does not have meetStat::::")
        StateSpec.function(updateFun _).timeout(Seconds(con.get("spark.dsTimeOut").toLong))
    }
  def updateFun(LiveId: String, one: Option[scala.Iterable[JSONObject]], state: State[LiveInfo]): (String, List[(String, Int, Int, Int, Int, Long)], Array[(Long, String, String, String, String, String, String, String, String, String, String, Int)]) = {
    // val stateInit = LiveInfo(Array((0, 0, "END"), (0, 0, "LIVING"), (0, 0, "PAUSE"), (0, 0, "WAIT"), (0, 0, "NOT_FOUND")), Array(), BloomFilter.create[String](Funnels.stringFunnel(Charset.forName("utf-8")), 10000, 0.0001))
    val stateInit = LiveInfo(Array((0, 0, stateLive(0)), (0, 0, stateLive(1)), (0, 0, stateLive(2)), (0, 0, stateLive(3)), (0, 0, stateLive(4))), Array(), BloomFilter.create[String](Funnels.stringFunnel(Charset.forName("utf-8")), 100000, 0.0001))
    val uniqueStrMem = ArrayBuffer[String]()
    val liveState = if (!state.exists()) {
      // No in-memory state for this key yet: recover it from the external stores
      val dbStatisState = new LiveStateReco().getState(LiveId)
      val dbbloomState = new LiveStateBloomfilter().getState(LiveId) // reads the saved state back from HBase
Code in LiveStateBloomfilter:
class LiveStateBloomfilter extends LoggingTime with Serializable {
  def getState(liveId: String): Array[String] = {
    val hbaseCon = HbaseConfCDH.hbaseContest // cannot get an HBaseContext here, because it needs a SparkContext
    val tablename: String = "live_state"
    val scan = new Scan
    val filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(liveId + ".*"))
    scan.setFilter(filter)
    scan.setCaching(100)
    val result = ArrayBuffer[String]()
    logger.warn(HbaseConfCDH.sc)
    val resultBroadCast = HbaseConfCDH.sc.broadcast(result)
    logger.warn("state recovery!!!")
    hbaseCon.hbaseRDD(TableName.valueOf(tablename), scan)
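For reference: HBaseContext.hbaseRDD is a driver-side API that builds an RDD, so it cannot be called from inside the mapWithState update function, which runs per key on the executors, where no SparkContext exists. One common pattern is to drop HBaseContext for this lookup and use the plain hbase-client API with one Connection shared per executor JVM. Below is a minimal sketch under that assumption; the ExecutorHBase holder object is hypothetical, not part of hbase-spark, and the scan logic mirrors the code above:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Scan}
import org.apache.hadoop.hbase.filter.{CompareFilter, RegexStringComparator, RowFilter}
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.mutable.ArrayBuffer

// Hypothetical holder: one HBase Connection per executor JVM.
// Connection is thread-safe and expensive to create, so it is
// initialized lazily once and reused by every task in that executor.
object ExecutorHBase {
  lazy val connection: Connection =
    ConnectionFactory.createConnection(HBaseConfiguration.create())
}

class LiveStateBloomfilter extends Serializable {
  def getState(liveId: String): Array[String] = {
    val table = ExecutorHBase.connection.getTable(TableName.valueOf("live_state"))
    try {
      val scan = new Scan()
      // Row keys starting with liveId (".*" is the regex form of the prefix match)
      scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL,
        new RegexStringComparator(liveId + ".*")))
      scan.setCaching(100)
      val result = ArrayBuffer[String]()
      val scanner = table.getScanner(scan)
      try {
        var r = scanner.next()
        while (r != null) {
          result += Bytes.toString(r.getRow)
          r = scanner.next()
        }
      } finally scanner.close()
      result.toArray
    } finally table.close()
  }
}

Because the Connection lives in a lazily initialized object, each executor creates it once on first use and reuses it across batches, which avoids both shipping a SparkContext to the executors and reconnecting on every state update.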