1

我正在使用带有 Spyglass 的 Scalding 来读取/写入 HBase。

我正在对 table1 和 table2 进行左外连接,并在转换列后写回 table1。table1 和 table2 都被声明为 Spyglass HBaseSource。

这工作正常。但是,我需要使用 rowkey 访问 table1 中的不同行来计算转换后的值。

我为 HBase 尝试了以下获取: val hTable = new HTable(conf, TABLE_NAME) val result = hTable.get(new Get(rowKey.getBytes()))

我可以访问此链接中提到的 Scalding 作业中的配置:

https://github.com/twitter/scalding/wiki/Frequently-asked-questions#how-do-i-access-the-jobconf

当我在本地运行烫伤作业时,这有效。但是,当我在集群中运行它时,在 Reducer 中执行此代码时,conf 为空。

对于这种情况,是否有更好的方法在 Scalding/Cascading 作业中进行 HBase 获取/扫描?

4

1 回答 1

0

这样做的方法...

1)您可以使用托管资源

class SomeJob(args: Args) extends Job(args) {      
  val someConfig = HBaseConfiguration.create().addResource(new Path(pathtoyourxmlfile))
  lazy val hPool = new HTablePool(someConfig, 3)

  def getConf = {
    implicitly[Mode] match {
      case Hdfs(_, conf) => conf
      case _ => whateveryou are doing for a local conf...
    }
  }
  ... somePipe.someOperation.... {
        val gets = key.map { key => new Get(key) }
        managed(hPool.getTable("myTableName")) acquireAndGet { table => 
          val results = table.get(gets)
          ...do something with these results
        }
     }    
}

2)您可以使用一些更具体的级联代码,在其中编写自定义方案,并在其中覆盖源方法,并可能根据需要覆盖其他一些方法。在那里你可以像这样访问 JobConf:

class MyScheme extends Scheme[JobConf, SomeRecordReader, SomeOutputCollector, ..] {

  @transient var jobConf: Configuration = super.jobConfiguration

  override def source(flowProcess: FlowProcess[JobConf], ...): Boolean = {
   jobConf = flowProcess match {
     case h: HadoopFlowProcess => h.getJobConf
     case _ => jconf
   }

   ... dosomething with the jobConf here

 }   

}
于 2014-12-17T20:35:32.457 回答