1

我正在尝试使用 Spark Streaming 将 HDFS 中的镶木地板文件复制到 MS Sql Server。我正在为 MS SQL Server 使用 JDBC 驱动程序。我的代码是:

val spark = SparkSession.builder().master("yarn").appName("StreamAFile").getOrCreate();
val userSchema = new StructType().add("mandt","string").add("lifnr","string").add("land1","string").add("name1","string").add("name2","string");
val myDF = spark.readStream.format("parquet").schema(userSchema).load("/parquetfilepath/*");
val query = myDF.writeStream.format("jdbc").option("driver","net.sourceforge.jtds.jdbc.Driver").option("dbtable","mytable").option("user","username").option("password","password").option("checkpointLocation","/homedirectory/").start("jdbc:jtds:sqlserver://SQLServer1:1433;DatabaseName=MyDB");

我收到错误:

java.lang.UnsupportedOperationException: Data source jdbc does not support streamed writing

如果有人以前在此工作过,请提供修复。

4

2 回答 2

1

Spark of Structured Streaming 提供了一个foreach()函数,你可以使用它自己定义一个JDBCSink()此链接是在结构化流中使用 mysql 的一个很好的演示。 https://github.com/cynthia1wang/jdbcsink/blob/master/src/main/scala/DNSstat.scala

class JDBCSink() extends ForeachWriter[Row] {
    val driver = "com.mysql.jdbc.Driver"
    var connection:Connection = _
    var statement:Statement = _

    def open(partitionId: Long,version: Long): Boolean = {
        Class.forName(driver)
        connection = DriverManager.getConnection("jdbc:mysql://10.88.1.102:3306/aptwebservice", "root", "mysqladmin")
        statement = connection.createStatement
        true
    }
    def process(value: Row): Unit = {
        statement.executeUpdate("replace into DNSStat(ip,domain,time,count) values(" 
                                + "'" + value.getString(0) + "'" + ","//ip
                                + "'" + value.getString(1) + "'" + ","//domain
                                + "'" + value.getTimestamp(2) + "'" + "," //time
                                + value.getLong(3) //count
                                + ")") 
}
    def close(errorOrNull: Throwable): Unit = {
        connection.close
    }
}
于 2017-08-07T08:11:33.537 回答
0

正如错误所说:

数据源jdbc不支持流式写入

这在结构化流中是不可能的。使用较旧的 Spark Streaming API 可能会获得更好的结果(但我不建议采用这种方式,因为它越来越过时了)。

为什么你甚至为此使用结构化流?你为什么不写一个批处理 Spark 应用程序,即spark.readspark.write?这应该可行,并且使用 cron 您可以定期执行。

ps 我不认为我会使用 Spark 来完成这样的工作(双关语)。我认为 Oozie 或类似的东西可能更适合这个用例。恐怕没有分布式处理可以让 Spark 大放异彩。

于 2017-04-27T13:44:50.107 回答