我的程序需要每天分析从每个应用程序服务器每小时生成的大量日志文件。
因此,如果我有 2 个应用服务器,我将处理 48 个文件(24 个文件 * 2 个应用服务器)。
文件大小范围为 100-300 mb。每个文件中的每一行都是一个日志条目,其格式为
[标识符]-[件数]-[件]-[部分日志]
例如
xxx-3-1-ABC
xxx-3-2-ABC
xxx-3-3-ABC
这些可以分布在我提到的 48 个文件上,我需要像这样合并这些日志
xxx-PAIR-ABCABCABC
我的实现使用线程池并行读取文件,然后使用 ConcurrentHashMap 聚合它们
我定义了一个类 LogEvent.scala
class LogEvent (val id: String, val total: Int, var piece: Int, val json: String) {
var additions: Long = 0
val pieces = new Array[String](total)
addPiece(json)
private def addPiece (json: String): Unit = {
pieces(piece) = json
additions += 1
}
def isDone: Boolean = {
additions == total
}
def add (slot: Int, json: String): Unit = {
piece = slot
addPiece(json)
}
主要处理发生在多个线程上,代码类似于
//For each file
val logEventMap = new ConcurrentHashMap[String, LogEvent]().asScala
Future {
Source.fromInputStream(gis(file)).getLines().foreach {
line =>
//Extract the id part of the line
val idPart: String = IDPartExtractor(line)
//Split line on '-'
val split: Array[String] = idPart.split("-")
val id: String = split(0) + "-" + split(1)
val logpart: String = JsonPartExtractor(line)
val total = split(2) toInt
val piece = split(3) toInt
def slot: Int = {
piece match {
case x if x - 1 < 0 => 0
case _ => piece - 1
}
}
def writeLogEvent (logEvent: LogEvent): Unit = {
if (logEvent.isDone) {
//write to buffer
val toWrite = id + "-PAIR-" + logEvent.pieces.mkString("")
logEventMap.remove(logEvent.id)
writer.writeLine(toWrite)
}
}
//The LOCK
appendLock {
if (!logEventMap.contains(id)) {
val logEvent = new LogEvent(id, total, slot, jsonPart)
logEventMap.put(id, logEvent)
//writeLogEventToFile()
}
else {
val logEvent = logEventMap.get(id).get
logEvent.add(slot, jsonPart)
writeLogEvent(logEvent)
}
}
}
}
主线程阻塞,直到所有期货完成
使用这种方法,我已经能够将处理时间从一个多小时缩短到大约 7-8 分钟。
我的问题如下 -
- 这可以以更好的方式完成吗,我正在使用不同的线程读取多个文件,并且我需要锁定发生聚合的块,有没有更好的方法来做到这一点?
- Map 在内存中增长非常快,对于这种用例的堆外存储的任何建议
- 还有别的建议吗。
谢谢