15

我需要创建一个可以接收和存储 SMTP 消息的类,即 E-Mails。为此,我asyncore根据此处发布的示例使用。但是,asyncore.loop()是阻塞的,所以我不能在代码中做任何其他事情。

所以我想到了使用线程。这是一个示例代码,显示了我的想法:

class MyServer(smtpd.SMTPServer):
    # derive from the python server class

    def process_message(..):
        # overwrite a smtpd.SMTPServer method to be able to handle the received messages
        ...
        self.list_emails.append(this_email)

    def get_number_received_emails(self):
        """Return the current number of stored emails"""
        return len(self.list_emails)


    def start_receiving(self):
        """Start the actual server to listen on port 25"""

        self.thread =   threading.Thread(target=asyncore.loop)
        self.thread.start()     

    def stop(self):
        """Stop listening now to port 25"""
        # close the SMTPserver from itself
        self.close()
        self.thread.join()

我希望你能得到这张照片。该类MyServer应该能够以非阻塞方式开始和停止侦听端口 25,能够在侦听(或不侦听)时查询消息。该start方法启动asyncore.loop()侦听器,当接收到电子邮件时,将其附加到内部列表。类似地,该stop方法应该能够停止该服务器,如此处所建议

尽管这段代码没有像我期望的那样工作(asyncore 似乎永远运行,即使我调用了上面的stop方法。error我 raise 被捕获在 内stop,但不在包含 的target函数内asyncore.loop()),我不确定我的方法是否问题是有道理的。对于修复上述代码或提出更可靠的实现(使用第三方软件)的任何建议,我们都表示赞赏。

4

5 回答 5

17

提供的解决方案可能不是最复杂的解决方案,但它工作合理并且已经过测试。

首先,正如用户Wessie在之前的评论中指出的那样asyncore.loop(),它会一直阻塞直到所有asyncore频道都关闭。参考前面提到的smtp 示例,结果是继承自(如smtpd 文档中所述),它回答了要关闭哪个通道的问题。smtpd.SMTPServerasyncore.dispatcher

因此,可以使用以下更新的示例代码来回答原始问题:

class CustomSMTPServer(smtpd.SMTPServer):
    # store the emails in any form inside the custom SMTP server
    emails = []
    # overwrite the method that is used to process the received 
    # emails, putting them into self.emails for example
    def process_message(self, peer, mailfrom, rcpttos, data):
        # email processing


class MyReceiver(object):
    def start(self):
        """Start the listening service"""
        # here I create an instance of the SMTP server, derived from  asyncore.dispatcher
        self.smtp = CustomSMTPServer(('0.0.0.0', 25), None)
        # and here I also start the asyncore loop, listening for SMTP connection, within a thread
        # timeout parameter is important, otherwise code will block 30 seconds after the smtp channel has been closed
        self.thread =  threading.Thread(target=asyncore.loop,kwargs = {'timeout':1} )
        self.thread.start()     

    def stop(self):
        """Stop listening now to port 25"""
        # close the SMTPserver to ensure no channels connect to asyncore
        self.smtp.close()
        # now it is save to wait for the thread to finish, i.e. for asyncore.loop() to exit
        self.thread.join()

    # now it finally it is possible to use an instance of this class to check for emails or whatever in a non-blocking way
    def count(self):
        """Return the number of emails received"""
        return len(self.smtp.emails)        
    def get(self):
        """Return all emails received so far"""
        return self.smtp.emails
    ....

所以最后,我有start一个stop方法可以在非阻塞环境中启动和停止监听端口 25。

于 2013-01-24T08:24:54.940 回答
4

来自另一个问题asyncore.loop 在没有更多连接时不会终止

我认为你对线程的思考有点过头了。asyncore.loop使用来自另一个问题的代码,您可以通过以下代码片段启动一个运行的新线程:

import threading

loop_thread = threading.Thread(target=asyncore.loop, name="Asyncore Loop")
# If you want to make the thread a daemon
# loop_thread.daemon = True
loop_thread.start()

这将在一个新线程中运行它,并将继续运行,直到所有asyncore通道都关闭。

于 2013-01-23T17:40:49.457 回答
3

您应该考虑改用 Twisted。 http://twistedmatrix.com/trac/browser/trunk/doc/mail/examples/emailserver.tac演示了如何使用可自定义的交付挂钩设置 SMTP 服务器。

于 2013-01-23T16:32:11.920 回答
0

亚历克斯的答案是最好的,但对于我的用例来说是不完整的。我想测试 SMTP 作为单元测试的一部分,这意味着在我的测试对象中构建假 SMTP 服务器,并且服务器不会终止 asyncio 线程,所以我必须添加一行将其设置为守护线程以允许其余的单元测试在不阻塞等待该异步线程加入的情况下完成。我还添加了所有电子邮件数据的完整记录,以便我可以断言通过 SMTP 发送的任何内容。

