I wrote a Spark Structured Streaming application that reads data from a Kafka topic and writes those messages to Oracle in real time. Everything works fine, except that when the job is killed, the message being written at that moment ends up corrupted (i.e., it is written with garbage values).

Below is a snapshot of my code. Does anyone know how I could solve a problem like this? PS: the problem does not throw any exception either.

import java.io.{PrintWriter, StringWriter}
import java.sql.Timestamp

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

// SingletonConnection and log are defined elsewhere in the application.
class JDBCSink(sql_stmt: String, sparkSession: SparkSession) extends org.apache.spark.sql.ForeachWriter[Row] {
  val driver = "oracle.jdbc.driver.OracleDriver"
  val url = "jdbc:oracle:thin:@xxx.net:1234:DB"
  val user = "usr"
  val pwd = "pwd"

  var statement2: java.sql.PreparedStatement = _
  var clob: java.sql.Clob = _
  Class.forName(driver)
  lazy val dbConnection = SingletonConnection

  def open(partitionId: Long, version: Long): Boolean = {
    // Obtain the shared connection and prepare the statement for this partition.
    val connection = dbConnection.getConnection(url, user, pwd)
    statement2 = dbConnection.getPreparedStatement(sql_stmt)
    clob = dbConnection.getClob()
    true
  }

  def process(value: Row): Unit = {
    val sw = new StringWriter
    // Bind each column to the prepared statement according to its schema type.
    value.schema.fields.zipWithIndex.foreach {
      // ((name, dataType, nullable, metadata), position)
      case (StructField(_, StringType, _, _), i)    => statement2.setString(i + 1, value.getAs[String](i))
      case (StructField(_, TimestampType, _, _), i) => statement2.setTimestamp(i + 1, value.getAs[Timestamp](i))
      case (StructField(_, IntegerType, _, _), i)   => statement2.setInt(i + 1, value.getAs[Int](i))
      case (StructField(_, LongType, _, _), i)      => statement2.setLong(i + 1, value.getAs[Long](i))
      case (StructField(_, DateType, _, _), i)      => statement2.setDate(i + 1, value.getDate(i))
      case _                                        => statement2 // leave unsupported column types unbound
    }
    try {
      statement2.execute
    } catch {
      case i: java.sql.SQLIntegrityConstraintViolationException =>
        i.printStackTrace(new PrintWriter(sw))
        log.info(s"Duplicate key: ${value} due to ${sw.toString}")
      case e: java.sql.SQLException =>
        e.printStackTrace(new PrintWriter(sw))
        log.warn(s"Could not process message: ${value} due to ${sw.toString}")
      case t: Throwable =>
        t.printStackTrace(new PrintWriter(sw))
        log.error(sw.toString)
    }
  }

  def close(errorOrNull: Throwable): Unit = {
  }
}

/*... Additional code to load kafka stream ...*/

var v_sql = "INSERT INTO msgs VALUES (hextoraw(?), ?, ?, ?, ?, ?, ?, ?, ?, ?)"
val writerRawMsgs = new JDBCSink(v_sql, spark)
val query_msgs = {
  kafkaMsgsFormatted
    .writeStream
    .outputMode("append")
    .option("checkpointLocation", outputHDFSFolder)
    .foreach(writerRawMsgs)
    .start()
}
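
For context, JDBCSink calls into a SingletonConnection helper that is not shown in the post. Below is a minimal sketch of what such a helper might look like, assuming it simply caches one JDBC connection per executor JVM; the object name and method signatures are taken from the calls above, while the body is my assumption, not the actual implementation.

import java.sql.{Clob, Connection, DriverManager, PreparedStatement}

// Hypothetical stand-in for the SingletonConnection object referenced in JDBCSink.
object SingletonConnection {
  private var connection: Connection = _

  // Reuse a single JDBC connection per executor JVM instead of opening one per partition.
  def getConnection(url: String, user: String, pwd: String): Connection = synchronized {
    if (connection == null || connection.isClosed) {
      connection = DriverManager.getConnection(url, user, pwd)
    }
    connection
  }

  def getPreparedStatement(sql: String): PreparedStatement =
    connection.prepareStatement(sql)

  def getClob(): Clob =
    connection.createClob()
}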