
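I have developed software that writes records to the Amazon Kinesis Stream web service. I would like to know whether there is any tool that lets me measure the maximum throughput, in records per second, that my code can produce against a Kinesis stream with 1 shard. Yes, I agree that this also depends on the hardware configuration. To start with, though, I want a figure for a general-purpose machine; after that I should be able to look at horizontal scalability.

With this, I am trying to reach 25k records per second written to the Kinesis stream.

Reference: Kinesis http://aws.amazon.com/kinesis/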


2 Answers


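I believe you can use Apache JMeter for this.

  1. Download and install JMeter
  2. Download the Amazon Kinesis Java Client Library and drop its jars into the JMeter classpath (you can use the /lib folder of your JMeter installation)
  3. Implement the code that writes records to the stream in a JSR223 Sampler, with "groovy" as the language and AmazonKinesisRecordProducerSample as a reference

See Beanshell vs JSR223 vs Java JMeter Scripting: The Performance-Off You've Been Waiting For! for instructions on installing "groovy" engine support and for scripting best practices.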
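
Whatever tool drives the load, the figure being measured is successful puts divided by elapsed time. Below is a minimal sketch of that calculation in plain Java. It assumes a hypothetical `RecordSink` interface standing in for the real Kinesis put call; `ThroughputProbe` and its method names are likewise invented for illustration and are not part of JMeter or the AWS SDK.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for whatever actually writes a record (for example a Kinesis put). */
interface RecordSink {
    void put(String partitionKey, byte[] data) throws Exception;
}

class ThroughputProbe {

    /** Converts a record count and an elapsed time in nanoseconds into records per second. */
    static double recordsPerSecond(long records, long elapsedNanos) {
        if (elapsedNanos <= 0) {
            throw new IllegalArgumentException("elapsedNanos must be positive");
        }
        return records * (double) TimeUnit.SECONDS.toNanos(1) / elapsedNanos;
    }

    /** Pushes `attempts` records through the sink and returns the rate of successful puts. */
    static double measure(RecordSink sink, int attempts, byte[] payload) {
        long succeeded = 0;
        long start = System.nanoTime();
        for (int i = 0; i < attempts; i++) {
            try {
                sink.put("key-" + i, payload);
                succeeded++;
            } catch (Exception e) {
                // Failed puts (for example throttled requests) do not count as throughput.
            }
        }
        // Guard against a zero reading from a very fast or very coarse clock.
        long elapsed = Math.max(1, System.nanoTime() - start);
        return recordsPerSecond(succeeded, elapsed);
    }
}
```

Wrapping the real client call in a `RecordSink` and running `measure` from several threads would give a rough per-machine number to compare against what JMeter reports.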

answered 2015-06-19T15:09:00.797

Thanks for the hint. I have found a way to get working Groovy code that sends records to a Kinesis stream using the AWS-Java-SDK. Here is the sample code:

/*
 * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.nio.ByteBuffer

import com.amazonaws.auth.AWSCredentials
import com.amazonaws.auth.profile.ProfileCredentialsProvider
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.PutRecordRequest
import com.amazonaws.services.kinesis.model.PutRecordResult

class AmazonKinesisRecordProducerSample {

    /*
     * Before running the code:
     *      Fill in your AWS access credentials in the provided credentials
     *      file template, and be sure to move the file to the default location
     *      (~/.aws/credentials) where the sample code will load the
     *      credentials from.
     *      https://console.aws.amazon.com/iam/home?#security_credential
     *
     * WARNING:
     *      To avoid accidental leakage of your credentials, DO NOT keep
     *      the credentials file in your source directory.
     */

    // Kinesis client, created by init()
    def kinesis

    def init() {
        /*
         * The ProfileCredentialsProvider will return your [default]
         * credential profile by reading from the credentials file located at
         * (~/.aws/credentials).
         */
        AWSCredentials credentials = new ProfileCredentialsProvider().getCredentials()
        kinesis = new AmazonKinesisClient(credentials)
    }
}

def amazonKinesisRecordProducerSample = new AmazonKinesisRecordProducerSample()
amazonKinesisRecordProducerSample.init()

def myStreamName = "<KINESIS STREAM NAME>"

println("Press CTRL-C to stop.")
// Write records to the stream until this program is aborted.
while (true) {
    def data = '<Data IN STRING FORMAT>'
    def partitionKey = "<PARTITION KEY>"
    def putRecordRequest = new PutRecordRequest()
    putRecordRequest.setStreamName(myStreamName)
    // Encode explicitly as UTF-8 so the bytes do not depend on the platform default charset.
    putRecordRequest.setData(ByteBuffer.wrap(data.getBytes("UTF-8")))
    putRecordRequest.setPartitionKey(partitionKey)
    // putRecord blocks until the service responds, so this loop sends one record at a time.
    PutRecordResult putRecordResult = amazonKinesisRecordProducerSample.kinesis.putRecord(putRecordRequest)
    printf("Successfully put record, partition key : %s, ShardID : %s, SequenceNumber : %s.\n",
            putRecordRequest.getPartitionKey(),
            putRecordResult.getShardId(),
            putRecordResult.getSequenceNumber())
}

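Note: this code only works if you have already created and enabled the Kinesis stream. If you need to create a stream and then use it, refer to the code samples in the src folder of aws-java-sdk.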
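
On the 25k records per second target from the question: Kinesis documents a per-shard write limit of 1,000 records per second and 1 MB per second, so a single shard cannot absorb that rate regardless of client hardware. Here is a rough sizing sketch in Java. The class and method names are invented for illustration, the 100-byte record size is an assumed example value, and the limit constants should be checked against the current service quotas.

```java
class ShardSizing {

    // Per-shard write limits as documented for Kinesis; verify against current quotas.
    static final long RECORDS_PER_SHARD_PER_SEC = 1_000L;
    // "1 MB" is taken as 1,000,000 bytes here, the conservative reading.
    static final long BYTES_PER_SHARD_PER_SEC = 1_000_000L;

    /** Integer division rounded up, for positive operands. */
    static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }

    /** Smallest shard count that satisfies both the record-count and the byte-rate limit. */
    static long minShards(long recordsPerSec, long avgRecordBytes) {
        long byCount = ceilDiv(recordsPerSec, RECORDS_PER_SHARD_PER_SEC);
        long byBytes = ceilDiv(recordsPerSec * avgRecordBytes, BYTES_PER_SHARD_PER_SEC);
        return Math.max(1, Math.max(byCount, byBytes));
    }

    public static void main(String[] args) {
        // 25,000 records per second at an assumed 100 bytes each.
        System.out.println(minShards(25_000, 100)); // prints 25
    }
}
```

With 100-byte records the record-count limit dominates and the estimate is 25 shards; with 2,000-byte records the byte limit dominates and the estimate rises to 50. Measuring one shard first and then scaling out, as the question proposes, fits this arithmetic.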

answered 2016-05-22T15:55:42.937