我需要将记录插入 Cassandra,所以我编写了一个函数,其输入是一个 csv 文件。假设 csv 文件的名称是 test.csv。在 Cassandra 我有一个表格测试。我需要将 csv 文件的每一行存储到测试表中。由于我使用的是 spark java api ,因此我还创建了一个 POJO 类或 DTO 类来映射 Pojo 的字段和 Cassandra 的列。
这里的问题是 test.csv 有大约 50 个逗号分隔值,这些值必须存储在 cassandra 的测试表的 50 列中,总共有 400 列。所以在我的测试 POJO 类中,我创建了这 50 个字段的构造函数。
JavaRDD<String> fileRdd = ctx.textFile("home/user/test.csv");
JavaRDD fileObjectRdd = fileRdd.map(
new Function<String, Object>() {
//do some tranformation with data
switch(fileName){
case "test" :return new TestPojo(1,3,4,--50); //calling the constructor with 50 fields .
}
});
switch(fileName){
test : javaFunctions(fileObjectRdd).writerBuilder("testKeyspace", "test", mapToRow(TestPojo.class)).saveToCassandra();
}
所以在这里我总是将 test.csv 文件每一行的 TestPojo 类的 Object 返回到 Objects 的 Rdd 。完成后,我将使用 TestPojo 映射将该 rdd 保存到 Cassandra 表测试。
我的问题是,如果 test.csv 将来有 60 列,那么我的代码将无法工作,因为我调用的构造函数只有 50 个字段。
我的问题是如何创建一个包含 TestPojo 中所有 400 个字段的构造函数,这样无论 test.csv 有多少字段,我的代码都应该能够处理它。
我试图创建一个包含所有 400 个字段的通用构造函数,但最终出现编译错误,说构造函数参数的限制只有 255 个字段。
或者有没有更好的方法来处理这个用例?
问题 2:如果 test.csv 中的数据将发送到 cassandra 中的多个表,例如 test.csv 的 5 个 cols 将发送到 cassandra 中的 test 表,而其他 5 个 cols 将发送到 cassandra 中的 test2 表,该怎么办?
这里的问题是当我在做
JavaRDD fileObjectRdd = fileRdd.map(
new Function<String, Object>() {
//do some tranformation with data
switch(fileName){
case "test" :return new TestPojo(1,3,4,--50); //calling the constructor with 50 fields .
}
});
我只返回一个 TestPojo 对象。如果 test.csv 中的数据将发送到 test table 和 test2 table ,我将需要返回两个对象,一个是 TestPojo,另一个是 Test2Pojo。