1

程序 1 将一些作业插入到表 job_table 中。

方案 2 需要:

1. get the job from the table
2. handle the job
   -> this needs to be multi-threaded (because each job involves urllib waiting time, which should run in parallel)
3. insert the results into my_other_table, commiting the result

有什么好的(标准?)方法来实现这个吗?问题是在一个线程内提交,也会提交其他线程。

4

4 回答 4

1

我能够从 mysql 表中选择记录并将它们放入队列中,然后从队列中获取它们,但无法插入新的 mysql 表中。

在这里,当它们落入表格时,我只能拾取新记录。希望这可以帮助你。任何错误请帮助我。

from threading import Thread
import time
import Queue
import csv
import random
import pandas as pd
import pymysql.cursors
from sqlalchemy import create_engine
import logging

queue = Queue.Queue(1000)

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-9s) %(message)s', )

conn = pymysql.connect(conn-details)
cursor = conn.cursor()


class ProducerThread(Thread):
    def run(self):
        global queue
        cursor.execute("SELECT ID FROM multi ORDER BY ID  LIMIT 1")
        min_id = cursor.fetchall()
        min_id1 = list(min_id[0])
        while True:
            cursor.execute("SELECT ID FROM multi ORDER BY ID desc LIMIT 1")
            max_id = cursor.fetchall()
            max_id1 = list(max_id[0])
            sql = "select * from multi where ID between '{}' and '{}'".format(min_id1[0], max_id1[0])
            cursor.execute(sql)
            data = cursor.fetchall()
            min_id1[0] = max_id1[0] + 1
            for row in data:
                num = row
                queue.put(num)  # acquire();wait()
                logging.debug('Putting ' + str(num) + ' : ' + str(queue.qsize()) + ' items in queue')


class ConsumerThread(Thread):
    def run(self):
        global queue
        while True:
            num = queue.get()
            print num
            logging.debug('Getting ' + str(num) + ' : ' + str(queue.qsize()) + ' items in queue')
            **sql1 = """insert into multi_out(ID,clientname) values ('%s','%s')""",num[0],num[1]
            print sql1
            # cursor.execute(sql1, num)
            cursor.execute("""insert into multi_out(ID,clientname) values ('%s','%s')""",(num[0],num[1]))**
            # conn.commit()
            # conn.close()
def main():
    ProducerThread().start()
    num_of_consumers = 20
    for i in range(num_of_consumers):
        ConsumerThread().start()


main()
于 2018-08-27T09:28:11.860 回答
0

如果您有 X 个线程正在运行,定期从作业表中读取,那么 MySQL 将为您执行并发操作。或者,如果您需要更多保证,您可以在阅读下一个可用条目之前自行锁定作业表。这样,您可以 100% 确定单个作业只会被处理一次。

正如@Martin 所说,将所有线程的连接分开。他们可以使用相同的凭据。

简而言之:

  1. 程序一 -> 插入作业
  2. 程序二 -> 在作业表上创建一个写锁,这样其他人就不能从中读取
  3. 程序二 -> 读取下一个可用作业
  4. 程序二 -> 解锁表
  5. 一切照常进行,MySQL 将处理并发
于 2013-07-15T15:00:37.100 回答
0

这是进行某种网络爬虫时的常见任务。我已经实现了一个线程,它抓取一个作业,等待 http 响应,然后将响应写入数据库表。

我的方法遇到的问题是,您需要锁定从中获取作业的表,并将它们标记为正在进行或完成,以便多个线程不会尝试获取相同的任务。

刚刚在python中使用了threading.Thread并覆盖了run方法。

每个线程使用 1 个数据库连接。(python中的一些数据库库不是线程安全的)

于 2013-07-15T14:58:52.810 回答
0

可能发生的是您在两个线程之间共享 MySQL 连接。尝试在每个线程内创建一个新的 MySQL 连接。

对于程序 2,请查看http://www.celeryproject.org/ :)

于 2013-07-15T14:51:48.970 回答