2

我需要在 Django 应用程序中运行一些异步任务,我开始研究 Google Cloud Tasks。我想我已经遵循了所有的指示——以及我能想到的所有可能的变化,但到目前为止没有成功。

问题是所有创建的任务都进入队列,但无法执行。控制台和日志仅报告 http 代码 301(永久重定向)。为简单起见,我将相同的代码部署到 App Engine(标准)的两个服务中,并将任务请求仅路由到其中一个。

看起来代码本身运行良好。当我转到“ https://[proj].appspot.com/api/v1/tasks ”时,例程执行得很好,并且根据 DevTools/Network 没有重定向。当 Cloud Tasks 尝试调用“/api/v1/tasks”时,每次都会失败。

如果有人可以查看下面的代码并指出可能导致此失败的原因,我将不胜感激。

谢谢你。

#--------------------------------
# [proj]/.../urls.py
#--------------------------------
from [proj].api import tasks

urlpatterns += [
    # tasks api
    path('api/v1/tasks', tasks, name='tasks'),
]
#--------------------------------
#   [proj]/api.py:
#--------------------------------
from django.views.decorators.csrf import csrf_exempt

@csrf_exempt
def tasks(request):
    print('Start api')
    payload = request.body.decode("utf-8")
    print (payload)
    print('End api')
    return HttpResponse('OK')

#--------------------------------
# [proj]/views/manut.py
#--------------------------------
from django.views.generic import View
from django.shortcuts import redirect
from [proj].tasks import TasksCreate

class ManutView(View):
    template_name = '[proj]/manut.html'

    def post(self, request, *args, **kwargs):
        relative_url = '/api/v1/tasks'
        testa_task = TasksCreate()
        resp = testa_task.send_task(
            url=relative_url,
            schedule_time=5,
            payload={'task_type': 1, 'id': 21}
        )
        print(resp)
        return redirect(request.META['HTTP_REFERER'])

#--------------------------------
# [proj]/tasks/tasks.py:
#--------------------------------
from django.conf import settings
from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2
from typing import Dict, Optional, Union
import json
import time

class TasksCreate:

    def send_task(self,
        url: str,
        payload: Optional[Union[str, Dict]] = None,
        schedule_time: Optional[int] = None,    # in seconds
        name: Optional[str] = None,
        ) -> None:

        client = tasks_v2.CloudTasksClient()
        parent = client.queue_path(
            settings.GCP_PROJECT,
            settings.GCP_LOCATION,
            settings.GCP_QUEUE,
        )

        # App Engine task:
        task = {
            'app_engine_http_request': {  # Specify the type of request.
                'http_method': 'POST',
                'relative_uri': url,
                'app_engine_routing': {'service': 'tasks'}
            }
        }

        if name:
            task['name'] = name
        if isinstance(payload, dict):
            payload = json.dumps(payload)
        if payload is not None:
            converted_payload = payload.encode()
            # task['http_request']['body'] = converted_payload
            task['app_engine_http_request']['body'] = converted_payload
        if schedule_time is not None:
            now = time.time() + schedule_time
            seconds = int(now)
            nanos = int((now - seconds) * 10 ** 9)
            # Create Timestamp protobuf.
            timestamp = timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos)
            # Add the timestamp to the tasks.
            task['schedule_time'] = timestamp

        resp = client.create_task(parent, task)

        return resp

# --------------------------------
# [proj]/dispatch.yaml:
# --------------------------------
dispatch:
  - url: "*/api/v1/tasks"
    service: tasks

  - url: "*/api/v1/tasks/"
    service: tasks

  - url: "*appspot.com/*"
    service: default

#--------------------------------
# [proj]/app.yaml & tasks.yaml:
#--------------------------------
runtime: python37

instance_class: F1

automatic_scaling:
  max_instances: 2

service: default

#handlers:
#- url: .*
#  secure: always
#  redirect_http_response_code: 301
#  script: auto

entrypoint: gunicorn -b :$PORT --chdir src server.wsgi

env_variables:
...

更新:

以下是执行的日志:

{
 insertId: "1lfs38fa9"  
 jsonPayload: {
  @type: "type.googleapis.com/google.cloud.tasks.logging.v1.TaskActivityLog"   
  attemptResponseLog: {
   attemptDuration: "0.008005s"    
   dispatchCount: "5"    
   maxAttempts: 0    
   responseCount: "5"    
   retryTime: "2020-03-09T21:50:33.557783Z"    
   scheduleTime: "2020-03-09T21:50:23.548409Z"    
   status: "UNAVAILABLE"    
   targetAddress: "POST /api/v1/tasks"    
   targetType: "APP_ENGINE_HTTP"    
  }
  task: "projects/[proj]/locations/us-central1/queues/tectaq/tasks/09687434589619534431"   
 }
 logName: "projects/[proj]/logs/cloudtasks.googleapis.com%2Ftask_operations_log"  
 receiveTimestamp: "2020-03-09T21:50:24.375681687Z"  
 resource: {
  labels: {
   project_id: "[proj]"    
   queue_id: "tectaq"    
   target_type: "APP_ENGINE_HTTP"    
  }
  type: "cloud_tasks_queue"   
 }
 severity: "ERROR"  
 timestamp: "2020-03-09T21:50:23.557842532Z"  
}
4

