2

谁能告诉我 Apache Spark 中是否有任何方法可以将 JavaRDD 存储到 MySQL 数据库?我从 2 个 csv 文件中获取输入,然后在对其内容进行连接操作后,我需要将输出(输出 JavaRDD)保存在 MySQL 数据库中。我已经能够在 HDFS 上成功保存输出,但我没有找到任何与 Apache Spark 连接 MySQL 相关的信息。下面我发布 Spark SQL 的代码。这可以作为那些正在寻找 Spark SQL 示例的人的参考。

package attempt1;

import java.io.Serializable;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;


/**
 * Joins two asset CSV files with Spark SQL (Spark 1.0/1.1-era Java API) and
 * writes the joined result out as text.
 *
 * NOTE(review): JavaSQLContext/JavaSchemaRDD were removed in later Spark
 * releases; the long-term fix for the "save to MySQL" requirement is to
 * migrate to SQLContext/DataFrame and use DataFrameWriter.jdbc(), or use
 * foreachPartition with a plain JDBC connection per partition.
 */
public class Spark_Mysql {

    /** Row bean for AssetsImportCompleteSample.csv: ASSETNUM, ASSETTAG, CALNUM. */
    @SuppressWarnings("serial")
    public static class CompleteSample implements Serializable {
        private String ASSETNUM;
        private String ASSETTAG;
        private String CALNUM;

        public String getASSETNUM() {
            return ASSETNUM;
        }
        public void setASSETNUM(String aSSETNUM) {
            ASSETNUM = aSSETNUM;
        }
        public String getASSETTAG() {
            return ASSETTAG;
        }
        public void setASSETTAG(String aSSETTAG) {
            ASSETTAG = aSSETTAG;
        }
        public String getCALNUM() {
            return CALNUM;
        }
        public void setCALNUM(String cALNUM) {
            CALNUM = cALNUM;
        }
    }

    /** Row bean for AssetsImportExtendedSample.csv: ASSETNUM, CHANGEBY, CHANGEDATE. */
    @SuppressWarnings("serial")
    public static class ExtendedSample implements Serializable {
        private String ASSETNUM;
        private String CHANGEBY;
        private String CHANGEDATE;

        public String getASSETNUM() {
            return ASSETNUM;
        }
        public void setASSETNUM(String aSSETNUM) {
            ASSETNUM = aSSETNUM;
        }
        public String getCHANGEBY() {
            return CHANGEBY;
        }
        public void setCHANGEBY(String cHANGEBY) {
            CHANGEBY = cHANGEBY;
        }
        public String getCHANGEDATE() {
            return CHANGEDATE;
        }
        public void setCHANGEDATE(String cHANGEDATE) {
            CHANGEDATE = cHANGEDATE;
        }
    }

