0

我有一个名为“未处理”的表,我想读取 2000 行,通过 HTTP 将它们发送到另一台服务器,然后将这些行插入“已处理”表并将它们从“未处理”表中删除。

我的python代码大致是这样的:

db = MySQLdb.connect("localhost","username","password","database" )

# prepare a cursor object using cursor() method
cursor = db.cursor()

# Select all the records not yet sent
sql = "SELECT * from unprocessed where SupplierIDToUse = 'supplier1' limit 0, 2000"
cursor.execute(sql)
results = cursor.fetchall()
for row in results:
  id = row[0]
  <code is here here for sending to other server - it takes about 1/2 a second>
  if sentcorrectly="1":
     sql = "INSERT into processed (id, dateprocessed) VALUES ('%s', NOW()')" % (id)
     try:
        inserted = cursor.execute(sql)
     except:
        print "Failed to insert"
     if inserted:
        print "Inserted"
        sql = "DELETE from unprocessed where id = '%s'" % (id)
        try:
            deleted = cursor.execute(sql)
        except:
            print "Failed to delete id from the unprocessed table, even though it was saved in the processed table."
db.close()
sys.exit(0)

我希望能够同时运行此代码,以便提高通过 HTTP 将这些记录发送到其他服务器的速度。目前,如果我尝试同时运行代码,我会在另一个服务器上发送相同数据的多个副本并保存到“已处理”表中,因为选择查询在代码的多个实例中获得相同的 id。

如何在选择记录时锁定记录,然后将每条记录作为一行处理,然后再将它们移动到“已处理”表?该表是 MyISAM,但我今天已转换为 innoDB,因为我意识到可能有一种方法可以更好地使用 innoDB 锁定记录。

4

1 回答 1

1

根据您的评论回复。

两种解决方案之一是客户端 python 主进程,用于收集所有 2000 条记录的记录 ID,然后将其分成块以供子工作人员处理。

短版,您的选择是委派工作或依赖可能棘手的资产锁定机制。我会推荐前一种方法,因为它可以借助消息队列进行扩展。

委托逻辑将使用多处理

import multiprocessing
records = get_all_unprocessed_ids()
pool = multiprocessing.Pool(5) #create 5 workers
pool.map(process_records, records) 

这将创建 2000 个任务并一次运行 5 个任务,或者您可以使用此处概述的解决方案将记录拆分为块 如何将列表拆分为大小均匀的块?

pool.map(process_records, chunks(records, 100)) 

将创建 20 个包含 100 条记录的列表,这些记录将以 5 条为一组进行处理

编辑:语法错误 - 签名是 map(func, iterable[, chunksize]) 我省略了 func 的参数。

于 2013-04-16T17:01:13.247 回答