谁能告诉我apache spark中是否有任何方法可以在mysql数据库上存储JavaRDD?我从 2 个 csv 文件中获取输入,然后在对其内容进行连接操作后,我需要将输出(输出 JavaRDD)保存在 mysql 数据库中。我已经能够在 hdfs 上成功保存输出,但我没有找到任何与 apache Spark-MYSQL 连接相关的信息。下面我发布 spark sql 的代码。这可以作为那些正在寻找 spark-sql 示例的人的参考。
package attempt1;
import java.io.Serializable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;
public class Spark_Mysql {
@SuppressWarnings("serial")
public static class CompleteSample implements Serializable {
private String ASSETNUM;
private String ASSETTAG;
private String CALNUM;
public String getASSETNUM() {
return ASSETNUM;
}
public void setASSETNUM(String aSSETNUM) {
ASSETNUM = aSSETNUM;
}
public String getASSETTAG() {
return ASSETTAG;
}
public void setASSETTAG(String aSSETTAG) {
ASSETTAG = aSSETTAG;
}
public String getCALNUM() {
return CALNUM;
}
public void setCALNUM(String cALNUM) {
CALNUM = cALNUM;
}
}
@SuppressWarnings("serial")
public static class ExtendedSample implements Serializable {
private String ASSETNUM;
private String CHANGEBY;
private String CHANGEDATE;
public String getASSETNUM() {
return ASSETNUM;
}
public void setASSETNUM(String aSSETNUM) {
ASSETNUM = aSSETNUM;
}
public String getCHANGEBY() {
return CHANGEBY;
}
public void setCHANGEBY(String cHANGEBY) {
CHANGEBY = cHANGEBY;
}
public String getCHANGEDATE() {
return CHANGEDATE;
}
public void setCHANGEDATE(String cHANGEDATE) {
CHANGEDATE = cHANGEDATE;
}
}
@SuppressWarnings("serial")
public static void main(String[] args) throws Exception {
JavaSparkContext ctx = new JavaSparkContext("local[2]", "JavaSparkSQL");
JavaSQLContext sqlCtx = new JavaSQLContext(ctx);
JavaRDD<CompleteSample> cs = ctx.textFile("C:/Users/cyg_server/Documents/bigDataExample/AssetsImportCompleteSample.csv").map(
new Function<String, CompleteSample>() {
public CompleteSample call(String line) throws Exception {
String[] parts = line.split(",");
CompleteSample cs = new CompleteSample();
cs.setASSETNUM(parts[0]);
cs.setASSETTAG(parts[1]);
cs.setCALNUM(parts[2]);
return cs;
}
});
JavaRDD<ExtendedSample> es = ctx.textFile("C:/Users/cyg_server/Documents/bigDataExample/AssetsImportExtendedSample.csv").map(
new Function<String, ExtendedSample>() {
public ExtendedSample call(String line) throws Exception {
String[] parts = line.split(",");
ExtendedSample es = new ExtendedSample();
es.setASSETNUM(parts[0]);
es.setCHANGEBY(parts[1]);
es.setCHANGEDATE(parts[2]);
return es;
}
});
JavaSchemaRDD complete = sqlCtx.applySchema(cs, CompleteSample.class);
complete.registerAsTable("cs");
JavaSchemaRDD extended = sqlCtx.applySchema(es, ExtendedSample.class);
extended.registerAsTable("es");
JavaSchemaRDD fs= sqlCtx.sql("SELECT cs.ASSETTAG, cs.CALNUM, es.CHANGEBY, es.CHANGEDATE FROM cs INNER JOIN es ON cs.ASSETNUM=es.ASSETNUM;");
JavaRDD<String> result = fs.map(new Function<Row, String>() {
public String call(Row row) {
return row.getString(0);
}
});
result.saveAsTextFile("hdfs://path/to/hdfs/dir-name"); //instead of hdfs I need to save it on mysql database, but I am not able to find any Spark-MYSQL connection
}
}
最后,我将结果成功保存在 HDFS 中。但现在我想保存到 MYSQL 数据库中。请帮帮我。谢谢