我想在 Django 中为具有 celery send_task 函数的函数编写单元测试。
我如何在 Django 和模拟 celery 中编写它的 UT?
def push_to_queue(self, data):
""" function to add task in the backend queue
Args:
Accept data dictionary
returns: Nothing
"""
try:
LOG.info(
"Pushing %s to queue %s " % (data, settings.SCHEDULER_STATUS_QUEUE_NAME)
)
# sqs handler
handle = Celery("scheduler_service", broker="sqs://")
handle.send_task(
settings.BACKEND_TASK,
queue=settings.SCHEDULER_STATUS_QUEUE_NAME,
kwargs=data,
)
LOG.info(
"Pushed message successfully to %s queue."
% settings.SCHEDULER_STATUS_QUEUE_NAME
)
except Exception as err:
LOG.error("Exception while pusing meesage to queue %s " % str(err))