我是 python 新手,这是我第一个使用烧瓶的 REST 项目。我正在尝试将 REST 集成到移动应用程序中。我想做的是考虑一个预订应用程序,它可以向 REST api 下订单。预订对象的日期表示预订日期(未来的日期)。我想要的是当我的python资源被调用时,在数据库中添加预订对象并安排firebase推送通知在给定日期发送到手机。一切都像一个魅力,除了我预定的通知。我正在集成 PyFCM。如果我创建一个端点来触发通知,它可以正常工作,但不能安排它。在互联网上研究后,我发现我可以使用来自 flask_apscheduler lib 的 APScheduler。我有一个 app.py 文件,如下所示:
from flask import Flask
from flask_restful import Api
from resources.trip import TripResource, TripsList, SearchTripsByDestinations
from flask_jwt_extended import JWTManager
from flask_migrate import Migrate
from resources.userResource import UserLogin, UserRegister, UserResource, UserConfirm, UserLogout
from resources.booking import Booking, BookingList, TripBookings, BookingPay, DeleteBooking
from resources.seat import SeatList, SeatResource
from resources.registerTokenResource import RegisterToken, ListPaymentMethods
from resources.pushTokenResource import PushTokenResource
from ma import ma
from db import db
from dotenv import load_dotenv
from flask_apscheduler import APScheduler
app = Flask(__name__)
scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()
load_dotenv(".env")
app.config['DEBUG'] = True
app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DATABASE_URL', 'sqlite:///data.db')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['PROPAGATE_EXCEPTIONS'] = True
app.config['JWT_BLACKLIST_ENABLED'] = True
app.config['JWT_BLACKLIST_TOKEN_CHECKS'] = ['access', 'refresh']
api = Api(app)
jwt = JWTManager(app)
migrate = Migrate(app, db)
api.add_resource(TripResource, '/create_trip')
api.add_resource(TripsList, '/trips')
api.add_resource(UserRegister, '/register')
api.add_resource(UserResource, '/users')
api.add_resource(UserLogin, '/login')
api.add_resource(BookingList, '/bookings')
api.add_resource(TripBookings, '/get_trip_bookings')
api.add_resource(Booking, '/add_booking')
api.add_resource(SeatList, '/seats')
api.add_resource(BookingPay, '/book')
api.add_resource(UserConfirm, '/user_confirm/<int:user_id>')
api.add_resource(SeatResource, '/add_seats_for_booking')
api.add_resource(RegisterToken, '/add_payment_token')
api.add_resource(ListPaymentMethods, '/payment_methods')
api.add_resource(SearchTripsByDestinations, '/search_trips')
api.add_resource(DeleteBooking, '/delete_booking')
api.add_resource(UserLogout, '/logout')
api.add_resource(PushTokenResource, '/register_push_token')
if __name__ == '__main__':
from db import db
db.init_app(app)
db.app = app
ma.init_app(app)
if app.config['DEBUG']:
@app.before_first_request
def create_tables():
# with app.app_context():
db.create_all()
app.run(port=5000)
为了添加我的预订,我有一个资源文件,它有一个 post 方法,其中包含以下内容:
class Booking(Resource):
def post(self):
try:
booking_item = booking_schema.load(request.get_json())
except ValidationError as err:
return err.messages, 500
try:
user = UserModel.find_by_id(booking_item.user_id)
if booking_item.transaction_time:
try:
token = PushToken.search_token_for_user(booking_item.user_id)
if token:
schedule_notif(booking_item.transaction_time, token, str(booking_item.transaction_time))
else:
return {'success': False, 'errors': ['could not send notification']}
except Exception as e:
return {'success': False, 'errors': ['An error has occured scheduleing notification {}'.format(e)]}
except Exception as err:
return {'success': False, 'errors': ['An error has occured inserting item {}'.format(err)]}
return {'payload': booking_schema.dump(booking_item)}, 200
def schedule_notif(timestamp, token, n_id):
date = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
app.apscheduler.add_job(id=n_id, func=send_push, trigger='date', run_date=date, args=(token,
"Your have 30 "
"minutes "
"untill your "
"booking "
,
"Your have 30 "
"minutes untill "
"your booking"
, n_id))
def send_push(token, title, message, n_id):
push_service = FCMNotification(api_key=api_key)
try:
registration_id = token.token
if registration_id:
message_title = title
message_body = message
data = {
"title": title,
"body": message,
"type": "booking_reminder"
}
push_service.notify_single_device(registration_id=registration_id, message_title=message_title,
message_body=message_body, data_message=data, timeout=30)
except Exception as err:
return {'payload': {}, 'success': False, 'errors': ["{}".format(err)]}, 400
我不知道是什么问题,也许该服务器停止了我的调度程序。你能帮我理解我做错了什么吗?
谢谢!