1

我正在使用 Dynamo DB 流 + Lamdba 作为触发器来调用 kinesis,它将我的 Dynamo DB 数据放入 Redshift。

有人可以建议一种使用发电机流将 Dynamo DB 数据加载到不同区域的 Redshift 的方法。

4

1 回答 1

0

我编写了一个程序,可以将数据从 Dynamo db 移动到 Redshift,但没有流就无法工作,您可以查看代码,看看这是否有助于您的案例,或者您是否得到任何与此相对应的想法。

1.创建与 Redshift 的连接。2.创建用于插入 Redshift 的 Prepeared Statement。3.使用分页从Dynamo批量获取数据。4.将批量数据批量插入Resdhift。

public void createConnectionWithRedshift() {
        final String DB_URL = "jdbc:redshift://ao.cepuhmobd.us-west-2.redshift.amazonaws.com:5439/events";
        // final String DB_URL = args[0];
        // Database credentials
        final String USER = "abc";
        final String PASS = "abc";
        Connection conn = null;
        try {
            // STEP 3: Open a connection
            System.out.println("Connecting to database...");
            conn = DriverManager.getConnection(DB_URL, USER, PASS);
            // createNewTable(conn);
            // STEP 4: Execute a query
            preparedStatement = conn.prepareStatement("insert into Events " + "(Vin,timestamp,eventtype,source,data)" + "VALUES (?,?,?,?,?)");
        } catch (SQLException se) {
            se.printStackTrace();
        }
    }// end main


    public void replicateDynamoToRedshidt(int pages, int batchSize, int scanSize)
            throws TableNeverTransitionedToStateException, InterruptedException {
createConnectionWithRedshift();//Redshift Connection
for (int i = 0; i < pages; i = i + 1) {
            List<EventLogEntity> results = findAll(new PageRequest(i, batchSize));//Fetching the data from Dynamo in batches
            List<HeadUnitData> headUnitDataList = headUnitEvents(results);
            for (int j = 0; j < headUnitDataList.size(); j++) {
                HeadUnitData headUnitData = headUnitDataList.get(j);
                insertData(headUnitData.getVin(), headUnitData.getType(), headUnitData.getSource(), headUnitData.getData());//Inserting the data into Redshidt in batches
            }
            try {
                preparedStatement.executeBatch();
                System.out.println("Inserted in Database : " + results.size());
            } catch (SQLException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
于 2017-05-26T05:21:28.707 回答