1

我正在尝试对使用 SQLAlchemy 访问的 SQlite 数据库中的大约 200,000 个条目进行一些文本处理。我想将它并行化(我正在查看 Parallel Python),但我不确定该怎么做。

我想在每次处理条目时提交会话,这样如果我需要停止脚本,我就不会丢失它已经完成的工作。但是,当我尝试将 session.commit() 命令传递给回调函数时,它似乎不起作用。

from assignDB import *
from sqlalchemy.orm import sessionmaker
import pp, sys, fuzzy_substring

def matchIng(rawIng, ingreds):
maxScore = 0
choice = ""
for (ingred, parentIng) in ingreds.iteritems():
    score = len(ingred)/(fuzzy_substring(ingred,rawIng)+1)
    if score > maxScore:
        maxScore = score
        choice = ingred
        refIng = parentIng  
return (refIng, choice, maxScore)

def callbackFunc(match, session, inputTuple):
    print inputTuple
    match.refIng_id = inputTuple[0]
    match.refIng_name = inputTuple[1]
    match.matchScore = inputTuple[2]
    session.commit()

# tuple of all parallel python servers to connect with
ppservers = ()
#ppservers = ("10.0.0.1",)

if len(sys.argv) > 1:
    ncpus = int(sys.argv[1])
    # Creates jobserver with ncpus workers
    job_server = pp.Server(ncpus, ppservers=ppservers)
else:
    # Creates jobserver with automatically detected number of workers
    job_server = pp.Server(ppservers=ppservers)

print "Starting pp with", job_server.get_ncpus(), "workers"

ingreds = {}
for synonym, parentIng in session.query(IngSyn.synonym, IngSyn.parentIng): 
    ingreds[synonym] = parentIng

jobs = []
for match in session.query(Ingredient).filter(Ingredient.refIng_id == None):
    rawIng = match.ingredient
    jobs.append((match, job_server.submit(matchIng,(rawIng,ingreds),    (fuzzy_substring,),callback=callbackFunc,callbackargs=(match,session))))

会话是从 导入的assignDB。我没有收到任何错误,只是没有更新数据库。

谢谢你的帮助。

更新 这是模糊子字符串的代码

def fuzzy_substring(needle, haystack):
    """Calculates the fuzzy match of needle in haystack,
    using a modified version of the Levenshtein distance
    algorithm.
    The function is modified from the levenshtein function
    in the bktree module by Adam Hupp"""
    m, n = len(needle), len(haystack)

    # base cases
    if m == 1:
        return not needle in haystack
    if not n:
        return m

    row1 = [0] * (n+1)
    for i in range(0,m):
        row2 = [i+1]
        for j in range(0,n):
            cost = ( needle[i] != haystack[j] )

            row2.append( min(row1[j+1]+1, # deletion
                               row2[j]+1, #insertion
                               row1[j]+cost) #substitution
                           )
        row1 = row2
    return min(row1)

我从这里得到的:Fuzzy Substring。就我而言,“needle”是大约 8000 种可能的选择之一,而 haystack 是我要匹配的原始字符串。我遍历所有可能的“针”并选择得分最高的一根。

4

2 回答 2

3

在不查看您的特定代码的情况下,可以公平地说:

  1. 使用无服务器 SQLite 和
  2. 通过并行寻求提高写入性能

是互不相容的欲望。引用SQLite 常见问题解答

… 但是,客户端/服务器数据库引擎(例如 PostgreSQL、MySQL 或 Oracle)通常支持更高级别的并发性,并允许多个进程同时写入同一个数据库。这在客户端/服务器数据库中是可能的,因为始终有一个受良好控制的服务器进程可用于协调访问。如果您的应用程序需要大量并发,那么您应该考虑使用客户端/服务器数据库。但经验表明,大多数应用程序需要的并发性远低于其设计者的想象。…</p>

即使没有 SQLAlchemy 使用的任何门控和排序。Parallel Python 作业何时完成(如果有的话)也完全不清楚。

我的建议:先让它正常工作,然后再寻找优化。尤其是当pp秘制酱汁可能根本不会给你买太多东西时,即使它工作得很好。

添加以回应评论

如果fuzzy_substring匹配是瓶颈,它似乎与数据库访问完全分离,您应该牢记这一点。在没有看到fuzzy_substring正在做什么的情况下,一个好的开始假设是您可以进行算法改进,这可能使单线程编程在计算上可行。近似字符串匹配是一个经过深入研究的问题,选择正确的算法通常比“投入更多的处理器”要好得多。

从这个意义上说,更好的是你有更干净的代码,不要浪费分割和重组问题的开销,最后有一个更可扩展和可调试的程序。

于 2012-07-15T10:05:26.320 回答
0

@msw 提供了一个很好的问题概述,提供了一种考虑并行化的一般方法。

尽管有这些评论,但这是我最终要做的工作:

from assignDB import *
from sqlalchemy.orm import sessionmaker
import pp, sys, fuzzy_substring  

def matchIng(rawIng, ingreds):
    maxScore = 0
    choice = ""
    for (ingred, parentIng) in ingreds.iteritems():
        score = len(ingred)/(fuzzy_substring(ingred,rawIng)+1)
        if score > maxScore:
            maxScore = score
            choice = ingred
            refIng = parentIng  
    return (refIng, choice, maxScore)

# tuple of all parallel python servers to connect with
ppservers = ()
#ppservers = ("10.0.0.1",)

if len(sys.argv) > 1:
    ncpus = int(sys.argv[1])
    # Creates jobserver with ncpus workers
    job_server = pp.Server(ncpus, ppservers=ppservers)
else:
    # Creates jobserver with automatically detected number of workers
    job_server = pp.Server(ppservers=ppservers)

print "Starting pp with", job_server.get_ncpus(), "workers"

ingreds = {}
for synonym, parentIng in session.query(IngSyn.synonym, IngSyn.parentIng): 
    ingreds[synonym] = parentIng

rawIngredients = session.query(Ingredient).filter(Ingredient.refIng_id == None)
numIngredients = session.query(Ingredient).filter(Ingredient.refIng_id == None).count()
stepSize = 30

for i in range(0, numIngredients, stepSize):
    print i
    print numIngredients

    if i + stepSize > numIngredients:
        stop = numIngredients
    else:
        stop = i + stepSize

    jobs = []
    for match in rawIngredients[i:stop]:
        rawIng = match.ingredient
        jobs.append((match, job_server.submit(matchIng,(rawIng,ingreds),    (fuzzy_substring,))))

    job_server.wait()

    for match, job in jobs:
        inputTuple = job()
        print match.ingredient
        print inputTuple
        match.refIng_id = inputTuple[0]
        match.refIng_name = inputTuple[1]
        match.matchScore = inputTuple[2]
    session.commit()

本质上,我已经把问题分成了几块。并行匹配 30 个子字符串后,将结果返回并提交到数据库。我有点随意地选择了 30,所以优化这个数字可能会有所收获。它似乎已经加快了一点,因为我现在正在使用处理器中的所有 3(!) 个内核。

于 2012-07-15T18:49:27.583 回答