我们有相当大的文件,大约为 1-1.5 GB(主要是日志文件),其中包含易于解析为 csv 的原始数据,随后应该将其绘制成图表以生成一组图形图像。
目前,我们正在使用 bash 脚本将原始数据转换为 csv 文件,其中仅包含需要绘制图表的数字,然后将其输入到 gnuplot 脚本中。但这个过程极其缓慢。我尝试通过替换一些管道来加速 bash 脚本cut
s, tr
s等与一个单一的awk
命令,虽然这样提高了速度,但是整个事情还是很慢。
所以,我开始相信这个过程有更好的工具。我目前正在考虑用 python+numpy 或 R 重写这个过程。我的一个朋友建议使用 JVM,如果我要这样做,我将使用 clojure,但我不确定 JVM 将如何执行。
我在处理此类问题方面没有太多经验,因此任何有关如何进行的建议都会很好。谢谢。
Edit:另外,我想存储(到磁盘)生成的中间数据,即 csv,所以我不必重新生成它,如果我选择我想要一个不同的图形。
Edit 2:原始数据文件每行一条记录,各字段之间用分隔符 (|
)。并非所有字段都是数字。我在输出 csv 中需要的每个字段都是通过对输入记录应用特定公式获得的,该公式可能使用输入数据中的多个字段。输出 csv 每行有 3-4 个字段,我需要在(可能是)条形图中绘制 1-2、1-3、1-4 个字段的图表。我希望这能提供更好的图片。
Edit 3:我对@adirau 的脚本做了一些修改,看起来效果很好。我已经走得足够远了,我正在读取数据,发送到处理器线程池(伪处理,将线程名称附加到数据),然后通过另一个将其聚合到输出文件中集电极 thread.
PS:我不太清楚这个问题的标签,欢迎指正。
python 听起来是一个不错的选择,因为它有一个很好的线程 API(尽管实现有问题)、matplotlib 和 pylab。我错过了您的更多规格,但这也许对您来说是一个很好的起点:matplotlib:使用线程异步绘图 https://www.esclab.tw/wiki/index.php/Matplotlib#Asynchronous_plotting_with_threads。
我会选择一个线程来处理批量磁盘 I/O 读取,并将队列同步到线程池进行数据处理(如果您有固定的记录长度,通过预先计算读取偏移量并将偏移量传递到线程池,事情可能会变得更快) ;使用diskio线程,我将映射数据源文件,读取预定义的num字节+再读取一次,以最终获取当前数据源行输入末尾的最后一个字节;应选择接近平均行输入长度的 numbytes;接下来是通过队列进行池馈送以及在线程池中进行的数据处理/绘图;我这里没有很好的图片(您到底在绘制什么),但我希望这会有所帮助。
编辑:有 file.readlines([sizehint]) 可以一次抓取多行;好吧,它可能不会那么快,因为文档说它在内部使用 readline()
编辑:快速骨架代码
import threading
from collections import deque
import sys
import mmap
class processor(Thread):
"""
processor gets a batch of data at time from the diskio thread
"""
def __init__(self,q):
Thread.__init__(self,name="plotter")
self._queue = q
def run(self):
#get batched data
while True:
#we wait for a batch
dataloop = self.feed(self._queue.get())
try:
while True:
self.plot(dataloop.next())
except StopIteration:
pass
#sanitizer exceptions following, maybe
def parseline(self,line):
""" return a data struct ready for plotting """
raise NotImplementedError
def feed(self,databuf):
#we yield one-at-time datastruct ready-to-go for plotting
for line in databuf:
yield self.parseline(line)
def plot(self,data):
"""integrate
https://www.esclab.tw/wiki/index.php/Matplotlib#Asynchronous_plotting_with_threads
maybe
"""
class sharedq(object):
"""i dont recall where i got this implementation from
you may write a better one"""
def __init__(self,maxsize=8192):
self.queue = deque()
self.barrier = threading.RLock()
self.read_c = threading.Condition(self.barrier)
self.write_c = threading.Condition(self.barrier)
self.msz = maxsize
def put(self,item):
self.barrier.acquire()
while len(self.queue) >= self.msz:
self.write_c.wait()
self.queue.append(item)
self.read_c.notify()
self.barrier.release()
def get(self):
self.barrier.acquire()
while not self.queue:
self.read_c.wait()
item = self.queue.popleft()
self.write_c.notify()
self.barrier.release()
return item
q = sharedq()
#sizehint for readine lines
numbytes=1024
for i in xrange(8):
p = processor(q)
p.start()
for fn in sys.argv[1:]
with open(fn, "r+b") as f:
#you may want a better sizehint here
map = mmap.mmap(f.fileno(), 0)
#insert a loop here, i forgot
q.put(map.readlines(numbytes))
#some cleanup code may be desirable
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)