我是新来的火花,我有一个问题。我正在处理使用 textFile() 生成的 RDD,它是一个 csv 文件。对于每一行,我想将多行返回到一个新的 RDD(一个而不是多个)。这是我的代码:
JavaRDD<LinearAccelerationEvent> linearAccelerationEventJavaRDD = csvFile.filter(
new Function<String, Boolean>() {
public Boolean call(String line) {
return line.contains("LinearAccelerationEvent");
}
}).map(
new Function<String, LinearAccelerationEvent>() {
public LinearAccelerationEvent call(String line) throws Exception {
String[] fields = line.split(",");
LinearAccelerationEvent linearAccelerationEvent = new LinearAccelerationEvent(Long.valueOf(fields[4]), Float.valueOf(fields[1]), Float.valueOf(fields[2]), Float.valueOf(fields[3]));
return linearAccelerationEvent;
}
}).cache();
我在这里所做的是过滤初始 csv 以仅获取 LinearAccelerationEvent,然后我想将这些对象映射到 LinearAccelerationEvent 类并生成 LinearAccelerationEvent 对象的新 RDD。对于初始 csv 文件的每一行,我必须生成多个 LinearAccelerometerEvent 对象,但我不知道该怎么做。我之所以要这样做是因为稍后这个RDD会像这样被推送到cassandra:
javaFunctions(linearAccelerationEventJavaRDD).writerBuilder("d300ea832fe462598f473f76939452283de495a1", "linearaccelerationevent", mapToRow(LinearAccelerationEvent.class)).saveToCassandra();
所以理想的解决方案是这样的:
JavaRDD<LinearAccelerationEvent> linearAccelerationEventJavaRDD = csvFile.filter(
new Function<String, Boolean>() {
public Boolean call(String line) {
return line.contains("LinearAccelerationEvent");
}
}).map(
new Function<String, LinearAccelerationEvent>() {
public LinearAccelerationEvent call(String line) throws Exception {
String[] fields = line.split(",");
for() {
LinearAccelerationEvent linearAccelerationEvent = new LinearAccelerationEvent(Long.valueOf(fields[4]), Float.valueOf(fields[1]), Float.valueOf(fields[2]), Float.valueOf(fields[3]));
return linearAccelerationEvent;
}
}
}).cache();
我可以使用该foreachPartition()
函数并将 for 循环的每个事件推送到 Cassandra,但我发现这种方法要慢得多。是否可以不让用户 foreach 做我想做的事?谢谢