4

我有一个由文件夹和文件构成的数据集。文件夹/文件结构本身对于数据分析很重要。

数据集的结构:

folder1
   +-----file11
            +-----column1
            +-----column2

每个文件都包含描述一个对象的数据。文件的格式是一致的。它基本上是一个包含两列的 csv 文件。这两列应表示为结果对象中的元组序列。

文件的大小非常小。最多只有 20 kb。每个文件夹包含大约 200 个文件。

所需的输出对象应该是:

{
    a: "folder1",              // name of parent folder
    b: "file11",               // name of content file
    c: Seq[(String, String)]   // content of file1
}

如何处理在 Scala 中读取此数据集?

4

3 回答 3

6

有两种方法可以解决这个问题:

a) 如果文件夹中的数据非常小(小于几兆字节),您可以在本地进行读取并使用该ExecutionEnvironment.fromCollection()方法将数据带入 Flink 作业。

b) 您创建一个自定义 InputFormat。InputFormat 允许解析自定义文件格式。在您的情况下,我将扩展TextInputFormat并覆盖该readRecord()方法。此方法将文件中的每一行作为字符串提供给您。然后,您可以手动解析来自 String 的数据,并将解析结果与 Tuple3 中的目录信息一起返回。您可以从filePath变量访问路径。对于带有s的递归读取文件FileInputFormat,有recursive.file.enumeration配置值。

于 2015-06-02T15:20:32.803 回答
0

阅读上述帖子后,我能够创建自定义 FileInputFormat 类来读取 excel.xlsx 文件并将其流式传输到 flink。代码如下

/*
 * Custom format output I was expecting from the records converted into records
 */
case class ExcelSheetData(Module : Double, StartTime : String, EndTime : String,....,  FileName : String)

/*
 * Custom class to read Excel spreadsheet using flink FileInputFormat class 
 */
class ExcelInputFormat extends FileInputFormat[ExcelSheetData]{

    var running : Boolean = false
    var excelData : Seq[ExcelSheetData] = null
    unsplittable = true

    override def open(fileSplit : FileInputSplit) = {

        println(fileSplit.getPath.toString.drop(6))
        val myFile = new File(fileSplit.getPath.toString.drop(6)) 
        val fileName = fileSplit.getPath.toString.drop(6)

        val fis = new FileInputStream(myFile)
        try{
            val myWorkbook = new XSSFWorkbook(fis)

            // println(s"Sheet Name: ${mySheet.getSheetName()}")
            // reading multiple sheets having identical data
            val mySheets = myWorkbook.iterator().asScala   
            val exData = for(s <- mySheets
                            if(s.getSheetName() == "Sheet1" || s.getSheetName() == "Sheet")) yield  {
                            val rowItr = s.rowIterator().asScala
                                for(e <- rowItr
                                    if(e.getRowNum() > 5 && e.getCell(1).getCellType() == 0)) yield {
                                    (e.getCell(1).getDateCellValue(), e.getCell(2).getDateCellValue(), ......,
                                    ,fileName)
                            }
                }

            excelData = exData.toSeq.flatten.map( record => {ExcelSheetData(record._1, record._2.toString, record._3.toString, record._4, record._5, record._6, record._7, record._8, record._9)})

            running = if(excelData.length >= 1) true else false

        } finally { fis.close()}

    }

    override def nextRecord(reuse: ExcelSheetData): ExcelSheetData = { 
        val head = excelData.head 
        excelData = excelData.tail 
        running = if (excelData.length == 0) false else true 
        head 
    }

    override def reachedEnd(): Boolean = ! running  

}

/*
 * Initialize custom class to read Excel Input
 */
val excelInput = new ExcelInputFormat()

// Read excel data into flink stream
val excelData = senv.readFile(excelInput, ExcelFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000)
                            .uid("Excel File Read")

//Windowing code down below... 
于 2017-12-21T00:35:37.363 回答
0

HadoopOffice 库原生支持 Flink Table API 以及用于 Excel 文件的 Flink DataSource/DataSink。

https://github.com/ZuInnoTe/hadoopoffice/wiki

于 2018-03-18T14:49:41.430 回答