阅读上述帖子后,我能够创建自定义 FileInputFormat 类来读取 excel.xlsx 文件并将其流式传输到 flink。代码如下
/*
* Custom format output I was expecting from the records converted into records
*/
case class ExcelSheetData(Module : Double, StartTime : String, EndTime : String,...., FileName : String)
/*
* Custom class to read Excel spreadsheet using flink FileInputFormat class
*/
class ExcelInputFormat extends FileInputFormat[ExcelSheetData]{
var running : Boolean = false
var excelData : Seq[ExcelSheetData] = null
unsplittable = true
override def open(fileSplit : FileInputSplit) = {
println(fileSplit.getPath.toString.drop(6))
val myFile = new File(fileSplit.getPath.toString.drop(6))
val fileName = fileSplit.getPath.toString.drop(6)
val fis = new FileInputStream(myFile)
try{
val myWorkbook = new XSSFWorkbook(fis)
// println(s"Sheet Name: ${mySheet.getSheetName()}")
// reading multiple sheets having identical data
val mySheets = myWorkbook.iterator().asScala
val exData = for(s <- mySheets
if(s.getSheetName() == "Sheet1" || s.getSheetName() == "Sheet")) yield {
val rowItr = s.rowIterator().asScala
for(e <- rowItr
if(e.getRowNum() > 5 && e.getCell(1).getCellType() == 0)) yield {
(e.getCell(1).getDateCellValue(), e.getCell(2).getDateCellValue(), ......,
,fileName)
}
}
excelData = exData.toSeq.flatten.map( record => {ExcelSheetData(record._1, record._2.toString, record._3.toString, record._4, record._5, record._6, record._7, record._8, record._9)})
running = if(excelData.length >= 1) true else false
} finally { fis.close()}
}
override def nextRecord(reuse: ExcelSheetData): ExcelSheetData = {
val head = excelData.head
excelData = excelData.tail
running = if (excelData.length == 0) false else true
head
}
override def reachedEnd(): Boolean = ! running
}
/*
* Initialize custom class to read Excel Input
*/
val excelInput = new ExcelInputFormat()
// Read excel data into flink stream
val excelData = senv.readFile(excelInput, ExcelFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000)
.uid("Excel File Read")
//Windowing code down below...