5 回答 5

1

问题是 App Engine 任务处理程序不遵循重定向,因此您必须找出请求被重定向的原因并对 App Engine 请求进行异常处理。在我的情况下,我将 http 重定向到 https 并且必须像这样做出异常:(Node Express)

app.use((req, res, next) => {

   const protocol = req.headers['x-forwarded-proto']
   const userAgent = req.headers['user-agent']

   if (userAgent && userAgent.includes('AppEngine-Google')) {
      console.log('USER AGENT IS GAE, SKIPPING REDIRECT TO HTTPS.')
      return next()
   } else if (protocol === 'http') {
      res.redirect(301, `https://${req.headers.host}${req.url}`)
   } else {
      next()
   }

})
于 2020-08-18T14:07:36.880 回答
1

最后我可以让 Cloud Tasks 工作,但只能使用 http_request 类型(使用绝对 url)。当任务被定义为 app_engine_http_request(相对 url)时,我无法让它们运行。

我已经尝试过使用 POST 的 http_request 类型,但那是在我免除 api 函数之前检查 csrf 令牌之前,这导致了一个错误Forbidden (Referer checking failed - no Referer.): /api/v1/tasks,我无法连接到 csrf 遗漏。

如果将来有人偶然发现这个问题,并找到一种方法让 app_engine_http_request 使用 Django 在 Cloud Tasks 上工作,我仍然非常想知道解决方案。

于 2020-03-13T10:08:03.213 回答
0

如果将来有人偶然发现这个问题,并找到一种方法让 app_engine_http_request 使用 Django 在 Cloud Tasks 上工作,我仍然非常想知道解决方案。

@JCampos 我设法让它在我的 Django 应用程序上运行(我还使用了 DRF,但我认为它不会造成很大的不同)。

from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2
import datetime

class CloudTasksMixin:
@property
def _cloud_task_client(self):
        return tasks_v2.CloudTasksClient()

def send_to_cloud_tasks(self, url, http_method='POST', payload=None,in_seconds=None, name=None):
    """ Send task to be executed """
    parent = self._cloud_task_client.queue_path(settings.TASKS['PROJECT_NAME'], settings.TASKS['QUEUE_REGION'], queue=settings.TASKS['QUEUE_NAME'])

    task = {
        'app_engine_http_request': {
            'http_method': http_method,
            'relative_uri': url
        }
    }

    ...

然后我使用这样的视图:

class CloudTaskView(views.APIView):
    authentication_classes = []

    def post(self, request, *args, **kwargs):
        # Do your stuff
        return Response()

最后,我在 urls.py(来自 DRF)中实现了这个 urlcsrf_exempt(CloudTaskView.as_view())

起初我有 403 错误,但感谢你和你对 csrf_exempt 的评论,它现在可以工作了。

于 2020-05-25T16:46:56.157 回答
0

问题是所有创建的任务都进入队列,但无法执行。控制台和日志仅报告 http 代码 301(永久重定向)。

也许您的任务端点的请求处理程序需要一个斜杠。

尝试改变这个:

class ManutView(View):
    template_name = '[proj]/manut.html'

    def post(self, request, *args, **kwargs):
        relative_url = '/api/v1/tasks'
        ...

对此:

class ManutView(View):
    template_name = '[proj]/manut.html'

    def post(self, request, *args, **kwargs):
        relative_url = '/api/v1/tasks/'
        ...

也只需尝试自己点击任务网址,看看是否可以从中运行任务curl

于 2020-03-10T23:41:56.913 回答
0

似乎 Cloud Tasks 使用 HTTP url 调用 App Engine(没关系,因为它们可能在同一个网络中),但如果您使用的是 HTTPs,Django 应该重定向(http -> https)收到的任何请求,包括您的处理程序端点。

为了解决这个问题,你应该告诉 Django 不要重定向你的处理程序。
您可以使用settings.SECURE_REDIRECT_EXEMPT

例如:

SECURE_REDIRECT_EXEMPT = [r"^api/v1/tasks/$"]
于 2021-05-21T00:23:49.900 回答