2

我们使用 Scalding 进行 ETL 并将输出生成为带有分区的 Hive 表。因此,我们希望分区的目录名称类似于“state=CA”。我们使用 TemplatedTsv 如下:

pipe
   // some other ETL
   .map('STATE -> 'hdfs_state) { state: Int => "State=" + state }
   .groupBy('hdfs_state) { _.pass }
   .write(TemplatedTsv(baseOutputPath, "%s", 'hdfs_state,
          writeHeader = false,
          sinkMode = SinkMode.UPDATE,
          fields = ('all except 'hdfs_state)))

我们采用How to bucket outputs in Scalding 中的代码示例。这是我们遇到的两个问题:

  • 除了IntelliJ 无法解决:我错过了一些进口吗?我们不想在“fields = ()”语句中显式输入所有字段,因为字段是从 groupBy 语句中的代码派生的。如果明确输入,它们很容易不同步。
  • 这种方法看起来太 hacky,因为我们正在创建一个额外的列,以便 Hive/Hcatalog 可以处理目录名称。我们想知道实现它的正确方法应该是什么?

非常感谢!

4

1 回答 1

0

抱歉,前面的示例是伪代码。下面我将给出一个带有输入数据示例的小代码。

请注意,这仅适用于 Scalding 0.12.0 或更高版本

让我们想象一下我们输入如下定义一些购买数据,

user1   1384034400  6   75
user1   1384038000  6   175
user2   1383984000  48  3
user3   1383958800  48  281
user3   1384027200  9   7
user3   1384027200  9   11
user4   1383955200  37  705
user4   1383955200  37  15
user4   1383969600  36  41
user4   1383969600  36  21

制表符分隔,第 3 列是州编号。这里我们有整数,但对于基于字符串的状态,您可以轻松适应。

此代码将读取输入并将它们放入“State=stateid”输出文件夹存储桶中。

class TemplatedTsvExample(args: Args) extends Job(args) {

  val purchasesPath = args("purchases")
  val outputPath    = args("output")

  // defines both input & output schema, you can also make separate for each of them
  val ioSchema = ('USERID, 'TIMESTAMP, 'STATE, 'PURCHASE)

  val Purchases =
     Tsv(purchasesPath, ioSchema)
     .read
     .map('STATE -> 'STATENAME) { state: Int => "State=" + state } // here you can make necessary changes
     .groupBy('STATENAME) { _.pass } // this is optional
     .write(TemplatedTsv(outputPath, "%s", 'STATENAME, false, SinkMode.REPLACE, ioSchema))
} 

我希望这是有帮助的。请问我是否有任何不清楚的地方。

你可以在这里找到完整的代码

于 2015-02-25T08:53:07.177 回答