我制作了一个具有邮件功能(异步)的 API。当我在本地运行 API 时可以接收电子邮件,但在通过 AWS Lambda 部署时无法接收电子邮件。是否需要额外的配置?
所有进口
# coding=utf-8
import datetime
import os
from functools import wraps
from threading import Thread
from flask import Flask, request
from flask_cors import CORS
from flask_mail import Mail, Message
from flask_oslolog import OsloLog
from flask_restful import Api, Resource
配置类定义
class BaseConfig(): #configuration Base Class
ENV = 'development'
MAIL_SERVER = 'HOST_STRING'
MAIL_PORT = 'PORT_NUMBER_INT'
MAIL_USE_SSL = True
MAIL_USERNAME = "USERNAME"
MAIL_PASSWORD = "PASSWORD"
MAIL_DEFAULT_SENDER = MAIL_USERNAME
MAIL_DEBUG = True
MAIL_SUPPRESS_SEND = False
# Flask
DEBUG = True
SECRET_KEY = os.urandom(24)
log_file = '/var/log/APP_PROJECT/flask_g.log'
class ProdConfig(BaseConfig): # Prod Config Class
ENV = 'production'
DEBUG = False
MAIL_DEBUG = False
初始化应用程序
app = Flask(__name__)
app.config.from_object(BaseConfig)
mail = Mail(app) # Initialising mail
cors = CORS(app)
api = Api(app, catch_all_404s=Flask)
log = OsloLog(app)
异步包装函数定义
def async_task(f):
""" Takes a function and runs it in a thread """
@wraps(f)
def _decorated(*args, **kwargs):
thr = Thread(target=f, args=args, kwargs=kwargs)
thr.start()
return _decorated
def send_email(recipients):
sender = 'user@email.in'
subject = 'TEST MAIL : SUBJECT'
text_body = "TEST MAIL : TEXT_BODY"
html_body = None
app.logger.info("send_email(subject='{subject}', recipients=['{recp}'], text_body='{txt}')".format(sender=sender,
subject=subject,
recp=recipients,
txt=text_body))
msg = Message(subject, sender=sender, recipients=[recipients])
msg.body = text_body
msg.html = html_body
_send_async_email(app, msg)
@async_task
def _send_async_email(flask_app, msg):
""" Sends an send_email asynchronously
Args:
flask_app (flask.Flask): Current flask instance
msg (Message): Message to send
Returns:
None
"""
with flask_app.app_context():
mail.send(msg)
资源类定义
class indexResource(Resource): # Index Resource
def get(self):
app.logger.info('GET : indexResource')
return {"message": 'WORKING FINE', 'ip': request.remote_addr}
class mail_resource(Resource):
def get(self, email): # GET '/mail/mail_to_address@email.com
try:
send_email(email)
return "Mail Sent"
except Exception as e:
app.logger.warning('Error : {}'.format(e.message))
return "Error {}".format(e.message)
注册/添加资源
api.add_resource(indexResource, '/')
api.add_resource(mail_resource, '/mail/<email>')
运行脚本
if __name__ == '__main__':
app.run()
我在 AWS-Lambda 上使用Zappa进行无服务器部署。