1

我的程序需要每天分析从每个应用程序服务器每小时生成的大量日志文件。

因此,如果我有 2 个应用服务器,我将处理 48 个文件(24 个文件 * 2 个应用服务器)。

文件大小范围为 100-300 mb。每个文件中的每一行都是一个日志条目,其格式为

[标识符]-[件数]-[件]-[部分日志]

例如

xxx-3-1-ABC
xxx-3-2-ABC
xxx-3-3-ABC

这些可以分布在我提到的 48 个文件上,我需要像这样合并这些日志

xxx-PAIR-ABCABCABC

我的实现使用线程池并行读取文件,然后使用 ConcurrentHashMap 聚合它们

我定义了一个类 LogEvent.scala

class LogEvent (val id: String, val total: Int, var piece: Int, val json: String) {

  var additions: Long = 0
  val pieces = new Array[String](total)
  addPiece(json)


  private def addPiece (json: String): Unit = {
    pieces(piece) = json
    additions += 1
  }

  def isDone: Boolean = {
    additions == total
  }


  def add (slot: Int, json: String): Unit = {
    piece = slot
    addPiece(json)
  } 

主要处理发生在多个线程上,代码类似于

//For each file
val logEventMap = new ConcurrentHashMap[String, LogEvent]().asScala
Future {
          Source.fromInputStream(gis(file)).getLines().foreach {
            line =>

                  //Extract the id part of the line
                  val idPart: String = IDPartExtractor(line)
                  //Split line on '-'
                  val split: Array[String] = idPart.split("-")



                    val id: String = split(0) + "-" + split(1)
                    val logpart: String = JsonPartExtractor(line)
                    val total = split(2) toInt
                    val piece = split(3) toInt

                    def slot: Int = {
                      piece match {
                        case x if x - 1 < 0 => 0
                        case _ => piece - 1
                      }
                    }

                    def writeLogEvent (logEvent: LogEvent): Unit = {
                      if (logEvent.isDone) {
                        //write to buffer
                        val toWrite = id + "-PAIR-" + logEvent.pieces.mkString("")
                        logEventMap.remove(logEvent.id)
                        writer.writeLine(toWrite)
                      }
                    }

                    //The LOCK
                    appendLock {
                      if (!logEventMap.contains(id)) {
                        val logEvent = new LogEvent(id, total, slot, jsonPart)
                        logEventMap.put(id, logEvent)
                        //writeLogEventToFile()
                      }
                      else {
                        val logEvent = logEventMap.get(id).get
                        logEvent.add(slot, jsonPart)
                        writeLogEvent(logEvent)

                      }
                    } 
                }
          }

主线程阻塞,直到所有期货完成

使用这种方法,我已经能够将处理时间从一个多小时缩短到大约 7-8 分钟。

我的问题如下 -

  1. 这可以以更好的方式完成吗,我正在使用不同的线程读取多个文件,并且我需要锁定发生聚合的块,有没有更好的方法来做到这一点?
  2. Map 在内存中增长非常快,对于这种用例的堆外存储的任何建议
  3. 还有别的建议吗。

谢谢

4

2 回答 2

1

执行此操作的常用方法是对每个文件进行排序,然后合并排序后的文件。结果是一个文件,其中包含按您想要的顺序排列的各个项目。然后,您的程序只需对文件进行一次遍历,合并相邻的匹配项。

这有一些非常有吸引力的好处:

  1. 排序/合并由您不必编写的标准工具完成
  2. 您的聚合器程序非常简单。或者,甚至可能有一个标准工具可以做到这一点。
  3. 内存需求减少。排序/合并程序知道如何管理内存,并且聚合程序的内存需求最小。

当然也有一些缺点。您将使用更多磁盘空间,并且由于 I/O 成本,该过程会稍微慢一些。

当我遇到这样的事情时,我几乎总是使用标准工具和一个简单的聚合器程序。我从自定义程序中获得的更高性能并不能证明开发该东西所需的时间是合理的。

于 2013-08-22T15:40:32.270 回答
0

对于这种事情,如果可以,请使用Splunk,如果没有,请复制它所做的事情,即索引日志文件以便稍后按需聚合。

对于堆外存储,请查看分布式缓存 - Hazelcast 或 Coherence。两种支持都提供java.util.Map了存储在多个 JVM 上的实现。

于 2013-08-22T09:49:59.290 回答