0

我正在尝试将一些数据从 API(谷歌股票/金融 API)写入我的 AWS Firehose 流。我已经在 Eclipse 上下载并安装了 AWS 插件,在 AWS 上设置了我的 Firehose 流,一切似乎都设置正确。不过,我遇到了一些问题。以下行似乎已被弃用...我尝试了亚马逊 SDK 的不同变体,但我似乎无法获得正确的代码。

AmazonKinesisFirehoseClient firehoseClient = new AmazonKinesisFirehoseClient(credentials);

接下来,我遇到以下错误。具体错误是“对于 PutRecordRequest 类型的方法 setRecord(Record) 未定义”,尽管我直接从 Amazon 的 API 参考中获取了它。

request.setRecord(记录);

firehoseClient.putRecord(request);

上面第二行也出现错误:“AmazonKinesisFirehoseClient 类型的 putRecord(com.amazonaws.services.kinesisfirehose.model.PutRecordRequest) 方法不适用于参数 (com.amazonaws.services.kinesis.model.PutRecordRequest) "

package com.amazonaws.samples;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;

import org.apache.http.client.CredentialsProvider;

import com.amazonaws.*;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
import com.amazonaws.services.kinesisfirehose.model.Record;

public class FirehoseExample {

    public static void main(String[] args) {
        AWSCredentials credentials = null;

        try {
            credentials = new ProfileCredentialsProvider().getCredentials();
        }

        catch (Exception e) {
            throw new AmazonClientException("Cannot load the credentials from the credential profiles file. "
                    + "Please make sure that your credentials file is at the correct "
                    + "location (/Users/elybenari/.aws/credentials), and is in valid format.", e);
        }

        AmazonKinesisFirehoseClient firehoseClient = new AmazonKinesisFirehoseClient(credentials);
        PutRecordRequest request = new PutRecordRequest();
        request.setStreamName("project-stream");

        Record record = new Record();

        for (int i = 0; i < 20*60; i++){
            try {
                URL url = new URL("https://www.google.com/finance/info?q=NASDAQ:AMZN");
                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
                StringBuilder response = new StringBuilder();
                String line;

                while ((line = reader.readLine()) != null) {
                    response.append(line);  
                }
                reader.close();

                System.out.println(response.toString().replace("\n", "").replaceAll(" ", ""));
                System.out.println("****\n");

                ByteBuffer buffer = ByteBuffer.wrap(response.toString().replace("\n", "").replaceAll(" ", "").getBytes());
                record.setData(buff);

                request.setRecord(record);

                firehoseClient.putRecord(request);

                Thread.sleep(2000);


            }
            catch(Exception e){
                e.printStackTrace();
            }
        }   

    }




    }
4

1 回答 1

4

问题是您包含了 Kinesis 中的一些类,而不是Kinesis Firehose,Java 包。例如,您使用过:

import com.amazonaws.services.kinesis.model.PutRecordRequest;

鉴于,您应该使用过:

import com.amazonaws.services.kinesisfirehose.model.PutRecordRequest;

Kinesis、Kinesis Firehose 和 Kinesis Analytics 是不同的服务,尽管它们属于 AWS 上的一个流服务保护伞。因此,它们在 Java SDK 中具有不同的包命名空间。如果您从此处的官方文档开始,您将在此处获得正确的 Java SDK参考

编辑:回答您的其他问题:是的,不推荐使用以下内容:

AmazonKinesisFirehoseClient firehoseClient = new AmazonKinesisFirehoseClient(credentials);

您应该改用以下内容:

 AmazonKinesisFirehoseClient firehoseClient = AmazonKinesisFirehoseClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(awsCredentials)).build();

有关如何正确初始化 AmazonKinesisFirehoseClient的信息,请参阅此处的官方文档。

于 2017-04-05T12:33:15.437 回答