0

我制作了一个具有邮件功能(异步)的 API。当我在本地运行 API 时可以接收电子邮件,但在通过 AWS Lambda 部署时无法接收电子邮件。是否需要额外的配置?

所有进口

# coding=utf-8

import datetime 
import os
from functools import wraps
from threading import Thread


from flask import Flask, request
from flask_cors import CORS
from flask_mail import Mail, Message
from flask_oslolog import OsloLog
from flask_restful import Api, Resource

配置类定义

class BaseConfig(): #configuration Base Class
    ENV = 'development'
    MAIL_SERVER = 'HOST_STRING'
    MAIL_PORT = 'PORT_NUMBER_INT'
    MAIL_USE_SSL = True
    MAIL_USERNAME = "USERNAME"
    MAIL_PASSWORD = "PASSWORD"
    MAIL_DEFAULT_SENDER = MAIL_USERNAME
    MAIL_DEBUG = True
    MAIL_SUPPRESS_SEND = False

    # Flask
    DEBUG = True
    SECRET_KEY = os.urandom(24)
    log_file = '/var/log/APP_PROJECT/flask_g.log'


class ProdConfig(BaseConfig): # Prod Config Class
    ENV = 'production'
    DEBUG = False
    MAIL_DEBUG = False

初始化应用程序

app = Flask(__name__)
app.config.from_object(BaseConfig)
mail = Mail(app) # Initialising mail
cors = CORS(app)
api = Api(app, catch_all_404s=Flask)
log = OsloLog(app)

异步包装函数定义

def async_task(f):
    """ Takes a function and runs it in a thread """

    @wraps(f)
    def _decorated(*args, **kwargs):
        thr = Thread(target=f, args=args, kwargs=kwargs)
        thr.start()

    return _decorated


def send_email(recipients):
    sender = 'user@email.in'
    subject = 'TEST MAIL : SUBJECT'
    text_body = "TEST MAIL : TEXT_BODY"
    html_body = None
    app.logger.info("send_email(subject='{subject}', recipients=['{recp}'], text_body='{txt}')".format(sender=sender,
                                                                                                       subject=subject,
                                                                                                       recp=recipients,
                                                                                                       txt=text_body))
    msg = Message(subject, sender=sender, recipients=[recipients])
    msg.body = text_body
    msg.html = html_body

    _send_async_email(app, msg)


@async_task
def _send_async_email(flask_app, msg):
    """ Sends an send_email asynchronously
    Args:
        flask_app (flask.Flask): Current flask instance
        msg (Message): Message to send
    Returns:
        None
    """
    with flask_app.app_context():
        mail.send(msg)

资源类定义

class indexResource(Resource): # Index Resource
    def get(self):
        app.logger.info('GET : indexResource')
        return {"message": 'WORKING FINE', 'ip': request.remote_addr}


class mail_resource(Resource):
    def get(self, email): # GET '/mail/mail_to_address@email.com
        try:
            send_email(email)
            return "Mail Sent"
        except Exception as e:
            app.logger.warning('Error : {}'.format(e.message))
            return "Error {}".format(e.message)    

注册/添加资源

api.add_resource(indexResource, '/')
api.add_resource(mail_resource, '/mail/<email>')

运行脚本

if __name__ == '__main__':
    app.run()

我在 AWS-Lambda 上使用Zappa进行无服务器部署。

4

0 回答 0