5

我们有相当大的文件,大约 1-1.5 GB 与原始数据相结合(主要是日志文件),这些原始数据很容易解析为 csv,随后应该将其绘制成图形以生成一组图形图像。

目前,我们正在使用 bash 脚本将原始数据转换为 csv 文件,其中仅包含需要绘制的数字,然后将其输入到 gnuplot 脚本中。但是这个过程非常缓慢。我试图通过用单个命令替换一些管道cuts、 s 等来加速 bash 脚本,虽然这提高了速度,但整个事情仍然很慢。trawk

所以,我开始相信这个过程有更好的工具。我目前正在寻找用 python+numpy 或 R 重写这个过程。我的一个朋友建议使用 JVM,如果我要这样做,我会使用 clojure,但不确定 JVM 将如何执行。

我在处理这类问题方面没有太多经验,所以任何关于如何进行的建议都会很棒。谢谢。

编辑:另外,我想存储(到磁盘)生成的中间数据,即 csv,所以我不必重新生成它,如果我选择我想要一个不同的图形。

编辑 2:原始数据文件每行有一条记录,其字段由分隔符 ( |) 分隔。并非所有字段都是数字。我在输出 csv 中需要的每个字段都是通过对输入记录应用某个公式获得的,该公式可能使用输入数据中的多个字段。输出 csv 每行将有 3-4 个字段,我需要在(可能是)条形图中绘制 1-2、1-3、1-4 字段的图表。我希望这能提供更好的画面。

编辑 3:我稍微修改了@adirau 的脚本,它似乎运行良好。我已经走得够远了,我正在读取数据,发送到处理器线程池(伪处理,将线程名称附加到数据),并通过另一个收集器线程将其聚合到输出文件中。

PS:我不确定这个问题的标签,请随时更正。

4

2 回答 2

4

python 听起来是一个不错的选择,因为它有一个很好的线程 API(虽然实现是有问题的)、matplotlib 和 pylab。我错过了你的更多规范,但也许这对你来说是一个很好的起点:matplotlib: async plotting with threads. 我会选择一个线程来处理批量磁盘 i/o 读取并将队列同步到线程池以进行数据处理(如果您有固定的记录长度,通过预先计算读取偏移量并将偏移量传递给线程池,事情可能会变得更快) ; 使用 diskio 线程,我将映射数据源文件,读取预定义的 num 字节 + 一次读取,最终将最后一个字节抓取到当前数据源 lineinput 的末尾;numbytes 应该选择在您的平均 lineinput 长度附近的某个地方;接下来是通过队列进行池馈送以及在线程池中进行的数据处理/绘图;我这里没有一张好照片(你到底在画什么),但我希望这会有所帮助。

编辑:有 file.readlines([sizehint]) 一次抓取多行;好吧,它可能不会那么快,因为文档说它在内部使用 readline()

编辑:一个快速的骨架代码

import threading
from collections import deque
import sys
import mmap


class processor(Thread):
    """
        processor gets a batch of data at time from the diskio thread
    """
    def __init__(self,q):
        Thread.__init__(self,name="plotter")
        self._queue = q
    def run(self):
        #get batched data 
        while True:
            #we wait for a batch
            dataloop = self.feed(self._queue.get())
            try:
                while True:
                    self.plot(dataloop.next())
            except StopIteration:
                pass
            #sanitizer exceptions following, maybe

    def parseline(self,line):
        """ return a data struct ready for plotting """
        raise NotImplementedError

    def feed(self,databuf):
        #we yield one-at-time datastruct ready-to-go for plotting
        for line in databuf:
            yield self.parseline(line)

    def plot(self,data):
        """integrate
        https://www.esclab.tw/wiki/index.php/Matplotlib#Asynchronous_plotting_with_threads
        maybe
        """
class sharedq(object):
    """i dont recall where i got this implementation from 
    you may write a better one"""
    def __init__(self,maxsize=8192):
        self.queue = deque()
        self.barrier = threading.RLock()
        self.read_c = threading.Condition(self.barrier)
        self.write_c = threading.Condition(self.barrier)
        self.msz = maxsize
    def put(self,item):
        self.barrier.acquire()
        while len(self.queue) >= self.msz:
            self.write_c.wait()
        self.queue.append(item)
        self.read_c.notify()
        self.barrier.release()
    def get(self):
        self.barrier.acquire()
        while not self.queue:
            self.read_c.wait()
        item = self.queue.popleft()
        self.write_c.notify()
        self.barrier.release()
        return item



q = sharedq()
#sizehint for readine lines
numbytes=1024
for i in xrange(8):
    p = processor(q)
    p.start()
for fn in sys.argv[1:]
    with open(fn, "r+b") as f:
        #you may want a better sizehint here
        map = mmap.mmap(f.fileno(), 0)
        #insert a loop here, i forgot
        q.put(map.readlines(numbytes))

#some cleanup code may be desirable
于 2011-03-29T07:22:29.507 回答
1

我认为 python+Numpy 将是最有效的方式,就速度和易于实施而言。Numpy经过高度优化,因此性能不错,而python会简化算法实现部分。

这个组合应该适用于您的情况,前提是您优化内存上文件的加载,尝试找到处理数据块之间的中间点,该数据块不是太大但足够大以最小化读取和写入周期,因为这是什么会减慢程序的速度

如果您觉得这需要更多加速(我真诚地怀疑),您可以使用 Cython 来加速缓慢的部分。

于 2011-03-29T07:33:33.560 回答