I want to use the batchWriteItem method of the AWS SDK to put many items into a table. I retrieve the items from Kinesis, which has many shards. I use this method for a single item:

public static void addSingleRecord(Item thingRecord) {


    // Add an item
    try
    {

        DynamoDB dynamo = new DynamoDB(dynamoDB); 
        Table table = dynamo.getTable(dataTable);
        table.putItem(thingRecord);

    } catch (AmazonServiceException ase) {
        System.out.println("addThingsData request  "
                + "to AWS was rejected with an error response for some reason.");
        System.out.println("Error Message:    " + ase.getMessage());
        System.out.println("HTTP Status Code: " + ase.getStatusCode());
        System.out.println("AWS Error Code:   " + ase.getErrorCode());
        System.out.println("Error Type:       " + ase.getErrorType());
        System.out.println("Request ID:       " + ase.getRequestId());
    } catch (AmazonClientException ace) {
        System.out.println("addThingsData - Caught an AmazonClientException, which means the client encountered "
                + "a serious internal problem while trying to communicate with AWS, "
                + "such as not being able to access the network.");
        System.out.println("Error Message: " + ace.getMessage());
    }
}

public static void addThings(String thingDatum) {
    Item itemJ2;
    itemJ2 = Item.fromJSON(thingDatum);
    addSingleRecord(itemJ2);

}

The item comes from:

private void processSingleRecord(Record record) {
    // TODO Add your own record processing logic here


    String data = null;
    try {


        // For this app, we interpret the payload as UTF-8 chars.
        data = decoder.decode(record.getData()).toString();
        System.out.println("**processSingleRecord - data " + data);
        AmazonDynamoDBSample.addThings(data);

    } catch (NumberFormatException e) {
        LOG.info("Record does not match sample record format. Ignoring record with data; " + data);
    } catch (CharacterCodingException e) {
        LOG.error("Malformed data: " + data, e);
    }
}

Now, if I want to put many records, I will use:

public static void writeMultipleItemsBatchWrite(Item thingRecord) {
    try {
        dataTableWriteItems.addItemToPut(thingRecord);
        System.out.println("Making the request.");
        BatchWriteItemOutcome outcome = dynamo.batchWriteItem(dataTableWriteItems);
        do {
            // Check for unprocessed keys which could happen if you exceed provisioned throughput
            Map<String, List<WriteRequest>> unprocessedItems = outcome.getUnprocessedItems();

            if (outcome.getUnprocessedItems().size() == 0) {
                System.out.println("No unprocessed items found");
            } else {
                System.out.println("Retrieving the unprocessed items");
                outcome = dynamo.batchWriteItemUnprocessed(unprocessedItems);
            }
        } while (outcome.getUnprocessedItems().size() > 0);
    } catch (Exception e) {
        System.err.println("Failed to retrieve items: ");
        e.printStackTrace(System.err);
    }
}

But how can I send the last group? I only send a batch once I have 25 items, and the final group has fewer than that.

1 Answer

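You can write items to your DynamoDB table one at a time, using PutItem or UpdateItem through the Document SDK in a Lambda function attached to your Kinesis stream. That way, you can react to stream records as they appear in the stream, without worrying about whether there are more records still to process. Behind the scenes, BatchWriteItem consumes the same number of write capacity units as the corresponding PutItem calls. A BatchWriteItem call is as latent as the slowest PUT in the batch. So with BatchWriteItem you may see higher average latency than with parallel PutItem/UpdateItem calls.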
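If batching is still preferred, one common way to handle the final, smaller group is to buffer items and flush explicitly once the current set of records has been processed. The sketch below is only an illustration under assumptions: `BatchBuffer`, its `sink` callback, and the choice of where to call `flush()` are hypothetical names and are not part of the AWS SDK. The limit of 25 matches the maximum number of put or delete requests BatchWriteItem accepts in one call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not an AWS SDK class): buffers items and hands them
 * to a sink in groups of at most maxBatchSize.
 */
class BatchBuffer<T> {
    private final int maxBatchSize;
    private final Consumer<List<T>> sink;
    private final List<T> pending = new ArrayList<>();

    BatchBuffer(int maxBatchSize, Consumer<List<T>> sink) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        this.maxBatchSize = maxBatchSize;
        this.sink = sink;
    }

    /** Adds one item and sends a full group as soon as the limit is reached. */
    void add(T item) {
        pending.add(item);
        if (pending.size() >= maxBatchSize) {
            flush();
        }
    }

    /** Sends whatever is buffered, even if it is fewer than maxBatchSize items. */
    void flush() {
        if (pending.isEmpty()) {
            return; // nothing left to send
        }
        List<T> batch = new ArrayList<>(pending);
        pending.clear();
        sink.accept(batch);
    }
}
```

In the question's code, the sink would be the place that builds the write request, calls `dynamo.batchWriteItem(...)`, and retries unprocessed items. The code that loops over the incoming records would call `add(...)` per record and then `flush()` once after the loop (and again on shutdown), so the last group of fewer than 25 items is still written.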

Answered 2015-06-04T02:39:03.887