I wrote a Spark Structured Streaming application that reads from a Kafka topic and writes the messages to Oracle in real time. Everything works fine, except when the job gets killed: the message that is being written at that moment ends up corrupted (i.e. it is written with garbage values).
Below is a snapshot of my code. Does anyone know how I can fix this? PS: the problem does not raise any exception either.
import java.io.{PrintWriter, StringWriter}
import java.sql.Timestamp

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

class JDBCSink(sql_stmt: String, sparkSession: SparkSession) extends org.apache.spark.sql.ForeachWriter[Row] {
  val driver = "oracle.jdbc.driver.OracleDriver"
  val url = "jdbc:oracle:thin:@xxx.net:1234:DB"
  val user = "usr"
  val pwd = "pwd"

  var statement2: java.sql.PreparedStatement = _
  var clob: java.sql.Clob = _

  Class.forName(driver)
  lazy val dbConnection = SingletonConnection // connection helper object, shared per executor JVM

  def open(partitionId: Long, version: Long): Boolean = {
    val connection = dbConnection.getConnection(url, user, pwd)
    statement2 = dbConnection.getPreparedStatement(sql_stmt)
    clob = dbConnection.getClob()
    true
  }

  def process(value: Row): Unit = {
    val sw = new StringWriter
    // bind each Row column to the matching placeholder, by position and Spark SQL type
    value.schema.fields.zipWithIndex.foreach {
      // ((name, dataType, nullable, metadata), position)
      case (StructField(_, StringType, _, _), i)    => statement2.setString(i + 1, value.getAs[String](i))
      case (StructField(_, TimestampType, _, _), i) => statement2.setTimestamp(i + 1, value.getAs[Timestamp](i))
      case (StructField(_, IntegerType, _, _), i)   => statement2.setInt(i + 1, value.getAs[Int](i))
      case (StructField(_, LongType, _, _), i)      => statement2.setLong(i + 1, value.getAs[Long](i))
      case (StructField(_, DateType, _, _), i)      => statement2.setDate(i + 1, value.getDate(i))
      case _                                        => statement2 // columns of other types are left unbound
    }
    try {
      statement2.execute
    } catch {
      case i: java.sql.SQLIntegrityConstraintViolationException =>
        i.printStackTrace(new PrintWriter(sw))
        log.info(s"Duplicate key: ${value} due to ${sw.toString}")
      case e: java.sql.SQLException =>
        e.printStackTrace(new PrintWriter(sw))
        log.warn(s"Could not process message: ${value} due to ${sw.toString}")
      case t: Throwable =>
        t.printStackTrace(new PrintWriter(sw))
        log.error(sw.toString)
    }
  }

  def close(errorOrNull: Throwable): Unit = {
  }
}
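SingletonConnection is a small helper object in my project whose full implementation I have omitted. A simplified sketch, consistent with how it is called in open() above (the actual pooling/retry logic may differ), would be roughly:

import java.sql.{Clob, Connection, DriverManager, PreparedStatement}

// Simplified sketch only: reuse a single JDBC connection per executor JVM.
object SingletonConnection {
  private var connection: Connection = _

  def getConnection(url: String, user: String, pwd: String): Connection = synchronized {
    if (connection == null || connection.isClosed) {
      connection = DriverManager.getConnection(url, user, pwd)
    }
    connection
  }

  def getPreparedStatement(sql: String): PreparedStatement = connection.prepareStatement(sql)

  def getClob(): Clob = connection.createClob()
}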
/*... Additional code to load kafka stream ...*/
var v_sql = "INSERT INTO msgs VALUES (hextoraw(?), ?, ?, ?, ?, ?, ?, ?, ?, ?)"
val writerRawMsgs = new JDBCSink(v_sql, spark)
val query_msgs = kafkaMsgsFormatted
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", outputHDFSFolder)
  .foreach(writerRawMsgs)
  .start()
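For completeness, the driver then just blocks on the query until it is stopped or the process is killed, typically with something like:

// keep the driver alive until the streaming query terminates
query_msgs.awaitTermination()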