1

我正在使用 Graphite 进行监控,并以 Carbon 和 Ceres 作为存储后端。我在修复坏数据时遇到了一些问题:由于各种原因,最终出现了时间范围相互重叠的文件。也就是说,Carbon / Ceres 把数据存储为名为 timestamp@interval.slice 的文件,因此可能存在两个或更多时间范围重叠的文件。

有两种重叠:

File A:  +------------+        orig file
File B:      +-----+           subset
File C:          +---------+   overlap

这会导致问题,因为现有的工具(ceres-maintenance 的 defrag 和 rollup)无法处理这些重叠;它们会直接跳过该目录并继续处理后面的目录。这显然是个问题。

4

1 回答 1

0

我创建了一个脚本来解决这个问题,如下所示:

  1. 对于子集,只需删除子集文件。

  2. 对于部分重叠,使用文件系统的“截断”(truncate)操作,把原始文件截断到下一个文件的起始位置。虽然也可以切掉重叠文件的开头并相应地重命名,但我认为这样做风险很大。

我发现可以通过两种方式做到这一点:

  1. 遍历目录并遍历文件,边走边修复,找到文件子集,删除它们;

  2. 遍历目录,在进入下一个目录之前先解决当前目录中的所有问题。这种方法要快得多,因为遍历目录本身非常耗时。

代码:

#!/usr/bin/env python2.6
################################################################################

import io
import os
import time
import sys
import string
import logging
import unittest
import datetime
import random
import zmq
import json
import socket
import traceback
import signal
import select
import simplejson
import cPickle as pickle
import re
import shutil
import collections
from pymongo import Connection
from optparse import OptionParser
from pprint import pprint, pformat

################################################################################

class SliceFile(object):
    """Description of one slice file named ``<timeStart>@<freq>.slice``.

    The constructor only parses the file name; nothing is read from disk
    until setVars() is called, so ``size``, ``numPoints`` and ``timeEnd``
    stay ``None`` up to that point.
    """

    def __init__(self, fname):
        """Parse the start time and frequency out of *fname*.

        fname -- path of the slice file; only the part after the last '/'
                 is parsed, the full path is kept in ``name``.

        Raises ValueError or IndexError when the last path component does
        not look like ``<int>@<int>...``.
        """
        leaf = fname.rsplit('/', 1)[-1]
        pieces = leaf.split('@')
        # The start time is converted before the frequency part is touched,
        # so a malformed start time is the first error reported.
        startSeconds = int(pieces[0])
        stepSeconds = int(pieces[1].split('.', 1)[0])

        self.name = fname
        self.timeStart = startSeconds
        self.freq = stepSeconds
        # Filled in by setVars().
        self.size = None
        self.numPoints = None
        self.timeEnd = None
        # Initialised to False here; nothing in this class changes it.
        self.deleted = False

    def __repr__(self):
        """Return a one-line summary of every field, used in log messages."""
        fields = (self.name, self.timeStart, self.timeEnd,
                  self.freq, self.size, self.numPoints)
        return "Name: %s, tstart=%s tEnd=%s, freq=%s, size=%s, npoints=%s." % fields

    def setVars(self):
        """Stat the file and derive its point count and end time.

        The point count is the file size divided by 8, rounded down
        (presumably one 8-byte value per datapoint -- confirm against the
        slice format).  ``timeEnd`` is the first timestamp *after* the last
        stored point.  Raises OSError if the file cannot be stat'ed.
        """
        byteCount = os.path.getsize(self.name)
        pointCount = int(byteCount / 8)
        self.size = byteCount
        self.numPoints = pointCount
        self.timeEnd = self.timeStart + (pointCount * self.freq)

################################################################################

