2

我想在 Django 中为具有 celery send_task 函数的函数编写单元测试。

我如何在 Django 和模拟 celery 中编写它的 UT?

def push_to_queue(self, data):
        """ function to add task in the backend queue
        Args:
            Accept data dictionary
        returns: Nothing
        """
        try:
            LOG.info(
                "Pushing %s to queue %s " % (data, settings.SCHEDULER_STATUS_QUEUE_NAME)
            )
            # sqs handler
            handle = Celery("scheduler_service", broker="sqs://")
            handle.send_task(
                settings.BACKEND_TASK,
                queue=settings.SCHEDULER_STATUS_QUEUE_NAME,
                kwargs=data,
            )
            LOG.info(
                "Pushed message successfully to %s queue."
                % settings.SCHEDULER_STATUS_QUEUE_NAME
            )
        except Exception as err:
            LOG.error("Exception while pusing meesage to queue %s " % str(err))
4

0 回答 0