我正在使用 Dynamo DB 流 + Lamdba 作为触发器来调用 kinesis,它将我的 Dynamo DB 数据放入 Redshift。
有人可以建议一种使用发电机流将 Dynamo DB 数据加载到不同区域的 Redshift 的方法。
我正在使用 Dynamo DB 流 + Lamdba 作为触发器来调用 kinesis,它将我的 Dynamo DB 数据放入 Redshift。
有人可以建议一种使用发电机流将 Dynamo DB 数据加载到不同区域的 Redshift 的方法。
我编写了一个程序,可以将数据从 Dynamo db 移动到 Redshift,但没有流就无法工作,您可以查看代码,看看这是否有助于您的案例,或者您是否得到任何与此相对应的想法。
1.创建与 Redshift 的连接。2.创建用于插入 Redshift 的 Prepeared Statement。3.使用分页从Dynamo批量获取数据。4.将批量数据批量插入Resdhift。
public void createConnectionWithRedshift() {
final String DB_URL = "jdbc:redshift://ao.cepuhmobd.us-west-2.redshift.amazonaws.com:5439/events";
// final String DB_URL = args[0];
// Database credentials
final String USER = "abc";
final String PASS = "abc";
Connection conn = null;
try {
// STEP 3: Open a connection
System.out.println("Connecting to database...");
conn = DriverManager.getConnection(DB_URL, USER, PASS);
// createNewTable(conn);
// STEP 4: Execute a query
preparedStatement = conn.prepareStatement("insert into Events " + "(Vin,timestamp,eventtype,source,data)" + "VALUES (?,?,?,?,?)");
} catch (SQLException se) {
se.printStackTrace();
}
}// end main
public void replicateDynamoToRedshidt(int pages, int batchSize, int scanSize)
throws TableNeverTransitionedToStateException, InterruptedException {
createConnectionWithRedshift();//Redshift Connection
for (int i = 0; i < pages; i = i + 1) {
List<EventLogEntity> results = findAll(new PageRequest(i, batchSize));//Fetching the data from Dynamo in batches
List<HeadUnitData> headUnitDataList = headUnitEvents(results);
for (int j = 0; j < headUnitDataList.size(); j++) {
HeadUnitData headUnitData = headUnitDataList.get(j);
insertData(headUnitData.getVin(), headUnitData.getType(), headUnitData.getSource(), headUnitData.getData());//Inserting the data into Redshidt in batches
}
try {
preparedStatement.executeBatch();
System.out.println("Inserted in Database : " + results.size());
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}