3

我用 Java 编写了一个 servlet 代码,用于从存储在 Google Cloud Storage 中的文件中读取一行。一旦我阅读了每一行,我就会将它传递给 prediction API 。一旦我得到通过文本的预测。我将它附加到原始行并将其存储在 Google 云存储中的其他文件中。

这个源文件是一个 csv 文件,有超过 10,000 条记录。由于我是单独解析它,将其传递给预测 API,然后存储回 Cloud Storage。这样做需要很多时间。由于 App Engine 有 30 个部分的限制,并且任务队列也有限制。任何人都可以建议我一些选择吗?由于重新启动程序不是一个选项,因为我将无法从我停止的位置启动预测。

这是我的代码:

@SuppressWarnings("serial")
public class PredictionWebAppServlet extends HttpServlet {

    private static final String APPLICATION_NAME = "span-test-app";

    static final String MODEL_ID = "span-senti";
    static final String STORAGE_DATA_LOCATION = "/bigdata/training_set/";
    private static HttpTransport httpTransport;
    private static final JsonFactory JSON_FACTORY = JacksonFactory
            .getDefaultInstance();

    public static final String INPUT_BUCKETNAME = "bigdata";
    public static final String INPUT_FILENAME = "abc.csv";

    public static final String OUTPUT_BUCKETNAME = "bigdata";
    public static final String OUTPUT_FILENAME = "def.csv";

    private static Credential authorize() throws Exception {

        Credential cr = new GoogleCredential.Builder()
                .setTransport(httpTransport)
                .setJsonFactory(JSON_FACTORY)
                .setServiceAccountId(
                        "878482284233-aacp8vd5297aqak7v5r0f507qr63mab4@developer.gserviceaccount.com")
                .setServiceAccountScopes(
                        Collections.singleton(PredictionScopes.PREDICTION))
                .setServiceAccountPrivateKeyFromP12File(
                        new File(
                                "28617ba6faac0a51eb2208edba85d2e20e6081b4-privatekey.p12"))
                .build();
        return cr;
    }



    public void doGet(HttpServletRequest req, HttpServletResponse resp)
            throws IOException {
        try {
            httpTransport = GoogleNetHttpTransport.newTrustedTransport();
            Credential credential = authorize();

            Prediction prediction = new Prediction.Builder(httpTransport,
                    JSON_FACTORY, credential).setApplicationName(APPLICATION_NAME)
                    .build();


            GcsService gcsService = GcsServiceFactory.createGcsService();

            GcsFilename filename = new GcsFilename(INPUT_BUCKETNAME, INPUT_FILENAME);
            GcsFilename filename1 = new GcsFilename(OUTPUT_BUCKETNAME,
                    OUTPUT_FILENAME);
            GcsFileOptions options = new GcsFileOptions.Builder()
                    .mimeType("text/html").acl("public-read")
                    .addUserMetadata("myfield1", "my field value").build();


            GcsOutputChannel writeChannel = gcsService.createOrReplace(filename1, options);

            PrintWriter writer = new PrintWriter(Channels.newWriter(writeChannel,
                    "UTF8"));


            GcsInputChannel readChannel = null;
            BufferedReader reader = null;

            readChannel = gcsService.openReadChannel(filename, 0);
            reader = new BufferedReader(Channels.newReader(readChannel, "UTF8"));
            String line;
            String cvsSplitBy = ",";
            String temp_record = "";
            Input input = new Input();
            InputInput inputInput = new InputInput();


            while ((line = reader.readLine()) != null) {

                String[] post = line.split(cvsSplitBy);

                inputInput.setCsvInstance(Collections
                        .<Object> singletonList(post[1]));
                input.setInput(inputInput);

                Output output = prediction.trainedmodels()
                        .predict("878482284233", MODEL_ID, input).execute();
                for (int i = 0; i < 10; i++) {
                    temp_record = temp_record + post[i] + ",";
                }
                temp_record = temp_record + output.getOutputLabel();


                 writer.println(temp_record);

            }

            writer.flush();
            writer.close();

            //resp.getWriter().println(temp_record);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        finally{

        }
    }
}
4

2 回答 2

5

你自己在暗示。

如果您认为您的工作可以在 10 分钟内完成,您可以单独使用任务队列来完成。

如果没有,您将需要结合使用任务队列和后端。您需要将其推送到后端实例中。看看推送队列和后端

更新 - 使用模块而不是后端

不推荐使用后端以支持模块。使用模块的一种方法是:

  1. 将您的应用程序转换为模块结构
  2. 定义一个手动缩放的模块
  3. 处理该模块中的“/_ah/start”网址
  4. 在“/_ah/start”处理程序中执行所有工作

手动扩展实例对它们可以运行多长时间没有限制。如果实例具有手动扩展,您可以在“/_ah/start”请求中“永远”运行。嘿,如果你愿意,你甚至可以启动线程。但这对于这项工作应该不是必需的。运行直到完成。

于 2014-07-14T14:43:53.820 回答
0

这种事情正是MapReduce 框架的用途。

于 2014-07-14T15:04:02.537 回答