class CeresOverlapFixup(object):
    """Repair Ceres node directories whose slice files overlap in time.

    Within one node directory, slices that share the same interval are
    examined in order of start time.  Relative to the earliest remaining
    slice (A), two situations are repaired:

      * subset  -- a later slice ends at or before A's end: it is deleted;
      * overlap -- the next slice starts before A's end: A is truncated so
                   that it ends exactly where the next slice starts.

    Every message is echoed to stdout and appended to
    ``ceresOverlapFixup.log`` in the current working directory.
    """

    # Bytes occupied by one datapoint in a slice file (SliceFile.setVars
    # divides the file size by the same value).
    DATAPOINT_SIZE = 8

    def __del__(self):
        """Write the closing log line and close the log file, if it is open."""
        # Imported locally because module globals may already have been
        # cleared when this runs during interpreter shutdown.
        import datetime
        logfile = getattr(self, "LOGFILE", None)
        # __init__ may have failed before the log was opened.
        if logfile is None or logfile.closed:
            return
        try:
            self.writeLog("Ending at %s" % (str(datetime.datetime.today())))
        finally:
            logfile.close()

    def __init__(self):
        """Initialise counters and open the log file in append mode."""
        self.verbose            = False
        self.debug              = False
        self.basedir            = None
        self.badFilesList       = set()     # paths that could not be parsed/removed
        self.truncated          = 0         # successful truncations
        self.subsets            = 0         # subset files removed
        self.dirsExamined       = 0         # directories walked
        self.lastStatusTime     = 0
        # Opened last so a failure here does not leave later attributes unset.
        self.LOGFILE            = open("ceresOverlapFixup.log", "a")

    def getOptionParser(self):
        """Return the option parser to extend; subclasses may override."""
        return OptionParser()

    def getOptions(self):
        """Parse sys.argv into ``debug``, ``verbose`` and ``basedir``.

        Exits through ``parser.error`` (usage message, SystemExit with
        status 2) when no base directory is given or it is not a directory.
        An ``assert`` is not used because it is stripped under ``python -O``.
        """
        parser = self.getOptionParser()
        parser.add_option("-d", "--debug",      action="store_true",                 dest="debug",   default=False, help="debug mode for this program, writes debug messages to logfile." )
        parser.add_option("-v", "--verbose",    action="store_true",                 dest="verbose", default=False, help="verbose mode for this program, prints a lot to stdout." )
        parser.add_option("-b", "--basedir",    action="store",      type="string",  dest="basedir", default=None,  help="base directory location to start converting." )
        (options, args)     = parser.parse_args()
        self.debug          = options.debug
        self.verbose        = options.verbose
        self.basedir        = options.basedir
        if not self.basedir:
            parser.error("must provide base directory.")
        if not os.path.isdir(self.basedir):
            # os.walk() silently yields nothing for a missing directory,
            # which would otherwise look like a successful run.
            parser.error("base directory %s is not a directory." % (self.basedir))

    # Examples:
    # ./updateOperations/1346805360@60.slice
    # ./updateOperations/1349556660@60.slice
    # ./updateOperations/1346798040@60.slice

    def getFileData(self, inFilename):
        """Return a populated SliceFile for *inFilename*.

        Raises whatever SliceFile raises for an unparsable name or a file
        that cannot be stat'ed.
        """
        ret = SliceFile(inFilename)
        ret.setVars()
        return ret

    def removeFile(self, inFilename):
        """Delete *inFilename* and count it as a removed subset.

        Raises OSError if the file cannot be removed; the counter is only
        incremented after a successful removal.
        """
        os.remove(inFilename)
        self.subsets += 1

    def truncateFile(self, fname, newSize):
        """Truncate the existing file *fname* to *newSize* bytes.

        Best effort: any failure is written to the log and swallowed.
        ``self.truncated`` is incremented only when the truncation succeeded.
        """
        if self.verbose:
            self.writeLog("Truncating file, name=%s, newsize=%s" % (pformat(fname), pformat(newSize)))
        try:
            # No O_CREAT: a slice that has disappeared must not be recreated
            # as a file full of zero bytes.
            fd = os.open(fname, os.O_RDWR)
        except Exception:
            self.writeLog("Exception during truncate: %s" % (traceback.format_exc()))
            return
        try:
            os.ftruncate(fd, newSize)
            self.truncated += 1
        except Exception:
            self.writeLog("Exception during truncate: %s" % (traceback.format_exc()))
        finally:
            # Closed exactly once, whether or not the truncate worked.
            os.close(fd)

    def printStatus(self):
        """Log the running totals, at most once every 10 seconds."""
        now = self.getNowTime()
        if ((now - self.lastStatusTime) > 10):
            self.writeLog("Status: time=%d, Walked %s dirs, subsetFilesRemoved=%s, truncated %s files." % (now, self.dirsExamined, self.subsets, self.truncated))
            self.lastStatusTime = now

    def fixupThisDir(self, inPath, inFiles):
        """Repair the slice files of one directory.

        inPath  -- directory path.
        inFiles -- names of the files in that directory.

        Directories without a '.ceres-node' entry are ignored.  Names that
        contain '@' but cannot be parsed are logged and recorded in
        ``badFilesList``.
        """
        if '.ceres-node' not in inFiles:
            return

        # Slices are grouped by interval: the truncation arithmetic divides a
        # time span by the first slice's interval, which is only meaningful
        # when both slices use the same step.
        # NOTE(review): assumes slices of different intervals in one node are
        # independent series -- confirm against the Ceres documentation.
        seriesByFreq = {}
        for thisFile in sorted(inFiles):
            if thisFile == '.ceres-node' or '@' not in thisFile:
                continue
            wholeFilename = os.path.join(inPath, thisFile)
            try:
                curFile = self.getFileData(wholeFilename)
            except Exception:
                self.badFilesList.add(wholeFilename)
                self.writeLog("ERROR: file %s, %s" % (wholeFilename, traceback.format_exc()))
                continue
            seriesByFreq.setdefault(curFile.freq, []).append(curFile)

        for freq in sorted(seriesByFreq):
            self._fixupSeries(seriesByFreq[freq])

    def _fixupSeries(self, sliceList):
        """Remove subsets and truncate overlaps among same-interval slices."""
        # Numeric order: sorting the names as strings misorders timestamps
        # with a different number of digits.
        remaining = sorted(sliceList, key=lambda thisObj: thisObj.timeStart)

        while len(remaining) > 1:
            self.printStatus()
            firstFile = remaining[0]

            # Pass 1: drop every later slice that ends no later than firstFile.
            survivors = []
            for curFile in remaining[1:]:
                if curFile.timeEnd > firstFile.timeEnd:
                    survivors.append(curFile)
                    continue
                try:
                    self.removeFile(curFile.name)
                except OSError:
                    # One undeletable file must not abort the whole walk.
                    self.badFilesList.add(curFile.name)
                    self.writeLog("ERROR: file %s, %s" % (curFile.name, traceback.format_exc()))
                    continue
                if self.verbose:
                    self.writeLog("Subset file situation.  First=%s, overlap=%s" % (firstFile, curFile))

            # firstFile is finished after this iteration either way.
            remaining = survivors
            if not remaining:
                break
            secondFile = remaining[0]

            # LT is right.  firstFile's timeEnd is the first open time after
            # its last point: start@100, 2 points, freq 1 -> end=102, uses
            # 100 and 101, so a second slice starting at 102 is fine.
            if secondFile.timeStart >= firstFile.timeEnd:
                continue

            # Pass 2: partial overlap.
            # file_A (first):   +---------+
            # file_B (second):       +----------+
            # Solved by truncating A at the start point of B.
            keepPoints = (secondFile.timeStart - firstFile.timeStart) // firstFile.freq
            newSize = keepPoints * self.DATAPOINT_SIZE
            if not newSize:
                # Truncating would leave an empty file; both are left alone.
                self.writeLog("Overlap left in place (truncation would empty the file).  First=%s, overlap=%s" % (firstFile, secondFile))
                continue
            self.truncateFile(firstFile.name, newSize)
            if self.verbose:
                self.writeLog("Truncate situation.  First=%s, overlap=%s" % (firstFile, secondFile))

    def getNowTime(self):
        """Return the current time in seconds since the epoch."""
        return time.time()

    def walkDirStructure(self):
        """Walk ``self.basedir`` and repair every directory on the way.

        Each directory is fully repaired before the walk moves on.
        ``dirsExamined`` counts every directory visited, once.
        """
        startTime           = self.getNowTime()
        self.lastStatusTime = startTime
        self.okayFiles      = 0

        for (thisPath, theseDirs, theseFiles) in os.walk(self.basedir):
            self.printStatus()
            self.fixupThisDir(thisPath, theseFiles)
            self.dirsExamined += 1

        endTime = self.getNowTime()
        self.printStatus()
        # Argument order matches the labels: "now" is the end time.
        self.writeLog( "now = %s, started at %s, elapsed time = %s seconds." % (endTime, startTime, endTime - startTime))
        self.writeLog( "Done.")

    def writeLog(self, instring):
        """Echo *instring* to stdout and append it to the log file."""
        # print(x) with a single argument behaves identically as a Python 2
        # print statement and as the Python 3 function.
        print(instring)
        self.LOGFILE.write("%s\n" % (instring,))
        self.LOGFILE.flush()

    def main(self):
        """Parse the command line, then walk and repair the tree."""
        self.getOptions()
        self.walkDirStructure()
于 2012-11-12T18:53:58.940 回答