3

我试图在单个实体中保留大约 28,000 个“行”,例如 EMPLOYEE

基本上,我的目标是通过使用超过 30 秒的 PUT 来避免被终止/超时 - 如果我只是通过调用发送到 servlet 的 doPost() 请求来执行 28,000 个 PUT,则可能会发生这种情况。

所以我正在考虑使用 Google App Engine 文档中描述的任务。

本质上,我想在 war 目录中上传一个包含 28,000 名“员工”的 csv 文件。然后创建一个任务,将这 28,000 个员工行异步 PUT 到 EMPLOYEE 实体。

  • Q1:这是一个可行的解决方案还是有更好的方法?同样,目标是执行 PUT 以避免由于 30 秒的限制而被终止。

  • Q2:我应该使用什么 queue.xml 配置来确保我可以尽快执行这些 PUT?

  • Q3:现在,我已经尝试过了,类似于博客条目: http: //gaejexperiments.wordpress.com/2009/11/24/episode-10-using-the-task-queue-service/但我得到了大约 23 秒后出现以下错误:

    SEVERE: Job default.task1 threw an unhandled Exception: 
    com.google.apphosting.api.ApiProxy$ApplicationException: ApplicationError: 5: http method POST against URL http://127.0.0.1:8888/dotaskservlet timed out.
        at com.google.appengine.api.urlfetch.dev.LocalURLFetchService.fetch(LocalURLFetchService.java:236)
        at com.google.appengine.api.taskqueue.dev.LocalTaskQueue$UrlFetchServiceLocalTaskQueueCallback.execute(LocalTaskQueue.java:471)
        at com.google.appengine.api.taskqueue.dev.UrlFetchJob.execute(UrlFetchJob.java:77)
        at org.quartz.core.JobRunShell.run(JobRunShell.java:203)
        at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:520)
    16/02/2011 12:12:55 PM org.quartz.core.ErrorLogger schedulerError
    SEVERE: Job (default.task1 threw an exception.
    org.quartz.SchedulerException: Job threw an unhandled exception. [See nested exception: com.google.apphosting.api.ApiProxy$ApplicationException: ApplicationError: 5: http method POST against URL http://127.0.0.1:8888/dotaskservlet timed out.]
        at org.quartz.core.JobRunShell.run(JobRunShell.java:214)
        at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:520)
    * Nested Exception (Underlying Cause) ---------------
    com.google.apphosting.api.ApiProxy$ApplicationException: ApplicationError: 5: http method POST against URL http://127.0.0.1:8888/dotaskservlet timed out.
        at com.google.appengine.api.urlfetch.dev.LocalURLFetchService.fetch(LocalURLFetchService.java:236)
        at com.google.appengine.api.taskqueue.dev.LocalTaskQueue$UrlFetchServiceLocalTaskQueueCallback.execute(LocalTaskQueue.java:471)
        at com.google.appengine.api.taskqueue.dev.UrlFetchJob.execute(UrlFetchJob.java:77)
        at org.quartz.core.JobRunShell.run(JobRunShell.java:203)
        at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:520)
    
  • Q4:我还检查了http://localhost:8888/_ah/admin上的数据存储查看器,它似乎只在该实体中创建了 1000 个结果。1000是极限吗?

  • Q5:如何摆脱上述错误?

  • Q6: 可以确认一个任务的最大允许时间是 10 分钟吗?还是30秒?我确实遇到过这个:http ://code.google.com/appengine/docs/java/taskqueue/overview.html#Task_Execution

4

3 回答 3

2

这是使用 mapreduce 解析 CSV 文件的示例/教程,似乎与您的需求相似:

http://ikaisays.com/2010/08/11/using-the-app-engine-mapper-for-bulk-data-import/

于 2011-02-17T00:02:08.823 回答
0

如果您的目标只是自己上传一堆数据,而不是让您的用户这样做,我认为更简单的工具是批量上传器。你可以在本地机器上运行一个 python 程序,它会为你处理请求限制和故障恢复。

http://ikaisays.com/2010/06/10/using-the-bulkloader-with-java-app-engine/

于 2011-02-16T14:08:20.437 回答
0

我会通过批量保存来做到这一点DeferredTask,大致是这样的:

List<Employee> employees=...
EmployeeWriter qr = new EmployeeWriter (employees);
TaskHandle task = QueueFactory.getDefaultQueue().add(withPayload(qr));

在哪里

public class EmployeeWriter implements DeferredTask {
   public EmployeeWriter () {    }
   public EmployeeWriter (List<Employee> employees) { 
          this.employees=new LinkedList(employees);  
   }

    private LinkedList<Employee> employees;

    @Override
    public void run() {
      Stopwatch sw = Stopwatch.createStarted();
      try {
        do {
           List employeesTosave=Pull100EmployeesFromLinkedList(employees)
           ofy().save(employeesTosave).now();

        } while (sw.elapsed(TimeUnit.MINUTES) < 9);
     finally {
        sw.stop();
        if (!employees.isEmpty()) {
            QueueFactory.getDefaultQueue().add(withPayload(this));
        }
    }

}
于 2013-10-31T20:12:57.107 回答