2

我编写了这个脚本,它使用 exchangelib python API 登录到我的 Microsoft Exchange 帐户并下载包含特定主题的已发送消息。

我将它分成两个函数,每个函数作为一个单独的进程运行:

功能 A 从我的帐户中获取电子邮件并将与特定主题匹配的电子邮件放入队列中。

功能 B 从队列中获取这些电子邮件并将它们保存到磁盘。

这些函数在单个进程中运行时可以正常工作,但是当我决定在子进程中运行函数 A 而函数 B 在主进程中运行时,函数 A 只是挂在下载电子邮件的行。

我的代码如下。

from exchangelib import Credentials, Account, Mailbox, HTMLBody, Message

import multiprocessing as mp
from multiprocessing import Process, Queue


def authenticate(user_id, passw):
    credentials = Credentials(user_id, passw)
    exchange = Account(account_id, credentials=credentials, autodiscover=True)
    return exchange

def fetch_bounces(exchange, output_q, progress):
    print('fetcher process started')
    while exchange.inbox.total_count != 0:
        exchange.inbox.refresh()
        inbox=exchange.inbox.all()
        for msg in inbox:
            if 'Undeliverable:' in msg.subject:
                output_q.put(msg.display_to)
        inbox.move_to_trash()
    progress.put('done')

def save_bounced(outputfile, output_q):
    bounces = []
    while output_q.empty() is False:
        lead = output_q.get()
        print('bounce: '+lead)
        bounces.append(lead)
    if len(bounces)>0:
        existing = []
        try:
            with open(outputfile, 'r+') as f:
                lines = f.readlines()
                existing = [line.strip().lower() for line in lines]
        except:
            pass
        with open(outputfile, 'a+') as f:
            for line in bounces:
                if line.strip().lower() in existing:
                    continue
                else:
                    f.write(line.strip().lower()+'\n')

if __name__ == '__main__':
    #credentials
    account_id =
    password = 


    #set Queues to store bounces and progress
    bounced = Queue()
    progress = Queue()

    #login to exhcnage
    exchange = authenticate(account_id, password)
    print('logged in successfully')

    #spawn bounce fetching process and start it
    f_process = Process(target=fetch_bounces, args=(exchange, bounced, progress))
    f_process.start()

    #define file path where bounces will be stored
    output_f = 'bounces.txt'

    #while new bounce emails are being fetch, remain in loop and also specify flag to know when all emails have been fetched

    complete = False
    while not complete:
        save_bounced(output_f, bounced)
        try:
            msg = progress.get(block=False)
            if msg == 'done':
                complete =  True
        except:
            pass

    #terminate fetcher process

    f_process.join()

    print('bounces completely removed')
4

1 回答 1

0

当您使用多处理时,所有参数都必须是可序列化的。但是Account不能安全地序列化实例。您必须Account在每个进程中创建实例。

于 2019-01-03T12:29:23.213 回答