这是我的假 SMTP 类:

class TestingSMTP(smtpd.SMTPServer):
    def __init__(self, *args, **kwargs):
        super(TestingSMTP, self).__init__(*args, **kwargs)
        self.emails = []

    def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
        msg = {'peer': peer,
               'mailfrom': mailfrom,
               'rcpttos': rcpttos,
               'data': data}
        msg.update(kwargs)
        self.emails.append(msg)


class TestingSMTP_Server(object):

    def __init__(self):
        self.smtp = TestingSMTP(('0.0.0.0', 25), None)
        self.thread = threading.Thread()

    def start(self):
        self.thread = threading.Thread(target=asyncore.loop, kwargs={'timeout': 1})
        self.thread.daemon = True
        self.thread.start()

    def stop(self):
        self.smtp.close()
        self.thread.join()

    def count(self):
        return len(self.smtp.emails)

    def get(self):
        return self.smtp.emails

下面是 unittest 类调用它的方式:

smtp_server = TestingSMTP_Server()
smtp_server.start()

# send some emails

assertTrue(smtp_server.count() == 1) # or however many you intended to send
assertEqual(self.smtp_server.get()[0]['mailfrom'], 'first@fromaddress.com')

# stop it when done testing
smtp_server.stop()
于 2019-02-25T03:18:48.943 回答
0

万一其他人需要更充实一点,这就是我最终使用的。这将 smtpd 用于电子邮件服务器,将 smtpblib 用于电子邮件客户端,使用 Flask 作为 http 服务器 [ gist ]:

应用程序.py

from flask import Flask, render_template
from smtp_client import send_email
from smtp_server import SMTPServer

app = Flask(__name__)

@app.route('/send_email')
def email():
  server = SMTPServer()
  server.start()
  try:
    send_email()
  finally:
    server.stop()
  return 'OK'

@app.route('/')
def index():
  return 'Woohoo'

if __name__ == '__main__':
  app.run(debug=True, host='0.0.0.0')

smtp_server.py

# smtp_server.py
import smtpd
import asyncore
import threading

class CustomSMTPServer(smtpd.SMTPServer):
  def process_message(self, peer, mailfrom, rcpttos, data):
    print('Receiving message from:', peer)
    print('Message addressed from:', mailfrom)
    print('Message addressed to:', rcpttos)
    print('Message length:', len(data))
    return

class SMTPServer():
  def __init__(self):
    self.port = 1025

  def start(self):
    '''Start listening on self.port'''
    # create an instance of the SMTP server, derived from  asyncore.dispatcher
    self.smtp = CustomSMTPServer(('0.0.0.0', self.port), None)
    # start the asyncore loop, listening for SMTP connection, within a thread
    # timeout parameter is important, otherwise code will block 30 seconds
    # after the smtp channel has been closed
    kwargs = {'timeout':1, 'use_poll': True}
    self.thread = threading.Thread(target=asyncore.loop, kwargs=kwargs)
    self.thread.start()

  def stop(self):
    '''Stop listening to self.port'''
    # close the SMTPserver to ensure no channels connect to asyncore
    self.smtp.close()
    # now it is safe to wait for asyncore.loop() to exit
    self.thread.join()

  # check for emails in a non-blocking way
  def get(self):
    '''Return all emails received so far'''
    return self.smtp.emails

if __name__ == '__main__':
  server = CustomSMTPServer(('0.0.0.0', 1025), None)
  asyncore.loop()

smtp_client.py

import smtplib
import email.utils
from email.mime.text import MIMEText

def send_email():
  sender='author@example.com'
  recipient='6142546977@tmomail.net'

  msg = MIMEText('This is the body of the message.')
  msg['To'] = email.utils.formataddr(('Recipient', recipient))
  msg['From'] = email.utils.formataddr(('Author', 'author@example.com'))
  msg['Subject'] = 'Simple test message'

  client = smtplib.SMTP('127.0.0.1', 1025)
  client.set_debuglevel(True) # show communication with the server
  try:
    client.sendmail('author@example.com', [recipient], msg.as_string())
  finally:
    client.quit()

然后启动服务器python app.py并在另一个请求中模拟一个对/send_emailwith的请求curl localhost:5000/send_email。请注意,要实际发送电子邮件(或短信),您需要跳过此处详述的其他环节:https ://blog.codinghorror.com/so-youd-like-to-send-some-email-through-code /

于 2019-09-11T12:21:32.543 回答