0

我有两台机器:机器-A 和机器-B。我正在将一个 .csv 文件(例如:大小为 2GB 的 test.csv)从机器 A 传输(SFTP)到机器 B。我有一个 python 脚本(如下所示),它在机器 B 上 24x7 运行。该脚本监视当前文件夹,如果它看到一个 .csv 文件,它会将数据发送到另一台装有 MySQL 的机器上(如果有兴趣,请参见下面的代码)。但是,我只希望这个 python 脚本只有在 test.csv 完全从机器 A 转移到机器 B 之后才能完成它的工作。我在 stackoverflow 上找到了一个帖子,但不能完全使用它。任何想法?

Python脚本:

import MySQLdb
import os
import sys
import string
import traceback
import time  
from watchdog.observers import Observer  
from watchdog.events import PatternMatchingEventHandler 

if not os.path.exists('Archive'):
    os.mkdir("Archive")
if not os.path.exists('Failed'):
    os.mkdir("Failed")

class MyHandler(PatternMatchingEventHandler):
    patterns = ["*.csv"]

    def process(self, event):
        """
        event.event_type 
            'modified' | 'created' | 'moved' | 'deleted'
        event.is_directory
            True | False
        event.src_path
            path/to/observed/file
        """

        # Open database connection
        db = MySQLdb.connect (host="1.2.3.4",port=3333,user="userAdmin",\
                              passwd="passAdmin",db="tableAdmin",local_infile=1)
        cursor=db.cursor()

        #Query under testing
        sql = open('test.sql','r').read()

        # the file will be processed there
        print 'Processing .csv file '+event.src_path+' as it is '+event.event_type
        try:
            cursor.execute(sql.format(event.src_path))
            db.commit()
            print 'Executed and Commited the QUERY on .csv file '+event.src_path
            os.rename(event.src_path,(event.src_path).split('/')[0]+'/Archive/'+(event.src_path).split('/')[1]+'.archive')
        except:
            # Rollback in case there is any error
            os.rename(event.src_path,(event.src_path).split('/')[0]+'/Failed/'+(event.src_path).split('/')[1]+'.fail-db')
            print 'ERROR-See Traceback message below'
            traceback.print_exc()
            db.rollback()   

        db.close()
        print "Disconnected from the MySQL server"
        print '-----------#*#*#*-----------#*#*#*-----------'

    def on_created(self, event):
        self.process(event)

if __name__ == '__main__':
    observer = Observer()
    observer.schedule(MyHandler(), path='.')
    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()

    observer.join()
4

2 回答 2

1

您可以在传输 CSV 后传输另一个(零字节)文件,并注意它。您的脚本可以在完成后将其删除。

目前,您的脚本实际上不可能知道传输何时完成,因为它不知道文件的大小。来自机器 A 的某种其他触发器将是必要的。

您也可以在机器 A 上运行此代码,连接到机器 B 上的 MySQL,从而无需 SFTP 传输。(如果 MySQL 通信在它们之间是可能的且安全的。)

于 2016-01-04T20:33:23.667 回答
0

您可以覆盖 MyHandler 类的一些方法,这可以解决您的问题。

1)on_created()
当在给定目录中创建(完成传输)新文件时触发它,这反过来又调用 process 函数来处理文件。

这个方法对我有用,我正在使用 bt-sync。

2)on_moved()
当有新文件被rsynced时,它会被触发,这反过来会调用 process 函数来处理文件。

您可以修改代码以覆盖on_created方法:-

class MyHandler(PatternMatchingEventHandler):
    patterns = ["*.csv"]
    def on_created(self, event):
        self.process(event)
    def on_modified(self, event):
        print "file modified " + event.src_path
        self.process(event)
    def on_moved(self, event):
        print "file moved" + event.src_path
        self.process(event)
    def on_deleted(self, event):
        print "file deleted" + event.src_path
        self.process(event)
    def process(self, event):
        ....

更多详情http://pythonhosted.org/watchdog/api.html

于 2016-02-01T11:36:29.533 回答