1

我下载了一个大的 CSV 远程文件,并希望在所有行进入时将它们推入 MySQL。我csv.reader用来解析远程文件。我将这些行按 1000 批添加到 MySQL 中。

问题是与对等方的连接在 5 分钟后超时,尽管文件可以在不到一分钟的时间内下载,但推送到 MySQL 需要更多时间。

有没有办法让下载作业和推送作业异步工作,以便与对等方的连接不等待 mySQL 约束?

我想避免

  1. 如果不需要,下载内存中的完整文件
  2. 一下载第一行就开始推入mysql
  3. 不得不处理临时文件

基本上,我希望我的 python 脚本执行类似curl file | my_script_that_pushes_values.sh.

这是我所做的说明:

csvReader = csv.reader(distantfile)
valuesBuffer = []
for row in csvReader:
  valuesBuffer.append(getValues(row))
  if len(valuesBuffer) % 1000 = 0:
    pushValuesIntoMySQL(valuesBuffer)
    valuesBuffer = []
pushValuesIntoMySQL(valuesBuffer)
4

1 回答 1

2

我会将整个文件复制到您的服务器然后使用LOAD DATA LOCAL INFILE,因为它支持 csv 输入:

LOAD DATA INFILE 'data.txt' INTO TABLE tbl_name
  FIELDS TERMINATED BY ',' ENCLOSED BY '"'
  LINES TERMINATED BY '\r\n'
  IGNORE 1 LINES;

如果您不喜欢此解决方案,您可以使用mysql_ping()(希望您使用的连接器支持它)自动重新连接。

检查与服务器的连接是否正常。如果连接已断开并且启用了自动重新连接,则会尝试重新连接。如果连接断开并且禁用了自动重新连接,则 mysql_ping() 返回错误。


如果你有问题,你可以下载文件但由于 MySQL 的延迟而超时,你可以在两个线程中运行它并同步它queue

# Prepare queue and end signaling handler
q = queue.Queue()
done = threading.Event()

# Function that fetches items from q and puts them into db after
# certain amount is reached
def store_db():
    items=[]

    # Until we set done
    while not done.is_set():
        try:
            # We may have 500 records and thread be done... prevent deadlock
            items.append(q.get(timeout=5))
            if len(items) > 1000:
                insert_into(items)
                items = []
            q.task_done()
         # If you wait longer then 5 seconds < exception
         except queue.Empty: pass

    if items:
        insert_into(items)

# Fetch all data in a loop
def continous_reading():
    # Fetch row
    q.put(row)

# Start storer thread
t = threading.Thread(target=store_db)
t.daemon = True
t.start()

continous_reading()
q.join() # Wait for all task to be processed
done.set() # Signal store_db that it can terminate
t.join() # to make sure the items buffer is stored into the db
于 2013-04-27T21:40:25.763 回答