6

我是 Kinesis 的新手。阅读我发现的文档,我可以创建 Kinesis Stream 以从 Producer 获取数据。然后使用 KCL 将从 Stream 中读取此数据以进行进一步处理。我了解如何通过实现 IRecordProcessor 来编写 KCL 应用程序。

然而,我仍然不清楚如何将数据放在 Kinesis 流上的第一阶段。我们是否有一些 AWS API 需要实施才能实现这一点。

场景:我有一台服务器,它不断地从文件夹中的各种来源获取数据。每个文件夹都包含文本文件,其行包含进一步分析工作所需的属性。我必须将所有这些数据推送到 Kinesis Stream。

我需要如下代码,类 putData 方法将用于在 Kinesis 流中输出

public class Put {

    AmazonKinesisClient kinesisClient;

    Put()
    {
        String accessKey = "My Access Key here" ;
        String secretKey = "My Secret Key here" ;
        AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
       kinesisClient = new AmazonKinesisClient(credentials);
       kinesisClient.setEndpoint("kinesis.us-east-1.amazonaws.com", "kinesis", "us-east-1");
        System.out.println("starting the Put Application");
    }

    public void putData(String fileContent,String session) throws Exception
    {
         final String myStreamName = "ClickStream";

            PutRecordRequest putRecordRequest = new PutRecordRequest();
            putRecordRequest.setStreamName(myStreamName);
            String putData = fileContent;
            putRecordRequest.setData(ByteBuffer.wrap(putData.getBytes()));
            putRecordRequest.setPartitionKey("session"+session);
            PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest);
            System.out.println("Successfully putrecord, partition key : " + putRecordRequest.getPartitionKey()
                    + ", ShardID : " + putRecordResult.getShardId());
            System.out.println(fileContent);
            System.out.println("Sequence Number: "+putRecordResult.getSequenceNumber());

            System.out.println("Data has been PUT successfully");


    }
}

但是从服务器的源文件夹中读取文件,然后我应该使用什么设计来调用 putData 以获取 Kinesis 流上的记录。我是否需要无限循环并读取所有文件,然后执行此操作或某些框架会更好地做到这一点,同时兼顾容错性和单点故障。任何帮助将不胜感激。

简而言之:我需要一种更好的技术来将定期生成的数据放到 Kinesis Stream 中,这些数据会定期生成到服务器。谢谢

4

4 回答 4

3

如果您要拖尾一些文件,请尝试使用 Fluentd。http://www.fluentd.org/

Amazon Kinesis 有一个非常好的插件。https://github.com/awslabs/aws-fluent-plugin-kinesis

于 2014-11-10T09:05:09.110 回答
2

所以看来您已经在使用... http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html

您想要的具体方法如下。

您需要一个流名称、记录和流密钥。 http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/PutRecordResult.html

但似乎你拥有这一切?

然后,您将需要一个始终跟踪您的服务器日志文件的程序运行,并且当有新行时它会推送它。

但是您的数据只能保留 24 小时。然后,您需要一个工作程序来使用数据并将其放置在其他一些 AWS 资源中。

于 2014-06-23T01:56:42.440 回答
2

您可以使用 Amazon kinesis 代理来监控一组文件,它们可以将数据流式传输到 kinesis。

http://docs.aws.amazon.com/streams/latest/dev/writing-with-agents.html

于 2016-08-26T16:36:44.883 回答
0

如果您尝试摄取日志文件,请尝试 Fluentd。Fluentd可以连续tail log文件,做数据缓冲、加密、压缩、重试。

Fluentd 的 Kinesis 插件是由 Amazon Web Services 自己开发的。

于 2016-03-11T12:16:32.873 回答