    // "serial" suppression covers the anonymous Function subclasses below,
    // which inherit Serializable without declaring a serialVersionUID.
    @SuppressWarnings("serial")
    public static void main(String[] args) throws Exception {

        JavaSparkContext ctx = new JavaSparkContext("local[2]", "JavaSparkSQL");
        JavaSQLContext sqlCtx = new JavaSQLContext(ctx);

        try {
            // Parse the "complete" CSV into beans.  Assumes well-formed,
            // comma-delimited lines with at least 3 fields; a shorter line
            // throws ArrayIndexOutOfBoundsException at parts[...] below.
            JavaRDD<CompleteSample> cs = ctx.textFile("C:/Users/cyg_server/Documents/bigDataExample/AssetsImportCompleteSample.csv").map(
                    new Function<String, CompleteSample>() {
                      public CompleteSample call(String line) throws Exception {
                        String[] parts = line.split(",");

                        CompleteSample cs = new CompleteSample();
                        cs.setASSETNUM(parts[0]);
                        cs.setASSETTAG(parts[1]);
                        cs.setCALNUM(parts[2]);

                        return cs;
                      }
                    });

            // Parse the "extended" CSV the same way.
            JavaRDD<ExtendedSample> es = ctx.textFile("C:/Users/cyg_server/Documents/bigDataExample/AssetsImportExtendedSample.csv").map(
                    new Function<String, ExtendedSample>() {
                      public ExtendedSample call(String line) throws Exception {
                        String[] parts = line.split(",");

                        ExtendedSample es = new ExtendedSample();
                        es.setASSETNUM(parts[0]);
                        es.setCHANGEBY(parts[1]);
                        es.setCHANGEDATE(parts[2]);

                        return es;
                      }
                    });

            // Register both RDDs as SQL-queryable tables.
            JavaSchemaRDD complete = sqlCtx.applySchema(cs, CompleteSample.class);
            complete.registerAsTable("cs");

            JavaSchemaRDD extended = sqlCtx.applySchema(es, ExtendedSample.class);
            extended.registerAsTable("es");

            // BUG FIX: the original query ended with ';' inside the string;
            // the Spark 1.x SQL parser rejects a trailing semicolon, so it
            // has been removed.
            JavaSchemaRDD fs = sqlCtx.sql(
                "SELECT cs.ASSETTAG, cs.CALNUM, es.CHANGEBY, es.CHANGEDATE "
                + "FROM cs INNER JOIN es ON cs.ASSETNUM=es.ASSETNUM");

            // Project only the first column (ASSETTAG) of each joined row.
            JavaRDD<String> result = fs.map(new Function<Row, String>() {
                public String call(Row row) {
                  return row.getString(0);
                }
              });

            result.saveAsTextFile("hdfs://path/to/hdfs/dir-name");          //instead of hdfs I need to save it on mysql database, but I am not able to find any Spark-MYSQL connection

        } finally {
            // Always release the local Spark context, even if a stage fails.
            ctx.stop();
        }
    }
}

最后,我将结果成功保存在 HDFS 中。但现在我想保存到 MySQL 数据库中。请帮帮我。谢谢

4

2 回答 2

4

您可以使用两种方法将结果写回数据库。一种是使用 DBOutputFormat 之类的东西并对其进行配置,另一种是在要保存的 RDD 上使用 foreachPartition 并传入一个创建与 MySQL 的连接并将结果写回的函数。

于 2014-07-23T18:32:41.127 回答
0

这是一个使用 DBOutputFormat 的示例。

创建一个代表您的表格行的类 -

/**
 * Write-only {@code DBWritable}: maps one output row onto the placeholders of
 * the INSERT statement that Hadoop's DBOutputFormat prepares.  Reading rows
 * back from the database is deliberately unsupported.
 */
public class TableRow implements DBWritable
{
    public String column1;
    public String column2;

    /** Not needed for an output-only format; fails fast if ever invoked. */
    @Override
    public void readFields(ResultSet resultSet) throws SQLException
    {
        throw new RuntimeException("readFields not implemented");
    }

    /** Binds this row's values to the prepared INSERT, in declared column order. */
    @Override
    public void write(PreparedStatement statement) throws SQLException
    {
        statement.setString(1, column1);
        statement.setString(2, column2);
    }
}

然后配置你的作业(JobConf)并编写一个 mapToPair 函数。键值对中的值(value)似乎未被使用;如果有人知道它的用途,请发表评论。

// Target table and the columns DBOutputFormat will generate the INSERT for.
String tableName = "YourTableName";
String[] fields = new String[] { "column1", "column2" };

// Configure the Hadoop job with the MySQL JDBC driver, connection URL and
// credentials, then tell DBOutputFormat which table/columns to write.
JobConf job = new JobConf();
DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver", "jdbc:mysql://localhost/DatabaseNameHere", "username", "password");
DBOutputFormat.setOutput(job, tableName, fields);

// map your rdd into a table row
// (key = DBWritable row to insert; the value of the pair appears to be
// ignored by DBOutputFormat — `rdd` is defined elsewhere in the caller)
JavaPairRDD<TableRow, Object> rows = rdd.mapToPair(...);

// Executes the job: each TableRow key is bound to the INSERT and written.
rows.saveAsHadoopDataset(job);
于 2015-01-20T02:19:43.927 回答